ref(workflow_engine): Prefetch all the data for detector evaluation #95184

Draft · wants to merge 3 commits into master
src/sentry/workflow_engine/processors/data_source.py (49 changes: 35 additions & 14 deletions)
@@ -9,30 +9,51 @@
 logger = logging.getLogger("sentry.workflow_engine.process_data_source")
 
 
+# TODO - @saponifi3d - change the text choices to an enum
+def bulk_fetch_enabled_detectors(
+    source_ids: set[str], query_type: str
+) -> dict[str, list[Detector]]:
+    """
+    Get all of the enabled detectors for a set of data source ids and a query type.
+    This also prefetches the related data models needed to evaluate each detector.
+    """
+    data_sources = (
+        DataSource.objects.filter(
+            source_id__in=source_ids,
+            type=query_type,
+            detectors__enabled=True,
+        )
+        .prefetch_related(
+            Prefetch(
+                "detectors",
+                queryset=Detector.objects.filter(enabled=True)
+                .select_related("workflow_condition_group")
+                .prefetch_related("workflow_condition_group__conditions"),
+            )
+        )
+        .distinct()
+    )
+
+    result: dict[str, list[Detector]] = {}
+    for data_source in data_sources:
+        result[data_source.source_id] = list(data_source.detectors.all())
+
+    return result
+
+
 # TODO - @saponifi3d - make query_type an optional override; otherwise infer it from the data packet.
 def process_data_sources[
     T
 ](data_packets: list[DataPacket[T]], query_type: str) -> list[tuple[DataPacket[T], list[Detector]]]:
     metrics.incr("workflow_engine.process_data_sources", tags={"query_type": query_type})
 
-    data_packet_ids = {packet.source_id for packet in data_packets}
-
-    # Fetch all data sources and associated detectors for the given data packets
-    with sentry_sdk.start_span(op="workflow_engine.process_data_sources.fetch_data_sources"):
-        data_sources = DataSource.objects.filter(
-            source_id__in=data_packet_ids,
-            type=query_type,
-            detectors__enabled=True,
-        ).prefetch_related(Prefetch("detectors"))
-
-    # Build a lookup dict for source_id to detectors
-    source_id_to_detectors = {ds.source_id: list(ds.detectors.all()) for ds in data_sources}
+    with sentry_sdk.start_span(op="workflow_engine.process_data_sources.get_enabled_detectors"):
+        packet_source_ids = {packet.source_id for packet in data_packets}
+        source_to_detector = bulk_fetch_enabled_detectors(packet_source_ids, query_type)
 
     # Create the result tuples
     result = []
     for packet in data_packets:
-        detectors = source_id_to_detectors.get(packet.source_id)
+        detectors: list[Detector] = source_to_detector.get(packet.source_id, [])
 
         if detectors:
             data_packet_tuple = (packet, detectors)
…
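
The prefetch in bulk_fetch_enabled_detectors is what keeps later evaluation query-free. A minimal usage sketch, assuming the models shown above; the source id and the "test" query type are illustrative values borrowed from the tests below:

    detectors_by_source = bulk_fetch_enabled_detectors({"1234"}, "test")

    for source_id, detectors in detectors_by_source.items():
        for detector in detectors:
            # The condition group was already joined in via select_related ...
            group = detector.workflow_condition_group
            if group is not None:
                # ... and .all() is served from the prefetch cache,
                # so no additional SQL is issued here.
                conditions = list(group.conditions.all())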
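
And a sketch of how a caller might consume the (packet, detectors) pairs returned by process_data_sources; evaluate_detector is a hypothetical stand-in, not a function from this PR:

    def handle_packets(data_packets: list[DataPacket[dict]], query_type: str) -> None:
        # Hypothetical driver loop; the real callers live elsewhere in the workflow engine.
        for packet, detectors in process_data_sources(data_packets, query_type):
            for detector in detectors:
                evaluate_detector(detector, packet)  # hypothetical evaluation entry point
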
tests/sentry/workflow_engine/processors/test_data_sources.py (35 changes: 35 additions & 0 deletions)
@@ -28,6 +28,18 @@ def setUp(self):
         self.detector_one = self.create_detector(name="test_detector1")
         self.detector_two = self.create_detector(name="test_detector2", type="metric_issue")
 
+        self.detector_one.workflow_condition_group = self.create_data_condition_group(
+            logic_type="any"
+        )
+
+        self.create_data_condition(
+            condition_group=self.detector_one.workflow_condition_group,
+            type="eq",
+            comparison="bar",
+            condition_result=True,
+        )
+        self.detector_one.save()
+
         self.ds1 = self.create_data_source(source_id=self.query.id, type="test")
         self.ds1.detectors.set([self.detector_one])
@@ -141,3 +153,26 @@ def test_metrics_for_many_detectors(self):
             2,
             tags={"query_type": "test"},
         )
+
+    def test_sql_cascades(self):
+        """
+        `bulk_fetch_enabled_detectors` should issue exactly 3 SQL queries:
+        - one for the matching data sources
+        - one for their enabled detectors, with the condition groups
+          joined in via select_related
+        - one for the data conditions of those groups
+        """
+        with self.assertNumQueries(3):
+            results = process_data_sources(self.data_packets, "test")
+
+            for packet, detectors in results:
+                # If the detectors were not prefetched, this would add to the query count
+                assert all(detector.enabled for detector in detectors)
+
+                for detector in detectors:
+                    if detector.workflow_condition_group:
+                        # Would trigger a SQL query if not prefetched, failing the assertion
+                        assert detector.workflow_condition_group.id is not None
+
+                        for condition in detector.workflow_condition_group.conditions.all():
+                            # Would trigger a SQL query if not prefetched, failing the assertion
+                            assert condition.id is not None
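
For contrast, a sketch of the N+1 query shape the prefetching avoids, assuming the same foreign-key layout (this code is illustrative, not part of the PR):

    # Anti-pattern: with no select_related/prefetch_related, N enabled
    # detectors cost roughly 1 + 2N queries instead of a constant 3.
    for detector in Detector.objects.filter(enabled=True):  # 1 query
        group = detector.workflow_condition_group  # +1 query per detector
        if group is not None:
            conditions = list(group.conditions.all())  # +1 query per detector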