src/sentry/workflow_engine/processors/workflow.py (60 changes: 12 additions & 48 deletions)
@@ -33,11 +33,7 @@
 )
 from sentry.workflow_engine.processors.detector import get_detector_by_event
 from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
-from sentry.workflow_engine.types import (
-    WorkflowEvaluation,
-    WorkflowEvaluationData,
-    WorkflowEventData,
-)
+from sentry.workflow_engine.types import WorkflowEventData
 from sentry.workflow_engine.utils import log_context, scopedstats
 from sentry.workflow_engine.utils.metrics import metrics_incr
 
@@ -468,7 +464,7 @@ def process_workflows(
     event_data: WorkflowEventData,
     event_start_time: datetime,
     detector: Detector | None = None,
-) -> WorkflowEvaluation:
+) -> set[Workflow]:
     """
     This method will get the detector based on the event, and then gather the associated workflows.
     Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
@@ -482,8 +478,6 @@ def process_workflows(
         fire_actions,
     )
 
-    workflow_evaluation_data = WorkflowEvaluationData(group_event=event_data.event)
-
     try:
         if detector is None and isinstance(event_data.event, GroupEvent):
             detector = get_detector_by_event(event_data)
@@ -502,13 +496,7 @@
             )
         )
     except Detector.DoesNotExist:
-        return WorkflowEvaluation(
-            tainted=True,
-            msg="No Detectors associated with the issue were found",
-            data=workflow_evaluation_data,
-        )
-
-    workflow_evaluation_data.associated_detector = detector
+        return set()
 
     try:
         environment = get_environment_by_event(event_data)
@@ -522,58 +510,33 @@
             )
         )
     except Environment.DoesNotExist:
-        return WorkflowEvaluation(
-            tainted=True,
-            msg="Environment for event not found",
-            data=workflow_evaluation_data,
-        )
+        return set()
 
     if features.has("organizations:workflow-engine-process-workflows-logs", organization):
         log_context.set_verbose(True)
-
     workflows = _get_associated_workflows(detector, environment, event_data)
-    workflow_evaluation_data.workflows = workflows
-
     if not workflows:
-        return WorkflowEvaluation(
-            tainted=True,
-            msg="No workflows are associated with the detector in the event",
-            data=workflow_evaluation_data,
-        )
+        # If there aren't any workflows, there's nothing to evaluate
+        return set()
 
     triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
         workflows, event_data, event_start_time
     )
-
-    workflow_evaluation_data.triggered_workflows = triggered_workflows
-
     if not triggered_workflows and not queue_items_by_workflow_id:
-        return WorkflowEvaluation(
-            tainted=True,
-            msg="No items were triggered or queued for slow evaluation",
-            data=workflow_evaluation_data,
-        )
+        # if there aren't any triggered workflows, there are no action filters to evaluate
+        return set()
 
     # TODO - we should probably return here and have the rest from here be
     # `process_actions`, this will take a list of "triggered_workflows"
     actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
         triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
    )
-
     enqueue_workflows(batch_client, queue_items_by_workflow_id)
-
     actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
     sentry_sdk.set_tag("workflow_engine.triggered_actions", len(actions))
-
-    workflow_evaluation_data.action_groups = actions_to_trigger
-    workflow_evaluation_data.triggered_actions = set(actions)
-
     if not actions:
-        return WorkflowEvaluation(
-            tainted=True,
-            msg="No actions to evaluate; filtered or not triggered",
-            data=workflow_evaluation_data,
-        )
+        # If there aren't any actions on the associated workflows, there's nothing to trigger
+        return triggered_workflows
 
     should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
     create_workflow_fire_histories(
@@ -586,4 +549,5 @@ def process_workflows(
     )
 
     fire_actions(actions, detector, event_data)
-    return WorkflowEvaluation(tainted=False, msg=None, data=workflow_evaluation_data)
+
+    return triggered_workflows
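
Reviewer note: the net effect of this file's changes is that `process_workflows` once again returns the plain set of triggered workflows, and every early exit collapses to an empty set instead of a tagged `WorkflowEvaluation`. A minimal caller-side sketch of the restored contract (the caller function and event plumbing are hypothetical; `process_workflows` and `DelayedWorkflowClient` are the real names from this diff):

```python
from datetime import datetime, timezone

from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
from sentry.workflow_engine.processors.workflow import process_workflows


def handle_event(event_data) -> None:  # hypothetical caller
    batch_client = DelayedWorkflowClient()
    triggered = process_workflows(
        batch_client, event_data, event_start_time=datetime.now(timezone.utc)
    )
    # An empty set now covers every early exit: no detector, no environment,
    # no associated workflows, and nothing triggered or queued.
    if not triggered:
        return
```

One trade-off worth flagging in review: callers can no longer distinguish *why* nothing fired, since the `msg` debug field disappears along with the wrapper.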
src/sentry/workflow_engine/tasks/workflows.py (16 changes: 5 additions & 11 deletions)
@@ -18,7 +18,6 @@
 from sentry.utils import metrics
 from sentry.utils.exceptions import quiet_redis_noise
 from sentry.utils.locking import UnableToAcquireLock
-from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
 from sentry.workflow_engine.models import DataConditionGroup, Detector
 from sentry.workflow_engine.tasks.utils import (
     EventNotFoundError,
@@ -45,6 +44,7 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
     The task will get the Activity from the database, create a WorkflowEventData object,
     and then process the data in `process_workflows`.
     """
+    from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
     from sentry.workflow_engine.processors.workflow import process_workflows
 
     with transaction.atomic(router.db_for_write(Detector)):
@@ -69,12 +69,9 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
     )
     with quiet_redis_noise():
         batch_client = DelayedWorkflowClient()
-        evaluation = process_workflows(
+        process_workflows(
             batch_client, event_data, event_start_time=activity.datetime, detector=detector
         )
-
-        evaluation.to_log(logger)
-
     metrics.incr(
         "workflow_engine.tasks.process_workflows.activity_update.executed",
         tags={"activity_type": activity.type, "detector_type": detector.type},
@@ -106,11 +103,11 @@ def process_workflows_event(
     start_timestamp_seconds: float | None = None,
     **kwargs: dict[str, Any],
 ) -> None:
+    from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
     from sentry.workflow_engine.processors.workflow import process_workflows
 
     recorder = scopedstats.Recorder()
     start_time = time.time()
-
     with recorder.record():
         try:
             event_data = build_workflow_event_data_from_event(
@@ -134,11 +131,7 @@ def process_workflows_event(
         )
         with quiet_redis_noise():
             batch_client = DelayedWorkflowClient()
-            evaluation = process_workflows(
-                batch_client, event_data, event_start_time=event_start_time
-            )
-
-            evaluation.to_log(logger)
+            process_workflows(batch_client, event_data, event_start_time=event_start_time)
     duration = time.time() - start_time
     is_slow = duration > 1.0
     # We want full coverage for particularly slow cases, plus a random sampling.
Expand All @@ -165,6 +158,7 @@ def schedule_delayed_workflows(**kwargs: Any) -> None:
"""
Schedule delayed workflow buffers in a batch.
"""
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
from sentry.workflow_engine.processors.schedule import process_buffered_workflows

lock_name = "schedule_delayed_workflows"
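Reviewer note: besides dropping the `evaluation` wrapper, this file moves the `DelayedWorkflowClient` import from module scope into each task body. The PR does not state the motivation, but this is the usual deferred-import pattern for breaking import cycles or keeping task-module import cheap; a generic sketch of the idea (names illustrative):

```python
# At module scope, an eager import couples this module's import to the
# dependency and can create a cycle if the dependency imports back:
# from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient


def run_task() -> None:  # illustrative task body
    # Deferred import: resolved on first call, after every module involved
    # in any potential cycle has finished initializing.
    from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient

    batch_client = DelayedWorkflowClient()
    # ... use batch_client ...
```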
src/sentry/workflow_engine/types.py (55 changes: 2 additions & 53 deletions)
@@ -1,9 +1,8 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from dataclasses import asdict, dataclass, field
+from dataclasses import dataclass, field
 from enum import IntEnum, StrEnum
-from logging import Logger
 from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypedDict, TypeVar
 
 from sentry.types.group import PriorityLevel
@@ -21,7 +20,7 @@
 from sentry.snuba.models import SnubaQueryEventType
 from sentry.workflow_engine.endpoints.validators.base import BaseDetectorTypeValidator
 from sentry.workflow_engine.handlers.detector import DetectorHandler
-from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
+from sentry.workflow_engine.models import Action, Detector
 from sentry.workflow_engine.models.data_condition import Condition
 
 T = TypeVar("T")
@@ -73,56 +72,6 @@ class WorkflowEventData:
     workflow_env: Environment | None = None
 
 
-@dataclass
-class WorkflowEvaluationData:
-    group_event: GroupEvent | Activity
-    action_groups: set[DataConditionGroup] | None = None
-    workflows: set[Workflow] | None = None
-    triggered_actions: set[Action] | None = None
-    triggered_workflows: set[Workflow] | None = None
-    associated_detector: Detector | None = None
-
-
-@dataclass(frozen=True)
-class WorkflowEvaluation:
-    """
-    This is the result of `process_workflows`, and is used to
-    encapsulate different stages of completion for the method.
-
-    The `tainted` flag is used to indicate whether or not actions
-    have been triggered during the workflows evaluation.
-
-    The `msg` field is used for debug information during the evaluation.
-
-    The `data` attribute will include all the data used to evaluate the
-    workflows, and determine if an action should be triggered.
-    """
-
-    tainted: bool
-    msg: str | None
-    data: WorkflowEvaluationData
-
-    def to_log(self, logger: Logger) -> None:
-        """
-        Determines how far in the process the evaluation got
-        and creates a structured log string that is easy to search for.
-
-        Then this will return that log string, and the
-        relevant processing data to be logged.
-        """
-        log_str = "workflow_engine.process_workflows.evaluation"
-
-        if self.tainted:
-            if self.data.triggered_workflows is None:
-                log_str = f"{log_str}.workflows.not_triggered"
-            else:
-                log_str = f"{log_str}.workflows.triggered"
-        else:
-            log_str = f"{log_str}.actions.triggered"
-
-        logger.info(log_str, extra={**asdict(self.data), "msg": self.msg})
-
-
 class ConfigTransformer(ABC):
     """
     A ConfigTransformer is used to transform the config between API and internal representations.
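Reviewer note: for anyone grepping logs after this lands, the deleted `to_log` emitted one of three log keys depending on how far evaluation progressed. A condensed sketch of that mapping, reconstructed from the removed lines above:

```python
def evaluation_log_key(tainted: bool, triggered_workflows) -> str:
    # Reconstructed from the removed WorkflowEvaluation.to_log above.
    base = "workflow_engine.process_workflows.evaluation"
    if not tainted:
        return f"{base}.actions.triggered"  # actions were fired
    if triggered_workflows is None:
        return f"{base}.workflows.not_triggered"  # bailed before trigger evaluation
    return f"{base}.workflows.triggered"  # workflows matched, but no actions fired
```

Those keys disappear with this revert, so any dashboards or alerts built on them would go quiet.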
tests/sentry/workflow_engine/processors/test_workflow.py (42 changes: 19 additions & 23 deletions)
@@ -88,12 +88,12 @@ def test_skips_disabled_workflows(self) -> None:
             workflow=workflow,
         )
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.data.triggered_workflows == {self.error_workflow}
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        assert triggered_workflows == {self.error_workflow}
 
     def test_error_event(self) -> None:
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.data.triggered_workflows == {self.error_workflow}
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        assert triggered_workflows == {self.error_workflow}
 
     @patch("sentry.workflow_engine.processors.action.fire_actions")
     def test_process_workflows_event(self, mock_fire_actions: MagicMock) -> None:
@@ -161,9 +161,9 @@ def test_populate_workflow_env_for_filters(self, mock_filter: MagicMock) -> None
         assert self.event_data.group_state
         self.event_data.group_state["is_new"] = True
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+
         mock_filter.assert_called_with({workflow_filters}, self.event_data)
-        assert result.tainted is False
 
     def test_same_environment_only(self) -> None:
         env = self.create_environment(project=self.project)
@@ -208,15 +208,15 @@ def test_same_environment_only(self) -> None:
             workflow=mismatched_env_workflow,
         )
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.data.triggered_workflows == {self.error_workflow, matching_env_workflow}
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        assert triggered_workflows == {self.error_workflow, matching_env_workflow}
 
     def test_issue_occurrence_event(self) -> None:
         issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id})
         self.group_event.occurrence = issue_occurrence
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.data.triggered_workflows == {self.workflow}
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        assert triggered_workflows == {self.workflow}
 
     def test_regressed_event(self) -> None:
         dcg = self.create_data_condition_group()
@@ -233,16 +233,17 @@ def test_regressed_event(self) -> None:
             workflow=workflow,
         )
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.data.triggered_workflows == {self.error_workflow, workflow}
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        assert triggered_workflows == {self.error_workflow, workflow}
 
     @patch("sentry.utils.metrics.incr")
     @patch("sentry.workflow_engine.processors.detector.logger")
     def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None:
         self.group_event.occurrence = self.build_occurrence(evidence_data={})
 
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
-        assert result.msg == "No Detectors associated with the issue were found"
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+
+        assert not triggered_workflows
 
         mock_incr.assert_called_once_with("workflow_engine.detectors.error")
         mock_logger.exception.assert_called_once_with(
@@ -259,10 +260,9 @@
     def test_no_environment(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None:
         Environment.objects.all().delete()
         cache.clear()
-        result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
+        triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
 
-        assert not result.data.triggered_workflows
-        assert result.msg == "Environment for event not found"
+        assert not triggered_workflows
 
         mock_incr.assert_called_once_with(
             "workflow_engine.process_workflows.error", 1, tags={"detector_type": "error"}
Expand Down Expand Up @@ -338,12 +338,8 @@ def test_defaults_to_error_workflows(self) -> None:
self.group_event.occurrence = issue_occurrence
self.group.update(type=issue_occurrence.type.type_id)

result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)

assert result.tainted is True
assert result.data.triggered_workflows == {self.error_workflow}
assert result.data.triggered_actions is not None
assert len(result.data.triggered_actions) == 0
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
assert triggered_workflows == {self.error_workflow}


class TestEvaluateWorkflowTriggers(BaseWorkflowTest):
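Reviewer note: the test changes are mechanical. A condensed before/after of the assertion pattern, taken directly from the hunks above:

```python
# Before: unwrap the WorkflowEvaluation result object.
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
assert result.data.triggered_workflows == {self.error_workflow}

# After: assert on the returned set directly.
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
assert triggered_workflows == {self.error_workflow}
```

The `msg`-based assertions (e.g. in `test_no_detector`) lose some precision: they now verify only that nothing was triggered, not which early exit was taken.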