Skip to content

Commit

Permalink
additional testing
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Sep 24, 2024
1 parent d44d552 commit 156dfec
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
################################################################################

import logging
import sys
import time
from typing import Any

import ray
from ray.experimental.state.api import list_actors
from data_processing.utils import GB, UnrecoverableException
from ray.actor import ActorHandle
from ray.exceptions import RayError
Expand Down Expand Up @@ -109,7 +109,14 @@ def operator() -> ActorHandle:
time.sleep(creation_delay)
return clazz.options(**actor_options).remote(params)

return [operator() for _ in range(n_actors)]
cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
actors = [operator() for _ in range(n_actors)]
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
if len(actors) == len(alive):
return actors
print(f"created {actors}, alive {alive}")
raise UnrecoverableException(f"out of {len(actors)} created classes only {len(alive)} alive")

@staticmethod
def process_files(
Expand Down Expand Up @@ -220,6 +227,7 @@ def process_files(
def wait_for_execution_completion(logger: logging.Logger, replies: list[ray.ObjectRef]) -> int:
"""
Wait for all requests completed
:param logger: logger to use
:param replies: list of request futures
:return: None
"""
Expand Down
3 changes: 1 addition & 2 deletions transforms/universal/noop/ray/src/noop_transform_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# limitations under the License.
################################################################################

from data_processing.utils import CLIArgumentProvider, get_logger
from data_processing.utils import get_logger
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.runtime.ray.runtime_configuration import (
RayTransformRuntimeConfiguration,
Expand All @@ -31,7 +31,6 @@ class NOOPRayTransformConfiguration(RayTransformRuntimeConfiguration):
def __init__(self):
"""
Initialization
:param base_configuration - base configuration class
"""
super().__init__(transform_config=NOOPTransformConfiguration())

Expand Down

0 comments on commit 156dfec

Please sign in to comment.