From be306045b502c013848f83fa5b329f566b8635a4 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 16 Sep 2025 13:19:33 -0400 Subject: [PATCH 01/13] perf(telemetry): batch telemetry events into one request --- ddtrace/internal/telemetry/writer.py | 247 ++++++++++++++++----------- tests/conftest.py | 39 +++-- tests/telemetry/test_telemetry.py | 2 +- tests/telemetry/test_writer.py | 191 +++++---------------- 4 files changed, 223 insertions(+), 256 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index e36fcfa3320..2e87ff55058 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -31,10 +31,8 @@ from ..utils.version import version as tracer_version from . import modules from .constants import TELEMETRY_APM_PRODUCT -from .constants import TELEMETRY_LOG_LEVEL # noqa:F401 +from .constants import TELEMETRY_LOG_LEVEL from .constants import TELEMETRY_NAMESPACE -from .constants import TELEMETRY_TYPE_DISTRIBUTION -from .constants import TELEMETRY_TYPE_GENERATE_METRICS from .constants import TELEMETRY_TYPE_LOGS from .data import get_application from .data import get_host_info @@ -207,9 +205,8 @@ def __init__(self, is_periodic=True, agentless=None): # This will occur when the agent writer starts. 
self.enable() # Force app started for unit tests - if config.FORCE_START: - self._app_started() - # Send logged error to telemetry + if config.FORCE_START and (app_started := self._app_started()): + self._events_queue.append(app_started) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) def enable(self): @@ -264,18 +261,17 @@ def add_event(self, payload, payload_type): Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change """ if self.enable(): - event = { - "tracer_time": int(time.time()), - "runtime_id": get_runtime_id(), - "api_version": "v2", - "seq_id": next(self._sequence_payloads), - "debug": self._debug, - "application": get_application(config.SERVICE, config.VERSION, config.ENV), - "host": get_host_info(), - "payload": payload, - "request_type": payload_type, - } - self._events_queue.append(event) + with self._service_lock: + self._events_queue.append({"payload": payload, "request_type": payload_type}) + + def add_events(self, events): + # type: (List[Dict[str, Any]]) -> None + """ + Adds a list of Telemetry events to the TelemetryWriter event buffer + """ + if self.enable(): + with self._service_lock: + self._events_queue.extend(events) def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""): # type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None @@ -285,6 +281,9 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms :param str integration_name: name of patched module :param bool auto_enabled: True if module is enabled in _monkey.PATCH_MODULES """ + if not self.enable(): + return + # Integrations can be patched before the telemetry writer is enabled. 
with self._service_lock: if integration_name not in self._integrations_queue: @@ -310,11 +309,11 @@ def add_error(self, code, msg, filename, line_number): self._error = (code, msg) def _app_started(self, register_app_shutdown=True): - # type: (bool) -> None + # type: (bool) -> Optional[Dict[str, Any]] """Sent when TelemetryWriter is enabled or forks""" if self._forked or self.started: # app-started events should only be sent by the main process - return + return None # List of configurations to be collected self.started = True @@ -345,10 +344,10 @@ def _app_started(self, register_app_shutdown=True): # Reset the error after it has been reported. self._error = (0, "") - self.add_event(payload, "app-started") + return {"payload": payload, "request_type": "app-started"} def _app_heartbeat_event(self): - # type: () -> None + # type: () -> Dict[str, Any] if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: self._extended_time += self._extended_heartbeat_interval self._app_dependencies_loaded_event() @@ -357,26 +356,29 @@ def _app_heartbeat_event(self): {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] } - self.add_event(payload, "app-extended-heartbeat") + request_type = "app-extended-heartbeat" else: - self.add_event({}, "app-heartbeat") + payload = {} + request_type = "app-heartbeat" + return {"payload": payload, "request_type": request_type} def _app_closing_event(self): - # type: () -> None + # type: () -> Optional[Dict[str, Any]] """Adds a Telemetry event which notifies the agent that an application instance has terminated""" if self._forked: # app-closing event should only be sent by the main process - return - payload = {} # type: Dict - self.add_event(payload, "app-closing") + return None + return {"payload": {}, "request_type": "app-closing"} def _app_integrations_changed_event(self, integrations): - # type: (List[Dict]) -> None + # type: (List[Dict]) -> Dict 
"""Adds a Telemetry event which sends a list of configured integrations to the agent""" - payload = { - "integrations": integrations, + return { + "payload": { + "integrations": integrations, + }, + "request_type": "app-integrations-change", } - self.add_event(payload, "app-integrations-change") def _flush_integrations_queue(self): # type: () -> List[Dict] @@ -395,32 +397,32 @@ def _flush_configuration_queue(self): return configurations def _app_client_configuration_changed_event(self, configurations): - # type: (List[Dict]) -> None + # type: (List[Dict]) -> Dict[str, Any] """Adds a Telemetry event which sends list of modified configurations to the agent""" - payload = { - "configuration": configurations, + return { + "payload": { + "configuration": configurations, + }, + "request_type": "app-client-configuration-change", } - self.add_event(payload, "app-client-configuration-change") def _app_dependencies_loaded_event(self): + # type: () -> Optional[Dict[str, Any]] """Adds events to report imports done since the last periodic run""" - if not config.DEPENDENCY_COLLECTION or not self._enabled: - return + return None with self._service_lock: newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported) if not newly_imported_deps: - return + return None with self._service_lock: - packages = update_imported_dependencies(self._imported_dependencies, newly_imported_deps) - - if packages: - payload = {"dependencies": packages} - self.add_event(payload, "app-dependencies-loaded") + if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps): + return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"} + return None - def _add_endpoints_event(self): + def _flush_app_endpoints(self): """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent""" import ddtrace.settings.asm as asm_config_module @@ -432,24 +434,25 @@ def _add_endpoints_event(self): with 
self._service_lock: payload = endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) - - self.add_event(payload, "app-endpoints") + return {"payload": payload, "request_type": "app-endpoints"} def _app_product_change(self): - # type: () -> None + # type: () -> Optional[Dict[str, Any]] """Adds a Telemetry event which reports the enablement of an APM product""" if not self._send_product_change_updates: - return + return None - payload = { - "products": { - product: {"version": tracer_version, "enabled": status} - for product, status in self._product_enablement.items() - } - } - self.add_event(payload, "app-product-change") self._send_product_change_updates = False + return { + "payload": { + "products": { + product: {"version": tracer_version, "enabled": status} + for product, status in self._product_enablement.items() + } + }, + "request_type": "app-product-change", + } def product_activated(self, product, enabled): # type: (str, bool) -> None @@ -502,7 +505,7 @@ def add_configurations(self, configuration_list: List[Tuple[str, str, str]]): def add_log(self, level, message, stack_trace="", tags=None): """ - Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake. + Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake. This will make support cycles easier and ensure we know about potentially silent issues in libraries. 
""" if tags is None: @@ -627,24 +630,25 @@ def _flush_log_metrics(self): self._logs = set() return log_metrics - def _generate_metrics_event(self, namespace_metrics) -> None: + def _generate_metrics_event(self, namespace_metrics): + # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> Optional[Dict[str, Any]] for payload_type, namespaces in namespace_metrics.items(): for namespace, metrics in namespaces.items(): if metrics: - payload = { - "namespace": namespace, - "series": metrics, - } log.debug("%s request payload, namespace %s", payload_type, namespace) - if payload_type == TELEMETRY_TYPE_DISTRIBUTION: - self.add_event(payload, TELEMETRY_TYPE_DISTRIBUTION) - elif payload_type == TELEMETRY_TYPE_GENERATE_METRICS: - self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS) + return { + "payload": { + "namespace": namespace, + "series": metrics, + }, + "request_type": payload_type, + } + return None def _generate_logs_event(self, logs): - # type: (Set[Dict[str, str]]) -> None + # type: (Set[Dict[str, str]]) -> Dict[str, Any] log.debug("%s request payload", TELEMETRY_TYPE_LOGS) - self.add_event({"logs": list(logs)}, TELEMETRY_TYPE_LOGS) + return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS} def _dispatch(self): # moved core here to avoid circular import @@ -653,47 +657,96 @@ def _dispatch(self): core.dispatch("telemetry.periodic") def periodic(self, force_flush=False, shutting_down=False): - # ensure app_started is called at least once in case traces weren't flushed - self._app_started() - self._app_product_change() + """Process and send telemetry events in batches. + + This method handles the periodic collection and sending of telemetry data with two main timing intervals: + 1. Metrics collection interval (10 seconds by default): Collects metrics and logs + 2. Heartbeat interval (60 seconds by default): Sends all collected data to the telemetry endpoint + + The method follows this flow: + 1. 
Collects metrics and logs that have accumulated since last collection + 2. If not at heartbeat interval and not force_flush: + - Queues the metrics and logs for future sending + - Returns early + 3. At heartbeat interval or force_flush: + - Collects app status (started, product changes) + - Collects integration changes + - Collects configuration changes + - Collects dependency changes + - Collects stored events (ex: metrics and logs) + - Sends everything as a single batch + + Args: + force_flush: If True, bypasses the heartbeat interval check and sends immediately + shutting_down: If True, includes app-closing event in the batch + + Note: + - Metrics are collected every 10 seconds to ensure accurate time-based data + - All data is sent in a single batch every 60 seconds to minimize network overhead + - A heartbeat event is always included to keep RC connections alive + """ self._dispatch() + # Collect metrics and logs that have accumulated since last batch + events = [] + if namespace_metrics := self._namespace.flush(float(self.interval)): + if metrics_event := self._generate_metrics_event(namespace_metrics): + events.append(metrics_event) - namespace_metrics = self._namespace.flush(float(self.interval)) - if namespace_metrics: - self._generate_metrics_event(namespace_metrics) - - logs_metrics = self._flush_log_metrics() - if logs_metrics: - self._generate_logs_event(logs_metrics) + if logs_metrics := self._flush_log_metrics(): + events.append(self._generate_logs_event(logs_metrics)) - # Telemetry metrics and logs should be aggregated into payloads every time periodic is called. - # This ensures metrics and logs are submitted in 10 second time buckets. 
+ # Queue metrics if not at heartbeat interval if self._is_periodic and force_flush is False: if self._periodic_count < self._periodic_threshold: self._periodic_count += 1 + if events: + self.add_events(events) return self._periodic_count = 0 - integrations = self._flush_integrations_queue() - if integrations: - self._app_integrations_changed_event(integrations) + # At heartbeat interval, collect and send all telemetry data + if app_started := self._app_started(): + # app-started should be the first event in the batch + events = [app_started] + events - configurations = self._flush_configuration_queue() - if configurations: - self._app_client_configuration_changed_event(configurations) + if app_product_change := self._app_product_change(): + events.append(app_product_change) - self._app_dependencies_loaded_event() - self._add_endpoints_event() + if integrations := self._flush_integrations_queue(): + events.append(self._app_integrations_changed_event(integrations)) - if shutting_down: - self._app_closing_event() + if endpoints_payload := self._flush_app_endpoints(): + events.append(endpoints_payload) - # Send a heartbeat event to the agent, this is required to keep RC connections alive - self._app_heartbeat_event() + if configurations := self._flush_configuration_queue(): + events.append(self._app_client_configuration_changed_event(configurations)) - telemetry_events = self._flush_events_queue() - for telemetry_event in telemetry_events: - self._client.send_event(telemetry_event) + if app_dependencies_loaded := self._app_dependencies_loaded_event(): + events.append(app_dependencies_loaded) + + if shutting_down and (app_closing := self._app_closing_event()): + events.append(app_closing) + + # Always include a heartbeat to keep RC connections alive + events.append(self._app_heartbeat_event()) + + # Get any queued events and combine with current batch + if queued_events := self._flush_events_queue(): + events.extend(queued_events) + + # Prepare and send the final batch 
+ batch_event = { + "tracer_time": int(time.time()), + "runtime_id": get_runtime_id(), + "api_version": "v2", + "seq_id": next(self._sequence_payloads), + "debug": self._debug, + "application": get_application(config.SERVICE, config.VERSION, config.ENV), + "host": get_host_info(), + "payload": events, + "request_type": "message-batch", + } + self._client.send_event(batch_event) def app_shutdown(self): if self.started: @@ -776,8 +829,8 @@ def _telemetry_excepthook(self, tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, lineno, str(value)) self.add_integration(integration_name, True, error_msg=error_msg) - if self._enabled and not self.started: - self._app_started(False) + if app_started := self._app_started(False): + self._events_queue.append(app_started) self.app_shutdown() diff --git a/tests/conftest.py b/tests/conftest.py index 48ef8521ddb..1dfa2ae5d3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import ast import base64 import contextlib +import copy import functools import http.client as httplib import importlib @@ -32,7 +33,6 @@ from ddtrace.internal.core import crashtracking from ddtrace.internal.remoteconfig.client import RemoteConfigClient from ddtrace.internal.remoteconfig.worker import remoteconfig_poller -from ddtrace.internal.runtime import get_runtime_id from ddtrace.internal.service import ServiceStatus from ddtrace.internal.service import ServiceStatusError from ddtrace.internal.telemetry import TelemetryWriter @@ -580,7 +580,7 @@ def clear(self): pytest.fail("Failed to clear session: %s" % self.token) return True - def get_requests(self, request_type=None, filter_heartbeats=True): + def get_requests(self, filter_heartbeats=True): """Get a list of the requests sent to the test agent Results are in reverse order by ``seq_id`` @@ -595,11 +595,10 @@ def get_requests(self, request_type=None, filter_heartbeats=True): # /test/session/requests captures non telemetry payloads, ignore these requests continue 
req["body"] = json.loads(base64.b64decode(req["body"])) - # filter heartbeat requests to reduce noise + if req["body"]["request_type"] == "app-heartbeat" and filter_heartbeats: continue - if request_type is None or req["body"]["request_type"] == request_type: - requests.append(req) + requests.append(req) return sorted(requests, key=lambda r: r["body"]["seq_id"], reverse=True) @@ -608,12 +607,30 @@ def get_events(self, event_type=None, filter_heartbeats=True, subprocess=False): Results are in reverse order by ``seq_id`` """ - requests = self.get_requests(event_type, filter_heartbeats) - if subprocess: - # Use get_runtime_id to filter telemetry events generated in the current process - runtime_id = get_runtime_id() - requests = [req for req in requests if req["body"]["runtime_id"] != runtime_id] - return [req["body"] for req in requests] + requests = self.get_requests() + events = [] + for req in requests: + for req_body in self._get_request_bodies(req): + if filter_heartbeats and req_body["request_type"] == "app-heartbeat": + # filter heartbeat events to reduce noise + continue + if event_type is None or req_body["request_type"] == event_type: + events.append(req_body) + return events + + def _get_request_bodies(self, req): + if req["body"]["request_type"] == "message-batch": + payloads = req["body"]["payload"] + else: + payloads = [{"payload": req["body"]["payload"], "request_type": req["body"]["request_type"]}] + + requests = [] + for payload in payloads: + req_body = copy.deepcopy(req["body"]) + req_body["request_type"] = payload["request_type"] + req_body["payload"] = payload["payload"] + requests.append(req_body) + return requests def get_metrics(self, name=None): metrics = [] diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 623a23c47d1..28847d6ebde 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -161,7 +161,7 @@ def process_trace(self, trace): # force app_started event (instead 
of waiting for 10 seconds) from ddtrace.internal.telemetry import telemetry_writer -telemetry_writer._app_started() +telemetry_writer.periodic(force_flush=True) """ _, stderr, status, _ = run_python_code_in_subprocess(code) assert status == 0, stderr diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 74458d3edbe..71d36b349cc 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -4,6 +4,7 @@ import time from typing import Any # noqa:F401 from typing import Dict # noqa:F401 +from typing import Optional # noqa:F401 import httpretty import mock @@ -18,7 +19,6 @@ from ddtrace.internal.telemetry.writer import TelemetryWriter from ddtrace.internal.telemetry.writer import get_runtime_id from ddtrace.internal.utils.version import _pep440_to_semver -from ddtrace.settings._config import DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT from ddtrace.settings._telemetry import config as telemetry_config from tests.conftest import DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME from tests.utils import call_program @@ -34,15 +34,18 @@ def test_add_event(telemetry_writer, test_agent_session, mock_time): # send request to the agent telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests(payload_type) + requests = test_agent_session.get_requests() assert len(requests) == 1 assert requests[0]["headers"]["Content-Type"] == "application/json" assert requests[0]["headers"]["DD-Client-Library-Language"] == "python" assert requests[0]["headers"]["DD-Client-Library-Version"] == _pep440_to_semver() - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == payload_type + assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" assert requests[0]["headers"]["DD-Telemetry-API-Version"] == "v2" assert requests[0]["headers"]["DD-Telemetry-Debug-Enabled"] == "False" - assert requests[0]["body"] == _get_request_body(payload, payload_type) + + events = 
test_agent_session.get_events(payload_type) + assert len(events) == 1 + validate_request_body(events[0], payload, payload_type) def test_add_event_disabled_writer(telemetry_writer, test_agent_session): @@ -54,7 +57,7 @@ def test_add_event_disabled_writer(telemetry_writer, test_agent_session): # ensure no request were sent telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_requests(payload_type)) == 1 + assert len(test_agent_session.get_events(payload_type)) == 1 @pytest.mark.parametrize( @@ -88,129 +91,22 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): # queue an app started event - telemetry_writer._app_started() + event = telemetry_writer._app_started() + assert event is not None, "app_started() did not return an event" + telemetry_writer.add_event(event["payload"], "app-started") # force a flush telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests("app-started") + requests = test_agent_session.get_requests() assert len(requests) == 1 - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "app-started" - - payload = { - "configuration": sorted( - [ - {"name": "DD_AGENT_HOST", "origin": "unknown", "value": None}, - {"name": "DD_AGENT_PORT", "origin": "unknown", "value": None}, - {"name": "DD_DOGSTATSD_PORT", "origin": "unknown", "value": None}, - {"name": "DD_DOGSTATSD_URL", "origin": "unknown", "value": None}, - {"name": "DD_DYNAMIC_INSTRUMENTATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_EXCEPTION_REPLAY_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_FASTAPI_ASYNC_BODY_TIMEOUT_SECONDS", "origin": "default", "value": 0.1}, - {"name": "DD_INSTRUMENTATION_TELEMETRY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_STACK_ENABLED", 
"origin": "unknown", "value": True}, - {"name": "DD_PROFILING_MEMORY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_HEAP_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_LOCK_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_CAPTURE_PCT", "origin": "unknown", "value": 1.0}, - {"name": "DD_PROFILING_UPLOAD_INTERVAL", "origin": "unknown", "value": 60.0}, - {"name": "DD_PROFILING_MAX_FRAMES", "origin": "unknown", "value": 64}, - {"name": "DD_REMOTE_CONFIGURATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "origin": "unknown", "value": 5.0}, - {"name": "DD_RUNTIME_METRICS_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - {"name": "DD_SPAN_SAMPLING_RULES", "origin": "unknown", "value": None}, - {"name": "DD_SPAN_SAMPLING_RULES_FILE", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_TRACE_AGENT_HOSTNAME", "origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_PORT", "origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_TIMEOUT_SECONDS", "origin": "unknown", "value": 2.0}, - {"name": "DD_TRACE_API_VERSION", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_CLIENT_IP_ENABLED", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_COMPUTE_STATS", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_DEBUG", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_HEALTH_METRICS_ENABLED", "origin": "unknown", "value": False}, - { - "name": "DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP", - "origin": "unknown", - "value": DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT, - }, - {"name": "DD_TRACE_OTEL_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PARTIAL_FLUSH_ENABLED", "origin": "unknown", "value": True}, - {"name": 
"DD_TRACE_PARTIAL_FLUSH_MIN_SPANS", "origin": "unknown", "value": 300}, - { - "name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", - "origin": "default", - "value": False, - }, - { - "name": "DD_TRACE_PEER_SERVICE_MAPPING", - "origin": "env_var", - "value": "default_service:remapped_service", - }, - {"name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PEER_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - { - "name": "DD_TRACE_PROPAGATION_STYLE_EXTRACT", - "origin": "unknown", - "value": "datadog,tracecontext", - }, - {"name": "DD_TRACE_PROPAGATION_STYLE_INJECT", "origin": "unknown", "value": "datadog,tracecontext"}, - {"name": "DD_TRACE_RATE_LIMIT", "origin": "unknown", "value": 100}, - {"name": "DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_SPAN_ATTRIBUTE_SCHEMA", "origin": "unknown", "value": "v0"}, - {"name": "DD_TRACE_STARTUP_LOGS", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_WRITER_BUFFER_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": "DD_TRACE_WRITER_INTERVAL_SECONDS", "origin": "unknown", "value": 1.0}, - {"name": "DD_TRACE_WRITER_MAX_PAYLOAD_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": "DD_TRACE_WRITER_REUSE_CONNECTIONS", "origin": "unknown", "value": False}, - {"name": "instrumentation_source", "origin": "code", "value": "manual"}, - {"name": "profiling_enabled", "origin": "default", "value": "false"}, - {"name": "data_streams_enabled", "origin": "default", "value": "false"}, - {"name": "appsec_enabled", "origin": "default", "value": "false"}, - {"name": "crashtracking_create_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_use_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_available", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_debug_url", "origin": "unknown", "value": None}, - {"name": 
"crashtracking_enabled", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_stacktrace_resolver", "origin": "unknown", "value": "full"}, - {"name": "crashtracking_started", "origin": "unknown", "value": False}, - {"name": "crashtracking_stderr_filename", "origin": "unknown", "value": None}, - {"name": "crashtracking_stdout_filename", "origin": "unknown", "value": None}, - { - "name": "python_build_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("BUILD_GNU_TYPE"), - }, - { - "name": "python_host_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("HOST_GNU_TYPE"), - }, - {"name": "python_soabi", "origin": "unknown", "value": sysconfig.get_config_var("SOABI")}, - {"name": "trace_sample_rate", "origin": "default", "value": "1.0"}, - {"name": "trace_sampling_rules", "origin": "default", "value": ""}, - {"name": "trace_header_tags", "origin": "default", "value": ""}, - {"name": "logs_injection_enabled", "origin": "default", "value": True}, - {"name": "trace_tags", "origin": "default", "value": ""}, - {"name": "trace_enabled", "origin": "default", "value": "true"}, - {"name": "instrumentation_config_id", "origin": "default", "value": ""}, - {"name": "DD_INJECT_FORCE", "origin": "unknown", "value": True}, - {"name": "DD_LIB_INJECTED", "origin": "unknown", "value": False}, - {"name": "DD_LIB_INJECTION_ATTEMPTED", "origin": "unknown", "value": False}, - ], - key=lambda x: x["name"], - ), - "error": { - "code": 0, - "message": "", - }, - } - requests[0]["body"]["payload"]["configuration"].sort(key=lambda c: c["name"]) - result = _get_request_body(payload, "app-started") - result["payload"]["configuration"] = [ - a for a in result["payload"]["configuration"] if a["name"] != "DD_TRACE_AGENT_URL" - ] - assert payload == result["payload"] + assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" + app_started_events = test_agent_session.get_events("app-started") + assert 
len(app_started_events) == 1 + validate_request_body(app_started_events[0], None, "app-started") + assert len(app_started_events[0]["payload"]) == 3 + assert app_started_events[0]["payload"].get("configuration") + assert app_started_events[0]["payload"].get("products") + assert app_started_events[0]["payload"].get("error") == {"code": 0, "message": ""} def test_app_started_event_configuration_override(test_agent_session, run_python_code_in_subprocess, tmpdir): @@ -740,11 +636,12 @@ def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): # send app closed event telemetry_writer.app_shutdown() - requests = test_agent_session.get_requests("app-closing") - assert len(requests) == 1 + num_requests = len(test_agent_session.get_requests()) + assert num_requests == 1 # ensure a valid request body was sent - totel_events = len(test_agent_session.get_events()) - assert requests[0]["body"] == _get_request_body({}, "app-closing", totel_events) + events = test_agent_session.get_events("app-closing") + assert len(events) == 1 + validate_request_body(events[0], {}, "app-closing", num_requests) def test_add_integration(telemetry_writer, test_agent_session, mock_time): @@ -756,12 +653,11 @@ def test_add_integration(telemetry_writer, test_agent_session, mock_time): # send integrations to the agent telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests("app-integrations-change") + events = test_agent_session.get_events("app-integrations-change") # assert integration change telemetry request was sent - assert len(requests) == 1 - + assert len(events) == 1 # assert that the request had a valid request body - requests[0]["body"]["payload"]["integrations"].sort(key=lambda x: x["name"]) + events[0]["payload"]["integrations"].sort(key=lambda x: x["name"]) expected_payload = { "integrations": [ { @@ -782,7 +678,7 @@ def test_add_integration(telemetry_writer, test_agent_session, mock_time): }, ] } - assert requests[0]["body"] == 
_get_request_body(expected_payload, "app-integrations-change", seq_id=2) + validate_request_body(events[0], expected_payload, "app-integrations-change") def test_app_client_configuration_changed_event(telemetry_writer, test_agent_session, mock_time): @@ -822,7 +718,7 @@ def test_add_integration_disabled_writer(telemetry_writer, test_agent_session): telemetry_writer.add_integration("integration-name", True, False, "") telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_requests("app-integrations-change")) == 0 + assert len(test_agent_session.get_events("app-integrations-change")) == 0 @pytest.mark.parametrize("mock_status", [300, 400, 401, 403, 500]) @@ -920,20 +816,21 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio } -def _get_request_body(payload, payload_type, seq_id=1): - # type: (Dict, str, int) -> Dict +def validate_request_body(received_body, payload, payload_type, seq_id=None): + # type: (Dict, Dict, str, Optional[int]) -> Dict """used to test the body of requests received by the testagent""" - return { - "tracer_time": time.time(), - "runtime_id": get_runtime_id(), - "api_version": "v2", - "debug": False, - "seq_id": seq_id, - "application": get_application(config.service, config.version, config.env), - "host": get_host_info(), - "payload": payload, - "request_type": payload_type, - } + assert len(received_body) == 9 + assert received_body["tracer_time"] == time.time() + assert received_body["runtime_id"] == get_runtime_id() + assert received_body["api_version"] == "v2" + assert received_body["debug"] is False + if seq_id is not None: + assert received_body["seq_id"] == seq_id + assert received_body["application"] == get_application(config.service, config.version, config.env) + assert received_body["host"] == get_host_info() + if payload is not None: + assert received_body["payload"] == payload + assert received_body["request_type"] == payload_type def test_telemetry_writer_agent_setup(): From 
7b36e7bb43f955e4a58d5021b9b44ca372f9f899 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 16 Sep 2025 14:20:24 -0400 Subject: [PATCH 02/13] update smoke test --- tests/appsec/architectures/mini.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/appsec/architectures/mini.py b/tests/appsec/architectures/mini.py index f0981167a55..bd87e6c4200 100644 --- a/tests/appsec/architectures/mini.py +++ b/tests/appsec/architectures/mini.py @@ -19,25 +19,27 @@ _TELEMETRY_DEPENDENCIES = [] # intercept telemetry events -from ddtrace.internal.telemetry.writer import TelemetryWriter # noqa: E402 +from ddtrace.internal.telemetry.writer import _TelemetryClient # noqa: E402 -_flush_events = TelemetryWriter._flush_events_queue +_send_event = _TelemetryClient.send_event -def _flush_events_wrapper(self): +def _send_event_wrapper(self, event): global _TELEMETRY_DEPENDENCIES - res = _flush_events(self) - if res: - dependencies = [v.get("payload", {}).get("dependencies", {}) for v in res] - dependencies = [d for d in dependencies if d] + print(f"Captured telemetry event: {event}", flush=True) + if event: + if event.get("request_type") == "message-batch": + dependencies = [v.get("payload", {}).get("dependencies", []) for v in event.get("payload", [])] + else: + dependencies = event.get("payload", {}).get("dependencies", []) for lst in dependencies: _TELEMETRY_DEPENDENCIES.extend(lst) - print(f"flushed events {dependencies}", flush=True) - return res + print(f"Captured dependencies: {dependencies}", flush=True) + return _send_event(self, event) -TelemetryWriter._flush_events_queue = _flush_events_wrapper +_TelemetryClient.send_event = _send_event_wrapper @app.route("/") From 874eacb2061104ee8d0d797998c450ccda51ebed Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 16 Sep 2025 15:19:04 -0400 Subject: [PATCH 03/13] fix broken app started tests --- tests/integration/test_settings.py | 5 +---- tests/telemetry/test_writer.py | 12 
+++++------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/tests/integration/test_settings.py b/tests/integration/test_settings.py index c2f1291b419..8d5aa263d9e 100644 --- a/tests/integration/test_settings.py +++ b/tests/integration/test_settings.py @@ -59,6 +59,7 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess): "DD_TAGS": "team:apm,component:web", "DD_TRACE_ENABLED": "true", "DD_CIVISIBILITY_AGENTLESS_ENABLED": "false", + "_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED": "true", } ) out, err, status, _ = run_python_code_in_subprocess( @@ -69,10 +70,6 @@ def test_setting_origin_code(test_agent_session, run_python_code_in_subprocess): config._trace_http_header_tags = {"header": "value"} config.tags = {"header": "value"} config._tracing_enabled = False - -from ddtrace.internal.telemetry import telemetry_writer -# simulate app start event, this occurs when the first span is sent to the datadog agent -telemetry_writer._app_started() """, env=env, ) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 71d36b349cc..c3b4ce81dac 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -184,9 +184,8 @@ def test_app_started_event_configuration_override(test_agent_session, run_python env["DD_INJECT_FORCE"] = "true" env["DD_INJECTION_ENABLED"] = "tracer" - # By default telemetry collection is enabled after 10 seconds, so we either need to - # to sleep for 10 seconds or manually call _app_started() to generate the app started event. - # This delay allows us to collect start up errors and dynamic configurations + # Ensures app-started event is queued immediately after ddtrace is imported + # instead of waiting for 10 seconds. 
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" _, stderr, status, _ = run_python_code_in_subprocess(code, env=env) @@ -630,9 +629,8 @@ def test_update_dependencies_event_not_stdlib(test_agent_session, ddtrace_run_py def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_shutdown() queues and sends an app-closing telemetry request""" - # app started event must be queued before any other telemetry event - telemetry_writer._app_started(register_app_shutdown=False) - assert telemetry_writer.started + # Telemetry writer must start before app-closing event is queued + telemetry_writer.started = True # send app closed event telemetry_writer.app_shutdown() @@ -784,7 +782,7 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.APPSEC, True) assert all(telemetry_writer._product_enablement.values()) - telemetry_writer._app_started() + telemetry_writer.periodic(force_flush=True) # Assert that there's only an app_started event (since product activation happened before the app started) events = test_agent_session.get_events("app-product-change") From 71ca2c1f3961a1081aeed1c8dd8f41fbae13a477 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 16 Sep 2025 17:46:35 -0400 Subject: [PATCH 04/13] fix generate metrics and improve sent debug logs --- ddtrace/internal/telemetry/writer.py | 42 +++++++++++++++++----------- tests/telemetry/test_writer.py | 9 +++--- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 2e87ff55058..f2601acd1b8 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -95,17 +95,25 @@ def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]: conn = get_connection(self._telemetry_url) conn.request("POST", self._endpoint, rb_json, headers) resp = conn.getresponse() + 
request_types = request["request_type"] + if request_types == "message-batch": + request_types = ", ".join([event["request_type"] for event in request["payload"]]) if resp.status < 300: log.debug( - "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event: %s. Response: %s", + "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event(s): %s. Response: %s", len(rb_json), sw.elapsed(), self.url, - request["request_type"], + request_types, resp.status, ) else: - log.debug("Failed to send Instrumentation Telemetry to %s. response: %s", self.url, resp.status) + log.debug( + "Failed to send Instrumentation Telemetry to %s. Event(s): %s. Response: %s", + self.url, + request_types, + resp.status, + ) except Exception as e: log.debug("Failed to send Instrumentation Telemetry to %s. Error: %s", self.url, str(e)) finally: @@ -630,20 +638,22 @@ def _flush_log_metrics(self): self._logs = set() return log_metrics - def _generate_metrics_event(self, namespace_metrics): - # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> Optional[Dict[str, Any]] + def _generate_metrics_events(self, namespace_metrics): + # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> List[Dict[str, Any]] + metric_payloads = [] for payload_type, namespaces in namespace_metrics.items(): for namespace, metrics in namespaces.items(): if metrics: - log.debug("%s request payload, namespace %s", payload_type, namespace) - return { - "payload": { - "namespace": namespace, - "series": metrics, - }, - "request_type": payload_type, - } - return None + metric_payloads.append( + { + "payload": { + "namespace": namespace, + "series": metrics, + }, + "request_type": payload_type, + } + ) + return metric_payloads def _generate_logs_event(self, logs): # type: (Set[Dict[str, str]]) -> Dict[str, Any] @@ -689,8 +699,8 @@ def periodic(self, force_flush=False, shutting_down=False): # Collect metrics and logs that have accumulated since last batch events = [] if namespace_metrics := 
self._namespace.flush(float(self.interval)): - if metrics_event := self._generate_metrics_event(namespace_metrics): - events.append(metrics_event) + if metrics_events := self._generate_metrics_events(namespace_metrics): + events.extend(metrics_events) if logs_metrics := self._flush_log_metrics(): events.append(self._generate_logs_event(logs_metrics)) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index c3b4ce81dac..dc11d17f8ec 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -733,8 +733,9 @@ def test_send_failing_request(mock_status, telemetry_writer): telemetry_writer.periodic(force_flush=True) # asserts unsuccessful status code was logged log.debug.assert_called_with( - "Failed to send Instrumentation Telemetry to %s. response: %s", + "Failed to send Instrumentation Telemetry to %s. Event(s): %s. Response: %s", telemetry_writer._client.url, + mock.ANY, mock_status, ) @@ -753,10 +754,10 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se # Assert next flush contains app-heartbeat event for _ in range(telemetry_writer._periodic_threshold): telemetry_writer.periodic() - assert test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) == [] + assert test_agent_session.get_events(mock.ANY, filter_heartbeats=False) == [] telemetry_writer.periodic() - heartbeat_events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) + heartbeat_events = test_agent_session.get_events(mock.ANY, filter_heartbeats=False) assert len(heartbeat_events) == 1 @@ -765,7 +766,7 @@ def test_app_heartbeat_event(mock_time, telemetry_writer, test_agent_session): """asserts that we queue/send app-heartbeat event every 60 seconds when app_heartbeat_event() is called""" # Assert a maximum of one heartbeat is queued per flush telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) + events = 
test_agent_session.get_events(mock.ANY, filter_heartbeats=False) assert len(events) > 0 From c41db6f140192282901ebd588ee52d6a42746221 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 16 Sep 2025 19:29:47 -0400 Subject: [PATCH 05/13] add rn clean up send event args --- ddtrace/internal/telemetry/writer.py | 21 +++++++++++-------- ...l-telemetry-requests-4d36c7f74b077dc0.yaml | 4 ++++ tests/appsec/architectures/mini.py | 4 ++-- tests/telemetry/test_writer.py | 2 +- 4 files changed, 19 insertions(+), 12 deletions(-) create mode 100644 releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index f2601acd1b8..ac0746428e2 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -84,7 +84,7 @@ def __init__(self, agentless): def url(self): return parse.urljoin(self._telemetry_url, self._endpoint) - def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]: + def send_event(self, request: Dict, payload_type: str) -> Optional[httplib.HTTPResponse]: """Sends a telemetry request to the trace agent""" resp = None conn = None @@ -95,27 +95,29 @@ def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]: conn = get_connection(self._telemetry_url) conn.request("POST", self._endpoint, rb_json, headers) resp = conn.getresponse() - request_types = request["request_type"] - if request_types == "message-batch": - request_types = ", ".join([event["request_type"] for event in request["payload"]]) if resp.status < 300: log.debug( "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event(s): %s. Response: %s", len(rb_json), sw.elapsed(), self.url, - request_types, + payload_type, resp.status, ) else: log.debug( "Failed to send Instrumentation Telemetry to %s. Event(s): %s. 
Response: %s", self.url, - request_types, + payload_type, resp.status, ) - except Exception as e: - log.debug("Failed to send Instrumentation Telemetry to %s. Error: %s", self.url, str(e)) + except Exception: + log.debug( + "Failed to send Instrumentation Telemetry to %s. Event(s): %s", + self.url, + payload_type, + exc_info=True, + ) finally: if conn is not None: conn.close() @@ -744,6 +746,7 @@ def periodic(self, force_flush=False, shutting_down=False): if queued_events := self._flush_events_queue(): events.extend(queued_events) + payload_types = ", ".join([event["request_type"] for event in events]) # Prepare and send the final batch batch_event = { "tracer_time": int(time.time()), @@ -756,7 +759,7 @@ def periodic(self, force_flush=False, shutting_down=False): "payload": events, "request_type": "message-batch", } - self._client.send_event(batch_event) + self._client.send_event(batch_event, payload_types) def app_shutdown(self): if self.started: diff --git a/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml b/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml new file mode 100644 index 00000000000..67b30dad89d --- /dev/null +++ b/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + performance: Batch all telemetry requests within a single heartbeat interval, reducing network requests by 95% (from ~20 to 1 per minute per process). 
\ No newline at end of file diff --git a/tests/appsec/architectures/mini.py b/tests/appsec/architectures/mini.py index bd87e6c4200..687ba77ee3b 100644 --- a/tests/appsec/architectures/mini.py +++ b/tests/appsec/architectures/mini.py @@ -25,7 +25,7 @@ _send_event = _TelemetryClient.send_event -def _send_event_wrapper(self, event): +def _send_event_wrapper(self, event, payload_type): global _TELEMETRY_DEPENDENCIES print(f"Captured telemetry event: {event}", flush=True) if event: @@ -36,7 +36,7 @@ def _send_event_wrapper(self, event): for lst in dependencies: _TELEMETRY_DEPENDENCIES.extend(lst) print(f"Captured dependencies: {dependencies}", flush=True) - return _send_event(self, event) + return _send_event(self, event, payload_type) _TelemetryClient.send_event = _send_event_wrapper diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index dc11d17f8ec..fd5cc58e5ad 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -757,7 +757,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se assert test_agent_session.get_events(mock.ANY, filter_heartbeats=False) == [] telemetry_writer.periodic() - heartbeat_events = test_agent_session.get_events(mock.ANY, filter_heartbeats=False) + heartbeat_events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) assert len(heartbeat_events) == 1 From 760c2a08cf20e1fb81209e1a2c61464f9b271a43 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Wed, 17 Sep 2025 14:09:15 -0400 Subject: [PATCH 06/13] rm release note --- .../notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml diff --git a/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml b/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml deleted file mode 100644 index 67b30dad89d..00000000000 --- 
a/releasenotes/notes/batch-all-telemetry-requests-4d36c7f74b077dc0.yaml +++ /dev/null @@ -1,4 +0,0 @@ ---- -fixes: - - | - performance: Batch all telemetry requests within a single heartbeat interval, reducing network requests by 95% (from ~20 to 1 per minute per process). \ No newline at end of file From 6bb05d2db4fc124f27a1830893e85e07cfd1251e Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Wed, 17 Sep 2025 13:58:23 -0400 Subject: [PATCH 07/13] clean up payload generation --- ddtrace/internal/telemetry/writer.py | 210 +++++++++----------------- tests/appsec/appsec/test_telemetry.py | 4 +- tests/telemetry/app.py | 2 +- tests/telemetry/test_telemetry.py | 6 +- tests/telemetry/test_writer.py | 31 ++-- 5 files changed, 96 insertions(+), 157 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index ac0746428e2..e98f63f007d 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -182,7 +182,7 @@ def __init__(self, is_periodic=True, agentless=None): self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT} - self._send_product_change_updates = False + self._previous_product_enablement = {} self._extended_time = time.monotonic() # The extended heartbeat interval is set to 24 hours self._extended_heartbeat_interval = 3600 * 24 @@ -215,8 +215,8 @@ def __init__(self, is_periodic=True, agentless=None): # This will occur when the agent writer starts. 
self.enable() # Force app started for unit tests - if config.FORCE_START and (app_started := self._app_started()): - self._events_queue.append(app_started) + if config.FORCE_START and (app_started := self._app_started_payload()): + self._events_queue.append({"payload": app_started, "request_type": "app-started"}) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) def enable(self): @@ -318,31 +318,26 @@ def add_error(self, code, msg, filename, line_number): msg = "%s:%s: %s" % (filename, line_number, msg) self._error = (code, msg) - def _app_started(self, register_app_shutdown=True): - # type: (bool) -> Optional[Dict[str, Any]] + def _app_started_payload(self, register_app_shutdown=True): + # type: (bool) -> Dict[str, Any] """Sent when TelemetryWriter is enabled or forks""" if self._forked or self.started: # app-started events should only be sent by the main process - return None + return {} # List of configurations to be collected self.started = True - products = { - product: {"version": tracer_version, "enabled": status} - for product, status in self._product_enablement.items() - } - # SOABI should help us identify which wheels people are getting from PyPI self.add_configurations(get_python_config_vars()) payload = { - "configuration": self._flush_configuration_queue(), + "configuration": self._report_configuration_queue(), "error": { "code": self._error[0], "message": self._error[1], }, - "products": products, + "products": self._report_app_products(), } # type: Dict[str, Union[Dict[str, Any], List[Any]]] # Add time to value telemetry metrics for single step instrumentation if config.INSTALL_ID or config.INSTALL_TYPE or config.INSTALL_TIME: @@ -354,43 +349,20 @@ def _app_started(self, register_app_shutdown=True): # Reset the error after it has been reported. 
self._error = (0, "") - return {"payload": payload, "request_type": "app-started"} + return payload - def _app_heartbeat_event(self): + def _app_heartbeat_payload(self): # type: () -> Dict[str, Any] if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: self._extended_time += self._extended_heartbeat_interval - self._app_dependencies_loaded_event() - payload = { + return { "dependencies": [ {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] } - request_type = "app-extended-heartbeat" - else: - payload = {} - request_type = "app-heartbeat" - return {"payload": payload, "request_type": request_type} - - def _app_closing_event(self): - # type: () -> Optional[Dict[str, Any]] - """Adds a Telemetry event which notifies the agent that an application instance has terminated""" - if self._forked: - # app-closing event should only be sent by the main process - return None - return {"payload": {}, "request_type": "app-closing"} - - def _app_integrations_changed_event(self, integrations): - # type: (List[Dict]) -> Dict - """Adds a Telemetry event which sends a list of configured integrations to the agent""" - return { - "payload": { - "integrations": integrations, - }, - "request_type": "app-integrations-change", - } + return {} - def _flush_integrations_queue(self): + def _report_integrations(self): # type: () -> List[Dict] """Flushes and returns a list of all queued integrations""" with self._service_lock: @@ -398,7 +370,7 @@ def _flush_integrations_queue(self): self._integrations_queue = dict() return integrations - def _flush_configuration_queue(self): + def _report_configuration_queue(self): # type: () -> List[Dict] """Flushes and returns a list of all queued configurations""" with self._service_lock: @@ -406,76 +378,51 @@ def _flush_configuration_queue(self): self._configuration_queue = [] return configurations - def _app_client_configuration_changed_event(self, 
configurations): - # type: (List[Dict]) -> Dict[str, Any] - """Adds a Telemetry event which sends list of modified configurations to the agent""" - return { - "payload": { - "configuration": configurations, - }, - "request_type": "app-client-configuration-change", - } - - def _app_dependencies_loaded_event(self): + def _report_app_dependencies(self): # type: () -> Optional[Dict[str, Any]] """Adds events to report imports done since the last periodic run""" if not config.DEPENDENCY_COLLECTION or not self._enabled: - return None + return {} + with self._service_lock: newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported) if not newly_imported_deps: - return None + return {} with self._service_lock: - if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps): - return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"} - return None + return update_imported_dependencies(self._imported_dependencies, newly_imported_deps) - def _flush_app_endpoints(self): + def _report_app_endpoints(self): """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent""" import ddtrace.settings.asm as asm_config_module if not asm_config_module.config._api_security_endpoint_collection or not self._enabled: - return + return {} if not endpoint_collection.endpoints: - return + return {} with self._service_lock: - payload = endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) - return {"payload": payload, "request_type": "app-endpoints"} + return endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) - def _app_product_change(self): - # type: () -> Optional[Dict[str, Any]] + def _report_app_products(self): + # type: () -> Dict[str, Any] """Adds a Telemetry event which reports the enablement of an APM product""" + with self._service_lock: + products = self._product_enablement.items() + 
for product, status in products: + self._previous_product_enablement[product] = status + self._product_enablement = {} + return {product: {"version": tracer_version, "enabled": status} for product, status in products} - if not self._send_product_change_updates: - return None - - self._send_product_change_updates = False - return { - "payload": { - "products": { - product: {"version": tracer_version, "enabled": status} - for product, status in self._product_enablement.items() - } - }, - "request_type": "app-product-change", - } - - def product_activated(self, product, enabled): + def product_activated(self, product, status): # type: (str, bool) -> None """Updates the product enablement dict""" - - if self._product_enablement.get(product, False) is enabled: - return - - self._product_enablement[product] = enabled - - # If the app hasn't started, the product status will be included in the app_started event's payload - if self.started: - self._send_product_change_updates = True + with self._service_lock: + # Only send product change event if the product status has changed + if self._previous_product_enablement.get(product) != status: + self._product_enablement[product] = status def add_configuration(self, configuration_name, configuration_value, origin="unknown", config_id=None): # type: (str, Any, str, Optional[str]) -> None @@ -633,34 +580,12 @@ def add_distribution_metric( tags, ) - def _flush_log_metrics(self): + def _report_logs(self): # type () -> Set[Metric] with self._service_lock: - log_metrics = self._logs + logs = self._logs self._logs = set() - return log_metrics - - def _generate_metrics_events(self, namespace_metrics): - # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> List[Dict[str, Any]] - metric_payloads = [] - for payload_type, namespaces in namespace_metrics.items(): - for namespace, metrics in namespaces.items(): - if metrics: - metric_payloads.append( - { - "payload": { - "namespace": namespace, - "series": metrics, - }, - "request_type": 
payload_type, - } - ) - return metric_payloads - - def _generate_logs_event(self, logs): - # type: (Set[Dict[str, str]]) -> Dict[str, Any] - log.debug("%s request payload", TELEMETRY_TYPE_LOGS) - return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS} + return logs def _dispatch(self): # moved core here to avoid circular import @@ -686,7 +611,7 @@ def periodic(self, force_flush=False, shutting_down=False): - Collects configuration changes - Collects dependency changes - Collects stored events (ex: metrics and logs) - - Sends everything as a single batch + - Sends everything as a single batched request Args: force_flush: If True, bypasses the heartbeat interval check and sends immediately @@ -696,16 +621,21 @@ def periodic(self, force_flush=False, shutting_down=False): - Metrics are collected every 10 seconds to ensure accurate time-based data - All data is sent in a single batch every 60 seconds to minimize network overhead - A heartbeat event is always included to keep RC connections alive + - Multiple event types are combined into a single message-batch request """ self._dispatch() # Collect metrics and logs that have accumulated since last batch events = [] if namespace_metrics := self._namespace.flush(float(self.interval)): - if metrics_events := self._generate_metrics_events(namespace_metrics): - events.extend(metrics_events) + for payload_type, namespaces in namespace_metrics.items(): + for namespace, metrics in namespaces.items(): + if metrics: + events.append( + {"payload": {"namespace": namespace, "series": metrics}, "request_type": payload_type} + ) - if logs_metrics := self._flush_log_metrics(): - events.append(self._generate_logs_event(logs_metrics)) + if logs := self._report_logs(): + events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}) # Queue metrics if not at heartbeat interval if self._is_periodic and force_flush is False: @@ -717,36 +647,44 @@ def periodic(self, force_flush=False, 
shutting_down=False): self._periodic_count = 0 # At heartbeat interval, collect and send all telemetry data - if app_started := self._app_started(): + if app_started_payload := self._app_started_payload(): # app-started should be the first event in the batch - events = [app_started] + events + events = [{"payload": app_started_payload, "request_type": "app-started"}] + events - if app_product_change := self._app_product_change(): - events.append(app_product_change) + if products := self._report_app_products(): + events.append({"payload": {"products": products}, "request_type": "app-product-change"}) - if integrations := self._flush_integrations_queue(): - events.append(self._app_integrations_changed_event(integrations)) + if ints := self._report_integrations(): + events.append({"payload": {"integrations": ints}, "request_type": "app-integrations-change"}) - if endpoints_payload := self._flush_app_endpoints(): - events.append(endpoints_payload) + if endpoints := self._report_app_endpoints(): + events.append({"payload": endpoints, "request_type": "app-endpoints"}) - if configurations := self._flush_configuration_queue(): - events.append(self._app_client_configuration_changed_event(configurations)) + if configs := self._report_configuration_queue(): + events.append({"payload": {"configuration": configs}, "request_type": "app-client-configuration-change"}) - if app_dependencies_loaded := self._app_dependencies_loaded_event(): - events.append(app_dependencies_loaded) + if deps := self._report_app_dependencies(): + events.append({"payload": {"dependencies": deps}, "request_type": "app-dependencies-loaded"}) - if shutting_down and (app_closing := self._app_closing_event()): - events.append(app_closing) + if shutting_down and not self._forked: + events.append({"payload": {}, "request_type": "app-closing"}) # Always include a heartbeat to keep RC connections alive - events.append(self._app_heartbeat_event()) + # Extended heartbeat should be queued after 
app-dependencies-loaded event. This + # ensures that imported dependencies are accurately reported. + if heartbeat_payload := self._app_heartbeat_payload(): + # Extended heartbeats report dependencies while regular heartbeats report empty payloads + events.append({"payload": heartbeat_payload, "request_type": "app-extended-heartbeat"}) + else: + events.append({"payload": {}, "request_type": "app-heartbeat"}) # Get any queued events and combine with current batch if queued_events := self._flush_events_queue(): events.extend(queued_events) + # Create comma-separated list of event types for logging payload_types = ", ".join([event["request_type"] for event in events]) + # Prepare and send the final batch batch_event = { "tracer_time": int(time.time()), @@ -842,8 +780,8 @@ def _telemetry_excepthook(self, tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, lineno, str(value)) self.add_integration(integration_name, True, error_msg=error_msg) - if app_started := self._app_started(False): - self._events_queue.append(app_started) + if app_started := self._app_started_payload(False): + self._events_queue.append({"payload": app_started, "request_type": "app-started"}) self.app_shutdown() diff --git a/tests/appsec/appsec/test_telemetry.py b/tests/appsec/appsec/test_telemetry.py index 24f2dc6a883..fd542ba244d 100644 --- a/tests/appsec/appsec/test_telemetry.py +++ b/tests/appsec/appsec/test_telemetry.py @@ -299,7 +299,7 @@ def test_appsec_enabled_metric( # Restore defaults and enabling telemetry appsec service with override_global_config({"_asm_enabled": True, "_lib_was_injected": False}): tracer.configure(appsec_enabled=appsec_enabled, appsec_enabled_origin=APPSEC.ENABLED_ORIGIN_UNKNOWN) - telemetry_writer._flush_configuration_queue() + telemetry_writer._report_configuration_queue() # Start the test with override_env(environment), override_global_config( @@ -311,7 +311,7 @@ def test_appsec_enabled_metric( telemetry_writer._dispatch() metrics_result =
telemetry_writer._flush_configuration_queue() + metrics_result = telemetry_writer._report_configuration_queue() assert metrics_result == [ {"name": "DD_APPSEC_ENABLED", "origin": expected_origin, "seq_id": ANY, "value": expected_result} ] diff --git a/tests/telemetry/app.py b/tests/telemetry/app.py index ae2b9932c9f..c945684ee46 100644 --- a/tests/telemetry/app.py +++ b/tests/telemetry/app.py @@ -16,7 +16,7 @@ def index(): def starting_app_view(): # We must call app-started before telemetry events can be sent to the agent. # This endpoint mocks the behavior of the agent writer. - telemetry_writer._app_started() + telemetry_writer._app_started_payload() return "OK", 200 diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 28847d6ebde..f923fb8dcf8 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -38,9 +38,9 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess): if os.fork() == 0: # Send multiple started events to confirm none get sent - telemetry_writer._app_started() - telemetry_writer._app_started() - telemetry_writer._app_started() + telemetry_writer._app_started_payload() + telemetry_writer._app_started_payload() + telemetry_writer._app_started_payload() else: # Print the parent process runtime id for validation print(get_runtime_id()) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index fd5cc58e5ad..af26f94a09f 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -91,9 +91,9 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): # queue an app started event - event = telemetry_writer._app_started() - assert event is not None, "app_started() did not return an event" - telemetry_writer.add_event(event["payload"], 
"app-started") + app_started_payload = telemetry_writer._app_started_payload() + assert app_started_payload is not None, "app_started() did not return an event" + telemetry_writer.add_event(app_started_payload, "app-started") # force a flush telemetry_writer.periodic(force_flush=True) @@ -776,42 +776,43 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio # Assert that the default product status is disabled assert any(telemetry_writer._product_enablement.values()) is False - + # Assert that the product status is first reported in app-started event telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION, True) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.APPSEC, True) - assert all(telemetry_writer._product_enablement.values()) - telemetry_writer.periodic(force_flush=True) - - # Assert that there's only an app_started event (since product activation happened before the app started) + events = test_agent_session.get_events("app-started") + assert len(events) == 1 + products = events[0]["payload"]["products"] + version = _pep440_to_semver() + assert products == { + TELEMETRY_APM_PRODUCT.APPSEC.value: {"enabled": True, "version": version}, + TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION.value: {"enabled": True, "version": version}, + TELEMETRY_APM_PRODUCT.LLMOBS.value: {"enabled": True, "version": version}, + TELEMETRY_APM_PRODUCT.PROFILER.value: {"enabled": True, "version": version}, + } + # Assert that product change event is not sent, products should be first reported in app-started events = test_agent_session.get_events("app-product-change") telemetry_writer.periodic(force_flush=True) assert not len(events) - # Assert that unchanged status doesn't generate the event telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True) 
telemetry_writer.periodic(force_flush=True) events = test_agent_session.get_events("app-product-change") assert not len(events) - - # Assert that a single event is generated + # Assert that product change event is sent when product status changes telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.APPSEC, False) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION, False) telemetry_writer.periodic(force_flush=True) events = test_agent_session.get_events("app-product-change") assert len(events) == 1 - - # Assert that payload is as expected assert events[0]["request_type"] == "app-product-change" products = events[0]["payload"]["products"] version = _pep440_to_semver() assert products == { TELEMETRY_APM_PRODUCT.APPSEC.value: {"enabled": False, "version": version}, TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION.value: {"enabled": False, "version": version}, - TELEMETRY_APM_PRODUCT.LLMOBS.value: {"enabled": True, "version": version}, - TELEMETRY_APM_PRODUCT.PROFILER.value: {"enabled": True, "version": version}, } From ec2693724e033b9b61f3cc65d8184c28b1e6e8aa Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Wed, 17 Sep 2025 14:06:27 -0400 Subject: [PATCH 08/13] use event type --- ddtrace/internal/telemetry/constants.py | 17 ++++++++-- .../internal/telemetry/metrics_namespaces.pyx | 11 +++---- ddtrace/internal/telemetry/writer.py | 31 +++++++++++-------- tests/appsec/appsec/test_telemetry.py | 15 +++++---- tests/appsec/iast/test_telemetry.py | 10 +++--- tests/telemetry/test_telemetry_metrics.py | 11 +++---- 6 files changed, 54 insertions(+), 41 deletions(-) diff --git a/ddtrace/internal/telemetry/constants.py b/ddtrace/internal/telemetry/constants.py index d853474fdaa..23af42d6e86 100644 --- a/ddtrace/internal/telemetry/constants.py +++ b/ddtrace/internal/telemetry/constants.py @@ -11,9 +11,20 @@ class TELEMETRY_NAMESPACE(Enum): PROFILER = "profiler" -TELEMETRY_TYPE_GENERATE_METRICS = "generate-metrics" -TELEMETRY_TYPE_DISTRIBUTION = 
"distributions" -TELEMETRY_TYPE_LOGS = "logs" +class TELEMETRY_EVENT_TYPE(Enum): + STARTED = "app-started" + SHUTDOWN = "app-shutdown" + HEARTBEAT = "app-heartbeat" + EXTENDED_HEARTBEAT = "app-extended-heartbeat" + DEPENDENCIES_LOADED = "app-dependencies-loaded" + PRODUCT_CHANGE = "app-product-change" + INTEGRATIONS_CHANGE = "app-integrations-change" + ENDPOINTS = "app-endpoints" + CLIENT_CONFIGURATION_CHANGE = "app-client-configuration-change" + LOGS = "logs" + METRICS = "generate-metrics" + DISTRIBUTIONS = "distributions" + MESSAGE_BATCH = "message-batch" class TELEMETRY_LOG_LEVEL(Enum): diff --git a/ddtrace/internal/telemetry/metrics_namespaces.pyx b/ddtrace/internal/telemetry/metrics_namespaces.pyx index 25060787aec..45c6c268dc4 100644 --- a/ddtrace/internal/telemetry/metrics_namespaces.pyx +++ b/ddtrace/internal/telemetry/metrics_namespaces.pyx @@ -6,8 +6,7 @@ from typing import Tuple from ddtrace.internal import forksafe from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS +from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE MetricTagType = Optional[Tuple[Tuple[str, str], ...]] @@ -46,14 +45,14 @@ cdef class MetricNamespace: now = int(time.time()) data = { - TELEMETRY_TYPE_GENERATE_METRICS: {}, - TELEMETRY_TYPE_DISTRIBUTION: {}, + TELEMETRY_EVENT_TYPE.METRICS: {}, + TELEMETRY_EVENT_TYPE.DISTRIBUTIONS: {}, } for metric_id, value in namespace_metrics.items(): name, namespace, _tags, metric_type = metric_id tags = ["{}:{}".format(k, v).lower() for k, v in _tags] if _tags else [] if metric_type is MetricType.DISTRIBUTION: - data[TELEMETRY_TYPE_DISTRIBUTION].setdefault(namespace, []).append({ + data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS].setdefault(namespace, []).append({ "metric": name, "points": value, "tags": tags, @@ -70,7 +69,7 @@ cdef class MetricNamespace: } if 
metric_type in (MetricType.RATE, MetricType.GAUGE): metric["interval"] = _interval - data[TELEMETRY_TYPE_GENERATE_METRICS].setdefault(namespace, []).append(metric) + data[TELEMETRY_EVENT_TYPE.METRICS].setdefault(namespace, []).append(metric) return data diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index e98f63f007d..3422e467ed3 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -31,9 +31,9 @@ from ..utils.version import version as tracer_version from . import modules from .constants import TELEMETRY_APM_PRODUCT +from .constants import TELEMETRY_EVENT_TYPE from .constants import TELEMETRY_LOG_LEVEL from .constants import TELEMETRY_NAMESPACE -from .constants import TELEMETRY_TYPE_LOGS from .data import get_application from .data import get_host_info from .data import get_python_config_vars @@ -216,7 +216,7 @@ def __init__(self, is_periodic=True, agentless=None): self.enable() # Force app started for unit tests if config.FORCE_START and (app_started := self._app_started_payload()): - self._events_queue.append({"payload": app_started, "request_type": "app-started"}) + self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) def enable(self): @@ -635,7 +635,7 @@ def periodic(self, force_flush=False, shutting_down=False): ) if logs := self._report_logs(): - events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}) + events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_EVENT_TYPE.LOGS}) # Queue metrics if not at heartbeat interval if self._is_periodic and force_flush is False: @@ -649,34 +649,39 @@ def periodic(self, force_flush=False, shutting_down=False): # At heartbeat interval, collect and send all telemetry data if app_started_payload := self._app_started_payload(): # app-started should be the first event in the batch - events = 
[{"payload": app_started_payload, "request_type": "app-started"}] + events + events = [{"payload": app_started_payload, "request_type": TELEMETRY_EVENT_TYPE.STARTED}] + events if products := self._report_app_products(): - events.append({"payload": {"products": products}, "request_type": "app-product-change"}) + events.append({"payload": {"products": products}, "request_type": TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE}) if ints := self._report_integrations(): - events.append({"payload": {"integrations": ints}, "request_type": "app-integrations-change"}) + events.append({"payload": {"integrations": ints}, "request_type": TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE}) if endpoints := self._report_app_endpoints(): - events.append({"payload": endpoints, "request_type": "app-endpoints"}) + events.append({"payload": endpoints, "request_type": TELEMETRY_EVENT_TYPE.ENDPOINTS}) if configs := self._report_configuration_queue(): - events.append({"payload": {"configuration": configs}, "request_type": "app-client-configuration-change"}) + events.append( + { + "payload": {"configuration": configs}, + "request_type": TELEMETRY_EVENT_TYPE.CLIENT_CONFIGURATION_CHANGE, + } + ) if deps := self._report_app_dependencies(): - events.append({"payload": {"dependencies": deps}, "request_type": "app-dependencies-loaded"}) + events.append({"payload": {"dependencies": deps}, "request_type": TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED}) if shutting_down and not self._forked: - events.append({"payload": {}, "request_type": "app-closing"}) + events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.SHUTDOWN}) # Always include a heartbeat to keep RC connections alive # Extended heartbeat should be queued after app-dependencies-loaded event. This # ensures that that imported dependencies are accurately reported. 
if heartbeat_payload := self._app_heartbeat_payload(): # Extended heartbeat report dependencies while regular heartbeats report empty payloads - events.append({"payload": heartbeat_payload, "request_type": "app-extended-heartbeat"}) + events.append({"payload": heartbeat_payload, "request_type": TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT}) else: - events.append({"payload": {}, "request_type": "app-heartbeat"}) + events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.HEARTBEAT}) # Get any queued events and combine with current batch if queued_events := self._flush_events_queue(): @@ -781,7 +786,7 @@ def _telemetry_excepthook(self, tp, value, root_traceback): self.add_integration(integration_name, True, error_msg=error_msg) if app_started := self._app_started_payload(False): - self._events_queue.append({"payload": app_started, "request_type": "app-started"}) + self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) self.app_shutdown() diff --git a/tests/appsec/appsec/test_telemetry.py b/tests/appsec/appsec/test_telemetry.py index fd542ba244d..20ba8609356 100644 --- a/tests/appsec/appsec/test_telemetry.py +++ b/tests/appsec/appsec/test_telemetry.py @@ -15,9 +15,8 @@ from ddtrace.constants import APPSEC_ENV from ddtrace.contrib.internal.trace_utils import set_http_meta from ddtrace.ext import SpanTypes +from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS from ddtrace.trace import tracer import tests.appsec.rules as rules from tests.appsec.utils import asm_context @@ -38,7 +37,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked # this function and make the tests flaky. 
That's why we exclude the "enabled" metric from this assert generate_metrics = [ m - for m in metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.APPSEC.value] + for m in metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.APPSEC.value] if m["metric"] != "enabled" ] assert ( @@ -76,7 +75,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked def _assert_distributions_metrics(metrics_result, is_rule_triggered=False, is_blocked_request=False): - distributions_metrics = metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE.APPSEC.value] + distributions_metrics = metrics_result[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS][TELEMETRY_NAMESPACE.APPSEC.value] assert len(distributions_metrics) == 2, "Expected 2 distributions_metrics" for metric in distributions_metrics: @@ -101,8 +100,8 @@ def test_metrics_when_appsec_doesnt_runs(telemetry_writer, tracer): rules.Config(), ) metrics_data = telemetry_writer._namespace.flush() - assert len(metrics_data[TELEMETRY_TYPE_GENERATE_METRICS]) == 0 - assert len(metrics_data[TELEMETRY_TYPE_DISTRIBUTION]) == 0 + assert len(metrics_data[TELEMETRY_EVENT_TYPE.METRICS]) == 0 + assert len(metrics_data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS]) == 0 def test_metrics_when_appsec_runs(telemetry_writer, tracer): @@ -185,7 +184,7 @@ def test_log_metric_error_ddwaf_timeout(telemetry_writer, tracer): list_metrics_logs = list(telemetry_writer._logs) assert len(list_metrics_logs) == 0 - generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_TYPE_GENERATE_METRICS][ + generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_EVENT_TYPE.METRICS][ TELEMETRY_NAMESPACE.APPSEC.value ] @@ -228,7 +227,7 @@ def test_log_metric_error_ddwaf_internal_error(telemetry_writer): assert len(list_telemetry_logs) == 0 assert span.get_tag("_dd.appsec.waf.error") == "-3" metrics_result = telemetry_writer._namespace.flush() - list_telemetry_metrics = metrics_result.get(TELEMETRY_TYPE_GENERATE_METRICS, 
{}).get( + list_telemetry_metrics = metrics_result.get(TELEMETRY_EVENT_TYPE.METRICS, {}).get( TELEMETRY_NAMESPACE.APPSEC.value, {} ) error_metrics = [m for m in list_telemetry_metrics if m["metric"] == "waf.error"] diff --git a/tests/appsec/iast/test_telemetry.py b/tests/appsec/iast/test_telemetry.py index 76ff7b84f30..de1890062ff 100644 --- a/tests/appsec/iast/test_telemetry.py +++ b/tests/appsec/iast/test_telemetry.py @@ -21,8 +21,8 @@ from ddtrace.appsec._iast.taint_sinks.header_injection import patch as header_injection_patch from ddtrace.appsec._iast.taint_sinks.weak_hash import patch as weak_hash_patch from ddtrace.ext import SpanTypes +from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS from tests.appsec.iast.iast_utils import _iast_patched_module from tests.appsec.utils import asm_context from tests.utils import DummyTracer @@ -31,7 +31,7 @@ def _assert_instrumented_sink(telemetry_writer, vuln_type): metrics_result = telemetry_writer._namespace.flush() - generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value] + generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value] assert len(generate_metrics) == 1, "Expected 1 generate_metrics" assert [metric["metric"] for metric in generate_metrics] == ["instrumented.sink"] assert [metric["tags"] for metric in generate_metrics] == [[f"vulnerability_type:{vuln_type.lower()}"]] @@ -97,7 +97,7 @@ def test_metric_executed_sink( metrics_result = telemetry_writer._namespace.flush() _testing_unpatch_iast() - generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value] + generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value] assert len(generate_metrics) == 1 # Remove potential sinks from internal usage 
of the lib (like http.client, used to communicate with # the agent) @@ -142,7 +142,7 @@ def test_metric_instrumented_propagation(no_request_sampling, telemetry_writer): _iast_patched_module("benchmarks.bm.iast_fixtures.str_methods") metrics_result = telemetry_writer._namespace.flush() - generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value] + generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value] # Remove potential sinks from internal usage of the lib (like http.client, used to communicate with # the agent) filtered_metrics = [ @@ -170,7 +170,7 @@ def test_metric_request_tainted(no_request_sampling, telemetry_writer): metrics_result = telemetry_writer._namespace.flush() - generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value] + generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value] # Remove potential sinks from internal usage of the lib (like http.client, used to communicate with # the agent) filtered_metrics = [metric["metric"] for metric in generate_metrics if metric["metric"] != "executed.sink"] diff --git a/tests/telemetry/test_telemetry_metrics.py b/tests/telemetry/test_telemetry_metrics.py index 2cf3d78f014..e3548debf94 100644 --- a/tests/telemetry/test_telemetry_metrics.py +++ b/tests/telemetry/test_telemetry_metrics.py @@ -3,10 +3,9 @@ from mock.mock import ANY +from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION -from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS from tests.utils import override_global_config @@ -14,7 +13,7 @@ def _assert_metric( test_agent, expected_metrics, namespace=TELEMETRY_NAMESPACE.TRACERS, - 
type_paypload=TELEMETRY_TYPE_GENERATE_METRICS, + type_paypload=TELEMETRY_EVENT_TYPE.METRICS, ): assert len(expected_metrics) > 0, "expected_metrics should not be empty" test_agent.telemetry_writer.periodic(force_flush=True) @@ -291,7 +290,7 @@ def test_send_appsec_distributions_metric(telemetry_writer, test_agent_session, test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_TYPE_DISTRIBUTION, + type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) @@ -312,7 +311,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_TYPE_DISTRIBUTION, + type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) expected_series = [ @@ -329,7 +328,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_TYPE_DISTRIBUTION, + type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) From ed22cdcc113188df7c3cdb05237388e34aa2c6ee Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 30 Sep 2025 09:29:51 -0400 Subject: [PATCH 09/13] clean up event names --- ddtrace/internal/telemetry/writer.py | 36 +++++++++++++-------------- tests/appsec/appsec/test_telemetry.py | 4 +-- tests/telemetry/app.py | 2 +- tests/telemetry/test_telemetry.py | 6 ++--- tests/telemetry/test_writer.py | 2 +- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 9004261ec4b..70f377e16f0 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -202,7 +202,7 @@ def __init__(self, is_periodic=True, agentless=None): # This will occur when the agent writer starts. 
self.enable() # Force app started for unit tests - if config.FORCE_START and (app_started := self._app_started_payload()): + if config.FORCE_START and (app_started := self._report_app_started()): self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) @@ -294,7 +294,7 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms self._integrations_queue[integration_name]["compatible"] = error_msg == "" self._integrations_queue[integration_name]["error"] = error_msg - def _app_started_payload(self, register_app_shutdown: bool = True) -> Optional[Dict[str, Any]]: + def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[Dict[str, Any]]: """Sent when TelemetryWriter is enabled or forks""" if self._forked or self.started: # app-started events should only be sent by the main process @@ -307,8 +307,8 @@ def _app_started_payload(self, register_app_shutdown: bool = True) -> Optional[D self.add_configurations(get_python_config_vars()) payload = { - "configuration": self._report_configuration_queue(), - "products": self._report_app_products(), + "configuration": self._report_configurations(), + "products": self._report_products(), } # type: Dict[str, Union[Dict[str, Any], List[Any]]] # Add time to value telemetry metrics for single step instrumentation if config.INSTALL_ID or config.INSTALL_TYPE or config.INSTALL_TIME: @@ -319,7 +319,7 @@ def _app_started_payload(self, register_app_shutdown: bool = True) -> Optional[D } return payload - def _app_heartbeat_payload(self): + def _report_heartbeat(self): # type: () -> Dict[str, Any] if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: self._extended_time += self._extended_heartbeat_interval @@ -338,7 +338,7 @@ def _report_integrations(self): self._integrations_queue = dict() return integrations - def 
_report_configuration_queue(self): + def _report_configurations(self): # type: () -> List[Dict] """Flushes and returns a list of all queued configurations""" with self._service_lock: @@ -346,7 +346,7 @@ def _report_configuration_queue(self): self._configuration_queue = [] return configurations - def _report_app_dependencies(self): + def _report_dependencies(self): # type: () -> Optional[Dict[str, Any]] """Adds events to report imports done since the last periodic run""" if not config.DEPENDENCY_COLLECTION or not self._enabled: @@ -358,7 +358,7 @@ def _report_app_dependencies(self): return {} return update_imported_dependencies(self._imported_dependencies, newly_imported_deps) - def _report_app_endpoints(self): + def _report_endpoints(self): """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent""" import ddtrace.settings.asm as asm_config_module @@ -371,7 +371,7 @@ def _report_app_endpoints(self): with self._service_lock: return endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) - def _report_app_products(self): + def _report_products(self): # type: () -> Dict[str, Any] """Adds a Telemetry event which reports the enablement of an APM product""" with self._service_lock: @@ -622,20 +622,20 @@ def periodic(self, force_flush=False, shutting_down=False): self._periodic_count = 0 # At heartbeat interval, collect and send all telemetry data - if app_started_payload := self._app_started_payload(): + if app_started_payload := self._report_app_started(): # app-started should be the first event in the batch events = [{"payload": app_started_payload, "request_type": TELEMETRY_EVENT_TYPE.STARTED}] + events - if products := self._report_app_products(): + if products := self._report_products(): events.append({"payload": {"products": products}, "request_type": TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE}) if ints := self._report_integrations(): events.append({"payload": {"integrations": ints}, "request_type": 
TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE}) - if endpoints := self._report_app_endpoints(): + if endpoints := self._report_endpoints(): events.append({"payload": endpoints, "request_type": TELEMETRY_EVENT_TYPE.ENDPOINTS}) - if configs := self._report_configuration_queue(): + if configs := self._report_configurations(): events.append( { "payload": {"configuration": configs}, @@ -643,7 +643,7 @@ def periodic(self, force_flush=False, shutting_down=False): } ) - if deps := self._report_app_dependencies(): + if deps := self._report_dependencies(): events.append({"payload": {"dependencies": deps}, "request_type": TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED}) if shutting_down and not self._forked: @@ -652,14 +652,14 @@ def periodic(self, force_flush=False, shutting_down=False): # Always include a heartbeat to keep RC connections alive # Extended heartbeat should be queued after app-dependencies-loaded event. This # ensures that that imported dependencies are accurately reported. - if heartbeat_payload := self._app_heartbeat_payload(): + if heartbeat_payload := self._report_heartbeat(): # Extended heartbeat report dependencies while regular heartbeats report empty payloads events.append({"payload": heartbeat_payload, "request_type": TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT}) else: events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.HEARTBEAT}) # Get any queued events and combine with current batch - if queued_events := self._flush_events_queue(): + if queued_events := self._report_events(): events.extend(queued_events) # Create comma-separated list of event types for logging @@ -692,7 +692,7 @@ def reset_queues(self): self._imported_dependencies = {} self._configuration_queue = [] - def _flush_events_queue(self): + def _report_events(self): # type: () -> List[Dict] """Flushes and returns a list of all telemtery event""" with self._service_lock: @@ -761,7 +761,7 @@ def _telemetry_excepthook(self, tp, value, root_traceback): error_msg = "{}:{} {}".format(filename, 
lineno, str(value)) self.add_integration(integration_name, True, error_msg=error_msg) - if app_started := self._app_started_payload(False): + if app_started := self._report_app_started(False): self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) self.app_shutdown() diff --git a/tests/appsec/appsec/test_telemetry.py b/tests/appsec/appsec/test_telemetry.py index c2f0f35b3a8..a07a0b8e06d 100644 --- a/tests/appsec/appsec/test_telemetry.py +++ b/tests/appsec/appsec/test_telemetry.py @@ -300,7 +300,7 @@ def test_appsec_enabled_metric( # Restore defaults and enabling telemetry appsec service with override_global_config({"_asm_enabled": True, "_lib_was_injected": False}): tracer.configure(appsec_enabled=appsec_enabled, appsec_enabled_origin=APPSEC.ENABLED_ORIGIN_UNKNOWN) - telemetry_writer._report_configuration_queue() + telemetry_writer._report_configurations() # Start the test with override_env(environment), override_global_config( @@ -312,7 +312,7 @@ def test_appsec_enabled_metric( telemetry_writer._dispatch() - metrics_result = telemetry_writer._report_configuration_queue() + metrics_result = telemetry_writer._report_configurations() assert metrics_result == [ {"name": "DD_APPSEC_ENABLED", "origin": expected_origin, "seq_id": ANY, "value": expected_result} ] diff --git a/tests/telemetry/app.py b/tests/telemetry/app.py index c945684ee46..530e98dfe75 100644 --- a/tests/telemetry/app.py +++ b/tests/telemetry/app.py @@ -16,7 +16,7 @@ def index(): def starting_app_view(): # We must call app-started before telemetry events can be sent to the agent. # This endpoint mocks the behavior of the agent writer. 
- telemetry_writer._app_started_payload() + telemetry_writer._report_app_started() return "OK", 200 diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 90bfc50b020..9a439f5a6e3 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -37,9 +37,9 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess): if os.fork() == 0: # Send multiple started events to confirm none get sent - telemetry_writer._app_started_payload() - telemetry_writer._app_started_payload() - telemetry_writer._app_started_payload() + telemetry_writer._report_app_started() + telemetry_writer._report_app_started() + telemetry_writer._report_app_started() else: # Print the parent process runtime id for validation print(get_runtime_id()) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 16eef83f779..5e8293234c7 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -92,7 +92,7 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): # queue an app started event - payload = telemetry_writer._app_started_payload() + payload = telemetry_writer._report_app_started() assert payload is not None, "app_started() did not return an event" telemetry_writer.add_event(payload, "app-started") # force a flush From 86de1a9dddad087bb760064c5a2d9b8e0a2b1b4a Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Tue, 30 Sep 2025 09:45:02 -0400 Subject: [PATCH 10/13] fix typing --- ddtrace/internal/telemetry/writer.py | 153 +++++++++++++-------------- 1 file changed, 71 insertions(+), 82 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 70f377e16f0..5e9343b3090 100644 --- a/ddtrace/internal/telemetry/writer.py +++ 
b/ddtrace/internal/telemetry/writer.py @@ -1,18 +1,17 @@ # -*- coding: utf-8 -*- -import http.client as httplib # noqa: E402 +import http.client as httplib import itertools import os import sys import time import traceback -from typing import TYPE_CHECKING # noqa:F401 -from typing import Any # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 -from typing import Set # noqa:F401 -from typing import Tuple # noqa:F401 -from typing import Union # noqa:F401 +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Set +from typing import Tuple +from typing import Union import urllib.parse as parse from ddtrace.internal.endpoints import endpoint_collection @@ -49,10 +48,10 @@ class LogData(dict): - def __hash__(self): + def __hash__(self) -> int: return hash((self["message"], self["level"], self.get("tags"), self.get("stack_trace"))) - def __eq__(self, other): + def __eq__(self, other) -> bool: return ( self["message"] == other["message"] and self["level"] == other["level"] @@ -65,8 +64,7 @@ class _TelemetryClient: AGENT_ENDPOINT = "telemetry/proxy/api/v2/apmtelemetry" AGENTLESS_ENDPOINT_V2 = "api/v2/apmtelemetry" - def __init__(self, agentless): - # type: (bool) -> None + def __init__(self, agentless: bool) -> None: self._telemetry_url = self.get_host(config.SITE, agentless) self._endpoint = self.get_endpoint(agentless) self._encoder = JSONEncoderV2() @@ -82,7 +80,7 @@ def __init__(self, agentless): self._headers["dd-api-key"] = config.API_KEY @property - def url(self): + def url(self) -> str: return parse.urljoin(self._telemetry_url, self._endpoint) def send_event(self, request: Dict, payload_type: str) -> Optional[httplib.HTTPResponse]: @@ -114,8 +112,7 @@ def send_event(self, request: Dict, payload_type: str) -> Optional[httplib.HTTPR conn.close() return resp - def get_headers(self, request): - # type: (Dict) -> Dict + def 
get_headers(self, request: Dict) -> Dict: """Get all telemetry api v2 request headers""" headers = self._headers.copy() headers["DD-Telemetry-Debug-Enabled"] = request["debug"] @@ -150,8 +147,7 @@ class TelemetryWriter(PeriodicService): _ORIGINAL_EXCEPTHOOK = staticmethod(sys.excepthook) CWD = os.getcwd() - def __init__(self, is_periodic=True, agentless=None): - # type: (bool, Optional[bool]) -> None + def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) -> None: super(TelemetryWriter, self).__init__(interval=min(config.HEARTBEAT_INTERVAL, 10)) # Decouples the aggregation and sending of the telemetry events @@ -160,12 +156,12 @@ def __init__(self, is_periodic=True, agentless=None): self._periodic_threshold = int(config.HEARTBEAT_INTERVAL // self.interval) - 1 self._periodic_count = 0 self._is_periodic = is_periodic - self._integrations_queue = dict() # type: Dict[str, Dict] + self._integrations_queue: Dict[str, Dict] = dict() self._namespace = MetricNamespace() - self._logs = set() # type: Set[Dict[str, Any]] - self._forked = False # type: bool - self._events_queue = [] # type: List[Dict] - self._configuration_queue = [] # type: List[Dict] + self._logs: Set[Dict[str, Any]] = set() + self._forked: bool = False + self._events_queue: List[Dict] = [] + self._configuration_queue: List[Dict] = [] self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT} @@ -206,8 +202,7 @@ def __init__(self, is_periodic=True, agentless=None): self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) - def enable(self): - # type: () -> bool + def enable(self) -> bool: """ Enable the instrumentation telemetry collection service. If the service has already been activated before, this method does nothing. 
Use ``disable`` to turn off the telemetry collection service. @@ -226,8 +221,7 @@ def enable(self): self.status = ServiceStatus.RUNNING return True - def disable(self): - # type: () -> None + def disable(self) -> None: """ Disable the telemetry collection service and drop the existing integrations and events Once disabled, telemetry collection can not be re-enabled. @@ -235,21 +229,17 @@ def disable(self): self._enabled = False self.reset_queues() - def enable_agentless_client(self, enabled=True): - # type: (bool) -> None - + def enable_agentless_client(self, enabled: bool = True) -> None: if self._client._agentless == enabled: return self._client = _TelemetryClient(enabled) - def _is_running(self): - # type: () -> bool + def _is_running(self) -> bool: """Returns True when the telemetry writer worker thread is running""" return self._is_periodic and self._worker is not None and self.status is ServiceStatus.RUNNING - def add_event(self, payload, payload_type): - # type: (Union[Dict[str, Any], List[Any]], str) -> None + def add_event(self, payload: Union[Dict[str, Any], List[Any]], payload_type: str) -> None: """ Adds a Telemetry event to the TelemetryWriter event buffer @@ -260,16 +250,21 @@ def add_event(self, payload, payload_type): if self.enable(): self._events_queue.append({"payload": payload, "request_type": payload_type}) - def add_events(self, events): - # type: (List[Dict[str, Any]]) -> None + def add_events(self, events: List[Dict[str, Any]]) -> None: """ Adds a list of Telemetry events to the TelemetryWriter event buffer """ if self.enable(): self._events_queue.extend(events) - def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""): - # type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None + def add_integration( + self, + integration_name: str, + patched: bool, + auto_patched: Optional[bool] = None, + error_msg: Optional[str] = None, + version: str = "", + ) -> None: """ Creates and queues the 
names and settings of a patched module @@ -309,7 +304,7 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[Di payload = { "configuration": self._report_configurations(), "products": self._report_products(), - } # type: Dict[str, Union[Dict[str, Any], List[Any]]] + } # Add time to value telemetry metrics for single step instrumentation if config.INSTALL_ID or config.INSTALL_TYPE or config.INSTALL_TIME: payload["install_signature"] = { @@ -319,8 +314,7 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[Di } return payload - def _report_heartbeat(self): - # type: () -> Dict[str, Any] + def _report_heartbeat(self) -> Optional[Dict[str, Any]]: if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: self._extended_time += self._extended_heartbeat_interval return { @@ -328,51 +322,47 @@ def _report_heartbeat(self): {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] } - return {} + return None - def _report_integrations(self): - # type: () -> List[Dict] + def _report_integrations(self) -> List[Dict]: """Flushes and returns a list of all queued integrations""" with self._service_lock: integrations = list(self._integrations_queue.values()) self._integrations_queue = dict() return integrations - def _report_configurations(self): - # type: () -> List[Dict] + def _report_configurations(self) -> List[Dict]: """Flushes and returns a list of all queued configurations""" with self._service_lock: configurations = self._configuration_queue self._configuration_queue = [] return configurations - def _report_dependencies(self): - # type: () -> Optional[Dict[str, Any]] + def _report_dependencies(self) -> Optional[Dict[str, Any]]: """Adds events to report imports done since the last periodic run""" if not config.DEPENDENCY_COLLECTION or not self._enabled: - return {} + return None with self._service_lock: newly_imported_deps = 
modules.get_newly_imported_modules(self._modules_already_imported) if not newly_imported_deps: - return {} + return None return update_imported_dependencies(self._imported_dependencies, newly_imported_deps) - def _report_endpoints(self): + def _report_endpoints(self) -> Optional[Dict[str, Any]]: """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent""" import ddtrace.settings.asm as asm_config_module if not asm_config_module.config._api_security_endpoint_collection or not self._enabled: - return {} + return None if not endpoint_collection.endpoints: - return {} + return None with self._service_lock: return endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit) - def _report_products(self): - # type: () -> Dict[str, Any] + def _report_products(self) -> Dict[str, Any]: """Adds a Telemetry event which reports the enablement of an APM product""" with self._service_lock: products = self._product_enablement.items() @@ -381,16 +371,20 @@ def _report_products(self): self._product_enablement = {} return {product: {"version": tracer_version, "enabled": status} for product, status in products} - def product_activated(self, product, status): - # type: (str, bool) -> None + def product_activated(self, product: str, status: bool) -> None: """Updates the product enablement dict""" with self._service_lock: # Only send product change event if the product status has changed if self._previous_product_enablement.get(product) != status: self._product_enablement[product] = status - def add_configuration(self, configuration_name, configuration_value, origin="unknown", config_id=None): - # type: (str, Any, str, Optional[str]) -> None + def add_configuration( + self, + configuration_name: str, + configuration_value: Any, + origin: str = "unknown", + config_id: Optional[str] = None, + ) -> None: """Creates and queues the name, origin, value of a configuration""" if isinstance(configuration_value, dict): 
configuration_value = ",".join(":".join((k, str(v))) for k, v in configuration_value.items()) @@ -412,7 +406,7 @@ def add_configuration(self, configuration_name, configuration_value, origin="unk config["seq_id"] = next(self._sequence_configurations) self._configuration_queue.append(config) - def add_configurations(self, configuration_list: List[Tuple[str, str, str]]): + def add_configurations(self, configuration_list: List[Tuple[str, str, str]]) -> None: """Creates and queues a list of configurations""" with self._service_lock: for name, value, origin in configuration_list: @@ -425,7 +419,7 @@ def add_configurations(self, configuration_list: List[Tuple[str, str, str]]): } ) - def add_log(self, level, message, stack_trace="", tags=None): + def add_log(self, level, message: str, stack_trace: str = "", tags: Optional[Dict] = None) -> None: """ Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake. This will make support cycles easier and ensure we know about potentially silent issues in libraries. 
@@ -497,7 +491,7 @@ def _format_file_path(self, filename: str) -> str: def add_gauge_metric( self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None - ): + ) -> None: """ Queues gauge metric """ @@ -512,7 +506,7 @@ def add_gauge_metric( def add_rate_metric( self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None - ): + ) -> None: """ Queues rate metric """ @@ -527,7 +521,7 @@ def add_rate_metric( def add_count_metric( self, namespace: TELEMETRY_NAMESPACE, name: str, value: int = 1, tags: Optional[MetricTagType] = None - ): + ) -> None: """ Queues count metric """ @@ -542,7 +536,7 @@ def add_count_metric( def add_distribution_metric( self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None - ): + ) -> None: """ Queues distributions metric """ @@ -555,20 +549,19 @@ def add_distribution_metric( tags, ) - def _report_logs(self): - # type () -> Set[Metric] + def _report_logs(self) -> Set[Dict[str, Any]]: with self._service_lock: logs = self._logs self._logs = set() return logs - def _dispatch(self): + def _dispatch(self) -> None: # moved core here to avoid circular import from ddtrace.internal import core core.dispatch("telemetry.periodic") - def periodic(self, force_flush=False, shutting_down=False): + def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> None: """Process and send telemetry events in batches. 
This method handles the periodic collection and sending of telemetry data with two main timing intervals: @@ -678,13 +671,12 @@ } self._client.send_event(batch_event, payload_types) - def app_shutdown(self): + def app_shutdown(self) -> None: if self.started: self.periodic(force_flush=True, shutting_down=True) self.disable() - def reset_queues(self): - # type: () -> None + def reset_queues(self) -> None: self._events_queue = [] self._integrations_queue = dict() self._namespace.flush() @@ -692,16 +684,14 @@ self._imported_dependencies = {} self._configuration_queue = [] - def _report_events(self): - # type: () -> List[Dict] + def _report_events(self) -> List[Dict]: """Flushes and returns a list of all telemetry events""" with self._service_lock: events = self._events_queue self._events_queue = [] return events - def _fork_writer(self): - # type: () -> None + def _fork_writer(self) -> None: self._forked = True # Avoid sending duplicate events. # Queued events should be sent in the main process. @@ -715,17 +705,16 @@ # error in Python 3.12 self.enable() - def _restart_sequence(self): + def _restart_sequence(self) -> None: self._sequence_payloads = itertools.count(1) self._sequence_configurations = itertools.count(1) - def _stop_service(self, join=True, *args, **kwargs): - # type: (...) 
-> None + def _stop_service(self, join: bool = True, *args, **kwargs) -> None: super(TelemetryWriter, self)._stop_service(*args, **kwargs) if join: self.join(timeout=2) - def _telemetry_excepthook(self, tp, value, root_traceback): + def _telemetry_excepthook(self, tp, value, root_traceback) -> None: if root_traceback is not None: # Get the frame which raised the exception traceback = root_traceback @@ -768,10 +757,10 @@ def _telemetry_excepthook(self, tp, value, root_traceback): return TelemetryWriter._ORIGINAL_EXCEPTHOOK(tp, value, root_traceback) - def install_excepthook(self): + def install_excepthook(self) -> None: """Install a hook that intercepts unhandled exception and send metrics about them.""" sys.excepthook = self._telemetry_excepthook - def uninstall_excepthook(self): + def uninstall_excepthook(self) -> None: """Uninstall the global tracer except hook.""" sys.excepthook = TelemetryWriter._ORIGINAL_EXCEPTHOOK From 3dea761654605b69d063ce77b24ab9179dfd833d Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Wed, 1 Oct 2025 10:29:24 -0400 Subject: [PATCH 11/13] remove add_event and add_events, use get event to generate payloads, make telemetry type an enum --- ddtrace/internal/telemetry/constants.py | 2 +- ddtrace/internal/telemetry/writer.py | 65 ++++++++--------------- tests/telemetry/test_telemetry_metrics.py | 10 ++-- tests/telemetry/test_writer.py | 42 +-------------- 4 files changed, 30 insertions(+), 89 deletions(-) diff --git a/ddtrace/internal/telemetry/constants.py b/ddtrace/internal/telemetry/constants.py index ab5097507ef..bab41bcfaa9 100644 --- a/ddtrace/internal/telemetry/constants.py +++ b/ddtrace/internal/telemetry/constants.py @@ -11,7 +11,7 @@ class TELEMETRY_NAMESPACE(Enum): PROFILER = "profiler" -class TELEMETRY_EVENT_TYPE(object): +class TELEMETRY_EVENT_TYPE(Enum): STARTED = "app-started" SHUTDOWN = "app-closing" HEARTBEAT = "app-heartbeat" diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 
5e9343b3090..545ee7e75b7 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -160,7 +160,7 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) - self._namespace = MetricNamespace() self._logs: Set[Dict[str, Any]] = set() self._forked: bool = False - self._events_queue: List[Dict] = [] + self._events_queue: List[Dict[str, Any]] = [] self._configuration_queue: List[Dict] = [] self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() @@ -199,7 +199,7 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) - self.enable() # Force app started for unit tests if config.FORCE_START and (app_started := self._report_app_started()): - self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) + self._events_queue.append(self._get_event(app_started, TELEMETRY_EVENT_TYPE.STARTED)) get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self)) def enable(self) -> bool: @@ -221,6 +221,11 @@ def enable(self) -> bool: self.status = ServiceStatus.RUNNING return True + def _get_event( + self, payload: Union[Dict[str, Any], List[Any]], payload_type: TELEMETRY_EVENT_TYPE + ) -> Dict[str, Any]: + return {"payload": payload, "request_type": payload_type.value} + def disable(self) -> None: """ Disable the telemetry collection service and drop the existing integrations and events @@ -239,24 +244,6 @@ def _is_running(self) -> bool: """Returns True when the telemetry writer worker thread is running""" return self._is_periodic and self._worker is not None and self.status is ServiceStatus.RUNNING - def add_event(self, payload: Union[Dict[str, Any], List[Any]], payload_type: str) -> None: - """ - Adds a Telemetry event to the TelemetryWriter event buffer - - :param Dict payload: stores a formatted telemetry event - :param str payload_type: The payload_type denotes the type of telemetry request. 
- Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change - """ - if self.enable(): - self._events_queue.append({"payload": payload, "request_type": payload_type}) - - def add_events(self, events: List[Dict[str, Any]]) -> None: - """ - Adds a list of Telemetry events to the TelemetryWriter event buffer - """ - if self.enable(): - self._events_queue.extend(events) - def add_integration( self, integration_name: str, @@ -598,60 +585,54 @@ def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> No for payload_type, namespaces in namespace_metrics.items(): for namespace, metrics in namespaces.items(): if metrics: - events.append( - {"payload": {"namespace": namespace, "series": metrics}, "request_type": payload_type} - ) + events.append(self._get_event({"namespace": namespace, "series": metrics}, payload_type)) if logs := self._report_logs(): - events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_EVENT_TYPE.LOGS}) + events.append(self._get_event({"logs": list(logs)}, TELEMETRY_EVENT_TYPE.LOGS)) # Queue metrics if not at heartbeat interval if self._is_periodic and not force_flush: if self._periodic_count < self._periodic_threshold: self._periodic_count += 1 if events: - self.add_events(events) + # list.extend() is an atomic operation in CPython, we don't need to lock the queue + self._events_queue.extend(events) return self._periodic_count = 0 # At heartbeat interval, collect and send all telemetry data if app_started_payload := self._report_app_started(): # app-started should be the first event in the batch - events = [{"payload": app_started_payload, "request_type": TELEMETRY_EVENT_TYPE.STARTED}] + events + events = [self._get_event(app_started_payload, TELEMETRY_EVENT_TYPE.STARTED)] + events if products := self._report_products(): - events.append({"payload": {"products": products}, "request_type": TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE}) + events.append(self._get_event({"products": 
products}, TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE)) if ints := self._report_integrations(): - events.append({"payload": {"integrations": ints}, "request_type": TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE}) + events.append(self._get_event({"integrations": ints}, TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE)) if endpoints := self._report_endpoints(): - events.append({"payload": endpoints, "request_type": TELEMETRY_EVENT_TYPE.ENDPOINTS}) + events.append(self._get_event(endpoints, TELEMETRY_EVENT_TYPE.ENDPOINTS)) if configs := self._report_configurations(): - events.append( - { - "payload": {"configuration": configs}, - "request_type": TELEMETRY_EVENT_TYPE.CLIENT_CONFIGURATION_CHANGE, - } - ) + events.append(self._get_event({"configuration": configs}, TELEMETRY_EVENT_TYPE.CLIENT_CONFIGURATION_CHANGE)) if deps := self._report_dependencies(): - events.append({"payload": {"dependencies": deps}, "request_type": TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED}) + events.append(self._get_event({"dependencies": deps}, TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED)) if shutting_down and not self._forked: - events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.SHUTDOWN}) + events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.SHUTDOWN)) # Always include a heartbeat to keep RC connections alive # Extended heartbeat should be queued after app-dependencies-loaded event. This # ensures that imported dependencies are accurately reported. 
if heartbeat_payload := self._report_heartbeat(): # Extended heartbeat report dependencies while regular heartbeats report empty payloads - events.append({"payload": heartbeat_payload, "request_type": TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT}) + events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT)) else: - events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.HEARTBEAT}) + events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT)) - # Get any queued events and combine with current batch + # Get any queued events (ie metrics and logs from previous periodic calls) and combine with current batch if queued_events := self._report_events(): events.extend(queued_events) @@ -667,7 +648,7 @@ def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> No "application": get_application(config.SERVICE, config.VERSION, config.ENV), "host": get_host_info(), "payload": events, - "request_type": "message-batch", + "request_type": TELEMETRY_EVENT_TYPE.MESSAGE_BATCH.value, } self._client.send_event(batch_event, payload_types) @@ -751,7 +732,7 @@ def _telemetry_excepthook(self, tp, value, root_traceback) -> None: self.add_integration(integration_name, True, error_msg=error_msg) if app_started := self._report_app_started(False): - self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED}) + self._events_queue.append(self._get_event(app_started, TELEMETRY_EVENT_TYPE.STARTED)) self.app_shutdown() diff --git a/tests/telemetry/test_telemetry_metrics.py b/tests/telemetry/test_telemetry_metrics.py index e3548debf94..c9d6272e751 100644 --- a/tests/telemetry/test_telemetry_metrics.py +++ b/tests/telemetry/test_telemetry_metrics.py @@ -13,11 +13,11 @@ def _assert_metric( test_agent, expected_metrics, namespace=TELEMETRY_NAMESPACE.TRACERS, - type_paypload=TELEMETRY_EVENT_TYPE.METRICS, + type_payload=TELEMETRY_EVENT_TYPE.METRICS, ): assert len(expected_metrics) > 0, "expected_metrics 
should not be empty" test_agent.telemetry_writer.periodic(force_flush=True) - metrics_events = test_agent.get_events(type_paypload) + metrics_events = test_agent.get_events(type_payload.value) assert len(metrics_events) > 0, "captured metrics events should not be empty" metrics = [] @@ -290,7 +290,7 @@ def test_send_appsec_distributions_metric(telemetry_writer, test_agent_session, test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, + type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) @@ -311,7 +311,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, + type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) expected_series = [ @@ -328,7 +328,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write test_agent_session, expected_series, namespace=TELEMETRY_NAMESPACE.APPSEC, - type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, + type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS, ) diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 5e8293234c7..03bf3750532 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -26,41 +26,6 @@ from tests.utils import override_global_config -def test_add_event(telemetry_writer, test_agent_session, mock_time): - """asserts that add_event queues a telemetry request with valid headers and payload""" - payload = {"test": "123"} - payload_type = "test-event" - # add event to the queue - telemetry_writer.add_event(payload, payload_type) - # send request to the agent - telemetry_writer.periodic(force_flush=True) - - requests = test_agent_session.get_requests() - assert len(requests) == 1 - assert requests[0]["headers"]["Content-Type"] == "application/json" - assert requests[0]["headers"]["DD-Client-Library-Language"] == "python" - 
assert requests[0]["headers"]["DD-Client-Library-Version"] == _pep440_to_semver() - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" - assert requests[0]["headers"]["DD-Telemetry-API-Version"] == "v2" - assert requests[0]["headers"]["DD-Telemetry-Debug-Enabled"] == "False" - - events = test_agent_session.get_events(payload_type) - assert len(events) == 1 - validate_request_body(events[0], payload, payload_type) - - -def test_add_event_disabled_writer(telemetry_writer, test_agent_session): - """asserts that add_event() does not create a telemetry request when telemetry writer is disabled""" - payload = {"test": "123"} - payload_type = "test-event" - # ensure events are not queued when telemetry is disabled - telemetry_writer.add_event(payload, payload_type) - - # ensure no request were sent - telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_events(payload_type)) == 1 - - @pytest.mark.parametrize( "env_var,value,expected_value", [ @@ -91,13 +56,8 @@ def test_app_started_event_configuration_override_asm( def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): - # queue an app started event - payload = telemetry_writer._report_app_started() - assert payload is not None, "app_started() did not return an event" - telemetry_writer.add_event(payload, "app-started") - # force a flush + # App started should be queued by the first periodic call telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests() assert len(requests) == 1 assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" From bdd56c82c1c035f075f3aefe35a47ea37cc2d824 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Fri, 3 Oct 2025 10:54:26 -0400 Subject: [PATCH 12/13] fix typing errors --- 
ddtrace/internal/telemetry/writer.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 545ee7e75b7..2b67b3c82a3 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -48,10 +48,10 @@ class LogData(dict): - def __hash__(self) -> int: + def __hash__(self): return hash((self["message"], self["level"], self.get("tags"), self.get("stack_trace"))) - def __eq__(self, other) -> bool: + def __eq__(self, other): return ( self["message"] == other["message"] and self["level"] == other["level"] @@ -164,8 +164,8 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) - self._configuration_queue: List[Dict] = [] self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() - self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT} - self._previous_product_enablement = {} + self._product_enablement: Dict[str, bool] = {product.value: False for product in TELEMETRY_APM_PRODUCT} + self._previous_product_enablement: Dict[str, bool] = {} self._extended_time = time.monotonic() # The extended heartbeat interval is set to 24 hours self._extended_heartbeat_interval = 3600 * 24 @@ -325,7 +325,7 @@ def _report_configurations(self) -> List[Dict]: self._configuration_queue = [] return configurations - def _report_dependencies(self) -> Optional[Dict[str, Any]]: + def _report_dependencies(self) -> Optional[List[Dict[str, Any]]]: """Adds events to report imports done since the last periodic run""" if not config.DEPENDENCY_COLLECTION or not self._enabled: return None From 1ea06b3a6a82801d61800f46ab9ba67df16995e8 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Fri, 10 Oct 2025 14:18:46 -0400 Subject: [PATCH 13/13] fix iast tests --- tests/appsec/iast/test_telemetry.py | 4 ++-- .../integrations/flask_tests/test_iast_flask_telemetry.py | 7 ++++--- 2 files 
changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/appsec/iast/test_telemetry.py b/tests/appsec/iast/test_telemetry.py index 8f38bc0743c..64ba623bc63 100644 --- a/tests/appsec/iast/test_telemetry.py +++ b/tests/appsec/iast/test_telemetry.py @@ -242,7 +242,7 @@ def test_django_instrumented_metrics(telemetry_writer): _on_django_patch() metrics_result = telemetry_writer._namespace.flush() - metrics_source_tags_result = [metric["tags"][0] for metric in metrics_result["generate-metrics"]["iast"]] + metrics_source_tags_result = [metric["tags"][0] for metric in metrics_result[TELEMETRY_EVENT_TYPE.METRICS]["iast"]] assert len(metrics_source_tags_result) == 9 assert f"source_type:{origin_to_str(OriginType.HEADER_NAME)}" in metrics_source_tags_result @@ -261,4 +261,4 @@ def test_django_instrumented_metrics_iast_disabled(telemetry_writer): _on_django_patch() metrics_result = telemetry_writer._namespace.flush() - assert "iast" not in metrics_result["generate-metrics"] + assert "iast" not in metrics_result[TELEMETRY_EVENT_TYPE.METRICS] diff --git a/tests/appsec/integrations/flask_tests/test_iast_flask_telemetry.py b/tests/appsec/integrations/flask_tests/test_iast_flask_telemetry.py index 0442f2fc2a8..ced8306a733 100644 --- a/tests/appsec/integrations/flask_tests/test_iast_flask_telemetry.py +++ b/tests/appsec/integrations/flask_tests/test_iast_flask_telemetry.py @@ -1,6 +1,7 @@ from ddtrace.appsec._iast._handlers import _on_flask_patch from ddtrace.appsec._iast._taint_tracking import OriginType from ddtrace.appsec._iast._taint_tracking import origin_to_str +from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE from tests.appsec.appsec_utils import flask_server from tests.utils import override_global_config @@ -20,11 +21,11 @@ def test_flask_instrumented_metrics(telemetry_writer): _on_flask_patch((2, 0, 0)) metrics_result = telemetry_writer._namespace.flush() - assert metrics_result["generate-metrics"]["iast"] + assert 
metrics_result[TELEMETRY_EVENT_TYPE.METRICS]["iast"] metrics_source_tags_result = [ metric["tags"][0] - for metric in metrics_result["generate-metrics"]["iast"] + for metric in metrics_result[TELEMETRY_EVENT_TYPE.METRICS]["iast"] if metric["metric"] == "instrumented.source" ] @@ -46,4 +47,4 @@ def test_flask_instrumented_metrics_iast_disabled(telemetry_writer): _on_flask_patch((2, 0, 0)) metrics_result = telemetry_writer._namespace.flush() - assert "iast" not in metrics_result["generate-metrics"] + assert "iast" not in metrics_result[TELEMETRY_EVENT_TYPE.METRICS]