Conversation

@Kimahriman (Contributor) commented on Sep 24, 2025:

What changes were proposed in this pull request?

Add the option for applyInArrow to take a function that accepts an iterator of pyarrow.RecordBatch and returns an iterator of pyarrow.RecordBatch. A new eval type, SQL_GROUPED_MAP_ARROW_ITER_UDF, is added, and it is detected via type hints on the function.
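As a minimal sketch (the function and column names here are hypothetical, not from the patch), the two shapes of user function that applyInArrow now distinguishes via type hints look like this:

from typing import Iterator, Tuple

import pyarrow as pa

# Existing form: the whole group is materialized as one pyarrow.Table.
def table_func(key: Tuple[pa.Scalar, ...], table: pa.Table) -> pa.Table:
    return table

# New form: the group is streamed as an iterator of pyarrow.RecordBatch,
# detected via the Iterator[pa.RecordBatch] type hints.
def batch_func(key: Tuple[pa.Scalar, ...], batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    for batch in batches:
        yield batch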

Why are the changes needed?

Having a single Table as input and a single Table as output requires collecting all of a group's input and output in memory at once. This can require excessive memory in edge cases with large groups. Inputs and outputs already get serialized as record batches, so this simply exposes that lazy iterator directly instead of forcing materialization into a Table.
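To make the contrast concrete, the Table-based path conceptually amounts to materializing every batch of a group up front before the user function runs (an illustrative sketch, not the actual serializer code):

import pyarrow as pa

# batches: the stream of pyarrow.RecordBatch objects for one group (assumed here).
# Building a Table forces all of them into memory at once, which is exactly
# what the new iterator signature avoids.
table = pa.Table.from_batches(batches)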

Does this PR introduce any user-facing change?

Yes: a new function signature is supported by applyInArrow.

Example:

from typing import Iterator, Tuple

import pyarrow as pa
import pyarrow.compute as pc

def sum_func(key: Tuple[pa.Scalar, ...], batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    total = 0
    for batch in batches:
        total += pc.sum(batch.column("v")).as_py()
    yield pa.RecordBatch.from_pydict({"id": [key[0].as_py()], "v": [total]})

# df is a DataFrame with columns "id" and "v"
df.groupby("id").applyInArrow(sum_func, schema="id long, v double").show()
+---+----+
| id|   v|
+---+----+
|  1| 3.0|
|  2|18.0|
+---+----+

How was this patch tested?

Updated existing UTs to test both the Table signatures and the RecordBatch iterator signatures.

Was this patch authored or co-authored using generative AI tooling?

No

@Kimahriman (Contributor, Author) commented:

@zhengruifeng @HyukjinKwon

@zhengruifeng (Contributor) left a comment:


Looks pretty good!
Only a few minor comments.

also cc @HyukjinKwon

verify_arrow_result(batch, assign_cols_by_name, expected_cols_and_types)


def wrap_grouped_map_arrow_udf(f, return_type, argspec, is_iterator, runner_conf):
@zhengruifeng (Contributor):

Shall we add a dedicated function, def wrap_grouped_map_arrow_iter_udf, for the new eval type?

@Kimahriman (Contributor, Author):
Yep, it is cleaner that way; done.

@zhengruifeng
Copy link
Contributor

please also add a simple example in the PR description, section Does this PR introduce any user-facing change?

`pyarrow.Table` or takes an iterator of `pyarrow.RecordBatch` and yields
`pyarrow.RecordBatch`. Additionally, each form can take a tuple of grouping keys
as the first argument, with the `pyarrow.Table` or iterator of `pyarrow.RecordBatch`
as the second argument.
@zhengruifeng (Contributor):

Let's add

    .. versionchanged:: 4.1.0
        Supports iterator API ...

@zhengruifeng (Contributor):

Let's also add two simple examples (with and without a key) in the Examples section.

@Kimahriman (Contributor, Author):
I added one with and one without the key, since I think the key is more relevant for this API.
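The docstring excerpt above covers both the keyed and the un-keyed forms; for reference, an un-keyed iterator example could look roughly like this (a minimal sketch: double_func, df, and the column names are hypothetical, not the examples added in the patch):

from typing import Iterator

import pyarrow as pa
import pyarrow.compute as pc

def double_func(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # Transform the group one batch at a time; no grouping-key argument is needed.
    for batch in batches:
        doubled = pc.multiply(batch.column("v"), 2.0)
        yield pa.RecordBatch.from_arrays([batch.column("id"), doubled], names=["id", "v"])

df.groupby("id").applyInArrow(double_func, schema="id long, v double").show()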

@zhengruifeng (Contributor) commented:

Thank you @Kimahriman so much for spending time on this.
Merged to master for 4.1.
