Skip to content

Commit 6bb3121

Browse files
DrishyaDasdrishdev
authored andcommitted
Add milvus metrics support
1 parent 6d13035 commit 6bb3121

File tree

8 files changed

+266
-29
lines changed

8 files changed

+266
-29
lines changed

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/__init__.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
from typing import Collection
77

88
from opentelemetry.instrumentation.milvus.config import Config
9+
from opentelemetry.metrics import get_meter
910
from opentelemetry.trace import get_tracer
1011
from wrapt import wrap_function_wrapper
1112

13+
from opentelemetry.semconv_ai import Meters
1214
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
1315
from opentelemetry.instrumentation.utils import unwrap
1416

1517
from opentelemetry.instrumentation.milvus.wrapper import _wrap
1618
from opentelemetry.instrumentation.milvus.version import __version__
19+
from opentelemetry.instrumentation.milvus.utils import is_metrics_enabled
1720

1821
logger = logging.getLogger(__name__)
1922

@@ -82,8 +85,39 @@ def instrumentation_dependencies(self) -> Collection[str]:
8285
return _instruments
8386

8487
def _instrument(self, **kwargs):
88+
if is_metrics_enabled():
89+
meter_provider = kwargs.get("meter_provider")
90+
meter = get_meter(__name__, __version__, meter_provider)
91+
92+
query_duration_metric = meter.create_histogram(
93+
Meters.MILVUS_DB_QUERY_DURATION,
94+
"s",
95+
"Duration of query operations to Milvus",
96+
)
97+
distance_metric = meter.create_histogram(
98+
Meters.MILVUS_DB_SEARCH_DISTANCE,
99+
"",
100+
"Cosine distance between search query vector and matched vectors",
101+
)
102+
insert_units_metric = meter.create_counter(
103+
Meters.MILVUS_DB_USAGE_INSERT_UNITS,
104+
"",
105+
"Number of insert units consumed in serverless calls",
106+
)
107+
upsert_units_metric = meter.create_counter(
108+
Meters.MILVUS_DB_USAGE_UPSERT_UNITS,
109+
"",
110+
"Number of upsert units consumed in serverless calls",
111+
)
112+
delete_units_metric = meter.create_counter(
113+
Meters.MILVUS_DB_USAGE_DELETE_UNITS,
114+
"",
115+
"Number of delete collections units consumed in serverless calls",
116+
)
117+
85118
tracer_provider = kwargs.get("tracer_provider")
86119
tracer = get_tracer(__name__, __version__, tracer_provider)
120+
87121
for wrapped_method in WRAPPED_METHODS:
88122
wrap_package = wrapped_method.get("package")
89123
wrap_object = wrapped_method.get("object")
@@ -92,7 +126,15 @@ def _instrument(self, **kwargs):
92126
wrap_function_wrapper(
93127
wrap_package,
94128
f"{wrap_object}.{wrap_method}",
95-
_wrap(tracer, wrapped_method),
129+
_wrap(
130+
tracer,
131+
query_duration_metric,
132+
distance_metric,
133+
insert_units_metric,
134+
upsert_units_metric,
135+
delete_units_metric,
136+
wrapped_method
137+
),
96138
)
97139

98140
def _uninstrument(self, **kwargs):

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/utils.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import traceback
3+
import os
34
from opentelemetry.instrumentation.milvus.config import Config
45

56

@@ -26,3 +27,7 @@ def wrapper(*args, **kwargs):
2627
Config.exception_logger(e)
2728

2829
return wrapper
30+
31+
32+
def is_metrics_enabled() -> bool:
33+
return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"

packages/opentelemetry-instrumentation-milvus/opentelemetry/instrumentation/milvus/wrapper.py

Lines changed: 104 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,40 @@
1+
import time
12
from opentelemetry.instrumentation.milvus.utils import dont_throw
23
from opentelemetry.semconv.trace import SpanAttributes
34

