Skip to content

Commit

Permalink
AWS Kinesis Delivery Streams (Firehose) Entity Mapping (#1300)
Browse files Browse the repository at this point in the history
* Fix tox naming convention for botocore

* Firehose instrumentation

* Firehose testing

* Fix broken s3transfer test

* Correct naming of firehose ARN

* Clean up boto3 tests

* Correct naming of list_delivery_streams

* Convert firehose message traces to function traces

* Correct behaviors of kinesis operations without stream targets

* Control flow tweaks
  • Loading branch information
TimPansino authored Feb 5, 2025
1 parent 819b6d7 commit de08634
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 40 deletions.
116 changes: 89 additions & 27 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,18 @@ def extract_sqs(*args, **kwargs):

def extract_kinesis(*args, **kwargs):
# The stream name can be passed as the StreamName or as part of the StreamARN.
stream_value = kwargs.get("StreamName", "Unknown")
if stream_value == "Unknown":
stream_value = kwargs.get("StreamName", None)
if stream_value is None:
arn = kwargs.get("StreamARN", None)
if arn:
if arn is not None:
stream_value = arn.split("/", 1)[-1]
return stream_value


def extract_firehose(*args, **kwargs):
return kwargs.get("DeliveryStreamName", None)


def extract_sqs_agent_attrs(instance, *args, **kwargs):
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
agent_attrs = {}
Expand Down Expand Up @@ -110,6 +114,29 @@ def extract_kinesis_agent_attrs(instance, *args, **kwargs):
return agent_attrs


def extract_firehose_agent_attrs(instance, *args, **kwargs):
# Try to generate AWS Kinesis Delivery Stream (Firehose) ARN from the DeliveryStreamName parameter and from various
# discoverable info. Log any exception to debug.
agent_attrs = {}
try:
stream_name = kwargs.get("DeliveryStreamName", None)
if stream_name:
transaction = current_transaction()
settings = transaction.settings if transaction.settings else global_settings()
account_id = settings.cloud.aws.account_id if settings and settings.cloud.aws.account_id else None
region = None
if hasattr(instance, "_client_config") and hasattr(instance._client_config, "region_name"):
region = instance._client_config.region_name
if account_id and region:
agent_attrs["cloud.platform"] = "aws_kinesis_delivery_streams"
agent_attrs["cloud.resource_id"] = (
f"arn:aws:firehose:{region}:{account_id}:deliverystream/{stream_name}"
)
except Exception as e:
_logger.debug("Failed to capture AWS Kinesis Delivery Stream (Firehose) info.", exc_info=True)
return agent_attrs


def extract(argument_names, default=None):
def extractor_list(*args, **kwargs):
for argument_name in argument_names:
Expand Down Expand Up @@ -991,7 +1018,7 @@ def _nr_dynamodb_datastore_trace_wrapper_(wrapped, instance, args, kwargs):

def aws_function_trace(
operation,
destination_name,
destination_name=None,
params={},
terminal=False,
async_wrapper=None,
Expand All @@ -1008,19 +1035,20 @@ def _nr_aws_function_trace_wrapper_(wrapped, instance, args, kwargs):
else:
parent = None

_destination_name = destination_name(*args, **kwargs)
_destination_name = destination_name(*args, **kwargs) if destination_name is not None else None
name = f"{operation}/{_destination_name}" if _destination_name else operation

trace = FunctionTrace(
name=_destination_name,
group=f"{library}/{operation}",
name=name,
group=library,
params=params,
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs) if extract_agent_attrs is not None else {}
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
Expand Down Expand Up @@ -1055,7 +1083,7 @@ def _nr_aws_message_trace_wrapper_(wrapped, instance, args, kwargs):
_library = library
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)
_destination_name = destination_name(*args, **kwargs) or "Unknown"

trace = MessageTrace(
_library,
Expand Down Expand Up @@ -1150,9 +1178,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "delete_resource_policy"): aws_function_trace(
"delete_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "delete_resource_policy"): aws_function_trace("delete_resource_policy", library="Kinesis"),
("kinesis", "delete_stream"): aws_function_trace(
"delete_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1162,9 +1188,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "describe_limits"): aws_function_trace(
"describe_limits", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "describe_limits"): aws_function_trace("describe_limits", library="Kinesis"),
("kinesis", "describe_stream"): aws_function_trace(
"describe_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1186,9 +1210,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
extract_agent_attrs=extract_kinesis_agent_attrs,
library="Kinesis",
),
("kinesis", "get_resource_policy"): aws_function_trace(
"get_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "get_resource_policy"): aws_function_trace("get_resource_policy", library="Kinesis"),
("kinesis", "get_shard_iterator"): aws_function_trace(
"get_shard_iterator", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1204,18 +1226,14 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "list_stream_consumers"): aws_function_trace(
"list_stream_consumers", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace(
"list_streams", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "list_streams"): aws_function_trace("list_streams", library="Kinesis"),
("kinesis", "list_tags_for_stream"): aws_function_trace(
"list_tags_for_stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "merge_shards"): aws_function_trace(
"merge_shards", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace(
"put_resource_policy", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "put_resource_policy"): aws_function_trace("put_resource_policy", library="Kinesis"),
("kinesis", "register_stream_consumer"): aws_function_trace(
"register_stream_consumer", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1231,9 +1249,7 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "stop_stream_encryption"): aws_function_trace(
"stop_stream_encryption", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace(
"subscribe_to_shard", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("kinesis", "subscribe_to_shard"): aws_function_trace("subscribe_to_shard", library="Kinesis"),
("kinesis", "update_shard_count"): aws_function_trace(
"update_shard_count", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
Expand All @@ -1249,6 +1265,52 @@ def wrap_serialize_to_request(wrapped, instance, args, kwargs):
("kinesis", "get_records"): aws_message_trace(
"Consume", "Stream", extract_kinesis, extract_agent_attrs=extract_kinesis_agent_attrs, library="Kinesis"
),
("firehose", "create_delivery_stream"): aws_function_trace(
"create_delivery_stream", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "delete_delivery_stream"): aws_function_trace(
"delete_delivery_stream", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "describe_delivery_stream"): aws_function_trace(
"describe_delivery_stream",
extract_firehose,
extract_agent_attrs=extract_firehose_agent_attrs,
library="Firehose",
),
("firehose", "list_delivery_streams"): aws_function_trace("list_delivery_streams", library="Firehose"),
("firehose", "list_tags_for_delivery_stream"): aws_function_trace(
"list_tags_for_delivery_stream",
extract_firehose,
extract_agent_attrs=extract_firehose_agent_attrs,
library="Firehose",
),
("firehose", "put_record"): aws_function_trace(
"put_record", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "put_record_batch"): aws_function_trace(
"put_record_batch", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "start_delivery_stream_encryption"): aws_function_trace(
"start_delivery_stream_encryption",
extract_firehose,
extract_agent_attrs=extract_firehose_agent_attrs,
library="Firehose",
),
("firehose", "stop_delivery_stream_encryption"): aws_function_trace(
"stop_delivery_stream_encryption",
extract_firehose,
extract_agent_attrs=extract_firehose_agent_attrs,
library="Firehose",
),
("firehose", "tag_delivery_stream"): aws_function_trace(
"tag_delivery_stream", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "untag_delivery_stream"): aws_function_trace(
"untag_delivery_stream", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("firehose", "update_destination"): aws_function_trace(
"update_destination", extract_firehose, extract_agent_attrs=extract_firehose_agent_attrs, library="Firehose"
),
("sqs", "send_message"): aws_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_sqs_agent_attrs, library="SQS"
),
Expand Down
Loading

0 comments on commit de08634

Please sign in to comment.