Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nodes toleration to Ray pods #627

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The parameters used here are as follows:
* memory - memory
* image - image to use
* image_pull_secret - image pull secret
* tolerations - (optional) tolerations for the ray pods
* ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
* replicas - number of replicas to create
* max_replicas - max number of replicas
Expand All @@ -119,6 +120,7 @@ The parameters used here are as follows:
* memory - memory
* image - image to use
* image_pull_secret - image pull secret
* tolerations - (optional) tolerations for the ray pods
* server_url - server url
* additional_params: additional (support) parameters, containing the following:
* wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
ClusterSpec,
HeadNodeSpec,
RayJobRequest,
Template,
WorkerNodeSpec,
environment_variables_decoder,
template_decoder,
volume_decoder,
)
from ray.job_submission import JobStatus
Expand Down Expand Up @@ -121,41 +121,37 @@ def create_ray_cluster(
"""
# start with templates
# head_node
cpus = head_node.get("cpu", 1)
memory = head_node.get("memory", 1)
gpus = head_node.get("gpu", 0)
accelerator = head_node.get("gpu_accelerator", None)
dct = {}
dct["cpu"] = head_node.get("cpu", 1)
dct["memory"] = head_node.get("memory", 1)
dct["gpu"] = head_node.get("gpu", 0)
dct["gpu_accelerator"] = head_node.get("gpu_accelerator", None)
head_node_template_name = f"{name}-head-template"
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=head_node_template_name)
head_template = Template(
name=head_node_template_name,
namespace=namespace,
cpu=cpus,
memory=memory,
gpu=gpus,
gpu_accelerator=accelerator,
)
dct["name"] = head_node_template_name
dct["namespace"] = namespace
if "tolerations" in head_node:
dct["tolerations"] = head_node.get("tolerations")
blublinsky marked this conversation as resolved.
Show resolved Hide resolved
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=dct["name"])
head_template = template_decoder(dct)
status, error = self.api_server_client.create_compute_template(head_template)
if status != 200:
return status, error
worker_template_names = [""] * len(worker_nodes)
index = 0
# For every worker group
for worker_node in worker_nodes:
cpus = worker_node.get("cpu", 1)
memory = worker_node.get("memory", 1)
gpus = worker_node.get("gpu", 0)
accelerator = worker_node.get("gpu_accelerator", None)
dct = {}
dct["cpu"] = worker_node.get("cpu", 1)
dct["memory"] = worker_node.get("memory", 1)
dct["gpu"] = worker_node.get("gpu", 0)
dct["gpu_accelerator"] = worker_node.get("gpu_accelerator", None)
worker_node_template_name = f"{name}-worker-template-{index}"
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=worker_node_template_name)
worker_template = Template(
name=worker_node_template_name,
namespace=namespace,
cpu=cpus,
memory=memory,
gpu=gpus,
gpu_accelerator=accelerator,
)
dct["name"] = worker_node_template_name
dct["namespace"] = namespace
if "tolerations" in worker_node:
blublinsky marked this conversation as resolved.
Show resolved Hide resolved
dct["tolerations"] = worker_node.get("tolerations")
_, _ = self.api_server_client.delete_compute_template(ns=namespace, name=dct["name"])
worker_template = template_decoder(dct)
status, error = self.api_server_client.create_compute_template(worker_template)
if status != 200:
return status, error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def {{ pipeline_name }}(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -143,6 +144,7 @@ def {{ pipeline_name }}(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def code2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -146,6 +147,7 @@ def code2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def code_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -143,6 +144,7 @@ def code_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def header_cleanser(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -140,6 +141,7 @@ def header_cleanser(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def malware(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -133,6 +134,7 @@ def malware(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def lang_select(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def lang_select(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def repo_level_order(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -163,6 +164,7 @@ def repo_level_order(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -139,6 +140,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -140,6 +141,7 @@ def doc_chunk(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/doc_quality/kfp_ray/doc_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def doc_quality(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -145,6 +146,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/lang_id/kfp_ray/lang_id_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -146,6 +147,7 @@ def lang_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -133,6 +134,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/pdf2parquet/kfp_ray/pdf2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -137,6 +138,7 @@ def pdf2parquet(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def pii_redactor(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -131,6 +132,7 @@ def pii_redactor(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -136,6 +137,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/language/text_encoder/kfp_ray/text_encoder_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -137,6 +138,7 @@ def text_encoder(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
2 changes: 2 additions & 0 deletions transforms/universal/doc_id/kfp_ray/doc_id_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def doc_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
Expand All @@ -147,6 +148,7 @@ def doc_id(
memory - memory
image - image to use
image_pull_secret - image pull secret
tolerations - (optional) tolerations for the ray pods
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
Expand Down
Loading