Skip to content

Nexus cancellation types #981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/worker/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
input: InputT
schedule_to_close_timeout: Optional[timedelta]
cancellation_type: temporalio.workflow.NexusOperationCancellationType
headers: Optional[Mapping[str, str]]
output_type: Optional[Type[OutputT]] = None

Expand Down
67 changes: 38 additions & 29 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
import temporalio.bridge.proto.activity_result
import temporalio.bridge.proto.child_workflow
import temporalio.bridge.proto.common
import temporalio.bridge.proto.nexus
import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_commands
import temporalio.bridge.proto.workflow_completion
import temporalio.common
import temporalio.converter
import temporalio.exceptions
import temporalio.nexus
import temporalio.workflow
from temporalio.service import __version__

Expand Down Expand Up @@ -881,9 +881,17 @@ def _apply_resolve_nexus_operation(
) -> None:
handle = self._pending_nexus_operations.pop(job.seq, None)
if not handle:
raise RuntimeError(
f"Failed to find nexus operation handle for job sequence number {job.seq}"
)
# One way this can occur is:
# 1. Cancel request issued with cancellation_type=WaitRequested.
# 2. Server receives nexus cancel handler task completion and writes
# NexusOperationCancelRequestCompleted / NexusOperationCancelRequestFailed. On
# consuming this event, core sends an activation resolving the handle future as
# completed / failed.
# 4. Subsequently, the nexus operation completes as completed/failed, causing the server
# to write NexusOperationCompleted / NexusOperationFailed. On consuming this event,
# core sends an activation which would attempt to resolve the handle future as
# completed / failed, but it has already been resolved.
return

