Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from livekit.agents.utils import is_given

from .models import ChatModels
from .utils import CACHE_CONTROL_EPHEMERAL, to_fnc_ctx
from .utils import CACHE_CONTROL_EPHEMERAL, AsyncAzureADTokenProvider, to_fnc_ctx


@dataclass
Expand Down Expand Up @@ -97,21 +97,98 @@ def __init__(
max_tokens=max_tokens,
)
anthropic_api_key = api_key if is_given(api_key) else os.environ.get("ANTHROPIC_API_KEY")
if not anthropic_api_key:
raise ValueError("Anthropic API key is required")

self._client = anthropic.AsyncClient(
api_key=anthropic_api_key,
base_url=base_url if is_given(base_url) else None,
http_client=httpx.AsyncClient(
timeout=5.0,
follow_redirects=True,
limits=httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
keepalive_expiry=120,

if client:
self._client = client
else:
if not anthropic_api_key:
raise ValueError("Anthropic API key is required")

self._client = anthropic.AsyncClient(
api_key=anthropic_api_key,
base_url=base_url if is_given(base_url) else None,
http_client=httpx.AsyncClient(
timeout=5.0,
follow_redirects=True,
limits=httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
keepalive_expiry=120,
),
),
),
)

@staticmethod
def with_azure(
*,
azure_endpoint: str,
azure_deployment: str,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
base_url: str | None = None,
model: str | ChatModels | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
client: anthropic.AsyncClient | None = None,
top_k: NotGivenOr[int] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
caching: NotGivenOr[Literal["ephemeral"]] = NOT_GIVEN,
) -> LLM:
"""
Create a new instance of Anthropic LLM with Azure AI Foundry support.

This method automatically configures the client to use Azure's endpoint structure.
"""
api_key = api_key or os.environ.get("AZURE_OPENAI_API_KEY")

if base_url is None:
azure_endpoint = azure_endpoint.rstrip("/")
if azure_endpoint.endswith("/anthropic"):
base_url = azure_endpoint
else:
base_url = f"{azure_endpoint}/anthropic"

default_headers = {}
if api_version:
default_headers["anthropic-version"] = api_version

if azure_ad_token:
default_headers["Authorization"] = f"Bearer {azure_ad_token}"
# We need to pass a non-empty api_key to AsyncClient as it is required
if not api_key:
api_key = "dummy"

if not client:
client = anthropic.AsyncClient(
api_key=api_key,
base_url=base_url,
default_headers=default_headers,
http_client=httpx.AsyncClient(
timeout=5.0,
follow_redirects=True,
limits=httpx.Limits(
max_connections=1000,
max_keepalive_connections=100,
keepalive_expiry=120,
),
),
)

return LLM(
model=model or azure_deployment,
client=client,
user=user,
temperature=temperature,
parallel_tool_calls=parallel_tool_calls,
tool_choice=tool_choice,
caching=caching,
top_k=top_k,
max_tokens=max_tokens,
api_key=api_key or NOT_GIVEN,
)

@property
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Literal, Optional, Union
from collections.abc import Awaitable
from typing import Callable, Literal, Optional, Union

import anthropic
from livekit.agents import llm
Expand All @@ -9,6 +10,8 @@
is_raw_function_tool,
)

AsyncAzureADTokenProvider = Callable[[], Union[str, Awaitable[str]]]

# We can define up to 4 cache breakpoints, we will add them at:
# - the last tool definition
# - the last system message
Expand All @@ -17,7 +20,7 @@
# https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching#structuring-your-prompt
CACHE_CONTROL_EPHEMERAL = anthropic.types.CacheControlEphemeralParam(type="ephemeral")

__all__ = ["to_fnc_ctx", "CACHE_CONTROL_EPHEMERAL"]
__all__ = ["to_fnc_ctx", "CACHE_CONTROL_EPHEMERAL", "AsyncAzureADTokenProvider"]


def to_fnc_ctx(
Expand Down
Loading