diff --git a/.github/workflows/test-universal-hap-kfp.yml b/.github/workflows/test-universal-hap-kfp.yml
new file mode 100644
index 000000000..4e52e0535
--- /dev/null
+++ b/.github/workflows/test-universal-hap-kfp.yml
@@ -0,0 +1,116 @@
+#
+# DO NOT EDIT THIS FILE: it is generated from test-transform.template, Edit there and run make to change these files
+#
+name: Test KFP - transforms/universal/hap
+
+on:
+  workflow_dispatch:
+  push:
+    branches:
+      - "dev"
+      - "releases/**"
+    tags:
+      - "*"
+    paths:
+      - ".make.*"
+      - "transforms/.make.workflows"
+      - "transforms/universal/hap/**"
+      - "!kfp/**" # This is tested in separate workflow
+      - "!data-processing-lib/**" # This is tested in separate workflow
+      - "!**.md"
+      - "!**/doc/**"
+      - "!**/images/**"
+      - "!**.gitignore"
+  pull_request:
+    branches:
+      - "dev"
+      - "releases/**"
+    paths:
+      - ".make.*"
+      - "transforms/.make.workflows"
+      - "transforms/universal/hap/**"
+      - "!data-processing-lib/**" # This is tested in separate workflow
+      - "!kfp/**" # This is tested in separate workflow
+      - "!**.md"
+      - "!**/doc/**"
+      - "!**/images/**"
+      - "!**.gitignore"
+
+# taken from https://stackoverflow.com/questions/66335225/how-to-cancel-previous-runs-in-the-pr-when-you-push-new-commitsupdate-the-curre
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test-kfp-v1:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Free up space in github runner
+        # Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
+        run: |
+          df -h
+          sudo rm -rf "/usr/local/share/boost"
+          sudo rm -rf "$AGENT_TOOLSDIRECTORY"
+          sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
+          sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
+          df -h
+      - name: Import environment variables
+        run: |
+          cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
+          echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
+          echo "REPOROOT=$PWD" >> $GITHUB_ENV
+          echo "PATH=$PATH:/tmp" >> $GITHUB_ENV
+      - name: Test V1 KFP workflow for transforms/universal/hap
+        timeout-minutes: 120
+        run: |
+          KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list)
+          if [ -e "transforms/universal/hap/Makefile" -a -e "transforms/universal/hap/kfp_ray/Makefile" ]; then
+            transform=$(basename "transforms/universal/hap")
+            if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then
+              $PWD/scripts/workflow_helper.sh install-tools
+              $PWD/scripts/workflow_helper.sh test-workflow transforms/universal/hap
+            else
+              $PWD/scripts/workflow_helper.sh build-workflow transforms/universal/hap
+            fi
+          else
+            echo "Skipping transforms/universal/hap kfp test for lack of Makefile and/or kfp_ray/Makefile"
+          fi
+
+  test-kfp-v2:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Free up space in github runner
+        # Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173
+        run: |
+          df -h
+          sudo rm -rf "/usr/local/share/boost"
+          sudo rm -rf "$AGENT_TOOLSDIRECTORY"
+          sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup
+          sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true
+          df -h
+      - name: Import environment variables
+        run: |
+          cat scripts/k8s-setup/requirements.env >> $GITHUB_ENV
+          echo "K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV
"K8S_SETUP_SCRIPTS=$PWD/scripts/k8s-setup" >> $GITHUB_ENV + echo "REPOROOT=$PWD" >> $GITHUB_ENV + echo "PATH=$PATH:/tmp" >> $GITHUB_ENV + echo "KFPv2=1" >> $GITHUB_ENV + - name: Test V2 KFP workflow for transforms/universal/hap + timeout-minutes: 120 + run: | + KFP_BLACK_LIST=$(./scripts/check-workflows.sh -show-kfp-black-list) + if [ -e "transforms/universal/hap/Makefile" -a -e "transforms/universal/hap/kfp_ray/Makefile" ]; then + transform=$(basename "transforms/universal/hap") + if echo ${KFP_BLACK_LIST} | grep -qv ${transform}; then + $PWD/scripts/workflow_helper.sh install-tools + $PWD/scripts/workflow_helper.sh test-workflow transforms/universal/hap + else + $PWD/scripts/workflow_helper.sh build-workflow transforms/universal/hap + fi + else + echo "Skipping transforms/universal/hap kfp test for lack of Makefile and/or kfp_ray/Makefile" + fi \ No newline at end of file diff --git a/scripts/k8s-setup/populate_minio.sh b/scripts/k8s-setup/populate_minio.sh index 3b22b37e7..793a3cf73 100755 --- a/scripts/k8s-setup/populate_minio.sh +++ b/scripts/k8s-setup/populate_minio.sh @@ -42,4 +42,4 @@ mc cp --recursive ${REPOROOT}/transforms/universal/noop/ray/test-data/input/ kfp mc cp --recursive ${REPOROOT}/transforms/universal/tokenization/ray/test-data/ds01/input/ kfp/test/tokenization/ds01/input mc cp --recursive ${REPOROOT}/transforms/universal/profiler/ray/test-data/input/ kfp/test/profiler/input mc cp --recursive ${REPOROOT}/transforms/universal/resize/ray/test-data/input/ kfp/test/resize/input - +mc cp --recursive ${REPOROOT}/transforms/universal/hap/ray/test-data/input/ kfp/test/hap/input diff --git a/transforms/universal/hap/kfp_ray/Makefile b/transforms/universal/hap/kfp_ray/Makefile new file mode 100644 index 000000000..4074b8713 --- /dev/null +++ b/transforms/universal/hap/kfp_ray/Makefile @@ -0,0 +1,59 @@ +REPOROOT=${CURDIR}/../../../../ + +WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate +include $(REPOROOT)/transforms/.make.workflows + +# Include the common configuration for this transform +include ../transform.config + +SRC_DIR=${CURDIR}/../ray/ + +PYTHON_WF := $(shell find ./ -name '*_wf.py') +YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF}) + +workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE} + +.PHONY: clean +clean: + @# Help: Clean up the virtual environment. + rm -rf ${REPOROOT}/transforms/venv + +venv:: + +build:: + +setup:: + +test:: + +test-src:: + +test-image:: + +publish:: + +image:: + +kind-load-image:: + +docker-load-image:: + +docker-save-image:: + +.PHONY: workflow-build +workflow-build: workflow-venv + $(MAKE) $(YAML_WF) + +.PHONY: workflow-test +workflow-test: workflow-build + $(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=hap_wf.yaml + +.PHONY: workflow-upload +workflow-upload: workflow-build + @for file in $(YAML_WF); do \ + $(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \ + done + +.PHONY: workflow-generate +workflow-generate: workflow-venv + . ${WORKFLOW_VENV_ACTIVATE} && ../../../../kfp/pipeline_generator/single-pipeline/run.sh -c `pwd`/pipeline_definitions.yaml -od . diff --git a/transforms/universal/hap/kfp_ray/hap_wf.py b/transforms/universal/hap/kfp_ray/hap_wf.py new file mode 100644 index 000000000..786011d4d --- /dev/null +++ b/transforms/universal/hap/kfp_ray/hap_wf.py @@ -0,0 +1,239 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
diff --git a/transforms/universal/hap/kfp_ray/hap_wf.py b/transforms/universal/hap/kfp_ray/hap_wf.py
new file mode 100644
index 000000000..786011d4d
--- /dev/null
+++ b/transforms/universal/hap/kfp_ray/hap_wf.py
@@ -0,0 +1,239 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+import kfp.compiler as compiler
+import kfp.components as comp
+import kfp.dsl as dsl
+from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils
+
+
+task_image = "quay.io/dataprep1/data-prep-kit/hap-ray:latest"
+
+# the name of the job script
+EXEC_SCRIPT_NAME: str = "hap_transform_ray.py"
+
+# components
+base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"
+
+# path to kfp component specifications files
+component_spec_path = "../../../../kfp/kfp_ray_components/"
+
+# Compute execution parameters. Different transforms might need different implementations of this,
+# so instead of creating a shared component we create it in place here.
+def compute_exec_params_func(
+    worker_options: dict,
+    actor_options: dict,
+    data_s3_config: str,
+    data_max_files: int,
+    data_num_samples: int,
+    data_checkpointing: bool,
+    runtime_pipeline_id: str,
+    runtime_job_id: str,
+    runtime_code_location: dict,
+    model_name_or_path: str,
+    annotation_column: str,
+    doc_text_column: str,
+    inference_engine: str,
+    max_length: int,
+    batch_size: int,
+) -> dict:
+    from runtime_utils import KFPUtils
+
+    return {
+        "data_s3_config": data_s3_config,
+        "data_max_files": data_max_files,
+        "data_num_samples": data_num_samples,
+        "data_checkpointing": data_checkpointing,
+        "runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
+        "runtime_worker_options": str(actor_options),
+        "runtime_pipeline_id": runtime_pipeline_id,
+        "runtime_job_id": runtime_job_id,
+        "runtime_code_location": str(runtime_code_location),
+        "model_name_or_path": model_name_or_path,
+        "annotation_column": annotation_column,
+        "doc_text_column": doc_text_column,
+        "inference_engine": inference_engine,
+        "max_length": max_length,
+        "batch_size": batch_size,
+    }
+
+
+# KFPv1 and KFPv2 use different methods to create a component from a function. KFPv1 uses the
+# `create_component_from_func` function, which is deprecated in KFPv2 and has a different import path there.
+# KFPv2 recommends the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
+# this if/else statement and explicitly call the decorator.
+if os.getenv("KFPv2", "0") == "1":
+    # In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On the other hand, we cannot create
+    # a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
+    # https://github.com/kubeflow/pipelines/issues/10187. Therefore, for the time being we use a unique string created at
+    # compilation time.
+    import uuid
+
+    compute_exec_params_op = dsl.component_decorator.component(
+        func=compute_exec_params_func, base_image=base_kfp_image
+    )
+    print(
+        "WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+        + "same version of the same pipeline !!!"
+    )
+ ) + run_id = uuid.uuid4().hex +else: + compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) + run_id = dsl.RUN_ID_PLACEHOLDER + +# create Ray cluster +create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") +# execute job +execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml") +# clean up Ray +cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml") + +# Task name is part of the pipeline name, the ray cluster name and the job name in DMF. +TASK_NAME: str = "hap" + + +@dsl.pipeline( + name=TASK_NAME + "-ray-pipeline", + description="Pipeline for hap task", +) +def hap( + # Ray cluster + ray_name: str = "hap-kfp-ray", # name of Ray cluster + # Add image_pull_secret and image_pull_policy to ray workers if needed + ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, + ray_worker_options: dict = { + "replicas": 2, + "max_replicas": 2, + "min_replicas": 2, + "cpu": 2, + "memory": 4, + "image": task_image, + }, + server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", + # data access + data_s3_config: str = "{'input_folder': 'test/hap/input/', 'output_folder': 'test/hap/output/'}", + data_s3_access_secret: str = "s3-secret", + data_max_files: int = -1, + data_num_samples: int = -1, + data_checkpointing: bool = False, + # orchestrator + runtime_actor_options: dict = {"num_cpus": 0.8}, + runtime_pipeline_id: str = "pipeline_id", + runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"}, + # hap parameters + model_name_or_path: str = "ibm-granite/granite-guardian-hap-38m", + annotation_column: str = "hap_score", + doc_text_column: str = "contents", + inference_engine: str = "CPU", + max_length: int = 512, + batch_size: int = 128, + # additional parameters + additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}', +): + """ + Pipeline to execute hap transform + :param ray_name: name of the Ray cluster + :param ray_head_options: head node options, containing the following: + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + tolerations - (optional) tolerations for the ray pods + :param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following: + replicas - number of replicas to create + max_replicas - max number of replicas + min_replicas - min number of replicas + cpu - number of cpus + memory - memory + image - image to use + image_pull_secret - image pull secret + tolerations - (optional) tolerations for the ray pods + :param server_url - server url + :param additional_params: additional (support) parameters, containing the following: + wait_interval - wait interval for API server, sec + wait_cluster_ready_tmout - time to wait for cluster ready, sec + wait_cluster_up_tmout - time to wait for cluster up, sec + wait_job_ready_tmout - time to wait for job ready, sec + wait_print_tmout - time between prints, sec + http_retries - http retries for API server calls + :param data_s3_access_secret - s3 access secret + :param data_s3_config - s3 configuration + :param data_max_files - max files to process + :param data_num_samples - num samples to process + :param 
+    :param runtime_pipeline_id - pipeline id
+    :param runtime_code_location - code location
+    :param model_name_or_path - HAP model path
+    :param annotation_column - name of the output column holding the HAP score for each document
+    :param doc_text_column - name of the column that contains the document text
+    :param inference_engine - inference engine used
+    :param max_length - maximum sequence length for inference
+    :param batch_size - batch size
+    :return: None
+    """
+    # create clean_up task
+    clean_up_task = cleanup_ray_op(
+        ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params
+    )
+    ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
+    # pipeline definition
+    with dsl.ExitHandler(clean_up_task):
+        # compute execution params
+        compute_exec_params = compute_exec_params_op(
+            worker_options=ray_worker_options,
+            actor_options=runtime_actor_options,
+            data_s3_config=data_s3_config,
+            data_max_files=data_max_files,
+            data_num_samples=data_num_samples,
+            data_checkpointing=data_checkpointing,
+            runtime_pipeline_id=runtime_pipeline_id,
+            runtime_job_id=run_id,
+            runtime_code_location=runtime_code_location,
+            model_name_or_path=model_name_or_path,
+            annotation_column=annotation_column,
+            doc_text_column=doc_text_column,
+            inference_engine=inference_engine,
+            max_length=max_length,
+            batch_size=batch_size,
+        )
+
+        ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
+        # start Ray cluster
+        ray_cluster = create_ray_op(
+            ray_name=ray_name,
+            run_id=run_id,
+            ray_head_options=ray_head_options,
+            ray_worker_options=ray_worker_options,
+            server_url=server_url,
+            additional_params=additional_params,
+        )
+        ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
+        ray_cluster.after(compute_exec_params)
+
+        # Execute job
+        execute_job = execute_ray_jobs_op(
+            ray_name=ray_name,
+            run_id=run_id,
+            additional_params=additional_params,
+            exec_params=compute_exec_params.output,
+            exec_script_name=EXEC_SCRIPT_NAME,
+            server_url=server_url,
+        )
+        ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
+        ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
+        execute_job.after(ray_cluster)
+
+
+if __name__ == "__main__":
+    # Compiling the pipeline
+    compiler.Compiler().compile(hap, __file__.replace(".py", ".yaml"))
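
Note: given the `__main__` block above, essentially the same compilation that workflow-build performs can also be run by hand. A rough sketch, assuming the kfp SDK and the repo's workflow_support/runtime_utils packages are importable (normally supplied by the venv that `make workflow-venv` creates):

    cd transforms/universal/hap/kfp_ray
    python hap_wf.py                # KFP v1 compiler; writes hap_wf.yaml next to the script
    KFPv2=1 python hap_wf.py        # same, but takes the KFP v2 branch selected by the KFPv2 env var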
description: "# The column name that contains the document text" + - name: "inference_engine" + type: "str" + value: "CPU" + description: "# inference engine used" + - name: max_length + type: "int" + value: 512 + description: "# inference engine used" + - name: "batch_size" + type: "int" + value: 128 + description: "# batch size"