From c9a53a542795fb2cdb0360e86faa1da8ec119460 Mon Sep 17 00:00:00 2001 From: simon Date: Fri, 14 Nov 2025 13:03:32 +0100 Subject: [PATCH 1/8] Multi Stream WebSocket Connection for ElevenLabs TTS based on the work of @Devesh36 --- plugins/elevenlabs/src/tts.ts | 720 +++++++++++++++++++++++++--------- 1 file changed, 545 insertions(+), 175 deletions(-) diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index f3aa6e874..2b10a4145 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -10,11 +10,11 @@ import { tts, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; -import { URL } from 'node:url'; import { type RawData, WebSocket } from 'ws'; import type { TTSEncoding, TTSModels } from './models.js'; const DEFAULT_INACTIVITY_TIMEOUT = 300; +const AUTHORIZATION_HEADER = 'xi-api-key'; type Voice = { id: string; @@ -43,7 +43,6 @@ const DEFAULT_VOICE: Voice = { }; const API_BASE_URL_V1 = 'https://api.elevenlabs.io/v1/'; -const AUTHORIZATION_HEADER = 'xi-api-key'; export interface TTSOptions { apiKey?: string; @@ -72,8 +71,423 @@ const defaultTTSOptionsBase = { syncAlignment: true, }; +// ============================================================================ +// WebSocket Connection Manager - Manages persistent connection with multi-stream support +// ============================================================================ + +interface DeferredPromise { + promise: Promise; + resolve: (value: T) => void; + reject: (reason?: any) => void; +} + +interface StreamContext { + contextId: string; + audioQueue: AsyncIterableQueue; + eos: DeferredPromise; + timeoutTimer: NodeJS.Timeout | null; + timeoutSeconds: number; +} + +interface SynthesizeContent { + type: 'synthesize'; + contextId: string; + text: string; + flush: boolean; +} + +interface CloseContext { + type: 'close'; + contextId: string; +} + +type QueueMessage = SynthesizeContent | CloseContext; + +/** + * Manages a single persistent WebSocket connection for multi-stream TTS. + * Allows multiple synthesize requests to share one connection via context IDs. 
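+ *
+ * Rough usage sketch (illustrative only; error handling and the per-context
+ * timeout are omitted, and the context id is just an example):
+ *
+ *   const manager = new WebSocketManager(opts);
+ *   await manager.connect();
+ *   manager.registerContext('ctx-1');
+ *   manager.sendContent('ctx-1', 'Hello world ', true); // flush this chunk
+ *   manager.closeContext('ctx-1');
+ *   await manager.getContextEOSPromise('ctx-1'); // resolves on isFinal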
+ */ +class WebSocketManager { + private ws: WebSocket | null = null; + private opts: TTSOptions; + private logger = log(); + private inputQueue = new AsyncIterableQueue(); + private contextData = new Map(); + private activeContexts = new Set(); + private sendTask: Promise | null = null; + private recvTask: Promise | null = null; + private keepaliveInterval: NodeJS.Timeout | null = null; + private closed = false; + private isCurrent = true; + + constructor(opts: TTSOptions) { + this.opts = opts; + } + + async connect(): Promise { + if (this.ws || this.closed) { + return; + } + + const url = this.buildMultiStreamUrl(); + const headers = { + [AUTHORIZATION_HEADER]: this.opts.apiKey!, + }; + + this.ws = new WebSocket(url, { headers }); + + // Wait for connection to open + await new Promise((resolve, reject) => { + if (!this.ws) { + reject(new Error('WebSocket not initialized')); + return; + } + this.ws.once('open', resolve); + this.ws.once('error', (error) => reject(error)); + this.ws.once('close', (code) => reject(`WebSocket returned ${code}`)); + }); + + // Start keepalive ping + this.keepaliveInterval = setInterval(() => { + try { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.ping(); + } + } catch { + if (this.keepaliveInterval) { + clearInterval(this.keepaliveInterval); + this.keepaliveInterval = null; + } + } + }, 5000); + + // Start send and recv loops + this.sendTask = this.sendLoop(); + this.recvTask = this.recvLoop(); + } + + registerContext(contextId: string, timeoutSeconds: number = 30): void { + if (!this.contextData.has(contextId)) { + const eos: DeferredPromise = {} as DeferredPromise; + eos.promise = new Promise((resolve, reject) => { + eos.resolve = resolve; + eos.reject = reject; + }); + + this.contextData.set(contextId, { + contextId, + audioQueue: new AsyncIterableQueue(), + eos: eos, + timeoutTimer: null, + timeoutSeconds, + }); + } + } + + sendContent(contextId: string, text: string, flush: boolean = false): void { + if (this.closed || !this.ws || this.ws.readyState !== 1) { + throw new Error('WebSocket connection is closed'); + } + + this.inputQueue.put({ + type: 'synthesize', + contextId, + text, + flush, + }); + } + + closeContext(contextId: string): void { + if (this.closed || !this.ws || this.ws.readyState !== 1) { + throw new Error('WebSocket connection is closed'); + } + + this.inputQueue.put({ + type: 'close', + contextId, + }); + } + + getContextAudioQueue(contextId: string): AsyncIterableQueue | null { + return this.contextData.get(contextId)?.audioQueue ?? null; + } + + getContextEOSPromise(contextId: string): Promise | null { + return this.contextData.get(contextId)?.eos.promise ?? 
null; + } + + markNonCurrent(): void { + this.isCurrent = false; + } + + get isClosed(): boolean { + return this.closed; + } + + private cleanupContext(contextId: string): void { + const ctx = this.contextData.get(contextId); + if (ctx) { + if (ctx.timeoutTimer) { + clearTimeout(ctx.timeoutTimer); + } + ctx.audioQueue.close(); + } + this.contextData.delete(contextId); + this.activeContexts.delete(contextId); + } + + private startTimeoutTimer(contextId: string): void { + const ctx = this.contextData.get(contextId); + if (!ctx || ctx.timeoutTimer) { + return; + } + + ctx.timeoutTimer = setTimeout(() => { + this.logger.error( + { contextId }, + `TTS: Context timed out after ${ctx.timeoutSeconds} seconds`, + ); + ctx.eos.reject(new Error(`TTS timed out after ${ctx.timeoutSeconds} seconds`)); + this.cleanupContext(contextId); + }, ctx.timeoutSeconds * 1000); + } + + async close(): Promise { + if (this.closed) { + return; + } + + this.closed = true; + this.inputQueue.close(); + + // Clear all timeout timers + for (const ctx of this.contextData.values()) { + if (ctx.timeoutTimer) { + clearTimeout(ctx.timeoutTimer); + } + } + + this.contextData.clear(); + this.activeContexts.clear(); + + if (this.keepaliveInterval) { + clearInterval(this.keepaliveInterval); + this.keepaliveInterval = null; + } + + if (this.ws) { + this.ws.removeAllListeners(); + this.ws.close(); + this.ws = null; + } + + if (this.sendTask) { + try { + await this.sendTask; + } catch { + // Expected when queue closes + } + } + + if (this.recvTask) { + try { + await this.recvTask; + } catch { + // Expected when connection closes + } + } + } + + private buildMultiStreamUrl(): string { + const baseURL = this.opts.baseURL + .replace('https://', 'wss://') + .replace('http://', 'ws://') + .replace(/\/$/, ''); + + const voiceId = this.opts.voice.id; + let urlStr = `${baseURL}/text-to-speech/${voiceId}/multi-stream-input?`; + + const params: string[] = []; + params.push(`model_id=${this.opts.modelID}`); + params.push(`output_format=${this.opts.encoding}`); + params.push(`enable_ssml_parsing=${this.opts.enableSsmlParsing}`); + params.push(`sync_alignment=${this.opts.syncAlignment}`); + params.push(`inactivity_timeout=${this.opts.inactivityTimeout}`); + + if (this.opts.streamingLatency !== undefined) { + params.push(`optimize_streaming_latency=${this.opts.streamingLatency}`); + } + + if (this.opts.autoMode !== undefined) { + params.push(`auto_mode=${this.opts.autoMode}`); + } + + if (this.opts.languageCode) { + params.push(`language_code=${this.opts.languageCode}`); + } + + urlStr += params.join('&'); + return urlStr; + } + + private async sendLoop(): Promise { + try { + for await (const msg of this.inputQueue) { + if (!this.ws || this.ws.readyState !== 1) { + break; + } + + if (msg.type === 'synthesize') { + const isNewContext = !this.activeContexts.has(msg.contextId); + + // If not current and new context, ignore (connection is draining) + if (!this.isCurrent && isNewContext) { + continue; + } + + if (isNewContext) { + const voiceSettings = this.opts.voice.settings || {}; + const initPkt = { + text: ' ', + voice_settings: voiceSettings, + context_id: msg.contextId, + ...(this.opts.chunkLengthSchedule && { + generation_config: { + chunk_length_schedule: this.opts.chunkLengthSchedule, + }, + }), + }; + + this.ws.send(JSON.stringify(initPkt)); + this.activeContexts.add(msg.contextId); + } + + const pkt: { text: string; context_id: string; flush?: boolean } = { + text: msg.text, + context_id: msg.contextId, + }; + if (msg.flush) { + pkt.flush = 
true; + } + + // Start timeout timer for this context + this.startTimeoutTimer(msg.contextId); + + this.ws.send(JSON.stringify(pkt)); + } else if (msg.type === 'close') { + if (this.activeContexts.has(msg.contextId)) { + const closePkt = { + context_id: msg.contextId, + close_context: true, + }; + this.ws.send(JSON.stringify(closePkt)); + this.activeContexts.delete(msg.contextId); + } + } else { + this.logger.error(`TTS: Unknown msg type: ${msg}`); + } + } + } catch (error) { + this.logger.error({ error }, 'TTS: Error in send loop'); + } finally { + if (!this.closed) { + await this.close(); + } + } + } + + private async recvLoop(): Promise { + try { + await new Promise((resolve, reject) => { + if (!this.ws) { + reject(new Error('WebSocket not available')); + return; + } + + this.ws.on('message', (msg: RawData) => { + try { + const data = JSON.parse(msg.toString()) as Record; + const contextId = (data.contextId || data.context_id) as string | undefined; + + if (!contextId || !this.contextData.has(contextId)) { + return; + } + + const context = this.contextData.get(contextId)!; + + this.logger.debug({ data }, 'TTS: Incoming message'); + + if (data.error) { + this.logger.error({ contextId, error: data.error }, 'TTS: ElevenLabs error'); + this.cleanupContext(contextId); + return; + } + + if (data.audio) { + const audioBuffer = Buffer.from(data.audio as string, 'base64'); + const audioArray = new Int8Array(audioBuffer); + context.audioQueue.put(audioArray); + + // Cancel timeout when audio is received + if (context.timeoutTimer) { + clearTimeout(context.timeoutTimer); + context.timeoutTimer = null; + } + } + + if (data.isFinal) { + context.eos.resolve(); + this.cleanupContext(contextId); + + if (!this.isCurrent && this.activeContexts.size === 0) { + this.logger.debug('TTS: No active contexts, shutting down'); + resolve(); + } + } + + if (this.closed) { + resolve(); + } + } catch (parseError) { + this.logger.warn({ parseError }, 'TTS: Failed to parse message'); + } + }); + + this.ws.once('close', (code, reason) => { + if (!this.closed) { + this.logger.error(`TTS: WebSocket closed unexpectedly with code ${code}: ${reason}`); + reject(new Error('WebSocket closed')); + } else { + resolve(); + } + }); + + this.ws.once('error', (error) => { + this.logger.error({ error }, 'TTS: WebSocket error'); + reject(error); + }); + }); + } catch (error) { + this.logger.error({ error }, 'TTS: Recv loop error'); + for (const context of this.contextData.values()) { + context.eos.reject(error); + } + } finally { + if (!this.closed) { + await this.close(); + } + } + } +} + +// ============================================================================ +// TTS Implementation +// ============================================================================ + export class TTS extends tts.TTS { #opts: TTSOptions; + #connection: WebSocketManager | null = null; + #connectionLock: Promise | null = null; label = 'elevenlabs.TTS'; constructor(opts: Partial = {}) { @@ -136,6 +550,38 @@ export class TTS extends tts.TTS { }); } + async getCurrentConnection(): Promise { + // Wait for any ongoing connection attempt + if (this.#connectionLock) { + await this.#connectionLock; + if (this.#connection && !this.#connection.isClosed) { + return this.#connection; + } + } + + // Create new lock for this connection attempt + const newConnectionLock = (async () => { + // Mark old connection as non-current if it exists + if (this.#connection && !this.#connection.isClosed) { + this.#connection.markNonCurrent(); + } + + // Create and connect 
new manager + const manager = new WebSocketManager(this.#opts); + await manager.connect(); + this.#connection = manager; + })(); + + this.#connectionLock = newConnectionLock; + try { + await newConnectionLock; + } finally { + this.#connectionLock = null; + } + + return this.#connection!; + } + synthesize(): tts.ChunkedStream { throw new Error('Chunked responses are not supported on ElevenLabs TTS'); } @@ -143,144 +589,93 @@ export class TTS extends tts.TTS { stream(): tts.SynthesizeStream { return new SynthesizeStream(this, this.#opts); } + + async aclose(): Promise { + if (this.#connection) { + await this.#connection.close(); + this.#connection = null; + } + } } export class SynthesizeStream extends tts.SynthesizeStream { #opts: TTSOptions; #logger = log(); + #tts: TTS; + #contextId: string; + #connection: WebSocketManager | null = null; label = 'elevenlabs.SynthesizeStream'; - readonly streamURL: URL; constructor(tts: TTS, opts: TTSOptions) { super(tts); + this.#tts = tts; this.#opts = opts; - this.closed = false; - - // add trailing slash to URL if needed - const baseURL = opts.baseURL + (opts.baseURL.endsWith('/') ? '' : '/'); - - this.streamURL = new URL(`text-to-speech/${opts.voice.id}/stream-input`, baseURL); - const params = { - model_id: opts.modelID, - output_format: opts.encoding, - enable_ssml_parsing: `${opts.enableSsmlParsing}`, - sync_alignment: `${opts.syncAlignment}`, - ...(opts.autoMode !== undefined && { auto_mode: `${opts.autoMode}` }), - ...(opts.languageCode && { language_code: opts.languageCode }), - ...(opts.inactivityTimeout && { inactivity_timeout: `${opts.inactivityTimeout}` }), - ...(opts.streamingLatency && { optimize_streaming_latency: `${opts.streamingLatency}` }), - }; - Object.entries(params).forEach(([k, v]) => this.streamURL.searchParams.append(k, v)); - this.streamURL.protocol = this.streamURL.protocol.replace('http', 'ws'); + this.#contextId = shortuuid(); } protected async run() { - const segments = new AsyncIterableQueue(); - - const tokenizeInput = async () => { - let stream: tokenize.WordStream | tokenize.SentenceStream | null = null; - for await (const text of this.input) { - if (this.abortController.signal.aborted) { - break; - } - if (text === SynthesizeStream.FLUSH_SENTINEL) { - stream?.endInput(); - stream = null; - } else { - if (!stream) { - stream = this.#opts.wordTokenizer.stream(); - segments.put(stream); + try { + // Get persistent connection + this.#connection = await this.#tts.getCurrentConnection(); + this.#connection.registerContext(this.#contextId); + + const segments = new AsyncIterableQueue(); + + const tokenizeInput = async () => { + let stream: tokenize.WordStream | tokenize.SentenceStream | null = null; + for await (const text of this.input) { + if (this.abortController.signal.aborted) { + break; + } + if (text === SynthesizeStream.FLUSH_SENTINEL) { + stream?.endInput(); + stream = null; + } else { + if (!stream) { + stream = this.#opts.wordTokenizer.stream(); + segments.put(stream); + } + stream.pushText(text); } - stream.pushText(text); } - } - segments.close(); - }; + segments.close(); + }; - const runStream = async () => { - for await (const stream of segments) { - if (this.abortController.signal.aborted) { - break; + const runStream = async () => { + for await (const stream of segments) { + if (this.abortController.signal.aborted) { + break; + } + await this.runSynthesis(stream); + this.queue.put(SynthesizeStream.END_OF_STREAM); } - await this.#runWS(stream); - this.queue.put(SynthesizeStream.END_OF_STREAM); - } - }; - - 
await Promise.all([tokenizeInput(), runStream()]); - } - - async #runWS(stream: tokenize.WordStream | tokenize.SentenceStream, maxRetry = 3) { - let retries = 0; - let ws: WebSocket; - while (true) { - ws = new WebSocket(this.streamURL, { - headers: { [AUTHORIZATION_HEADER]: this.#opts.apiKey }, - }); - - ws.on('error', (error) => { - this.abortController.abort(); - this.#logger.error({ error }, 'Error connecting to ElevenLabs'); - }); + }; - try { - await new Promise((resolve, reject) => { - ws.on('open', resolve); - ws.on('error', (error) => reject(error)); - ws.on('close', (code) => reject(`WebSocket returned ${code}`)); - }); - break; - } catch (e) { - if (retries >= maxRetry) { - throw new Error(`failed to connect to ElevenLabs after ${retries} attempts: ${e}`); + await Promise.all([tokenizeInput(), runStream()]); + } finally { + if (this.#connection) { + try { + this.#connection.closeContext(this.#contextId); + } catch { + // Connection may be closed } - - const delay = Math.min(retries * 5, 5); - retries++; - - this.#logger.warn( - `failed to connect to ElevenLabs, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay * 1000)); } } + } - const requestId = shortuuid(); - const segmentId = shortuuid(); - - // simple helper to make sure what we send to ws.send - const wsSend = (data: { - // (SynthesizeContent from python) - text: string; - // setting flush somehow never finishes the current speech generation - // https://github.com/livekit/agents-js/pull/820#issuecomment-3517138706 - // flush?: boolean; - // initialization - voice_settings?: VoiceSettings; - generation_config?: { - chunk_length_schedule: number[]; - }; - }) => { - ws.send(JSON.stringify(data)); - }; + private async runSynthesis(stream: tokenize.WordStream | tokenize.SentenceStream): Promise { + if (!this.#connection) { + throw new Error('Connection not established'); + } - wsSend({ - text: ' ', - voice_settings: this.#opts.voice.settings, - ...(this.#opts.chunkLengthSchedule && { - generation_config: { - chunk_length_schedule: this.#opts.chunkLengthSchedule, - }, - }), - }); - let eosSent = false; + const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1); const sendTask = async () => { // Determine if we should flush on each chunk (sentence) - /*const flushOnChunk = + const flushOnChunk = this.#opts.wordTokenizer instanceof tokenize.SentenceTokenizer && this.#opts.autoMode !== undefined && - this.#opts.autoMode;*/ + this.#opts.autoMode; let xmlContent: string[] = []; for await (const data of stream) { @@ -299,88 +694,63 @@ export class SynthesizeStream extends tts.SynthesizeStream { } } - wsSend({ - text: text + ' ', // must always end with a space - // ...(flushOnChunk && { flush: true }), - }); + this.#connection!.sendContent(this.#contextId, text + ' ', flushOnChunk); } if (xmlContent.length) { - this.#logger.warn('ElevenLabs stream ended with incomplete XML content'); + this.#logger.warn('TTS: Stream ended with incomplete XML content'); } - // no more tokens, mark eos with flush - // setting flush somehow never finishes the current speech generation - // wsSend({ text: '', flush: true }); - wsSend({ text: '' }); - eosSent = true; + // Signal end of stream with flush + this.#connection!.sendContent(this.#contextId, '', true); + this.#connection!.closeContext(this.#contextId); }; let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { - this.queue.put({ 
requestId, segmentId, frame: lastFrame, final }); + this.queue.put({ + requestId: this.#contextId, + segmentId, + frame: lastFrame, + final, + }); lastFrame = undefined; } }; const listenTask = async () => { - let finalReceived = false; - const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1); - while (!this.closed && !this.abortController.signal.aborted) { - try { - await new Promise((resolve, reject) => { - ws.removeAllListeners(); - ws.on('message', (data) => resolve(data)); - ws.on('close', (code, reason) => { - if (!eosSent) { - this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - } - if (!finalReceived) { - reject(new Error('WebSocket closed')); - } - }); - }).then((msg) => { - const json = JSON.parse(msg.toString()); - // remove the "audio" field from the json object when printing - if ('audio' in json && json.audio !== null) { - const data = Buffer.from(json.audio, 'base64'); - for (const frame of bstream.write( - data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength), - )) { - sendLastFrame(segmentId, false); - lastFrame = frame; - } - } else if (json.isFinal) { - finalReceived = true; - for (const frame of bstream.flush()) { - sendLastFrame(segmentId, false); - lastFrame = frame; - } - sendLastFrame(segmentId, true); - this.queue.put(SynthesizeStream.END_OF_STREAM); + const audioQueue = this.#connection!.getContextAudioQueue(this.#contextId); + if (!audioQueue) { + return; + } - if (segmentId === requestId || this.abortController.signal.aborted) { - ws.close(); - return; - } - } - }); - } catch (err) { - // skip log error for normal websocket close - if (err instanceof Error && !err.message.includes('WebSocket closed')) { - if (err.message.includes('Queue is closed')) { - this.#logger.warn( - { err }, - 'Queue closed during transcript processing (expected during disconnect)', - ); - } else { - this.#logger.error({ err }, 'Error in listenTask from ElevenLabs WebSocket'); - } + const eosPromise = this.#connection!.getContextEOSPromise(this.#contextId); + if (!eosPromise) { + return; + } + + // Process audio as it arrives, until EOS + const processAudio = async () => { + for await (const buffer of audioQueue) { + for (const frame of bstream.write(buffer)) { + sendLastFrame(this.#contextId, false); + lastFrame = frame; } - break; } + }; + + // Race between audio processing and EOS + await Promise.race([processAudio(), eosPromise]); + + // Flush remaining frames + for (const frame of bstream.flush()) { + sendLastFrame(this.#contextId, false); + lastFrame = frame; } + + sendLastFrame(this.#contextId, true); + this.queue.put(SynthesizeStream.END_OF_STREAM); }; await Promise.all([sendTask(), listenTask()]); From 9b85548fa12996fdc6a5b7c17b0cfdd413ecda73 Mon Sep 17 00:00:00 2001 From: simon Date: Fri, 14 Nov 2025 13:53:35 +0100 Subject: [PATCH 2/8] add alignment and improve queue draining --- plugins/elevenlabs/src/tts.ts | 222 +++++++++++++++++++++++++++++++++- 1 file changed, 218 insertions(+), 4 deletions(-) diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 2b10a4145..454f30cf9 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -44,6 +44,12 @@ const DEFAULT_VOICE: Voice = { const API_BASE_URL_V1 = 'https://api.elevenlabs.io/v1/'; +interface TimedWord { + text: string; + startTime: number; + endTime: number; +} + export interface TTSOptions { apiKey?: string; voice: Voice; @@ -57,6 +63,7 @@ export interface TTSOptions { enableSsmlParsing: boolean; inactivityTimeout: 
number; syncAlignment: boolean; + preferredAlignment: 'normalized' | 'original'; autoMode?: boolean; } @@ -69,8 +76,88 @@ const defaultTTSOptionsBase = { enableSsmlParsing: false, inactivityTimeout: DEFAULT_INACTIVITY_TIMEOUT, syncAlignment: true, + preferredAlignment: 'normalized' as const, }; +// ============================================================================ +// Helper Functions +// ============================================================================ + +/** + * Convert character-level timing to word-level timing + * Returns timed words and the remaining text buffer + */ +function toTimedWords( + text: string, + startTimesMs: number[], + durationsMs: number[], + flush: boolean = false, +): [TimedWord[], string] { + if (!text || startTimesMs.length === 0 || durationsMs.length === 0) { + return [[], '']; + } + + const { splitWords } = tokenize.basic; + + // Calculate timestamps (N+1) + const lastStartTime = startTimesMs[startTimesMs.length - 1]; + const lastDuration = durationsMs[durationsMs.length - 1]; + if (lastStartTime === undefined || lastDuration === undefined) { + return [[], text]; + } + const timestamps = [...startTimesMs, lastStartTime + lastDuration]; + + // Split text into words + const words = splitWords(text, false); + const timedWords: TimedWord[] = []; + + if (words.length === 0) { + return [[], text]; + } + + const startIndices = words.map((w) => w[1]); + let end = 0; + + // We don't know if the last word is complete, always leave it as remaining + for (let i = 0; i < startIndices.length - 1; i++) { + const start = startIndices[i]; + const nextStart = startIndices[i + 1]; + if (start === undefined || nextStart === undefined) continue; + end = nextStart; + const startT = timestamps[start]; + const endT = timestamps[end]; + if (startT === undefined || endT === undefined) continue; + timedWords.push({ + text: text.substring(start, end), + startTime: startT / 1000, + endTime: endT / 1000, + }); + } + + if (flush && startIndices.length > 0) { + const start = startIndices[startIndices.length - 1]; + if (start !== undefined) { + end = text.length; + const startT = timestamps[start]; + const endT = timestamps[timestamps.length - 1]; + if (startT !== undefined && endT !== undefined) { + timedWords.push({ + text: text.substring(start, end), + startTime: startT / 1000, + endTime: endT / 1000, + }); + } + } + } else if (startIndices.length > 0) { + const lastStart = startIndices[startIndices.length - 1]; + if (lastStart !== undefined) { + end = lastStart; + } + } + + return [timedWords, text.substring(end)]; +} + // ============================================================================ // WebSocket Connection Manager - Manages persistent connection with multi-stream support // ============================================================================ @@ -78,15 +165,19 @@ const defaultTTSOptionsBase = { interface DeferredPromise { promise: Promise; resolve: (value: T) => void; - reject: (reason?: any) => void; + reject: (reason?: unknown) => void; } interface StreamContext { contextId: string; audioQueue: AsyncIterableQueue; + transcriptQueue: AsyncIterableQueue; eos: DeferredPromise; timeoutTimer: NodeJS.Timeout | null; timeoutSeconds: number; + textBuffer: string; + startTimesMs: number[]; + durationsMs: number[]; } interface SynthesizeContent { @@ -177,9 +268,13 @@ class WebSocketManager { this.contextData.set(contextId, { contextId, audioQueue: new AsyncIterableQueue(), + transcriptQueue: new AsyncIterableQueue(), eos: eos, timeoutTimer: null, 
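         // timeoutSeconds drives the per-context watchdog: sendLoop() arms it
         // via startTimeoutTimer() and recvLoop() clears it once audio arrives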
timeoutSeconds, + textBuffer: '', + startTimesMs: [], + durationsMs: [], }); } } @@ -212,6 +307,10 @@ class WebSocketManager { return this.contextData.get(contextId)?.audioQueue ?? null; } + getContextTranscriptQueue(contextId: string): AsyncIterableQueue | null { + return this.contextData.get(contextId)?.transcriptQueue ?? null; + } + getContextEOSPromise(contextId: string): Promise | null { return this.contextData.get(contextId)?.eos.promise ?? null; } @@ -231,6 +330,7 @@ class WebSocketManager { clearTimeout(ctx.timeoutTimer); } ctx.audioQueue.close(); + ctx.transcriptQueue.close(); } this.contextData.delete(contextId); this.activeContexts.delete(contextId); @@ -423,6 +523,73 @@ class WebSocketManager { return; } + // Process alignment data if available + const alignment = + this.opts.preferredAlignment === 'normalized' + ? data.normalizedAlignment + : data.alignment; + + if (alignment && typeof alignment === 'object') { + const alignmentObj = alignment as { + chars?: string[]; + charStartTimesMs?: number[]; + charsStartTimesMs?: number[]; + charDurationsMs?: number[]; + charsDurationsMs?: number[]; + }; + + const chars = alignmentObj.chars; + const starts = alignmentObj.charStartTimesMs || alignmentObj.charsStartTimesMs; + const durs = alignmentObj.charDurationsMs || alignmentObj.charsDurationsMs; + + if ( + chars && + starts && + durs && + chars.length === durs.length && + starts.length === durs.length + ) { + context.textBuffer += chars.join(''); + + // Handle multi-character items in chars array + for (let i = 0; i < chars.length; i++) { + const char = chars[i]; + const start = starts[i]; + const dur = durs[i]; + + if (char === undefined || start === undefined || dur === undefined) { + continue; + } + + if (char.length > 1) { + // Add padding for multi-character items + for (let j = 0; j < char.length - 1; j++) { + context.startTimesMs.push(start); + context.durationsMs.push(0); + } + } + context.startTimesMs.push(start); + context.durationsMs.push(dur); + } + + // Convert to timed words + const [timedWords, remainingText] = toTimedWords( + context.textBuffer, + context.startTimesMs, + context.durationsMs, + ); + + if (timedWords.length > 0) { + context.transcriptQueue.put(timedWords); + } + + // Update buffers with remaining text + context.textBuffer = remainingText; + context.startTimesMs = context.startTimesMs.slice(-remainingText.length); + context.durationsMs = context.durationsMs.slice(-remainingText.length); + } + } + if (data.audio) { const audioBuffer = Buffer.from(data.audio as string, 'base64'); const audioArray = new Int8Array(audioBuffer); @@ -436,6 +603,19 @@ class WebSocketManager { } if (data.isFinal) { + // Flush remaining text buffer + if (context.textBuffer.length > 0) { + const [timedWords] = toTimedWords( + context.textBuffer, + context.startTimesMs, + context.durationsMs, + true, + ); + if (timedWords.length > 0) { + context.transcriptQueue.put(timedWords); + } + } + context.eos.resolve(); this.cleanupContext(contextId); @@ -707,15 +887,18 @@ export class SynthesizeStream extends tts.SynthesizeStream { }; let lastFrame: AudioFrame | undefined; + let lastDeltaText: string | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { if (lastFrame) { this.queue.put({ requestId: this.#contextId, segmentId, frame: lastFrame, + deltaText: lastDeltaText, final, }); lastFrame = undefined; + lastDeltaText = undefined; } }; @@ -725,12 +908,17 @@ export class SynthesizeStream extends tts.SynthesizeStream { return; } + const transcriptQueue = 
this.#connection!.getContextTranscriptQueue(this.#contextId); + if (!transcriptQueue) { + return; + } + const eosPromise = this.#connection!.getContextEOSPromise(this.#contextId); if (!eosPromise) { return; } - // Process audio as it arrives, until EOS + // Process audio and transcript as they arrive, until EOS const processAudio = async () => { for await (const buffer of audioQueue) { for (const frame of bstream.write(buffer)) { @@ -740,8 +928,34 @@ export class SynthesizeStream extends tts.SynthesizeStream { } }; - // Race between audio processing and EOS - await Promise.race([processAudio(), eosPromise]); + const processTranscript = async () => { + for await (const timedWords of transcriptQueue) { + // Concatenate timed words into deltaText + // In a more sophisticated implementation, we could track timing + // and associate words with specific frames + const deltaText = timedWords.map((w) => w.text).join(''); + if (deltaText) { + lastDeltaText = deltaText; + } + } + }; + + // Start processing immediately, but catch errors of promises immediately + await new Promise(async (resolve, reject) => { + const audioTask = processAudio().catch(reject); + const transcriptTask = processTranscript().catch(reject); + + try { + // Wait for EOS to be signaled + await eosPromise; + + // Ensure both queues are fully drained after EOS + await Promise.all([audioTask, transcriptTask]); + resolve(); + } catch (err) { + reject(err); + } + }); // Flush remaining frames for (const frame of bstream.flush()) { From 56a0c817f7a3b5d5e0bc56ad543e022daa9879b9 Mon Sep 17 00:00:00 2001 From: simon Date: Fri, 14 Nov 2025 15:41:22 +0100 Subject: [PATCH 3/8] add changeset --- .changeset/sweet-suits-slide.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/sweet-suits-slide.md diff --git a/.changeset/sweet-suits-slide.md b/.changeset/sweet-suits-slide.md new file mode 100644 index 000000000..3371a27f2 --- /dev/null +++ b/.changeset/sweet-suits-slide.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents-plugin-elevenlabs': patch +--- + +multi stream websocket connection for ElevenLabs TTS From e38fab8dada8260309350b7c0aeee41887c641aa Mon Sep 17 00:00:00 2001 From: simon Date: Mon, 17 Nov 2025 13:57:54 +0100 Subject: [PATCH 4/8] address review comments --- plugins/elevenlabs/src/tts.ts | 205 ++++++++++++++++++++++------------ 1 file changed, 131 insertions(+), 74 deletions(-) diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 454f30cf9..4d698f4a1 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -4,12 +4,17 @@ import { AsyncIterableQueue, AudioByteStream, + Future, + Task, log, shortuuid, + stream, tokenize, tts, + waitForAbort, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import { type RawData, WebSocket } from 'ws'; import type { TTSEncoding, TTSModels } from './models.js'; @@ -44,7 +49,7 @@ const DEFAULT_VOICE: Voice = { const API_BASE_URL_V1 = 'https://api.elevenlabs.io/v1/'; -interface TimedWord { +interface TimedString { text: string; startTime: number; endTime: number; @@ -92,7 +97,7 @@ function toTimedWords( startTimesMs: number[], durationsMs: number[], flush: boolean = false, -): [TimedWord[], string] { +): [TimedString[], string] { if (!text || startTimesMs.length === 0 || durationsMs.length === 0) { return [[], '']; } @@ -109,7 +114,7 @@ function toTimedWords( // Split text into words const words = splitWords(text, false); - const timedWords: 
TimedWord[] = []; + const timedWords: TimedString[] = []; if (words.length === 0) { return [[], text]; @@ -162,17 +167,11 @@ function toTimedWords( // WebSocket Connection Manager - Manages persistent connection with multi-stream support // ============================================================================ -interface DeferredPromise { - promise: Promise; - resolve: (value: T) => void; - reject: (reason?: unknown) => void; -} - interface StreamContext { contextId: string; - audioQueue: AsyncIterableQueue; - transcriptQueue: AsyncIterableQueue; - eos: DeferredPromise; + audioChannel: stream.StreamChannel; + transcriptChannel: stream.StreamChannel; + eos: Future; timeoutTimer: NodeJS.Timeout | null; timeoutSeconds: number; textBuffer: string; @@ -202,11 +201,11 @@ class WebSocketManager { private ws: WebSocket | null = null; private opts: TTSOptions; private logger = log(); - private inputQueue = new AsyncIterableQueue(); + private inputChannel = stream.createStreamChannel(); private contextData = new Map(); private activeContexts = new Set(); - private sendTask: Promise | null = null; - private recvTask: Promise | null = null; + private sendTask: Task | null = null; + private recvTask: Task | null = null; private keepaliveInterval: NodeJS.Timeout | null = null; private closed = false; private isCurrent = true; @@ -228,15 +227,17 @@ class WebSocketManager { this.ws = new WebSocket(url, { headers }); // Wait for connection to open - await new Promise((resolve, reject) => { - if (!this.ws) { - reject(new Error('WebSocket not initialized')); - return; - } - this.ws.once('open', resolve); - this.ws.once('error', (error) => reject(error)); - this.ws.once('close', (code) => reject(`WebSocket returned ${code}`)); - }); + const connFut = new Future(); + + if (!this.ws) { + throw new Error('WebSocket not initialized'); + } + + this.ws.once('open', () => connFut.resolve()); + this.ws.once('error', (error) => connFut.reject(error)); + this.ws.once('close', (code) => connFut.reject(new Error(`WebSocket returned ${code}`))); + + await connFut.await; // Start keepalive ping this.keepaliveInterval = setInterval(() => { @@ -253,23 +254,24 @@ class WebSocketManager { }, 5000); // Start send and recv loops - this.sendTask = this.sendLoop(); - this.recvTask = this.recvLoop(); + const abortController = new AbortController(); + this.sendTask = Task.from( + async (controller) => await this.sendLoop(controller), + abortController, + ); + this.recvTask = Task.from( + async (controller) => await this.recvLoop(controller), + abortController, + ); } registerContext(contextId: string, timeoutSeconds: number = 30): void { if (!this.contextData.has(contextId)) { - const eos: DeferredPromise = {} as DeferredPromise; - eos.promise = new Promise((resolve, reject) => { - eos.resolve = resolve; - eos.reject = reject; - }); - this.contextData.set(contextId, { contextId, - audioQueue: new AsyncIterableQueue(), - transcriptQueue: new AsyncIterableQueue(), - eos: eos, + audioChannel: stream.createStreamChannel(), + transcriptChannel: stream.createStreamChannel(), + eos: new Future(), timeoutTimer: null, timeoutSeconds, textBuffer: '', @@ -284,7 +286,7 @@ class WebSocketManager { throw new Error('WebSocket connection is closed'); } - this.inputQueue.put({ + this.inputChannel.write({ type: 'synthesize', contextId, text, @@ -297,22 +299,22 @@ class WebSocketManager { throw new Error('WebSocket connection is closed'); } - this.inputQueue.put({ + this.inputChannel.write({ type: 'close', contextId, }); } - 
getContextAudioQueue(contextId: string): AsyncIterableQueue | null { - return this.contextData.get(contextId)?.audioQueue ?? null; + getContextAudioStream(contextId: string): ReadableStream | null { + return this.contextData.get(contextId)?.audioChannel.stream() ?? null; } - getContextTranscriptQueue(contextId: string): AsyncIterableQueue | null { - return this.contextData.get(contextId)?.transcriptQueue ?? null; + getContextTranscriptStream(contextId: string): ReadableStream | null { + return this.contextData.get(contextId)?.transcriptChannel.stream() ?? null; } getContextEOSPromise(contextId: string): Promise | null { - return this.contextData.get(contextId)?.eos.promise ?? null; + return this.contextData.get(contextId)?.eos.await ?? null; } markNonCurrent(): void { @@ -329,8 +331,8 @@ class WebSocketManager { if (ctx.timeoutTimer) { clearTimeout(ctx.timeoutTimer); } - ctx.audioQueue.close(); - ctx.transcriptQueue.close(); + ctx.audioChannel.close(); + ctx.transcriptChannel.close(); } this.contextData.delete(contextId); this.activeContexts.delete(contextId); @@ -358,7 +360,7 @@ class WebSocketManager { } this.closed = true; - this.inputQueue.close(); + await this.inputChannel.close(); // Clear all timeout timers for (const ctx of this.contextData.values()) { @@ -383,7 +385,8 @@ class WebSocketManager { if (this.sendTask) { try { - await this.sendTask; + this.sendTask.cancel(); + await this.sendTask.result; } catch { // Expected when queue closes } @@ -391,7 +394,8 @@ class WebSocketManager { if (this.recvTask) { try { - await this.recvTask; + this.recvTask.cancel(); + await this.recvTask.result; } catch { // Expected when connection closes } @@ -430,9 +434,20 @@ class WebSocketManager { return urlStr; } - private async sendLoop(): Promise { + private async sendLoop(controller: AbortController): Promise { + const reader = this.inputChannel.stream().getReader(); try { - for await (const msg of this.inputQueue) { + while (!controller.signal.aborted) { + const result = await Promise.race([reader.read(), waitForAbort(controller.signal)]); + + if (result === undefined) return; // aborted + if (result.done) { + controller.abort(); + break; + } + + const msg = result.value; + if (!this.ws || this.ws.readyState !== 1) { break; } @@ -490,21 +505,26 @@ class WebSocketManager { } catch (error) { this.logger.error({ error }, 'TTS: Error in send loop'); } finally { + reader.releaseLock(); if (!this.closed) { await this.close(); } } } - private async recvLoop(): Promise { + private async recvLoop(controller: AbortController): Promise { try { - await new Promise((resolve, reject) => { + const listenMessage = new Promise((resolve, reject) => { if (!this.ws) { reject(new Error('WebSocket not available')); return; } this.ws.on('message', (msg: RawData) => { + if (controller.signal.aborted) { + return; + } + try { const data = JSON.parse(msg.toString()) as Record; const contextId = (data.contextId || data.context_id) as string | undefined; @@ -580,7 +600,7 @@ class WebSocketManager { ); if (timedWords.length > 0) { - context.transcriptQueue.put(timedWords); + context.transcriptChannel.write(timedWords); } // Update buffers with remaining text @@ -593,7 +613,7 @@ class WebSocketManager { if (data.audio) { const audioBuffer = Buffer.from(data.audio as string, 'base64'); const audioArray = new Int8Array(audioBuffer); - context.audioQueue.put(audioArray); + context.audioChannel.write(audioArray); // Cancel timeout when audio is received if (context.timeoutTimer) { @@ -612,7 +632,7 @@ class WebSocketManager { 
true, ); if (timedWords.length > 0) { - context.transcriptQueue.put(timedWords); + context.transcriptChannel.write(timedWords); } } @@ -647,10 +667,12 @@ class WebSocketManager { reject(error); }); }); + + await Promise.race([listenMessage, waitForAbort(controller.signal)]); } catch (error) { this.logger.error({ error }, 'TTS: Recv loop error'); for (const context of this.contextData.values()) { - context.eos.reject(error); + context.eos.reject(error as Error); } } finally { if (!this.closed) { @@ -858,11 +880,18 @@ export class SynthesizeStream extends tts.SynthesizeStream { this.#opts.autoMode; let xmlContent: string[] = []; - for await (const data of stream) { - if (this.abortController.signal.aborted) { + while (!this.abortController.signal.aborted) { + const result = await Promise.race([ + stream.next(), + waitForAbort(this.abortController.signal), + ]); + + if (result === undefined) break; // aborted + if (result.done) { break; } - let text = data.token; + + let text = result.value.token; if ((this.#opts.enableSsmlParsing && text.startsWith(' { - const audioQueue = this.#connection!.getContextAudioQueue(this.#contextId); - if (!audioQueue) { + const audioStream = this.#connection!.getContextAudioStream(this.#contextId); + if (!audioStream) { return; } - const transcriptQueue = this.#connection!.getContextTranscriptQueue(this.#contextId); - if (!transcriptQueue) { + const transcriptStream = this.#connection!.getContextTranscriptStream(this.#contextId); + if (!transcriptStream) { return; } @@ -920,23 +949,51 @@ export class SynthesizeStream extends tts.SynthesizeStream { // Process audio and transcript as they arrive, until EOS const processAudio = async () => { - for await (const buffer of audioQueue) { - for (const frame of bstream.write(buffer)) { - sendLastFrame(this.#contextId, false); - lastFrame = frame; + const reader = audioStream.getReader(); + try { + while (!this.abortController.signal.aborted) { + const result = await Promise.race([ + reader.read(), + waitForAbort(this.abortController.signal), + ]); + if (result === undefined) return; // aborted + if (result.done) { + break; + } + + for (const frame of bstream.write(result.value)) { + sendLastFrame(this.#contextId, false); + lastFrame = frame; + } } + } finally { + reader.releaseLock(); } }; const processTranscript = async () => { - for await (const timedWords of transcriptQueue) { - // Concatenate timed words into deltaText - // In a more sophisticated implementation, we could track timing - // and associate words with specific frames - const deltaText = timedWords.map((w) => w.text).join(''); - if (deltaText) { - lastDeltaText = deltaText; + const reader = transcriptStream.getReader(); + try { + while (!this.abortController.signal.aborted) { + const result = await Promise.race([ + reader.read(), + waitForAbort(this.abortController.signal), + ]); + if (result === undefined) return; // aborted + if (result.done) { + break; + } + + // Concatenate timed words into deltaText + // In a more sophisticated implementation, we could track timing + // and associate words with specific frames + const deltaText = result.value.map((w) => w.text).join(''); + if (deltaText) { + lastDeltaText = deltaText; + } } + } finally { + reader.releaseLock(); } }; @@ -949,7 +1006,7 @@ export class SynthesizeStream extends tts.SynthesizeStream { // Wait for EOS to be signaled await eosPromise; - // Ensure both queues are fully drained after EOS + // Ensure both streams are fully drained after EOS await Promise.all([audioTask, transcriptTask]); 
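           // (eos resolves on isFinal just before cleanupContext() closes both
           // channels, so awaiting the two tasks drains any buffered chunks)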
          resolve();
        } catch (err) {

From 9980e5bb4bd1ca2fba38797cbf6797e53f724a46 Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 19 Nov 2025 14:15:44 +0100
Subject: [PATCH 5/8] use waitForAbort and .next() to fulfill AsyncIterableQueue's requirements

---
 plugins/elevenlabs/src/tts.ts | 31 +++++++++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts
index 4d698f4a1..b98fdb29e 100644
--- a/plugins/elevenlabs/src/tts.ts
+++ b/plugins/elevenlabs/src/tts.ts
@@ -825,11 +825,19 @@ export class SynthesizeStream extends tts.SynthesizeStream {
     const tokenizeInput = async () => {
       let stream: tokenize.WordStream | tokenize.SentenceStream | null = null;
-      for await (const text of this.input) {
-        if (this.abortController.signal.aborted) {
+
+      while (!this.abortController.signal.aborted) {
+        const result = await Promise.race([
+          this.input.next(),
+          waitForAbort(this.abortController.signal),
+        ]);
+
+        if (result === undefined) break; // aborted
+        if (result.done) {
           break;
         }
-        if (text === SynthesizeStream.FLUSH_SENTINEL) {
+
+        if (result.value === SynthesizeStream.FLUSH_SENTINEL) {
           stream?.endInput();
           stream = null;
         } else {
@@ -837,19 +845,26 @@ export class SynthesizeStream extends tts.SynthesizeStream {
             stream = this.#opts.wordTokenizer.stream();
             segments.put(stream);
           }
-          stream.pushText(text);
+          stream.pushText(result.value);
         }
       }
+
       segments.close();
     };
 
     const runStream = async () => {
-      for await (const stream of segments) {
-        if (this.abortController.signal.aborted) {
+      while (!this.abortController.signal.aborted) {
+        const result = await Promise.race([
+          segments.next(),
+          waitForAbort(this.abortController.signal),
+        ]);
+
+        if (result === undefined) break; // aborted
+        if (result.done) {
           break;
         }
-        await this.runSynthesis(stream);
-        this.queue.put(SynthesizeStream.END_OF_STREAM);
+
+        await this.runSynthesis(result.value);
       }
     };
 

From ff754c9c9916ebdb1a5611d325e973d8ad1c4a5f Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 26 Nov 2025 12:28:23 +0100
Subject: [PATCH 6/8] add speed parameter

---
 plugins/elevenlabs/src/tts.ts | 1 +
 1 file changed, 1 insertion(+)

diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts
index b98fdb29e..7553b0776 100644
--- a/plugins/elevenlabs/src/tts.ts
+++ b/plugins/elevenlabs/src/tts.ts
@@ -32,6 +32,7 @@ type VoiceSettings = {
   stability: number; // 0..1
   similarity_boost: number; // 0..1
   style?: number; // 0..1
+  speed?: number; // 0.8..1.2
   use_speaker_boost: boolean;
 };
 

From a4c1eff2d5e6ce9400fbcafba8af1a17acfb0854 Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 26 Nov 2025 13:39:56 +0100
Subject: [PATCH 7/8] apply #861 (Fix: Race Condition When Participant Disconnects During STT/TTS Processing)

---
 plugins/elevenlabs/src/tts.ts | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts
index 7553b0776..9cc1afd60 100644
--- a/plugins/elevenlabs/src/tts.ts
+++ b/plugins/elevenlabs/src/tts.ts
@@ -870,6 +870,18 @@ export class SynthesizeStream extends tts.SynthesizeStream {
     };
 
     await Promise.all([tokenizeInput(), runStream()]);
+    } catch (err) {
+      if (err instanceof Error && !err.message.includes('WebSocket closed')) {
+        if (err.message.includes('Queue is closed')) {
+          this.#logger.warn(
+            { err },
+            'Queue closed during transcript processing (expected during disconnect)',
+          );
+          return;
+        }
+      }
+      this.#logger.error({ err }, 'Error in ElevenLabs run()');
+      throw err;
     } finally {
       if (this.#connection) {
try { From 7d92dd12deba1135c1a8d7d8a8aa4e6860e56746 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 26 Nov 2025 21:44:18 +0100 Subject: [PATCH 8/8] implement synthesize method --- plugins/elevenlabs/src/tts.ts | 117 +++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 2 deletions(-) diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 9cc1afd60..7161ac315 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -785,8 +785,8 @@ export class TTS extends tts.TTS { return this.#connection!; } - synthesize(): tts.ChunkedStream { - throw new Error('Chunked responses are not supported on ElevenLabs TTS'); + synthesize(text: string): tts.ChunkedStream { + return new ChunkedStream(this, this.#opts, text); } stream(): tts.SynthesizeStream { @@ -1056,6 +1056,119 @@ export class SynthesizeStream extends tts.SynthesizeStream { } } +/** + * ChunkedStream - Synthesize using the HTTP chunked API endpoint + * This is a non-streaming approach that sends the full text and receives audio in chunks + */ +export class ChunkedStream extends tts.ChunkedStream { + #opts: TTSOptions; + #text: string; + #logger = log(); + label = 'elevenlabs.ChunkedStream'; + + constructor(tts: TTS, opts: TTSOptions, text: string) { + super(text, tts); + this.#opts = opts; + this.#text = text; + } + + protected async run() { + const requestId = shortuuid(); + const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1); + + const voiceSettings = this.#opts.voice.settings || undefined; + + try { + const url = buildSynthesizeUrl(this.#opts); + const response = await fetch(url, { + method: 'POST', + headers: { + [AUTHORIZATION_HEADER]: this.#opts.apiKey!, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + text: this.#text, + model_id: this.#opts.modelID, + voice_settings: voiceSettings, + }), + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ElevenLabs API error ${response.status}: ${errorText}`); + } + + const contentType = response.headers.get('content-type') || ''; + if (!contentType.startsWith('audio/')) { + const content = await response.text(); + throw new Error(`ElevenLabs returned non-audio data: ${content}`); + } + + if (!response.body) { + throw new Error('Response body is null'); + } + + const reader = response.body.getReader(); + let lastFrame: AudioFrame | undefined; + + const sendLastFrame = (final: boolean) => { + if (lastFrame) { + this.queue.put({ + requestId, + segmentId: requestId, + frame: lastFrame, + final, + }); + lastFrame = undefined; + } + }; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + // Convert Uint8Array to Int8Array for AudioByteStream + const int8Data = new Int8Array(value.buffer, value.byteOffset, value.byteLength); + for (const frame of bstream.write(int8Data)) { + sendLastFrame(false); + lastFrame = frame; + } + } + + // Flush remaining frames + for (const frame of bstream.flush()) { + sendLastFrame(false); + lastFrame = frame; + } + + sendLastFrame(true); + } catch (error) { + this.#logger.error({ error }, 'Error in ElevenLabs ChunkedStream'); + throw error; + } + } +} + +/** + * Build the URL for the HTTP synthesize endpoint + */ +function buildSynthesizeUrl(opts: TTSOptions): string { + const baseURL = opts.baseURL.replace(/\/$/, ''); + const voiceId = opts.voice.id; + const params: string[] = []; + + params.push(`model_id=${opts.modelID}`); + params.push(`output_format=${opts.encoding}`); + + if 
(opts.streamingLatency !== undefined) { + params.push(`optimize_streaming_latency=${opts.streamingLatency}`); + } + + return `${baseURL}/text-to-speech/${voiceId}/stream?${params.join('&')}`; +} + const sampleRateFromFormat = (encoding: TTSEncoding): number => { return Number(encoding.split('_')[1]); };
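
A minimal end-to-end sketch of the API surface after this series, for
reference. This is illustrative only: the apiKey sourcing shown is an
assumption, and pushText/flush/endInput come from the tts.SynthesizeStream
base class in @livekit/agents rather than from this diff.

    import { TTS } from '@livekit/agents-plugin-elevenlabs';

    const elevenTts = new TTS({ apiKey: process.env.ELEVENLABS_API_KEY });

    // PATCH 8/8: one-shot synthesis via the HTTP /stream endpoint
    const chunked = elevenTts.synthesize('Hello from ElevenLabs!');

    // PATCH 1/8: incremental synthesis; each SynthesizeStream claims its own
    // context_id on the shared multi-stream WebSocket
    const synthStream = elevenTts.stream();
    synthStream.pushText('Hello ');
    synthStream.flush(); // ends the current segment
    synthStream.endInput();

    await elevenTts.aclose(); // closes the persistent WebSocketManager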