diff --git a/examples/avatar_agents/liveavatar/README.md b/examples/avatar_agents/liveavatar/README.md new file mode 100644 index 0000000000..7ab473feef --- /dev/null +++ b/examples/avatar_agents/liveavatar/README.md @@ -0,0 +1,28 @@ +# LiveKit LiveAvatar Avatar Agent + +This example demonstrates how to create a animated avatar using [LiveAvatar by HeyGen](https://www.liveavatar.com/). + +## Usage + +* Update the environment: + +```bash +# LiveAvatar Config +export LIVEAVATAR_API_KEY="..." +export LIVEAVATAR_AVATAR_ID="..." + +# STT + LLM + TTS config +export OPENAI_API_KEY="..." +export DEEPGRAM_API_KEY="..." + +# LiveKit config +export LIVEKIT_API_KEY="..." +export LIVEKIT_API_SECRET="..." +export LIVEKIT_URL="..." +``` + +* Start the agent worker: + +```bash +python examples/avatar_agents/liveavatar/agent_worker.py dev +``` diff --git a/examples/avatar_agents/liveavatar/agent_worker.py b/examples/avatar_agents/liveavatar/agent_worker.py new file mode 100644 index 0000000000..edc3b8e9ae --- /dev/null +++ b/examples/avatar_agents/liveavatar/agent_worker.py @@ -0,0 +1,38 @@ +import logging +import os + +from dotenv import load_dotenv + +from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli +from livekit.plugins import deepgram, liveavatar, openai + +logger = logging.getLogger("liveavatar-avatar-example") +logger.setLevel(logging.INFO) + +load_dotenv() + + +async def entrypoint(ctx: JobContext): + session = AgentSession( + stt=deepgram.STT(), + llm=openai.LLM(), + tts=openai.TTS(), + resume_false_interruption=False, + ) + + liveavatar_avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID") + avatar = liveavatar.AvatarSession(avatar_id=liveavatar_avatar_id) + await avatar.start(session, room=ctx.room) + + agent = Agent(instructions="Talk to me!") + + await session.start( + agent=agent, + room=ctx.room, + ) + + session.generate_reply(instructions="say hello to the user") + + +if __name__ == "__main__": + cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint)) diff --git a/livekit-plugins/livekit-plugins-liveavatar/README.md b/livekit-plugins/livekit-plugins-liveavatar/README.md new file mode 100644 index 0000000000..1d4ae69775 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/README.md @@ -0,0 +1,6 @@ +# HeyGen virtual avatar plugin for LiveKit Agents + +Support for the [Heygen LiveAvatar](https://www.liveavatar.com/) virtual avatar. + +See [https://docs.livekit.io/agents/integrations/avatar/heygen/](https://docs.livekit.io/agents/integrations/avatar/heygen/) for more information. + diff --git a/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/__init__.py b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/__init__.py new file mode 100644 index 0000000000..2e7331d8e7 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/__init__.py @@ -0,0 +1,12 @@ +"""LiveAvatar avatar plugin for LiveKit Agents + +Provides LiveAvatar interactive avatar integration similar to Tavus. +""" + +from .api import LiveAvatarException +from .avatar import AvatarSession + +__all__ = [ + "LiveAvatarException", + "AvatarSession", +] diff --git a/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py new file mode 100644 index 0000000000..f558095255 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py @@ -0,0 +1,114 @@ +import asyncio +import logging +import os +from typing import Any, Optional + +import aiohttp + +from livekit import rtc +from livekit.agents import ( + DEFAULT_API_CONNECT_OPTIONS, + APIConnectionError, + APIConnectOptions, + APIStatusError, + utils, +) + +logger = logging.getLogger(__name__) + + +class LiveAvatarException(Exception): + """Exception for LiveAvatar errors""" + + +DEFAULT_API_URL = "https://api.liveavatar.com/v1/sessions" + + +class LiveAvatarAPI: + def __init__( + self, + api_key: str, + *, + api_url: str = DEFAULT_API_URL, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + session: Optional[aiohttp.ClientSession] = None, + ) -> None: + self._api_key = api_key or os.getenv("LIVEAVATAR_API_KEY") + if self._api_key is None: + raise LiveAvatarException("api_key or LIVEAVATAR_API_KEY must be set") + + self._api_url = api_url or DEFAULT_API_URL + self._conn_options = conn_options + self._session = session or aiohttp.ClientSession() + + def _ensure_http_session(self) -> aiohttp.ClientSession: + if self._session is None: + self._session = utils.http_context.http_session() + return self._session + + async def create_streaming_session( + self, + *, + livekit_url: str, + livekit_token: str, + room: rtc.Room, + avatar_id: str, + ) -> dict[str, Any]: + """Create a new streaming session, return a session id""" + + livekit_config = { + "livekit_room": room.name, + "livekit_url": livekit_url, + "livekit_client_token": livekit_token, + } + + payload = { + "mode": "CUSTOM", + "avatar_id": avatar_id, + "livekit_config": livekit_config, + } + + self._headers = { + "accept": "application/json", + "content-type": "application/json", + "X-API-KEY": self._api_key, + } + response_data = await self._post(endpoint="/token", payload=payload, headers=self._headers) + return response_data + + async def start_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]: + """Start the streaming session""" + payload = {"session_id": session_id} + headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"} + response_data = await self._post(endpoint="/start", payload=payload, headers=headers) + return response_data + + async def _post( + self, *, endpoint: str, payload: dict[str, Any], headers: dict[str, Any] + ) -> dict[str, Any]: + url = self._api_url + endpoint + for i in range(self._conn_options.max_retry): + try: + async with self._ensure_http_session().post( + url=url, headers=headers, json=payload + ) as response: + if not response.ok: + text = await response.text() + raise APIStatusError( + f"Server returned an error for {url}: {response.status}", + status_code=response.status, + body=text, + ) + return await response.json() # type: ignore + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.warning( + f"API request to {url} failed on attempt {i}", + extra={"error": str(e)}, + ) + except Exception: + logger.exception("failed to call LiveAvatar API") + + if i < self._conn_options.max_retry - 1: + await asyncio.sleep(self._conn_options.retry_interval) + + raise APIConnectionError("Failed to call LiveAvatar API after all retries.") diff --git a/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py new file mode 100644 index 0000000000..14db2c1957 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +import asyncio +import base64 +import contextlib +import os +import uuid +from collections.abc import Iterator + +import aiohttp + +from livekit import api, rtc +from livekit.agents import ( + DEFAULT_API_CONNECT_OPTIONS, + NOT_GIVEN, + AgentSession, + APIConnectionError, + APIConnectOptions, + NotGivenOr, + get_job_context, + utils, +) +from livekit.agents.voice.room_io import ATTRIBUTE_PUBLISH_ON_BEHALF + +from .api import LiveAvatarAPI, LiveAvatarException +from .log import logger + +SAMPLE_RATE = 24000 +_AVATAR_AGENT_IDENTITY = "liveavatar-avatar-agent" +_AVATAR_AGENT_NAME = "liveavatar-avatar-agent" + + +class AvatarSession: + """A LiveAvatar avatar session""" + + def __init__( + self, + *, + avatar_id: NotGivenOr[str] = NOT_GIVEN, + api_url: NotGivenOr[str] = NOT_GIVEN, + api_key: NotGivenOr[str] = NOT_GIVEN, + avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN, + avatar_participant_name: NotGivenOr[str] = NOT_GIVEN, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + ) -> None: + self._avatar_id = avatar_id or os.getenv("LIVEAVATAR_AVATAR_ID") + self._api = LiveAvatarAPI(api_key=api_key, api_url=api_url, conn_options=conn_options) + + self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY + self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME + self._main_atask = asyncio.Task | None + self._audio_resampler: rtc.AudioResampler | None = None + self._session_data = None + self._msg_ch = utils.aio.Chan[dict]() + + async def start( + self, + agent_session: AgentSession, + room: rtc.Room, + *, + livekit_url: NotGivenOr[str] = NOT_GIVEN, + livekit_api_key: NotGivenOr[str] = NOT_GIVEN, + livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, + ) -> None: + self._agent_session = agent_session + self._room = room + livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) + livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) + livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) + if not livekit_url or not livekit_api_key or not livekit_api_secret: + raise LiveAvatarException( + "livekit_url, livekit_api_key, and livekit_api_secret must be set" + ) + + try: + job_ctx = get_job_context() + self._local_participant_identity = job_ctx.token_claims().identity + except RuntimeError as e: + if not room.isconnected(): + raise LiveAvatarException("failed to get local participant identity") from e + self._local_participant_identity = room.local_participant.identity + + livekit_token = ( + api.AccessToken( + api_key=livekit_api_key, + api_secret=livekit_api_secret, + ) + .with_kind("agent") + .with_identity(self._avatar_participant_identity) + .with_name(self._avatar_participant_name) + .with_grants(api.VideoGrants(room_join=True, room=self._room.name)) + .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: self._local_participant_identity}) + .to_jwt() + ) + + logger.debug("starting avatar session") + + session_config_data = await self._api.create_streaming_session( + livekit_url=livekit_url, + livekit_token=livekit_token, + room=self._room, + avatar_id=self._avatar_id, + ) + self._session_id = session_config_data["data"]["session_id"] + self._session_token = session_config_data["data"]["session_token"] + logger.info(f"LiveAvatar session created: {self._session_id}") + + session_start_data = await self._api.start_streaming_session( + self._session_id, self._session_token + ) + self._ws_url = session_start_data["data"]["ws_url"] + logger.info("LiveAvatar streaming session started") + + @self._agent_session.on("agent_state_changed") + def on_agent_state_changed(ev): + if ev.old_state == "speaking" and ev.new_state == "listening": + self.send_event({"type": "agent.speak_end", "event_id": str(uuid.uuid4())}) + self.send_event({"type": "agent.start_listening", "event_id": str(uuid.uuid4())}) + if ev.new_state == "idle": + self.send_event({"type": "agent.stop_listening", "event_id": str(uuid.uuid4())}) + + @self._agent_session.on("conversation_item_added") + def on_conversation_item_added(ev): + if ( + self._agent_session.current_speech is not None + and self._agent_session.current_speech.interrupted + ): + self.send_event({"type": "agent.interrupt", "event_id": str(uuid.uuid4())}) + + @self._room.on("local_track_published") + def on_local_track_published(publication, track): + self._agent_audio_track = track + self._main_atask = asyncio.create_task( + self._main_task(), name="AvatarSession._main_task" + ) + + def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]: + if self._audio_resampler: + if frame.sample_rate != self._audio_resampler._input_rate: + self._audio_resampler = None + + if self._audio_resampler is None and ( + frame.sample_rate != SAMPLE_RATE or frame.num_channels != 1 + ): + self._audio_resampler = rtc.AudioResampler( + input_rate=frame.sample_rate, + output_rate=SAMPLE_RATE, + num_channels=1, + ) + + if self._audio_resampler: + yield from self._audio_resampler.push(frame) + else: + yield frame + + def send_event(self, msg: dict) -> None: + with contextlib.suppress(utils.aio.channel.ChanClosed): + self._msg_ch.send_nowait(msg) + + async def _main_task(self) -> None: + local_participant = self._room.local_participant + track_perms = rtc.ParticipantTrackPermission( + participant_identity=_AVATAR_AGENT_IDENTITY, allow_all=True + ) + local_participant.set_track_subscription_permissions( + allow_all_participants=False, participant_permissions=[track_perms] + ) + + if self._agent_audio_track is not None: + agent_audio_stream = rtc.AudioStream.from_track(track=self._agent_audio_track) + ws_conn = await self._api._ensure_http_session().ws_connect(url=self._ws_url) + + closing = False + + async def _forward_audio() -> None: + async for audio_event in agent_audio_stream: + audio_frame = audio_event.frame + + if not any(audio_frame.data): + continue + + for resampled_frame in self._resample_audio(audio_frame): + data = resampled_frame.data.tobytes() + encoded_audio = base64.b64encode(data).decode("utf-8") + + msg = { + "type": "agent.speak", + "event_id": str(uuid.uuid4()), + "audio": encoded_audio, + } + + self.send_event(msg) + + @utils.log_exceptions(logger=logger) + async def _send_task() -> None: + nonlocal closing + + async for msg in self._msg_ch: + try: + await ws_conn.send_json(data=msg) + except Exception: + break + closing = True + await ws_conn.close() + + @utils.log_exceptions(logger=logger) + async def _recv_task() -> None: + while True: + msg = await ws_conn.receive() + if msg.type in ( + aiohttp.WSMsgType.CLOSED, + aiohttp.WSMsgType.CLOSE, + aiohttp.WSMsgType.CLOSING, + ): + if closing: + return + raise APIConnectionError(message="LiveAvatar connection closed unexpectedly.") + + tasks = [ + asyncio.create_task(_forward_audio(), name="_forward_audio_task"), + asyncio.create_task(_send_task(), name="_send_task"), + asyncio.create_task(_recv_task(), name="_recv_task"), + ] + try: + done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + task.result() + finally: + await utils.aio.cancel_and_wait(*tasks) + await ws_conn.close() diff --git a/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/log.py b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/log.py new file mode 100644 index 0000000000..b244d05538 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/log.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger("livekit.plugins.liveavatar") diff --git a/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/version.py b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/version.py new file mode 100644 index 0000000000..47e7c96ad0 --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/version.py @@ -0,0 +1,15 @@ +# Copyright 2025 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "1.2.18" diff --git a/livekit-plugins/livekit-plugins-liveavatar/pyproject.toml b/livekit-plugins/livekit-plugins-liveavatar/pyproject.toml new file mode 100644 index 0000000000..d78111237f --- /dev/null +++ b/livekit-plugins/livekit-plugins-liveavatar/pyproject.toml @@ -0,0 +1,39 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "livekit-plugins-liveavatar" +dynamic = ["version"] +description = "Agent Framework plugin for LiveAvatar" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.9.0" +authors = [{ name = "LiveKit", email = "support@livekit.io" }] +keywords = ["voice", "ai", "realtime", "audio", "video", "livekit", "webrtc", "heygen", "liveavatar", "avatar"] +classifiers = [ + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Topic :: Multimedia :: Sound/Audio", + "Topic :: Multimedia :: Video", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3 :: Only", +] +dependencies = ["livekit-agents>=1.2.18"] + +[project.urls] +Documentation = "https://docs.livekit.io" +Website = "https://livekit.io/" +Source = "https://github.com/livekit/agents" + +[tool.hatch.version] +path = "livekit/plugins/liveavatar/version.py" + +[tool.hatch.build.targets.wheel] +packages = ["livekit"] + +[tool.hatch.build.targets.sdist] +include = ["/livekit"] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 70f698192f..2625d84446 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ livekit-plugins-hedra = { workspace = true } livekit-plugins-hume = { workspace = true } livekit-plugins-inworld = { workspace = true } livekit-plugins-langchain = { workspace = true } +livekit-plugins-liveavatar = { workspace = true} livekit-plugins-lmnt = { workspace = true } livekit-plugins-minimax-ai = { workspace = true } livekit-plugins-mistralai = { workspace = true }