45
from opentelemetry import context as context_api
6+
from opentelemetry.trace.status import Status, StatusCode
57
from opentelemetry.instrumentation.utils import (
68
_SUPPRESS_INSTRUMENTATION_KEY,
79
)
810
from opentelemetry.semconv_ai import Events, EventAttributes
9-
from opentelemetry.semconv_ai import SpanAttributes as AISpanAttributes
11+
from opentelemetry.semconv_ai import SpanAttributes as AISpanAttributes, Meters
1012

1113

1214
def _with_tracer_wrapper(func):
1315
"""Helper for providing tracer for wrapper functions."""
1416

15-
def _with_tracer(tracer, to_wrap):
17+
def _with_tracer(
18+
tracer,
19+
query_duration_metric,
20+
distance_metric,
21+
insert_units_metric,
22+
upsert_units_metric,
23+
delete_units_metric,
24+
to_wrap):
1625
def wrapper(wrapped, instance, args, kwargs):
17-
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
26+
return func(
27+
tracer,
28+
query_duration_metric,
29+
distance_metric,
30+
insert_units_metric,
31+
upsert_units_metric,
32+
delete_units_metric,
33+
to_wrap,
34+
wrapped,
35+
instance,
36+
args,
37+
kwargs)
1838

1939
return wrapper
2040

@@ -29,44 +49,76 @@ def _set_span_attribute(span, name, value):
2949

3050

3151
@_with_tracer_wrapper
32-
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
52+
def _wrap(
53+
tracer,
54+
query_duration_metric,
55+
distance_metric,
56+
insert_units_metric,
57+
upsert_units_metric,
58+
delete_units_metric,
59+
to_wrap,
60+
wrapped,
61+
instance,
62+
args,
63+
kwargs
64+
):
3365
"""Instruments and calls every function defined in TO_WRAP."""
3466
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
3567
return wrapped(*args, **kwargs)
3668

69+
method = to_wrap.get("method")
3770
name = to_wrap.get("span_name")
3871
with tracer.start_as_current_span(name) as span:
3972
span.set_attribute(SpanAttributes.DB_SYSTEM, "milvus")
4073
span.set_attribute(SpanAttributes.DB_OPERATION, to_wrap.get("method"))
4174

42-
if to_wrap.get("method") == "insert":
75+
if method == "insert":
4376
_set_insert_attributes(span, kwargs)
44-
elif to_wrap.get("method") == "upsert":
77+
elif method == "upsert":
4578
_set_upsert_attributes(span, kwargs)
46-
elif to_wrap.get("method") == "delete":
79+
elif method == "delete":
4780
_set_delete_attributes(span, kwargs)
48-
elif to_wrap.get("method") == "search":
81+
elif method == "search":
4982
_set_search_attributes(span, kwargs)
50-
elif to_wrap.get("method") == "get":
83+
elif method == "get":
5184
_set_get_attributes(span, kwargs)
52-
elif to_wrap.get("method") == "query":
85+
elif method == "query":
5386
_set_query_attributes(span, kwargs)
54-
elif to_wrap.get("method") == "create_collection":
87+
elif method == "create_collection":
5588
_set_create_collection_attributes(span, kwargs)
56-
elif to_wrap.get("method") == "hybrid_search":
89+
elif method == "hybrid_search":
5790
_set_hybrid_search_attributes(span, kwargs)
5891

92+
start_time = time.time()
5993
return_value = wrapped(*args, **kwargs)
94+
end_time = time.time()
6095

61-
if to_wrap.get("method") == "query":
96+
if method == "query":
6297
_add_query_result_events(span, return_value)
6398

64-
if (
65-
to_wrap.get("method") == "search"
66-
or to_wrap.get("method") == "hybrid_search"
67-
):
99+
elif (method == "search" or method == "hybrid_search"):
68100
_add_search_result_events(span, return_value)
69101

102+
shared_attributes = {SpanAttributes.DB_SYSTEM: "milvus"}
103+
duration = end_time - start_time
104+
if duration > 0 and query_duration_metric and method == "query":
105+
query_duration_metric.record(duration, shared_attributes)
106+
107+
if return_value and span.is_recording():
108+
if method == "search" or method == "hybrid_search":
109+
set_search_response(span, distance_metric, shared_attributes, return_value)
110+
111+
_set_response_attributes(
112+
span,
113+
insert_units_metric,
114+
upsert_units_metric,
115+
delete_units_metric,
116+
shared_attributes,
117+
return_value,
118+
)
119+
120+
span.set_status(Status(StatusCode.OK))
121+
70122
return return_value
71123

