diff --git a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
index ea0de5175..e7eb349b6 100644
--- a/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
+++ b/data-processing-lib/python/src/data_processing/transform/pipeline_transform.py
@@ -56,6 +56,7 @@ def __init__(self, config: dict[str, Any]):
             participants.append((tr, runtime))
         # save participating transforms
         self.participants = participants
+        self.file_name = ""
 
     def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]:
         """
@@ -76,6 +77,7 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl
                  holding the extension to be used when writing out the new bytes.
         """
         # process transforms sequentially
+        self.file_name = file_name
         data = [(byte_array, file_name)]
         stats = {}
         for transform, _ in self.participants:
@@ -94,10 +96,7 @@ def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]:
         i = 0
         for dt in data:
             fname = TransformUtils.get_file_extension(dt[1])
-            ext = fname[1]
-            if len(ext) <= 1:
-                ext = fname[0]
-            res[i] = (dt[0], ext)
+            res[i] = (dt[0], fname[1])
             i += 1
         return res
 
@@ -145,7 +144,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
                 # flush produced output - run it through the rest of the chain
                 data = []
                 for ouf in out_files:
-                    data.append((ouf[0], f"file{ouf[1]}"))
+                    data.append((ouf[0], self.file_name))
                 for n in range(i + 1, len(self.participants)):
                     data, st = self._process_transform(transform=self.participants[n][0], data=data)
                     # Accumulate stats
@@ -155,7 +154,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
                         break
                 res += self._convert_output(data)
             else:
-                res += self._convert_output(out_files)
+                res += out_files
             i += 1
         # Done flushing, compute execution stats
         for _, runtime in self.participants:
diff --git a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py
index d4cc874f0..e706a4dfa 100644
--- a/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py
+++ b/data-processing-lib/ray/test/data_processing_ray_tests/launch/ray/ray_test_noop_launch.py
@@ -12,7 +12,6 @@
 
 import os
 
-import pyarrow as pa
 from data_processing.test_support.launch.transform_test import (
     AbstractTransformLauncherTest,
 )
@@ -20,11 +19,6 @@
 from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration
 
 
-table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])})
-expected_table = table  # We're a noop after all.
-expected_metadata_list = [{"nfiles": 1, "nrows": 1}, {}]  # transform() result # flush() result
-
-
 class TestRayNOOPTransform(AbstractTransformLauncherTest):
     """
     Extends the super-class to define the test data for the tests defined there.
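
A minimal sketch (not part of the patch) of the simplified _convert_output above,
assuming TransformUtils.get_file_extension behaves like os.path.splitext and
returns a (root, extension) pair, consistent with how fname[1] is used in the hunk:

import os


def convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]:
    # Stand-in for the patched _convert_output: keep the bytes and take the
    # extension of each produced file name directly; the old fallback to the
    # path root for short extensions is removed.
    return [(content, os.path.splitext(name)[1]) for content, name in data]


# A file produced as "part0.parquet" now always yields the ".parquet" extension,
# and flushed outputs carry the real input file name (self.file_name) through
# the rest of the chain instead of a synthetic "file<ext>" name.
assert convert_output([(b"bytes", "part0.parquet")]) == [(b"bytes", ".parquet")]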