Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions examples/avatar_agents/liveavatar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# LiveKit LiveAvatar Avatar Agent

This example demonstrates how to create a animated avatar using [LiveAvatar by HeyGen](https://www.liveavatar.com/).

## Usage

* Update the environment:

```bash
# LiveAvatar Config
export LIVEAVATAR_API_KEY="..."
export LIVEAVATAR_AVATAR_ID="..."

# STT + LLM + TTS config
export OPENAI_API_KEY="..."
export DEEPGRAM_API_KEY="..."

# LiveKit config
export LIVEKIT_API_KEY="..."
export LIVEKIT_API_SECRET="..."
export LIVEKIT_URL="..."
```

* Start the agent worker:

```bash
python examples/avatar_agents/liveavatar/agent_worker.py dev
```
38 changes: 38 additions & 0 deletions examples/avatar_agents/liveavatar/agent_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
import os

from dotenv import load_dotenv

from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
from livekit.plugins import deepgram, liveavatar, openai

logger = logging.getLogger("liveavatar-avatar-example")
logger.setLevel(logging.INFO)

load_dotenv()


async def entrypoint(ctx: JobContext):
session = AgentSession(
stt=deepgram.STT(),
llm=openai.LLM(),
tts=openai.TTS(),
resume_false_interruption=False,
)

liveavatar_avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")
avatar = liveavatar.AvatarSession(avatar_id=liveavatar_avatar_id)
await avatar.start(session, room=ctx.room)

agent = Agent(instructions="Talk to me!")

await session.start(
agent=agent,
room=ctx.room,
)

session.generate_reply(instructions="say hello to the user")


if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
6 changes: 6 additions & 0 deletions livekit-plugins/livekit-plugins-liveavatar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# HeyGen virtual avatar plugin for LiveKit Agents

Support for the [Heygen LiveAvatar](https://www.liveavatar.com/) virtual avatar.

See [https://docs.livekit.io/agents/integrations/avatar/heygen/](https://docs.livekit.io/agents/integrations/avatar/heygen/) for more information.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""LiveAvatar avatar plugin for LiveKit Agents

Provides LiveAvatar interactive avatar integration similar to Tavus.
"""

from .api import LiveAvatarException
from .avatar import AvatarSession

__all__ = [
"LiveAvatarException",
"AvatarSession",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import asyncio
import logging
import os
from typing import Any, Optional

import aiohttp

from livekit import rtc
from livekit.agents import (
DEFAULT_API_CONNECT_OPTIONS,
APIConnectionError,
APIConnectOptions,
APIStatusError,
utils,
)

logger = logging.getLogger(__name__)


class LiveAvatarException(Exception):
"""Exception for LiveAvatar errors"""


DEFAULT_API_URL = "https://api.liveavatar.com/v1/sessions"


class LiveAvatarAPI:
def __init__(
self,
api_key: str,
*,
api_url: str = DEFAULT_API_URL,
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
session: Optional[aiohttp.ClientSession] = None,
) -> None:
self._api_key = api_key or os.getenv("LIVEAVATAR_API_KEY")
if self._api_key is None:
raise LiveAvatarException("api_key or LIVEAVATAR_API_KEY must be set")

self._api_url = api_url or DEFAULT_API_URL
self._conn_options = conn_options
self._session = session or aiohttp.ClientSession()

def _ensure_http_session(self) -> aiohttp.ClientSession:
if self._session is None:
self._session = utils.http_context.http_session()
return self._session

async def create_streaming_session(
self,
*,
livekit_url: str,
livekit_token: str,
room: rtc.Room,
avatar_id: str,
) -> dict[str, Any]:
"""Create a new streaming session, return a session id"""

livekit_config = {
"livekit_room": room.name,
"livekit_url": livekit_url,
"livekit_client_token": livekit_token,
}

payload = {
"mode": "CUSTOM",
"avatar_id": avatar_id,
"livekit_config": livekit_config,
}

self._headers = {
"accept": "application/json",
"content-type": "application/json",
"X-API-KEY": self._api_key,
}
response_data = await self._post(endpoint="/token", payload=payload, headers=self._headers)
return response_data

async def start_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]:
"""Start the streaming session"""
payload = {"session_id": session_id}
headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"}
response_data = await self._post(endpoint="/start", payload=payload, headers=headers)
return response_data

async def _post(
self, *, endpoint: str, payload: dict[str, Any], headers: dict[str, Any]
) -> dict[str, Any]:
url = self._api_url + endpoint
for i in range(self._conn_options.max_retry):
try:
async with self._ensure_http_session().post(
url=url, headers=headers, json=payload
) as response:
if not response.ok:
text = await response.text()
raise APIStatusError(
f"Server returned an error for {url}: {response.status}",
status_code=response.status,
body=text,
)
return await response.json() # type: ignore
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning(
f"API request to {url} failed on attempt {i}",
extra={"error": str(e)},
)
except Exception:
logger.exception("failed to call LiveAvatar API")

if i < self._conn_options.max_retry - 1:
await asyncio.sleep(self._conn_options.retry_interval)

raise APIConnectionError("Failed to call LiveAvatar API after all retries.")
Loading
Loading