Skip to content

Commit

Permalink
Kinesis ResourceARN and ConsumerARN Parsing (#1302)
Browse files Browse the repository at this point in the history
* Add parsing for kinesis ResourceARN and ConsumerARN

* Add updated testing for kinesis

* Fixup: lint errors

---------

Co-authored-by: Hannah Stepanek <[email protected]>
  • Loading branch information
TimPansino and hmstepanek authored Feb 6, 2025
1 parent de08634 commit 9abd5a1
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 41 deletions.
51 changes: 35 additions & 16 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ def extract_sqs(*args, **kwargs):


def extract_kinesis(*args, **kwargs):
    """Return the Kinesis stream name targeted by a client call.

    The stream name can be passed directly as ``StreamName`` or embedded in
    the ``StreamARN``, ``ResourceARN``, or ConsumerARN`` keyword arguments.
    ``StreamName`` takes precedence; otherwise the first non-empty ARN (in the
    order StreamARN, ResourceARN, ConsumerARN) is parsed.

    Returns:
        The stream name, or ``None`` when no stream name can be determined
        (no relevant keyword argument, or an ARN without a ``/`` separator).
    """
    stream_name = kwargs.get("StreamName", None)
    if stream_name is not None:
        return stream_name

    arn = kwargs.get("StreamARN", None) or kwargs.get("ResourceARN", None) or kwargs.get("ConsumerARN", None)
    if not arn:
        # Covers both "no ARN supplied" and "only empty ARN strings supplied".
        return None

    # The stream name is the segment right after the first "/", which also
    # holds for consumer ARNs that append "/consumer/<name>:<timestamp>".
    segments = arn.split("/")
    if len(segments) < 2:
        # Not a recognizable ARN; report no name instead of raising IndexError
        # into the instrumented client call.
        return None
    return segments[1]


def extract_firehose(*args, **kwargs):
Expand All @@ -90,16 +91,18 @@ def extract_sqs_agent_attrs(instance, *args, **kwargs):


def extract_kinesis_agent_attrs(instance, *args, **kwargs):
# Try to capture AWS Kinesis ARN from the StreamARN parameter or by generating the ARN from various discoverable
# info. Log any exception to debug.
# Try to capture AWS Kinesis ARN from the StreamARN, ConsumerARN, or ResourceARN parameters, or by generating the
# ARN from various discoverable info. Log any exception to debug.
agent_attrs = {}
try:
stream_arn = kwargs.get("StreamARN", None)
if stream_arn:
if stream_arn is not None:
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
agent_attrs["cloud.resource_id"] = stream_arn
else:
stream_name = kwargs.get("StreamName", None)
return agent_attrs

stream_name = kwargs.get("StreamName", None)
if stream_name is not None:
transaction = current_transaction()
settings = transaction.settings if transaction.settings else global_settings()
account_id = settings.cloud.aws.account_id if settings and settings.cloud.aws.account_id else None
Expand All @@ -109,6 +112,14 @@ def extract_kinesis_agent_attrs(instance, *args, **kwargs):
if stream_name and account_id and region:
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
agent_attrs["cloud.resource_id"] = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"

resource_arn = kwargs.get("ResourceARN", None) or kwargs.get("ConsumerARN", None)
if resource_arn is not None:
# Extract just the StreamARN out of ConsumerARNs.
agent_attrs["cloud.resource_id"] = "/".join(resource_arn.split("/")[0:2])
agent_attrs["cloud.platform"] = "aws_kinesis_data_streams"
return agent_attrs

except Exception as e:
_logger.debug("Failed to capture AWS Kinesis info.", exc_info=True)
return agent_attrs
Expand Down Expand Up @@ -1178,7 +1189,9 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "delete_resource_policy"): aws_function_trace("delete_resource_policy", library="Kinesis"),
("kinesis", "delete_resource_policy"): aws_function_trace(
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "delete_stream"): aws_function_trace(
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand Down Expand Up @@ -1210,7 +1223,9 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "get_resource_policy"): aws_function_trace("get_resource_policy", library="Kinesis"),
("kinesis", "get_resource_policy"): aws_function_trace(
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_shard_iterator"): aws_function_trace(
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1233,7 +1248,9 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "merge_shards"): aws_function_trace(
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace("put_resource_policy", library="Kinesis"),
("kinesis", "put_resource_policy"): aws_function_trace(
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "register_stream_consumer"): aws_function_trace(
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1249,7 +1266,9 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "stop_stream_encryption"): aws_function_trace(
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace("subscribe_to_shard", library="Kinesis"),
("kinesis", "subscribe_to_shard"): aws_function_trace(
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "update_shard_count"): aws_function_trace(
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand Down
102 changes: 77 additions & 25 deletions tests/external_botocore/test_boto3_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,65 +32,76 @@
MOTO_VERSION = get_package_version_tuple("moto")
BOTOCORE_VERSION = get_package_version_tuple("boto3")

URL = "kinesis.us-east-1.amazonaws.com"
AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY"
AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec
AWS_REGION = "us-east-1"
AWS_ACCOUNT_ID = 123456789012

KINESIS_URL = "kinesis.us-east-1.amazonaws.com"
KINESIS_CONTROL_URL = f"{AWS_ACCOUNT_ID}.control-kinesis.{AWS_REGION}.amazonaws.com"
KINESIS_DATA_URL = f"{AWS_ACCOUNT_ID}.data-kinesis.{AWS_REGION}.amazonaws.com"
TEST_STREAM = f"python-agent-test-{uuid.uuid4()}"
EXPECTED_AGENT_ATTRS = {
"exact_agents": {
"cloud.platform": "aws_kinesis_data_streams",
"cloud.resource_id": f"arn:aws:kinesis:us-east-1:123456789012:stream/{TEST_STREAM}",
"cloud.resource_id": f"arn:aws:kinesis:us-east-1:{AWS_ACCOUNT_ID}:stream/{TEST_STREAM}",
},
}

AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY"
AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec
AWS_REGION = "us-east-1"

_kinesis_scoped_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
("Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/put_resource_policy/{TEST_STREAM}", 2),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
(f"External/{URL}/botocore/POST", 3),
(f"External/{KINESIS_URL}/botocore/POST", 3),
(f"External/{KINESIS_CONTROL_URL}/botocore/POST", 3),
(f"External/{KINESIS_DATA_URL}/botocore/POST", 1),
]
if BOTOCORE_VERSION < (1, 29, 0):
_kinesis_scoped_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
("Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
(f"External/{URL}/botocore/POST", 5),
(f"External/{KINESIS_URL}/botocore/POST", 5),
]

_kinesis_rollup_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"MessageBroker/Kinesis/Stream/Consume/Named/{TEST_STREAM}", 1),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
("Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/put_resource_policy/{TEST_STREAM}", 2),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
("External/all", 5),
("External/allOther", 5),
(f"External/{URL}/all", 3),
(f"External/{URL}/botocore/POST", 3),
("External/all", 7),
("External/allOther", 7),
(f"External/{KINESIS_URL}/all", 3),
(f"External/{KINESIS_URL}/botocore/POST", 3),
(f"External/{KINESIS_CONTROL_URL}/all", 3),
(f"External/{KINESIS_CONTROL_URL}/botocore/POST", 3),
(f"External/{KINESIS_DATA_URL}/all", 1),
(f"External/{KINESIS_DATA_URL}/botocore/POST", 1),
]
if BOTOCORE_VERSION < (1, 29, 0):
_kinesis_rollup_metrics = [
(f"MessageBroker/Kinesis/Stream/Produce/Named/{TEST_STREAM}", 2),
(f"Kinesis/create_stream/{TEST_STREAM}", 1),
(f"Kinesis/list_streams", 1),
("Kinesis/list_streams", 1),
(f"Kinesis/describe_stream/{TEST_STREAM}", 1),
(f"Kinesis/get_shard_iterator/{TEST_STREAM}", 1),
(f"Kinesis/delete_stream/{TEST_STREAM}", 1),
("External/all", 5),
("External/allOther", 5),
(f"External/{URL}/all", 5),
(f"External/{URL}/botocore/POST", 5),
(f"External/{KINESIS_URL}/all", 5),
(f"External/{KINESIS_URL}/botocore/POST", 5),
]

