Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial pass at a pipelining transform #424

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Draft

Conversation

daw3rd
Copy link
Member

@daw3rd daw3rd commented Jul 18, 2024

Why are these changes needed?

We would like to define a sequence of transforms using python and run the sequence as any other transform would be run.

Related issue number (if any).

#374

@daw3rd daw3rd marked this pull request as draft July 18, 2024 18:01
@daw3rd daw3rd requested a review from blublinsky July 18, 2024 18:01
Copy link
Collaborator

@blublinsky blublinsky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really trvialization

raise ValueError(f"Missing configuration key {transform_key} specifying the list of transforms to run")
for transform in self.transforms:
if not isinstance(transform, AbstractBinaryTransform):
raise ValueError(f"{transform} is not an instance of AbstractBinaryTransform")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every transform here can have its own config parameters. Where are transforms initialized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. not sure we need that in the first pass, but open to suggestions. Initially this may be for the non-launched/embedded/notebooks.

# Capture the list of outputs from this transform as inputs to the next (or as the return values).
for transformation in transformation_tuples:
transformed, extension = transformation
fname = transform_name + "-output" + extension
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break transform logic, if the need the input file name, for example codetoparquet or pdf conversion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't they all only operate on the extension?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not all of them

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could use use original name, but then using the same name for all seems incorrect. Suggestion?

r_bytes = []
r_metadata = {}
for transform in self.transforms:
transformation_tuples, metadata = transform.flush_binary()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is completely wrong. flush from the first transform has to be processed by the rest of them, then fluch from the second transform has to be processed by remaining one and so on

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good. yes, A->B->C, A.flush() has to be fed to B.transform(). ugly, but true.

# which use/expect parquet files.
fixtures = []
noop0 = NOOPTransform({"sleep": 0})
noop1 = NOOPTransform({"sleep": 0})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is complete cheating

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a start. that said, we're not testing the application of the underlying transforms, as much as the structure of the output. but yes, would nice to have a better test, but would require having transforms othre than NOOP in the test_support packages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you are referring to the configuration part. We have always been saying that transforms can be configured outside of the CLI/runtime mechanics. I'm doing that here. However, it is true, that to run a pipeline transform in a runtime may require more work - this is more for the python only non-runtime users, at least initially.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I repeat my sentiment. Its circumventing the "normal" execution

@blublinsky
Copy link
Collaborator

These issues just magnify why this approach is not starter (in my opinion)

@daw3rd
Copy link
Member Author

daw3rd commented Jul 19, 2024

from typing import Any
import pyarrow as pa
from pyarrow import Table
from data_processing.transform import AbstractTableTransform
from data_processing.transform.binary_pipeline import PipelinedBinaryTransform
from data_processing.utils import TransformUtils

class HelloTransform(AbstractTableTransform):
    """" Adds a column of greetings """
    def __init__(self, config:dict):
        self.who = config.get("who", "World")
        self.column_name = config.get("column_name", "greeting")

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        # Create a new column with each row holding the who value
        new_column = ["Hello " + self.who + "!"] * table.num_rows
        # Append the column to create a new table with the configured name.
        table = TransformUtils.add_column(table=table, name=self.column_name, content=new_column)
        return [table], {}

if __name__ == "__main__":

    ids = pa.array([0 ])
    contents = pa.array(["Some content"])
    names = ["doc_id", "content"]
    table = pa.Table.from_arrays([ids, contents], names=names)

    hello0 = HelloTransform({})
    hello1 = HelloTransform({"column_name": "greating2", "who": "David"})
    hello2 = HelloTransform({"column_name": "greating3", "who": "Boris"})
    pipeline = PipelinedTransform({"transforms": [ hello0, hello1, hello2]})
    tables, metadata = pipeline.transform(table)
    table = tables[0]
    print(f"table={table}")

produces

table=pyarrow.Table
doc_id: int64
content: string
greeting: string
greating2: string
greating3: string
----
doc_id: [[0]]
content: [["Some content"]]
greeting: [["Hello World!"]]
greating2: [["Hello David!"]]
greating3: [["Hello Boris!"]]


def __init__(self, config: dict[str, Any]):
"""
Create the pipeline using a list of initialize transforms
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialized?

self.input_extension = None
self.transforms = config.get(transform_key, None)
if self.transforms is None:
raise ValueError(f"Missing configuration key {transform_key} specifying the list of transforms to run")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also make sure the list is not empty and actually contains transforms, not some garbage

r_bytes, r_metadata = self._apply_transforms_to_datum(self.transforms, (file_name, byte_array))
return r_bytes, r_metadata

def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this table, it should be binary. it should check the extension and then decide whether this is a table

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to avoid having someone that wants to transform a table, first having to convert to bytes to call transform_binary(). i think as you said, we may need to restrict this to transform_binary(), or allow transform() but check to be sure it is a Table. Or, have transform_binary() do the type checking. I think it would be useful though to avoid unnecessary conversion back and forth of Table to bytes and bytes to Table, for each transform.

transform: Union[AbstractTableTransform, AbstractBinaryTransform],
datum: Union[pa.Table, bytearray],
file_name: str,
):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be wrong. Transform is always binary. A table is just one of the cases of binary. no need for Unions. Take a look at https://github.com/IBM/data-prep-kit/blob/dev/data-processing-lib/python/src/data_processing/transform/table_transform.py. it takes binary and returns binary. Table is just an intermediate format

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, I was/am trying to find a way to avoid requiring the user to call transform_binary() if they are starting with a Table. And if the list of transforms is all TableTransforms, we should try and avoid two byte/Table conversions on each transform.

transformation_tuples, metadata = transform.transform_binary(file_name, datum)
return transformation_tuples, metadata

def _call_flush(self, transform: Union[AbstractTableTransform, AbstractBinaryTransform]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here this union stuff again

for table in tables:
transformation_tuples.append((table, ".parquet"))
else:
transformation_tuples, metadata = transform.flush_binary()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the only method called. Why is it more complex that it needs to be

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, trying to avoid byte/Table conversion, but not sure its ready yet.

# which use/expect parquet files.
fixtures = []
noop0 = NOOPTransform({"sleep": 0})
noop1 = NOOPTransform({"sleep": 0})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I repeat my sentiment. Its circumventing the "normal" execution

@blublinsky
Copy link
Collaborator

This is breaking cli

@daw3rd
Copy link
Member Author

daw3rd commented Sep 19, 2024

This is breaking cli

I have not completed that part yet. This is still a Draft afterall :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants