diff --git a/ddtrace/internal/telemetry/constants.py b/ddtrace/internal/telemetry/constants.py
index d853474fdaa..bab41bcfaa9 100644
--- a/ddtrace/internal/telemetry/constants.py
+++ b/ddtrace/internal/telemetry/constants.py
@@ -11,9 +11,20 @@ class TELEMETRY_NAMESPACE(Enum):
     PROFILER = "profiler"
 
 
-TELEMETRY_TYPE_GENERATE_METRICS = "generate-metrics"
-TELEMETRY_TYPE_DISTRIBUTION = "distributions"
-TELEMETRY_TYPE_LOGS = "logs"
+class TELEMETRY_EVENT_TYPE(Enum):
+    STARTED = "app-started"
+    SHUTDOWN = "app-closing"
+    HEARTBEAT = "app-heartbeat"
+    EXTENDED_HEARTBEAT = "app-extended-heartbeat"
+    DEPENDENCIES_LOADED = "app-dependencies-loaded"
+    PRODUCT_CHANGE = "app-product-change"
+    INTEGRATIONS_CHANGE = "app-integrations-change"
+    ENDPOINTS = "app-endpoints"
+    CLIENT_CONFIGURATION_CHANGE = "app-client-configuration-change"
+    LOGS = "logs"
+    METRICS = "generate-metrics"
+    DISTRIBUTIONS = "distributions"
+    MESSAGE_BATCH = "message-batch"
 
 
 class TELEMETRY_LOG_LEVEL(Enum):
diff --git a/ddtrace/internal/telemetry/metrics_namespaces.pyx b/ddtrace/internal/telemetry/metrics_namespaces.pyx
index 25060787aec..45c6c268dc4 100644
--- a/ddtrace/internal/telemetry/metrics_namespaces.pyx
+++ b/ddtrace/internal/telemetry/metrics_namespaces.pyx
@@ -6,8 +6,7 @@ from typing import Tuple
 
 from ddtrace.internal import forksafe
 from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
+from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
 
 
 MetricTagType = Optional[Tuple[Tuple[str, str], ...]]
@@ -46,14 +45,14 @@ cdef class MetricNamespace:
 
         now = int(time.time())
         data = {
-            TELEMETRY_TYPE_GENERATE_METRICS: {},
-            TELEMETRY_TYPE_DISTRIBUTION: {},
+            TELEMETRY_EVENT_TYPE.METRICS: {},
+            TELEMETRY_EVENT_TYPE.DISTRIBUTIONS: {},
         }
         for metric_id, value in namespace_metrics.items():
            name, namespace, _tags, metric_type = metric_id
            tags = ["{}:{}".format(k, v).lower() for k, v in _tags] if _tags else []
            if metric_type is MetricType.DISTRIBUTION:
-                data[TELEMETRY_TYPE_DISTRIBUTION].setdefault(namespace, []).append({
+                data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS].setdefault(namespace, []).append({
                     "metric": name,
                     "points": value,
                     "tags": tags,
@@ -70,7 +69,7 @@ cdef class MetricNamespace:
             }
             if metric_type in (MetricType.RATE, MetricType.GAUGE):
                 metric["interval"] = _interval
-            data[TELEMETRY_TYPE_GENERATE_METRICS].setdefault(namespace, []).append(metric)
+            data[TELEMETRY_EVENT_TYPE.METRICS].setdefault(namespace, []).append(metric)
 
         return data
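After this change, `MetricNamespace.flush()` returns a mapping keyed by `TELEMETRY_EVENT_TYPE` members rather than the old string constants. A minimal sketch of the shape consumers should expect (the namespace, metric name, and values below are illustrative, not taken from the source):

```python
from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE

# Hypothetical flush() result: one generate-metrics series, no distributions.
data = {
    TELEMETRY_EVENT_TYPE.METRICS: {
        "tracers": [{"metric": "spans_created", "points": [[1700000000, 3]], "tags": []}],
    },
    TELEMETRY_EVENT_TYPE.DISTRIBUTIONS: {},
}
# Callers now key off enum members instead of bare strings:
series = data[TELEMETRY_EVENT_TYPE.METRICS].get("tracers", [])
```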
diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py
index 09ceb9273b8..2b67b3c82a3 100644
--- a/ddtrace/internal/telemetry/writer.py
+++ b/ddtrace/internal/telemetry/writer.py
@@ -1,18 +1,17 @@
 # -*- coding: utf-8 -*-
-import http.client as httplib  # noqa: E402
+import http.client as httplib
 import itertools
 import os
 import sys
 import time
 import traceback
-from typing import TYPE_CHECKING  # noqa:F401
-from typing import Any  # noqa:F401
-from typing import Dict  # noqa:F401
-from typing import List  # noqa:F401
-from typing import Optional  # noqa:F401
-from typing import Set  # noqa:F401
-from typing import Tuple  # noqa:F401
-from typing import Union  # noqa:F401
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import Union
 import urllib.parse as parse
 
 from ddtrace.internal.endpoints import endpoint_collection
@@ -32,9 +31,9 @@
 from ..utils.version import version as tracer_version
 from . import modules
 from .constants import TELEMETRY_APM_PRODUCT
+from .constants import TELEMETRY_EVENT_TYPE
 from .constants import TELEMETRY_LOG_LEVEL
 from .constants import TELEMETRY_NAMESPACE
-from .constants import TELEMETRY_TYPE_LOGS
 from .data import get_application
 from .data import get_host_info
 from .data import get_python_config_vars
@@ -65,8 +64,7 @@ class _TelemetryClient:
     AGENT_ENDPOINT = "telemetry/proxy/api/v2/apmtelemetry"
     AGENTLESS_ENDPOINT_V2 = "api/v2/apmtelemetry"
 
-    def __init__(self, agentless):
-        # type: (bool) -> None
+    def __init__(self, agentless: bool) -> None:
         self._telemetry_url = self.get_host(config.SITE, agentless)
         self._endpoint = self.get_endpoint(agentless)
         self._encoder = JSONEncoderV2()
@@ -82,10 +80,10 @@ def __init__(self, agentless):
         self._headers["dd-api-key"] = config.API_KEY
 
     @property
-    def url(self):
+    def url(self) -> str:
         return parse.urljoin(self._telemetry_url, self._endpoint)
 
-    def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]:
+    def send_event(self, request: Dict, payload_type: str) -> Optional[httplib.HTTPResponse]:
         """Sends a telemetry request to the trace agent"""
         resp = None
         conn = None
@@ -98,11 +96,11 @@ def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]:
             resp = conn.getresponse()
             if resp.status < 300:
                 log.debug(
-                    "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event: %s. Response: %s",
+                    "Instrumentation Telemetry sent %d bytes in %.5fs to %s. Event(s): %s. Response: %s",
                     len(rb_json),
                     sw.elapsed(),
                     self.url,
-                    request["request_type"],
+                    payload_type,
                     resp.status,
                 )
             else:
@@ -114,8 +112,7 @@ def send_event(self, request: Dict) -> Optional[httplib.HTTPResponse]:
                 conn.close()
         return resp
 
-    def get_headers(self, request):
-        # type: (Dict) -> Dict
+    def get_headers(self, request: Dict) -> Dict:
         """Get all telemetry api v2 request headers"""
         headers = self._headers.copy()
         headers["DD-Telemetry-Debug-Enabled"] = request["debug"]
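`_TelemetryClient.send_event()` now takes the event types as a separate string used only for debug logging, since a message-batch request no longer has a single meaningful event type of its own. A hedged sketch of a call site (the `client` instance and batch contents here are hypothetical):

```python
batch = {"request_type": "message-batch", "debug": False, "api_version": "v2", "payload": []}
# The second argument is a human-readable summary, e.g. the comma-separated
# types of the events inside the batch; it never reaches the wire format.
client.send_event(batch, "app-heartbeat, generate-metrics")
```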
@@ -150,8 +147,7 @@ class TelemetryWriter(PeriodicService):
     _ORIGINAL_EXCEPTHOOK = staticmethod(sys.excepthook)
     CWD = os.getcwd()
 
-    def __init__(self, is_periodic=True, agentless=None):
-        # type: (bool, Optional[bool]) -> None
+    def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) -> None:
         super(TelemetryWriter, self).__init__(interval=min(config.HEARTBEAT_INTERVAL, 10))
 
         # Decouples the aggregation and sending of the telemetry events
@@ -160,16 +156,16 @@ def __init__(self, is_periodic=True, agentless=None):
         self._periodic_threshold = int(config.HEARTBEAT_INTERVAL // self.interval) - 1
         self._periodic_count = 0
         self._is_periodic = is_periodic
-        self._integrations_queue = dict()  # type: Dict[str, Dict]
+        self._integrations_queue: Dict[str, Dict] = dict()
         self._namespace = MetricNamespace()
-        self._logs = set()  # type: Set[Dict[str, Any]]
-        self._forked = False  # type: bool
-        self._events_queue = []  # type: List[Dict]
-        self._configuration_queue = []  # type: List[Dict]
+        self._logs: Set[Dict[str, Any]] = set()
+        self._forked: bool = False
+        self._events_queue: List[Dict[str, Any]] = []
+        self._configuration_queue: List[Dict] = []
         self._imported_dependencies: Dict[str, str] = dict()
         self._modules_already_imported: Set[str] = set()
-        self._product_enablement = {product.value: False for product in TELEMETRY_APM_PRODUCT}
-        self._send_product_change_updates = False
+        self._product_enablement: Dict[str, bool] = {product.value: False for product in TELEMETRY_APM_PRODUCT}
+        self._previous_product_enablement: Dict[str, bool] = {}
         self._extended_time = time.monotonic()
         # The extended heartbeat interval is set to 24 hours
         self._extended_heartbeat_interval = 3600 * 24
@@ -202,12 +198,11 @@ def __init__(self, is_periodic=True, agentless=None):
             # This will occur when the agent writer starts.
             self.enable()
             # Force app started for unit tests
-            if config.FORCE_START and (app_started := self._app_started()):
-                self._events_queue.append(app_started)
+            if config.FORCE_START and (app_started := self._report_app_started()):
+                self._events_queue.append(self._get_event(app_started, TELEMETRY_EVENT_TYPE.STARTED))
             get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self))
 
-    def enable(self):
-        # type: () -> bool
+    def enable(self) -> bool:
         """
         Enable the instrumentation telemetry collection service. If the service has already been
         activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
@@ -226,8 +221,12 @@ def enable(self):
         self.status = ServiceStatus.RUNNING
         return True
 
-    def disable(self):
-        # type: () -> None
+    def _get_event(
+        self, payload: Union[Dict[str, Any], List[Any]], payload_type: TELEMETRY_EVENT_TYPE
+    ) -> Dict[str, Any]:
+        return {"payload": payload, "request_type": payload_type.value}
+
+    def disable(self) -> None:
         """
         Disable the telemetry collection service and drop the existing integrations and events
         Once disabled, telemetry collection can not be re-enabled.
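The new `_get_event()` helper centralizes the envelope that each queued event used to build by hand, converting the enum member to its wire string exactly once. Roughly, for a logs payload:

```python
# Equivalent of writer._get_event({"logs": []}, TELEMETRY_EVENT_TYPE.LOGS):
event = {"payload": {"logs": []}, "request_type": TELEMETRY_EVENT_TYPE.LOGS.value}
assert event["request_type"] == "logs"
```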
@@ -235,41 +234,24 @@ def disable(self):
         self._enabled = False
         self.reset_queues()
 
-    def enable_agentless_client(self, enabled=True):
-        # type: (bool) -> None
-
+    def enable_agentless_client(self, enabled: bool = True) -> None:
         if self._client._agentless == enabled:
             return
 
         self._client = _TelemetryClient(enabled)
 
-    def _is_running(self):
-        # type: () -> bool
+    def _is_running(self) -> bool:
         """Returns True when the telemetry writer worker thread is running"""
         return self._is_periodic and self._worker is not None and self.status is ServiceStatus.RUNNING
 
-    def add_event(self, payload, payload_type):
-        # type: (Union[Dict[str, Any], List[Any]], str) -> None
-        """
-        Adds a Telemetry event to the TelemetryWriter event buffer
-
-        :param Dict payload: stores a formatted telemetry event
-        :param str payload_type: The payload_type denotes the type of telemetry request.
-            Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
-        """
-        if self.enable():
-            self._events_queue.append({"payload": payload, "request_type": payload_type})
-
-    def add_events(self, events):
-        # type: (List[Dict[str, Any]]) -> None
-        """
-        Adds a list of Telemetry events to the TelemetryWriter event buffer
-        """
-        if self.enable():
-            self._events_queue.extend(events)
-
-    def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""):
-        # type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None
+    def add_integration(
+        self,
+        integration_name: str,
+        patched: bool,
+        auto_patched: Optional[bool] = None,
+        error_msg: Optional[str] = None,
+        version: str = "",
+    ) -> None:
         """
         Creates and queues the names and settings of a patched module
 
@@ -294,7 +276,7 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms
             self._integrations_queue[integration_name]["compatible"] = error_msg == ""
             self._integrations_queue[integration_name]["error"] = error_msg
 
-    def _app_started(self, register_app_shutdown: bool = True) -> Optional[Dict[str, Any]]:
+    def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[Dict[str, Any]]:
         """Sent when TelemetryWriter is enabled or forks"""
         if self._forked or self.started:
             # app-started events should only be sent by the main process
@@ -303,18 +285,13 @@ def _app_started(self, register_app_shutdown: bool = True) -> Optional[Dict[str,
 
         self.started = True
 
-        products = {
-            product: {"version": tracer_version, "enabled": status}
-            for product, status in self._product_enablement.items()
-        }
-
         # SOABI should help us identify which wheels people are getting from PyPI
         self.add_configurations(get_python_config_vars())
 
         payload = {
-            "configuration": self._flush_configuration_queue(),
-            "products": products,
-        }  # type: Dict[str, Union[Dict[str, Any], List[Any]]]
+            "configuration": self._report_configurations(),
+            "products": self._report_products(),
+        }
         # Add time to value telemetry metrics for single step instrumentation
         if config.INSTALL_ID or config.INSTALL_TYPE or config.INSTALL_TIME:
             payload["install_signature"] = {
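`_report_app_started()` now returns a bare payload, and the caller wraps it with `_get_event(..., TELEMETRY_EVENT_TYPE.STARTED)`. An illustrative payload shape, assuming the install-signature settings (`config.INSTALL_ID`/`INSTALL_TYPE`/`INSTALL_TIME`) are populated; all values below are made up:

```python
payload = {
    "configuration": [{"name": "DD_TRACE_ENABLED", "origin": "default", "seq_id": 1, "value": True}],
    "products": {"appsec": {"version": "3.11.0", "enabled": False}},
    # Present only when the single step instrumentation settings are set.
    "install_signature": {"install_id": "<id>", "install_type": "<type>", "install_time": "<time>"},
}
```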
@@ -322,86 +299,44 @@
                 "install_type": config.INSTALL_TYPE,
                 "install_time": config.INSTALL_TIME,
             }
+        return payload
 
-        return {"payload": payload, "request_type": "app-started"}
-
-    def _app_heartbeat_event(self):
-        # type: () -> Dict[str, Any]
+    def _report_heartbeat(self) -> Optional[Dict[str, Any]]:
         if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval:
             self._extended_time += self._extended_heartbeat_interval
-            # Extended heartbeat event must be queued after the dependencies loaded event.
-            # Otherwise, self._imported_dependencies will not be up to date.
-            payload = {
+            return {
                 "dependencies": [
                     {"name": name, "version": version} for name, version in self._imported_dependencies.items()
                 ]
             }
-            request_type = "app-extended-heartbeat"
-        else:
-            payload = {}
-            request_type = "app-heartbeat"
-        return {"payload": payload, "request_type": request_type}
-
-    def _app_closing_event(self):
-        # type: () -> Optional[Dict[str, Any]]
-        """Adds a Telemetry event which notifies the agent that an application instance has terminated"""
-        if self._forked:
-            # app-closing event should only be sent by the main process
-            return None
-        return {"payload": {}, "request_type": "app-closing"}
-
-    def _app_integrations_changed_event(self, integrations):
-        # type: (List[Dict]) -> Dict[str, Any]
-        """Adds a Telemetry event which sends a list of configured integrations to the agent"""
-        return {
-            "payload": {
-                "integrations": integrations,
-            },
-            "request_type": "app-integrations-change",
-        }
+        return None
 
-    def _flush_integrations_queue(self):
-        # type: () -> List[Dict]
+    def _report_integrations(self) -> List[Dict]:
         """Flushes and returns a list of all queued integrations"""
         with self._service_lock:
             integrations = list(self._integrations_queue.values())
             self._integrations_queue = dict()
             return integrations
 
-    def _flush_configuration_queue(self):
-        # type: () -> List[Dict]
+    def _report_configurations(self) -> List[Dict]:
         """Flushes and returns a list of all queued configurations"""
         with self._service_lock:
             configurations = self._configuration_queue
             self._configuration_queue = []
             return configurations
 
-    def _app_client_configuration_changed_event(self, configurations):
-        # type: (List[Dict]) -> Dict[str, Any]
-        """Adds a Telemetry event which sends list of modified configurations to the agent"""
-        return {
-            "payload": {
-                "configuration": configurations,
-            },
-            "request_type": "app-client-configuration-change",
-        }
-
-    def _app_dependencies_loaded_event(self) -> Optional[Dict[str, Any]]:
+    def _report_dependencies(self) -> Optional[List[Dict[str, Any]]]:
         """Adds events to report imports done since the last periodic run"""
         if not config.DEPENDENCY_COLLECTION or not self._enabled:
             return None
-        with self._service_lock:
-            newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported)
-
-        if not newly_imported_deps:
-            return None
 
         with self._service_lock:
-            if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps):
-                return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"}
-            return None
+            newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported)
+            if not newly_imported_deps:
+                return None
+            return update_imported_dependencies(self._imported_dependencies, newly_imported_deps)
 
-    def _flush_app_endpoints(self) -> Optional[Dict[str, Any]]:
+    def _report_endpoints(self) -> Optional[Dict[str, Any]]:
         """Adds a Telemetry event which sends the list of HTTP endpoints found at startup to the agent"""
         import ddtrace.settings.asm as asm_config_module
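`_report_heartbeat()` now returns either an extended-heartbeat payload (at most once per 24-hour window when dependency collection is on) or `None`; the caller in `periodic()` further down in this diff pairs it with a plain `app-heartbeat` fallback. A condensed sketch of that pairing, assuming a `writer` instance:

```python
if payload := writer._report_heartbeat():
    event = writer._get_event(payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT)
else:
    event = writer._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT)  # empty payload
```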
@@ -412,42 +347,31 @@
             return None
 
         with self._service_lock:
-            payload = endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit)
-            return {"payload": payload, "request_type": "app-endpoints"}
+            return endpoint_collection.flush(asm_config_module.config._api_security_endpoint_collection_limit)
 
-    def _app_product_change(self):
-        # type: () -> Optional[Dict[str, Any]]
+    def _report_products(self) -> Dict[str, Any]:
         """Adds a Telemetry event which reports the enablement of an APM product"""
+        with self._service_lock:
+            products = self._product_enablement.items()
+            for product, status in products:
+                self._previous_product_enablement[product] = status
+            self._product_enablement = {}
+            return {product: {"version": tracer_version, "enabled": status} for product, status in products}
 
-        if not self._send_product_change_updates:
-            return None
-
-        self._send_product_change_updates = False
-        return {
-            "payload": {
-                "products": {
-                    product: {"version": tracer_version, "enabled": status}
-                    for product, status in self._product_enablement.items()
-                }
-            },
-            "request_type": "app-product-change",
-        }
-
-    def product_activated(self, product, enabled):
-        # type: (str, bool) -> None
+    def product_activated(self, product: str, status: bool) -> None:
         """Updates the product enablement dict"""
-
-        if self._product_enablement.get(product, False) is enabled:
-            return
-
-        self._product_enablement[product] = enabled
-
-        # If the app hasn't started, the product status will be included in the app_started event's payload
-        if self.started:
-            self._send_product_change_updates = True
-
-    def add_configuration(self, configuration_name, configuration_value, origin="unknown", config_id=None):
-        # type: (str, Any, str, Optional[str]) -> None
+        with self._service_lock:
+            # Only send product change event if the product status has changed
+            if self._previous_product_enablement.get(product) != status:
+                self._product_enablement[product] = status
+
+    def add_configuration(
+        self,
+        configuration_name: str,
+        configuration_value: Any,
+        origin: str = "unknown",
+        config_id: Optional[str] = None,
+    ) -> None:
         """Creates and queues the name, origin, value of a configuration"""
         if isinstance(configuration_value, dict):
             configuration_value = ",".join(":".join((k, str(v))) for k, v in configuration_value.items())
@@ -469,7 +393,7 @@ def add_configuration(self, configuration_name, configuration_value, origin="unk
             config["seq_id"] = next(self._sequence_configurations)
             self._configuration_queue.append(config)
 
-    def add_configurations(self, configuration_list: List[Tuple[str, str, str]]):
+    def add_configurations(self, configuration_list: List[Tuple[str, str, str]]) -> None:
         """Creates and queues a list of configurations"""
         with self._service_lock:
             for name, value, origin in configuration_list:
@@ -482,7 +406,7 @@ def add_configurations(self, configuration_list: List[Tuple[str, str, str]]):
                 }
             )
 
-    def add_log(self, level, message, stack_trace="", tags=None):
+    def add_log(self, level, message: str, stack_trace: str = "", tags: Optional[Dict] = None) -> None:
         """
         Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake.
         This will make support cycles easier and ensure we know about potentially silent issues in libraries.
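`product_activated()` now records a product status only when it differs from the last reported state, and `_report_products()` drains the pending dict while remembering what was sent. A hypothetical sequence showing which calls actually queue a change:

```python
writer.product_activated("appsec", True)   # queued: no previous state recorded
writer.periodic(force_flush=True)          # flushed; "appsec": True becomes the previous state
writer.product_activated("appsec", True)   # dropped: status unchanged
writer.product_activated("appsec", False)  # queued: status flipped
```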
@@ -554,7 +478,7 @@ def _format_file_path(self, filename: str) -> str:
 
     def add_gauge_metric(
         self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None
-    ):
+    ) -> None:
         """
         Queues gauge metric
         """
@@ -569,7 +493,7 @@ def add_rate_metric(
         self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None
-    ):
+    ) -> None:
         """
         Queues rate metric
         """
@@ -584,7 +508,7 @@ def add_count_metric(
         self, namespace: TELEMETRY_NAMESPACE, name: str, value: int = 1, tags: Optional[MetricTagType] = None
-    ):
+    ) -> None:
         """
         Queues count metric
         """
@@ -599,7 +523,7 @@ def add_distribution_metric(
         self, namespace: TELEMETRY_NAMESPACE, name: str, value: float, tags: Optional[MetricTagType] = None
-    ):
+    ) -> None:
         """
         Queues distributions metric
         """
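The metric helpers only gain explicit `-> None` annotations here; their queuing behavior is unchanged. For reference, a typical call, assuming the module-level `telemetry_writer` singleton (tag tuples are lowercased into `key:value` strings when the namespace is flushed):

```python
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE

telemetry_writer.add_count_metric(
    TELEMETRY_NAMESPACE.TRACERS, "integration_errors", 1, (("integration", "flask"),)
)
```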
@@ -612,42 +536,19 @@ def add_distribution_metric(
             tags,
         )
 
-    def _flush_log_metrics(self):
-        # type () -> Set[Metric]
+    def _report_logs(self) -> Set[Dict[str, Any]]:
         with self._service_lock:
-            log_metrics = self._logs
+            logs = self._logs
             self._logs = set()
-        return log_metrics
-
-    def _generate_metrics_events(self, namespace_metrics):
-        # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> List[Dict[str, Any]]
-        metric_payloads = []
-        for payload_type, namespaces in namespace_metrics.items():
-            for namespace, metrics in namespaces.items():
-                if metrics:
-                    metric_payloads.append(
-                        {
-                            "payload": {
-                                "namespace": namespace,
-                                "series": metrics,
-                            },
-                            "request_type": payload_type,
-                        }
-                    )
-        return metric_payloads
-
-    def _generate_logs_event(self, logs):
-        # type: (Set[Dict[str, str]]) -> Dict[str, Any]
-        log.debug("%s request payload", TELEMETRY_TYPE_LOGS)
-        return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}
+        return logs
 
-    def _dispatch(self):
+    def _dispatch(self) -> None:
         # moved core here to avoid circular import
         from ddtrace.internal import core
 
         core.dispatch("telemetry.periodic")
 
-    def periodic(self, force_flush=False, shutting_down=False):
+    def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> None:
         """Process and send telemetry events in batches.
 
        This method handles the periodic collection and sending of telemetry data with two main timing intervals:
@@ -665,7 +566,7 @@ def periodic(self, force_flush=False, shutting_down=False):
         - Collects configuration changes
         - Collects dependency changes
         - Collects stored events (ex: metrics and logs)
-        - Sends everything as a single batch
+        - Sends everything as a single batched request
 
         Args:
             force_flush: If True, bypasses the heartbeat interval check and sends immediately
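The two timing intervals the docstring describes come from `__init__` earlier in this file: the service ticks every `min(HEARTBEAT_INTERVAL, 10)` seconds, and `_periodic_threshold` counts how many ticks to buffer before sending. With default values assumed below:

```python
HEARTBEAT_INTERVAL = 60.0                                      # assumed default
interval = min(HEARTBEAT_INTERVAL, 10)                         # 10s metric collection tick
periodic_threshold = int(HEARTBEAT_INTERVAL // interval) - 1   # 5 buffering ticks
# periodic() buffers metrics/logs on 5 consecutive ticks and sends one
# message-batch request (with a heartbeat) on the 6th.
```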
@@ -675,56 +576,68 @@
             - Metrics are collected every 10 seconds to ensure accurate time-based data
             - All data is sent in a single batch every 60 seconds to minimize network overhead
             - A heartbeat event is always included to keep RC connections alive
+            - Multiple event types are combined into a single message-batch request
         """
+        self._dispatch()
         # Collect metrics and logs that have accumulated since last batch
         events = []
         if namespace_metrics := self._namespace.flush(float(self.interval)):
-            if metrics_events := self._generate_metrics_events(namespace_metrics):
-                events.extend(metrics_events)
+            for payload_type, namespaces in namespace_metrics.items():
+                for namespace, metrics in namespaces.items():
+                    if metrics:
+                        events.append(self._get_event({"namespace": namespace, "series": metrics}, payload_type))
 
-        if logs_metrics := self._flush_log_metrics():
-            events.append(self._generate_logs_event(logs_metrics))
+        if logs := self._report_logs():
+            events.append(self._get_event({"logs": list(logs)}, TELEMETRY_EVENT_TYPE.LOGS))
 
         # Queue metrics if not at heartbeat interval
-        if self._is_periodic and force_flush is False:
+        if self._is_periodic and not force_flush:
             if self._periodic_count < self._periodic_threshold:
                 self._periodic_count += 1
                 if events:
-                    self.add_events(events)
+                    # list.extend() is an atomic operation in CPython, so we don't need to lock the queue
+                    self._events_queue.extend(events)
                 return
             self._periodic_count = 0
 
         # At heartbeat interval, collect and send all telemetry data
-        if app_started := self._app_started():
+        if app_started_payload := self._report_app_started():
             # app-started should be the first event in the batch
-            events = [app_started] + events
+            events = [self._get_event(app_started_payload, TELEMETRY_EVENT_TYPE.STARTED)] + events
 
-        if app_product_change := self._app_product_change():
-            events.append(app_product_change)
+        if products := self._report_products():
+            events.append(self._get_event({"products": products}, TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE))
 
-        if integrations := self._flush_integrations_queue():
-            events.append(self._app_integrations_changed_event(integrations))
+        if ints := self._report_integrations():
+            events.append(self._get_event({"integrations": ints}, TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE))
 
-        if endpoints_payload := self._flush_app_endpoints():
-            events.append(endpoints_payload)
+        if endpoints := self._report_endpoints():
+            events.append(self._get_event(endpoints, TELEMETRY_EVENT_TYPE.ENDPOINTS))
 
-        if configurations := self._flush_configuration_queue():
-            events.append(self._app_client_configuration_changed_event(configurations))
+        if configs := self._report_configurations():
+            events.append(self._get_event({"configuration": configs}, TELEMETRY_EVENT_TYPE.CLIENT_CONFIGURATION_CHANGE))
 
-        if app_dependencies_loaded := self._app_dependencies_loaded_event():
-            events.append(app_dependencies_loaded)
+        if deps := self._report_dependencies():
+            events.append(self._get_event({"dependencies": deps}, TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED))
 
-        if shutting_down and (app_closing := self._app_closing_event()):
-            events.append(app_closing)
+        if shutting_down and not self._forked:
+            events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.SHUTDOWN))
 
         # Always include a heartbeat to keep RC connections alive
-        events.append(self._app_heartbeat_event())
+        # Extended heartbeat should be queued after the app-dependencies-loaded event. This
+        # ensures that imported dependencies are accurately reported.
+        if heartbeat_payload := self._report_heartbeat():
+            # Extended heartbeats report dependencies, while regular heartbeats report empty payloads
+            events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT))
+        else:
+            events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT))
 
-        # Get any queued events and combine with current batch
-        if queued_events := self._flush_events_queue():
+        # Get any queued events (i.e., metrics and logs from previous periodic calls) and combine with current batch
+        if queued_events := self._report_events():
             events.extend(queued_events)
 
-        log.debug("Encoding instrumentation telemetry events: %s", ", ".join([e["request_type"] for e in events]))
+        # Create a comma-separated list of event types for logging
+        payload_types = ", ".join([event["request_type"] for event in events])
         # Prepare and send the final batch
         batch_event = {
             "tracer_time": int(time.time()),
@@ -735,18 +648,16 @@
             "application": get_application(config.SERVICE, config.VERSION, config.ENV),
             "host": get_host_info(),
             "payload": events,
-            "request_type": "message-batch",
+            "request_type": TELEMETRY_EVENT_TYPE.MESSAGE_BATCH.value,
         }
-        self._dispatch()
-        self._client.send_event(batch_event)
+        self._client.send_event(batch_event, payload_types)
 
-    def app_shutdown(self):
+    def app_shutdown(self) -> None:
         if self.started:
             self.periodic(force_flush=True, shutting_down=True)
         self.disable()
 
-    def reset_queues(self):
-        # type: () -> None
+    def reset_queues(self) -> None:
         self._events_queue = []
         self._integrations_queue = dict()
         self._namespace.flush()
@@ -754,16 +665,14 @@
         self._imported_dependencies = {}
         self._configuration_queue = []
 
-    def _flush_events_queue(self):
-        # type: () -> List[Dict]
+    def _report_events(self) -> List[Dict]:
         """Flushes and returns a list of all telemtery event"""
         with self._service_lock:
             events = self._events_queue
             self._events_queue = []
         return events
 
-    def _fork_writer(self):
-        # type: () -> None
+    def _fork_writer(self) -> None:
         self._forked = True
         # Avoid sending duplicate events.
         # Queued events should be sent in the main process.
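Everything collected above is wrapped into one `message-batch` request; only this outer envelope still carries a `request_type` string of its own. An illustrative final request (field values made up):

```python
batch_event = {
    "tracer_time": 1700000000,
    "runtime_id": "abc123",
    "api_version": "v2",
    "seq_id": 1,
    "debug": False,
    "application": {"service_name": "my-svc"},  # from get_application()
    "host": {"hostname": "myhost"},             # from get_host_info()
    "payload": [{"payload": {}, "request_type": "app-heartbeat"}],
    "request_type": "message-batch",
}
```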
@@ -777,17 +686,16 @@
         # error in Python 3.12
         self.enable()
 
-    def _restart_sequence(self):
+    def _restart_sequence(self) -> None:
         self._sequence_payloads = itertools.count(1)
         self._sequence_configurations = itertools.count(1)
 
-    def _stop_service(self, join=True, *args, **kwargs):
-        # type: (...) -> None
+    def _stop_service(self, join: bool = True, *args, **kwargs) -> None:
         super(TelemetryWriter, self)._stop_service(*args, **kwargs)
         if join:
             self.join(timeout=2)
 
-    def _telemetry_excepthook(self, tp, value, root_traceback):
+    def _telemetry_excepthook(self, tp, value, root_traceback) -> None:
         if root_traceback is not None:
             # Get the frame which raised the exception
             traceback = root_traceback
@@ -823,17 +731,17 @@ def _telemetry_excepthook(self, tp, value, root_traceback):
             error_msg = "{}:{} {}".format(filename, lineno, str(value))
             self.add_integration(integration_name, True, error_msg=error_msg)
 
-            if self._enabled and not self.started and (app_started := self._app_started()):
-                self._events_queue.append(app_started)
+            if app_started := self._report_app_started(False):
+                self._events_queue.append(self._get_event(app_started, TELEMETRY_EVENT_TYPE.STARTED))
 
             self.app_shutdown()
 
         return TelemetryWriter._ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)
 
-    def install_excepthook(self):
+    def install_excepthook(self) -> None:
         """Install a hook that intercepts unhandled exception and send metrics about them."""
         sys.excepthook = self._telemetry_excepthook
 
-    def uninstall_excepthook(self):
+    def uninstall_excepthook(self) -> None:
         """Uninstall the global tracer except hook."""
         sys.excepthook = TelemetryWriter._ORIGINAL_EXCEPTHOOK
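The excepthook path now queues app-started with `register_app_shutdown=False` before forwarding to the original hook. A sketch of the hook lifecycle (hypothetical standalone usage):

```python
writer = TelemetryWriter(is_periodic=False)
writer.install_excepthook()        # sys.excepthook now reports crash telemetry
try:
    pass  # application code
finally:
    writer.uninstall_excepthook()  # restores the saved original hook
```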
diff --git a/tests/appsec/appsec/test_telemetry.py b/tests/appsec/appsec/test_telemetry.py
index a208c12b694..a07a0b8e06d 100644
--- a/tests/appsec/appsec/test_telemetry.py
+++ b/tests/appsec/appsec/test_telemetry.py
@@ -15,9 +15,8 @@
 from ddtrace.constants import APPSEC_ENV
 from ddtrace.contrib.internal.trace_utils import set_http_meta
 from ddtrace.ext import SpanTypes
+from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
 from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
 from ddtrace.trace import tracer
 import tests.appsec.rules as rules
 from tests.appsec.utils import asm_context
@@ -38,7 +37,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked
     # this function and make the tests flaky. That's why we exclude the "enabled" metric from this assert
     generate_metrics = [
         m
-        for m in metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.APPSEC.value]
+        for m in metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.APPSEC.value]
         if m["metric"] != "enabled"
     ]
     assert (
@@ -76,7 +75,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked
 
 
 def _assert_distributions_metrics(metrics_result, is_rule_triggered=False, is_blocked_request=False):
-    distributions_metrics = metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE.APPSEC.value]
+    distributions_metrics = metrics_result[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS][TELEMETRY_NAMESPACE.APPSEC.value]
 
     assert len(distributions_metrics) == 2, "Expected 2 distributions_metrics"
     for metric in distributions_metrics:
@@ -101,8 +100,8 @@ def test_metrics_when_appsec_doesnt_runs(telemetry_writer, tracer):
         rules.Config(),
     )
     metrics_data = telemetry_writer._namespace.flush()
-    assert len(metrics_data[TELEMETRY_TYPE_GENERATE_METRICS]) == 0
-    assert len(metrics_data[TELEMETRY_TYPE_DISTRIBUTION]) == 0
+    assert len(metrics_data[TELEMETRY_EVENT_TYPE.METRICS]) == 0
+    assert len(metrics_data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS]) == 0
 
 
 def test_metrics_when_appsec_runs(telemetry_writer, tracer):
@@ -185,7 +184,7 @@ def test_log_metric_error_ddwaf_timeout(telemetry_writer, tracer):
     list_metrics_logs = list(telemetry_writer._logs)
     assert len(list_metrics_logs) == 0
 
-    generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_TYPE_GENERATE_METRICS][
+    generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_EVENT_TYPE.METRICS][
         TELEMETRY_NAMESPACE.APPSEC.value
     ]
 
@@ -228,7 +227,7 @@ def test_log_metric_error_ddwaf_internal_error(telemetry_writer):
             assert len(list_telemetry_logs) == 0
             assert span.get_tag("_dd.appsec.waf.error") == "-3"
             metrics_result = telemetry_writer._namespace.flush()
-            list_telemetry_metrics = metrics_result.get(TELEMETRY_TYPE_GENERATE_METRICS, {}).get(
+            list_telemetry_metrics = metrics_result.get(TELEMETRY_EVENT_TYPE.METRICS, {}).get(
                 TELEMETRY_NAMESPACE.APPSEC.value, {}
             )
             error_metrics = [m for m in list_telemetry_metrics if m["metric"] == "waf.error"]
@@ -301,7 +300,7 @@ def test_appsec_enabled_metric(
     # Restore defaults and enabling telemetry appsec service
     with override_global_config({"_asm_enabled": True, "_lib_was_injected": False}):
         tracer.configure(appsec_enabled=appsec_enabled, appsec_enabled_origin=APPSEC.ENABLED_ORIGIN_UNKNOWN)
-        telemetry_writer._flush_configuration_queue()
+        telemetry_writer._report_configurations()
 
     # Start the test
     with override_env(environment), override_global_config(
@@ -313,7 +312,7 @@
 
         telemetry_writer._dispatch()
 
-        metrics_result = telemetry_writer._flush_configuration_queue()
+        metrics_result = telemetry_writer._report_configurations()
         assert metrics_result == [
             {"name": "DD_APPSEC_ENABLED", "origin": expected_origin, "seq_id": ANY, "value": expected_result}
         ]
diff --git a/tests/appsec/iast/test_telemetry.py b/tests/appsec/iast/test_telemetry.py
index f36ee063e5d..8f38bc0743c 100644
--- a/tests/appsec/iast/test_telemetry.py
+++ b/tests/appsec/iast/test_telemetry.py
@@ -26,8 +26,8 @@
 from ddtrace.appsec._iast.taint_sinks.weak_hash import patch as weak_hash_patch
 from ddtrace.appsec._iast.taint_sinks.xss import patch as xss_patch
 from ddtrace.ext import SpanTypes
+from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
 from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
 from tests.appsec.iast.iast_utils import _iast_patched_module
 from tests.appsec.utils import asm_context
 from tests.utils import DummyTracer
@@ -36,7 +36,7 @@
 
 def _assert_instrumented_sink(telemetry_writer, vuln_type):
     metrics_result = telemetry_writer._namespace.flush()
-    generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
+    generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
     assert len(generate_metrics) == 1, "Expected 1 generate_metrics"
     assert [metric["metric"] for metric in generate_metrics] == ["instrumented.sink"]
     assert [metric["tags"] for metric in generate_metrics] == [[f"vulnerability_type:{vuln_type.lower()}"]]
@@ -102,7 +102,7 @@ def test_metric_executed_sink(
     metrics_result = telemetry_writer._namespace.flush()
     _testing_unpatch_iast()
 
-    generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
+    generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
     assert len(generate_metrics) == 1
     # Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
     # the agent)
@@ -139,7 +139,7 @@ def test_metric_instrumented_propagation(no_request_sampling, telemetry_writer):
     _iast_patched_module("benchmarks.bm.iast_fixtures.str_methods")
 
     metrics_result = telemetry_writer._namespace.flush()
-    generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
+    generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
     # Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
     # the agent)
     filtered_metrics = [
@@ -167,7 +167,7 @@ def test_metric_request_tainted(no_request_sampling, telemetry_writer):
 
     metrics_result = telemetry_writer._namespace.flush()
 
-    generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
+    generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
     # Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
     # the agent)
     filtered_metrics = [metric["metric"] for metric in generate_metrics if metric["metric"] != "executed.sink"]
diff --git a/tests/telemetry/app.py b/tests/telemetry/app.py
index ae2b9932c9f..530e98dfe75 100644
--- a/tests/telemetry/app.py
+++ b/tests/telemetry/app.py
@@ -16,7 +16,7 @@ def index():
 
 
 def starting_app_view():
     # We must call app-started before telemetry events can be sent to the agent.
     # This endpoint mocks the behavior of the agent writer.
-    telemetry_writer._app_started()
+    telemetry_writer._report_app_started()
     return "OK", 200
diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py
index 783f258cabb..9a439f5a6e3 100644
--- a/tests/telemetry/test_telemetry.py
+++ b/tests/telemetry/test_telemetry.py
@@ -37,9 +37,9 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
 
     if os.fork() == 0:
         # Send multiple started events to confirm none get sent
-        telemetry_writer._app_started()
-        telemetry_writer._app_started()
-        telemetry_writer._app_started()
+        telemetry_writer._report_app_started()
+        telemetry_writer._report_app_started()
+        telemetry_writer._report_app_started()
     else:
         # Print the parent process runtime id for validation
         print(get_runtime_id())
diff --git a/tests/telemetry/test_telemetry_metrics.py b/tests/telemetry/test_telemetry_metrics.py
index 2cf3d78f014..c9d6272e751 100644
--- a/tests/telemetry/test_telemetry_metrics.py
+++ b/tests/telemetry/test_telemetry_metrics.py
@@ -3,10 +3,9 @@
 
 from mock.mock import ANY
 
+from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
 from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
 from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
-from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
 from tests.utils import override_global_config
 
 
@@ -14,11 +13,11 @@ def _assert_metric(
     test_agent,
     expected_metrics,
     namespace=TELEMETRY_NAMESPACE.TRACERS,
-    type_paypload=TELEMETRY_TYPE_GENERATE_METRICS,
+    type_payload=TELEMETRY_EVENT_TYPE.METRICS,
 ):
     assert len(expected_metrics) > 0, "expected_metrics should not be empty"
     test_agent.telemetry_writer.periodic(force_flush=True)
-    metrics_events = test_agent.get_events(type_paypload)
+    metrics_events = test_agent.get_events(type_payload.value)
     assert len(metrics_events) > 0, "captured metrics events should not be empty"
 
     metrics = []
@@ -291,7 +290,7 @@ def test_send_appsec_distributions_metric(telemetry_writer, test_agent_session,
         test_agent_session,
         expected_series,
         namespace=TELEMETRY_NAMESPACE.APPSEC,
-        type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
+        type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
     )
 
 
@@ -312,7 +311,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write
         test_agent_session,
         expected_series,
         namespace=TELEMETRY_NAMESPACE.APPSEC,
-        type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
+        type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
    )
 
     expected_series = [
@@ -329,7 +328,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write
         test_agent_session,
         expected_series,
         namespace=TELEMETRY_NAMESPACE.APPSEC,
-        type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
+        type_payload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
     )
requests[0]["headers"]["Content-Type"] == "application/json" - assert requests[0]["headers"]["DD-Client-Library-Language"] == "python" - assert requests[0]["headers"]["DD-Client-Library-Version"] == _pep440_to_semver() - assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" - assert requests[0]["headers"]["DD-Telemetry-API-Version"] == "v2" - assert requests[0]["headers"]["DD-Telemetry-Debug-Enabled"] == "False" - - events = test_agent_session.get_events(payload_type) - assert len(events) == 1 - validate_request_body(events[0], payload, payload_type) - - -def test_add_event_disabled_writer(telemetry_writer, test_agent_session): - """asserts that add_event() does not create a telemetry request when telemetry writer is disabled""" - payload = {"test": "123"} - payload_type = "test-event" - # ensure events are not queued when telemetry is disabled - telemetry_writer.add_event(payload, payload_type) - - # ensure no request were sent - telemetry_writer.periodic(force_flush=True) - assert len(test_agent_session.get_events(payload_type)) == 1 - - @pytest.mark.parametrize( "env_var,value,expected_value", [ @@ -91,13 +56,8 @@ def test_app_started_event_configuration_override_asm( def test_app_started_event(telemetry_writer, test_agent_session, mock_time): """asserts that app_started() queues a valid telemetry request which is then sent by periodic()""" with override_global_config(dict(_telemetry_dependency_collection=False)): - # queue an app started event - event = telemetry_writer._app_started() - assert event is not None, "app_started() did not return an event" - telemetry_writer.add_event(event["payload"], "app-started") - # force a flush + # App started should be queued by the first periodic call telemetry_writer.periodic(force_flush=True) - requests = test_agent_session.get_requests() assert len(requests) == 1 assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" @@ -673,13 +633,10 @@ def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): telemetry_writer.started = True # send app closed event telemetry_writer.app_shutdown() - - num_requests = len(test_agent_session.get_requests()) - assert num_requests == 1 # ensure a valid request body was sent events = test_agent_session.get_events("app-closing") assert len(events) == 1 - validate_request_body(events[0], {}, "app-closing", num_requests) + validate_request_body(events[0], {}, "app-closing", 1) def test_add_integration(telemetry_writer, test_agent_session, mock_time): @@ -793,7 +750,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se # Assert next flush contains app-heartbeat event for _ in range(telemetry_writer._periodic_threshold): telemetry_writer.periodic() - assert test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) == [] + assert test_agent_session.get_events(mock.ANY, filter_heartbeats=False) == [] telemetry_writer.periodic() heartbeat_events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) @@ -805,7 +762,7 @@ def test_app_heartbeat_event(mock_time, telemetry_writer, test_agent_session): """asserts that we queue/send app-heartbeat event every 60 seconds when app_heartbeat_event() is called""" # Assert a maximum of one heartbeat is queued per flush telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events("app-heartbeat", filter_heartbeats=False) + events = test_agent_session.get_events(mock.ANY, filter_heartbeats=False) assert len(events) > 0 @@ -815,7 +772,7 
@@ -815,7 +772,7 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio
 
     # Assert that the default product status is disabled
     assert any(telemetry_writer._product_enablement.values()) is False
-
+    # Assert that the product status is first reported in app-started event
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION, True)
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True)
@@ -828,29 +785,23 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio
     events = test_agent_session.get_events("app-product-change")
     telemetry_writer.periodic(force_flush=True)
     assert not len(events)
-
     # Assert that unchanged status doesn't generate the event
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.PROFILER, True)
     telemetry_writer.periodic(force_flush=True)
     events = test_agent_session.get_events("app-product-change")
     assert not len(events)
-
-    # Assert that a single event is generated
+    # Assert that product change event is sent when product status changes
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.APPSEC, False)
     telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION, False)
     telemetry_writer.periodic(force_flush=True)
     events = test_agent_session.get_events("app-product-change")
     assert len(events) == 1
-
-    # Assert that payload is as expected
     assert events[0]["request_type"] == "app-product-change"
     products = events[0]["payload"]["products"]
     version = _pep440_to_semver()
     assert products == {
         TELEMETRY_APM_PRODUCT.APPSEC.value: {"enabled": False, "version": version},
         TELEMETRY_APM_PRODUCT.DYNAMIC_INSTRUMENTATION.value: {"enabled": False, "version": version},
-        TELEMETRY_APM_PRODUCT.LLMOBS.value: {"enabled": True, "version": version},
-        TELEMETRY_APM_PRODUCT.PROFILER.value: {"enabled": True, "version": version},
     }