diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index f56cd5e100..38e76118cc 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add jsonlines support to fsspec uploader + ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3791](#3791)) + ## Version 0.1b0 (2025-09-24) - Add completion hook to genai utils to implement semconv v1.37. diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py index 56d7b0dcd6..1d7cc3f3c0 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/_fsspec_upload/completion_hook.py @@ -24,6 +24,7 @@ from contextlib import ExitStack from dataclasses import asdict, dataclass from functools import partial +from os import environ from time import time from typing import Any, Callable, Final, Literal, TextIO, cast from uuid import uuid4 @@ -35,6 +36,9 @@ from opentelemetry.trace import Span from opentelemetry.util.genai import types from opentelemetry.util.genai.completion_hook import CompletionHook +from opentelemetry.util.genai.environment_variables import ( + OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, +) GEN_AI_INPUT_MESSAGES_REF: Final = ( gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref" @@ -46,6 +50,10 @@ gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS + "_ref" ) +_MESSAGE_INDEX_KEY = "index" + +Format = Literal["json", "jsonl"] +_FORMATS: tuple[Format, ...] = ("json", "jsonl") _logger = logging.getLogger(__name__) @@ -94,10 +102,27 @@ def __init__( *, base_path: str, max_size: int = 20, + upload_format: Format | None = None, ) -> None: self._base_path = base_path self._max_size = max_size + if upload_format not in _FORMATS + (None,): + raise ValueError( + f"Invalid {upload_format=}. Must be one of {_FORMATS}" + ) + + if upload_format is None: + environ_format = environ.get( + OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, "json" + ).lower() + if environ_format not in _FORMATS: + upload_format = "json" + else: + upload_format = environ_format + + self._format: Final[Literal["json", "jsonl"]] = upload_format + # Use a ThreadPoolExecutor for its queueing and thread management. The semaphore # limits the number of queued tasks. If the queue is full, data will be dropped. self._executor = ThreadPoolExecutor(max_workers=max_size) @@ -139,27 +164,39 @@ def _calculate_ref_path(self) -> CompletionRefs: uuid_str = str(uuid4()) return CompletionRefs( inputs_ref=posixpath.join( - self._base_path, f"{uuid_str}_inputs.json" + self._base_path, f"{uuid_str}_inputs.{self._format}" ), outputs_ref=posixpath.join( - self._base_path, f"{uuid_str}_outputs.json" + self._base_path, f"{uuid_str}_outputs.{self._format}" ), system_instruction_ref=posixpath.join( - self._base_path, f"{uuid_str}_system_instruction.json" + self._base_path, + f"{uuid_str}_system_instruction.{self._format}", ), ) - @staticmethod def _do_upload( - path: str, json_encodeable: Callable[[], JsonEncodeable] + self, path: str, json_encodeable: Callable[[], JsonEncodeable] ) -> None: + if self._format == "json": + # output as a single line with the json messages array + message_lines = [json_encodeable()] + else: + # output as one line per message in the array + message_lines = json_encodeable() + # add an index for streaming readers of jsonl + for message_idx, line in enumerate(message_lines): + line[_MESSAGE_INDEX_KEY] = message_idx + with fsspec_open(path, "w") as file: - json.dump( - json_encodeable(), - file, - separators=(",", ":"), - cls=Base64JsonEncoder, - ) + for message in message_lines: + json.dump( + message, + file, + separators=(",", ":"), + cls=Base64JsonEncoder, + ) + file.write("\n") def on_completion( self, diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py index 0ff089d82a..7c03169743 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py @@ -43,3 +43,13 @@ `_ for advanced use cases. """ + +OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT = ( + "OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT" +) +""" +.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT + +The format to use when uploading prompt and response data. Must be one of ``json`` or +``jsonl``. Defaults to ``json``. +""" diff --git a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py index 96c76d8458..2d390adfff 100644 --- a/util/opentelemetry-util-genai/tests/test_fsspec_upload.py +++ b/util/opentelemetry-util-genai/tests/test_fsspec_upload.py @@ -19,7 +19,6 @@ import sys import threading from contextlib import contextmanager -from dataclasses import asdict from typing import Any from unittest import TestCase from unittest.mock import MagicMock, patch @@ -76,6 +75,24 @@ def test_fsspec_entry_point_no_fsspec(self): role="user", parts=[types.Text(content="What is the capital of France?")], ), + types.InputMessage( + role="assistant", + parts=[ + types.ToolCall( + id="get_capital_0", + name="get_capital", + arguments={"city": "Paris"}, + ) + ], + ), + types.InputMessage( + role="user", + parts=[ + types.ToolCallResponse( + id="get_capital_0", response={"capital": "Paris"} + ) + ], + ), ] FAKE_OUTPUTS = [ types.OutputMessage( @@ -197,6 +214,44 @@ def test_failed_upload_logs(self): self.assertIn("fsspec uploader failed", logs.output[0]) + def test_invalid_upload_format(self): + with self.assertRaisesRegex(ValueError, "Invalid upload_format"): + FsspecUploadCompletionHook( + base_path=BASE_PATH, upload_format="invalid" + ) + + def test_parse_upload_format_envvar(self): + for envvar_value, expect in ( + ("", "json"), + ("json", "json"), + ("invalid", "json"), + ("jsonl", "jsonl"), + ("jSoNl", "jsonl"), + ): + with patch.dict( + "os.environ", + {"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": envvar_value}, + clear=True, + ): + hook = FsspecUploadCompletionHook(base_path=BASE_PATH) + self.addCleanup(hook.shutdown) + self.assertEqual( + hook._format, + expect, + f"expected upload format {expect=} with {envvar_value=} got {hook._format}", + ) + + with patch.dict( + "os.environ", + {"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": "json"}, + clear=True, + ): + hook = FsspecUploadCompletionHook( + base_path=BASE_PATH, upload_format="jsonl" + ) + self.addCleanup(hook.shutdown) + self.assertEqual(hook._format, "jsonl") + def test_upload_after_shutdown_logs(self): self.hook.shutdown() with self.assertLogs(level=logging.INFO) as logs: @@ -212,25 +267,15 @@ def test_upload_after_shutdown_logs(self): ) -class FsspecUploaderTest(TestCase): - def test_upload(self): - FsspecUploadCompletionHook._do_upload( - "memory://my_path", - lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS], - ) - - with fsspec.open("memory://my_path", "r") as file: - self.assertEqual( - file.read(), - '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]', - ) - - class TestFsspecUploadCompletionHookIntegration(TestBase): def setUp(self): super().setUp() self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH) + def create_hook(self) -> FsspecUploadCompletionHook: + self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH) + return self.hook + def tearDown(self): super().tearDown() self.hook.shutdown() @@ -271,15 +316,15 @@ def test_upload_completions(self): self.assert_fsspec_equal( span.attributes["gen_ai.input.messages_ref"], - '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]', + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n', ) self.assert_fsspec_equal( span.attributes["gen_ai.output.messages_ref"], - '[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]', + '[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]\n', ) self.assert_fsspec_equal( span.attributes["gen_ai.system_instructions_ref"], - '[{"content":"You are a helpful assistant.","type":"text"}]', + '[{"content":"You are a helpful assistant.","type":"text"}]\n', ) def test_stamps_empty_log(self): @@ -316,5 +361,59 @@ def test_upload_bytes(self) -> None: self.assert_fsspec_equal( log_record.attributes["gen_ai.input.messages_ref"], - '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]', + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"},{"type":"generic_bytes","bytes":"aGVsbG8="}]}]\n', + ) + + def test_upload_json(self) -> None: + hook = FsspecUploadCompletionHook( + base_path=BASE_PATH, upload_format="json" + ) + self.addCleanup(hook.shutdown) + log_record = LogRecord() + + hook.on_completion( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, + ) + hook.shutdown() + + ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"] + self.assertTrue( + ref_uri.endswith(".json"), f"{ref_uri=} does not end with .json" + ) + + self.assert_fsspec_equal( + ref_uri, + '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]},{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}]},{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}]}]\n', + ) + + def test_upload_jsonlines(self) -> None: + hook = FsspecUploadCompletionHook( + base_path=BASE_PATH, upload_format="jsonl" + ) + self.addCleanup(hook.shutdown) + log_record = LogRecord() + + hook.on_completion( + inputs=FAKE_INPUTS, + outputs=FAKE_OUTPUTS, + system_instruction=FAKE_SYSTEM_INSTRUCTION, + log_record=log_record, + ) + hook.shutdown() + + ref_uri: str = log_record.attributes["gen_ai.input.messages_ref"] + self.assertTrue( + ref_uri.endswith(".jsonl"), f"{ref_uri=} does not end with .jsonl" + ) + + self.assert_fsspec_equal( + ref_uri, + """\ +{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}],"index":0} +{"role":"assistant","parts":[{"arguments":{"city":"Paris"},"name":"get_capital","id":"get_capital_0","type":"tool_call"}],"index":1} +{"role":"user","parts":[{"response":{"capital":"Paris"},"id":"get_capital_0","type":"tool_call_response"}],"index":2} +""", )