Skip to content

Commit 2bc9cf5

Browse files
committed
re-emit from last overlap speech
1 parent 1326c24 commit 2bc9cf5

File tree

3 files changed

+78
-24
lines changed

3 files changed

+78
-24
lines changed

livekit-agents/livekit/agents/inference/bargein.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class BargeinEvent:
5959
inference_duration: float = 0.0
6060
"""Time taken to perform the inference, in seconds."""
6161

62+
overlap_speech_started_at: float | None = None
63+
"""Timestamp (in seconds) when the overlap speech started. Useful for emitting held transcripts."""
64+
6265

6366
class BargeinError(BaseModel):
6467
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -247,6 +250,7 @@ def __init__(self, bargein_detector: BargeinDetector, conn_options: APIConnectOp
247250
self._conn_options = conn_options
248251
self._sample_rate = bargein_detector._sample_rate
249252
self._resampler: rtc.AudioResampler | None = None
253+
self._overlap_speech_started_at: float | None = None
250254

251255
@abstractmethod
252256
async def _run(self) -> None: ...
@@ -315,6 +319,7 @@ def start_overlap_speech(self) -> None:
315319
self._check_input_not_ended()
316320
self._check_not_closed()
317321
self._input_ch.send_nowait(self._OverlapSpeechStartedSentinel())
322+
self._overlap_speech_started_at = time.time()
318323

319324
def end_overlap_speech(self) -> None:
320325
"""Mark the end of the overlap speech"""
@@ -494,13 +499,15 @@ async def _send_task() -> None:
494499
timestamp=time.time(),
495500
is_bargein=is_bargein,
496501
inference_duration=inference_duration,
502+
overlap_speech_started_at=self._overlap_speech_started_at,
497503
)
498504
)
499505

500506
if is_bargein:
501507
ev = BargeinEvent(
502508
type=BargeinEventType.BARGEIN,
503509
timestamp=time.time(),
510+
overlap_speech_started_at=self._overlap_speech_started_at,
504511
)
505512
self._event_ch.send_nowait(ev)
506513
self._bargein_detector.emit("bargein_detected", ev)
@@ -696,13 +703,15 @@ async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
696703
timestamp=time.time(),
697704
is_bargein=is_bargein_result,
698705
inference_duration=inference_duration,
706+
overlap_speech_started_at=self._overlap_speech_started_at,
699707
)
700708
)
701709
elif msg_type == "bargein_detected":
702710
logger.debug("bargein detected")
703711
ev = BargeinEvent(
704712
type=BargeinEventType.BARGEIN,
705713
timestamp=time.time(),
714+
overlap_speech_started_at=self._overlap_speech_started_at,
706715
)
707716
self._event_ch.send_nowait(ev)
708717
self._bargein_detector.emit("bargein_detected", ev)

livekit-agents/livekit/agents/voice/agent_activity.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,15 +1281,23 @@ def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
12811281
self._interrupt_by_audio_activity()
12821282

12831283
def on_bargein_detected(self, ev: inference.BargeinEvent) -> None:
    """Handle a barge-in reported by the barge-in detector.

    Logs the event, restores interruption by audio activity (unless turn
    detection is "manual" or "realtime_llm"), calls
    ``_interrupt_by_audio_activity()`` and, when an audio recognition object is
    present, ends its barge-in monitoring.

    Args:
        ev: The barge-in event. ``timestamp`` and ``overlap_speech_started_at``
            are read from it.
    """
    logger.debug(
        "bargein detected",
        extra={
            "timestamp": ev.timestamp,
            "overlap_speech_started_at": ev.overlap_speech_started_at,
        },
    )
    # restore interruption by audio activity
    self._interruption_by_audio_activity_enabled = self._turn_detection not in (
        "manual",
        "realtime_llm",
    )
    self._interrupt_by_audio_activity()
    if self._audio_recognition:
        # Prefer the moment the overlap speech started and fall back to the
        # event timestamp only when it is genuinely missing. The check is an
        # explicit None comparison so that a 0.0 value is not silently
        # replaced by the event timestamp.
        ignore_until = (
            ev.overlap_speech_started_at
            if ev.overlap_speech_started_at is not None
            else ev.timestamp
        )
        self._audio_recognition.end_barge_in_monitoring(ignore_until)
12931301

12941302
def on_bargein_inference_done(self, ev: inference.BargeinEvent) -> None:
12951303
self._interruption_by_audio_activity_enabled = False

livekit-agents/livekit/agents/voice/audio_recognition.py

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,65 @@ def end_barge_in_monitoring(self, ignore_until: float) -> None:
247247
else min(ignore_until, self._ignore_until)
248248
)
249249

