Skip to content

Commit

Permalink
small bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Sep 22, 2024
1 parent d8f531a commit 18e01a3
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, config: dict[str, Any]):
participants.append((tr, runtime))
# save participating transforms
self.participants = participants
self.file_name = ""

def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]:
"""
Expand All @@ -76,6 +77,7 @@ def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tupl
holding the extension to be used when writing out the new bytes.
"""
# process transforms sequentially
self.file_name = file_name
data = [(byte_array, file_name)]
stats = {}
for transform, _ in self.participants:
Expand All @@ -94,10 +96,7 @@ def _convert_output(data: list[tuple[bytes, str]]) -> list[tuple[bytes, str]]:
i = 0
for dt in data:
fname = TransformUtils.get_file_extension(dt[1])
ext = fname[1]
if len(ext) <= 1:
ext = fname[0]
res[i] = (dt[0], ext)
res[i] = (dt[0], fname[1])
i += 1
return res

Expand Down Expand Up @@ -145,7 +144,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
# flush produced output - run it through the rest of the chain
data = []
for ouf in out_files:
data.append((ouf[0], f"file{ouf[1]}"))
data.append((ouf[0], self.file_name))
for n in range(i + 1, len(self.participants)):
data, st = self._process_transform(transform=self.participants[n][0], data=data)
# Accumulate stats
Expand All @@ -155,7 +154,7 @@ def flush_binary(self) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
break
res += self._convert_output(data)
else:
res += self._convert_output(out_files)
res += out_files
i += 1
# Done flushing, compute execution stats
for _, runtime in self.participants:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@

import os

import pyarrow as pa
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration


table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])})
expected_table = table # We're a noop after all.
expected_metadata_list = [{"nfiles": 1, "nrows": 1}, {}] # transform() result # flush() result


class TestRayNOOPTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
Expand Down

0 comments on commit 18e01a3

Please sign in to comment.