diff --git a/.changeset/sweet-suits-slide.md b/.changeset/sweet-suits-slide.md
new file mode 100644
index 000000000..3371a27f2
--- /dev/null
+++ b/.changeset/sweet-suits-slide.md
@@ -0,0 +1,5 @@
+---
+'@livekit/agents-plugin-elevenlabs': patch
+---
+
+multi-stream WebSocket connection for ElevenLabs TTS
diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts
index f3aa6e874..7161ac315 100644
--- a/plugins/elevenlabs/src/tts.ts
+++ b/plugins/elevenlabs/src/tts.ts
@@ -4,17 +4,22 @@
 import {
   AsyncIterableQueue,
   AudioByteStream,
+  Future,
+  Task,
   log,
   shortuuid,
+  stream,
   tokenize,
   tts,
+  waitForAbort,
 } from '@livekit/agents';
 import type { AudioFrame } from '@livekit/rtc-node';
-import { URL } from 'node:url';
+import type { ReadableStream } from 'node:stream/web';
 import { type RawData, WebSocket } from 'ws';
 
 import type { TTSEncoding, TTSModels } from './models.js';
 
 const DEFAULT_INACTIVITY_TIMEOUT = 300;
+const AUTHORIZATION_HEADER = 'xi-api-key';
 
 type Voice = {
   id: string;
@@ -27,6 +32,7 @@ type VoiceSettings = {
   stability: number; // 0..1
   similarity_boost: number; // 0..1
   style?: number; // 0..1
+  speed?: number; // 0.8..1.2
   use_speaker_boost: boolean;
 };
 
@@ -43,7 +49,12 @@ const DEFAULT_VOICE: Voice = {
 };
 
 const API_BASE_URL_V1 = 'https://api.elevenlabs.io/v1/';
-const AUTHORIZATION_HEADER = 'xi-api-key';
+
+interface TimedString {
+  text: string;
+  startTime: number;
+  endTime: number;
+}
 
 export interface TTSOptions {
   apiKey?: string;
@@ -58,6 +69,7 @@ export interface TTSOptions {
   enableSsmlParsing: boolean;
   inactivityTimeout: number;
   syncAlignment: boolean;
+  preferredAlignment: 'normalized' | 'original';
   autoMode?: boolean;
 }
 
@@ -70,10 +82,615 @@ const defaultTTSOptionsBase = {
   enableSsmlParsing: false,
   inactivityTimeout: DEFAULT_INACTIVITY_TIMEOUT,
   syncAlignment: true,
+  preferredAlignment: 'normalized' as const,
 };
 
+// ============================================================================
+// Helper Functions
+// ============================================================================
+
+/**
+ * Convert character-level timing to word-level timing
+ * Returns timed words and the remaining text buffer
+ */
+function toTimedWords(
+  text: string,
+  startTimesMs: number[],
+  durationsMs: number[],
+  flush: boolean = false,
+): [TimedString[], string] {
+  if (!text || startTimesMs.length === 0 || durationsMs.length === 0) {
+    return [[], ''];
+  }
+
+  const { splitWords } = tokenize.basic;
+
+  // Calculate timestamps (N+1)
+  const lastStartTime = startTimesMs[startTimesMs.length - 1];
+  const lastDuration = durationsMs[durationsMs.length - 1];
+  if (lastStartTime === undefined || lastDuration === undefined) {
+    return [[], text];
+  }
+  const timestamps = [...startTimesMs, lastStartTime + lastDuration];
+
+  // Split text into words
+  const words = splitWords(text, false);
+  const timedWords: TimedString[] = [];
+
+  if (words.length === 0) {
+    return [[], text];
+  }
+
+  const startIndices = words.map((w) => w[1]);
+  let end = 0;
+
+  // We don't know if the last word is complete, so always leave it as remaining
+  for (let i = 0; i < startIndices.length - 1; i++) {
+    const start = startIndices[i];
+    const nextStart = startIndices[i + 1];
+    if (start === undefined || nextStart === undefined) continue;
+    end = nextStart;
+    const startT = timestamps[start];
+    const endT = timestamps[end];
+    if (startT === undefined || endT === undefined) continue;
+    timedWords.push({
+      text: text.substring(start, end),
+      startTime: startT / 1000,
+      endTime: endT / 1000,
+    });
+  }
+
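+  // On flush, emit the trailing (possibly incomplete) word as well, bounded by
+  // the final timestamp; otherwise it stays in the remainder buffer.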
+  if (flush && startIndices.length > 0) {
+    const start = startIndices[startIndices.length - 1];
+    if (start !== undefined) {
+      end = text.length;
+      const startT = timestamps[start];
+      const endT = timestamps[timestamps.length - 1];
+      if (startT !== undefined && endT !== undefined) {
+        timedWords.push({
+          text: text.substring(start, end),
+          startTime: startT / 1000,
+          endTime: endT / 1000,
+        });
+      }
+    }
+  } else if (startIndices.length > 0) {
+    const lastStart = startIndices[startIndices.length - 1];
+    if (lastStart !== undefined) {
+      end = lastStart;
+    }
+  }
+
+  return [timedWords, text.substring(end)];
+}
+
+// ============================================================================
+// WebSocket Connection Manager - Manages persistent connection with multi-stream support
+// ============================================================================
+
+interface StreamContext {
+  contextId: string;
+  audioChannel: stream.StreamChannel<Int8Array>;
+  transcriptChannel: stream.StreamChannel<TimedString[]>;
+  eos: Future<void>;
+  timeoutTimer: NodeJS.Timeout | null;
+  timeoutSeconds: number;
+  textBuffer: string;
+  startTimesMs: number[];
+  durationsMs: number[];
+}
+
+interface SynthesizeContent {
+  type: 'synthesize';
+  contextId: string;
+  text: string;
+  flush: boolean;
+}
+
+interface CloseContext {
+  type: 'close';
+  contextId: string;
+}
+
+type QueueMessage = SynthesizeContent | CloseContext;
+
+/**
+ * Manages a single persistent WebSocket connection for multi-stream TTS.
+ * Allows multiple synthesize requests to share one connection via context IDs.
+ */
+class WebSocketManager {
+  private ws: WebSocket | null = null;
+  private opts: TTSOptions;
+  private logger = log();
+  private inputChannel = stream.createStreamChannel<QueueMessage>();
+  private contextData = new Map<string, StreamContext>();
+  private activeContexts = new Set<string>();
+  private sendTask: Task<void> | null = null;
+  private recvTask: Task<void> | null = null;
+  private keepaliveInterval: NodeJS.Timeout | null = null;
+  private closed = false;
+  private isCurrent = true;
+
+  constructor(opts: TTSOptions) {
+    this.opts = opts;
+  }
+
+  async connect(): Promise<void> {
+    if (this.ws || this.closed) {
+      return;
+    }
+
+    const url = this.buildMultiStreamUrl();
+    const headers = {
+      [AUTHORIZATION_HEADER]: this.opts.apiKey!,
+    };
+
+    this.ws = new WebSocket(url, { headers });
+
+    // Wait for connection to open
+    const connFut = new Future<void>();
+
+    if (!this.ws) {
+      throw new Error('WebSocket not initialized');
+    }
+
+    this.ws.once('open', () => connFut.resolve());
+    this.ws.once('error', (error) => connFut.reject(error));
+    this.ws.once('close', (code) => connFut.reject(new Error(`WebSocket returned ${code}`)));
+
+    await connFut.await;
+
+    // Start keepalive ping
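+    // (pinging every 5s keeps an idle multi-stream socket alive between
+    // utterances, so it is not dropped by the server's inactivity timeout)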
+    this.keepaliveInterval = setInterval(() => {
+      try {
+        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
+          this.ws.ping();
+        }
+      } catch {
+        if (this.keepaliveInterval) {
+          clearInterval(this.keepaliveInterval);
+          this.keepaliveInterval = null;
+        }
+      }
+    }, 5000);
+
+    // Start send and recv loops
+    const abortController = new AbortController();
+    this.sendTask = Task.from(
+      async (controller) => await this.sendLoop(controller),
+      abortController,
+    );
+    this.recvTask = Task.from(
+      async (controller) => await this.recvLoop(controller),
+      abortController,
+    );
+  }
+
+  registerContext(contextId: string, timeoutSeconds: number = 30): void {
+    if (!this.contextData.has(contextId)) {
+      this.contextData.set(contextId, {
+        contextId,
+        audioChannel: stream.createStreamChannel<Int8Array>(),
+        transcriptChannel: stream.createStreamChannel<TimedString[]>(),
+        eos: new Future<void>(),
+        timeoutTimer: null,
+        timeoutSeconds,
+        textBuffer: '',
+        startTimesMs: [],
+        durationsMs: [],
+      });
+    }
+  }
+
+  sendContent(contextId: string, text: string, flush: boolean = false): void {
+    if (this.closed || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
+      throw new Error('WebSocket connection is closed');
+    }
+
+    this.inputChannel.write({
+      type: 'synthesize',
+      contextId,
+      text,
+      flush,
+    });
+  }
+
+  closeContext(contextId: string): void {
+    if (this.closed || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
+      throw new Error('WebSocket connection is closed');
+    }
+
+    this.inputChannel.write({
+      type: 'close',
+      contextId,
+    });
+  }
+
+  getContextAudioStream(contextId: string): ReadableStream<Int8Array> | null {
+    return this.contextData.get(contextId)?.audioChannel.stream() ?? null;
+  }
+
+  getContextTranscriptStream(contextId: string): ReadableStream<TimedString[]> | null {
+    return this.contextData.get(contextId)?.transcriptChannel.stream() ?? null;
+  }
+
+  getContextEOSPromise(contextId: string): Promise<void> | null {
+    return this.contextData.get(contextId)?.eos.await ?? null;
+  }
+
+  markNonCurrent(): void {
+    this.isCurrent = false;
+  }
+
+  get isClosed(): boolean {
+    return this.closed;
+  }
+
+  private cleanupContext(contextId: string): void {
+    const ctx = this.contextData.get(contextId);
+    if (ctx) {
+      if (ctx.timeoutTimer) {
+        clearTimeout(ctx.timeoutTimer);
+      }
+      ctx.audioChannel.close();
+      ctx.transcriptChannel.close();
+    }
+    this.contextData.delete(contextId);
+    this.activeContexts.delete(contextId);
+  }
+
+  private startTimeoutTimer(contextId: string): void {
+    const ctx = this.contextData.get(contextId);
+    if (!ctx || ctx.timeoutTimer) {
+      return;
+    }
+
+    ctx.timeoutTimer = setTimeout(() => {
+      this.logger.error(
+        { contextId },
+        `TTS: Context timed out after ${ctx.timeoutSeconds} seconds`,
+      );
+      ctx.eos.reject(new Error(`TTS timed out after ${ctx.timeoutSeconds} seconds`));
+      this.cleanupContext(contextId);
+    }, ctx.timeoutSeconds * 1000);
+  }
+
+  async close(): Promise<void> {
+    if (this.closed) {
+      return;
+    }
+
+    this.closed = true;
+    await this.inputChannel.close();
+
+    // Clear all timeout timers
+    for (const ctx of this.contextData.values()) {
+      if (ctx.timeoutTimer) {
+        clearTimeout(ctx.timeoutTimer);
+      }
+    }
+
+    this.contextData.clear();
+    this.activeContexts.clear();
+
+    if (this.keepaliveInterval) {
+      clearInterval(this.keepaliveInterval);
+      this.keepaliveInterval = null;
+    }
+
+    if (this.ws) {
+      this.ws.removeAllListeners();
+      this.ws.close();
+      this.ws = null;
+    }
+
+    if (this.sendTask) {
+      try {
+        this.sendTask.cancel();
+        await this.sendTask.result;
+      } catch {
+        // Expected when queue closes
+      }
+    }
+
+    if (this.recvTask) {
+      try {
+        this.recvTask.cancel();
+        await this.recvTask.result;
+      } catch {
+        // Expected when connection closes
+      }
+    }
+  }
+
+  private buildMultiStreamUrl(): string {
+    const baseURL = this.opts.baseURL
+      .replace('https://', 'wss://')
+      .replace('http://', 'ws://')
+      .replace(/\/$/, '');
+
+    const voiceId = this.opts.voice.id;
+    let urlStr = `${baseURL}/text-to-speech/${voiceId}/multi-stream-input?`;
+
+    const params: string[] = [];
+    params.push(`model_id=${this.opts.modelID}`);
+    params.push(`output_format=${this.opts.encoding}`);
+    params.push(`enable_ssml_parsing=${this.opts.enableSsmlParsing}`);
+    params.push(`sync_alignment=${this.opts.syncAlignment}`);
+    params.push(`inactivity_timeout=${this.opts.inactivityTimeout}`);
+
+    if (this.opts.streamingLatency !== undefined) {
+      params.push(`optimize_streaming_latency=${this.opts.streamingLatency}`);
+    }
+
+    if (this.opts.autoMode !== undefined) {
+      params.push(`auto_mode=${this.opts.autoMode}`);
+    }
+
+    if (this.opts.languageCode) {
+      params.push(`language_code=${this.opts.languageCode}`);
+    }
+
+    urlStr += params.join('&');
+    return urlStr;
+  }
+
+  private async sendLoop(controller: AbortController): Promise<void> {
+    const reader = this.inputChannel.stream().getReader();
+    try {
+      while (!controller.signal.aborted) {
+        const result = await Promise.race([reader.read(), waitForAbort(controller.signal)]);
+
+        if (result === undefined) return; // aborted
+        if (result.done) {
+          controller.abort();
+          break;
+        }
+
+        const msg = result.value;
+
+        if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
+          break;
+        }
+
+        if (msg.type === 'synthesize') {
+          const isNewContext = !this.activeContexts.has(msg.contextId);
+
+          // If not current and new context, ignore (connection is draining)
+          if (!this.isCurrent && isNewContext) {
+            continue;
+          }
+
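+          // A context's first packet opens the stream: it carries the voice
+          // settings (and optional generation_config) plus the single-space
+          // `text` that the ElevenLabs realtime API expects as an init frame.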
+          if (isNewContext) {
+            const voiceSettings = this.opts.voice.settings || {};
+            const initPkt = {
+              text: ' ',
+              voice_settings: voiceSettings,
+              context_id: msg.contextId,
+              ...(this.opts.chunkLengthSchedule && {
+                generation_config: {
+                  chunk_length_schedule: this.opts.chunkLengthSchedule,
+                },
+              }),
+            };
+
+            this.ws.send(JSON.stringify(initPkt));
+            this.activeContexts.add(msg.contextId);
+          }
+
+          const pkt: { text: string; context_id: string; flush?: boolean } = {
+            text: msg.text,
+            context_id: msg.contextId,
+          };
+          if (msg.flush) {
+            pkt.flush = true;
+          }
+
+          // Start timeout timer for this context
+          this.startTimeoutTimer(msg.contextId);
+
+          this.ws.send(JSON.stringify(pkt));
+        } else if (msg.type === 'close') {
+          if (this.activeContexts.has(msg.contextId)) {
+            const closePkt = {
+              context_id: msg.contextId,
+              close_context: true,
+            };
+            this.ws.send(JSON.stringify(closePkt));
+            this.activeContexts.delete(msg.contextId);
+          }
+        } else {
+          this.logger.error({ msg }, 'TTS: Unknown msg type');
+        }
+      }
+    } catch (error) {
+      this.logger.error({ error }, 'TTS: Error in send loop');
+    } finally {
+      reader.releaseLock();
+      if (!this.closed) {
+        await this.close();
+      }
+    }
+  }
+
+  private async recvLoop(controller: AbortController): Promise<void> {
+    try {
+      const listenMessage = new Promise<void>((resolve, reject) => {
+        if (!this.ws) {
+          reject(new Error('WebSocket not available'));
+          return;
+        }
+
+        this.ws.on('message', (msg: RawData) => {
+          if (controller.signal.aborted) {
+            return;
+          }
+
+          try {
+            const data = JSON.parse(msg.toString()) as Record<string, unknown>;
+            const contextId = (data.contextId || data.context_id) as string | undefined;
+
+            if (!contextId || !this.contextData.has(contextId)) {
+              return;
+            }
+
+            const context = this.contextData.get(contextId)!;
+
+            this.logger.debug({ data }, 'TTS: Incoming message');
+
+            if (data.error) {
+              this.logger.error({ contextId, error: data.error }, 'TTS: ElevenLabs error');
+              this.cleanupContext(contextId);
+              return;
+            }
+
+            // Process alignment data if available
+            const alignment =
+              this.opts.preferredAlignment === 'normalized'
+                ? data.normalizedAlignment
+                : data.alignment;
+
+            if (alignment && typeof alignment === 'object') {
+              const alignmentObj = alignment as {
+                chars?: string[];
+                charStartTimesMs?: number[];
+                charsStartTimesMs?: number[];
+                charDurationsMs?: number[];
+                charsDurationsMs?: number[];
+              };
+
+              const chars = alignmentObj.chars;
+              const starts = alignmentObj.charStartTimesMs || alignmentObj.charsStartTimesMs;
+              const durs = alignmentObj.charDurationsMs || alignmentObj.charsDurationsMs;
+
+              if (
+                chars &&
+                starts &&
+                durs &&
+                chars.length === durs.length &&
+                starts.length === durs.length
+              ) {
+                context.textBuffer += chars.join('');
+
+                // Handle multi-character items in the chars array, padding the
+                // timing buffers so they stay aligned 1:1 with textBuffer
+                for (let i = 0; i < chars.length; i++) {
+                  const char = chars[i];
+                  const start = starts[i];
+                  const dur = durs[i];
+
+                  if (char === undefined || start === undefined || dur === undefined) {
+                    continue;
+                  }
+
+                  if (char.length > 1) {
+                    // Add padding for multi-character items
+                    for (let j = 0; j < char.length - 1; j++) {
+                      context.startTimesMs.push(start);
+                      context.durationsMs.push(0);
+                    }
+                  }
+                  context.startTimesMs.push(start);
+                  context.durationsMs.push(dur);
+                }
+
+                // Convert to timed words
+                const [timedWords, remainingText] = toTimedWords(
+                  context.textBuffer,
+                  context.startTimesMs,
+                  context.durationsMs,
+                );
+
+                if (timedWords.length > 0) {
+                  context.transcriptChannel.write(timedWords);
+                }
+
+                // Keep only the buffers for the unconsumed tail of the text
+                context.textBuffer = remainingText;
+                context.startTimesMs = context.startTimesMs.slice(-remainingText.length);
+                context.durationsMs = context.durationsMs.slice(-remainingText.length);
+              }
+            }
+
+            if (data.audio) {
+              const audioBuffer = Buffer.from(data.audio as string, 'base64');
+              const audioArray = new Int8Array(audioBuffer);
+              context.audioChannel.write(audioArray);
+
+              // Cancel timeout when audio is received
+              if (context.timeoutTimer) {
+                clearTimeout(context.timeoutTimer);
+                context.timeoutTimer = null;
+              }
+            }
+
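+            // isFinal ends the context: flush buffered (possibly incomplete)
+            // words, resolve EOS so waiting readers can finish, and tear down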
+            if (data.isFinal) {
+              // Flush remaining text buffer
+              if (context.textBuffer.length > 0) {
+                const [timedWords] = toTimedWords(
+                  context.textBuffer,
+                  context.startTimesMs,
+                  context.durationsMs,
+                  true,
+                );
+                if (timedWords.length > 0) {
+                  context.transcriptChannel.write(timedWords);
+                }
+              }
+
+              context.eos.resolve();
+              this.cleanupContext(contextId);
+
+              if (!this.isCurrent && this.activeContexts.size === 0) {
+                this.logger.debug('TTS: No active contexts, shutting down');
+                resolve();
+              }
+            }
+
+            if (this.closed) {
+              resolve();
+            }
+          } catch (parseError) {
+            this.logger.warn({ parseError }, 'TTS: Failed to parse message');
+          }
+        });
+
+        this.ws.once('close', (code, reason) => {
+          if (!this.closed) {
+            this.logger.error(`TTS: WebSocket closed unexpectedly with code ${code}: ${reason}`);
+            reject(new Error('WebSocket closed'));
+          } else {
+            resolve();
+          }
+        });
+
+        this.ws.once('error', (error) => {
+          this.logger.error({ error }, 'TTS: WebSocket error');
+          reject(error);
+        });
+      });
+
+      await Promise.race([listenMessage, waitForAbort(controller.signal)]);
+    } catch (error) {
+      this.logger.error({ error }, 'TTS: Recv loop error');
+      for (const context of this.contextData.values()) {
+        context.eos.reject(error as Error);
+      }
+    } finally {
+      if (!this.closed) {
+        await this.close();
+      }
+    }
+  }
+}
+
+// ============================================================================
+// TTS Implementation
+// ============================================================================
+
 export class TTS extends tts.TTS {
   #opts: TTSOptions;
+  #connection: WebSocketManager | null = null;
+  #connectionLock: Promise<void> | null = null;
   label = 'elevenlabs.TTS';
 
   constructor(opts: Partial<TTSOptions> = {}) {
@@ -136,158 +753,173 @@ export class TTS extends tts.TTS {
     });
   }
 
-  synthesize(): tts.ChunkedStream {
-    throw new Error('Chunked responses are not supported on ElevenLabs TTS');
+  async getCurrentConnection(): Promise<WebSocketManager> {
+    // Wait for any ongoing connection attempt
+    if (this.#connectionLock) {
+      await this.#connectionLock;
+      if (this.#connection && !this.#connection.isClosed) {
+        return this.#connection;
+      }
+    }
+
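+    // Rotating connections: the previous manager keeps draining its in-flight
+    // contexts but accepts no new ones, so ongoing speech is not cut off.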
+    // Create new lock for this connection attempt
+    const newConnectionLock = (async () => {
+      // Mark old connection as non-current if it exists
+      if (this.#connection && !this.#connection.isClosed) {
+        this.#connection.markNonCurrent();
+      }
+
+      // Create and connect new manager
+      const manager = new WebSocketManager(this.#opts);
+      await manager.connect();
+      this.#connection = manager;
+    })();
+
+    this.#connectionLock = newConnectionLock;
+    try {
+      await newConnectionLock;
+    } finally {
+      this.#connectionLock = null;
+    }
+
+    return this.#connection!;
+  }
+
+  synthesize(text: string): tts.ChunkedStream {
+    return new ChunkedStream(this, this.#opts, text);
   }
 
   stream(): tts.SynthesizeStream {
     return new SynthesizeStream(this, this.#opts);
   }
+
+  async aclose(): Promise<void> {
+    if (this.#connection) {
+      await this.#connection.close();
+      this.#connection = null;
+    }
+  }
 }
 
 export class SynthesizeStream extends tts.SynthesizeStream {
   #opts: TTSOptions;
   #logger = log();
+  #tts: TTS;
+  #contextId: string;
+  #connection: WebSocketManager | null = null;
   label = 'elevenlabs.SynthesizeStream';
-  readonly streamURL: URL;
 
   constructor(tts: TTS, opts: TTSOptions) {
     super(tts);
+    this.#tts = tts;
     this.#opts = opts;
-    this.closed = false;
-
-    // add trailing slash to URL if needed
-    const baseURL = opts.baseURL + (opts.baseURL.endsWith('/') ? '' : '/');
-
-    this.streamURL = new URL(`text-to-speech/${opts.voice.id}/stream-input`, baseURL);
-    const params = {
-      model_id: opts.modelID,
-      output_format: opts.encoding,
-      enable_ssml_parsing: `${opts.enableSsmlParsing}`,
-      sync_alignment: `${opts.syncAlignment}`,
-      ...(opts.autoMode !== undefined && { auto_mode: `${opts.autoMode}` }),
-      ...(opts.languageCode && { language_code: opts.languageCode }),
-      ...(opts.inactivityTimeout && { inactivity_timeout: `${opts.inactivityTimeout}` }),
-      ...(opts.streamingLatency && { optimize_streaming_latency: `${opts.streamingLatency}` }),
-    };
-    Object.entries(params).forEach(([k, v]) => this.streamURL.searchParams.append(k, v));
-    this.streamURL.protocol = this.streamURL.protocol.replace('http', 'ws');
+    this.#contextId = shortuuid();
   }
 
   protected async run() {
-    const segments = new AsyncIterableQueue<tokenize.WordStream | tokenize.SentenceStream>();
-
-    const tokenizeInput = async () => {
-      let stream: tokenize.WordStream | tokenize.SentenceStream | null = null;
-      for await (const text of this.input) {
-        if (this.abortController.signal.aborted) {
-          break;
-        }
-        if (text === SynthesizeStream.FLUSH_SENTINEL) {
-          stream?.endInput();
-          stream = null;
-        } else {
-          if (!stream) {
-            stream = this.#opts.wordTokenizer.stream();
-            segments.put(stream);
-          }
-          stream.pushText(text);
-        }
-      }
-      segments.close();
-    };
-
-    const runStream = async () => {
-      for await (const stream of segments) {
-        if (this.abortController.signal.aborted) {
-          break;
-        }
-        await this.#runWS(stream);
-        this.queue.put(SynthesizeStream.END_OF_STREAM);
-      }
-    };
-
-    await Promise.all([tokenizeInput(), runStream()]);
-  }
-
-  async #runWS(stream: tokenize.WordStream | tokenize.SentenceStream, maxRetry = 3) {
-    let retries = 0;
-    let ws: WebSocket;
-    while (true) {
-      ws = new WebSocket(this.streamURL, {
-        headers: { [AUTHORIZATION_HEADER]: this.#opts.apiKey },
-      });
-
-      ws.on('error', (error) => {
-        this.abortController.abort();
-        this.#logger.error({ error }, 'Error connecting to ElevenLabs');
-      });
-
-      try {
-        await new Promise((resolve, reject) => {
-          ws.on('open', resolve);
-          ws.on('error', (error) => reject(error));
-          ws.on('close', (code) => reject(`WebSocket returned ${code}`));
-        });
-        break;
-      } catch (e) {
-        if (retries >= maxRetry) {
-          throw new Error(`failed to connect to ElevenLabs after ${retries} attempts: ${e}`);
-        }
-
-        const delay = Math.min(retries * 5, 5);
-        retries++;
-
-        this.#logger.warn(
-          `failed to connect to ElevenLabs, retrying in ${delay} seconds: ${e} (${retries}/${maxRetry})`,
-        );
-        await new Promise((resolve) => setTimeout(resolve, delay * 1000));
-      }
-    }
-
-    const requestId = shortuuid();
-    const segmentId = shortuuid();
-
-    // simple helper to make sure what we send to ws.send
-    const wsSend = (data: {
-      // (SynthesizeContent from python)
-      text: string;
-      // setting flush somehow never finishes the current speech generation
-      // https://github.com/livekit/agents-js/pull/820#issuecomment-3517138706
-      // flush?: boolean;
-      // initialization
-      voice_settings?: VoiceSettings;
-      generation_config?: {
-        chunk_length_schedule: number[];
-      };
-    }) => {
-      ws.send(JSON.stringify(data));
-    };
-
-    wsSend({
-      text: ' ',
-      voice_settings: this.#opts.voice.settings,
-      ...(this.#opts.chunkLengthSchedule && {
-        generation_config: {
-          chunk_length_schedule: this.#opts.chunkLengthSchedule,
-        },
-      }),
-    });
-    let eosSent = false;
+    try {
+      // Get persistent connection
+      this.#connection = await this.#tts.getCurrentConnection();
+      this.#connection.registerContext(this.#contextId);
+
+      const segments = new AsyncIterableQueue<tokenize.WordStream | tokenize.SentenceStream>();
+
+      const tokenizeInput = async () => {
+        let stream: tokenize.WordStream | tokenize.SentenceStream | null = null;
+
+        while (!this.abortController.signal.aborted) {
+          const result = await Promise.race([
+            this.input.next(),
+            waitForAbort(this.abortController.signal),
+          ]);
+
+          if (result === undefined) break; // aborted
+          if (result.done) {
+            break;
+          }
+
+          if (result.value === SynthesizeStream.FLUSH_SENTINEL) {
+            stream?.endInput();
+            stream = null;
+          } else {
+            if (!stream) {
+              stream = this.#opts.wordTokenizer.stream();
+              segments.put(stream);
+            }
+            stream.pushText(result.value);
+          }
+        }
+
+        segments.close();
+      };
+
+      const runStream = async () => {
+        while (!this.abortController.signal.aborted) {
+          const result = await Promise.race([
+            segments.next(),
+            waitForAbort(this.abortController.signal),
+          ]);
+
+          if (result === undefined) break; // aborted
+          if (result.done) {
+            break;
+          }
+
+          await this.runSynthesis(result.value);
+        }
+      };
+
+      await Promise.all([tokenizeInput(), runStream()]);
+    } catch (err) {
+      if (err instanceof Error && !err.message.includes('WebSocket closed')) {
+        if (err.message.includes('Queue is closed')) {
+          this.#logger.warn(
+            { err },
+            'Queue closed during transcript processing (expected during disconnect)',
+          );
+          return;
+        }
+      }
+      this.#logger.error({ err }, 'Error in ElevenLabs run()');
+      throw err;
+    } finally {
+      if (this.#connection) {
+        try {
+          this.#connection.closeContext(this.#contextId);
+        } catch {
+          // Connection may be closed
+        }
+      }
+    }
+  }
+
+  private async runSynthesis(stream: tokenize.WordStream | tokenize.SentenceStream): Promise<void> {
+    if (!this.#connection) {
+      throw new Error('Connection not established');
+    }
+
+    const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1);
 
     const sendTask = async () => {
       // Determine if we should flush on each chunk (sentence)
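+      // (with auto_mode enabled, ElevenLabs starts generating on a flush or a
+      // completed sentence, so sentence-tokenized input is flushed per chunk)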
-      /*const flushOnChunk =
+      const flushOnChunk =
         this.#opts.wordTokenizer instanceof tokenize.SentenceTokenizer &&
         this.#opts.autoMode !== undefined &&
-        this.#opts.autoMode;*/
+        this.#opts.autoMode;
 
       let xmlContent: string[] = [];
-      for await (const data of stream) {
-        if (this.abortController.signal.aborted) {
+
+      while (!this.abortController.signal.aborted) {
+        const result = await Promise.race([
+          stream.next(),
+          waitForAbort(this.abortController.signal),
+        ]);
+
+        if (result === undefined) break; // aborted
+        if (result.done) {
           break;
         }
-        let text = data.token;
+
+        let text = result.value.token;
         if ((this.#opts.enableSsmlParsing && text.startsWith('<phoneme')) || xmlContent.length > 0) {
           xmlContent.push(text);
           if (text.indexOf('</phoneme>') !== -1) {
             text = xmlContent.join(' ');
             xmlContent = [];
           } else {
             continue;
           }
         }
 
-        wsSend({
-          text: text + ' ',
-        });
+        this.#connection!.sendContent(this.#contextId, text + ' ', flushOnChunk);
       }
 
       if (xmlContent.length > 0) {
         this.#logger.warn('ElevenLabs stream ended with incomplete XML content');
       }
 
-      // send EOS
-      wsSend({ text: '' });
-      eosSent = true;
+      // Close the context so the server flushes pending audio and emits isFinal
+      this.#connection!.closeContext(this.#contextId);
     };
 
     let lastFrame: AudioFrame | undefined;
+    let lastDeltaText: string | undefined;
     const sendLastFrame = (segmentId: string, final: boolean) => {
       if (lastFrame) {
-        this.queue.put({ requestId, segmentId, frame: lastFrame, final });
+        this.queue.put({
+          requestId: this.#contextId,
+          segmentId,
+          frame: lastFrame,
+          deltaText: lastDeltaText,
+          final,
+        });
         lastFrame = undefined;
+        lastDeltaText = undefined;
       }
     };
 
     const listenTask = async () => {
-      let finalReceived = false;
-      const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1);
-      while (!this.closed && !this.abortController.signal.aborted) {
-        try {
-          await new Promise<RawData>((resolve, reject) => {
-            ws.removeAllListeners();
-            ws.on('message', (data) => resolve(data));
-            ws.on('close', (code, reason) => {
-              if (!eosSent) {
-                this.#logger.error(`WebSocket closed with code ${code}: ${reason}`);
-              }
-              if (!finalReceived) {
-                reject(new Error('WebSocket closed'));
-              }
-            });
-          }).then((msg) => {
-            const json = JSON.parse(msg.toString());
-            // remove the "audio" field from the json object when printing
-            if ('audio' in json && json.audio !== null) {
-              const data = Buffer.from(json.audio, 'base64');
-              for (const frame of bstream.write(
-                data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength),
-              )) {
-                sendLastFrame(segmentId, false);
-                lastFrame = frame;
-              }
-            } else if (json.isFinal) {
-              finalReceived = true;
-              for (const frame of bstream.flush()) {
-                sendLastFrame(segmentId, false);
-                lastFrame = frame;
-              }
-              sendLastFrame(segmentId, true);
-              this.queue.put(SynthesizeStream.END_OF_STREAM);
-
-              if (segmentId === requestId || this.abortController.signal.aborted) {
-                ws.close();
-                return;
-              }
-            }
-          });
-        } catch (err) {
-          // skip log error for normal websocket close
-          if (err instanceof Error && !err.message.includes('WebSocket closed')) {
-            if (err.message.includes('Queue is closed')) {
-              this.#logger.warn(
-                { err },
-                'Queue closed during transcript processing (expected during disconnect)',
-              );
-            } else {
-              this.#logger.error({ err }, 'Error in listenTask from ElevenLabs WebSocket');
-            }
-          }
-          break;
-        }
-      }
-    };
+      const audioStream = this.#connection!.getContextAudioStream(this.#contextId);
+      if (!audioStream) {
+        return;
+      }
+
+      const transcriptStream = this.#connection!.getContextTranscriptStream(this.#contextId);
+      if (!transcriptStream) {
+        return;
+      }
+
+      const eosPromise = this.#connection!.getContextEOSPromise(this.#contextId);
+      if (!eosPromise) {
+        return;
+      }
+
+      // Process audio and transcript as they arrive, until EOS
+      const processAudio = async () => {
+        const reader = audioStream.getReader();
+        try {
+          while (!this.abortController.signal.aborted) {
+            const result = await Promise.race([
+              reader.read(),
+              waitForAbort(this.abortController.signal),
+            ]);
+            if (result === undefined) return; // aborted
+            if (result.done) {
+              break;
+            }
+
+            for (const frame of bstream.write(result.value)) {
+              sendLastFrame(this.#contextId, false);
+              lastFrame = frame;
+            }
+          }
+        } finally {
+          reader.releaseLock();
+        }
+      };
+
+      const processTranscript = async () => {
+        const reader = transcriptStream.getReader();
+        try {
+          while (!this.abortController.signal.aborted) {
+            const result = await Promise.race([
+              reader.read(),
+              waitForAbort(this.abortController.signal),
+            ]);
+            if (result === undefined) return; // aborted
+            if (result.done) {
+              break;
+            }
+
+            // Concatenate timed words into deltaText
+            // In a more sophisticated implementation, we could track timing
+            // and associate words with specific frames
+            const deltaText = result.value.map((w) => w.text).join('');
+            if (deltaText) {
+              lastDeltaText = deltaText;
+            }
+          }
+        } finally {
+          reader.releaseLock();
+        }
+      };
 
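+      // EOS resolves once the server reports isFinal for this context; only
+      // after that are the audio/transcript channels closed by the manager.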
+      // Start both processors immediately so their errors surface right away
+      await new Promise<void>(async (resolve, reject) => {
+        const audioTask = processAudio().catch(reject);
+        const transcriptTask = processTranscript().catch(reject);
+
+        try {
+          // Wait for EOS to be signaled
+          await eosPromise;
+
+          // Ensure both streams are fully drained after EOS
+          await Promise.all([audioTask, transcriptTask]);
+          resolve();
+        } catch (err) {
+          reject(err);
+        }
+      });
+
+      // Flush remaining frames
+      for (const frame of bstream.flush()) {
+        sendLastFrame(this.#contextId, false);
+        lastFrame = frame;
+      }
+
+      sendLastFrame(this.#contextId, true);
+      this.queue.put(SynthesizeStream.END_OF_STREAM);
+    };
 
     await Promise.all([sendTask(), listenTask()]);
   }
 }
 
+/**
+ * ChunkedStream - Synthesize using the HTTP chunked API endpoint
+ * This is a non-streaming approach that sends the full text and receives audio in chunks
+ */
+export class ChunkedStream extends tts.ChunkedStream {
+  #opts: TTSOptions;
+  #text: string;
+  #logger = log();
+  label = 'elevenlabs.ChunkedStream';
+
+  constructor(tts: TTS, opts: TTSOptions, text: string) {
+    super(text, tts);
+    this.#opts = opts;
+    this.#text = text;
+  }
+
+  protected async run() {
+    const requestId = shortuuid();
+    const bstream = new AudioByteStream(sampleRateFromFormat(this.#opts.encoding), 1);
+
+    const voiceSettings = this.#opts.voice.settings || undefined;
+
+    try {
+      const url = buildSynthesizeUrl(this.#opts);
+      const response = await fetch(url, {
+        method: 'POST',
+        headers: {
+          [AUTHORIZATION_HEADER]: this.#opts.apiKey!,
+          'Content-Type': 'application/json',
+        },
+        body: JSON.stringify({
+          text: this.#text,
+          model_id: this.#opts.modelID,
+          voice_settings: voiceSettings,
+        }),
+      });
+
+      if (!response.ok) {
+        const errorText = await response.text();
+        throw new Error(`ElevenLabs API error ${response.status}: ${errorText}`);
+      }
+
+      const contentType = response.headers.get('content-type') || '';
+      if (!contentType.startsWith('audio/')) {
+        const content = await response.text();
+        throw new Error(`ElevenLabs returned non-audio data: ${content}`);
+      }
+
+      if (!response.body) {
+        throw new Error('Response body is null');
+      }
+
+      const reader = response.body.getReader();
+      let lastFrame: AudioFrame | undefined;
+
+      const sendLastFrame = (final: boolean) => {
+        if (lastFrame) {
+          this.queue.put({
+            requestId,
+            segmentId: requestId,
+            frame: lastFrame,
+            final,
+          });
+          lastFrame = undefined;
+        }
+      };
+
+      while (true) {
+        const { done, value } = await reader.read();
+        if (done) {
+          break;
+        }
+
+        // Convert Uint8Array to Int8Array for AudioByteStream
+        const int8Data = new Int8Array(value.buffer, value.byteOffset, value.byteLength);
+        for (const frame of bstream.write(int8Data)) {
+          sendLastFrame(false);
+          lastFrame = frame;
+        }
+      }
+
+      // Flush remaining frames
+      for (const frame of bstream.flush()) {
+        sendLastFrame(false);
+        lastFrame = frame;
+      }
+
+      sendLastFrame(true);
+    } catch (error) {
+      this.#logger.error({ error }, 'Error in ElevenLabs ChunkedStream');
+      throw error;
+    }
+  }
+}
+
+/**
+ * Build the URL for the HTTP synthesize endpoint
+ */
+function buildSynthesizeUrl(opts: TTSOptions): string {
+  const baseURL = opts.baseURL.replace(/\/$/, '');
+  const voiceId = opts.voice.id;
+  const params: string[] = [];
+
+  params.push(`model_id=${opts.modelID}`);
+  params.push(`output_format=${opts.encoding}`);
+
+  if (opts.streamingLatency !== undefined) {
+    params.push(`optimize_streaming_latency=${opts.streamingLatency}`);
+  }
+
+  return `${baseURL}/text-to-speech/${voiceId}/stream?${params.join('&')}`;
+}
 
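+// All supported encodings embed the sample rate as the second
+// underscore-separated token, e.g. 'pcm_44100' or 'mp3_44100_128' -> 44100.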
 const sampleRateFromFormat = (encoding: TTSEncoding): number => {
   return Number(encoding.split('_')[1]);
 };