Skip to content

Commit aed026c

Browse files
authored
Fix: Race Condition When Participant Disconnects During STT/TTS Processing (#861)
1 parent cd10b4d commit aed026c

File tree

7 files changed

+110
-44
lines changed

7 files changed

+110
-44
lines changed

.changeset/calm-rivers-flow.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@livekit/agents": patch
3+
"@livekit/agents-plugin-cartesia": patch
4+
"@livekit/agents-plugin-deepgram": patch
5+
"@livekit/agents-plugin-elevenlabs": patch
6+
"@livekit/agents-plugin-neuphonic": patch
7+
---
8+
9+
Fix a race condition where STT/TTS processing could throw a "Queue is closed" error when a participant disconnects. These events are now logged as warnings instead of errors.

agents/src/inference/stt.ts

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -459,52 +459,67 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
459459
}
460460

461461
private processTranscript(data: Record<string, any>, isFinal: boolean) {
462+
// Check if queue is closed to avoid race condition during disconnect
463+
if (this.queue.closed) return;
464+
462465
const requestId = data.request_id ?? this.requestId;
463466
const text = data.transcript ?? '';
464467
const language = data.language ?? this.opts.language ?? 'en';
465468

466469
if (!text && !isFinal) return;
467470

468-
// We'll have a more accurate way of detecting when speech started when we have VAD
469-
if (!this.speaking) {
470-
this.speaking = true;
471-
this.queue.put({ type: SpeechEventType.START_OF_SPEECH });
472-
}
471+
try {
472+
// We'll have a more accurate way of detecting when speech started when we have VAD
473+
if (!this.speaking) {
474+
this.speaking = true;
475+
this.queue.put({ type: SpeechEventType.START_OF_SPEECH });
476+
}
473477

474-
const speechData: SpeechData = {
475-
language,
476-
startTime: data.start ?? 0,
477-
endTime: data.duration ?? 0,
478-
confidence: data.confidence ?? 1.0,
479-
text,
480-
};
478+
const speechData: SpeechData = {
479+
language,
480+
startTime: data.start ?? 0,
481+
endTime: data.duration ?? 0,
482+
confidence: data.confidence ?? 1.0,
483+
text,
484+
};
485+
486+
if (isFinal) {
487+
if (this.speechDuration > 0) {
488+
this.queue.put({
489+
type: SpeechEventType.RECOGNITION_USAGE,
490+
requestId,
491+
recognitionUsage: { audioDuration: this.speechDuration },
492+
});
493+
this.speechDuration = 0;
494+
}
481495

482-
if (isFinal) {
483-
if (this.speechDuration > 0) {
484496
this.queue.put({
485-
type: SpeechEventType.RECOGNITION_USAGE,
497+
type: SpeechEventType.FINAL_TRANSCRIPT,
486498
requestId,
487-
recognitionUsage: { audioDuration: this.speechDuration },
499+
alternatives: [speechData],
488500
});
489-
this.speechDuration = 0;
490-
}
491-
492-
this.queue.put({
493-
type: SpeechEventType.FINAL_TRANSCRIPT,
494-
requestId,
495-
alternatives: [speechData],
496-
});
497501

498-
if (this.speaking) {
499-
this.speaking = false;
500-
this.queue.put({ type: SpeechEventType.END_OF_SPEECH });
502+
if (this.speaking) {
503+
this.speaking = false;
504+
this.queue.put({ type: SpeechEventType.END_OF_SPEECH });
505+
}
506+
} else {
507+
this.queue.put({
508+
type: SpeechEventType.INTERIM_TRANSCRIPT,
509+
requestId,
510+
alternatives: [speechData],
511+
});
512+
}
513+
} catch (e) {
514+
if (e instanceof Error && e.message.includes('Queue is closed')) {
515+
// Expected behavior on disconnect, log as warning
516+
this.#logger.warn(
517+
{ err: e },
518+
'Queue closed during transcript processing (expected during disconnect)',
519+
);
520+
} else {
521+
this.#logger.error({ err: e }, 'Error putting transcript to queue');
501522
}
502-
} else {
503-
this.queue.put({
504-
type: SpeechEventType.INTERIM_TRANSCRIPT,
505-
requestId,
506-
alternatives: [speechData],
507-
});
508523
}
509524
}
510525
}

agents/src/stt/stt.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,18 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
257257

