
Conversation

Kimahriman
Contributor

What changes were proposed in this pull request?

Add an option for applyInArrow to take a function that consumes an iterator of RecordBatch and returns an iterator of RecordBatch, and respect spark.sql.execution.arrow.maxRecordsPerBatch when building the input iterator.

Why are the changes needed?

Being limited to returning a single Table requires collecting all results in memory as a single batch. This can require excessive memory in certain edge cases with large groups. Currently the Python worker immediately converts a table into its underlying batches, so barely any changes are required to accommodate this. Larger changes are required to support max records per batch on the input side.

Does this PR introduce any user-facing change?

Yes, a new function signature is supported by applyInArrow.

How was this patch tested?

Updated existing unit tests to cover both the Table signature and the RecordBatch iterator signature.

Was this patch authored or co-authored using generative AI tooling?

No

@Kimahriman
Contributor Author

Gentle ping for potential inclusion in 4.0

@Kimahriman Kimahriman force-pushed the apply-in-arrow-input branch from 5c2c07c to 7766b1d Compare February 8, 2025 14:13
@Kimahriman
Contributor Author

Gentle ping again now that 4.0 is out

@ConeyLiu
Contributor

ConeyLiu commented Jul 4, 2025

Hi, @HyukjinKwon @zhengruifeng could you please review this again? We indeed encountered this problem in our production jobs when users call applyInPandas, which returns a large DataFrame.

@Kimahriman Kimahriman force-pushed the apply-in-arrow-input branch from a901806 to 4670c1d Compare August 29, 2025 11:18
@Kimahriman
Contributor Author

Gentle ping again; it's getting tricky to keep up with and resolve new merge conflicts each time conflicting changes land.

@zhengruifeng
Contributor

zhengruifeng commented Sep 12, 2025

> Gentle ping again; it's getting tricky to keep up with and resolve new merge conflicts each time conflicting changes land.

@Kimahriman we plan to optimize the batch size in multiple UDF types, e.g.

  • SQL_GROUPED_MAP_PANDAS_UDF
  • SQL_GROUPED_MAP_ARROW_UDF
  • SQL_GROUPED_AGG_ARROW_UDF
  • SQL_GROUPED_AGG_PANDAS_UDF
  • SQL_WINDOW_AGG_PANDAS_UDF
  • SQL_WINDOW_AGG_ARROW_UDF

This is the first one, for SQL_GROUPED_MAP_PANDAS_UDF / SQL_GROUPED_MAP_ARROW_UDF. Can we reuse it for the iterator API?

Regarding this PR, I think we should:
1. exclude any changes in cogroup;
2. add a new eval type for the new iterator API, because it is a user-facing change. For example, we have SQL_SCALAR_PANDAS_ITER_UDF for the iterator API in Pandas UDFs.

@Kimahriman
Contributor Author

Kimahriman commented Sep 13, 2025

I can try to update this with new eval types, using type hints as the mechanism. I do think your update will fit in fine, since it's just a different way to implement the JVM-side batching I implemented here; basically you used a trait instead of a subclass. I still think it would be beneficial for both batched and non-batched execution to go through the same code path, otherwise adding more eval types will make things even more complicated.
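As a sketch of how type hints could distinguish the two signatures (purely illustrative: `Table` and `RecordBatch` here are stdlib stand-ins for the pyarrow classes, and `wants_iterator` is a hypothetical helper, not Spark's actual dispatch code):

```python
import collections.abc
from typing import Iterator, get_origin, get_type_hints

# Stand-ins for pyarrow.Table / pyarrow.RecordBatch so this sketch
# runs without pyarrow installed.
class Table: ...
class RecordBatch: ...

def table_func(table: Table) -> Table:
    ...  # existing signature: one Table in, one Table out

def batch_func(batches: Iterator[RecordBatch]) -> Iterator[RecordBatch]:
    ...  # new signature: iterator of RecordBatch in and out

def wants_iterator(func) -> bool:
    # Inspect the return annotation: an Iterator[...] hint would select
    # the new eval type, anything else keeps the Table code path.
    ret = get_type_hints(func).get("return")
    return get_origin(ret) is collections.abc.Iterator

print(wants_iterator(table_func))  # False
print(wants_iterator(batch_func))  # True
```

This mirrors how PySpark already uses type hints to pick eval types for pandas UDFs, though the real inference logic is more involved.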

@Kimahriman
Contributor Author

I have a new version of this, with a new eval type and type hints, ready to go once #52303 gets merged.

@Kimahriman
Contributor Author

Closing in favor of #52440

@Kimahriman Kimahriman closed this Sep 24, 2025