Skip to content

Commit 730ea4b

Browse files
refactor(core): split orchestrate into submodules (#217)
* refactor: split orchestrate to enhance maintainability * fix: typing * refactor: provide functions via orchestrate The refactoring moves the general/explore orchestration functions to private modules for clarity. However, we don't want to expose these private modules externally - we want to keep orchestrate.py as the access point * feat(core): Convert RayTaskError to original error * docs(core): Update exceptions raised by orchestrate_explore_operation * feat(core): Improve granularity of exception handling * chore(core): merge log change from main * refactor: update to new import * refactor: expose required members via orchestrate * fix: Access via public interface
1 parent 3262f04 commit 730ea4b

File tree

6 files changed

+877
-772
lines changed

6 files changed

+877
-772
lines changed

.secrets.baseline

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"files": "requirements.txt|^.secrets.baseline$",
44
"lines": null
55
},
6-
"generated_at": "2025-11-12T15:28:45Z",
6+
"generated_at": "2025-11-12T15:32:50Z",
77
"plugins_used": [
88
{
99
"name": "AWSKeyDetector"
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Copyright (c) IBM Corporation
2+
# SPDX-License-Identifier: MIT
3+
4+
import logging
5+
import signal
6+
import typing
7+
8+
import ray
9+
from ray.actor import ActorHandle
10+
11+
# Flipped to True by graceful_operation_shutdown() the first time it runs,
# so that later calls skip the shutdown sequence.
shutdown = False

# Name under which the ResourceCleaner actor is registered with Ray.
CLEANER_ACTOR = "resource_cleaner"

# Logger shared by everything in this module.
moduleLog = logging.getLogger("orchestration_cleanup")
15+
16+
17+
def graceful_operation_shutdown():
    """
    Run the shutdown sequence once per process.

    On the first call: marks the module-level ``shutdown`` flag, asks the
    named cleaner actor to run its cleanup and kills it, shuts Ray down and
    then sleeps for ten seconds so pending log output can be flushed.
    Any later call only logs that the shutdown was already performed.

    Failures while cleaning up the custom actors are logged as warnings and
    do not prevent Ray from being shut down.

    :return: None
    """
    global shutdown

    # Guard: the sequence below must run at most once.
    if shutdown:
        moduleLog.info("Graceful shutdown already completed")
        return

    import time

    moduleLog.info("Shutting down gracefully")

    # Set the flag before doing any work so a re-entrant call is a no-op.
    shutdown = True

    moduleLog.debug("Cleanup custom actors")
    try:
        cleaner = ray.get_actor(name=CLEANER_ACTOR)
        # Block until the cleaner has finished its own cleanup pass.
        ray.get(cleaner.cleanup.remote())
        # The cleaner is a detached actor, so nothing removes it for us;
        # it has to be killed explicitly.
        ray.kill(cleaner)
    except Exception as e:
        # Best effort only: report the problem and carry on shutting down.
        moduleLog.warning(f"Failed to cleanup custom actors {e}")

    moduleLog.info("Shutting down Ray...")
    ray.shutdown()

    moduleLog.info("Waiting for logs to flush ...")
    time.sleep(10)

    moduleLog.info("Graceful shutdown complete")
44+
45+
46+
def graceful_operation_shutdown_handler() -> (
47+
typing.Callable[[int, typing.Any | None], None]
48+
):
49+
50+
def handler(sig, frame):
51+
52+
moduleLog.warning(f"Got signal {sig}")
53+
moduleLog.warning("Calling graceful shutdown")
54+
graceful_operation_shutdown()
55+
56+
return handler
57+
58+
59+
@ray.remote
class ResourceCleaner:
    """
    Ray actor that keeps a list of other actors' handles and, on request,
    asks each of them to clean up before shutdown.
    """

    def __init__(self):
        """
        Start with an empty registry of actors to clean.
        """
        # handles of the actors whose cleanup method will be invoked
        self.to_clean = []

    def add_to_cleanup(self, handle: ActorHandle) -> None:
        """
        Register an actor for cleanup.

        Any custom actor can register its own handle here. The registered
        actor must provide a ``cleanup`` method, since that is what
        ``cleanup`` below invokes remotely.

        :param handle: handle of the actor to be cleaned
        :return: None
        """
        self.to_clean.append(handle)

    def cleanup(self) -> None:
        """
        Invoke ``cleanup`` on every registered actor and wait for the results.

        Waits up to 60 seconds for all calls to return, then logs how many
        finished and how many did not. Does nothing when no actor has been
        registered.

        :return: None
        """
        if not self.to_clean:
            return

        # Fire all cleanup calls first, then wait on them together.
        pending_refs = [actor.cleanup.remote() for actor in self.to_clean]
        finished, unfinished = ray.wait(
            ray_waitables=pending_refs,
            num_returns=len(pending_refs),
            timeout=60.0,
        )
        moduleLog.info(
            f"cleaned {len(finished)}, clean failed {len(unfinished)}"
        )
92+
93+
94+
def initialize_resource_cleaner():
    """
    Create the cleaner actor and install the default SIGTERM handler.

    The cleaner is created as a named, detached actor
    (https://docs.ray.io/en/latest/ray-core/actors/named-actors.html):
    it can be looked up by name, so its handle does not have to be passed
    around, and it does not go out of scope until it is explicitly killed.
    ``get_if_exists=True`` is passed so an existing actor with that name is
    reused.

    :return: None
    """
    cleaner_factory = ResourceCleaner.options(
        name=CLEANER_ACTOR,
        get_if_exists=True,
        lifetime="detached",
    )
    # The returned handle is deliberately not kept; the actor is found by name.
    cleaner_factory.remote()

    # Default handler: runs the graceful shutdown, which cleans up the
    # ResourceCleaner. Orchestration functions that need a more complex
    # shutdown can replace this handler.
    default_handler = graceful_operation_shutdown_handler()
    signal.signal(signalnum=signal.SIGTERM, handler=default_handler)

0 commit comments

Comments
 (0)