Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to actor list limit to fix issue 803 #814

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from ray.exceptions import RayError
from ray.util.actor_pool import ActorPool

MAX_LIST = 10000 # Max number of actors returned by list
cmadam marked this conversation as resolved.
Show resolved Hide resolved


class RayUtils:
"""
Expand Down Expand Up @@ -110,15 +112,21 @@ def operator() -> ActorHandle:
return clazz.options(**actor_options).remote(params)

cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
current = len(list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=MAX_LIST))
actors = [operator() for _ in range(n_actors)]
for i in range(120):
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
if len(actors) == len(alive):
return actors
# failed - raise an exception
print(f"created {actors}, alive {alive}")
raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive")
overall = current + n_actors
if overall < MAX_LIST:
cmadam marked this conversation as resolved.
Show resolved Hide resolved
n_list = min(overall + 10, MAX_LIST)
for i in range(120):
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=n_list)
if len(actors) >= len(alive) - current:
cmadam marked this conversation as resolved.
Show resolved Hide resolved
return actors
# failed - raise an exception
print(f"created {actors}, alive {alive}")
raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive")
else:
print(f"Overall number of actors of class {clazz.__name__} is {overall}. Skipping readiness check")

@staticmethod
def process_files(
Expand Down