3 changes: 3 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add metrics to LLMInvocation traces
  ([#3891](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891))

## Version 0.2b0 (2025-10-14)

- Add jsonlines support to fsspec uploader
4 changes: 2 additions & 2 deletions util/opentelemetry-util-genai/pyproject.toml
@@ -25,8 +25,8 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-semantic-conventions ~= 0.57b0",
"opentelemetry-instrumentation ~= 0.58b0",
"opentelemetry-semantic-conventions ~= 0.58b0",
"opentelemetry-api>=1.31.0",
]

util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -64,16 +64,19 @@
from typing import Iterator, Optional

from opentelemetry import context as otel_context
from opentelemetry.metrics import MeterProvider, get_meter
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import (
    Span,
    SpanKind,
    TracerProvider,
    get_tracer,
    set_span_in_context,
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.span_utils import (
    _apply_error_attributes,
    _apply_finish_attributes,
@@ -88,13 +91,41 @@ class TelemetryHandler:
    them as spans, metrics, and events.
    """

-    def __init__(self, tracer_provider: TracerProvider | None = None):
+    def __init__(
+        self,
+        tracer_provider: TracerProvider | None = None,
+        meter_provider: MeterProvider | None = None,
+    ):
        self._tracer = get_tracer(
            __name__,
            __version__,
            tracer_provider,
-            schema_url=Schemas.V1_36_0.value,
+            schema_url=Schemas.V1_37_0.value,
        )
        self._metrics_recorder: Optional[InvocationMetricsRecorder] = None
        try:
            meter = get_meter(__name__, meter_provider=meter_provider)
            self._metrics_recorder = InvocationMetricsRecorder(meter)
        except Exception:  # pragma: no cover - defensive fallback  # pylint: disable=broad-exception-caught
            self._metrics_recorder = None

    def _record_llm_metrics(
        self,
        invocation: LLMInvocation,
        span: Optional[Span],
        *,
        error_type: Optional[str] = None,
    ) -> None:
        if self._metrics_recorder is None or span is None:
            return
        try:
            self._metrics_recorder.record(
                span,
                invocation,
                error_type=error_type,
            )
        except Exception:  # pragma: no cover - defensive fallback  # pylint: disable=broad-exception-caught
            pass

    def start_llm(
        self,
@@ -118,10 +149,12 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab
            # TODO: Provide feedback that this invocation was not started
            return invocation

-        _apply_finish_attributes(invocation.span, invocation)
+        span = invocation.span
+        _apply_finish_attributes(span, invocation)
+        self._record_llm_metrics(invocation, span)
        # Detach context and end span
        otel_context.detach(invocation.context_token)
-        invocation.span.end()
+        span.end()
        return invocation

    def fail_llm(  # pylint: disable=no-self-use
@@ -132,10 +165,13 @@ def fail_llm( # pylint: disable=no-self-use
            # TODO: Provide feedback that this invocation was not started
            return invocation

-        _apply_error_attributes(invocation.span, error)
+        span = invocation.span
+        _apply_error_attributes(span, error)
+        error_type = getattr(error.type, "__qualname__", None)
+        self._record_llm_metrics(invocation, span, error_type=error_type)
        # Detach context and end span
        otel_context.detach(invocation.context_token)
-        invocation.span.end()
+        span.end()
        return invocation

    @contextmanager
@@ -165,6 +201,7 @@ def llm(

def get_telemetry_handler(
    tracer_provider: TracerProvider | None = None,
    meter_provider: MeterProvider | None = None,
) -> TelemetryHandler:
    """
    Returns a singleton TelemetryHandler instance.
@@ -173,6 +210,9 @@ def get_telemetry_handler(
        get_telemetry_handler, "_default_handler", None
    )
    if handler is None:
-        handler = TelemetryHandler(tracer_provider=tracer_provider)
+        handler = TelemetryHandler(
+            tracer_provider=tracer_provider,
+            meter_provider=meter_provider,
+        )
    setattr(get_telemetry_handler, "_default_handler", handler)
    return handler
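
For context, a minimal usage sketch (not part of the diff) showing how the new `meter_provider` parameter threads through `get_telemetry_handler`. The module path `opentelemetry.util.genai.handler` and the `LLMInvocation` token fields are assumed from the imports and attribute accesses in this PR:

```python
# Hypothetical usage sketch; module path and field names assumed from this PR.
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.handler import get_telemetry_handler
from opentelemetry.util.genai.types import LLMInvocation

handler = get_telemetry_handler(
    tracer_provider=TracerProvider(),
    meter_provider=MeterProvider(),  # new: metrics now flow through this
)

invocation = LLMInvocation(request_model="gpt-4o-mini")
handler.start_llm(invocation)
# ... call the model, then record what it returned ...
invocation.input_tokens = 120
invocation.output_tokens = 42
handler.stop_llm(invocation)  # ends the span and records both histograms
```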
util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py (new file)
@@ -0,0 +1,52 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
    0.01,
    0.02,
    0.04,
    0.08,
    0.16,
    0.32,
    0.64,
    1.28,
    2.56,
    5.12,
    10.24,
    20.48,
    40.96,
    81.92,
]

_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
    1,
    4,
    16,
    64,
    256,
    1024,
    4096,
    16384,
    65536,
    262144,
    1048576,
    4194304,
    16777216,
    67108864,
]


class Instruments:
    def __init__(self, meter: Meter):
        self.operation_duration_histogram: Histogram = meter.create_histogram(
            name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
            description="Duration of GenAI client operation",
            unit="s",
            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
        )
        self.token_usage_histogram: Histogram = meter.create_histogram(
            name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
            description="Number of input and output tokens used by GenAI clients",
            unit="{token}",
            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
        )
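
Note the progressions: the duration buckets double from 10 ms up to roughly 82 s, and the token buckets are powers of 4, matching the advisory boundaries recommended by the GenAI semantic conventions. A quick sketch (assuming an SDK recent enough to honor `explicit_bucket_boundaries_advisory`, which the `opentelemetry-api>=1.31.0` floor implies) of checking the advisory with an in-memory reader:

```python
# Sketch: confirm the advisory buckets take effect; names here are illustrative.
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

from opentelemetry.util.genai.instruments import Instruments

reader = InMemoryMetricReader()
provider = MeterProvider(metric_readers=[reader])
instruments = Instruments(provider.get_meter("demo"))

# A 1.5 s operation should land in the (1.28, 2.56] bucket.
instruments.operation_duration_histogram.record(
    1.5, attributes={"gen_ai.operation.name": "chat"}
)

metrics_data = reader.get_metrics_data()  # histogram points carry the advisory buckets
```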
util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py (new file)
@@ -0,0 +1,125 @@
"""Helpers for emitting GenAI metrics from LLM invocations."""

from __future__ import annotations

import time
from numbers import Number
from typing import Dict, Optional

from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAI,
)
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.util.genai.instruments import Instruments
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.types import AttributeValue

_NS_PER_SECOND = 1_000_000_000


def _now_ns() -> int:
    return time.time_ns()


def _get_span_start_time_ns(span: Optional[Span]) -> Optional[int]:
    if span is None:
        return None
    for attr in ("start_time", "_start_time"):
        value = getattr(span, attr, None)
        if isinstance(value, int):
            return value
    return None


def _calculate_duration_seconds(span: Optional[Span]) -> Optional[float]:
"""Calculate duration in seconds from span start time to now."""
start_time_ns = _get_span_start_time_ns(span)
if start_time_ns is None:
return None
elapsed_ns = max(_now_ns() - start_time_ns, 0)
return elapsed_ns / _NS_PER_SECOND


class InvocationMetricsRecorder:
"""Records duration and token usage histograms for GenAI invocations."""

def __init__(self, meter: Meter):
instruments = Instruments(meter)
self._duration_histogram: Histogram = (
instruments.operation_duration_histogram
)
self._token_histogram: Histogram = instruments.token_usage_histogram

def record(
self,
span: Optional[Span],
invocation: LLMInvocation,
*,
error_type: Optional[str] = None,
) -> None:
"""Record duration and token metrics for an invocation if possible."""
if span is None:
return

tokens: list[tuple[int, str]] = []
if isinstance(invocation.input_tokens, int):
tokens.append(
(
invocation.input_tokens,
GenAI.GenAiTokenTypeValues.INPUT.value,
)
)
if isinstance(invocation.output_tokens, int):
tokens.append(
(
invocation.output_tokens,
GenAI.GenAiTokenTypeValues.COMPLETION.value,
)
)

if not tokens:
return

attributes: Dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
}
if invocation.request_model:
attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
if invocation.provider:
attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
if invocation.response_model_name:
attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
invocation.response_model_name
)

# Calculate duration from span timing
duration_seconds = _calculate_duration_seconds(span)

span_context = set_span_in_context(span)
if error_type:
attributes["error.type"] = error_type

if (
duration_seconds is not None
and isinstance(duration_seconds, Number)
and duration_seconds >= 0
):
duration_attributes: Dict[str, AttributeValue] = dict(attributes)
self._duration_histogram.record(
float(duration_seconds),
attributes=duration_attributes,
context=span_context,
)

for token in tokens:
token_attributes: Dict[str, AttributeValue] = dict(attributes)
token_attributes[GenAI.GEN_AI_TOKEN_TYPE] = token[1]
self._token_histogram.record(
token[0],
attributes=token_attributes,
context=span_context,
)


__all__ = ["InvocationMetricsRecorder"]