From 6c54442ed5274785f91e6ad628d77121036706ba Mon Sep 17 00:00:00 2001 From: Maroun Touma Date: Wed, 11 Dec 2024 08:01:19 +0100 Subject: [PATCH] added notebooks Signed-off-by: Maroun Touma --- ...doc_chunk.ipynb => doc_chunk-python.ipynb} | 109 +++++----- .../language/doc_chunk/doc_chunk-ray.ipynb | 187 ++++++++++++++++++ .../doc_chunk/dpk_doc_chunk/ray/transform.py | 35 +++- .../dpk_doc_chunk/transform_python.py | 28 ++- 4 files changed, 291 insertions(+), 68 deletions(-) rename transforms/language/doc_chunk/{doc_chunk.ipynb => doc_chunk-python.ipynb} (58%) create mode 100644 transforms/language/doc_chunk/doc_chunk-ray.ipynb diff --git a/transforms/language/doc_chunk/doc_chunk.ipynb b/transforms/language/doc_chunk/doc_chunk-python.ipynb similarity index 58% rename from transforms/language/doc_chunk/doc_chunk.ipynb rename to transforms/language/doc_chunk/doc_chunk-python.ipynb index 3a8466037..6dd763dd8 100644 --- a/transforms/language/doc_chunk/doc_chunk.ipynb +++ b/transforms/language/doc_chunk/doc_chunk-python.ipynb @@ -14,7 +14,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695", "metadata": {}, "outputs": [], @@ -22,9 +22,8 @@ "%%capture\n", "## This is here as a reference only\n", "# Users and application developers must use the right tag for the latest from pypi\n", - "#!pip install data-prep-toolkit\n", - "#!pip install data-prep-toolkit-transforms\n", - "#!pip install data-prep-connector" + "!pip install data-prep-toolkit\n", + "!pip install data-prep-toolkit-transforms[doc_chunk]\n" ] }, { @@ -52,18 +51,12 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "id": "c2a12abc-9460-4e45-8961-873b48a9ab19", "metadata": {}, "outputs": [], "source": [ - "import ast\n", - "import os\n", - "import sys\n", - "\n", - "from data_processing.runtime.pure_python import PythonTransformLauncher\n", - "from data_processing.utils import ParamsUtils\n", - "from 
doc_chunk_transform_python import DocChunkPythonTransformConfiguration\n" + "from dpk_doc_chunk.transform_python import DocChunkRuntime" ] }, { @@ -71,70 +64,48 @@ "id": "7234563c-2924-4150-8a31-4aec98c1bf33", "metadata": {}, "source": [ - "##### ***** Setup runtime parameters for this transform" + "##### ***** Setup runtime parameters for this transform and invoke transform" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 2, "id": "e90a853e-412f-45d7-af3d-959e755aeebb", "metadata": {}, - "outputs": [], - "source": [ - "# create parameters\n", - "input_folder = os.path.join(\"python\", \"test-data\", \"input\")\n", - "output_folder = os.path.join( \"python\", \"output\")\n", - "local_conf = {\n", - " \"input_folder\": input_folder,\n", - " \"output_folder\": output_folder,\n", - "}\n", - "params = {\n", - " \"data_local_config\": ParamsUtils.convert_to_ast(local_conf),\n", - " \"data_files_to_use\": ast.literal_eval(\"['.parquet']\"),\n", - " \"runtime_pipeline_id\": \"pipeline_id\",\n", - " \"runtime_job_id\": \"job_id\",\n", - " \"doc_chunk_chunking_type\": \"dl_json\",\n", - "}" - ] - }, - { - "cell_type": "markdown", - "id": "7949f66a-d207-45ef-9ad7-ad9406f8d42a", - "metadata": {}, - "source": [ - "##### ***** Use python runtime to invoke the transform" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "0775e400-7469-49a6-8998-bd4772931459", - "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "15:19:48 INFO - pipeline id pipeline_id\n", - "15:19:48 INFO - code location None\n", - "15:19:48 INFO - data factory data_ is using local data access: input_folder - python/test-data/input output_folder - python/output\n", - "15:19:48 INFO - data factory data_ max_files -1, n_sample -1\n", - "15:19:48 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", - "15:19:48 INFO - 
orchestrator doc_chunk started at 2024-11-20 15:19:48\n", - "15:19:48 INFO - Number of files is 1, source profile {'max_file_size': 0.011513710021972656, 'min_file_size': 0.011513710021972656, 'total_file_size': 0.011513710021972656}\n", - "15:19:48 INFO - Completed 1 files (100.0%) in 0.001 min\n", - "15:19:48 INFO - Done processing 1 files, waiting for flush() completion.\n", - "15:19:48 INFO - done flushing in 0.0 sec\n", - "15:19:48 INFO - Completed execution in 0.001 min, execution result 0\n" + "07:53:11 INFO - doc_chunk parameters are : {'chunking_type': 'dl_json', 'content_column_name': 'contents', 'doc_id_column_name': 'document_id', 'output_chunk_column_name': 'contents', 'output_source_doc_id_column_name': 'source_document_id', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox', 'chunk_size_tokens': 128, 'chunk_overlap_tokens': 30, 'dl_min_chunk_len': None}\n", + "07:53:11 INFO - pipeline id pipeline_id\n", + "07:53:11 INFO - code location None\n", + "07:53:11 INFO - data factory data_ is using local data access: input_folder - test-data/input output_folder - output\n", + "07:53:11 INFO - data factory data_ max_files -1, n_sample -1\n", + "07:53:11 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", + "07:53:11 INFO - orchestrator doc_chunk started at 2024-12-11 07:53:11\n", + "07:53:11 INFO - Number of files is 1, source profile {'max_file_size': 0.011513710021972656, 'min_file_size': 0.011513710021972656, 'total_file_size': 0.011513710021972656}\n", + "07:53:15 INFO - Completed 1 files (100.0%) in 0.062 min\n", + "07:53:15 INFO - Done processing 1 files, waiting for flush() completion.\n", + "07:53:15 INFO - done flushing in 0.0 sec\n", + "07:53:15 INFO - Completed execution in 0.062 min, execution result 0\n" ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + 
"execution_count": 2, + "metadata": {}, + "output_type": "execute_result" } ], "source": [ - "%%capture\n", - "sys.argv = ParamsUtils.dict_to_req(d=params)\n", - "launcher = PythonTransformLauncher(runtime_config=DocChunkPythonTransformConfiguration())\n", - "launcher.launch()\n", - "\n" + "DocChunkRuntime(input_folder='test-data/input',\n", + " output_folder='output',\n", + " doc_chunk_chunking_type= \"dl_json\").transform()" ] }, { @@ -147,25 +118,33 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 3, "id": "7276fe84-6512-4605-ab65-747351e13a7c", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "['python/output/metadata.json', 'python/output/test1.parquet']" + "['output/metadata.json', 'output/test1.parquet']" ] }, - "execution_count": 5, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import glob\n", - "glob.glob(\"python/output/*\")" + "glob.glob(\"output/*\")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50fa6c21-ff73-4e9e-88a0-6fc85a5ef8e0", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/transforms/language/doc_chunk/doc_chunk-ray.ipynb b/transforms/language/doc_chunk/doc_chunk-ray.ipynb new file mode 100644 index 000000000..eaaeb143b --- /dev/null +++ b/transforms/language/doc_chunk/doc_chunk-ray.ipynb @@ -0,0 +1,187 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "afd55886-5f5b-4794-838e-ef8179fb0394", + "metadata": {}, + "source": [ + "##### **** These pip installs need to be adapted to use the appropriate release level. Alternatively, The venv running the jupyter lab could be pre-configured with a requirement file that includes the right release. 
Example for transform developers working from git clone:\n", + "```\n", + "make venv\n", + "source venv/bin/activate && pip install jupyterlab\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture\n", + "## This is here as a reference only\n", + "# Users and application developers must use the right tag for the latest from pypi\n", + "!pip install data-prep-toolkit[ray]\n", + "!pip install data-prep-toolkit-transforms[doc_chunk]\n" + ] + }, + { + "cell_type": "markdown", + "id": "407fd4e4-265d-4ec7-bbc9-b43158f5f1f3", + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "##### **** Configure the transform parameters. We will only show the use of data_files_to_use and doc_chunk_chunking_type. For a complete list of parameters, please refer to the README.md for this transform\n", + "##### \n", + "| parameter:type | value | Description |\n", + "| --- | --- | --- |\n", + "|data_files_to_use: list | .parquet | Process all parquet files in the input folder |\n", + "| doc_chunk_chunking_type: str | dl_json | |\n" + ] + }, + { + "cell_type": "markdown", + "id": "ebf1f782-0e61-485c-8670-81066beb734c", + "metadata": {}, + "source": [ + "##### ***** Import required Classes and modules" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "c2a12abc-9460-4e45-8961-873b48a9ab19", + "metadata": {}, + "outputs": [], + "source": [ + "from dpk_doc_chunk.ray.transform import DocChunkRuntime\n", + "from data_processing.utils import GB" + ] + }, + { + "cell_type": "markdown", + "id": "7234563c-2924-4150-8a31-4aec98c1bf33", + "metadata": {}, + "source": [ + "##### ***** Setup runtime parameters for this transform and invoke transform" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "e90a853e-412f-45d7-af3d-959e755aeebb", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": 
"stream", + "text": [ + "07:59:00 INFO - doc_chunk parameters are : {'chunking_type': 'dl_json', 'content_column_name': 'contents', 'doc_id_column_name': 'document_id', 'output_chunk_column_name': 'contents', 'output_source_doc_id_column_name': 'source_document_id', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox', 'chunk_size_tokens': 128, 'chunk_overlap_tokens': 30, 'dl_min_chunk_len': None}\n", + "07:59:00 INFO - pipeline id pipeline_id\n", + "07:59:00 INFO - code location None\n", + "07:59:00 INFO - number of workers 2 worker options {'num_cpus': 1, 'memory': 2147483648, 'max_restarts': -1}\n", + "07:59:00 INFO - actor creation delay 0\n", + "07:59:00 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_chunk', 'job type': 'ray', 'job id': 'job_id'}\n", + "07:59:00 INFO - data factory data_ is using local data access: input_folder - test-data/input output_folder - output\n", + "07:59:00 INFO - data factory data_ max_files -1, n_sample -1\n", + "07:59:00 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", + "07:59:00 INFO - Running locally\n", + "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", + "I0000 00:00:1733900341.050308 1984270 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "I0000 00:00:1733900348.435057 1984270 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "I0000 00:00:1733900348.456883 1984270 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "2024-12-11 07:59:11,066\tINFO worker.py:1777 -- Started a local Ray instance. 
View the dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265 \u001b[39m\u001b[22m\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:16 INFO - orchestrator started at 2024-12-11 07:59:16\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:16 INFO - Number of files is 1, source profile {'max_file_size': 0.011513710021972656, 'min_file_size': 0.011513710021972656, 'total_file_size': 0.011513710021972656}\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:16 INFO - Cluster resources: {'cpus': 12, 'gpus': 0, 'memory': 10.991171646863222, 'object_store': 2.0}\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:16 INFO - Number of workers - 2 with {'num_cpus': 1, 'memory': 2147483648, 'max_restarts': -1} each\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:20 INFO - Completed 0 files (0.0%) in 0.0 min. Waiting for completion\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:20 INFO - Completed processing 1 files in 0.001 min\n", + "\u001b[36m(orchestrate pid=96891)\u001b[0m 07:59:20 INFO - done flushing in 0.002 sec\n", + "07:59:30 INFO - Completed execution in 0.494 min, execution result 0\n" + ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "DocChunkRuntime(input_folder='test-data/input',\n", + " output_folder='output',\n", + " run_locally= True,\n", + " num_cpus= 1,\n", + " memory= 2 * GB,\n", + " runtime_num_workers = 2,\n", + " doc_chunk_chunking_type= \"dl_json\").transform()" + ] + }, + { + "cell_type": "markdown", + "id": "c3df5adf-4717-4a03-864d-9151cd3f134b", + "metadata": {}, + "source": [ + "##### **** The specified folder will include the transformed parquet files." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "7276fe84-6512-4605-ab65-747351e13a7c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['output/metadata.json', 'output/test1.parquet']" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import glob\n", + "glob.glob(\"output/*\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8b99c91d-21b5-4e97-8c5e-779beea4752c", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/transforms/language/doc_chunk/dpk_doc_chunk/ray/transform.py b/transforms/language/doc_chunk/dpk_doc_chunk/ray/transform.py index 8ebe326ef..aec7ad254 100644 --- a/transforms/language/doc_chunk/dpk_doc_chunk/ray/transform.py +++ b/transforms/language/doc_chunk/dpk_doc_chunk/ray/transform.py @@ -9,7 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ - +import sys from argparse import ArgumentParser, Namespace from typing import Any @@ -18,7 +18,7 @@ PythonTransformRuntimeConfiguration, ) from data_processing.transform import AbstractTableTransform, TransformConfiguration -from data_processing.utils import CLIArgumentProvider, get_logger +from data_processing.utils import CLIArgumentProvider, ParamsUtils, get_logger from data_processing_ray.runtime.ray import RayTransformLauncher from data_processing_ray.runtime.ray.runtime_configuration import ( RayTransformRuntimeConfiguration, @@ -44,6 +44,37 @@ def __init__(self): super().__init__(transform_config=DocChunkTransformConfiguration()) +# Class used by the notebooks to configure and launch the doc_chunk transform on the Ray runtime +class DocChunkRuntime: + def __init__(self, **kwargs): + self.params = {} + for key in kwargs: + self.params[key] = kwargs[key] + # if input_folder and output_folder are both specified, treat them as the data_local_config + try: + local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")} + self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf) + del self.params["input_folder"] + del self.params["output_folder"] + except: + pass + try: + worker_options = {k: self.params[k] for k in ("num_cpus", "memory")} + self.params["runtime_worker_options"] = ParamsUtils.convert_to_ast(worker_options) + del self.params["num_cpus"] + del self.params["memory"] + except: + pass + + def transform(self): + sys.argv = ParamsUtils.dict_to_req(d=(self.params)) + # create launcher + launcher = RayTransformLauncher(DocChunkRayTransformConfiguration()) + # launch + return_code = launcher.launch() + return return_code + + if __name__ == "__main__": launcher = RayTransformLauncher(DocChunkRayTransformConfiguration()) logger.info("Launching doc_chunk transform") diff --git a/transforms/language/doc_chunk/dpk_doc_chunk/transform_python.py
b/transforms/language/doc_chunk/dpk_doc_chunk/transform_python.py index f037caeb0..f3a32237b 100644 --- a/transforms/language/doc_chunk/dpk_doc_chunk/transform_python.py +++ b/transforms/language/doc_chunk/dpk_doc_chunk/transform_python.py @@ -10,11 +10,13 @@ # limitations under the License. ################################################################################ +import sys + from data_processing.runtime.pure_python import PythonTransformLauncher from data_processing.runtime.pure_python.runtime_configuration import ( PythonTransformRuntimeConfiguration, ) -from data_processing.utils import get_logger +from data_processing.utils import ParamsUtils, get_logger from dpk_doc_chunk.transform import DocChunkTransformConfiguration @@ -36,6 +38,30 @@ def __init__(self): super().__init__(transform_config=DocChunkTransformConfiguration()) +# Class used by the notebooks to configure and launch the doc_chunk transform on the pure Python runtime +class DocChunkRuntime: + def __init__(self, **kwargs): + self.params = {} + for key in kwargs: + self.params[key] = kwargs[key] + # if input_folder and output_folder are both specified, treat them as the data_local_config + try: + local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")} + self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf) + del self.params["input_folder"] + del self.params["output_folder"] + except: + pass + + def transform(self): + sys.argv = ParamsUtils.dict_to_req(d=(self.params)) + # create launcher + launcher = PythonTransformLauncher(DocChunkPythonTransformConfiguration()) + # launch + return_code = launcher.launch() + return return_code + + if __name__ == "__main__": # launcher = DocChunkRayLauncher() launcher = PythonTransformLauncher(DocChunkPythonTransformConfiguration())