72124

@@ -101,6 +153,31 @@ def count_or_none(obj):
101153
return None
102154

103155

156+
def _set_response_attributes(
157+
span,
158+
insert_units_metric,
159+
upsert_units_metric,
160+
delete_units_metric,
161+
shared_attributes,
162+
response
163+
):
164+
print(response)
165+
if 'upsert_count' in response:
166+
upsert_count = response['upsert_count']
167+
upsert_units_metric.add(upsert_count, shared_attributes)
168+
span.set_attribute(Meters.MILVUS_DB_USAGE_UPSERT_UNITS, upsert_count)
169+
170+
if ('insert_count' in response):
171+
insert_count = response['insert_count']
172+
insert_units_metric.add(insert_count, shared_attributes)
173+
span.set_attribute(Meters.MILVUS_DB_USAGE_INSERT_UNITS, insert_count)
174+
175+
if ('delete_count' in response):
176+
delete_count = response['delete_count']
177+
delete_units_metric.add(delete_count, shared_attributes)
178+
span.set_attribute(Meters.MILVUS_DB_USAGE_DELETE_UNITS, delete_count)
179+
180+
104181
@dont_throw
105182
def _set_create_collection_attributes(span, kwargs):
106183
_set_span_attribute(
@@ -425,3 +502,13 @@ def _set_delete_attributes(span, kwargs):
425502
AISpanAttributes.MILVUS_DELETE_FILTER,
426503
_encode_filter(kwargs.get("filter")),
427504
)
505+
506+
507+
@dont_throw
508+
def set_search_response(distance_metric, shared_attributes, response):
509+
for query_result in response:
510+
for match in query_result:
511+
distance = match.get("distance")
512+
513+
if distance_metric and distance is not None:
514+
distance_metric.record(distance, shared_attributes)

packages/opentelemetry-instrumentation-milvus/tests/conftest.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
"""Unit tests configuration module."""
22

33
import pytest
4-
from opentelemetry import trace
4+
from opentelemetry import trace, metrics
5+
from opentelemetry.instrumentation.anthropic import AnthropicInstrumentor
6+
from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider
7+
from opentelemetry.sdk.metrics.export import (
8+
AggregationTemporality,
9+
InMemoryMetricReader,
10+
)
11+
from opentelemetry.sdk.resources import Resource
512
from opentelemetry.sdk.trace import TracerProvider
613
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
714
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
@@ -27,3 +34,43 @@ def exporter():
2734
@pytest.fixture(autouse=True)
2835
def clear_exporter(exporter):
2936
exporter.clear()
37+
38+
39+
@pytest.fixture(scope="session")
40+
def reader():
41+
reader = InMemoryMetricReader(
42+
{Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA}
43+
)
44+
return reader
45+
46+
47+
@pytest.fixture(scope="session")
48+
def meter_provider(reader):
49+
resource = Resource.create()
50+
meter_provider = MeterProvider(metric_readers=[reader], resource=resource)
51+
metrics.set_meter_provider(meter_provider)
52+
53+
return meter_provider
54+
55+
56+
@pytest.fixture(scope="session", autouse=True)
57+
def instrument(exporter, reader, meter_provider):
58+
async def upload_base64_image(*args):
59+
return "/some/url"
60+
61+
AnthropicInstrumentor(
62+
enrich_token_usage=True,
63+
upload_base64_image=upload_base64_image,
64+
).instrument()
65+
66+
yield
67+
68+
exporter.shutdown()
69+
reader.shutdown()
70+
meter_provider.shutdown()
71+
72+
73+
@pytest.fixture(autouse=True)
74+
def clear_exporter_reader(exporter, reader):
75+
exporter.clear()
76+
reader.get_metrics_data()

0 commit comments

Comments
 (0)