5 changes: 4 additions & 1 deletion livekit-agents/livekit/agents/telemetry/trace_types.py
@@ -15,7 +15,9 @@
ATTR_ROOM_NAME = "lk.room_name"
ATTR_SESSION_OPTIONS = "lk.session_options"

# assistant turn
# agent turn
ATTR_AGENT_TURN_ID = "lk.generation_id"
ATTR_AGENT_PARENT_TURN_ID = "lk.parent_generation_id"
ATTR_USER_INPUT = "lk.user_input"
ATTR_INSTRUCTIONS = "lk.instructions"
ATTR_SPEECH_INTERRUPTED = "lk.interrupted"
@@ -27,6 +29,7 @@
ATTR_RESPONSE_FUNCTION_CALLS = "lk.response.function_calls"

# function tool
ATTR_FUNCTION_TOOL_ID = "lk.function_tool.id"
ATTR_FUNCTION_TOOL_NAME = "lk.function_tool.name"
ATTR_FUNCTION_TOOL_ARGS = "lk.function_tool.arguments"
ATTR_FUNCTION_TOOL_IS_ERROR = "lk.function_tool.is_error"
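
Not part of the diff: a minimal, illustrative sketch of how a trace consumer could use the new `lk.generation_id` / `lk.parent_generation_id` attributes to link a follow-up agent turn back to the turn that spawned it. It assumes a standard OpenTelemetry Python SDK setup; the exporter class and its name are hypothetical.

```python
# Illustrative only -- not part of this PR. Assumes the OpenTelemetry Python SDK.
from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult


class TurnLinkingExporter(SpanExporter):
    """Hypothetical exporter that indexes agent_turn spans by generation id."""

    def __init__(self) -> None:
        self._turns: dict[str, ReadableSpan] = {}

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        for span in spans:
            attrs = span.attributes or {}
            turn_id = attrs.get("lk.generation_id")
            if span.name != "agent_turn" or not turn_id:
                continue
            self._turns[str(turn_id)] = span
            parent_id = attrs.get("lk.parent_generation_id")
            if parent_id:
                # a follow-up turn (e.g. after a tool call) points back at its parent turn
                print(f"turn {turn_id} follows turn {parent_id}")
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        self._turns.clear()
```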
142 changes: 121 additions & 21 deletions livekit-agents/livekit/agents/voice/agent_activity.py
@@ -1576,6 +1576,33 @@ async def _tts_task(
add_to_chat_ctx: bool,
model_settings: ModelSettings,
) -> None:
with tracer.start_as_current_span(
"agent_turn", context=self._session._root_span_context
) as current_span:
current_span.set_attribute(trace_types.ATTR_AGENT_TURN_ID, speech_handle.generation_id)
if parent_id := speech_handle.parent_generation_id:
current_span.set_attribute(trace_types.ATTR_AGENT_PARENT_TURN_ID, parent_id)
speech_handle._agent_turn_context = otel_context.get_current()

await self._tts_task_impl(
speech_handle=speech_handle,
text=text,
audio=audio,
add_to_chat_ctx=add_to_chat_ctx,
model_settings=model_settings,
)

async def _tts_task_impl(
self,
speech_handle: SpeechHandle,
text: str | AsyncIterable[str],
audio: AsyncIterable[rtc.AudioFrame] | None,
add_to_chat_ctx: bool,
model_settings: ModelSettings,
) -> None:
current_span = trace.get_current_span()
current_span.set_attribute(trace_types.ATTR_SPEECH_ID, speech_handle.id)

tr_output = (
self._session.output.transcription
if self._session.output.transcription_enabled
@@ -1588,6 +1615,7 @@
speech_handle._clear_authorization()

if speech_handle.interrupted:
current_span.set_attribute(trace_types.ATTR_SPEECH_INTERRUPTED, True)
await utils.aio.cancel_and_wait(wait_for_authorization)
return

@@ -1607,11 +1635,16 @@ async def _read_text() -> AsyncIterable[str]:
audio_source = _read_text()

tasks: list[asyncio.Task[Any]] = []
started_speaking_at: float | None = None
stopped_speaking_at: float | None = None

def _on_first_frame(_: asyncio.Future[None]) -> None:
nonlocal started_speaking_at
started_speaking_at = time.time()
self._session._update_agent_state("speaking")

audio_out: _AudioOutput | None = None
tts_gen_data: _TTSGenerationData | None = None
if audio_output is not None:
if audio is None:
# generate audio using TTS
@@ -1664,6 +1697,8 @@ def _on_first_frame(_: asyncio.Future[None]) -> None:
[asyncio.ensure_future(audio_output.wait_for_playout())]
)

stopped_speaking_at = time.time()
current_span.set_attribute(trace_types.ATTR_SPEECH_INTERRUPTED, speech_handle.interrupted)
if speech_handle.interrupted:
await utils.aio.cancel_and_wait(*tasks)

@@ -1674,30 +1709,40 @@ def _on_first_frame(_: asyncio.Future[None]) -> None:
if tee is not None:
await tee.aclose()

if add_to_chat_ctx:
# use synchronized transcript when available after interruption
forwarded_text = text_out.text if text_out else ""
if speech_handle.interrupted and audio_output is not None:
playback_ev = await audio_output.wait_for_playout()
# use synchronized transcript when available after interruption
forwarded_text = text_out.text if text_out else ""
if speech_handle.interrupted and audio_output is not None:
playback_ev = await audio_output.wait_for_playout()

if audio_out is not None and audio_out.first_frame_fut.done():
if playback_ev.synchronized_transcript is not None:
forwarded_text = playback_ev.synchronized_transcript
else:
forwarded_text = ""
if audio_out is not None and audio_out.first_frame_fut.done():
if playback_ev.synchronized_transcript is not None:
forwarded_text = playback_ev.synchronized_transcript
else:
forwarded_text = ""
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

msg: llm.ChatMessage | None = None
if forwarded_text:
msg = self._agent._chat_ctx.add_message(
role="assistant", content=forwarded_text, interrupted=speech_handle.interrupted
)
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)
if forwarded_text and add_to_chat_ctx:
assistant_metrics: llm.MetricsReport = {}

if tts_gen_data and tts_gen_data.ttfb is not None:
assistant_metrics["tts_node_ttfb"] = tts_gen_data.ttfb

if stopped_speaking_at and started_speaking_at:
assistant_metrics["started_speaking_at"] = started_speaking_at
assistant_metrics["stopped_speaking_at"] = stopped_speaking_at

msg = self._agent._chat_ctx.add_message(
role="assistant",
content=forwarded_text,
interrupted=speech_handle.interrupted,
metrics=assistant_metrics,
)
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)

if self._session.agent_state == "speaking":
self._session._update_agent_state("listening")

@tracer.start_as_current_span("agent_turn")
@utils.log_exceptions(logger=logger)
async def _pipeline_reply_task(
self,
@@ -1710,6 +1755,37 @@ async def _pipeline_reply_task(
instructions: str | None = None,
_previous_user_metrics: llm.MetricsReport | None = None,
_previous_tools_messages: Sequence[llm.FunctionCall | llm.FunctionCallOutput] | None = None,
) -> None:
with tracer.start_as_current_span(
"agent_turn", context=self._session._root_span_context
) as current_span:
current_span.set_attribute(trace_types.ATTR_AGENT_TURN_ID, speech_handle.generation_id)
if parent_id := speech_handle.parent_generation_id:
current_span.set_attribute(trace_types.ATTR_AGENT_PARENT_TURN_ID, parent_id)
speech_handle._agent_turn_context = otel_context.get_current()

await self._pipeline_reply_task_impl(
speech_handle=speech_handle,
chat_ctx=chat_ctx,
tools=tools,
model_settings=model_settings,
new_message=new_message,
instructions=instructions,
_previous_user_metrics=_previous_user_metrics,
_previous_tools_messages=_previous_tools_messages,
)

async def _pipeline_reply_task_impl(
self,
*,
speech_handle: SpeechHandle,
chat_ctx: llm.ChatContext,
tools: list[llm.FunctionTool | llm.RawFunctionTool],
model_settings: ModelSettings,
new_message: llm.ChatMessage | None = None,
instructions: str | None = None,
_previous_user_metrics: llm.MetricsReport | None = None,
_previous_tools_messages: Sequence[llm.FunctionCall | llm.FunctionCallOutput] | None = None,
) -> None:
from .agent import ModelSettings

@@ -2109,7 +2185,6 @@ async def _realtime_reply_task(
):
self._rt_session.update_options(tool_choice=ori_tool_choice)

@tracer.start_as_current_span("agent_turn")
@utils.log_exceptions(logger=logger)
async def _realtime_generation_task(
self,
@@ -2118,6 +2193,29 @@
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
) -> None:
with tracer.start_as_current_span(
"agent_turn", context=self._session._root_span_context
) as current_span:
current_span.set_attribute(trace_types.ATTR_AGENT_TURN_ID, speech_handle.generation_id)
if parent_id := speech_handle.parent_generation_id:
current_span.set_attribute(trace_types.ATTR_AGENT_PARENT_TURN_ID, parent_id)
speech_handle._agent_turn_context = otel_context.get_current()

await self._realtime_generation_task_impl(
speech_handle=speech_handle,
generation_ev=generation_ev,
model_settings=model_settings,
instructions=instructions,
)

async def _realtime_generation_task_impl(
self,
*,
speech_handle: SpeechHandle,
generation_ev: llm.GenerationCreatedEvent,
model_settings: ModelSettings,
instructions: str | None = None,
) -> None:
current_span = trace.get_current_span()
current_span.set_attribute(trace_types.ATTR_SPEECH_ID, speech_handle.id)
@@ -2435,7 +2533,7 @@ def _create_assistant_message(
finally:
self._background_speeches.discard(speech_handle)

# important: no agent ouput should be used after this point
# important: no agent output should be used after this point

if len(tool_output.output) > 0:
speech_handle._num_steps += 1
@@ -2555,7 +2653,9 @@ def _on_false_interruption() -> None:
and audio_output.can_pause
and not self._paused_speech.done()
):
self._session._update_agent_state("speaking")
self._session._update_agent_state(
"speaking", otel_context=self._paused_speech._agent_turn_context
)
audio_output.resume()
resumed = True
logger.debug("resumed false interrupted speech", extra={"timeout": timeout})
8 changes: 6 additions & 2 deletions livekit-agents/livekit/agents/voice/agent_session.py
@@ -1160,7 +1160,9 @@ def _cancel_user_away_timer(self) -> None:
self._user_away_timer.cancel()
self._user_away_timer = None

def _update_agent_state(self, state: AgentState) -> None:
def _update_agent_state(
self, state: AgentState, *, otel_context: otel_context.Context | None = None
) -> None:
if self._agent_state == state:
return

@@ -1169,7 +1171,9 @@ def _update_agent_state(self, state: AgentState) -> None:
self._tts_error_counts = 0

if self._agent_speaking_span is None:
self._agent_speaking_span = tracer.start_span("agent_speaking")
self._agent_speaking_span = tracer.start_span(
"agent_speaking", context=otel_context
)

if self._room_io:
_set_participant_attributes(
9 changes: 6 additions & 3 deletions livekit-agents/livekit/agents/voice/generation.py
@@ -582,9 +582,12 @@ async def _traceable_fnc_tool(
function_callable: Callable, fnc_call: llm.FunctionCall
) -> None:
current_span = trace.get_current_span()
current_span.set_attribute(trace_types.ATTR_FUNCTION_TOOL_NAME, fnc_call.name)
current_span.set_attribute(
trace_types.ATTR_FUNCTION_TOOL_ARGS, fnc_call.arguments
current_span.set_attributes(
{
trace_types.ATTR_FUNCTION_TOOL_ID: fnc_call.call_id,
trace_types.ATTR_FUNCTION_TOOL_NAME: fnc_call.name,
trace_types.ATTR_FUNCTION_TOOL_ARGS: fnc_call.arguments,
}
)

try:
13 changes: 13 additions & 0 deletions livekit-agents/livekit/agents/voice/speech_handle.py
@@ -5,6 +5,8 @@
from collections.abc import Generator, Sequence
from typing import Any, Callable

from opentelemetry import context as otel_context

from .. import llm, utils


@@ -31,6 +33,7 @@ def __init__(self, *, speech_id: str, allow_interruptions: bool) -> None:
self._tasks: list[asyncio.Task] = []
self._chat_items: list[llm.ChatItem] = []
self._num_steps = 1
self._agent_turn_context: otel_context.Context | None = None

self._item_added_callbacks: set[Callable[[llm.ChatItem], None]] = set()
self._done_callbacks: set[Callable[[SpeechHandle], None]] = set()
@@ -57,6 +60,16 @@ def num_steps(self) -> int:
def id(self) -> str:
return self._id

@property
def generation_id(self) -> str:
Member:
Can we make this field private?

Contributor Author:
sounds good, #4124

return f"{self._id}_{self._num_steps}"

@property
def parent_generation_id(self) -> str | None:
if self._num_steps <= 1:
return None
return f"{self._id}_{self._num_steps - 1}"

@property
def scheduled(self) -> bool:
return self._scheduled_fut.done()
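
Regarding the review thread on `generation_id` above: a minimal, purely illustrative sketch of one way the accessors could be underscore-prefixed to make them private. The actual follow-up change referenced in the reply (#4124) may look different.

```python
# Illustrative sketch only -- not the actual follow-up change from #4124.
from __future__ import annotations


class SpeechHandle:
    def __init__(self, *, speech_id: str) -> None:
        self._id = speech_id
        self._num_steps = 1

    @property
    def id(self) -> str:
        return self._id

    @property
    def _generation_id(self) -> str:
        # internal: used when tagging the agent_turn span
        return f"{self._id}_{self._num_steps}"

    @property
    def _parent_generation_id(self) -> str | None:
        # internal: links a tool follow-up turn back to the turn that spawned it
        if self._num_steps <= 1:
            return None
        return f"{self._id}_{self._num_steps - 1}"
```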