diff --git a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py index 648d48669..3344491d2 100644 --- a/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py +++ b/data-processing-lib/python/src/data_processing/runtime/transform_launcher.py @@ -57,9 +57,9 @@ def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace: :return: list of arguments """ # add additional arguments - self.runtime_config.add_input_params(parser=parser) self.data_access_factory.add_input_params(parser=parser) self.execution_config.add_input_params(parser=parser) + self.runtime_config.add_input_params(parser=parser) return parser.parse_args() def _get_parameters(self, args: argparse.Namespace) -> bool: @@ -68,11 +68,10 @@ def _get_parameters(self, args: argparse.Namespace) -> bool: and does parameters validation :return: True if validation passes or False, if not """ - return ( - self.runtime_config.apply_input_params(args=args) + return (self.runtime_config.apply_input_params(args=args) and self.execution_config.apply_input_params(args=args) and self.data_access_factory.apply_input_params(args=args) - ) + ) def _submit_for_execution(self) -> int: """ diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py index 0e90f7ffd..1f665dea1 100644 --- a/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/test_support/transform/__init__.py @@ -1,6 +1,14 @@ -from .table_transform_test import AbstractTableTransformTest -from .binary_transform_test import AbstractBinaryTransformTest -from .noop_transform import ( +from data_processing.test_support.transform.table_transform_test import AbstractTableTransformTest +from data_processing.test_support.transform.binary_transform_test import AbstractBinaryTransformTest +from data_processing.test_support.transform.noop_transform import ( NOOPTransform, NOOPPythonTransformConfiguration, ) +from data_processing.test_support.transform.resize_transform import ( + ResizeTransform, + ResizePythonTransformConfiguration, +) + +from data_processing.test_support.transform.pipeline_transform import ( + ResizeNOOPPythonTransformConfiguration, +) diff --git a/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py new file mode 100644 index 000000000..591f679b8 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/test_support/transform/pipeline_transform.py @@ -0,0 +1,45 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.transform import PipelineTransformConfiguration +from data_processing.utils import get_logger +from data_processing.test_support.transform import NOOPPythonTransformConfiguration, ResizePythonTransformConfiguration + +logger = get_logger(__name__) + + +class ResizeNOOPPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. + """ + + def __init__(self): + """ + Initialization + """ + super().__init__(transform_config= + PipelineTransformConfiguration({"transforms": [ResizePythonTransformConfiguration(), + NOOPPythonTransformConfiguration()]}) + ) + + +if __name__ == "__main__": + # launcher = NOOPRayLauncher() + launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration()) + logger.info("Launching resize/noop transform") + launcher.launch() diff --git a/data-processing-lib/python/src/data_processing/transform/__init__.py b/data-processing-lib/python/src/data_processing/transform/__init__.py index 9f97f0528..69ca1323f 100644 --- a/data-processing-lib/python/src/data_processing/transform/__init__.py +++ b/data-processing-lib/python/src/data_processing/transform/__init__.py @@ -5,3 +5,4 @@ from data_processing.transform.transform_configuration import TransformConfiguration, get_transform_config from data_processing.transform.runtime_configuration import TransformRuntimeConfiguration from data_processing.transform.pipeline_transform import AbstractPipelineTransform +from data_processing.transform.pipeline_transform_configuration import PipelineTransformConfiguration diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py index 34feb050d..d825d4697 100644 --- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py @@ -11,7 +11,7 @@ ################################################################################ from typing import Any -from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime, TransformRuntimeConfiguration +from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime from data_processing.utils import get_logger, UnrecoverableException, TransformUtils @@ -21,15 +21,16 @@ class AbstractPipelineTransform(AbstractBinaryTransform): participating transforms in memory """ - def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms - :param config - configuration parameters - :param transforms - list of transforms in the pipeline. Note that transforms will + :param config - configuration parameters - dictionary of transforms in the pipeline. + Note that transforms will be executed be executed """ - super().__init__(config) + super().__init__({}) self.logger = get_logger(__name__) + transforms = config.get("transforms", []) if len(transforms) == 0: # Empty pipeline self.logger.error("Pipeline transform with empty list") @@ -85,7 +86,17 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl # no data returned by this transform return [], stats # all done - return data, stats + return self._convert_output(data), stats + + @staticmethod + def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]: + res = [None] * len(data) + i = 0 + for dt in data: + fname = TransformUtils.get_file_extension(dt[1]) + res[i] = (dt[0], fname[1]) + i += 1 + return res @staticmethod def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[bytes, str]] @@ -107,7 +118,7 @@ def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[byte res.append((ouf[0], src[0] + ouf[1])) # accumulate statistics stats |= st - return res, stats + return res, stats def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: """ @@ -139,9 +150,10 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]: if len(data) == 0: # no data returned by this transform break - res += data + res += self._convert_output(data) else: - res += out_files + res += self._convert_output(out_files) + i += 1 # Done flushing, compute execution stats for _, runtime in self.participants: self._compute_execution_stats(runtime=runtime, st=stats) diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py new file mode 100644 index 000000000..8416c2884 --- /dev/null +++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform_configuration.py @@ -0,0 +1,80 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any +from argparse import ArgumentParser, Namespace + +from data_processing.transform import TransformConfiguration +from data_processing.transform.pure_python import PythonPipelineTransform +from data_processing.utils import get_logger + +logger = get_logger(__name__) + + +class PipelineTransformConfiguration(TransformConfiguration): + + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args. + """ + + def __init__(self, config: dict[str, Any]): + super().__init__( + name="pipeline", + transform_class=PythonPipelineTransform, + ) + self.params = config + + def add_input_params(self, parser: ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the NOOPTransform. + By convention a common prefix should be used for all transform-specific CLI args + (e.g, noop_, pii_, etc.) + """ + for t in self.params["transforms"]: + t.transform_config.add_input_params(parser=parser) + + def apply_input_params(self, args: Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. + :return: True, if validate pass or False otherwise + """ + res = True + for t in self.params["transforms"]: + res = res and t.transform_config.apply_input_params(args=args) + return res + + def get_input_params(self) -> dict[str, Any]: + """ + Provides a default implementation if the user has provided a set of keys to the initializer. + These keys are used in apply_input_params() to extract our key/values from the global Namespace of args. + :return: + """ + params = {} + for t in self.params["transforms"]: + params |= t.transform_config.get_input_params() + return params + + def get_transform_metadata(self) -> dict[str, Any]: + """ + Get transform metadata. Before returning remove all parameters key accumulated in + self.remove_from metadata. This allows transform developer to mark any input parameters + that should not make it to the metadata. This can be parameters containing sensitive + information, access keys, secrets, passwords, etc. + :return parameters for metadata: + """ + params = {} + for t in self.params["transforms"]: + params |= t.transform_config.get_transform_metadata() + return params diff --git a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py index e798e6a36..d52e3a0bb 100644 --- a/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py +++ b/data-processing-lib/python/src/data_processing/transform/pure_python/pipeline_transform.py @@ -12,7 +12,7 @@ from typing import Any from data_processing.transform import AbstractPipelineTransform -from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime +from data_processing.transform import TransformRuntimeConfiguration, BaseTransformRuntime class PythonPipelineTransform(AbstractPipelineTransform): @@ -21,14 +21,14 @@ class PythonPipelineTransform(AbstractPipelineTransform): participating transforms in memory """ - def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]): + def __init__(self, config: dict[str, Any]): """ Initializes pipeline execution for the list of transforms :param config - configuration parameters :param transforms - list of transforms in the pipeline. Note that transforms will be executed """ - super().__init__(config, transforms) + super().__init__(config) def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]: """ diff --git a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py index 033e92f2a..5e4938ce8 100644 --- a/data-processing-lib/python/src/data_processing/transform/transform_configuration.py +++ b/data-processing-lib/python/src/data_processing/transform/transform_configuration.py @@ -23,7 +23,10 @@ class TransformConfiguration(CLIArgumentProvider): """ def __init__( - self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = [] + self, + name: str, + transform_class: type[AbstractBinaryTransform], + remove_from_metadata: list[str] = [], ): """ Initialization diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py index 1eb85fe48..caf1c60f6 100644 --- a/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_noop.py @@ -10,11 +10,9 @@ # limitations under the License. ################################################################################ -from typing import Tuple - import pyarrow as pa -from data_processing.test_support.transform.noop_transform import NOOPTransform -from data_processing.test_support.transform.table_transform_test import ( +from data_processing.test_support.transform import NOOPTransform +from data_processing.test_support.transform import ( AbstractTableTransformTest, ) @@ -30,7 +28,7 @@ class TestNOOPTransform(AbstractTableTransformTest): The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. """ - def get_test_transform_fixtures(self) -> list[Tuple]: + def get_test_transform_fixtures(self) -> list[tuple]: fixtures = [ (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), (NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list), diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py new file mode 100644 index 000000000..61ec43c50 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_resize.py @@ -0,0 +1,53 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from data_processing.test_support.transform import ResizePythonTransformConfiguration +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestPythonResizeTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../../transforms/universal/resize/python/test-data")) + launcher = PythonTransformLauncher(ResizePythonTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures diff --git a/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py b/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py new file mode 100644 index 000000000..939b34da0 --- /dev/null +++ b/data-processing-lib/python/test/data_processing_tests/transform/test_resize_noop.py @@ -0,0 +1,54 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from data_processing.test_support.transform import ResizeNOOPPythonTransformConfiguration +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) + + +class TestPythonResizeNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def get_test_transform_fixtures(self) -> list[tuple]: + # The following based on 3 identical input files of about 39kbytes, and 200 rows + fixtures = [] + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), + "../../../../../transforms/universal/resize/python/test-data")) + launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration()) + + # Split into 4 or so files + config = {"resize_max_rows_per_table": 125, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-125")) + + # Merge into 2 or so files + config = {"resize_max_rows_per_table": 300, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-rows-300")) + + # # Merge all into a single table + config = {"resize_max_mbytes_per_table": 1, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-1")) + + # # Merge the 1st 2 and some of the 2nd with the 3rd + config = {"resize_max_mbytes_per_table": 0.05, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.05")) + + # Split into 4 or so files + config = {"resize_max_mbytes_per_table": 0.02, "noop_sleep_sec": 1} + fixtures.append((launcher, config, basedir + "/input", basedir + "/expected-mbytes-0.02")) + + return fixtures