Skip to content

Commit 9ae8e44

Browse files
authored
fix(co): ensure entrypoint instrumented (#14790)
## Description

We make sure that entrypoints are always instrumented, so that code origin information can be attached to entry spans when the feature is enabled remotely.
1 parent ae5562f commit 9ae8e44

File tree

8 files changed

+135
-61
lines changed

8 files changed

+135
-61
lines changed

.gitlab/benchmarks/bp-runner.microbenchmarks.fail-on-breach.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ experiments:
5656
- name: djangosimple-tracer-minimal
5757
thresholds:
5858
- execution_time < 17.50 ms
59-
- max_rss_usage < 66.00 MB
59+
- max_rss_usage < 67.00 MB
6060
- name: djangosimple-tracer-native
6161
thresholds:
6262
- execution_time < 21.75 ms
6363
- max_rss_usage < 72.50 MB
6464
- name: djangosimple-tracer-and-profiler
6565
thresholds:
6666
- execution_time < 23.50 ms
67-
- max_rss_usage < 67.50 MB
67+
- max_rss_usage < 68.00 MB
6868
- name: djangosimple-tracer-no-caches
6969
thresholds:
7070
- execution_time < 19.65 ms
@@ -76,7 +76,7 @@ experiments:
7676
- name: djangosimple-tracer-dont-create-db-spans
7777
thresholds:
7878
- execution_time < 21.50 ms
79-
- max_rss_usage < 66.00 MB
79+
- max_rss_usage < 67.00 MB
8080
- name: djangosimple-tracer-no-middleware
8181
thresholds:
8282
- execution_time < 21.50 ms

ddtrace/debugging/_debugger.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,10 @@ def _dd_debugger_hook(self, probe: Probe) -> None:
334334
self._probe_registry.set_emitting(probe)
335335

336336
log.debug("[%s][P: %s] Debugger. Report signal %s", os.getpid(), os.getppid(), signal)
337-
self.__uploader__.get_collector().push(signal)
337+
if (collector := self.__uploader__.get_collector()) is None:
338+
log.error("No collector available to push signal %s", signal)
339+
return
340+
collector.push(signal)
338341

339342
except Exception:
340343
log.error("Failed to execute probe hook", exc_info=True)
@@ -474,6 +477,7 @@ def _eject_probes(self, probes_to_eject: List[LineProbe]) -> None:
474477

475478
def _probe_wrapping_hook(self, module: ModuleType) -> None:
476479
probes = self._probe_registry.get_pending(module.__name__)
480+
collector = self.__uploader__.get_collector()
477481
for probe in probes:
478482
if not isinstance(probe, FunctionLocationMixin):
479483
continue
@@ -500,9 +504,13 @@ def _probe_wrapping_hook(self, module: ModuleType) -> None:
500504
function,
501505
)
502506
else:
507+
if collector is None:
508+
log.error("No signal collector available")
509+
self._probe_registry.set_error(probe, "NoCollector", "No signal collector available")
510+
continue
503511
context = DebuggerWrappingContext(
504512
function,
505-
collector=self.__uploader__.get_collector(),
513+
collector=collector,
506514
registry=self._probe_registry,
507515
tracer=self._tracer,
508516
probe_meter=self._probe_meter,

ddtrace/debugging/_exception/replay.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,11 @@ def _attach_tb_frame_snapshot_to_span(
283283
snapshot.do_line()
284284

285285
# Collect
286-
self.__uploader__.get_collector().push(snapshot)
286+
if (collector := self.__uploader__.get_collector()) is None:
287+
log.error("No collector available to push exception replay snapshot")
288+
return False
289+
290+
collector.push(snapshot)
287291

288292
# Memoize
289293
frame.f_locals[SNAPSHOT_KEY] = snapshot_id = snapshot.uuid

ddtrace/debugging/_origin/span.py

Lines changed: 80 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import dataclass
2-
from functools import partial
32
from itertools import count
43
import sys
54
from threading import current_thread
@@ -17,36 +16,30 @@
1716
from ddtrace.debugging._probe.model import LogLineProbe
1817
from ddtrace.debugging._probe.model import ProbeEvalTiming
1918
from ddtrace.debugging._session import Session
20-
from ddtrace.debugging._signal.collector import SignalCollector
2119
from ddtrace.debugging._signal.snapshot import Snapshot
2220
from ddtrace.debugging._uploader import SignalUploader
2321
from ddtrace.debugging._uploader import UploaderProduct
2422
from ddtrace.ext import EXIT_SPAN_TYPES
25-
from ddtrace.internal import core
2623
from ddtrace.internal.compat import Path
24+
from ddtrace.internal.forksafe import Lock
25+
from ddtrace.internal.logger import get_logger
2726
from ddtrace.internal.packages import is_user_code
2827
from ddtrace.internal.safety import _isinstance
2928
from ddtrace.internal.wrapping.context import WrappingContext
3029
from ddtrace.settings.code_origin import config as co_config
3130
from ddtrace.trace import Span
3231

3332

33+
log = get_logger(__name__)
34+
35+
3436
def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
3537
_frame: t.Optional[FrameType] = frame
3638
while _frame is not None:
3739
yield _frame
3840
_frame = _frame.f_back
3941

4042

41-
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
42-
if not _isinstance(f, FunctionType):
43-
return
44-
45-
_f = t.cast(FunctionType, f)
46-
if not EntrySpanWrappingContext.is_wrapped(_f):
47-
EntrySpanWrappingContext(collector, _f).wrap()
48-
49-
5043
@dataclass
5144
class EntrySpanProbe(LogFunctionProbe):
5245
__span_class__ = "entry"
@@ -118,12 +111,13 @@ class EntrySpanLocation:
118111

119112

120113
class EntrySpanWrappingContext(WrappingContext):
114+
__enabled__ = False
121115
__priority__ = 199
122116

123-
def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
117+
def __init__(self, uploader: t.Type[SignalUploader], f: FunctionType) -> None:
124118
super().__init__(f)
125119

126-
self.collector = collector
120+
self.uploader = uploader
127121

128122
filename = str(Path(f.__code__.co_filename).resolve())
129123
name = f.__qualname__
@@ -139,26 +133,30 @@ def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
139133
def __enter__(self):
140134
super().__enter__()
141135

142-
root = ddtrace.tracer.current_root_span()
143-
span = ddtrace.tracer.current_span()
144-
location = self.location
145-
if root is None or span is None or root.get_tag("_dd.entry_location.file") is not None:
146-
return self
136+
if self.__enabled__:
137+
root = ddtrace.tracer.current_root_span()
138+
span = ddtrace.tracer.current_span()
139+
location = self.location
140+
if root is None or span is None or root.get_tag("_dd.entry_location.file") is not None:
141+
return self
147142

148-
# Add tags to the local root
149-
for s in (root, span):
150-
s._set_tag_str("_dd.code_origin.type", "entry")
143+
# Add tags to the local root
144+
for s in (root, span):
145+
s._set_tag_str("_dd.code_origin.type", "entry")
151146

152-
s._set_tag_str("_dd.code_origin.frames.0.file", location.file)
153-
s._set_tag_str("_dd.code_origin.frames.0.line", str(location.line))
154-
s._set_tag_str("_dd.code_origin.frames.0.type", location.module)
155-
s._set_tag_str("_dd.code_origin.frames.0.method", location.name)
147+
s._set_tag_str("_dd.code_origin.frames.0.file", location.file)
148+
s._set_tag_str("_dd.code_origin.frames.0.line", str(location.line))
149+
s._set_tag_str("_dd.code_origin.frames.0.type", location.module)
150+
s._set_tag_str("_dd.code_origin.frames.0.method", location.name)
156151

157-
self.set("start_time", monotonic_ns())
152+
self.set("start_time", monotonic_ns())
158153

159154
return self
160155

161156
def _close_signal(self, retval=None, exc_info=(None, None, None)):
157+
if not self.__enabled__:
158+
return
159+
162160
root = ddtrace.tracer.current_root_span()
163161
span = ddtrace.tracer.current_span()
164162
if root is None or span is None:
@@ -167,6 +165,12 @@ def _close_signal(self, retval=None, exc_info=(None, None, None)):
167165
# Check if we have any level 2 debugging sessions running for the
168166
# current trace
169167
if any(s.level >= 2 for s in Session.from_trace(root.context or span.context)):
168+
try:
169+
start_time: int = self.get("start_time")
170+
except KeyError:
171+
# Context was not opened
172+
return
173+
170174
# Create a snapshot
171175
snapshot = Snapshot(
172176
probe=self.location.probe,
@@ -182,9 +186,10 @@ def _close_signal(self, retval=None, exc_info=(None, None, None)):
182186
root._set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
183187
span._set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
184188

185-
snapshot.do_exit(retval, exc_info, monotonic_ns() - self.get("start_time"))
189+
snapshot.do_exit(retval, exc_info, monotonic_ns() - start_time)
186190

187-
self.collector.push(snapshot)
191+
if (collector := self.uploader.get_collector()) is not None:
192+
collector.push(snapshot)
188193

189194
def __return__(self, retval):
190195
self._close_signal(retval=retval)
@@ -195,42 +200,68 @@ def __exit__(self, exc_type, exc_value, traceback):
195200
super().__exit__(exc_type, exc_value, traceback)
196201

197202

198-
@dataclass
199203
class SpanCodeOriginProcessorEntry:
200204
__uploader__ = SignalUploader
205+
__context_wrapper__ = EntrySpanWrappingContext
201206

202207
_instance: t.Optional["SpanCodeOriginProcessorEntry"] = None
203-
_handler: t.Optional[t.Callable] = None
208+
209+
_pending: t.List = []
210+
_lock = Lock()
211+
212+
@classmethod
213+
def instrument_view(cls, f):
214+
if not _isinstance(f, FunctionType):
215+
return
216+
217+
with cls._lock:
218+
if cls._instance is None:
219+
# Entry span code origin is not enabled, so we defer the
220+
# instrumentation
221+
cls._pending.append(f)
222+
return
223+
224+
_f = t.cast(FunctionType, f)
225+
if not EntrySpanWrappingContext.is_wrapped(_f):
226+
log.debug("Patching entrypoint %r for code origin", f)
227+
EntrySpanWrappingContext(cls.__uploader__, _f).wrap()
204228

205229
@classmethod
206230
def enable(cls):
207231
if cls._instance is not None:
208232
return
209233

210-
cls._instance = cls()
234+
with cls._lock:
235+
cls._instance = cls()
236+
237+
# Instrument the pending views
238+
while cls._pending:
239+
cls.instrument_view(cls._pending.pop())
211240

212241
# Register code origin for span with the snapshot uploader
213-
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)
242+
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN_ENTRY)
214243

215-
# Register the entrypoint wrapping for entry spans
216-
cls._handler = handler = partial(wrap_entrypoint, cls.__uploader__.get_collector())
217-
core.on("service_entrypoint.patch", handler)
244+
# Enable the context wrapper
245+
cls.__context_wrapper__.__enabled__ = True
246+
247+
log.debug("Code Origin for Spans (entry) enabled")
218248

219249
@classmethod
220250
def disable(cls):
221251
if cls._instance is None:
222252
return
223253

224-
# Unregister the entrypoint wrapping for entry spans
225-
core.reset_listeners("service_entrypoint.patch", cls._handler)
254+
# Disable the context wrapper
255+
cls.__context_wrapper__.__enabled__ = False
256+
226257
# Unregister code origin for span with the snapshot uploader
227-
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)
258+
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN_ENTRY)
228259

