21
21
from temporalio .common import WorkflowIDConflictPolicy
22
22
from temporalio .testing import WorkflowEnvironment
23
23
from temporalio .worker import Worker
24
- from tests .helpers import print_interleaved_histories
25
24
from tests .helpers .nexus import create_nexus_endpoint , make_nexus_endpoint_name
26
25
27
26
@@ -50,10 +49,10 @@ async def run(self) -> None:
50
49
try :
51
50
await asyncio .Future ()
52
51
except asyncio .CancelledError :
53
- if (
54
- test_context . cancellation_type
55
- == workflow .NexusOperationCancellationType .WAIT_REQUESTED
56
- ) :
52
+ if test_context . cancellation_type in [
53
+ workflow . NexusOperationCancellationType . TRY_CANCEL ,
54
+ workflow .NexusOperationCancellationType .WAIT_REQUESTED ,
55
+ ] :
57
56
# We want to prove that the caller op future can be resolved before the operation
58
57
# (i.e. its backing workflow) is cancelled.
59
58
await self .caller_op_future_resolved .wait ()
@@ -193,10 +192,10 @@ async def run(self, input: Input) -> CancellationResult:
193
192
datetime .now (timezone .utc )
194
193
)
195
194
assert op_handle .operation_token
196
- if (
197
- input . cancellation_type
198
- == workflow .NexusOperationCancellationType .WAIT_REQUESTED
199
- ) :
195
+ if input . cancellation_type in [
196
+ workflow . NexusOperationCancellationType . TRY_CANCEL ,
197
+ workflow .NexusOperationCancellationType .WAIT_REQUESTED ,
198
+ ] :
200
199
# We want to prove that the future can be unblocked before the handler workflow is
201
200
# cancelled. Send a signal, so that handler workflow can wait for it.
202
201
await workflow .get_external_workflow_handle_for (
@@ -307,7 +306,6 @@ async def check_behavior_for_abandon(
307
306
caller_wf ,
308
307
EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ,
309
308
)
310
- await print_interleaved_histories ([caller_wf , handler_wf ])
311
309
312
310
313
311
async def check_behavior_for_try_cancel (
@@ -326,24 +324,16 @@ async def check_behavior_for_try_cancel(
326
324
else :
327
325
pytest .fail ("Expected WorkflowFailureError" )
328
326
await caller_wf .signal (CallerWorkflow .release )
329
- result = await caller_wf .result ()
327
+ await caller_wf .result ()
330
328
331
329
handler_status = (await handler_wf .describe ()).status
332
330
assert handler_status == WorkflowExecutionStatus .CANCELED
333
331
caller_op_future_resolved = test_context .caller_op_future_resolved .result ()
334
- cancel_handler_released = test_context .cancel_handler_released .result ()
335
- await print_interleaved_histories (
336
- [caller_wf , handler_wf ],
337
- extra_events = [
338
- (caller_wf , "Future unblocked" , caller_op_future_resolved ),
339
- (handler_wf , "Cancel handler released" , cancel_handler_released ),
340
- ],
341
- )
342
332
await _assert_event_subsequence (
343
333
[
344
- (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ),
345
334
(caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ),
346
335
(caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED ),
336
+ (caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ),
347
337
]
348
338
)
349
339
op_cancel_requested_event = await _get_event_time (
@@ -385,10 +375,9 @@ async def check_behavior_for_wait_cancellation_requested(
385
375
assert handler_status == WorkflowExecutionStatus .CANCELED
386
376
await _assert_event_subsequence (
387
377
[
388
- (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ),
389
378
(caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ),
379
+ (caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED ),
390
380
(caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ),
391
- (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ),
392
381
]
393
382
)
394
383
caller_op_future_resolved = test_context .caller_op_future_resolved .result ()
@@ -400,12 +389,6 @@ async def check_behavior_for_wait_cancellation_requested(
400
389
handler_wf ,
401
390
EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ,
402
391
)
403
- await print_interleaved_histories (
404
- [caller_wf , handler_wf ],
405
- extra_events = [
406
- (caller_wf , "Future unblocked" , caller_op_future_resolved ),
407
- ],
408
- )
409
392
assert op_cancel_request_completed < caller_op_future_resolved < op_canceled
410
393
411
394
@@ -417,7 +400,6 @@ async def check_behavior_for_wait_cancellation_completed(
417
400
Check that a cancellation request is sent and the caller workflow nexus operation future is
418
401
unblocked after the operation is canceled.
419
402
"""
420
- # For WAIT_COMPLETED, wait for the handler workflow to complete
421
403
try :
422
404
await handler_wf .result ()
423
405
except WorkflowFailureError as err :
@@ -429,7 +411,7 @@ async def check_behavior_for_wait_cancellation_completed(
429
411
assert handler_status == WorkflowExecutionStatus .CANCELED
430
412
431
413
await caller_wf .signal (CallerWorkflow .release )
432
- result = await caller_wf .result ()
414
+ await caller_wf .result ()
433
415
434
416
await _assert_event_subsequence (
435
417
[
@@ -449,15 +431,7 @@ async def check_behavior_for_wait_cancellation_completed(
449
431
handler_wf ,
450
432
EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ,
451
433
)
452
- await print_interleaved_histories (
453
- [caller_wf , handler_wf ],
454
- extra_events = [
455
- (caller_wf , "Future unblocked" , caller_op_future_resolved ),
456
- ],
457
- )
458
- assert (
459
- caller_op_future_resolved > handler_wf_canceled_event_time
460
- ), "For WAIT_COMPLETED, the future should be unblocked after handler workflow cancellation. "
434
+ assert caller_op_future_resolved > handler_wf_canceled_event_time
461
435
462
436
463
437
async def _has_event (wf_handle : WorkflowHandle , event_type : EventType .ValueType ):
0 commit comments