Skip to content

Swallowed cancellation #879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 2, 2025
27 changes: 26 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import timedelta
from enum import IntEnum
from typing import (
Any,
Awaitable,
Expand Down Expand Up @@ -367,8 +368,8 @@ def activate(
self._continue_as_new_suggested = act.continue_as_new_suggested
self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying

self._current_thread_id = threading.get_ident()
self._current_internal_flags = act.available_internal_flags
activation_err: Optional[Exception] = None
try:
# Split into job sets with patches, then signals + updates, then
Expand Down Expand Up @@ -1613,6 +1614,20 @@ async def run_activity() -> Any:
# dict with its new seq
self._pending_activities[handle._seq] = handle
except asyncio.CancelledError:
# If an activity future completes at the same time as a cancellation is being processed, the cancellation would be swallowed
# _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY will correctly reraise the exception
if handle._result_fut.done():
# TODO in next release, check sdk flag when not replaying instead of global override, remove the override, and set flag use
if (
(
not self._is_replaying
and _raise_on_cancelling_completed_activity_override
)
or _WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY
in self._current_internal_flags
):
# self._current_completion.successful.used_internal_flags.append(WorkflowLogicFlag.RAISE_ON_CANCELLING_COMPLETED_ACTIVITY)
raise
# Send a cancel request to the activity
handle._apply_cancel_command(self._add_command())

Expand Down Expand Up @@ -3112,3 +3127,13 @@ def _make_unfinished_signal_handler_message(
[{"name": name, "count": count} for name, count in names.most_common()]
)
)


class _WorkflowLogicFlag(IntEnum):
"""Flags that may be set on task/activation completion to differentiate new from old workflow behavior."""

RAISE_ON_CANCELLING_COMPLETED_ACTIVITY = 1


# Used by tests to validate behavior prior to SDK flag becoming default
_raise_on_cancelling_completed_activity_override = False
55 changes: 55 additions & 0 deletions tests/worker/test_replayer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import random
import re
import sys
import uuid
from dataclasses import dataclass
Expand All @@ -18,6 +20,7 @@
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
from temporalio.workflow import NondeterminismError
from tests.helpers import assert_eq_eventually
from tests.worker.test_workflow import (
ActivityAndSignalsWhileWorkflowDown,
Expand Down Expand Up @@ -503,3 +506,55 @@ async def test_replayer_alternate_async_ordering() -> None:
"counter-2",
"act-done",
]


@activity.defn
async def short_activity_async():
delay = random.uniform(0.05, 0.15) # 50~150ms delay
await asyncio.sleep(delay)
return 1


@workflow.defn
class QuickActivityWorkflow:
@workflow.run
async def run(self, total_seconds: float = 10.0):
workflow.logger.info("Duration: %f", total_seconds)
end = workflow.now() + timedelta(seconds=total_seconds)
while True:
workflow.logger.info("Stage 1")
res = await workflow.execute_activity(
short_activity_async, schedule_to_close_timeout=timedelta(seconds=10)
)
workflow.logger.info("Stage 2, %s", res)

if workflow.now() > end:
break


async def test_swallowed_activity_cancellation() -> None:
with (
Path(__file__)
.with_name("test_replayer_swallowed_activity_cancellation.json")
.open() as f
):
history = f.read()
with pytest.raises(NondeterminismError):
await Replayer(
workflows=[QuickActivityWorkflow],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))


async def test_swallowed_activity_cancellation_no_flag() -> None:
with (
Path(__file__)
.with_name("test_replayer_swallowed_activity_cancellation.json")
.open() as f
):
history = f.read()
history = re.sub(r'"langUsedFlags": \[\s*1\s*]', "", history)
await Replayer(
workflows=[QuickActivityWorkflow],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
Loading
Loading