229-
cls._handler = None
230260
cls._instance = None
231261

262+
log.debug("Code Origin for Spans (entry) disabled")
263+
232264

233-
@dataclass
234265
class SpanCodeOriginProcessorExit(SpanProcessor):
235266
__uploader__ = SignalUploader
236267

@@ -283,7 +314,8 @@ def on_span_start(self, span: Span) -> None:
283314
snapshot.do_line()
284315

285316
# Collect
286-
self.__uploader__.get_collector().push(snapshot)
317+
if (collector := self.__uploader__.get_collector()) is not None:
318+
collector.push(snapshot)
287319

288320
# Correlate the snapshot with the span
289321
span._set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
@@ -299,11 +331,13 @@ def enable(cls):
299331
instance = cls._instance = cls()
300332

301333
# Register code origin for span with the snapshot uploader
302-
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)
334+
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN_EXIT)
303335

304336
# Register the processor for exit spans
305337
instance.register()
306338

339+
log.debug("Code Origin for Spans (exit) enabled")
340+
307341
@classmethod
308342
def disable(cls):
309343
if cls._instance is None:
@@ -313,6 +347,8 @@ def disable(cls):
313347
cls._instance.unregister()
314348

315349
# Unregister code origin for span with the snapshot uploader
316-
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)
350+
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN_EXIT)
317351

