diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts
index 137b38dc7..4a03460e7 100644
--- a/agents/src/voice/agent_activity.ts
+++ b/agents/src/voice/agent_activity.ts
@@ -625,11 +625,21 @@ export class AgentActivity implements RecognitionHooks {
       return;
     }
 
+    // Refactored interruption word count check:
+    // - Always apply minInterruptionWords filtering when STT is available and minInterruptionWords > 0
+    // - Apply the check to every STT result: empty, undefined, or populated transcripts
+    // - This ensures consistent behavior across all interruption scenarios
     if (this.stt && this.agentSession.options.minInterruptionWords > 0 && this.audioRecognition) {
       const text = this.audioRecognition.currentTranscript;
-      // TODO(shubhra): better word splitting for multi-language
-      if (text && splitWords(text, true).length < this.agentSession.options.minInterruptionWords) {
+
+      // Normalize text: convert undefined/null to an empty string for consistent word counting
+      const normalizedText = text ?? '';
+      const wordCount = splitWords(normalizedText, true).length;
+
+      // Only allow interruption if the word count meets or exceeds minInterruptionWords.
+      // This applies to all cases: empty strings, partial speech, and full speech.
+      if (wordCount < this.agentSession.options.minInterruptionWords) {
         return;
       }
     }
@@ -767,19 +777,30 @@ export class AgentActivity implements RecognitionHooks {
       return true;
     }
 
+    // Refactored interruption word count check for consistency with onVADInferenceDone:
+    // - Always apply minInterruptionWords filtering when STT is available and minInterruptionWords > 0
+    // - Use the same splitWords-based word counting as onVADInferenceDone
     if (
       this.stt &&
       this.turnDetection !== 'manual' &&
       this._currentSpeech &&
       this._currentSpeech.allowInterruptions &&
       !this._currentSpeech.interrupted &&
-      this.agentSession.options.minInterruptionWords > 0 &&
-      info.newTranscript.split(' ').length < this.agentSession.options.minInterruptionWords
+      this.agentSession.options.minInterruptionWords > 0
     ) {
-      // avoid interruption if the new_transcript is too short
-      this.cancelPreemptiveGeneration();
-      this.logger.info('skipping user input, new_transcript is too short');
-      return false;
+      const wordCount = splitWords(info.newTranscript, true).length;
+      if (wordCount < this.agentSession.options.minInterruptionWords) {
+        // avoid interruption if the new_transcript contains fewer words than minInterruptionWords
+        this.cancelPreemptiveGeneration();
+        this.logger.info(
+          {
+            wordCount,
+            minInterruptionWords: this.agentSession.options.minInterruptionWords,
+          },
+          'skipping user input, word count below minimum interruption threshold',
+        );
+        return false;
+      }
     }
 
     const oldTask = this._userTurnCompletedTask;
diff --git a/agents/src/voice/interruption_detection.test.ts b/agents/src/voice/interruption_detection.test.ts
new file mode 100644
index 000000000..63d3ec9a0
--- /dev/null
+++ b/agents/src/voice/interruption_detection.test.ts
@@ -0,0 +1,151 @@
+// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+/**
+ * Unit tests for interruption detection logic in AgentActivity.
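+ *
+ * The gate under test, in essence (a condensed sketch of the check in
+ * agent_activity.ts, not the literal implementation):
+ *
+ *   const blocked = splitWords(text ?? '', true).length < minInterruptionWords;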
+ *
+ * Tests the refactored minInterruptionWords check which ensures:
+ * - Consistent word count filtering across all speech scenarios
+ * - Proper handling of empty strings, undefined, and short speech
+ * - Interruptions allowed only when word count meets or exceeds minInterruptionWords threshold
+ */
+import { describe, expect, it } from 'vitest';
+import { splitWords } from '../tokenize/basic/word.js';
+
+describe('Interruption Detection - Word Counting', () => {
+  describe('Word Splitting Behavior', () => {
+    it('should count empty string as 0 words', () => {
+      const text = '';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(0);
+    });
+
+    it('should count single word correctly', () => {
+      const text = 'hello';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(1);
+    });
+
+    it('should count two words correctly', () => {
+      const text = 'hello world';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(2);
+    });
+
+    it('should count multiple words correctly', () => {
+      const text = 'hello this is a full sentence';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(6);
+    });
+
+    it('should handle punctuation correctly', () => {
+      const text = 'hello, world!';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(2);
+    });
+
+    it('should handle multiple spaces between words', () => {
+      const text = 'hello   world';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(2);
+    });
+
+    it('should count whitespace-only string as 0 words', () => {
+      const text = ' ';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(0);
+    });
+
+    it('should handle leading and trailing whitespace', () => {
+      const text = ' hello world ';
+      const wordCount = splitWords(text, true).length;
+      expect(wordCount).toBe(2);
+    });
+  });
+
+  describe('Integration: Full Interruption Check Logic', () => {
+    it('should block interruption for empty transcript with threshold 2', () => {
+      const text = '';
+      const minInterruptionWords = 2;
+
+      const normalizedText = text ?? '';
+      const wordCount = splitWords(normalizedText, true).length;
+      const shouldBlock = wordCount < minInterruptionWords;
+
+      expect(normalizedText).toBe('');
+      expect(wordCount).toBe(0);
+      expect(shouldBlock).toBe(true);
+    });
+
+    it('should block interruption for undefined transcript with threshold 2', () => {
+      const text: string | undefined = undefined;
+      const minInterruptionWords = 2;
+
+      const normalizedText = text ?? '';
+      const wordCount = splitWords(normalizedText, true).length;
+      const shouldBlock = wordCount < minInterruptionWords;
+
+      expect(normalizedText).toBe('');
+      expect(wordCount).toBe(0);
+      expect(shouldBlock).toBe(true);
+    });
+
+    it('should block interruption for single word with threshold 2', () => {
+      const text = 'hello';
+      const minInterruptionWords = 2;
+
+      const normalizedText = text ?? '';
+      const wordCount = splitWords(normalizedText, true).length;
+      const shouldBlock = wordCount < minInterruptionWords;
+
+      expect(normalizedText).toBe('hello');
+      expect(wordCount).toBe(1);
+      expect(shouldBlock).toBe(true);
+    });
+
+    it('should allow interruption when word count exactly meets threshold', () => {
+      const text = 'hello world';
+      const minInterruptionWords = 2;
+
+      const normalizedText = text ?? 
''; + const wordCount = splitWords(normalizedText, true).length; + const shouldBlock = wordCount < minInterruptionWords; + + expect(normalizedText).toBe('hello world'); + expect(wordCount).toBe(2); + expect(shouldBlock).toBe(false); + }); + + it('should allow interruption when word count exceeds threshold', () => { + const text = 'hello this is a full sentence'; + const minInterruptionWords = 2; + + const normalizedText = text ?? ''; + const wordCount = splitWords(normalizedText, true).length; + const shouldBlock = wordCount < minInterruptionWords; + + expect(normalizedText).toBe('hello this is a full sentence'); + expect(wordCount).toBe(6); + expect(shouldBlock).toBe(false); + }); + + it('should apply consistent word counting logic in both methods', () => { + const transcripts = ['', 'hello', 'hello world', 'this is a longer sentence']; + const threshold = 2; + + transcripts.forEach((transcript) => { + const text1 = transcript; + const normalizedText1 = text1 ?? ''; + const wordCount1 = splitWords(normalizedText1, true).length; + const shouldBlock1 = wordCount1 < threshold; + + const wordCount2 = splitWords(transcript, true).length; + const shouldBlock2 = wordCount2 < threshold; + + expect(wordCount1).toBe(wordCount2); + expect(shouldBlock1).toBe(shouldBlock2); + }); + }); + }); +}); diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 31793f981..34e905b7f 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -1,6 +1,16 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 + +/** + * REFACTORED: Persistent WebSocket Connection for ElevenLabs TTS + * + * Key improvements: + * - Single persistent WebSocket per TTS instance (multi-stream API) + * - Multiple TTS requests multiplexed via context IDs + * - Efficient send/recv loops with proper lifecycle management + * - Graceful connection draining when connection is replaced + */ import { AsyncIterableQueue, AudioByteStream, @@ -10,11 +20,11 @@ import { tts, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; -import { URL } from 'node:url'; import { type RawData, WebSocket } from 'ws'; import type { TTSEncoding, TTSModels } from './models.js'; const DEFAULT_INACTIVITY_TIMEOUT = 300; +const AUTHORIZATION_HEADER = 'xi-api-key'; type Voice = { id: string; @@ -43,7 +53,6 @@ const DEFAULT_VOICE: Voice = { }; const API_BASE_URL_V1 = 'https://api.elevenlabs.io/v1/'; -const AUTHORIZATION_HEADER = 'xi-api-key'; export interface TTSOptions { apiKey?: string; @@ -73,8 +82,378 @@ const defaultTTSOptions: TTSOptions = { syncAlignment: true, }; +// ============================================================================ +// WebSocket Connection Manager - Manages persistent connection with multi-stream support +// ============================================================================ + +interface StreamContext { + contextId: string; + eos: boolean; + audioBuffer: Int8Array[]; +} + +interface SynthesizeContent { + type: 'synthesize'; + contextId: string; + text: string; + flush: boolean; +} + +interface CloseContext { + type: 'close'; + contextId: string; +} + +type QueueMessage = SynthesizeContent | CloseContext; + +/** + * Manages a single persistent WebSocket connection for multi-stream TTS. + * Allows multiple synthesize requests to share one connection via context IDs. 
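+ *
+ * Wire protocol, as implemented in sendLoop below: every outbound packet is
+ * JSON tagged with a context_id. A context opens with a single-space init
+ * packet carrying voice_settings, streams text chunks (each with a trailing
+ * space), flushes with an empty-text packet, and ends with close_context.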
+ */
+class WebSocketManager {
+  private ws: WebSocket | null = null;
+  private opts: TTSOptions;
+  private logger = log();
+  private inputQueue = new AsyncIterableQueue<QueueMessage>();
+  private contextData = new Map<string, StreamContext>();
+  private activeContexts = new Set<string>();
+  private sendTask: Promise<void> | null = null;
+  private recvTask: Promise<void> | null = null;
+  private closed = false;
+  private isCurrent = true;
+
+  constructor(opts: TTSOptions) {
+    this.opts = opts;
+  }
+
+  async connect(): Promise<void> {
+    if (this.ws || this.closed) {
+      return;
+    }
+
+    const url = this.buildMultiStreamUrl();
+    const headers = {
+      [AUTHORIZATION_HEADER]: this.opts.apiKey!,
+    };
+
+    this.ws = new WebSocket(url, { headers });
+
+    // Wait for connection to open
+    await new Promise<void>((resolve, reject) => {
+      if (!this.ws) {
+        reject(new Error('WebSocket not initialized'));
+        return;
+      }
+
+      const ws = this.ws;
+      let resolved = false;
+
+      const openHandler = () => {
+        resolved = true;
+        ws.removeListener('open', openHandler);
+        ws.removeListener('error', errorHandler);
+        resolve();
+      };
+
+      const errorHandler = (error: Error) => {
+        if (!resolved) {
+          ws.removeListener('open', openHandler);
+          reject(new Error(`WebSocket connection failed: ${error.message}`));
+        }
+      };
+
+      ws.on('open', openHandler);
+      ws.on('error', errorHandler);
+    });
+
+    // Start send and recv loops
+    this.sendTask = this.sendLoop();
+    this.recvTask = this.recvLoop();
+  }
+
+  registerContext(contextId: string): void {
+    if (!this.contextData.has(contextId)) {
+      this.contextData.set(contextId, {
+        contextId,
+        eos: false,
+        audioBuffer: [],
+      });
+    }
+  }
+
+  sendContent(contextId: string, text: string, flush: boolean = false): void {
+    if (this.closed || !this.ws || this.ws.readyState !== 1) {
+      throw new Error('WebSocket connection is closed');
+    }
+
+    this.inputQueue.put({
+      type: 'synthesize',
+      contextId,
+      text,
+      flush,
+    });
+  }
+
+  closeContext(contextId: string): void {
+    if (this.closed || !this.ws || this.ws.readyState !== 1) {
+      throw new Error('WebSocket connection is closed');
+    }
+
+    this.inputQueue.put({
+      type: 'close',
+      contextId,
+    });
+  }
+
+  getContextAudio(contextId: string): Int8Array[] | null {
+    return this.contextData.get(contextId)?.audioBuffer ?? null;
+  }
+
+  isContextEOS(contextId: string): boolean {
+    return this.contextData.get(contextId)?.eos ?? false;
+  }
+
+  markNonCurrent(): void {
+    this.isCurrent = false;
+  }
+
+  get isClosed(): boolean {
+    return this.closed;
+  }
+
+  async close(): Promise<void> {
+    if (this.closed) {
+      return;
+    }
+
+    this.closed = true;
+    this.inputQueue.close();
+
+    this.contextData.clear();
+    this.activeContexts.clear();
+
+    if (this.ws) {
+      this.ws.close();
+      this.ws = null;
+    }
+
+    if (this.sendTask) {
+      try {
+        await this.sendTask;
+      } catch {
+        // Expected when queue closes
+      }
+    }
+
+    if (this.recvTask) {
+      try {
+        await this.recvTask;
+      } catch {
+        // Expected when connection closes
+      }
+    }
+  }
+
+  private buildMultiStreamUrl(): string {
+    const baseURL = this.opts.baseURL
+      .replace('https://', 'wss://')
+      .replace('http://', 'ws://')
+      .replace(/\/$/, '');
+
+    const voiceId = this.opts.voice.id;
+    let urlStr = `${baseURL}/text-to-speech/${voiceId}/multi-stream-input?`;
+
+    const params: string[] = [];
+    params.push(`model_id=${this.opts.modelID}`);
+    params.push(`output_format=${this.opts.encoding}`);
+    params.push(`enable_ssml_parsing=${this.opts.enableSsmlParsing}`);
+    params.push(`sync_alignment=${this.opts.syncAlignment}`);
+    params.push(`inactivity_timeout=${this.opts.inactivityTimeout}`);
+
+    if (this.opts.streamingLatency !== undefined) {
+      params.push(`optimize_streaming_latency=${this.opts.streamingLatency}`);
+    }
+
+    if (this.opts.autoMode !== undefined) {
+      params.push(`auto_mode=${this.opts.autoMode}`);
+    }
+
+    if (this.opts.languageCode) {
+      params.push(`language_code=${this.opts.languageCode}`);
+    }
+
+    urlStr += params.join('&');
+    return urlStr;
+  }
+
+  private async sendLoop(): Promise<void> {
+    try {
+      for await (const msg of this.inputQueue) {
+        if (!this.ws || this.ws.readyState !== 1) {
+          break;
+        }
+
+        if (msg.type === 'synthesize') {
+          const isNewContext = !this.activeContexts.has(msg.contextId);
+
+          // If not current and new context, ignore (connection is draining)
+          if (!this.isCurrent && isNewContext) {
+            continue;
+          }
+
+          if (isNewContext) {
+            const voiceSettings = this.opts.voice.settings || {};
+            const initPkt = {
+              text: ' ',
+              voice_settings: voiceSettings,
+              context_id: msg.contextId,
+              ...(this.opts.chunkLengthSchedule && {
+                generation_config: {
+                  chunk_length_schedule: this.opts.chunkLengthSchedule,
+                },
+              }),
+            };
+
+            this.ws.send(JSON.stringify(initPkt));
+            this.activeContexts.add(msg.contextId);
+          }
+
+          const textPkt = {
+            text: msg.text + ' ',
+            context_id: msg.contextId,
+          };
+
+          this.ws.send(JSON.stringify(textPkt));
+
+          if (msg.flush) {
+            const flushPkt = {
+              text: '',
+              context_id: msg.contextId,
+            };
+            this.ws.send(JSON.stringify(flushPkt));
+          }
+        } else if (msg.type === 'close') {
+          if (this.activeContexts.has(msg.contextId)) {
+            const closePkt = {
+              context_id: msg.contextId,
+              close_context: true,
+            };
+            this.ws.send(JSON.stringify(closePkt));
+            this.activeContexts.delete(msg.contextId);
+          }
+        }
+      }
+    } catch (error) {
+      this.logger.error({ error }, 'Error in send loop');
+    } finally {
+      if (!this.closed) {
+        await this.close();
+      }
+    }
+  }
+
+  private async recvLoop(): Promise<void> {
+    try {
+      while (!this.closed && this.ws && this.ws.readyState === 1) {
+        const msg = await new Promise<RawData>((resolve, reject) => {
+          if (!this.ws) {
+            reject(new Error('WebSocket not available'));
+            return;
+          }
+
+          const ws = this.ws;
+          let resolved = false;
+
+          const messageHandler = (data: RawData) => {
+            if (!resolved) {
+              resolved = true;
+              ws.removeListener('message', messageHandler);
+              ws.removeListener('close', closeHandler);
+              ws.removeListener('error', errorHandler);
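+              // All listeners are detached before resolving so that each
+              // iteration of the recv loop can attach a fresh set.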
+              resolve(data);
+            }
+          };
+
+          const closeHandler = () => {
+            if (!resolved) {
+              resolved = true;
+              ws.removeListener('message', messageHandler);
+              ws.removeListener('error', errorHandler);
+              reject(new Error('WebSocket closed'));
+            }
+          };
+
+          const errorHandler = (error: Error) => {
+            if (!resolved) {
+              resolved = true;
+              ws.removeListener('message', messageHandler);
+              ws.removeListener('close', closeHandler);
+              reject(error);
+            }
+          };
+
+          ws.on('message', messageHandler);
+          ws.on('close', closeHandler);
+          ws.on('error', errorHandler);
+        });
+
+        try {
+          const data = JSON.parse(msg.toString()) as Record<string, unknown>;
+          const contextId = (data.contextId || data.context_id) as string | undefined;
+
+          if (!contextId || !this.contextData.has(contextId)) {
+            continue;
+          }
+
+          const context = this.contextData.get(contextId)!;
+
+          if (data.error) {
+            this.logger.error({ contextId, error: data.error }, 'ElevenLabs error');
+            this.contextData.delete(contextId);
+            continue;
+          }
+
+          if (data.audio) {
+            const audioBuffer = Buffer.from(data.audio as string, 'base64');
+            const audioArray = new Int8Array(audioBuffer);
+            context.audioBuffer.push(audioArray);
+          }
+
+          if (data.isFinal) {
+            context.eos = true;
+            this.activeContexts.delete(contextId);
+
+            if (!this.isCurrent && this.activeContexts.size === 0) {
+              this.logger.debug('No active contexts, shutting down');
+              break;
+            }
+          }
+        } catch (parseError) {
+          this.logger.warn({ parseError }, 'Failed to parse message');
+        }
+      }
+    } catch (error) {
+      this.logger.error({ error }, 'Recv loop error');
+      for (const context of this.contextData.values()) {
+        context.eos = true;
+      }
+    } finally {
+      if (!this.closed) {
+        await this.close();
+      }
+    }
+  }
+}
+
+// ============================================================================
+// TTS Implementation
+// ============================================================================
+
 export class TTS extends tts.TTS {
   #opts: TTSOptions;
+  #logger = log();
+  #connection: WebSocketManager | null = null;
+  #connectionLock: Promise<void> | null = null;
   label = 'elevenlabs.TTS';
 
   constructor(opts: Partial<TTSOptions> = {}) {
@@ -117,6 +496,38 @@ export class TTS extends tts.TTS {
     });
   }
 
+  async getCurrentConnection(): Promise<WebSocketManager> {
+    // Wait for any ongoing connection attempt
+    if (this.#connectionLock) {
+      await this.#connectionLock;
+      if (this.#connection && !this.#connection.isClosed) {
+        return this.#connection;
+      }
+    }
+
+    // Create new lock for this connection attempt
+    const newConnectionLock = (async () => {
+      // Mark old connection as non-current if it exists
+      if (this.#connection && !this.#connection.isClosed) {
+        this.#connection.markNonCurrent();
+      }
+
+      // Create and connect new manager
+      const manager = new WebSocketManager(this.#opts);
+      await manager.connect();
+      this.#connection = manager;
+    })();
+
+    this.#connectionLock = newConnectionLock;
+    try {
+      await newConnectionLock;
+    } finally {
+      this.#connectionLock = null;
+    }
+
+    return this.#connection!;
+  }
+
   synthesize(): tts.ChunkedStream {
     throw new Error('Chunked responses are not supported on ElevenLabs TTS');
   }
@@ -124,123 +535,86 @@ export class TTS extends tts.TTS {
   stream(): tts.SynthesizeStream {
     return new SynthesizeStream(this, this.#opts);
   }
+
+  async aclose(): Promise<void> {
+    if (this.#connection) {
+      await this.#connection.close();
+      this.#connection = null;
+    }
+  }
 }
 
 export class SynthesizeStream extends tts.SynthesizeStream {
   #opts: TTSOptions;
   #logger = log();
+  #tts: TTS;
+  #contextId: string;
+  #connection: WebSocketManager | null = null;
   label = 'elevenlabs.SynthesizeStream';
-  readonly streamURL: URL;
 
   constructor(tts: TTS, opts: TTSOptions) {
     super(tts);
+    this.#tts = tts;
     this.#opts = opts;
-    this.closed = false;
-
-    // add trailing slash to URL if needed
-    const baseURL = opts.baseURL + (opts.baseURL.endsWith('/') ? '' : '/');
-
-    this.streamURL = new URL(`text-to-speech/${opts.voice.id}/stream-input`, baseURL);
-    const params = {
-      model_id: opts.modelID,
-      output_format: opts.encoding,
-      enable_ssml_parsing: `${opts.enableSsmlParsing}`,
-      sync_alignment: `${opts.syncAlignment}`,
-      ...(opts.autoMode !== undefined && { auto_mode: `${opts.autoMode}` }),
-      ...(opts.languageCode && { language_code: opts.languageCode }),
-      ...(opts.inactivityTimeout && { inactivity_timeout: `${opts.inactivityTimeout}` }),
-      ...(opts.streamingLatency && { optimize_streaming_latency: `${opts.streamingLatency}` }),
-    };
-    Object.entries(params).forEach(([k, v]) => this.streamURL.searchParams.append(k, v));
-    this.streamURL.protocol = this.streamURL.protocol.replace('http', 'ws');
+    this.#contextId = shortuuid();
   }
 
   protected async run() {
-    const segments = new AsyncIterableQueue<tokenize.WordStream>();
-
-    const tokenizeInput = async () => {
-      let stream: tokenize.WordStream | null = null;
-      for await (const text of this.input) {
-        if (this.abortController.signal.aborted) {
-          break;
+    try {
+      // Get persistent connection
+      this.#connection = await this.#tts.getCurrentConnection();
+      this.#connection.registerContext(this.#contextId);
+
+      const segments = new AsyncIterableQueue<tokenize.WordStream>();
+
+      const tokenizeInput = async () => {
+        let stream: tokenize.WordStream | null = null;
+        for await (const text of this.input) {
+          if (this.abortController.signal.aborted) {
+            break;
+          }
+          if (text === SynthesizeStream.FLUSH_SENTINEL) {
+            stream?.endInput();
+            stream = null;
+          } else {
+            if (!stream) {
+              stream = this.#opts.wordTokenizer.stream();
+              segments.put(stream);
+            }
+            stream.pushText(text);
+          }
         }
-        if (text === SynthesizeStream.FLUSH_SENTINEL) {
-          stream?.endInput();
-          stream = null;
-        } else {
-          if (!stream) {
-            stream = this.#opts.wordTokenizer.stream();
-            segments.put(stream);
+        segments.close();
+      };
+
+      const runStream = async () => {
+        for await (const stream of segments) {
+          if (this.abortController.signal.aborted) {
+            break;
           }
-          stream.pushText(text);
+          await this.runSynthesis(stream);
+          this.queue.put(SynthesizeStream.END_OF_STREAM);
         }
-      }
-      segments.close();
-    };
+      };
 
-    const runStream = async () => {
-      for await (const stream of segments) {
-        if (this.abortController.signal.aborted) {
-          break;
+      await Promise.all([tokenizeInput(), runStream()]);
+    } finally {
+      if (this.#connection) {
+        try {
+          this.#connection.closeContext(this.#contextId);
+        } catch {
+          // Connection may be closed
         }
-        await this.#runWS(stream);
-        this.queue.put(SynthesizeStream.END_OF_STREAM);
       }
-    };
-
-    await Promise.all([tokenizeInput(), runStream()]);
+    }
   }
 
-  async #runWS(stream: tokenize.WordStream, maxRetry = 3) {
-    let retries = 0;
-    let ws: WebSocket;
-    while (true) {
-      ws = new WebSocket(this.streamURL, {
-        headers: { [AUTHORIZATION_HEADER]: this.#opts.apiKey },
-      });
-
-      ws.on('error', (error) => {
-        this.abortController.abort();
-        this.#logger.error({ error }, 'Error connecting to ElevenLabs');
-      });
-
-      try {
-        await new Promise((resolve, reject) => {
-          ws.on('open', resolve);
-          ws.on('error', (error) => reject(error));
-          ws.on('close', (code) => reject(`WebSocket returned ${code}`));
-        });
-        break;
-      } catch (e) {
-        if (retries >= maxRetry) {
-          throw new Error(`failed to connect to ElevenLabs after ${retries} attempts: ${e}`);
-        }
-
-        const delay = Math.min(retries * 5, 5);
-        retries++;
-
-        this.#logger.warn(
-          `failed to connect to ElevenLabs, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`,
-        );
-        await new Promise((resolve) => setTimeout(resolve, delay * 1000));
-      }
+  private async runSynthesis(stream: tokenize.WordStream): Promise<void> {
+    if (!this.#connection) {
+      throw new Error('Connection not established');
     }
 
-    const requestId = shortuuid();
-    const segmentId = shortuuid();
-
-    ws.send(
-      JSON.stringify({
-        text: ' ',
-        voice_settings: this.#opts.voice.settings,
-        ...(this.#opts.chunkLengthSchedule && {
-          generation_config: {
-            chunk_length_schedule: this.#opts.chunkLengthSchedule,
-          },
-        }),
-      }),
-    );
-    let eosSent = false;
+
+    const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1);
 
     const sendTask = async () => {
       let xmlContent: string[] = [];
@@ -260,74 +634,55 @@ export class SynthesizeStream extends tts.SynthesizeStream {
           }
         }
 
-        ws.send(JSON.stringify({ text: text + ' ' })); // must always end with a space
+        this.#connection!.sendContent(this.#contextId, text, false);
       }
 
       if (xmlContent.length) {
-        this.#logger.warn('ElevenLabs stream ended with incomplete XML content');
+        this.#logger.warn('Stream ended with incomplete XML content');
      }
 
-      // no more tokens, mark eos
-      ws.send(JSON.stringify({ text: '' }));
-      eosSent = true;
+      // Signal end of stream
+      this.#connection!.sendContent(this.#contextId, '', true);
     };
 
     let lastFrame: AudioFrame | undefined;
     const sendLastFrame = (segmentId: string, final: boolean) => {
       if (lastFrame) {
-        this.queue.put({ requestId, segmentId, frame: lastFrame, final });
+        this.queue.put({
+          requestId: this.#contextId,
+          segmentId,
+          frame: lastFrame,
+          final,
+        });
         lastFrame = undefined;
       }
     };
 
     const listenTask = async () => {
-      let finalReceived = false;
-      const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1);
-      while (!this.closed && !this.abortController.signal.aborted) {
-        try {
-          await new Promise((resolve, reject) => {
-            ws.removeAllListeners();
-            ws.on('message', (data) => resolve(data));
-            ws.on('close', (code, reason) => {
-              if (!eosSent) {
-                this.#logger.error(`WebSocket closed with code ${code}: ${reason}`);
-              }
-              if (!finalReceived) {
-                reject(new Error('WebSocket closed'));
-              }
-            });
-          }).then((msg) => {
-            const json = JSON.parse(msg.toString());
-            // remove the "audio" field from the json object when printing
-            if ('audio' in json && json.audio !== null) {
-              const data = new Int8Array(Buffer.from(json.audio, 'base64'));
-              for (const frame of bstream.write(data)) {
-                sendLastFrame(segmentId, false);
-                lastFrame = frame;
-              }
-            } else if (json.isFinal) {
-              finalReceived = true;
-              for (const frame of bstream.flush()) {
-                sendLastFrame(segmentId, false);
-                lastFrame = frame;
-              }
-              sendLastFrame(segmentId, true);
-              this.queue.put(SynthesizeStream.END_OF_STREAM);
-
-              if (segmentId === requestId || this.abortController.signal.aborted) {
-                ws.close();
-                return;
-              }
-            }
-          });
-        } catch (err) {
-          // skip log error for normal websocket close
-          if (err instanceof Error && !err.message.includes('WebSocket closed')) {
-            this.#logger.error({ err }, 'Error in listenTask from ElevenLabs WebSocket');
+      // Wait for EOS and collect audio
+      while (!this.#connection!.isContextEOS(this.#contextId)) {
+        await new Promise((resolve) => setTimeout(resolve, 10));
+      }
+
+      // Get all audio buffers and process
+      const audioBuffers = this.#connection!.getContextAudio(this.#contextId);
+      if (audioBuffers) {
+        for (const buffer of audioBuffers) {
+          for (const frame of bstream.write(buffer)) {
+            sendLastFrame(this.#contextId, false);
+            lastFrame = frame;
           }
-          break;
         }
       }
+
+      // Flush remaining frames
+      for (const frame of bstream.flush()) {
+        sendLastFrame(this.#contextId, false);
+        lastFrame = frame;
+      }
+
+      sendLastFrame(this.#contextId, true);
+      this.queue.put(SynthesizeStream.END_OF_STREAM);
    };
 
    await Promise.all([sendTask(), listenTask()]);
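
A note on the multiplexing protocol for reviewers: `WebSocketManager.sendLoop` serializes every request into JSON packets tagged with a `context_id`. The sketch below replays that packet sequence for a single context so the wire format is visible at a glance; the `buildContextPackets` helper is illustrative only and does not exist in this patch.

```ts
// Illustrative sketch only: replays the per-context packet sequence that
// WebSocketManager.sendLoop writes to the shared multi-stream socket.
type Packet = Record<string, unknown>;

function buildContextPackets(contextId: string, chunks: string[]): Packet[] {
  const packets: Packet[] = [
    // 1. Init packet: a single space opens the context and carries voice settings.
    { text: ' ', voice_settings: {}, context_id: contextId },
  ];
  // 2. Text packets: each chunk is sent with a trailing space.
  for (const chunk of chunks) {
    packets.push({ text: chunk + ' ', context_id: contextId });
  }
  // 3. Flush packet: empty text asks the server to emit any buffered audio.
  packets.push({ text: '', context_id: contextId });
  // 4. Close packet: releases the context; the socket stays open for others.
  packets.push({ context_id: contextId, close_context: true });
  return packets;
}

// Example: buildContextPackets('ctx-1', ['Hello', 'world']) yields the init
// packet, two text packets, a flush, and a close, all tagged with 'ctx-1'.
```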