Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 61 additions & 36 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from typing import Tuple
from typing import Type

import wrapt

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling import _threading
from ddtrace.profiling import collector
Expand All @@ -34,22 +32,24 @@ def _current_thread() -> Tuple[int, str]:
return thread_id, _threading.get_thread_name(thread_id)


# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
# appear in the stack trace and we need to hide it.
WRAPT_C_EXT: bool
if os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
WRAPT_C_EXT = False
else:
try:
import wrapt._wrappers as _w # noqa: F401
except ImportError:
WRAPT_C_EXT = False
else:
WRAPT_C_EXT = True
del _w


class _ProfiledLock(wrapt.ObjectProxy):
class _ProfiledLock:
"""Lightweight lock wrapper that profiles lock acquire/release operations.

This is a simple delegating wrapper that intercepts lock methods without
the overhead of a full proxy object.
"""

__slots__ = (
"__wrapped__",
"_self_tracer",
"_self_max_nframes",
"_self_capture_sampler",
"_self_endpoint_collection_enabled",
"_self_init_loc",
"_self_acquired_at",
"_self_name",
)

def __init__(
self,
wrapped: Any,
Expand All @@ -58,12 +58,13 @@ def __init__(
capture_sampler: collector.CaptureSampler,
endpoint_collection_enabled: bool,
) -> None:
wrapt.ObjectProxy.__init__(self, wrapped)
self.__wrapped__: Any = wrapped
self._self_tracer: Optional[Tracer] = tracer
self._self_max_nframes: int = max_nframes
self._self_capture_sampler: collector.CaptureSampler = capture_sampler
self._self_endpoint_collection_enabled: bool = endpoint_collection_enabled
frame: FrameType = sys._getframe(2 if WRAPT_C_EXT else 3)
# Frame depth: 0=__init__, 1=_profiled_allocate_lock, 2=_LockAllocatorWrapper.__call__, 3=caller
frame: FrameType = sys._getframe(3)
code: CodeType = frame.f_code
self._self_init_loc: str = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)
self._self_acquired_at: int = 0
Expand Down Expand Up @@ -134,11 +135,7 @@ def acquire(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)

def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
# The underlying threading.Lock class is implemented using C code, and
# it doesn't have the __dict__ attribute. So we can't do
# self.__dict__.pop("_self_acquired_at", None) to remove the attribute.
# Instead, we need to use the following workaround to retrieve and
# remove the attribute.
# Using __slots__ makes attribute handling cleaner than with wrapt.ObjectProxy
start: Optional[int] = getattr(self, "_self_acquired_at", None)
try:
# Though it should generally be avoided to call release() from
Expand Down Expand Up @@ -250,13 +247,39 @@ def _maybe_update_self_name(self) -> None:

if not self._self_name:
self._self_name = ""


class FunctionWrapper(wrapt.FunctionWrapper):
# Override the __get__ method: whatever happens, _allocate_lock is always considered by Python like a "static"
# method, even when used as a class attribute. Python never tried to "bind" it to a method, because it sees it is a
# builtin function. Override default wrapt behavior here that tries to detect bound method.
def __get__(self, instance: Any, owner: Optional[Type] = None) -> FunctionWrapper:

# Delegate remaining lock methods to the wrapped lock
def locked(self) -> bool:
"""Return True if lock is currently held."""
return self.__wrapped__.locked()

def __repr__(self) -> str:
return f"<_ProfiledLock({self.__wrapped__!r}) at {self._self_init_loc}>"

# Support for being used in with statements
def __bool__(self) -> bool:
return True


class _LockAllocatorWrapper:
"""Wrapper for lock allocator functions that prevents method binding.

When a function is stored as a class attribute and accessed via an instance,
Python's descriptor protocol normally binds it as a method. This wrapper
prevents that behavior by implementing __get__ to always return self,
similar to how staticmethod works, but as a callable object.
"""

__slots__ = ("_func",)

def __init__(self, func: Callable[..., Any]) -> None:
self._func: Callable[..., Any] = func

def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self._func(*args, **kwargs)

def __get__(self, instance: Any, owner: Optional[Type] = None) -> "_LockAllocatorWrapper":
# Always return self, never bind as a method
return self


Expand Down Expand Up @@ -303,9 +326,9 @@ def patch(self) -> None:
# Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
self._original = self._get_patch_target()