318352
cls._instance = None
353+
354+
log.debug("Code Origin for Spans (exit) disabled")

ddtrace/debugging/_products/code_origin/span.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
import enum
2+
from functools import partial
3+
import typing as t
24

5+
import ddtrace.internal.core as core
6+
from ddtrace.internal.logger import get_logger
37
from ddtrace.internal.products import manager as product_manager
48
from ddtrace.settings._core import ValueSource
59
from ddtrace.settings.code_origin import config
610

711

12+
log = get_logger(__name__)
13+
814
CO_ENABLED = "DD_CODE_ORIGIN_FOR_SPANS_ENABLED"
915
DI_PRODUCT_KEY = "dynamic-instrumentation"
1016

@@ -23,18 +29,26 @@ def _start():
2329

2430

2531
def start():
26-
if config.span.enabled:
32+
# We need to instrument the entrypoints on boot because this is the only
33+
# time the tracer will notify us of entrypoints being registered.
34+
@partial(core.on, "service_entrypoint.patch")
35+
def _(f: t.Callable) -> None:
2736
from ddtrace.debugging._origin.span import SpanCodeOriginProcessorEntry
37+
38+
SpanCodeOriginProcessorEntry.instrument_view(f)
39+
40+
log.debug("Registered entrypoint patching hook for code origin for spans")
41+
42+
if config.span.enabled:
2843
from ddtrace.debugging._origin.span import SpanCodeOriginProcessorExit
2944

