
[WIP][SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs#55611

Draft
sven-weber-db wants to merge 2 commits intoapache:masterfrom
sven-weber-db:sven-weber_data/SPARK-56661

Conversation

Contributor

@sven-weber-db sven-weber-db commented Apr 29, 2026

What changes were proposed in this pull request?

This PR introduces new logical and physical Catalyst planning nodes for language-agnostic User-Defined Functions (UDFs) as part of SPIP SPARK-55278, which proposes such UDFs.

As a first step toward language-agnostic UDFs, we target mapPartitions-style UDFs like pyspark.sql.DataFrame.mapInArrow, pyspark.RDD.mapPartitions, or pyspark.sql.DataFrame.mapInPandas. The overarching goal is to deprecate the current, language-specific Catalyst nodes (like mapInArrow). For now, however, the new nodes will exist alongside the old ones until the new framework has reached maturity.

In summary, this PR introduces:

  • A new Catalyst Expression, ExternalUDFExpression, which captures language-agnostic UDF properties (payload, name, etc.)
  • A new Catalyst logical node, ExternalUDF, which serves as a base class for all language-agnostic UDF nodes
  • A new Catalyst logical node, MapPartitionExternalUDF, which is the new, language-agnostic map partition node
  • Catalyst physical nodes for both logical nodes
  • A new factory class, WorkerSessionFactory, which generates new worker sessions using the new UDF worker approach
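Taken together, the new pieces fit roughly together as follows. This is a simplified sketch based on the description and the diff excerpts in this PR; field and type names are illustrative, not the exact signatures:

```scala
// Sketch only: shapes are illustrative, based on the PR description.
// Captures the language-agnostic UDF itself (payload, name, output type, ...).
case class ExternalUDFExpression(
    name: String,
    payload: Array[Byte],        // serialized function; format owned by the worker
    dataType: DataType)          // declared output schema of the UDF

// Base class for all language-agnostic UDF logical nodes.
abstract class ExternalUDF extends LogicalPlan

// The language-agnostic map-partitions logical node.
case class MapPartitionExternalUDF(
    workerSpec: UDFWorkerSpecification,    // which worker runtime to launch
    sessionFactory: WorkerSessionFactory,  // how to obtain worker sessions
    functionExpr: ExternalUDFExpression,   // the UDF to run per partition
    child: LogicalPlan) extends ExternalUDF
```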

None of the changes introduced above are currently consumed in Spark.

Why are the changes needed?

This is the first step from the Spark planning side toward language-agnostic UDF execution. The existing logical and physical planning nodes make language-specific assumptions and eventually need to be replaced to achieve this goal.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit-tests were added.

Was this patch authored or co-authored using generative AI tooling?

Partially. However, the code was manually reviewed and adjusted.

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661 branch from 40ef055 to 0599ac7 Compare April 30, 2026 11:28
@sven-weber-db sven-weber-db changed the title [WIP][SPARK-56661] Adding language-agnosic UDF logical and physical planning nodes [SPARK-56661] Adding language-agnosic UDF logical and physical planning nodes Apr 30, 2026
@sven-weber-db sven-weber-db changed the title [SPARK-56661] Adding language-agnosic UDF logical and physical planning nodes [SPARK-56661] Adding logical and physical planning nodes for language-agnostic Spark UDFs Apr 30, 2026
@sven-weber-db sven-weber-db changed the title [SPARK-56661] Adding logical and physical planning nodes for language-agnostic Spark UDFs [SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs Apr 30, 2026
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661 branch 2 times, most recently from ffcd9e0 to c372bd1 Compare April 30, 2026 13:54
case class MapPartitionExternalUDF(
workerSpec: UDFWorkerSpecification,
sessionFactory: WorkerSessionFactory,
functionExpr: ExternalUDFExpression,
Contributor

A typical mapPartitions takes just a function rather than an expression. The difference is that:

  1. In UDF expressions, e.g. select some_udf(col + 1), some_udf2(col2 + col3), we have two UDF expressions; each expression is mapped to one function and takes its inputs from other expressions.
  2. For mapPartitions, it usually consumes the input rows as a whole and produces the output rows as a whole; it usually looks like df.mapInArrow(some_func) or df.mapPartitions(some_lambda). It directly relies on one UDF function rather than depending on an expression.

Contributor

This is not yet connected to PySpark - do we plan to do that in a follow-up? For example, we could introduce a flag that asks PySpark to plan mapInArrow / mapInPandas to this node.

There we can also connect to the worker capability: only when the flag is on and the worker supports mapPartitions can we plan using this node.

Contributor Author

@sven-weber-db May 5, 2026

A typical mapPartitions takes just a function, rather than an expression. The difference is that:

Thanks for the pointers and explanation. In the initial version, the name Expression was used incorrectly; it should have been ExternalUserDefinedFunction, representing a single UDF. I changed this in the new version of the PR. MapPartitions now receives a single ExternalUserDefinedFunction instance, and there is no sharing anymore with the ExternalUDF base trait.

This is not yet connected to pyspark - do we plan to do that in a followup?

Initially, I was planning to do this as a follow-up. I have now extended the PR to also include the PySpark connection with a flag for mapInPandas. Let's get this working end-to-end, and then we can extend to other operators.

def workerSpec: UDFWorkerSpecification

/** The UDF expression describing the function to execute. */
def functionExpr: ExternalUDFExpression
Contributor

@haiyangsun-db Apr 30, 2026

Same here.

Also, how do you plan to express multiple UDFs (for single-UDF cases) in the same node with one single functionExpr? I can imagine that we might compact chained UDFs into a single expression (where inner UDF calls are leaf expression nodes), but we could still have parallel UDF expressions that generate multiple outputs.

For example, in select foo(bar(col1)), baz(col2) from some_table, you may have one UDF node that contains two UDF expressions: one representing foo(bar(col1)) while the other represents baz(col2).

Contributor Author

Yes, as described above: this was removed. Nodes which require a UDF expression or multiple UDFs will receive a list of ExternalUserDefinedFunction.
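A shape matching this reply - the node carrying a list of UDFs rather than a single expression - might look roughly like this (a hypothetical sketch; ExternalUDFNode and its fields are illustrative, not the PR's actual signature):

```scala
// Hypothetical sketch: a node evaluating several parallel UDFs.
// For `select foo(bar(col1)), baz(col2) from some_table`, `functions`
// would hold two entries: one for foo(bar(col1)), one for baz(col2).
case class ExternalUDFNode(
    functions: Seq[ExternalUserDefinedFunction], // one entry per top-level UDF call
    child: LogicalPlan)                          // extends LogicalPlan in practice
```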

// All access guarded by `synchronized(lock)`.
private val lock = new Object
private val dispatchers =
new HashMap[UDFWorkerSpecification, DispatcherEntry]()
Contributor

Shall we use a concurrent hash map here?

Contributor Author

I tried to use a concurrent hash map, but it's not entirely straightforward here. In the close function, we need to fetch the current session count, evaluate it, and potentially remove the dispatcher when the count is 0. All of this logic needs to be protected from potential concurrent updates on the same key. Therefore, a classical lock might be simpler here.

If you have a way to express this with a concurrent hash map, I am happy to adjust the code.
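For reference, the decrement-and-remove step described above can be expressed atomically with ConcurrentHashMap.compute, which runs atomically per key. This is a sketch with simplified stand-ins for the PR's DispatcherEntry and key type; whether it is actually clearer than a plain lock is debatable:

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the PR's DispatcherEntry.
final class DispatcherEntry { var activeSessionCount: Int = 0 }

val dispatchers = new ConcurrentHashMap[String, DispatcherEntry]()

def closeSession(spec: String): Unit = {
  // `compute` runs atomically for the given key: we can decrement the
  // count and remove the entry (by returning null) in one step, without
  // racing against concurrent updates on the same key.
  dispatchers.compute(spec, (_, entry) => {
    if (entry == null) {
      null
    } else {
      entry.activeSessionCount -= 1
      if (entry.activeSessionCount <= 0) null // returning null removes the mapping
      else entry
    }
  })
}
```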

abstract class WorkerSessionFactory {

private class DispatcherEntry(val dispatcher: WorkerDispatcher) {
var activeSessionCount: Int = 0
Contributor

If the dispatcher already generates sessions directly, why not keep the active session count as a property of every dispatcher? Then we can get rid of this DispatcherEntry.

Contributor Author

I think there are the following reasons to keep the activeSessionCount here instead of in the Dispatcher:

  1. The Dispatcher is a factory class which pushes the cleanup of sessions to its consumers. If we were to track the activeSessionCount in the Dispatcher, we would need to track session closures there as well. This contradicts the current session class design, and it could easily create race conditions depending on the exact caller order and when the count value is updated.
  2. The WorkerSessionFactory provides a single place where the logic can be implemented, not duplicating code across many dispatchers.

@@ -0,0 +1,128 @@
/*
Contributor

I do feel like we should break this PR into two parts - the Catalyst changes should be separate from the changes in the "/udf" module.

Contributor Author

Yes, agreed. Let's split it into two parts.

@sven-weber-db sven-weber-db changed the title [SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs [WIP[SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs Apr 30, 2026
@sven-weber-db sven-weber-db changed the title [WIP[SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs [WIP][SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs Apr 30, 2026
@sven-weber-db sven-weber-db marked this pull request as draft April 30, 2026 15:11
* @param resultId Unique expression ID for this invocation.
*/
@Experimental
case class ExternalUDFExpression(
Contributor

This seems unused in this PR yet; we will need it when introducing UDF expression support.

Contributor Author

With the new changes it gets used now!

Contributor Author

@sven-weber-db left a comment

Responded to some comments. The PR still needs more work; I will respond to the remaining comments in a follow-up.


@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661 branch 3 times, most recently from 783453d to 40219ec Compare May 5, 2026 16:50

private val initialized = new AtomicBoolean(false)
private val processed = new AtomicBoolean(false)
private val closed = new AtomicBoolean(false)
private val completionListeners =
Contributor

Why CopyOnWrite?

Contributor Author

Good question. You are right, that's not the best implementation here. When revisiting this code, I also noticed that the implementation of the completionListeners is not 100% correct: a listener will not be called when the session was closed before the listener was added.

I changed the code to use a locking object and a normal ArrayList instead. Additionally, completion listeners are now also fired when the session was already closed/cancelled before. I also added some tests to validate the completion listener logic.
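The fix described here - plain list plus lock, with listeners firing immediately when registered after close - can be sketched as follows (an illustrative sketch, not the PR's actual class; names are made up):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the described fix: listeners registered after the session is
// closed fire immediately instead of being silently dropped.
final class SessionCompletion {
  private val lock = new Object
  private var closed = false
  private val listeners = ArrayBuffer[() => Unit]()

  def addCompletionListener(f: () => Unit): Unit = {
    val fireNow = lock.synchronized {
      if (closed) true
      else { listeners += f; false }
    }
    // Session already closed/cancelled: fire outside the lock.
    if (fireNow) f()
  }

  def close(): Unit = {
    val toFire = lock.synchronized {
      if (closed) Nil
      else {
        closed = true
        val fs = listeners.toList
        listeners.clear()
        fs
      }
    }
    // Fire outside the lock to avoid deadlocks if a listener re-enters.
    toFire.foreach(_.apply())
  }
}
```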

val func = funcCol.expr
Dataset.ofRows(
sparkSession,
val output = toAttributes(func.dataType.asInstanceOf[StructType])
Contributor

mapInArrow (below) can also reuse the code here; the only difference is the eval type. This reminds us that the eval type needs to be wired into ExternalUserDefinedFunction.

Currently, the proto explicitly supports eval type as part of UdfPayload: https://github.com/apache/spark/pull/55657/changes#diff-9df5dd8319af6bd7238d49a6fd8f082e5b46cb92758dfe48ec35f1b085b16ef1R410

We should introduce an explicit eval_type (optional) to ExternalUserDefinedFunction - when payload itself is not enough, eval_type should be set.

Contributor Author

As discussed offline: adding support for mapInArrow is currently blocked on the introduction of the gRPC protocol and its implementation. I will raise the PR for the first part of this change (the changes in udf). If gRPC has landed by the time the second part is required, we can address both in one PR; otherwise, there will be a follow-up.

functionPayload = functionExpr.payload,
inputSchema = Array.empty,
outputSchema = Array.empty))
session.close()
Contributor

We may need a placeholder for session.process - we can't simply call session.close() after session.process, as it is an asynchronous operation that returns an iterator.

We need to properly call session.cancel() and session.close() in the lifecycle of a task here:

  1. In the most graceful case, we should trigger session.close() when the result iterator has been exhausted.
  2. In case of task failure, we should be able to trigger session.cancel() in the failure listener, then call session.close().
  3. In case of graceful task completion before the result iterator is exhausted (e.g., due to a limit), we should trigger a cancel and then a close in the task completion listener.
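The lifecycle above could be wired through Spark's TaskContext listeners, roughly as follows. This is a sketch only: `session` and `resultIterator` stand in for the PR's worker session and its output iterator, and the exact cancel/close semantics depend on the gRPC protocol still to land:

```scala
import org.apache.spark.TaskContext

// Sketch: tie the worker session lifecycle to the Spark task lifecycle.
val context = TaskContext.get()

// Case 2: task failure -> cancel in-flight work, then close.
context.addTaskFailureListener { (_, _) =>
  session.cancel()
  session.close()
}

// Cases 1 and 3: task completion. If the result iterator was not
// exhausted (e.g. due to a limit), cancel first; otherwise just close.
context.addTaskCompletionListener[Unit] { _ =>
  if (resultIterator.hasNext) {
    session.cancel() // early completion: abort remaining work
  }
  session.close()
}
```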

Contributor Author

Yes, agreed. This is a temporary implementation, and there is an open TODO linking the overarching SPIP ticket to revisit this code and implement it properly. This also depends on the exact cancel/stop semantics that will be implemented with the gRPC protocol. These concerns will be addressed in follow-up PRs once the gRPC protocol lands.

* [[org.apache.spark.api.python.PythonRunner]]).
*/
@Experimental
object PythonUDFWorkerSpecification {
Contributor

This piece feels testable - ensure that the direct worker dispatcher can use this spec to launch a functioning PySpark worker.

Contributor Author

This part is not yet testable, as the PySpark worker is not yet fully functional. We can create a Python process that runs the worker; however, the worker does not connect to the socket yet - this is WIP, as some PySpark changes are required. As discussed offline: I will add some planning tests that ensure the new planning node is used when the flag is enabled. Further (e2e) tests should be added in the follow-up PR that also implements the end-to-end functionality.

* [[Logging]] trait so that worker log messages go through the
* standard Spark logging pipeline.
*/
private[spark] class SparkUDFWorkerLogger
Contributor

why do we need a pass-through interface? Can we just use Spark's logging?

Contributor Author

Because it is intended that the udf package does not have any Spark dependencies and can be consumed as a standalone package.

*/
private val externalUDFSessionFactory: WorkerSessionFactory = createUDFWorkerSessionFactory()

private def createUDFWorkerSessionFactory() : WorkerSessionFactory = {
Contributor

do we expect to use this method anywhere else?
If not, can we just init externalUDFSessionFactory with the function body?

Contributor Author

This method will probably only be used in SparkEnv. However, it is intended that we will create different session/dispatcher factories depending on certain environment or Spark configurations. Therefore, I think a dedicated method is more suitable here, as it can easily be extended with creation conditions in the future.

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661 branch from 40219ec to d284486 Compare May 6, 2026 13:59