Skip to content

Commit

Permalink
made it work
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Sep 21, 2024
1 parent 2b7df37 commit 2b44b4c
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace:
:return: list of arguments
"""
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
self.runtime_config.add_input_params(parser=parser)
return parser.parse_args()

def _get_parameters(self, args: argparse.Namespace) -> bool:
Expand All @@ -68,11 +68,10 @@ def _get_parameters(self, args: argparse.Namespace) -> bool:
and does parameters validation
:return: True if validation passes or False, if not
"""
return (
self.runtime_config.apply_input_params(args=args)
return (self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)
)

def _submit_for_execution(self) -> int:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from .table_transform_test import AbstractTableTransformTest
from .binary_transform_test import AbstractBinaryTransformTest
from .noop_transform import (
from data_processing.test_support.transform.table_transform_test import AbstractTableTransformTest
from data_processing.test_support.transform.binary_transform_test import AbstractBinaryTransformTest
from data_processing.test_support.transform.noop_transform import (
NOOPTransform,
NOOPPythonTransformConfiguration,
)
from data_processing.test_support.transform.resize_transform import (
ResizeTransform,
ResizePythonTransformConfiguration,
)

from data_processing.test_support.transform.pipeline_transform import (
ResizeNOOPPythonTransformConfiguration,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.runtime.pure_python.runtime_configuration import (
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import PipelineTransformConfiguration
from data_processing.utils import get_logger
from data_processing.test_support.transform import NOOPPythonTransformConfiguration, ResizePythonTransformConfiguration

logger = get_logger(__name__)


class ResizeNOOPPythonTransformConfiguration(PythonTransformRuntimeConfiguration):
    """
    Python runtime configuration for a two-member pipeline: the Resize transform
    configuration followed by the NOOP transform configuration. Both are wrapped
    in a single PipelineTransformConfiguration, which is what gets handed to the
    base python runtime configuration.
    """

    def __init__(self):
        """
        Initialization: builds the pipeline configuration (Resize listed first,
        NOOP second) and passes it to the superclass as its transform_config.
        """
        members = [ResizePythonTransformConfiguration(), NOOPPythonTransformConfiguration()]
        pipeline_config = PipelineTransformConfiguration({"transforms": members})
        super().__init__(transform_config=pipeline_config)


if __name__ == "__main__":
    # Script entry point: run the resize/noop pipeline through the pure python launcher
    pipeline_launcher = PythonTransformLauncher(ResizeNOOPPythonTransformConfiguration())
    logger.info("Launching resize/noop transform")
    pipeline_launcher.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from data_processing.transform.transform_configuration import TransformConfiguration, get_transform_config
from data_processing.transform.runtime_configuration import TransformRuntimeConfiguration
from data_processing.transform.pipeline_transform import AbstractPipelineTransform
from data_processing.transform.pipeline_transform_configuration import PipelineTransformConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
################################################################################

from typing import Any
from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime, TransformRuntimeConfiguration
from data_processing.transform import AbstractBinaryTransform, BaseTransformRuntime
from data_processing.utils import get_logger, UnrecoverableException, TransformUtils


Expand All @@ -21,15 +21,16 @@ class AbstractPipelineTransform(AbstractBinaryTransform):
participating transforms in memory
"""

def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]):
def __init__(self, config: dict[str, Any]):
"""
Initializes pipeline execution for the list of transforms
:param config - configuration parameters
:param transforms - list of transforms in the pipeline. Note that transforms will
:param config - configuration parameters - dictionary of transforms in the pipeline.
Note that transforms will be executed
be executed
"""
super().__init__(config)
super().__init__({})
self.logger = get_logger(__name__)
transforms = config.get("transforms", [])
if len(transforms) == 0:
# Empty pipeline
self.logger.error("Pipeline transform with empty list")
Expand Down Expand Up @@ -85,7 +86,17 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl
# no data returned by this transform
return [], stats
# all done
return data, stats
return self._convert_output(data), stats

@staticmethod
def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]:
    """
    Rewrite the name part of every (content, name) pair of a pipeline output list.
    :param data: list of tuples (content, name) produced by the pipeline
    :return: new list of the same length and order; the content of every tuple is
        passed through untouched, while the name is replaced by element [1] of
        TransformUtils.get_file_extension(name) (presumably the file extension
        only - TODO confirm against TransformUtils)
    """
    # A comprehension replaces the pre-sized list and the hand-maintained index
    return [(dt[0], TransformUtils.get_file_extension(dt[1])[1]) for dt in data]

@staticmethod
def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[bytes, str]]
Expand All @@ -107,7 +118,7 @@ def _process_transform(transform: AbstractBinaryTransform, data: list[tuple[byte
res.append((ouf[0], src[0] + ouf[1]))
# accumulate statistics
stats |= st
return res, stats
return res, stats

def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Expand Down Expand Up @@ -139,9 +150,10 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
if len(data) == 0:
# no data returned by this transform
break
res += data
res += self._convert_output(data)
else:
res += out_files
res += self._convert_output(out_files)
i += 1
# Done flushing, compute execution stats
for _, runtime in self.participants:
self._compute_execution_stats(runtime=runtime, st=stats)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Any
from argparse import ArgumentParser, Namespace

from data_processing.transform import TransformConfiguration
from data_processing.transform.pure_python import PythonPipelineTransform
from data_processing.utils import get_logger

logger = get_logger(__name__)


class PipelineTransformConfiguration(TransformConfiguration):
    """
    Transform configuration for a pipeline of transforms. CLI argument handling,
    input parameters and metadata are all delegated to the transform_config of
    every entry found under the "transforms" key of the configuration.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Initialization
        :param config: pipeline configuration. Its "transforms" entry is iterated by
            every method of this class; each element has to expose a
            transform_config attribute.
        """
        super().__init__(name="pipeline", transform_class=PythonPipelineTransform)
        # stored only after the base initializer has run
        self.params = config

    def add_input_params(self, parser: ArgumentParser) -> None:
        """
        Add the CLI arguments of every pipeline member to the given parser.
        :param parser: parser receiving the members' arguments
        """
        for member in self.params["transforms"]:
            member.transform_config.add_input_params(parser=parser)

    def apply_input_params(self, args: Namespace) -> bool:
        """
        Validate and apply the parsed arguments, member by member.
        :param args: user defined arguments.
        :return: result of the last member that was consulted; once a member reports
            a falsy result the remaining members are not consulted and that
            result is returned. True for a pipeline without members.
        """
        outcome = True
        for member in self.params["transforms"]:
            if outcome:
                outcome = member.transform_config.apply_input_params(args=args)
        return outcome

    def get_input_params(self) -> dict[str, Any]:
        """
        Collect the input parameters of all pipeline members.
        :return: a single dictionary merging the members' input parameters; when two
            members use the same key, the value of the later member wins
        """
        merged = {}
        for member in self.params["transforms"]:
            merged.update(member.transform_config.get_input_params())
        return merged

    def get_transform_metadata(self) -> dict[str, Any]:
        """
        Collect the transform metadata of all pipeline members.
        :return: a single dictionary merging the members' metadata; when two members
            use the same key, the value of the later member wins
        """
        merged = {}
        for member in self.params["transforms"]:
            merged.update(member.transform_config.get_transform_metadata())
        return merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from typing import Any
from data_processing.transform import AbstractPipelineTransform
from data_processing.runtime import TransformRuntimeConfiguration, BaseTransformRuntime
from data_processing.transform import TransformRuntimeConfiguration, BaseTransformRuntime


class PythonPipelineTransform(AbstractPipelineTransform):
Expand All @@ -21,14 +21,14 @@ class PythonPipelineTransform(AbstractPipelineTransform):
participating transforms in memory
"""

def __init__(self, config: dict[str, Any], transforms: list[TransformRuntimeConfiguration]):
def __init__(self, config: dict[str, Any]):
"""
Initializes pipeline execution for the list of transforms
:param config - configuration parameters
:param transforms - list of transforms in the pipeline. Note that transforms will
be executed
"""
super().__init__(config, transforms)
super().__init__(config)

def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ class TransformConfiguration(CLIArgumentProvider):
"""

def __init__(
self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = []
self,
name: str,
transform_class: type[AbstractBinaryTransform],
remove_from_metadata: list[str] = [],
):
"""
Initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
# limitations under the License.
################################################################################

from typing import Tuple

import pyarrow as pa
from data_processing.test_support.transform.noop_transform import NOOPTransform
from data_processing.test_support.transform.table_transform_test import (
from data_processing.test_support.transform import NOOPTransform
from data_processing.test_support.transform import (
AbstractTableTransformTest,
)

Expand All @@ -30,7 +28,7 @@ class TestNOOPTransform(AbstractTableTransformTest):
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[Tuple]:
def get_test_transform_fixtures(self) -> list[tuple]:
fixtures = [
(NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list),
(NOOPTransform({"sleep": 0}), [table], [expected_table], expected_metadata_list),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

from data_processing.test_support.transform import ResizePythonTransformConfiguration
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)


class TestPythonResizeTransform(AbstractTransformLauncherTest):
    """
    Supplies the test data for the launcher tests defined in the super-class.
    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
    """

    def get_test_transform_fixtures(self) -> list[tuple]:
        """
        Build the launcher fixtures for the resize transform.
        :return: list of tuples (launcher, config, input folder, expected output folder);
            the same launcher instance and input folder are shared by all fixtures
        """
        # The following based on 3 identical input files of about 39kbytes, and 200 rows
        basedir = os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                "../../../../../transforms/universal/resize/python/test-data",
            )
        )
        launcher = PythonTransformLauncher(ResizePythonTransformConfiguration())
        input_dir = basedir + "/input"

        # (transform configuration, expected output folder relative to basedir)
        scenarios = [
            # Split into 4 or so files
            ({"resize_max_rows_per_table": 125}, "/expected-rows-125"),
            # Merge into 2 or so files
            ({"resize_max_rows_per_table": 300}, "/expected-rows-300"),
            # Merge all into a single table
            ({"resize_max_mbytes_per_table": 1}, "/expected-mbytes-1"),
            # Merge the 1st 2 and some of the 2nd with the 3rd
            ({"resize_max_mbytes_per_table": 0.05}, "/expected-mbytes-0.05"),
            # Split into 4 or so files
            ({"resize_max_mbytes_per_table": 0.02}, "/expected-mbytes-0.02"),
        ]
        return [(launcher, config, input_dir, basedir + expected) for config, expected in scenarios]
Loading

0 comments on commit 2b44b4c

Please sign in to comment.