258258
protected async monitorMetrics() {
259259
for await (const event of this.queue) {
260-
this.output.put(event);
260+
if (!this.output.closed) {
261+
try {
262+
this.output.put(event);
263+
} catch (e) {
264+
if (e instanceof Error && e.message.includes('Queue is closed')) {
265+
this.logger.warn(
266+
{ err: e },
267+
'Queue closed during transcript processing (expected during disconnect)',
268+
);
269+
}
270+
}
271+
}
261272
if (event.type !== SpeechEventType.RECOGNITION_USAGE) continue;
262273
const metrics: STTMetrics = {
263274
type: 'stt_metrics',
@@ -270,7 +281,9 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
270281
};
271282
this.#stt.emit('metrics_collected', metrics);
272283
}
273-
this.output.close();
284+
if (!this.output.closed) {
285+
this.output.close();
286+
}
274287
}
275288

276289
protected abstract run(): Promise<void>;
@@ -336,9 +349,9 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
336349

337350
/** Close both the input and output of the STT stream */
338351
close() {
339-
this.input.close();
340-
this.queue.close();
341-
this.output.close();
352+
if (!this.input.closed) this.input.close();
353+
if (!this.queue.closed) this.queue.close();
354+
if (!this.output.closed) this.output.close();
342355
this.closed = true;
343356
}
344357

plugins/cartesia/src/tts.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,14 @@ export class SynthesizeStream extends tts.SynthesizeStream {
305305
} catch (err) {
306306
// skip log error for normal websocket close
307307
if (err instanceof Error && !err.message.includes('WebSocket closed')) {
308-
this.#logger.error({ err }, 'Error in recvTask from Cartesia WebSocket');
308+
if (err.message.includes('Queue is closed')) {
309+
this.#logger.warn(
310+
{ err },
311+
'Queue closed during transcript processing (expected during disconnect)',
312+
);
313+
} else {
314+
this.#logger.error({ err }, 'Error in recvTask from Cartesia WebSocket');
315+
}
309316
}
310317
clearTTSChunkTimeout();
311318
break;

plugins/deepgram/src/stt.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,13 @@ export class SpeechStream extends stt.SpeechStream {
315315
// It's also possible we receive a transcript without a SpeechStarted event.
316316
if (this.#speaking) return;
317317
this.#speaking = true;
318-
this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
318+
if (!this.queue.closed) {
319+
try {
320+
this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
321+
} catch (e) {
322+
// ignore
323+
}
324+
}
319325
break;
320326
}
321327
// see this page:

plugins/elevenlabs/src/tts.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,10 @@ export class SynthesizeStream extends tts.SynthesizeStream {
344344
const json = JSON.parse(msg.toString());
345345
// remove the "audio" field from the json object when printing
346346
if ('audio' in json && json.audio !== null) {
347-
const data = new Int8Array(Buffer.from(json.audio, 'base64'));
348-
for (const frame of bstream.write(data)) {
347+
const data = Buffer.from(json.audio, 'base64');
348+
for (const frame of bstream.write(
349+
data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength),
350+
)) {
349351
sendLastFrame(segmentId, false);
350352
lastFrame = frame;
351353
}
@@ -367,7 +369,14 @@ export class SynthesizeStream extends tts.SynthesizeStream {
367369
} catch (err) {
368370
// skip log error for normal websocket close
369371
if (err instanceof Error && !err.message.includes('WebSocket closed')) {
370-
this.#logger.error({ err }, 'Error in listenTask from ElevenLabs WebSocket');
372+
if (err.message.includes('Queue is closed')) {
373+
this.#logger.warn(
374+
{ err },
375+
'Queue closed during transcript processing (expected during disconnect)',
376+
);
377+
} else {
378+
this.#logger.error({ err }, 'Error in listenTask from ElevenLabs WebSocket');
379+
}
371380
}
372381
break;
373382
}

plugins/neuphonic/src/tts.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,14 @@ export class SynthesizeStream extends tts.SynthesizeStream {
237237
});
238238
} catch (err) {
239239
if (err instanceof Error && !err.message.includes('WebSocket closed prematurely')) {
240-
this.#logger.error({ err }, 'Error in recvTask from Neuphonic WebSocket');
240+
if (err.message.includes('Queue is closed')) {
241+
this.#logger.warn(
242+
{ err },
243+
'Queue closed during transcript processing (expected during disconnect)',
244+
);
245+
} else {
246+
this.#logger.error({ err }, 'Error in recvTask from Neuphonic WebSocket');
247+
}
241248
}
242249
break;
243250
}

0 commit comments

Comments
 (0)