# TODO: `instance` is unused
def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _ProfiledLock:
lock: Any = wrapped(*args, **kwargs)
# Create a simple wrapper function that returns profiled locks
def _profiled_allocate_lock(*args: Any, **kwargs: Any) -> _ProfiledLock:
lock: Any = self._original(*args, **kwargs)
return self.PROFILED_LOCK_CLASS(
lock,
self.tracer,
Expand All @@ -314,7 +337,9 @@ def _allocate_lock(wrapped: Any, instance: Any, args: Any, kwargs: Any) -> _Prof
self.endpoint_collection_enabled,
)

self._set_patch_target(FunctionWrapper(self._original, _allocate_lock))
# Wrap the function to prevent it from being bound as a method when
# accessed as a class attribute (e.g., Foo.lock_class = threading.Lock)
self._set_patch_target(_LockAllocatorWrapper(_profiled_allocate_lock))

def unpatch(self) -> None:
"""Unpatch the threading module for tracking lock allocation."""
Expand Down
74 changes: 10 additions & 64 deletions tests/profiling_v2/collector/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,19 @@ def test_repr():


def test_patch():
from ddtrace.profiling.collector._lock import _LockAllocatorWrapper

lock = threading.Lock
collector = collector_threading.ThreadingLockCollector()
collector.start()
assert lock == collector._original
# wrapt makes this true
assert lock == threading.Lock
# After patching, threading.Lock is replaced with our wrapper
# The old reference (lock) points to the original builtin Lock class
assert lock != threading.Lock # They're different after patching
assert isinstance(threading.Lock, _LockAllocatorWrapper) # threading.Lock is now wrapped
assert callable(threading.Lock) # and it's callable
collector.stop()
# After stopping, everything is restored
assert lock == threading.Lock
assert collector._original == threading.Lock

Expand Down Expand Up @@ -105,68 +111,8 @@ def test_user_threads_have_native_id():
p.stop()


@pytest.mark.subprocess(
env=dict(WRAPT_DISABLE_EXTENSIONS="True", DD_PROFILING_FILE_PATH=__file__),
)
def test_wrapt_disable_extensions():
import os
import threading

from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling.collector import _lock
from ddtrace.profiling.collector import threading as collector_threading
from tests.profiling.collector import pprof_utils
from tests.profiling.collector.lock_utils import get_lock_linenos
from tests.profiling.collector.lock_utils import init_linenos

assert ddup.is_available, "ddup is not available"

# Set up the ddup exporter
test_name = "test_wrapt_disable_extensions"
pprof_prefix = "/tmp" + os.sep + test_name
output_filename = pprof_prefix + "." + str(os.getpid())
ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix)
ddup.start()

init_linenos(os.environ["DD_PROFILING_FILE_PATH"])

# WRAPT_DISABLE_EXTENSIONS is a flag that can be set to disable the C extension
# for wrapt. It's not set by default in dd-trace-py, but it can be set by
# users. This test checks that the collector works even if the flag is set.
assert os.environ.get("WRAPT_DISABLE_EXTENSIONS")
assert _lock.WRAPT_C_EXT is False

with collector_threading.ThreadingLockCollector(capture_pct=100):
th_lock = threading.Lock() # !CREATE! test_wrapt_disable_extensions
with th_lock: # !ACQUIRE! !RELEASE! test_wrapt_disable_extensions
pass

ddup.upload()

expected_filename = "test_threading.py"

linenos = get_lock_linenos("test_wrapt_disable_extensions", with_stmt=True)

profile = pprof_utils.parse_newest_profile(output_filename)
pprof_utils.assert_lock_events(
profile,
expected_acquire_events=[
pprof_utils.LockAcquireEvent(
caller_name="<module>",
filename=expected_filename,
linenos=linenos,
lock_name="th_lock",
)
],
expected_release_events=[
pprof_utils.LockReleaseEvent(
caller_name="<module>",
filename=expected_filename,
linenos=linenos,
lock_name="th_lock",
)
],
)
# test_wrapt_disable_extensions was removed as wrapt is no longer used in lock profiling.
# The lock profiler now uses a simple delegating wrapper with __slots__ for better performance.


# This test has to be run in a subprocess because it calls gevent.monkey.patch_all()
Expand Down