Skip to content

Commit 520aefd

Browse files
authored
Swallowed cancellation (#879)
* Add SDK Flag for ensure cancellation is respected when an activity is completing * Linting * Revert "Linting" This reverts commit 898ed1d. * Linting * Remove log statements * Remove test until enabled outside of the replayer * Merge * Adding additional replay test, and a global override for a non-replay test * Address some test comments, fix sdk-core commit change
1 parent 4fe535c commit 520aefd

File tree

4 files changed

+1051
-1
lines changed

4 files changed

+1051
-1
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from contextlib import contextmanager
1818
from dataclasses import dataclass
1919
from datetime import timedelta
20+
from enum import IntEnum
2021
from typing import (
2122
Any,
2223
Awaitable,
@@ -367,8 +368,8 @@ def activate(
367368
self._continue_as_new_suggested = act.continue_as_new_suggested
368369
self._time_ns = act.timestamp.ToNanoseconds()
369370
self._is_replaying = act.is_replaying
370-
371371
self._current_thread_id = threading.get_ident()
372+
self._current_internal_flags = act.available_internal_flags
372373
activation_err: Optional[Exception] = None
373374
try:
374375
# Split into job sets with patches, then signals + updates, then
@@ -1613,6 +1614,20 @@ async def run_activity() -> Any:
16131614
# dict with its new seq
16141615
self._pending_activities[handle._seq] = handle
16151616
except asyncio.CancelledError:
1617+
# If an activity future completes at the same time as a cancellation is being processed, the cancellation would be swallowed
1618+
# _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY will correctly reraise the exception
1619+
if handle._result_fut.done():
1620+
# TODO in next release, check sdk flag when not replaying instead of global override, remove the override, and set flag use
1621+
if (
1622+
(
1623+
not self._is_replaying
1624+
and _raise_on_cancelling_completed_activity_override
1625+
)
1626+
or _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY
1627+
in self._current_internal_flags
1628+
):
1629+
# self._current_completion.successful.used_internal_flags.append(WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY)
1630+
raise
16161631
# Send a cancel request to the activity
16171632
handle._apply_cancel_command(self._add_command())
16181633

@@ -3112,3 +3127,13 @@ def _make_unfinished_signal_handler_message(
31123127
[{"name": name, "count": count} for name, count in names.most_common()]
31133128
)
31143129
)
3130+
3131+
3132+
class _WorkflowLogicFlag(IntEnum):
3133+
"""Flags that may be set on task/activation completion to differentiate new from old workflow behavior."""
3134+
3135+
RAISE_ON_CANCELLING_COMPLETED_ACTIVITY = 1
3136+
3137+
3138+
# Used by tests to validate behavior prior to SDK flag becoming default
3139+
_raise_on_cancelling_completed_activity_override = False

tests/worker/test_replayer.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import random
3+
import re
24
import sys
35
import uuid
46
from dataclasses import dataclass
@@ -18,6 +20,7 @@
1820
WorkflowInboundInterceptor,
1921
WorkflowInterceptorClassInput,
2022
)
23+
from temporalio.workflow import NondeterminismError
2124
from tests.helpers import assert_eq_eventually
2225
from tests.worker.test_workflow import (
2326
ActivityAndSignalsWhileWorkflowDown,
@@ -503,3 +506,55 @@ async def test_replayer_alternate_async_ordering() -> None:
503506
"counter-2",
504507
"act-done",
505508
]
509+
510+
511+
@activity.defn
512+
async def short_activity_async():
513+
delay = random.uniform(0.05, 0.15) # 50~150ms delay
514+
await asyncio.sleep(delay)
515+
return 1
516+
517+
518+
@workflow.defn
519+
class QuickActivityWorkflow:
520+
@workflow.run
521+
async def run(self, total_seconds: float = 10.0):
522+
workflow.logger.info("Duration: %f", total_seconds)
523+
end = workflow.now() + timedelta(seconds=total_seconds)
524+
while True:
525+
workflow.logger.info("Stage 1")
526+
res = await workflow.execute_activity(
527+
short_activity_async, schedule_to_close_timeout=timedelta(seconds=10)
528+
)
529+
workflow.logger.info("Stage 2, %s", res)
530+
531+
if workflow.now() > end:
532+
break
533+
534+
535+
async def test_swallowed_activity_cancellation() -> None:
536+
with (
537+
Path(__file__)
538+
.with_name("test_replayer_swallowed_activity_cancellation.json")
539+
.open() as f
540+
):
541+
history = f.read()
542+
with pytest.raises(NondeterminismError):
543+
await Replayer(
544+
workflows=[QuickActivityWorkflow],
545+
interceptors=[WorkerWorkflowResultInterceptor()],
546+
).replay_workflow(WorkflowHistory.from_json("fake", history))
547+
548+
549+
async def test_swallowed_activity_cancellation_no_flag() -> None:
550+
with (
551+
Path(__file__)
552+
.with_name("test_replayer_swallowed_activity_cancellation.json")
553+
.open() as f
554+
):
555+
history = f.read()
556+
history = re.sub(r'"langUsedFlags": \[\s*1\s*]', "", history)
557+
await Replayer(
558+
workflows=[QuickActivityWorkflow],
559+
interceptors=[WorkerWorkflowResultInterceptor()],
560+
).replay_workflow(WorkflowHistory.from_json("fake", history))

0 commit comments

Comments
 (0)