diff --git a/CHANGELOG.md b/CHANGELOG.md index bcb62e443..6f98dd45c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,5 +13,7 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t ## Unreleased - Add botocore instrumentation extension for Bedrock AgentCore services with span attributes ([#490](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/490)) +- [PATCH] Only decode JSON input buffer in Anthropic Claude streaming + ([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497)) - Fix timeout handling for exceeded deadline in retry logic in OTLPAwsLogsExporter ([#501](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/501)) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py index 900ce688f..a415f6148 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py @@ -334,8 +334,89 @@ def patched_extract_tool_calls( tool_calls.append(tool_call) return tool_calls + # TODO: The following code is to patch a bedrock bug that was fixed in + # opentelemetry-instrumentation-botocore==0.60b0 in: + # https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875 + # Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0 + def patched_process_anthropic_claude_chunk(self, chunk): + # pylint: disable=too-many-return-statements,too-many-branches + if not (message_type := chunk.get("type")): + return + + if message_type == "message_start": + # {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant', + # 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None, + # 'usage': {'input_tokens': 18, 'output_tokens': 1}}} + if chunk.get("message", {}).get("role") == "assistant": + self._record_message = True + message = chunk["message"] + self._message = { + "role": message["role"], + "content": message.get("content", []), + } + return + + if message_type == "content_block_start": + # {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}} + # {'type': 'content_block_start', 'index': 1, 'content_block': + # {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}} + if self._record_message: + block = chunk.get("content_block", {}) + if block.get("type") == "text": + self._content_block = block + elif block.get("type") == "tool_use": + self._content_block = block + return + + if message_type == "content_block_delta": + # {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}} + # {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}} + if self._record_message: + delta = chunk.get("delta", {}) + if delta.get("type") == "text_delta": + self._content_block["text"] += delta.get("text", "") + elif delta.get("type") == "input_json_delta": + self._tool_json_input_buf += delta.get("partial_json", "") + return + + if message_type == "content_block_stop": + # {'type': 'content_block_stop', 'index': 0} + if self._tool_json_input_buf: + try: + self._content_block["input"] = json.loads(self._tool_json_input_buf) + except json.JSONDecodeError: + self._content_block["input"] = self._tool_json_input_buf + self._message["content"].append(self._content_block) + self._content_block = {} + self._tool_json_input_buf = "" + return + + if message_type == "message_delta": + # {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, + # 'usage': {'output_tokens': 123}} + if (stop_reason := chunk.get("delta", {}).get("stop_reason")) is not None: + self._response["stopReason"] = stop_reason + return + + if message_type == "message_stop": + # {'type': 'message_stop', 'amazon-bedrock-invocationMetrics': + # {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}} + if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"): + self._process_invocation_metrics(invocation_metrics) + + if self._record_message: + self._response["output"] = {"message": self._message} + self._record_message = False + self._message = None + + self._stream_done_callback(self._response) + return + bedrock_utils.ConverseStreamWrapper.__init__ = patched_init bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event + bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = ( + patched_process_anthropic_claude_chunk + ) bedrock_utils.extract_tool_calls = patched_extract_tool_calls # END The OpenTelemetry Authors code diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py index 979f79f7b..7adaaef50 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py @@ -120,10 +120,6 @@ def _run_patch_behaviour_tests(self): self._test_unpatched_botocore_propagator() self._test_unpatched_gevent_instrumentation() self._test_unpatched_starlette_instrumentation() - # TODO: remove these tests once we bump botocore instrumentation version to 0.56b0 - # Bedrock Runtime tests - self._test_unpatched_converse_stream_wrapper() - self._test_unpatched_extract_tool_calls() # Apply patches apply_instrumentation_patches() @@ -222,6 +218,16 @@ def _test_unpatched_botocore_instrumentation(self): # DynamoDB self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension") + # Bedrock Runtime tests + # TODO: remove these tests once we bump botocore instrumentation version to 0.56b0 + self._test_unpatched_converse_stream_wrapper() + self._test_unpatched_extract_tool_calls() + + # TODO: remove these tests once we bump botocore instrumentation version to 0.60b0 + self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"}) + self._test_unpatched_process_anthropic_claude_chunk(None, None) + self._test_unpatched_process_anthropic_claude_chunk({}, {}) + def _test_unpatched_gevent_instrumentation(self): self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched") self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched") @@ -267,10 +273,14 @@ def _test_patched_botocore_instrumentation(self): # Bedrock Agent Operation self._test_patched_bedrock_agent_instrumentation() - # TODO: remove these tests once we bump botocore instrumentation version to 0.56b0 # Bedrock Runtime + # TODO: remove these tests once we bump botocore instrumentation version to 0.56b0 self._test_patched_converse_stream_wrapper() self._test_patched_extract_tool_calls() + # TODO: remove these tests once we bump botocore instrumentation version to 0.60b0 + self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"}) + self._test_patched_process_anthropic_claude_chunk(None, None) + self._test_patched_process_anthropic_claude_chunk({}, {}) # Bedrock Agent Runtime self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS) @@ -679,6 +689,95 @@ def _test_patched_extract_tool_calls(self): result = bedrock_utils.extract_tool_calls(message_with_string_content, True) self.assertIsNone(result) + # Test with toolUse format to exercise the for loop + message_with_tool_use = {"role": "assistant", "content": [{"toolUse": {"toolUseId": "id1", "name": "func1"}}]} + result = bedrock_utils.extract_tool_calls(message_with_tool_use, True) + self.assertEqual(len(result), 1) + + # Test with tool_use format to exercise the for loop + message_with_type_tool_use = { + "role": "assistant", + "content": [{"type": "tool_use", "id": "id2", "name": "func2"}], + } + result = bedrock_utils.extract_tool_calls(message_with_type_tool_use, True) + self.assertEqual(len(result), 1) + + def _test_patched_process_anthropic_claude_chunk( + self, input_value: Dict[str, str], expected_output: Dict[str, str] + ): + self._test_process_anthropic_claude_chunk(input_value, expected_output, False) + + def _test_unpatched_process_anthropic_claude_chunk( + self, input_value: Dict[str, str], expected_output: Dict[str, str] + ): + self._test_process_anthropic_claude_chunk(input_value, expected_output, True) + + def _test_process_anthropic_claude_chunk( + self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool + ): + """Test that _process_anthropic_claude_chunk handles various tool_use input formats.""" + wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper( + stream=MagicMock(), + stream_done_callback=MagicMock, + stream_error_callback=MagicMock, + model_id="anthropic.claude-3-5-sonnet-20240620-v1:0", + ) + + # Simulate message_start + wrapper._process_anthropic_claude_chunk( + { + "type": "message_start", + "message": { + "role": "assistant", + "content": [], + }, + } + ) + + # Simulate content_block_start with specified input + content_block = { + "type": "tool_use", + "id": "test_id", + "name": "test_tool", + } + if input_value is not None: + content_block["input"] = input_value + + wrapper._process_anthropic_claude_chunk( + { + "type": "content_block_start", + "index": 0, + "content_block": content_block, + } + ) + + # Simulate content_block_stop + try: + wrapper._process_anthropic_claude_chunk({"type": "content_block_stop", "index": 0}) + except TypeError: + if expect_exception: + return + else: + raise + + # Verify the message content + self.assertEqual(len(wrapper._message["content"]), 1) + tool_block = wrapper._message["content"][0] + self.assertEqual(tool_block["type"], "tool_use") + self.assertEqual(tool_block["id"], "test_id") + self.assertEqual(tool_block["name"], "test_tool") + + if expected_output is not None: + self.assertEqual(tool_block["input"], expected_output) + self.assertIsInstance(tool_block["input"], dict) + else: + self.assertNotIn("input", tool_block) + + # Just adding this to do basic sanity checks and increase code coverage + wrapper._process_anthropic_claude_chunk({"type": "content_block_delta", "index": 0}) + wrapper._process_anthropic_claude_chunk({"type": "message_delta"}) + wrapper._process_anthropic_claude_chunk({"type": "message_stop"}) + def _test_patched_bedrock_agent_instrumentation(self): """For bedrock-agent service, both extract_attributes and on_success provides attributes, the attributes depend on the API being invoked."""