250+
# flush held transcripts if possible
251+
task = asyncio.create_task(self._flush_held_transcripts())
252+
task.add_done_callback(lambda _: self._tasks.discard(task))
253+
self._tasks.add(task)
254+
255+
async def _flush_held_transcripts(self) -> None:
    """Re-emit held STT events that are not older than ``ignore_until``.

    The buffer is scanned from the oldest event. An event that has alternatives
    is considered stale when ``start_time + end_time + self._input_started_at``
    is below ``self._ignore_until``; the first non-stale event marks where
    re-emission starts. Events without alternatives carry no timestamps, so a
    run of them is kept or dropped together with the next event that has some
    (a trailing run with nothing after it is kept).

    Nothing happens when barge-in is disabled, ``ignore_until`` is not given, or
    the buffer is empty. If an event cannot be placed in time (both timestamps
    are zero, or the input start time is unknown) the whole buffer is dropped.
    Otherwise the buffer and ``ignore_until`` are reset before the surviving
    events are passed back to ``_on_stt_event`` in their original order.
    """
    if not self._barge_in_enabled:
        return
    if not is_given(self._ignore_until):
        return
    if not self._transcript_buffer:
        return

    # Index of the first event to re-emit; None means "nothing selected yet".
    emit_from: int | None = None
    for idx, held in enumerate(self._transcript_buffer):
        if not held.alternatives:
            # No timestamps here: tentatively start from this event, the next
            # event that has timestamps decides whether the run survives.
            if emit_from is None:
                emit_from = idx
            continue

        best = held.alternatives[0]
        # vendor doesn't set timestamps properly, in which case we just return
        if best.start_time == best.end_time == 0:
            self._transcript_buffer.clear()
            return
        # Without the input start time the event cannot be compared with
        # ignore_until either; handle it like missing timestamps instead of
        # raising TypeError on `float + None`.
        if self._input_started_at is None:
            self._transcript_buffer.clear()
            return

        # NOTE(review): the sum of start_time and end_time is compared here,
        # matching `_should_hold_stt_event`; confirm this is intended rather
        # than end_time alone.
        if best.start_time + best.end_time + self._input_started_at < self._ignore_until:
            # Stale: drop it together with any run of events before it.
            emit_from = None
        else:
            if emit_from is None:
                emit_from = idx
            break

    # extract events to emit and reset BEFORE iterating
    # to prevent recursive calls
    events_to_emit = self._transcript_buffer[emit_from:] if emit_from is not None else []
    # Keep the cutoff for logging; after the reset below it would always be None.
    ignore_until = self._ignore_until
    self._transcript_buffer.clear()
    self._ignore_until = NOT_GIVEN

    for held in events_to_emit:
        logger.debug(
            "re-emitting held transcript",
            extra={
                "event": held.type,
                "ignore_until": ignore_until,
            },
        )
        await self._on_stt_event(held)
303+
250304
def _should_hold_stt_event(self, ev: stt.SpeechEvent) -> bool:
251305
"""Test if the event should be held until the ignore_until timestamp."""
252306
if not self._barge_in_enabled:
253307
return False
308+
254309
if self._agent_speaking:
255310
return True
256311

@@ -270,13 +325,11 @@ def _should_hold_stt_event(self, ev: stt.SpeechEvent) -> bool:
270325
# 3. the event is for audio sent before the ignore_until timestamp
271326
and self._input_started_at is not None
272327
and not (ev.alternatives[0].start_time == ev.alternatives[0].end_time == 0)
273-
and ev.alternatives[0].start_time + ev.alternatives[0].end_time
274-
< self._ignore_until - self._input_started_at
328+
and ev.alternatives[0].start_time + ev.alternatives[0].end_time + self._input_started_at
329+
< self._ignore_until
275330
):
276331
return True
277332

278-
# ignore_until expired or we don't have the right timestamp
279-
self._ignore_until = NOT_GIVEN
280333
return False
281334

282335
def push_audio(self, frame: rtc.AudioFrame) -> None:
@@ -484,7 +537,7 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
484537
# - hold the event until the ignore_until expires
485538
# - release only relevant events
486539
# - allow RECOGNITION_USAGE to pass through immediately
487-
if ev.type != stt.SpeechEventType.RECOGNITION_USAGE:
540+
if ev.type != stt.SpeechEventType.RECOGNITION_USAGE and self._barge_in_enabled:
488541
if self._should_hold_stt_event(ev):
489542
logger.debug(
490543
"holding event until ignore_until expires",
@@ -498,23 +551,7 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
498551
self._transcript_buffer.append(ev)
499552
return
500553
elif self._transcript_buffer:
501-
# emit the preceding sentinel event immediately before this event
502-
# assuming *only one* sentinel event could precede the current event
503-
# ignore if the previous event is not a sentinel event
504-
logger.debug(
505-
"emitting held events",
506-
extra={
507-
"event": ev.type,
508-
"previous_event": self._transcript_buffer[-1].type,
509-
},
510-
)
511-
prev_event = self._transcript_buffer.pop()
512-
self._transcript_buffer.clear()
513-
if prev_event.type in {
514-
stt.SpeechEventType.START_OF_SPEECH,
515-
}:
516-
await self._on_stt_event(prev_event)
517-
554+
await self._flush_held_transcripts()
518555
# no return here to allow the new event to be processed normally
519556

520557
if ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:

0 commit comments

Comments
 (0)