# Handle the four oneof variants of NexusOperationResult
result = job.result
Expand Down Expand Up @@ -1500,9 +1508,10 @@ async def workflow_start_nexus_operation(
service: str,
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
input: Any,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
headers: Optional[Mapping[str, str]] = None,
output_type: Optional[Type[OutputT]],
schedule_to_close_timeout: Optional[timedelta],
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
headers: Optional[Mapping[str, str]],
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
# start_nexus_operation
return await self._outbound.start_nexus_operation(
Expand All @@ -1513,6 +1522,7 @@ async def workflow_start_nexus_operation(
input=input,
output_type=output_type,
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
)
)
Expand Down Expand Up @@ -1824,20 +1834,19 @@ async def run_child() -> Any:
async def _outbound_start_nexus_operation(
self, input: StartNexusOperationInput[Any, OutputT]
) -> _NexusOperationHandle[OutputT]:
# A Nexus operation handle contains two futures: self._start_fut is resolved as a
# result of the Nexus operation starting (activation job:
# resolve_nexus_operation_start), and self._result_fut is resolved as a result of
# the Nexus operation completing (activation job: resolve_nexus_operation). The
# handle itself corresponds to an asyncio.Task which waits on self.result_fut,
# handling CancelledError by emitting a RequestCancelNexusOperation command. We do
# not return the handle until we receive resolve_nexus_operation_start, like
# ChildWorkflowHandle and unlike ActivityHandle. Note that a Nexus operation may
# complete synchronously (in which case both jobs will be sent in the same
# activation, and start will be resolved without an operation token), or
# asynchronously (in which case start they may be sent in separate activations,
# and start will be resolved with an operation token). See comments in
# tests/worker/test_nexus.py for worked examples of the evolution of the resulting
# handle state machine in the sync and async Nexus response cases.
# A Nexus operation handle contains two futures: self._start_fut is resolved as a result of
# the Nexus operation starting (activation job: resolve_nexus_operation_start), and
# self._result_fut is resolved as a result of the Nexus operation completing (activation
# job: resolve_nexus_operation). The handle itself corresponds to an asyncio.Task which
# waits on self.result_fut, handling CancelledError by emitting a
# RequestCancelNexusOperation command. We do not return the handle until we receive
# resolve_nexus_operation_start, like ChildWorkflowHandle and unlike ActivityHandle. Note
# that a Nexus operation may complete synchronously (in which case both jobs will be sent in
# the same activation, and start will be resolved without an operation token), or
# asynchronously (in which case they may be sent in separate activations, and start will be
# resolved with an operation token). See comments in tests/worker/test_nexus.py for worked
# examples of the evolution of the resulting handle state machine in the sync and async
# Nexus response cases.
handle: _NexusOperationHandle[OutputT]

async def operation_handle_fn() -> OutputT:
Expand Down Expand Up @@ -2758,7 +2767,7 @@ def _apply_schedule_command(
if self._input.retry_policy:
self._input.retry_policy.apply_to_proto(v.retry_policy)
v.cancellation_type = cast(
"temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
int(self._input.cancellation_type),
)

Expand Down Expand Up @@ -2894,7 +2903,7 @@ def _apply_start_command(self) -> None:
if self._input.task_timeout:
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
v.parent_close_policy = cast(
"temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType",
temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType,
int(self._input.parent_close_policy),
)
v.workflow_id_reuse_policy = cast(
Expand All @@ -2916,7 +2925,7 @@ def _apply_start_command(self) -> None:
self._input.search_attributes, v.search_attributes
)
v.cancellation_type = cast(
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType,
int(self._input.cancellation_type),
)
if self._input.versioning_intent:
Expand Down Expand Up @@ -3012,11 +3021,6 @@ def __init__(

@property
def operation_token(self) -> Optional[str]:
# TODO(nexus-preview): How should this behave?
# Java has a separate class that only exists if the operation token exists:
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
# And Go similar:
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
try:
return self._start_fut.result()
except BaseException:
Expand Down Expand Up @@ -3065,6 +3069,11 @@ def _apply_schedule_command(self) -> None:
v.schedule_to_close_timeout.FromTimedelta(
self._input.schedule_to_close_timeout
)
v.cancellation_type = cast(
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
int(self._input.cancellation_type),
)

if self._input.headers:
for key, val in self._input.headers.items():
v.nexus_header[key] = val
Expand Down
68 changes: 62 additions & 6 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,10 @@ async def workflow_start_nexus_operation(
service: str,
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
input: Any,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
headers: Optional[Mapping[str, str]] = None,
output_type: Optional[Type[OutputT]],
schedule_to_close_timeout: Optional[timedelta],
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
headers: Optional[Mapping[str, str]],
) -> NexusOperationHandle[OutputT]: ...

@abstractmethod
Expand Down Expand Up @@ -1322,9 +1323,9 @@ async def sleep(
This can be in single-line Temporal markdown format.
"""
await _Runtime.current().workflow_sleep(
duration=duration.total_seconds()
if isinstance(duration, timedelta)
else duration,
duration=(
duration.total_seconds() if isinstance(duration, timedelta) else duration
),
summary=summary,
)

Expand Down Expand Up @@ -4413,6 +4414,8 @@ class NexusOperationHandle(Generic[OutputT]):
This API is experimental and unstable.
"""

# TODO(nexus-preview): should attempts to instantiate directly throw?

def cancel(self) -> bool:
"""Request cancellation of the operation."""
raise NotImplementedError
Expand Down Expand Up @@ -5138,6 +5141,43 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
ServiceT = TypeVar("ServiceT")


class NexusOperationCancellationType(IntEnum):
"""Defines behavior of a Nexus operation when the caller workflow initiates cancellation.

Pass one of these values to :py:meth:`NexusClient.start_operation` to define cancellation
behavior.

To initiate cancellation, use :py:meth:`NexusOperationHandle.cancel` and then `await` the
operation handle. This will result in a :py:class:`exceptions.NexusOperationError`. The values
of this enum define what is guaranteed to have happened by that point.
"""

ABANDON = int(temporalio.bridge.proto.nexus.NexusOperationCancellationType.ABANDON)
"""Do not send any cancellation request to the operation handler; just report cancellation to the caller"""

TRY_CANCEL = int(
temporalio.bridge.proto.nexus.NexusOperationCancellationType.TRY_CANCEL
)
"""Send a cancellation request but immediately report cancellation to the caller. Note that this
does not guarantee that cancellation is delivered to the operation handler if the caller exits
before the delivery is done.
"""

WAIT_REQUESTED = int(
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_REQUESTED
)
"""Send a cancellation request and wait for confirmation that the request was received.
Does not wait for the operation to complete.
"""

WAIT_COMPLETED = int(
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED
)
"""Send a cancellation request and wait for the operation to complete.
Note that the operation may not complete as cancelled (for example, if it catches the
:py:exc:`asyncio.CancelledError` resulting from the cancellation request)."""


class NexusClient(ABC, Generic[ServiceT]):
"""A client for invoking Nexus operations.

Expand Down Expand Up @@ -5168,6 +5208,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> NexusOperationHandle[OutputT]: ...

Expand All @@ -5181,6 +5222,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> NexusOperationHandle[OutputT]: ...

Expand All @@ -5197,6 +5239,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> NexusOperationHandle[OutputT]: ...

Expand All @@ -5213,6 +5256,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> NexusOperationHandle[OutputT]: ...

Expand All @@ -5229,6 +5273,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> NexusOperationHandle[OutputT]: ...

Expand All @@ -5240,6 +5285,7 @@ async def start_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> Any:
"""Start a Nexus operation and return its handle.
Expand Down Expand Up @@ -5269,6 +5315,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> OutputT: ...

Expand All @@ -5282,6 +5329,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> OutputT: ...

Expand All @@ -5298,6 +5346,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> OutputT: ...

Expand All @@ -5317,6 +5366,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> OutputT: ...

Expand All @@ -5333,6 +5383,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> OutputT: ...

Expand All @@ -5344,6 +5395,7 @@ async def execute_operation(
*,
output_type: Optional[Type[OutputT]] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> Any:
"""Execute a Nexus operation and return its result.
Expand Down Expand Up @@ -5395,6 +5447,7 @@ async def start_operation(
*,
output_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> Any:
return (
Expand All @@ -5405,6 +5458,7 @@ async def start_operation(
input=input,
output_type=output_type,
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
)
)
Expand All @@ -5416,13 +5470,15 @@ async def execute_operation(
*,
output_type: Optional[Type] = None,
schedule_to_close_timeout: Optional[timedelta] = None,
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
headers: Optional[Mapping[str, str]] = None,
) -> Any:
handle = await self.start_operation(
operation,
input,
output_type=output_type,
schedule_to_close_timeout=schedule_to_close_timeout,
cancellation_type=cancellation_type,
headers=headers,
)
return await handle
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
"system.enableDeploymentVersions=true",
"--dynamic-config-value",
"frontend.activityAPIsEnabled=true",
"--dynamic-config-value",
"component.nexusoperations.recordCancelRequestCompletionEvents=true",
"--http-port",
str(http_port),
],
Expand Down
Loading