From 156dfec47cd9eb17124129ee43cfd18315b2401b Mon Sep 17 00:00:00 2001 From: blublinsky Date: Tue, 24 Sep 2024 13:40:02 +0100 Subject: [PATCH] additional testing --- .../src/data_processing_ray/runtime/ray/ray_utils.py | 12 ++++++++++-- .../universal/noop/ray/src/noop_transform_ray.py | 3 +-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index f42047895..741f1cfa3 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -11,11 +11,11 @@ ################################################################################ import logging -import sys import time from typing import Any import ray +from ray.experimental.state.api import list_actors from data_processing.utils import GB, UnrecoverableException from ray.actor import ActorHandle from ray.exceptions import RayError @@ -109,7 +109,14 @@ def operator() -> ActorHandle: time.sleep(creation_delay) return clazz.options(**actor_options).remote(params) - return [operator() for _ in range(n_actors)] + cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','') + actors = [operator() for _ in range(n_actors)] + time.sleep(1) + alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")]) + if len(actors) == len(alive): + return actors + print(f"created {actors}, alive {alive}") + raise UnrecoverableException(f"out of {len(actors)} created classes only {len(alive)} alive") @staticmethod def process_files( @@ -220,6 +227,7 @@ def process_files( def wait_for_execution_completion(logger: logging.Logger, replies: list[ray.ObjectRef]) -> int: """ Wait for all requests completed + :param logger: logger to use :param replies: list of request futures :return: None """ diff --git a/transforms/universal/noop/ray/src/noop_transform_ray.py b/transforms/universal/noop/ray/src/noop_transform_ray.py index 71aaf672d..2160090f5 100644 --- a/transforms/universal/noop/ray/src/noop_transform_ray.py +++ b/transforms/universal/noop/ray/src/noop_transform_ray.py @@ -10,7 +10,7 @@ # limitations under the License. ################################################################################ -from data_processing.utils import CLIArgumentProvider, get_logger +from data_processing.utils import get_logger from data_processing_ray.runtime.ray import RayTransformLauncher from data_processing_ray.runtime.ray.runtime_configuration import ( RayTransformRuntimeConfiguration, @@ -31,7 +31,6 @@ class NOOPRayTransformConfiguration(RayTransformRuntimeConfiguration): def __init__(self): """ Initialization - :param base_configuration - base configuration class """ super().__init__(transform_config=NOOPTransformConfiguration())