Unify activity interruption #239
Rebased onto master to resolve conflicts with heartbeat throttling and to pick up test reliability fixes.
Hey @jeffschoner, thank you for the PR!
I've pointed out a few things inline, but the notable decision we'll need to make is around the cancellation behaviour in general. Please let me know if you wanna sync on it and figure out a path forward since it's a breaking change potentially.
# Returns true if the activity has been canceled directly by its workflow or indirectly by
# its workflow being terminated. This will only be set following a call to heartbeat that is
# not throttled.
def cancel_requested
nit: let's go with `cancel_requested?` to be consistent with other query-type methods
Looks like we may want to call this `cancelled?` instead, based on the sdk-ruby API?
@antstorm This is just moving an existing method from an attr_reader to a proper method so that I can more clearly put a comment on it. I'm happy to alias this as `cancelled?` and mark the existing method as deprecated if that's the direction you want to move instead.
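The aliasing option described above could look roughly like this. It is a minimal sketch only: `AliasSketchContext` and `mark_cancelled!` are hypothetical stand-ins rather than the gem's actual context class, and a plain `warn` is assumed to be an acceptable way to flag the deprecation.

```ruby
# Hypothetical stand-in for the activity context; not the gem's real class.
class AliasSketchContext
  def initialize
    @cancel_requested = false
  end

  # Query-style name, matching the sdk-ruby naming discussed above.
  def cancelled?
    @cancel_requested
  end

  # Existing name, kept for backwards compatibility but flagged as deprecated.
  def cancel_requested
    warn '[DEPRECATION] cancel_requested is deprecated; use cancelled? instead'
    cancelled?
  end

  # Helper for this sketch only: simulates a heartbeat response that
  # reported a cancellation.
  def mark_cancelled!
    @cancel_requested = true
  end
end

context = AliasSketchContext.new
context.mark_cancelled!
context.cancelled?        # new name
context.cancel_requested  # old name, prints a deprecation warning to stderr
```

Existing callers keep working with the old name while new code can adopt the query-style method.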
 require 'temporal/uuid'
 require 'temporal/activity/async_token'

 module Temporal
   class Activity
     class Context
-      def initialize(connection, metadata, config, heartbeat_thread_pool)
+      def initialize(connection, metadata, config, heartbeat_thread_pool, is_shutting_down)
nit: `is_shutting_down` -> `shutting_down_proc`? Otherwise it feels like a boolean is being passed
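To illustrate why the argument is a callable rather than a plain value: a boolean would be a snapshot taken when the context is constructed, whereas a proc is re-evaluated on every call. A minimal sketch, with `ProcSketchContext` as a hypothetical stand-in for the real context:

```ruby
# Hypothetical stand-in for the activity context; not the gem's real class.
class ProcSketchContext
  # shutting_down_proc is any callable that returns the worker's current state.
  def initialize(shutting_down_proc)
    @shutting_down_proc = shutting_down_proc
  end

  # Re-evaluates the callable each time, so later state changes are visible.
  def shutting_down?
    @shutting_down_proc.call
  end
end

shutting_down = false
context = ProcSketchContext.new(-> { shutting_down })

context.shutting_down? # false: the worker is still running
shutting_down = true   # the worker receives TERM or INT
context.shutting_down? # true: the lambda sees the updated variable
```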
# This returns true if the worker has started shutting down upon receiving a
# TERM or INT signal. Once this happens, your activity should finish processing
# quickly or raise an error to fail the activity attempt.
def shutting_down?
In the new SDK we've taken a slightly different approach and I wonder if it might be a good idea to steer this in the same direction. Basically, instead of asking whether the worker is shutting down, the worker would attempt to cancel the activity (similar process to activity cancellation, except using a different exception).
The main reason here is to make sure that cancellations are explicitly handled (including explicitly ignored). By exposing the `shutting_down?` method we're still defaulting to unhandled behaviour and I assume most activities won't use it at all.
I'd be curious to learn more about the API and mechanics of this. Is the idea that the worker propagates the cancellation request to the activity code and the activity then acks the cancellation once it's ready to shut down? How is it different from simply raising an error that can be rescued and re-raised later?
I spent some time looking through the sdk-ruby code. This is much clearer to me now. I agree this pattern should work better.
However, I do have concerns about changing the behavior of the existing API where there are currently no errors raised either directly by the heartbeat method or through thread interruption. For now, here's how I'm thinking to deal with this:
- Add these methods onto the activity context:
  - `cancel` for communicating cancellation reasons via an error argument
  - `cancelled?`, an alias to `cancel_requested` that returns a Boolean. This can be checked by activities that don't want to deal with errors being raised by `heartbeat!` or who don't want to heartbeat at all, but rather check for cancellation at certain critical points of the activity.
- Use the `cancel` method to communicate cancellation to the context for the three cases in this PR. This should help clean up some icky code like `is_shutting_down_proc`
- Eliminate `ActivityInterrupted`, instead making `ActivityCanceled` the base class for naming consistency. Having subclassed errors is still useful, but I'm open to collapsing these into a single error like in sdk-ruby too.
- For now, instead of raising an error onto the thread, these errors will only be raised out of the `heartbeat_interrupted`/`heartbeat!` method added in this PR already
- Eventually, an activity or worker option can be added for thread interruption behavior. A `shield` method can also be written to protect certain sections of activity code from being interrupted.
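The context-side part of this proposal might be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `ProposedContext` and the two error subclasses are hypothetical names, the error classes are defined locally rather than taken from the gem, and the sketch omits the actual heartbeat call to the server.

```ruby
# Local stand-ins for the error hierarchy discussed above. ActivityCanceled is
# the proposed base class; the two subclass names are hypothetical.
class ActivityCanceled < StandardError; end
class ActivityWorkerShuttingDown < ActivityCanceled; end
class ActivityTimedOut < ActivityCanceled; end

# Hypothetical context showing only the cancellation bookkeeping.
class ProposedContext
  def initialize
    @cancel_error = nil
    @mutex = Mutex.new
  end

  # Called by the worker to record why the activity should stop.
  # The first reason wins; later calls are ignored.
  def cancel(error)
    @mutex.synchronize { @cancel_error ||= error }
  end

  # Boolean check for activities that prefer polling over rescuing errors.
  def cancelled?
    @mutex.synchronize { !@cancel_error.nil? }
  end

  # Raises the recorded error, if any. A real implementation would also
  # record the heartbeat with the server; that part is omitted here.
  def heartbeat!(details = nil)
    error = @mutex.synchronize { @cancel_error }
    raise error if error

    details
  end
end

context = ProposedContext.new
context.heartbeat!('step 1') # no cancellation recorded, returns the details
context.cancel(ActivityWorkerShuttingDown.new('worker is shutting down'))
context.cancelled?           # true from this point on
```

Routing all three cases through `cancel` means the activity only needs to rescue one base class, while the subclasses still say which case occurred.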
# Activity cancellation because the workflow canceled the activity or because the workflow
# was terminated, is only communicated back to the activity by heartbeating.
if activity.cancel_requested
Might be good to get this in line with https://github.com/temporalio/sdk-ruby/blob/main/lib/temporalio/activity/context.rb#L73
# shutting down. This flag defaults to false, in which these states can be detected by
# inspecting the heartbeat response for cancelation, or calling timed_out? or shutting_down?
# methods.
def heartbeat_interrupted(details = nil)
nit: `heartbeat!`? Similar to Rails' `save!` and `update!`, which raise in case of a failure
# inspecting the heartbeat response for cancelation, or calling timed_out? or shutting_down?
# methods.
def heartbeat_interrupted(details = nil)
  if deadline_exceeded?(schedule_to_close_deadline)
Semantically it's a bit weird to couple timeouts and shutdowns to the heartbeat mechanism — they don't really have anything to do with one another. Therefore a cancellation mechanism (similar to other SDKs) with a specific exception type might be a better solution overall (see my other comment)
They are disparate, but this is basically the single point where activity execution can be pre-empted without resorting to something like `Thread.raise`, which is much more disruptive to the existing activity programming model. There is precedent for this in the Java SDK, including guidance on the heartbeat method to rescue/catch the superclass of all these errors/exceptions as the proper way to handle interruption.
It sounds like the cancellation mechanism you talk about would allow for one way to be made aware of cancellation/interruption based on any of the 3 pre-emption reasons: worker shutdown, activity cancellation, activity task timeout.
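From the activity's side, treating the heartbeat call as the single pre-emption point could look like the sketch below. It assumes the `ActivityInterrupted` error and `heartbeat_interrupted` method from this PR, but defines local stand-ins for both; `CountingContext` and `process_items` are hypothetical and exist only to make the example runnable.

```ruby
# Local stand-in for the PR's error superclass.
class ActivityInterrupted < StandardError; end

# Hypothetical context that allows a fixed number of heartbeats and then
# reports an interruption, standing in for shutdown, cancellation, or timeout.
class CountingContext
  def initialize(allowed_heartbeats)
    @remaining = allowed_heartbeats
  end

  def heartbeat_interrupted(details = nil)
    raise ActivityInterrupted, 'activity should stop' if @remaining <= 0

    @remaining -= 1
    details
  end
end

# Processes items one at a time. Interruption can only surface at the
# heartbeat call, so the work for each item is never cut off halfway.
def process_items(context, items, done)
  items.each do |item|
    context.heartbeat_interrupted(item) # the only pre-emption point
    done << item.upcase                 # runs to completion once started
  end
rescue ActivityInterrupted
  # Clean up or checkpoint here, then re-raise so the attempt fails.
  raise
end

done = []
begin
  process_items(CountingContext.new(2), %w[a b c], done)
rescue ActivityInterrupted
  # done holds only the items that were fully processed
end
```

With `Thread.raise`, by contrast, the error could surface at any point in the loop body, including partway through the work on an item.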
@@ -21,16 +21,21 @@ def initialize(namespace, task_queue, activity_lookup, config, middleware = [],
@config = config
@middleware = middleware
@shutting_down = false
@shutting_down_mutex = Mutex.new
I'm not sure this mutex is actually needed since it only wraps accessing the `@shutting_down` instance variable, which itself is pretty much atomic and won't cause a context switch
I'm happy to remove it if you don't think it's necessary
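For reference, the pattern under discussion is a boolean flag written by one thread and read by others. A minimal sketch, with `ShutdownFlag` as a hypothetical class rather than the PR's code; whether the mutex is strictly required likely depends on the Ruby implementation, which is an assumption here rather than something established in this thread.

```ruby
# Hypothetical holder for the shutdown flag, guarded by a mutex.
class ShutdownFlag
  def initialize
    @shutting_down = false
    @mutex = Mutex.new
  end

  # Called from the thread that handles the TERM or INT signal.
  def shut_down!
    @mutex.synchronize { @shutting_down = true }
  end

  # Called from activity threads.
  def shutting_down?
    @mutex.synchronize { @shutting_down }
  end
end

flag = ShutdownFlag.new

# Simulated activity thread that polls the flag until shutdown is requested.
worker = Thread.new do
  sleep 0.01 until flag.shutting_down?
  :stopped
end

flag.shut_down!
worker.value # joins the thread and returns the block's result
```

Dropping the mutex would leave the two methods as a bare assignment and a bare read, which is the simplification being suggested.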
@antstorm I'm mostly familiar with how the Java SDK handles this. I have a related PR over there right now to add timeout detection too (temporalio/sdk-java#1771), where there's another discussion going on about the correct model. Do you have a code pointer to the sdk-ruby or one of the other new SDKs for the suggested cancellation API? Or perhaps a sketch of what you're thinking?
Summary
Activities can be interrupted for three reasons: worker shutdown, activity cancellation, and activity task timeout.
This change introduces:
- `ActivityInterrupted`, an error that can be raised by calling `heartbeat_interrupted`. This provides a convenient way to get automatic behavior when an activity should be interrupted. For those who want more granular control, the individual errors can be rescued, or methods on the activity context can be used to inspect each case.
- A `shutting_down?` method on the activity context. This is wired into the activity task processor, which was already tracking shutdown state. That code is now wrapped in a mutex since it is now accessed by multiple threads.
Testing
There is a new example test for worker shutdown:
This test is novel in that it spins up its own workers and does not rely on those started via `bin/worker`. The activity worker must be spun down while the workflow worker continues to run, in order for the test to pass. Moreover, the workflow and activity used here are not interesting as samples in their own right.
A similar standalone test has also been added for activity timeout:
This test needs to understand the error raising behavior inside an activity since the workflow will only see the activity timeout from Temporal server. This isn't possible unless the worker is running in the same process as the spec.
There are many new and updated unit specs in,