_kinesis_scoped_metrics_error = [
Expand All @@ -114,7 +125,13 @@ def test_instrumented_kinesis_methods():

ignored_methods = set(
("kinesis", method)
for method in ("generate_presigned_url", "close", "get_waiter", "can_paginate", "get_paginator")
for method in (
"generate_presigned_url",
"close",
"get_waiter",
"can_paginate",
"get_paginator",
)
)
client_methods = inspect.getmembers(client, predicate=inspect.ismethod)
methods = {("kinesis", name) for (name, method) in client_methods if not name.startswith("_")}
Expand All @@ -128,7 +145,7 @@ def test_instrumented_kinesis_methods():
@validate_span_events(exact_agents={"aws.operation": "CreateStream"}, count=1)
@validate_span_events(
**EXPECTED_AGENT_ATTRS,
count=6 if BOTOCORE_VERSION < (1, 29, 0) else 7,
count=6 if BOTOCORE_VERSION < (1, 29, 0) else 9,
)
@validate_span_events(exact_agents={"aws.operation": "DeleteStream"}, count=1)
@validate_transaction_metrics(
Expand All @@ -147,7 +164,11 @@ def test_kinesis():
region_name=AWS_REGION,
)
# Create stream
resp = client.create_stream(StreamName=TEST_STREAM, ShardCount=123, StreamModeDetails={"StreamMode": "on-demand"})
resp = client.create_stream(
StreamName=TEST_STREAM,
ShardCount=123,
StreamModeDetails={"StreamMode": "on-demand"},
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# List streams
Expand All @@ -160,28 +181,55 @@ def test_kinesis():
Limit=123,
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
ARN = resp["StreamDescription"]["StreamARN"]
STREAM_ARN = resp["StreamDescription"]["StreamARN"]
CONSUMER_ARN = f"{STREAM_ARN}/consumer/my_consumer:123" # Mock ConsumerARN

# StreamARN is not supported in older versions of botocore.
stream_kwargs = {"StreamName": TEST_STREAM} if BOTOCORE_VERSION < (1, 29, 0) else {"StreamARN": ARN}
stream_kwargs = {"StreamName": TEST_STREAM} if BOTOCORE_VERSION < (1, 29, 0) else {"StreamARN": STREAM_ARN}

# Send message
resp = client.put_record(Data=b"foo1", PartitionKey="bar", **stream_kwargs)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Send messages
resp = client.put_records(
Records=[{"Data": b"foo2", "PartitionKey": "bar"}, {"Data": b"foo3", "PartitionKey": "bar"}], **stream_kwargs
Records=[
{"Data": b"foo2", "PartitionKey": "bar"},
{"Data": b"foo3", "PartitionKey": "bar"},
],
**stream_kwargs,
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Shards
shard_iter = client.get_shard_iterator(
ShardId="shardId-000000000000",
ShardIteratorType="AT_SEQUENCE_NUMBER",
StartingSequenceNumber="0",
**stream_kwargs,
)["ShardIterator"]

# TODO: Unfortunately we can't test client.subscribe_to_shard() yet as moto has not implemented it.
# It's the only method that uses ConsumerARN as a parameter name, so extracting that parameter can't be tested.
# ResourceARN, however, can be tested and can be either a StreamARN or ConsumerARN format. We can therefore
# at least cover the parsing of ConsumerARNs for the underlying stream by exercising that.

if BOTOCORE_VERSION >= (1, 29, 0):
# This was only made available in Botocore 1.29.0, no way to test ResourceARN before that
# Use ResourceARN as StreamARN
resp = client.put_resource_policy(
ResourceARN=STREAM_ARN,
Policy="some policy",
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Use ResourceARN as ConsumerARN
resp = client.put_resource_policy(
ResourceARN=CONSUMER_ARN,
Policy="some policy",
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Receive message
if BOTOCORE_VERSION < (1, 29, 0):
resp = client.get_records(ShardIterator=shard_iter)
Expand Down Expand Up @@ -214,7 +262,11 @@ def test_kinesis_error():
region_name=AWS_REGION,
)
# Create stream
resp = client.create_stream(StreamName=TEST_STREAM, ShardCount=123, StreamModeDetails={"StreamMode": "on-demand"})
resp = client.create_stream(
StreamName=TEST_STREAM,
ShardCount=123,
StreamModeDetails={"StreamMode": "on-demand"},
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

# Stream ARN is needed for rest of methods.
Expand Down

0 comments on commit 9abd5a1

Please sign in to comment.