30-
SpanCodeOriginProcessorEntry.enable()
3145
SpanCodeOriginProcessorExit.enable()
46+
47+
_start()
3248
# If dynamic instrumentation is enabled, and code origin for spans is not explicitly disabled,
3349
# we'll enable entry spans only.
3450
elif product_manager.is_enabled(DI_PRODUCT_KEY) and config.value_source(CO_ENABLED) == ValueSource.DEFAULT:
35-
from ddtrace.debugging._origin.span import SpanCodeOriginProcessorEntry
36-
37-
SpanCodeOriginProcessorEntry.enable()
51+
_start()
3852

3953

4054
def restart(join=False):

ddtrace/debugging/_uploader.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class UploaderProduct(str, Enum):
3232

3333
DEBUGGER = "dynamic_instrumentation"
3434
EXCEPTION_REPLAY = "exception_replay"
35-
CODE_ORIGIN_SPAN = "code_origin.span"
35+
CODE_ORIGIN_SPAN_ENTRY = "code_origin.span.entry"
36+
CODE_ORIGIN_SPAN_EXIT = "code_origin.span.exit"
3637

3738

3839
@dataclass
@@ -220,11 +221,8 @@ def online(self) -> None:
220221
on_shutdown = online
221222

222223
@classmethod
223-
def get_collector(cls) -> SignalCollector:
224-
if cls._instance is None:
225-
raise RuntimeError("No products registered with the uploader")
226-
227-
return cls._instance._collector
224+
def get_collector(cls) -> Optional[SignalCollector]:
225+
return cls._instance._collector if cls._instance is not None else None
228226

229227
@classmethod
230228
def register(cls, product: UploaderProduct) -> None:

0 commit comments

Comments
 (0)