Skip to content

feat(instrumentation): Add milvus metrics support #3013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
from typing import Collection

from opentelemetry.instrumentation.milvus.config import Config
from opentelemetry.metrics import get_meter
from opentelemetry.trace import get_tracer
from wrapt import wrap_function_wrapper

from opentelemetry.semconv_ai import Meters
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap

from opentelemetry.instrumentation.milvus.wrapper import _wrap
from opentelemetry.instrumentation.milvus.version import __version__
from opentelemetry.instrumentation.milvus.utils import is_metrics_enabled

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,8 +85,39 @@ def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
if is_metrics_enabled():
meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

query_duration_metric = meter.create_histogram(
Meters.MILVUS_DB_QUERY_DURATION,
"s",
"Duration of query operations to Milvus",
)
distance_metric = meter.create_histogram(
Meters.MILVUS_DB_SEARCH_DISTANCE,
"",
"Cosine distance between search query vector and matched vectors",
)
insert_units_metric = meter.create_counter(
Meters.MILVUS_DB_USAGE_INSERT_UNITS,
"",
"Number of insert units consumed in serverless calls",
)
upsert_units_metric = meter.create_counter(
Meters.MILVUS_DB_USAGE_UPSERT_UNITS,
"",
"Number of upsert units consumed in serverless calls",
)
delete_units_metric = meter.create_counter(
Meters.MILVUS_DB_USAGE_DELETE_UNITS,
"",
"Number of delete collections units consumed in serverless calls",
)

tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

for wrapped_method in WRAPPED_METHODS:
wrap_package = wrapped_method.get("package")
wrap_object = wrapped_method.get("object")
Expand All @@ -92,7 +126,15 @@ def _instrument(self, **kwargs):
wrap_function_wrapper(
wrap_package,
f"{wrap_object}.{wrap_method}",
_wrap(tracer, wrapped_method),
_wrap(
tracer,
query_duration_metric,
distance_metric,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
wrapped_method
),
)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import traceback
import os
from opentelemetry.instrumentation.milvus.config import Config


Expand All @@ -26,3 +27,7 @@ def wrapper(*args, **kwargs):
Config.exception_logger(e)

return wrapper


def is_metrics_enabled() -> bool:
return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import time
from opentelemetry.instrumentation.milvus.utils import dont_throw
from opentelemetry.semconv.trace import SpanAttributes

from opentelemetry import context as context_api
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
)
Expand All @@ -12,9 +14,27 @@
def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def _with_tracer(
tracer,
query_duration_metric,
distance_metric,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
to_wrap):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
return func(
tracer,
query_duration_metric,
distance_metric,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
to_wrap,
wrapped,
instance,
args,
kwargs)

return wrapper

Expand All @@ -29,44 +49,76 @@ def _set_span_attribute(span, name, value):


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
def _wrap(
tracer,
query_duration_metric,
distance_metric,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
to_wrap,
wrapped,
instance,
args,
kwargs
):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)

method = to_wrap.get("method")
name = to_wrap.get("span_name")
with tracer.start_as_current_span(name) as span:
span.set_attribute(SpanAttributes.DB_SYSTEM, "milvus")
span.set_attribute(SpanAttributes.DB_OPERATION, to_wrap.get("method"))

if to_wrap.get("method") == "insert":
if method == "insert":
_set_insert_attributes(span, kwargs)
elif to_wrap.get("method") == "upsert":
elif method == "upsert":
_set_upsert_attributes(span, kwargs)
elif to_wrap.get("method") == "delete":
elif method == "delete":
_set_delete_attributes(span, kwargs)
elif to_wrap.get("method") == "search":
elif method == "search":
_set_search_attributes(span, kwargs)
elif to_wrap.get("method") == "get":
elif method == "get":
_set_get_attributes(span, kwargs)
elif to_wrap.get("method") == "query":
elif method == "query":
_set_query_attributes(span, kwargs)
elif to_wrap.get("method") == "create_collection":
elif method == "create_collection":
_set_create_collection_attributes(span, kwargs)
elif to_wrap.get("method") == "hybrid_search":
elif method == "hybrid_search":
_set_hybrid_search_attributes(span, kwargs)

start_time = time.time()
return_value = wrapped(*args, **kwargs)
end_time = time.time()

if to_wrap.get("method") == "query":
if method == "query":
_add_query_result_events(span, return_value)

if (
to_wrap.get("method") == "search"
or to_wrap.get("method") == "hybrid_search"
):
elif (method == "search" or method == "hybrid_search"):
_add_search_result_events(span, return_value)

shared_attributes = {SpanAttributes.DB_SYSTEM: "milvus"}
duration = end_time - start_time
if duration > 0 and query_duration_metric and method == "query":
query_duration_metric.record(duration, shared_attributes)

if return_value and span.is_recording():
if method == "search" or method == "hybrid_search":
set_search_response(span, distance_metric, shared_attributes, return_value)

_set_response_attributes(
span,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
shared_attributes,
return_value,
)

span.set_status(Status(StatusCode.OK))

return return_value


Expand Down Expand Up @@ -101,6 +153,27 @@ def count_or_none(obj):
return None


def _set_response_attributes(
span,
insert_units_metric,
upsert_units_metric,
delete_units_metric,
shared_attributes,
response
):
if 'upsert_count' in response:
upsert_count = response['upsert_count']
upsert_units_metric.add(upsert_count, shared_attributes)

if ('insert_count' in response):
insert_count = response['insert_count']
insert_units_metric.add(insert_count, shared_attributes)

if ('delete_count' in response):
delete_count = response['delete_count']
delete_units_metric.add(delete_count, shared_attributes)


@dont_throw
def _set_create_collection_attributes(span, kwargs):
_set_span_attribute(
Expand Down Expand Up @@ -425,3 +498,13 @@ def _set_delete_attributes(span, kwargs):
AISpanAttributes.MILVUS_DELETE_FILTER,
_encode_filter(kwargs.get("filter")),
)


@dont_throw
def set_search_response(distance_metric, shared_attributes, response):
for query_result in response:
for match in query_result:
distance = match.get("distance")

if distance_metric and distance is not None:
distance_metric.record(distance, shared_attributes)
49 changes: 48 additions & 1 deletion packages/opentelemetry-instrumentation-milvus/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
"""Unit tests configuration module."""

import pytest
from opentelemetry import trace
from opentelemetry import trace, metrics
from opentelemetry.instrumentation.anthropic import AnthropicInstrumentor
from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
InMemoryMetricReader,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
Expand All @@ -27,3 +34,43 @@ def exporter():
@pytest.fixture(autouse=True)
def clear_exporter(exporter):
exporter.clear()


@pytest.fixture(scope="session")
def reader():
reader = InMemoryMetricReader(
{Counter: AggregationTemporality.DELTA, Histogram: AggregationTemporality.DELTA}
)
return reader


@pytest.fixture(scope="session")
def meter_provider(reader):
resource = Resource.create()
meter_provider = MeterProvider(metric_readers=[reader], resource=resource)
metrics.set_meter_provider(meter_provider)

return meter_provider


@pytest.fixture(scope="session", autouse=True)
def instrument(exporter, reader, meter_provider):
async def upload_base64_image(*args):
return "/some/url"

AnthropicInstrumentor(
enrich_token_usage=True,
upload_base64_image=upload_base64_image,
).instrument()

yield

exporter.shutdown()
reader.shutdown()
meter_provider.shutdown()


@pytest.fixture(autouse=True)
def clear_exporter_reader(exporter, reader):
exporter.clear()
reader.get_metrics_data()
Loading