Skip to content

Commit 5f8fb62

Browse files
committed
feat(workflow): support workflow failure
Signed-off-by: Shijie Sheng <[email protected]>
1 parent 890fe83 commit 5f8fb62

File tree

4 files changed

+76
-12
lines changed

4 files changed

+76
-12
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
import logging
22
from dataclasses import dataclass
3+
import traceback
34
from typing import List
45

56
from cadence._internal.workflow.context import Context
67
from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator
78
from cadence._internal.workflow.statemachine.decision_manager import DecisionManager
89
from cadence._internal.workflow.workflow_intance import WorkflowInstance
9-
from cadence.api.v1.common_pb2 import Payload
10+
from cadence.api.v1.common_pb2 import Failure, Payload
1011
from cadence.api.v1.decision_pb2 import (
1112
CompleteWorkflowExecutionDecisionAttributes,
1213
Decision,
14+
FailWorkflowExecutionDecisionAttributes,
1315
)
1416
from cadence.api.v1.history_pb2 import (
1517
HistoryEvent,
1618
WorkflowExecutionStartedEventAttributes,
1719
)
1820
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
21+
from cadence.error import WorkflowFailure
1922
from cadence.workflow import WorkflowDefinition, WorkflowInfo
2023

2124
logger = logging.getLogger(__name__)
@@ -75,23 +78,32 @@ def process_decision(
7578
decisions = self._decision_manager.collect_pending_decisions()
7679

7780
# complete workflow if it is done
78-
try:
79-
if self._workflow_instance.is_done():
81+
if self._workflow_instance.is_done():
82+
try:
8083
result = self._workflow_instance.get_result()
84+
except WorkflowFailure as e:
85+
decisions.append(
86+
Decision(
87+
fail_workflow_execution_decision_attributes=FailWorkflowExecutionDecisionAttributes(
88+
failure=Failure(
89+
reason=str(e),
90+
details=traceback.format_exc().encode("utf-8"),
91+
)
92+
)
93+
)
94+
)
95+
# TODO: handle cancellation error
96+
except Exception:
97+
raise
98+
else:
8199
decisions.append(
82100
Decision(
83101
complete_workflow_execution_decision_attributes=CompleteWorkflowExecutionDecisionAttributes(
84102
result=result
85103
)
86104
)
87105
)
88-
return DecisionResult(decisions=decisions)
89-
90-
except Exception:
91-
# TODO: handle CancellationError
92-
# TODO: handle WorkflowError
93-
# TODO: handle unknown error, fail decision task and try again instead of breaking the engine
94-
raise
106+
return DecisionResult(decisions=decisions)
95107

96108
except Exception as e:
97109
# Log decision task failure with full context (matches Java ReplayDecisionTaskHandler)

cadence/_internal/workflow/workflow_intance.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
from asyncio import Task
1+
from asyncio import CancelledError, InvalidStateError, Task
22
from typing import Any, Optional
33
from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop
44
from cadence.api.v1.common_pb2 import Payload
55
from cadence.data_converter import DataConverter
6+
from cadence.error import WorkflowFailure
67
from cadence.workflow import WorkflowDefinition
78

89

@@ -33,6 +34,11 @@ def is_done(self) -> bool:
3334
def get_result(self) -> Payload:
3435
if self._task is None:
3536
raise RuntimeError("Workflow is not started yet")
36-
result = self._task.result()
37+
try:
38+
result = self._task.result()
39+
except (CancelledError, InvalidStateError) as e:
40+
raise e
41+
except Exception as e:
42+
raise WorkflowFailure(f"Workflow failed: {e}") from e
3743
# TODO: handle result with multiple outputs
3844
return self._data_converter.to_data([result])

cadence/error.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ def __init__(self, message: str) -> None:
66
super().__init__(message)
77

88

9+
class WorkflowFailure(Exception):
10+
def __init__(self, message: str) -> None:
11+
super().__init__(message)
12+
13+
914
class CadenceRpcError(Exception):
1015
def __init__(self, message: str, code: grpc.StatusCode, *args):
1116
super().__init__(message, code, *args)

tests/integration_tests/workflow/test_workflows.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ async def echo(self, message: str) -> str:
2323
return message
2424

2525

26+
@reg.workflow()
27+
class FailureWorkflow:
28+
@workflow.run
29+
async def failure(self, message: str) -> str:
30+
raise Exception("mocked workflow failure")
31+
32+
2633
async def test_simple_workflow(helper: CadenceHelper):
2734
async with helper.worker(reg) as worker:
2835
execution = await worker.client.start_workflow(
@@ -50,6 +57,40 @@ async def test_simple_workflow(helper: CadenceHelper):
5057
)
5158

5259

60+
async def test_workflow_failure(helper: CadenceHelper):
61+
async with helper.worker(reg) as worker:
62+
execution = await worker.client.start_workflow(
63+
"FailureWorkflow",
64+
"hello world",
65+
task_list=worker.task_list,
66+
execution_start_to_close_timeout=timedelta(seconds=10),
67+
)
68+
69+
response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
70+
GetWorkflowExecutionHistoryRequest(
71+
domain=DOMAIN_NAME,
72+
workflow_execution=execution,
73+
wait_for_new_event=True,
74+
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
75+
skip_archival=True,
76+
)
77+
)
78+
79+
assert (
80+
"Workflow failed: mocked workflow failure"
81+
== response.history.events[
82+
-1
83+
].workflow_execution_failed_event_attributes.failure.reason
84+
)
85+
86+
assert (
87+
"""raise Exception("mocked workflow failure")"""
88+
in response.history.events[
89+
-1
90+
].workflow_execution_failed_event_attributes.failure.details.decode()
91+
)
92+
93+
5394
@pytest.mark.skip(reason="Incorrect WorkflowType")
5495
async def test_workflow_fn(helper: CadenceHelper):
5596
async with helper.worker(reg) as worker:

0 commit comments

Comments
 (0)