Change the calculation of the desired ray actors #654

Merged · 8 commits · Oct 3, 2024
@@ -111,7 +111,7 @@ def operator() -> ActorHandle:

     cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
     actors = [operator() for _ in range(n_actors)]
-    for i in range(60):
+    for i in range(120):
         time.sleep(1)
         alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
         if len(actors) == len(alive):
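This first hunk doubles how long the launcher waits for all newly created actors to report ALIVE, from 60 to 120 one-second polls. A minimal sketch of the loop in isolation, assuming Ray's ray.util.state.list_actors API (the wait_for_actors wrapper and its signature are illustrative, not the repository's actual helper):

import time

from ray.util.state import list_actors


def wait_for_actors(cls_name: str, expected: int, timeout_sec: int = 120) -> bool:
    # Poll once per second until every expected actor reports ALIVE,
    # giving up after timeout_sec seconds.
    for _ in range(timeout_sec):
        time.sleep(1)
        alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
        if len(alive) == expected:
            return True
    return False

On a busy or autoscaling cluster, actor startup can easily exceed a minute, which is presumably why the timeout was doubled rather than the polling interval changed.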
@@ -138,7 +138,7 @@ def default_compute_execution_params(
     cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
     logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
     # compute number of actors
-    n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
+    n_actors_cpu = int((cluster_cpu - 1) * 0.7 / a_options.get("num_cpus", 0.5))
     n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
     n_actors = min(n_actors_cpu, n_actors_memory)
     # Check if we need gpu calculations as well
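The new formula is doubly conservative: it reserves one full CPU (typically consumed by the head node and system processes) and lowers the utilization target from 85% to 70%. A worked comparison on a hypothetical 3-replica, 4-CPU-per-replica cluster (all numbers illustrative):

cluster_cpu = 3 * 4       # hypothetical cluster: 3 replicas x 4 CPUs
actor_num_cpus = 0.5      # the default per-actor CPU request

old_n_actors = int(cluster_cpu * 0.85 / actor_num_cpus)       # int(20.4) -> 20
new_n_actors = int((cluster_cpu - 1) * 0.7 / actor_num_cpus)  # int(15.4) -> 15

Requesting fewer actors means the last few are less likely to hang in a pending state, which also fits with the longer ALIVE wait above.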
transforms/universal/ededup/kfp_ray/ededup_wf.py (1 addition, 1 deletion)

@@ -89,7 +89,7 @@ def ededup(
     ededup_hash_cpu: float = 0.5,
     ededup_doc_column: str = "contents",
     ededup_use_snapshot: bool = False,
-    ededup_snapshot_directory: str = None,
+    ededup_snapshot_directory: str = "",
     # data sampling
     ededup_n_samples: int = 10,
     # additional parameters
@@ -10,6 +10,7 @@
 # limitations under the License.
 ################################################################################

+
 def ededup_compute_execution_params(
     worker_options: dict,  # ray worker configuration
     actor_options: dict,  # actor's resource requirements
@@ -94,9 +95,9 @@ def ededup_compute_execution_params(
         )
         sys.exit(1)
     # Define number of workers
-    n_workers = int((0.85 * cluster_cpu - required_hash_cpu) / actor_cpu)
+    n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers <= 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
@@ -110,6 +111,9 @@ def ededup_compute_execution_params(
print(f"Try to increase the size of the cluster or increase size of the cpu per worker")
sys.exit(1)
print(f"Projected execution time {EXECUTION_OF_KB_DOC * avg_doc_size * number_of_docs / n_workers / 60} min")
# process None able parameters
if ededup_snapshot_directory is None or len(ededup_snapshot_directory) <= 1:
ededup_snapshot_directory = None
return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
@@ -40,7 +40,7 @@
 doc_column_name_cli_param = f"{cli_prefix}{doc_column_name_key}"
 int_column_name_cli_param = f"{cli_prefix}{int_column_name_key}"
 use_snapshot_cli_param = f"{cli_prefix}{use_snapshot_key}"
-snapshot_directory_cli_param = f"--{cli_prefix}{snapshot_directory_key}"
+snapshot_directory_cli_param = f"{cli_prefix}{snapshot_directory_key}"

 class HashFilter:
     """
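Dropping the embedded "--" suggests the launcher already prepends "--" when turning these keys into command-line arguments, so the old constant would have produced a doubled prefix that argparse does not recognize. A small illustration of the fixed behavior, under that assumption:

import argparse

cli_prefix = "ededup_"
snapshot_directory_key = "snapshot_directory"
snapshot_directory_cli_param = f"{cli_prefix}{snapshot_directory_key}"

parser = argparse.ArgumentParser()
parser.add_argument(f"--{snapshot_directory_cli_param}", type=str, default=None)

# "--" is now added exactly once at the call site; with the old constant the
# argument would have been "----ededup_snapshot_directory" and parsing would fail.
args = parser.parse_args([f"--{snapshot_directory_cli_param}", "/tmp/snapshots"])
print(args.ededup_snapshot_directory)  # /tmp/snapshots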
transforms/universal/fdedup/kfp_ray/fdedup_wf.py (1 addition, 1 deletion)

@@ -82,7 +82,7 @@ def fdedup(
     data_max_files: int = -1,
     data_num_samples: int = -1,
     # orchestrator
-    runtime_actor_options: dict = {"num_cpus": 0.8},
+    runtime_actor_options: dict = {"num_cpus": 0.7},
     runtime_pipeline_id: str = "pipeline_id",
     runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
     # columns used
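Lowering the default per-actor CPU request from 0.8 to 0.7 fits the more conservative CPU budget introduced elsewhere in this PR, and for a fixed budget it admits a few more workers. Rough arithmetic on a hypothetical 16-CPU cluster, applying the head-node reserve and the 0.85 factor the compute functions use (numbers illustrative only):

cluster_cpu = 16
budget = 0.85 * (cluster_cpu - 1)   # 12.75 usable CPUs

workers_at_0_8 = int(budget / 0.8)  # 15 workers
workers_at_0_7 = int(budget / 0.7)  # 18 workers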
@@ -140,7 +140,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     cluster_cpu = worker_options["replicas"] * worker_options["cpu"]
     cluster_memory = worker_options["replicas"] * worker_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     actor_cpu = actor_options["num_cpus"]
@@ -172,7 +172,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     n_preprocessors = int(
         (0.85 * cluster_cpu - b_actors * bucket_cpu - m_actors * mhash_cpu - d_actors * doc_cpu) / actor_cpu
     )
-    if n_preprocessors < 0:
+    if n_preprocessors <= 0:
         print(f"Not enough CPUs to run fuzzy de duping, computed number of workers is {n_preprocessors}")
         print(f"Required bucket actors {b_actors}, minhash actors {m_actors}, document actors {d_actors}")
         print("Try to increase the size of the cluster")
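Replacing the 15% haircut with a fixed one-CPU reserve changes how the deduction scales: it no longer grows with cluster size, and since the 0.85 factor is still applied later in the n_preprocessors formula, the combined budget is roughly 0.85 * (cluster_cpu - 1). A quick comparison with illustrative numbers:

for replicas, cpu in [(2, 8), (10, 8)]:
    cluster_cpu = replicas * cpu
    old_budget = cluster_cpu * 0.85  # haircut grows with the cluster
    new_budget = cluster_cpu - 1     # flat reserve for head/system processes
    print(cluster_cpu, old_budget, new_budget)
# 16 CPUs: 13.6 vs 15.0; 80 CPUs: 68.0 vs 79.0

On large clusters the fixed reserve frees noticeably more CPU for actual workers.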
transforms/universal/hap/Makefile (0 additions, 4 deletions)

@@ -55,16 +55,12 @@ docker-save-image::

 .PHONY: workflow-venv
 workflow-venv:
-	$(MAKE) -C kfp_ray workflow-venv

 .PHONY: workflow-test
 workflow-test:
-	$(MAKE) -C kfp_ray workflow-test

 .PHONY: workflow-upload
 workflow-upload:
-	$(MAKE) -C kfp_ray workflow-upload

 .PHONY: workflow-build
 workflow-build:
-	$(MAKE) -C kfp_ray workflow-build
@@ -55,7 +55,7 @@ def profiler_compute_execution_params(
     cluster_cpu = w_options["replicas"] * w_options["cpu"]
     cluster_memory = w_options["replicas"] * w_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     a_options = actor_options
@@ -82,7 +82,7 @@ def profiler_compute_execution_params(
     n_aggregators = math.ceil(number_of_docs * 32 / GB)
     print(f"Estimated Required hashes {n_aggregators}")
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    required_aggregator_cpu = n_aggregators * aggregator_cpu
+    required_aggregator_cpu = math.ceil(n_aggregators * aggregator_cpu)
     required_hash_mem = n_aggregators * 2
     if required_aggregator_cpu > cluster_cpu or required_hash_mem > cluster_memory:
         print(
@@ -93,7 +93,7 @@ def profiler_compute_execution_params(
     # Define number of workers
     n_workers = int((0.85 * cluster_cpu - required_aggregator_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers <= 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
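Two small robustness fixes close out the profiler changes: the aggregators' total CPU requirement is rounded up so fractional per-aggregator requests are not under-reserved, and the minimum-worker guard is relaxed from < 2 to <= 0 so a cluster that can only fit one worker proceeds instead of exiting. The rounding effect with illustrative numbers:

import math

n_aggregators = 5
aggregator_cpu = 0.5

exact = n_aggregators * aggregator_cpu                # 2.5 CPUs
reserved = math.ceil(n_aggregators * aggregator_cpu)  # 3 CPUs actually reserved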