Skip to content

Commit a16bb03

Browse files
committed
fixup! Add support for worker state callbacks
1 parent 449077e commit a16bb03

File tree

1 file changed

+17
-27
lines changed

1 file changed

+17
-27
lines changed

src/cluster.jl

+17-27
Original file line numberDiff line numberDiff line change
@@ -869,46 +869,36 @@ end
869869

870870
# Callbacks
871871

872-
# We define the callback methods in a loop here and add docstrings for them afterwards
873-
for callback_type in (:added, :exiting, :exited)
874-
let add_name = Symbol(:add_worker_, callback_type, :_callback),
875-
remove_name = Symbol(:remove_worker_, callback_type, :_callback),
876-
dict_name = Symbol(:worker_, callback_type, :_callbacks)
877-
878-
@eval begin
879-
function $add_name(f::Base.Callable; key=nothing)
880-
if !hasmethod(f, Tuple{Int})
881-
throw(ArgumentError("Callback function is invalid, it must be able to accept a single Int argument"))
882-
end
883-
884-
if isnothing(key)
885-
key = Symbol(gensym(), nameof(f))
886-
end
887-
888-
$dict_name[key] = f
889-
return key
890-
end
872+
function _add_callback(f, key, dict)
873+
if !hasmethod(f, Tuple{Int})
874+
throw(ArgumentError("Callback function is invalid, it must be able to accept a single Int argument"))
875+
end
891876

892-
$remove_name(key) = delete!($dict_name, key)
893-
end
877+
if isnothing(key)
878+
key = Symbol(gensym(), nameof(f))
894879
end
880+
881+
dict[key] = f
882+
return key
895883
end
896884

885+
_remove_callback(key, dict) = delete!(dict, key)
886+
897887
"""
898888
add_worker_added_callback(f::Base.Callable; key=nothing)
899889
900890
Register a callback to be called on the master process whenever a worker is
901891
added. The callback will be called with the added worker ID,
902892
e.g. `f(w::Int)`. Returns a unique key for the callback.
903893
"""
904-
function add_worker_added_callback end
894+
add_worker_added_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_added_callbacks)
905895

906896
"""
907897
remove_worker_added_callback(key)
908898
909899
Remove the callback for `key`.
910900
"""
911-
function remove_worker_added_callback end
901+
remove_worker_added_callback(key) = _remove_callback(key, worker_added_callbacks)
912902

913903
"""
914904
add_worker_exiting_callback(f::Base.Callable; key=nothing)
@@ -921,14 +911,14 @@ All callbacks will be executed asynchronously and if they don't all finish
921911
before the `callback_timeout` passed to `rmprocs()` then the process will be
922912
removed anyway.
923913
"""
924-
function add_worker_exiting_callback end
914+
add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exiting_callbacks)
925915

926916
"""
927917
remove_worker_exiting_callback(key)
928918
929919
Remove the callback for `key`.
930920
"""
931-
function remove_worker_exiting_callback end
921+
remove_worker_exiting_callback(key) = _remove_callback(key, worker_exiting_callbacks)
932922

933923
"""
934924
add_worker_exited_callback(f::Base.Callable; key=nothing)
@@ -938,14 +928,14 @@ for any reason (i.e. not only because of [`rmprocs()`](@ref) but also the worker
938928
segfaulting etc). The callback will be called with the worker ID,
939929
e.g. `f(w::Int)`. Returns a unique key for the callback.
940930
"""
941-
function add_worker_exited_callback end
931+
add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks)
942932

943933
"""
944934
remove_worker_exited_callback(key)
945935
946936
Remove the callback for `key`.
947937
"""
948-
function remove_worker_exited_callback end
938+
remove_worker_exited_callback(key) = _remove_callback(key, worker_exited_callbacks)
949939

950940
# cluster management related API
951941
"""

0 commit comments

Comments
 (0)