Change the calculation of the desired Ray actors (#654)
* Fix the calculation of the desired ray actors.

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>

* Fix hap workflow Makefile commands.

Signed-off-by: Revital Sur <[email protected]>

* More changes.

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>

* Additional fix.

Signed-off-by: Revital Sur <[email protected]>

* Additional change.

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>

* Cherry-pick Boris's 05b97fe commit to remove None, which is not supported by KFP v2.

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>

* Minor fix in ededup_transform_base.py

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>

* Disable pii Makefile.

Signed-off-by: Revital Sur <[email protected]>

---------

Signed-off-by: Revital Sur <[email protected]>
Co-authored-by: Boris Lublinsky <[email protected]>
revit13 and Boris Lublinsky authored Oct 3, 2024
1 parent f0f46de commit 0a9fe37
Showing 10 changed files with 16 additions and 16 deletions.
```diff
@@ -111,7 +111,7 @@ def operator() -> ActorHandle:

     cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
     actors = [operator() for _ in range(n_actors)]
-    for i in range(60):
+    for i in range(120):
         time.sleep(1)
         alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
         if len(actors) == len(alive):
```
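The wait loop above doubles its timeout from 60 s to 120 s, giving slow clusters more time for every created actor to report `ALIVE`. A minimal, self-contained sketch of the same polling pattern, assuming Ray's `ray.util.state.list_actors` API; `MyActor` and the resource numbers are hypothetical:

```python
import time

import ray
from ray.util.state import list_actors


@ray.remote(num_cpus=0.5)
class MyActor:
    def ping(self) -> str:
        return "pong"


def wait_for_actors(cls_name: str, expected: int, timeout_s: int = 120) -> bool:
    """Poll once per second until `expected` actors of `cls_name` are ALIVE."""
    for _ in range(timeout_s):
        time.sleep(1)
        alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
        if len(alive) >= expected:
            return True
    return False


ray.init()
n_actors = 4
actors = [MyActor.remote() for _ in range(n_actors)]  # keep handles so the actors are not reclaimed
print(wait_for_actors("MyActor", n_actors))
```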
```diff
@@ -138,7 +138,7 @@ def default_compute_execution_params(
     cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
     logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
     # compute number of actors
-    n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
+    n_actors_cpu = int((cluster_cpu - 1) * 0.7 / a_options.get("num_cpus", 0.5))
     n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
     n_actors = min(n_actors_cpu, n_actors_memory)
     # Check if we need gpu calculations as well
```
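The sizing change swaps a flat 85% CPU budget for "reserve one CPU for the head/system processes, then take 70% of the rest". A worked sketch with illustrative numbers (the `w_options`/`a_options` values below are assumptions, not taken from the repo):

```python
GB = 1024 * 1024 * 1024

w_options = {"replicas": 3, "cpu": 4, "memory": 16}  # hypothetical worker pool (memory in GB)
a_options = {"num_cpus": 0.5, "memory": GB}          # per-actor requirements

cluster_cpu = w_options["replicas"] * w_options["cpu"]     # 12 CPUs
cluster_mem = w_options["replicas"] * w_options["memory"]  # 48 GB

# New formula: reserve 1 CPU, then budget 70% of the remainder.
n_actors_cpu = int((cluster_cpu - 1) * 0.7 / a_options.get("num_cpus", 0.5))    # int(7.7 / 0.5) = 15
n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))  # int(40.8) = 40
n_actors = min(n_actors_cpu, n_actors_memory)                                   # 15 actors
print(n_actors)  # the old 0.85 budget would have given int(12 * 0.85 / 0.5) = 20
```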
File renamed without changes.
2 changes: 1 addition & 1 deletion transforms/universal/ededup/kfp_ray/ededup_wf.py
```diff
@@ -89,7 +89,7 @@ def ededup(
     ededup_hash_cpu: float = 0.5,
     ededup_doc_column: str = "contents",
     ededup_use_snapshot: bool = False,
-    ededup_snapshot_directory: str = None,
+    ededup_snapshot_directory: str = "",
     # data sampling
     ededup_n_samples: int = 10,
     # additional parameters
```
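The default moves from `None` to `""` because KFP v2 does not accept `None` as the default of a `str`-typed pipeline parameter (see the cherry-pick in the commit message). The empty string acts as a sentinel that the compute-params step maps back to `None`, as a later hunk in this commit shows. A minimal sketch of that round trip; `normalize_snapshot_directory` is a hypothetical helper name:

```python
from typing import Optional


def normalize_snapshot_directory(raw: Optional[str]) -> Optional[str]:
    # KFP v2 delivers "" where the old pipeline used None; treat empty or
    # single-character placeholders as "not set", mirroring the diff's check.
    if raw is None or len(raw) <= 1:
        return None
    return raw


assert normalize_snapshot_directory("") is None
assert normalize_snapshot_directory("s3://bucket/snapshots") == "s3://bucket/snapshots"
```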
```diff
@@ -10,6 +10,7 @@
 # limitations under the License.
 ################################################################################

+
 def ededup_compute_execution_params(
     worker_options: dict, # ray worker configuration
     actor_options: dict, # actor's resource requirements
@@ -94,9 +95,9 @@ def ededup_compute_execution_params(
         )
         sys.exit(1)
     # Define number of workers
-    n_workers = int((0.85 * cluster_cpu - required_hash_cpu) / actor_cpu)
+    n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers <= 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
@@ -110,6 +111,9 @@
         print(f"Try to increase the size of the cluster or increase size of the cpu per worker")
         sys.exit(1)
     print(f"Projected execution time {EXECUTION_OF_KB_DOC * avg_doc_size * number_of_docs / n_workers / 60} min")
+    # process None able parameters
+    if ededup_snapshot_directory is None or len(ededup_snapshot_directory) <= 1:
+        ededup_snapshot_directory = None
     return {
         "data_s3_config": data_s3_config,
         "data_max_files": data_max_files,
```
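A worked sketch of the revised worker sizing and relaxed guard, with illustrative numbers; note the guard now rejects only non-positive counts, so a small cluster may legitimately run a single worker:

```python
import sys

# Hypothetical inputs: 16 CPUs across replicas, 5 hash actors at 0.5 CPU each,
# 0.75 CPU per worker actor.
cluster_cpu = 16
required_hash_cpu = 5 * 0.5
actor_cpu = 0.75

# Reserve one CPU, budget 85% of the rest, subtract the hash actors' share.
n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)
print(f"Number of workers - {n_workers}")  # int((12.75 - 2.5) / 0.75) = 13
if n_workers <= 0:
    print(f"Cluster is too small - estimated number of workers {n_workers}")
    sys.exit(1)
```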
```diff
@@ -40,7 +40,7 @@
 doc_column_name_cli_param = f"{cli_prefix}{doc_column_name_key}"
 int_column_name_cli_param = f"{cli_prefix}{int_column_name_key}"
 use_snapshot_cli_param = f"{cli_prefix}{use_snapshot_key}"
-snapshot_directory_cli_param = f"--{cli_prefix}{snapshot_directory_key}"
+snapshot_directory_cli_param = f"{cli_prefix}{snapshot_directory_key}"

 class HashFilter:
     """
```
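The leading `--` is dropped because this constant is also used as the lookup key for the parsed value, and argparse stores results under the bare destination name rather than the flag spelling. A small sketch; the parser wiring below is illustrative, only the key construction mirrors the diff:

```python
import argparse

cli_prefix = "ededup_"
snapshot_directory_key = "snapshot_directory"
snapshot_directory_cli_param = f"{cli_prefix}{snapshot_directory_key}"  # bare key, no "--"

parser = argparse.ArgumentParser()
# The "--" belongs only on the flag itself, added at registration time.
parser.add_argument(f"--{snapshot_directory_cli_param}", type=str, default="")

args = vars(parser.parse_args(["--ededup_snapshot_directory", "/tmp/snap"]))
assert args[snapshot_directory_cli_param] == "/tmp/snap"  # lookup by the bare key works
```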
2 changes: 1 addition & 1 deletion transforms/universal/fdedup/kfp_ray/fdedup_wf.py
```diff
@@ -82,7 +82,7 @@ def fdedup(
     data_max_files: int = -1,
     data_num_samples: int = -1,
     # orchestrator
-    runtime_actor_options: dict = {"num_cpus": 0.8},
+    runtime_actor_options: dict = {"num_cpus": 0.7},
     runtime_pipeline_id: str = "pipeline_id",
     runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
     # columns used
```
```diff
@@ -140,7 +140,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     cluster_cpu = worker_options["replicas"] * worker_options["cpu"]
     cluster_memory = worker_options["replicas"] * worker_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     actor_cpu = actor_options["num_cpus"]
@@ -172,7 +172,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     n_preprocessors = int(
         (0.85 * cluster_cpu - b_actors * bucket_cpu - m_actors * mhash_cpu - d_actors * doc_cpu) / actor_cpu
     )
-    if n_preprocessors < 0:
+    if n_preprocessors <= 0:
         print(f"Not enough CPUs to run fuzzy de duping, computed number of workers is {n_preprocessors}")
         print(f"Required bucket actors {b_actors}, minhash actors {m_actors}, document actors {d_actors}")
         print("Try to increase the size of the cluster")
```
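The switch from `cluster_cpu *= 0.85` to `cluster_cpu -= 1` replaces a proportional haircut with a fixed one-CPU reserve: the overhead being set aside (head node/system processes) does not grow with cluster size. A quick comparison over illustrative cluster sizes:

```python
# Effective CPUs left for actors under each strategy.
for cluster_cpu in (4, 16, 64):
    proportional = cluster_cpu * 0.85  # old: 3.4, 13.6, 54.4
    fixed_reserve = cluster_cpu - 1    # new: 3,   15,   63
    print(f"{cluster_cpu:>3} CPUs -> old {proportional:5.1f}, new {fixed_reserve}")
```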
4 changes: 0 additions & 4 deletions transforms/universal/hap/Makefile
```diff
@@ -55,16 +55,12 @@ docker-save-image::

 .PHONY: workflow-venv
 workflow-venv:
-	$(MAKE) -C kfp_ray workflow-venv

 .PHONY: workflow-test
 workflow-test:
-	$(MAKE) -C kfp_ray workflow-test

 .PHONY: workflow-upload
 workflow-upload:
-	$(MAKE) -C kfp_ray workflow-upload

 .PHONY: workflow-build
 workflow-build:
-	$(MAKE) -C kfp_ray workflow-build
```
```diff
@@ -55,7 +55,7 @@ def profiler_compute_execution_params(
     cluster_cpu = w_options["replicas"] * w_options["cpu"]
     cluster_memory = w_options["replicas"] * w_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     a_options = actor_options
@@ -82,7 +82,7 @@ def profiler_compute_execution_params(
     n_aggregators = math.ceil(number_of_docs * 32 / GB)
     print(f"Estimated Required hashes {n_aggregators}")
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    required_aggregator_cpu = n_aggregators * aggregator_cpu
+    required_aggregator_cpu = math.ceil(n_aggregators * aggregator_cpu)
     required_hash_mem = n_aggregators * 2
     if required_aggregator_cpu > cluster_cpu or required_hash_mem > cluster_memory:
         print(
@@ -93,7 +93,7 @@ def profiler_compute_execution_params(
     # Define number of workers
     n_workers = int((0.85 * cluster_cpu - required_aggregator_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers <= 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
```
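A worked sketch pulling the profiler changes together, with illustrative numbers: rounding the aggregators' CPU requirement up with `math.ceil` avoids under-reserving fractional CPUs, and the relaxed guard accepts any positive worker count:

```python
import math
import sys

cluster_cpu = 8 - 1      # one CPU reserved, as in the first hunk
n_aggregators = 5        # hypothetical
aggregator_cpu = 0.5
actor_cpu = 0.75

required_aggregator_cpu = math.ceil(n_aggregators * aggregator_cpu)  # ceil(2.5) = 3, was 2.5
n_workers = int((0.85 * cluster_cpu - required_aggregator_cpu) / actor_cpu)
print(f"Number of workers - {n_workers}")  # int((5.95 - 3) / 0.75) = 3
if n_workers <= 0:  # previously `< 2`, which rejected single-worker plans
    print(f"Cluster is too small - estimated number of workers {n_workers}")
    sys.exit(1)
```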
