Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
## Unreleased
- Add botocore instrumentation extension for Bedrock AgentCore services with span attributes
([#490](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/490))
- [PATCH] Only decode JSON input buffer in Anthropic Claude streaming
([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497))
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,89 @@ def patched_extract_tool_calls(
tool_calls.append(tool_call)
return tool_calls

# TODO: The following code is to patch a bedrock bug that was fixed in
# opentelemetry-instrumentation-botocore==0.60b0 in:
# https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875
# Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0
def patched_process_anthropic_claude_chunk(self, chunk):
    """Fold one Anthropic Claude streaming chunk into the wrapper's accumulated response.

    The chunk's ``type`` selects the handling:

    * ``message_start``       - begin recording when the message role is ``assistant``.
    * ``content_block_start`` - remember a ``text`` or ``tool_use`` block as the active block.
    * ``content_block_delta`` - append text to the active block, or buffer partial tool JSON.
    * ``content_block_stop``  - decode the buffered tool JSON (only if any was buffered),
      append the active block to the message, and reset the per-block state.
    * ``message_delta``       - record the stop reason on the response.
    * ``message_stop``        - process invocation metrics, publish the message on the
      response and invoke the stream-done callback.

    Chunks without a ``type`` and chunks of any other type are ignored.

    Args:
        self: the stream wrapper instance whose ``_record_message``, ``_message``,
            ``_content_block``, ``_tool_json_input_buf`` and ``_response`` state is updated.
        chunk: one decoded streaming event (a dict).

    Returns:
        None. All results are stored on ``self``.
    """
    # pylint: disable=too-many-return-statements,too-many-branches
    if not (message_type := chunk.get("type")):
        return

    if message_type == "message_start":
        # {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant',
        # 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None,
        # 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
        message = chunk.get("message", {})
        if message.get("role") == "assistant":
            self._record_message = True
            self._message = {
                "role": message["role"],
                "content": message.get("content", []),
            }
        return

    if message_type == "content_block_start":
        # {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
        # {'type': 'content_block_start', 'index': 1, 'content_block':
        # {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}}
        if self._record_message:
            block = chunk.get("content_block", {})
            if block.get("type") in ("text", "tool_use"):
                self._content_block = block
        return

    if message_type == "content_block_delta":
        # {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
        # {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}}
        if self._record_message:
            delta = chunk.get("delta", {})
            delta_type = delta.get("type")
            if delta_type == "text_delta":
                # Tolerate an active block that carries no "text" key yet instead of raising KeyError.
                self._content_block["text"] = self._content_block.get("text", "") + delta.get("text", "")
            elif delta_type == "input_json_delta":
                self._tool_json_input_buf += delta.get("partial_json", "")
        return

    if message_type == "content_block_stop":
        # {'type': 'content_block_stop', 'index': 0}
        # Only touch the message while one is being recorded; otherwise self._message may
        # still be None and subscripting it would raise.
        if self._record_message:
            # Decode only what was streamed as partial JSON. An "input" value that already
            # arrived on the content block is left untouched.
            if self._tool_json_input_buf:
                try:
                    self._content_block["input"] = json.loads(self._tool_json_input_buf)
                except json.JSONDecodeError:
                    # Keep the raw text when the buffered input is not valid JSON.
                    self._content_block["input"] = self._tool_json_input_buf
            self._message["content"].append(self._content_block)
        self._content_block = {}
        self._tool_json_input_buf = ""
        return

    if message_type == "message_delta":
        # {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None},
        # 'usage': {'output_tokens': 123}}
        if (stop_reason := chunk.get("delta", {}).get("stop_reason")) is not None:
            self._response["stopReason"] = stop_reason
        return

    if message_type == "message_stop":
        # {'type': 'message_stop', 'amazon-bedrock-invocationMetrics':
        # {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
        if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"):
            self._process_invocation_metrics(invocation_metrics)

        if self._record_message:
            self._response["output"] = {"message": self._message}
            self._record_message = False
            self._message = None

        self._stream_done_callback(self._response)
        return

# Install the patched implementations by rebinding the corresponding attributes on the
# bedrock_utils module and its stream wrapper classes.
bedrock_utils.ConverseStreamWrapper.__init__ = patched_init
bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event
# Override of the Anthropic Claude chunk handler; see the TODO above its definition for
# when this override can be removed.
bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = (
patched_process_anthropic_claude_chunk
)
bedrock_utils.extract_tool_calls = patched_extract_tool_calls

# END The OpenTelemetry Authors code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ def _run_patch_behaviour_tests(self):
self._test_unpatched_botocore_propagator()
self._test_unpatched_gevent_instrumentation()
self._test_unpatched_starlette_instrumentation()
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
# Bedrock Runtime tests
self._test_unpatched_converse_stream_wrapper()
self._test_unpatched_extract_tool_calls()

# Apply patches
apply_instrumentation_patches()
Expand Down Expand Up @@ -222,6 +218,16 @@ def _test_unpatched_botocore_instrumentation(self):
# DynamoDB
self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension")

# Bedrock Runtime tests
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
self._test_unpatched_converse_stream_wrapper()
self._test_unpatched_extract_tool_calls()

# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
self._test_unpatched_process_anthropic_claude_chunk(None, None)
self._test_unpatched_process_anthropic_claude_chunk({}, {})

def _test_unpatched_gevent_instrumentation(self):
self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched")
self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched")
Expand Down Expand Up @@ -267,10 +273,14 @@ def _test_patched_botocore_instrumentation(self):
# Bedrock Agent Operation
self._test_patched_bedrock_agent_instrumentation()

# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
# Bedrock Runtime
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
self._test_patched_converse_stream_wrapper()
self._test_patched_extract_tool_calls()
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
self._test_patched_process_anthropic_claude_chunk(None, None)
self._test_patched_process_anthropic_claude_chunk({}, {})

# Bedrock Agent Runtime
self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS)
Expand Down Expand Up @@ -679,6 +689,77 @@ def _test_patched_extract_tool_calls(self):
result = bedrock_utils.extract_tool_calls(message_with_string_content, True)
self.assertIsNone(result)

def _test_patched_process_anthropic_claude_chunk(
    self, input_value: Dict[str, str], expected_output: Dict[str, str]
):
    """Run the shared Claude chunk scenario with ``expect_exception`` set to False.

    ``input_value`` and ``expected_output`` are forwarded unchanged to
    ``_test_process_anthropic_claude_chunk``.
    """
    self._test_process_anthropic_claude_chunk(
        input_value,
        expected_output,
        expect_exception=False,
    )

def _test_unpatched_process_anthropic_claude_chunk(
    self, input_value: Dict[str, str], expected_output: Dict[str, str]
):
    """Run the shared Claude chunk scenario with ``expect_exception`` set to True.

    ``input_value`` and ``expected_output`` are forwarded unchanged to
    ``_test_process_anthropic_claude_chunk``.
    """
    self._test_process_anthropic_claude_chunk(
        input_value,
        expected_output,
        expect_exception=True,
    )

def _test_process_anthropic_claude_chunk(
    self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool
):
    """Test that _process_anthropic_claude_chunk handles various tool_use input formats.

    Feeds a ``message_start``, a ``tool_use`` ``content_block_start`` and a
    ``content_block_stop`` chunk through a fresh stream wrapper, then checks the
    tool block recorded on the wrapper's message.

    Args:
        input_value: value placed under ``"input"`` in the tool_use content block;
            ``None`` means the key is left out of the block entirely.
        expected_output: expected ``"input"`` of the recorded tool block; ``None``
            means the recorded block must not contain an ``"input"`` key.
        expect_exception: when True, a ``TypeError`` raised while processing
            ``content_block_stop`` ends the test successfully; when False it is re-raised.
    """
    wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper(
        stream=MagicMock(),
        # Pass mock instances (not the MagicMock class) so a callback invocation would be
        # recorded as a call rather than constructing a new mock.
        stream_done_callback=MagicMock(),
        stream_error_callback=MagicMock(),
        model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
    )

    # Simulate message_start
    wrapper._process_anthropic_claude_chunk(
        {
            "type": "message_start",
            "message": {
                "role": "assistant",
                "content": [],
            },
        }
    )

    # Simulate content_block_start with specified input
    content_block = {
        "type": "tool_use",
        "id": "test_id",
        "name": "test_tool",
    }
    if input_value is not None:
        content_block["input"] = input_value

    wrapper._process_anthropic_claude_chunk(
        {
            "type": "content_block_start",
            "index": 0,
            "content_block": content_block,
        }
    )

    # Simulate content_block_stop
    try:
        wrapper._process_anthropic_claude_chunk({"type": "content_block_stop", "index": 0})
    except TypeError:
        if not expect_exception:
            raise
        # The failure was anticipated by the caller, so there is nothing further to verify.
        return

    # Verify the message content
    self.assertEqual(len(wrapper._message["content"]), 1)
    tool_block = wrapper._message["content"][0]
    self.assertEqual(tool_block["type"], "tool_use")
    self.assertEqual(tool_block["id"], "test_id")
    self.assertEqual(tool_block["name"], "test_tool")

    if expected_output is not None:
        self.assertEqual(tool_block["input"], expected_output)
        self.assertIsInstance(tool_block["input"], dict)
    else:
        self.assertNotIn("input", tool_block)

def _test_patched_bedrock_agent_instrumentation(self):
"""For bedrock-agent service, both extract_attributes and on_success provides attributes,
the attributes depend on the API being invoked."""
Expand Down
Loading