Skip to content

Commit 50d366f

Browse files
committed
Initial pass at WorkflowNotProcessable, and adding logs. This will allow us to have a deterministic result from the evaluation, and simplified logging outside of it.
1 parent 17f21ec commit 50d366f

File tree

3 files changed

+91
-19
lines changed

3 files changed

+91
-19
lines changed

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from dataclasses import asdict, dataclass, replace
33
from datetime import datetime
44
from enum import StrEnum
5-
from typing import DefaultDict
5+
from typing import Any, DefaultDict
66

77
import sentry_sdk
88
from django.db import router, transaction
@@ -33,7 +33,7 @@
3333
)
3434
from sentry.workflow_engine.processors.detector import get_detector_by_event
3535
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
36-
from sentry.workflow_engine.types import WorkflowEventData
36+
from sentry.workflow_engine.types import WorkflowEventData, WorkflowNotProcessable
3737
from sentry.workflow_engine.utils import log_context, scopedstats
3838
from sentry.workflow_engine.utils.metrics import metrics_incr
3939

@@ -464,7 +464,7 @@ def process_workflows(
464464
event_data: WorkflowEventData,
465465
event_start_time: datetime,
466466
detector: Detector | None = None,
467-
) -> set[Workflow]:
467+
) -> set[Workflow] | WorkflowNotProcessable:
468468
"""
469469
This method will get the detector based on the event, and then gather the associated workflows.
470470
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
@@ -478,6 +478,12 @@ def process_workflows(
478478
fire_actions,
479479
)
480480

481+
# This dictionary stores the data for WorkflowNotProcessable
482+
# and is eventually unpacked to pass into the frozen dataclass
483+
extra_data: dict[str, Any] = {
484+
"group_event": event_data.event,
485+
}
486+
481487
try:
482488
if detector is None and isinstance(event_data.event, GroupEvent):
483489
detector = get_detector_by_event(event_data)
@@ -496,7 +502,12 @@ def process_workflows(
496502
)
497503
)
498504
except Detector.DoesNotExist:
499-
return set()
505+
return WorkflowNotProcessable(
506+
message="No Detectors associated with the issue were found",
507+
**extra_data,
508+
)
509+
510+
extra_data["associated_detector"] = [detector]
500511

501512
try:
502513
environment = get_environment_by_event(event_data)
@@ -510,30 +521,53 @@ def process_workflows(
510521
)
511522
)
512523
except Environment.DoesNotExist:
513-
return set()
524+
return WorkflowNotProcessable(
525+
message="Environment for event not found",
526+
**extra_data,
527+
)
514528

515529
if features.has("organizations:workflow-engine-process-workflows-logs", organization):
516530
log_context.set_verbose(True)
517531

518532
workflows = _get_associated_workflows(detector, environment, event_data)
533+
extra_data["workflows"] = workflows
534+
519535
if not workflows:
520-
# If there aren't any workflows, there's nothing to evaluate
521-
return set()
536+
return WorkflowNotProcessable(
537+
message="No workflows are associated with the detector in the event",
538+
**extra_data,
539+
)
522540

523541
triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
524542
workflows, event_data, event_start_time
525543
)
544+
545+
extra_data["triggered_workflows"] = triggered_workflows
546+
526547
if not triggered_workflows and not queue_items_by_workflow_id:
527-
# if there aren't any triggered workflows, there's no action filters to evaluate
528-
return set()
548+
return WorkflowNotProcessable(
549+
message="No items were triggered or queued for slow evaluation",
550+
**extra_data,
551+
)
529552

530553
actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
531554
triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
532555
)
556+
533557
enqueue_workflows(batch_client, queue_items_by_workflow_id)
558+
559+
# TODO - refactor to have action evaluation separate from workflow evaluation
560+
# The result of processing a workflow, should be a list of actions we want to trigger.
561+
# From here on we are probably doing too many side effects and things related to actions.
534562
actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
535563
sentry_sdk.set_tag("workflow_engine.triggered_actions", len(actions))
536564

565+
extra_data = {
566+
**extra_data,
567+
"actions": actions_to_trigger, # All the actions we found to trigger
568+
"triggered_actions": actions, # All the actions we intend to trigger
569+
}
570+
537571
if not actions:
538572
# If there aren't any actions on the associated workflows, there's nothing to trigger
539573
return triggered_workflows

src/sentry/workflow_engine/tasks/workflows.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import random
22
import time
3+
from dataclasses import asdict
34
from datetime import UTC, datetime
45
from typing import Any
56

@@ -18,17 +19,47 @@
1819
from sentry.utils import metrics
1920
from sentry.utils.exceptions import quiet_redis_noise
2021
from sentry.utils.locking import UnableToAcquireLock
22+
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
2123
from sentry.workflow_engine.models import DataConditionGroup, Detector
2224
from sentry.workflow_engine.tasks.utils import (
2325
EventNotFoundError,
2426
build_workflow_event_data_from_event,
2527
)
26-
from sentry.workflow_engine.types import WorkflowEventData
28+
from sentry.workflow_engine.types import WorkflowEventData, WorkflowNotProcessable
2729
from sentry.workflow_engine.utils import log_context, scopedstats
2830

2931
logger = log_context.get_logger(__name__)
3032

3133

34+
# TODO - this seems like it should live in processors.workflow,
# Then we can create a `process_new_issue` or `process_event` kind
# of method. That will wrap a new `process_actions_with_logs`
# (and `process_actions`) method. Allowing us to have a much cleaner
# interface here, and between those processor methods.
def process_workflows_with_logs(
    batch_client: DelayedWorkflowClient,
    event_data: WorkflowEventData,
    event_start_time: datetime,
    detector: Detector | None = None,
) -> None:
    """
    Thin logging wrapper around `process_workflows`.

    Whenever the evaluation short-circuits with a `WorkflowNotProcessable`,
    emit one info log carrying its fields, so every issue gets a record of
    why its workflows did or did not trigger. Returns nothing either way.
    """
    # NOTE(review): imported at call time, presumably to avoid a circular
    # import between tasks and processors — confirm.
    from sentry.workflow_engine.processors.workflow import process_workflows

    result = process_workflows(batch_client, event_data, event_start_time, detector)

    if not isinstance(result, WorkflowNotProcessable):
        # TODO - Log the triggered workflows
        return

    logger.info("process_workflows.evaluation", extra=asdict(result))
61+
62+
3263
@instrumented_task(
3364
name="sentry.workflow_engine.tasks.process_workflow_activity",
3465
namespace=namespaces.workflow_engine_tasks,
@@ -44,8 +75,6 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
4475
The task will get the Activity from the database, create a WorkflowEventData object,
4576
and then process the data in `process_workflows`.
4677
"""
47-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
48-
from sentry.workflow_engine.processors.workflow import process_workflows
4978

