[SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions #55712
sven-weber-db wants to merge 1 commit into apache:master
Conversation
    import org.apache.spark.udf.worker.UDFWorkerSpecification
    import org.apache.spark.udf.worker.core.{UDFWorkerManager, WorkerDispatcher, WorkerLogger}

    class DirectUDFWorkerManager(
The manager may not need to care about the backend of a dispatcher. I suggest:
- Call it `DispatcherManager`, as it is a central place to hold the dispatchers.
- Instead of creating the dispatcher here, register a created dispatcher from the call site, so that this manager does not have to care about the creation logic or the backend of the dispatcher.
This seems to be a dead class not used anywhere, let's remove it
This is the class that would be consumed in SparkEnv as the current implementation of the DispatcherManager spawning direct workers. I agree it is not yet complete and its implementation will need to change once the gRPC protocol lands. However, it will have to exist. Therefore, I would propose to keep it with the current TODO and replace the implementation once your changes land.
> Call it DispatcherManager, as it is a central place to hold the dispatchers
Ok, happy to rename it.
> Instead of creating the dispatcher here, register a created dispatcher from callsite
I don't think this works, as dispatchers depend on the workerSpec, which is a runtime value. Therefore, we cannot pass a single dispatcher instance; we need to be able to create different instances at runtime. We could introduce a DispatcherFactory as an additional abstraction that is passed to the DispatcherManager instead of using subclasses, if you prefer that.
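To make the trade-off concrete, here is a minimal sketch of what such a `DispatcherFactory` abstraction could look like. All names and signatures here are hypothetical and not taken from the PR; `UDFWorkerSpecification` is stubbed as a placeholder so the snippet is self-contained.

```scala
// Hypothetical sketch of the DispatcherFactory idea: the factory, not the
// manager, knows how to build a dispatcher for a given runtime workerSpec.
final case class UDFWorkerSpecification(dispatcher: String) // placeholder stub

trait WorkerDispatcher { def close(): Unit }

trait DispatcherFactory {
  // Called at runtime for each new workerSpec the manager encounters.
  def create(workerSpec: UDFWorkerSpecification): WorkerDispatcher
}

class DispatcherManager(factory: DispatcherFactory) {
  private val dispatchers =
    scala.collection.mutable.Map.empty[UDFWorkerSpecification, WorkerDispatcher]

  // One dispatcher per spec; created lazily via the injected factory.
  def getOrCreate(workerSpec: UDFWorkerSpecification): WorkerDispatcher =
    synchronized {
      dispatchers.getOrElseUpdate(workerSpec, factory.create(workerSpec))
    }
}
```

With this shape the manager stays agnostic of the dispatcher backend, while still being able to create per-spec instances at runtime.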
    // https://github.com/apache/spark/pull/55657

    @Experimental
    private[direct] class SimpleWorkerConnection(
Let's wait for the other PR to land so we can avoid touching this part of the logic in this PR.
    // Must be called while holding `lock`.
    private def handleSessionTermination(
        workerSpec: UDFWorkerSpecification
Maybe we should also pass the session object here?
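As an illustration of that suggestion, the sketch below passes the terminating session alongside its spec so the handler can drop exactly that session's state. Every name here (`WorkerSession`, `SessionRegistry`, the bookkeeping) is an assumption for illustration, not the PR's actual code.

```scala
// Hypothetical stubs so the sketch is self-contained; not from the PR.
final case class UDFWorkerSpecification(dispatcher: String)
final class WorkerSession(val id: String)

class SessionRegistry {
  // Spec -> ids of live sessions created for that spec.
  private var sessions = Map.empty[UDFWorkerSpecification, Set[String]]

  def register(spec: UDFWorkerSpecification, session: WorkerSession): Unit =
    synchronized {
      sessions = sessions.updated(spec, sessions.getOrElse(spec, Set.empty) + session.id)
    }

  // Passing the session object (not only the spec) lets the handler remove
  // the state of the one session that terminated.
  def handleSessionTermination(
      spec: UDFWorkerSpecification,
      session: WorkerSession): Unit = synchronized {
    val remaining = sessions.getOrElse(spec, Set.empty) - session.id
    if (remaining.isEmpty) sessions -= spec
    else sessions = sessions.updated(spec, remaining)
  }

  def liveSessions(spec: UDFWorkerSpecification): Int =
    synchronized(sessions.getOrElse(spec, Set.empty).size)
}
```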
What changes were proposed in this pull request?
This PR implements a `UDFWorkerManager` class in the new `/udf` package that was initiated by SPIP SPARK-55278. The purpose of the new manager class is to provide a single entry point through which Spark can create a UDF session to an external UDF worker, based on a `WorkerSpecification` instance. This manager and entry point will be used by follow-up PRs to implement new, language-agnostic Catalyst nodes.

Why are the changes needed?
The `UDFWorkerManager` serves two main purposes:
1. Creating new UDF worker sessions based on a `UDFWorkerSpecification`.
2. Managing the lifecycle of the `WorkerDispatcher` classes, depending on the `UDFWorkerSpecification` they are created for. This is required because the newly proposed UDF framework from SPIP SPARK-55278 enables clients to specify different UDF dispatchers for their UDFs. This implies:

2.1. Multiple, different dispatchers can exist at the same time
-> The right one needs to be selected to create a UDF session

2.2. Dispatcher lifetime needs to be managed
-> Dispatchers and their resources need to be cleaned up if they are no longer needed by clients
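The two duties above (per-spec dispatcher selection in 2.1 and cleanup in 2.2) could be sketched roughly as follows. This is an illustrative reference-counting sketch under assumed names, not the PR's actual implementation.

```scala
// Illustrative sketch only: one dispatcher per spec, closed once the last
// session that uses it terminates.
final case class UDFWorkerSpecification(dispatcher: String)

trait WorkerDispatcher { def close(): Unit }

class UDFWorkerManager(createDispatcher: UDFWorkerSpecification => WorkerDispatcher) {
  // Dispatcher plus the number of live sessions still using it.
  private var dispatchers =
    Map.empty[UDFWorkerSpecification, (WorkerDispatcher, Int)]

  // 2.1: select (or lazily create) the dispatcher matching the spec.
  def createSession(spec: UDFWorkerSpecification): WorkerDispatcher = synchronized {
    val (dispatcher, refs) = dispatchers.getOrElse(spec, (createDispatcher(spec), 0))
    dispatchers = dispatchers.updated(spec, (dispatcher, refs + 1))
    dispatcher
  }

  // 2.2: release resources once no client session needs the dispatcher.
  def closeSession(spec: UDFWorkerSpecification): Unit = synchronized {
    dispatchers.get(spec).foreach { case (dispatcher, refs) =>
      if (refs <= 1) { dispatcher.close(); dispatchers -= spec } // last user gone
      else dispatchers = dispatchers.updated(spec, (dispatcher, refs - 1))
    }
  }

  def activeDispatchers: Int = synchronized(dispatchers.size)
}
```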
Does this PR introduce any user-facing change?
No - all changes are marked as `Experimental` and are not yet consumed.

How was this patch tested?
New unit tests were added for the changes in `UDFWorkerManager` and `WorkerSession`.

Was this patch authored or co-authored using generative AI tooling?
Partially. However, the code was manually reviewed and adjusted.