Skip to content

Commit 2eb967a

Browse files
committed
WIP: complete Nexus task as failure
1 parent c8321d6 commit 2eb967a

File tree

2 files changed

+47
-37
lines changed

2 files changed

+47
-37
lines changed

temporalio/converter.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,7 @@ def _error_to_failure(
901901
failure.child_workflow_execution_failure_info.retry_state = (
902902
temporalio.api.enums.v1.RetryState.ValueType(error.retry_state or 0)
903903
)
904+
# TODO(dan): test coverage for this
904905
elif isinstance(error, temporalio.exceptions.NexusOperationError):
905906
failure.nexus_operation_execution_failure_info.SetInParent()
906907
failure.nexus_operation_execution_failure_info.operation_token = (
@@ -1073,6 +1074,7 @@ def from_failure(
10731074
# string operation_token = 6;
10741075
# }
10751076
# TODO(dan)
1077+
# This is covered by cancellation tests
10761078
nexus_op_failure_info = failure.nexus_operation_execution_failure_info
10771079
err = temporalio.exceptions.NexusOperationError(
10781080
failure.message or "Nexus operation error",

temporalio/worker/_nexus.py

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -245,50 +245,58 @@ async def _run_nexus_operation(
245245
# bytes details = 3;
246246
# }
247247

248-
completion = temporalio.bridge.proto.nexus.NexusTaskCompletion(
249-
task_token=task_token,
250-
)
251-
try:
252-
result = await start(input, options)
253-
except BaseException:
254-
# await self._data_converter.encode_failure(err, completion.error.failure)
255-
# TODO(dan): mirror appropriate aspects of _run_activity error handling
256-
raise NotImplementedError(
257-
"TODO: Nexus operation error handling not implemented"
258-
)
259-
260-
try:
261-
# Send task completion
262-
if isinstance(result, nexusrpc.handler.StartOperationAsyncResult):
263-
print(f"🟢 Nexus operation started with async response {result}")
264-
op_resp = temporalio.api.nexus.v1.StartOperationResponse(
265-
async_success=temporalio.api.nexus.v1.StartOperationResponse.Async(
266-
operation_token=result.token,
267-
links=[
268-
temporalio.api.nexus.v1.Link(url=l.url, type=l.type)
269-
for l in result.links
270-
],
271-
)
248+
async def run() -> temporalio.bridge.proto.nexus.NexusTaskCompletion:
249+
try:
250+
result = await start(input, options)
251+
except BaseException as err:
252+
# TODO(dan): should encode_failure be called here?? (It accepts the
253+
# api.Failure proto struct, not the Nexus one.)
254+
# await self._data_converter.encode_failure(err, completion.error.failure)
255+
return temporalio.bridge.proto.nexus.NexusTaskCompletion(
256+
task_token=task_token,
257+
error=temporalio.api.nexus.v1.HandlerError(
258+
# TODO(dan): what should error_type be? See `Predefined Handler Errors` in spec
259+
error_type="INTERNAL",
260+
failure=temporalio.api.nexus.v1.Failure(
261+
message=str(err),
262+
metadata={},
263+
details=b"",
264+
),
265+
),
272266
)
273267
else:
274-
# TODO(dan): are we going to use StartOperationSyncResult from nexusrpc?
275-
# (contains links and headers in addition to result) IIRC Go does something
276-
# like that.
277-
[payload] = await self._data_converter.encode([result])
278-
op_resp = temporalio.api.nexus.v1.StartOperationResponse(
279-
sync_success=temporalio.api.nexus.v1.StartOperationResponse.Sync(
280-
payload=payload
268+
if isinstance(result, nexusrpc.handler.StartOperationAsyncResult):
269+
print(f"🟢 Nexus operation started with async response {result}")
270+
op_resp = temporalio.api.nexus.v1.StartOperationResponse(
271+
async_success=temporalio.api.nexus.v1.StartOperationResponse.Async(
272+
operation_token=result.token,
273+
links=[
274+
temporalio.api.nexus.v1.Link(url=l.url, type=l.type)
275+
for l in result.links
276+
],
277+
)
278+
)
279+
else:
280+
# TODO(dan): are we going to use StartOperationSyncResult from nexusrpc?
281+
# (contains links and headers in addition to result) IIRC Go does something
282+
# like that.
283+
[payload] = await self._data_converter.encode([result])
284+
op_resp = temporalio.api.nexus.v1.StartOperationResponse(
285+
sync_success=temporalio.api.nexus.v1.StartOperationResponse.Sync(
286+
payload=payload
287+
)
281288
)
289+
return temporalio.bridge.proto.nexus.NexusTaskCompletion(
290+
task_token=task_token,
291+
completed=temporalio.api.nexus.v1.Response(start_operation=op_resp),
282292
)
283-
completion.completed.CopyFrom(
284-
temporalio.api.nexus.v1.Response(start_operation=op_resp)
285-
)
293+
294+
try:
295+
completion = await run()
286296
await self._bridge_worker().complete_nexus_task(completion)
287297
del self._running_operations[task_token]
288298
except Exception:
289-
temporalio.nexus.logger.exception(
290-
"Failed to send Nexus operation completion"
291-
)
299+
temporalio.nexus.logger.exception("Failed completing Nexus operation")
292300

293301
async def _handle_cancel_operation(
294302
self, request: temporalio.api.nexus.v1.CancelOperationRequest, task_token: bytes

0 commit comments

Comments
 (0)