5079
with transaction.atomic(router.db_for_write(Detector)):
5180
try:
@@ -69,9 +98,10 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
6998
)
7099
with quiet_redis_noise():
71100
batch_client = DelayedWorkflowClient()
72-
process_workflows(
101+
process_workflows_with_logs(
73102
batch_client, event_data, event_start_time=activity.datetime, detector=detector
74103
)
104+
75105
metrics.incr(
76106
"workflow_engine.tasks.process_workflows.activity_update.executed",
77107
tags={"activity_type": activity.type, "detector_type": detector.type},
@@ -103,11 +133,9 @@ def process_workflows_event(
103133
start_timestamp_seconds: float | None = None,
104134
**kwargs: dict[str, Any],
105135
) -> None:
106-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
107-
from sentry.workflow_engine.processors.workflow import process_workflows
108-
109136
recorder = scopedstats.Recorder()
110137
start_time = time.time()
138+
111139
with recorder.record():
112140
try:
113141
event_data = build_workflow_event_data_from_event(
@@ -131,7 +159,7 @@ def process_workflows_event(
131159
)
132160
with quiet_redis_noise():
133161
batch_client = DelayedWorkflowClient()
134-
process_workflows(batch_client, event_data, event_start_time=event_start_time)
162+
process_workflows_with_logs(batch_client, event_data, event_start_time=event_start_time)
135163
duration = time.time() - start_time
136164
is_slow = duration > 1.0
137165
# We want full coverage for particularly slow cases, plus a random sampling.
@@ -158,7 +186,6 @@ def schedule_delayed_workflows(**kwargs: Any) -> None:
158186
"""
159187
Schedule delayed workflow buffers in a batch.
160188
"""
161-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
162189
from sentry.workflow_engine.processors.schedule import process_buffered_workflows
163190

164191
lock_name = "schedule_delayed_workflows"

src/sentry/workflow_engine/types.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from sentry.snuba.models import SnubaQueryEventType
2121
from sentry.workflow_engine.endpoints.validators.base import BaseDetectorTypeValidator
2222
from sentry.workflow_engine.handlers.detector import DetectorHandler
23-
from sentry.workflow_engine.models import Action, Detector
23+
from sentry.workflow_engine.models import Action, Detector, Workflow
2424
from sentry.workflow_engine.models.data_condition import Condition
2525

2626
T = TypeVar("T")
@@ -72,6 +72,17 @@ class WorkflowEventData:
7272
workflow_env: Environment | None = None
7373

7474

75+
@dataclass(frozen=True)
class WorkflowNotProcessable:
    """
    Deterministic result returned by `process_workflows` when evaluation
    stops early (no detector, no environment, no workflows, or nothing
    triggered), capturing how far processing got so the caller can log it.
    """

    # The event whose workflows could not be (fully) processed.
    group_event: GroupEvent
    # Human-readable reason evaluation stopped
    # (e.g. "Environment for event not found").
    message: str
    # All the actions found to trigger, if evaluation got that far.
    actions: list[Action] | None = None
    # Workflows associated with the detector/environment, if any were found.
    workflows: list[Workflow] | None = None
    # Subset of `actions` that were actually intended to trigger.
    triggered_actions: list[Action] | None = None
    # Workflows whose "when" (trigger) conditions passed.
    triggered_workflows: list[Workflow] | None = None
    # Detectors associated with the event.
    # NOTE(review): processors/workflow.py populates the key
    # "associated_detector" (singular) in the extra_data dict it
    # **-unpacks into this class; that key would raise TypeError here.
    # Confirm the field and dict key names are aligned.
    associated_detectors: list[Detector] | None = None
84+
85+
7586
class ConfigTransformer(ABC):
7687
"""
7788
A ConfigTransformer is used to transform the config between API and internal representations.

0 commit comments

Comments
 (0)