Skip to content

Commit e40f456

Browse files
committed
fixup! Add support for worker state callbacks
1 parent 5dd64d0 commit e40f456

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

src/cluster.jl

+9-4
Original file line numberDiff line numberDiff line change
@@ -1009,13 +1009,18 @@ remove_worker_exiting_callback(key) = _remove_callback(key, worker_exiting_callb
10091009
10101010
Register a callback to be called on the master process when a worker has exited
10111011
for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
1012-
segfaulting etc). The callback will be called with the worker ID,
1013-
e.g. `f(w::Int)`. Chooses and returns a unique key for the callback if `key` is
1012+
segfaulting etc). Chooses and returns a unique key for the callback if `key` is
10141013
not specified.
10151014
1015+
The callback will be called with the worker ID and the final
1016+
`DistributedNext.WorkerState` of the worker, e.g. `f(w::Int, state)`. `state` is an
1017+
enum; a value of `WorkerState_terminated` means a graceful exit and a value of
1018+
`WorkerState_exterminated` means the worker died unexpectedly.
1019+
10161020
If the callback throws an exception it will be caught and printed.
10171021
"""
1018-
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks)
1022+
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks;
1023+
arg_types=Tuple{Int, WorkerState})
10191024

10201025
"""
10211026
remove_worker_exited_callback(key)
@@ -1426,7 +1431,7 @@ function deregister_worker(pg, pid)
14261431
if myid() == 1
14271432
for (name, callback) in worker_exited_callbacks
14281433
try
1429-
callback(pid)
1434+
callback(pid, w.state)
14301435
catch ex
14311436
@error "Error when running worker-exited callback '$(name)'" exception=(ex, catch_backtrace())
14321437
end

test/distributed_exec.jl

+14-3
Original file line numberDiff line numberDiff line change
@@ -1946,11 +1946,11 @@ end
19461946
starting_managers = []
19471947
started_workers = Int[]
19481948
exiting_workers = Int[]
1949-
exited_workers = Int[]
1949+
exited_workers = []
19501950
starting_key = DistributedNext.add_worker_starting_callback((manager, kwargs) -> push!(starting_managers, manager))
19511951
started_key = DistributedNext.add_worker_started_callback(pid -> (push!(started_workers, pid); error("foo")))
19521952
exiting_key = DistributedNext.add_worker_exiting_callback(pid -> push!(exiting_workers, pid))
1953-
exited_key = DistributedNext.add_worker_exited_callback(pid -> push!(exited_workers, pid))
1953+
exited_key = DistributedNext.add_worker_exited_callback((pid, state) -> push!(exited_workers, (pid, state)))
19541954

19551955
# Test that the worker-started exception bubbles up
19561956
@test_throws TaskFailedException addprocs(1)
@@ -1960,7 +1960,7 @@ end
19601960
@test started_workers == [pid]
19611961
rmprocs(workers())
19621962
@test exiting_workers == [pid]
1963-
@test exited_workers == [pid]
1963+
@test exited_workers == [(pid, DistributedNext.WorkerState_terminated)]
19641964

19651965
# Trying to reset an existing callback should fail
19661966
@test_throws ArgumentError DistributedNext.add_worker_started_callback(Returns(nothing); key=started_key)
@@ -1992,6 +1992,17 @@ end
19921992
@test length(started_workers) == 1
19931993
@test length(exiting_workers) == 1
19941994
@test length(exited_workers) == 1
1995+
1996+
# Test that workers that were killed forcefully are detected as such
1997+
exit_state = nothing
1998+
DistributedNext.add_worker_exited_callback((pid, state) -> exit_state = state)
1999+
pid = only(addprocs(1))
2000+
2001+
redirect_stderr(devnull) do
2002+
remote_do(exit, pid)
2003+
timedwait(() -> !isnothing(exit_state), 10)
2004+
end
2005+
@test exit_state == DistributedNext.WorkerState_exterminated
19952006
end
19962007

19972008
# This is a simplified copy of a test from Revise.jl's tests

0 commit comments

Comments
 (0)