Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51537][CONNECT][CORE] construct the session-specific classloader based on the default session classloader on executor #50334

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ private[spark] class Executor(
val currentJars = new HashMap[String, Long]
val currentArchives = new HashMap[String, Long]
val urlClassLoader =
createClassLoader(currentJars, isStubbingEnabledForState(jobArtifactState.uuid))
createClassLoader(currentJars, isStubbingEnabledForState(jobArtifactState.uuid),
isDefaultState(jobArtifactState.uuid))
val replClassLoader = addReplClassLoaderIfNeeded(
urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
new IsolatedSessionState(
Expand Down Expand Up @@ -1072,7 +1073,8 @@ private[spark] class Executor(
*/
private def createClassLoader(
currentJars: HashMap[String, Long],
useStub: Boolean): MutableURLClassLoader = {
useStub: Boolean,
isDefaultSession: Boolean): MutableURLClassLoader = {
// Bootstrap the list of jars with the user class path.
val now = System.currentTimeMillis()
userClassPath.foreach { url =>
Expand All @@ -1084,10 +1086,12 @@ private[spark] class Executor(
val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}
createClassLoader(urls, useStub)
createClassLoader(urls, useStub, isDefaultSession)
}

private def createClassLoader(urls: Array[URL], useStub: Boolean): MutableURLClassLoader = {
private def createClassLoader(urls: Array[URL],
useStub: Boolean,
isDefaultSession: Boolean): MutableURLClassLoader = {
logInfo(
log"Starting executor with user classpath" +
log" (userClassPathFirst =" +
Expand All @@ -1096,33 +1100,45 @@ private[spark] class Executor(
)

if (useStub) {
createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES), isDefaultSession)
} else {
createClassLoader(urls)
createClassLoader(urls, isDefaultSession)
}
}

private def createClassLoader(urls: Array[URL]): MutableURLClassLoader = {
private def createClassLoader(urls: Array[URL],
isDefaultSession: Boolean): MutableURLClassLoader = {
// SPARK-51537: The isolated session must *inherit* the classloader from the default session,
// which has already included the global JARs specified via --jars. For Spark plugins, we
// cannot simply add the plugin JARs to the classpath of the isolated session, as this may
// cause the plugin to be reloaded, leading to potential conflicts or unexpected behavior.
val loader = if (isDefaultSession) systemLoader else defaultSessionState.replClassLoader
if (userClassPathFirst) {
new ChildFirstURLClassLoader(urls, systemLoader)
new ChildFirstURLClassLoader(urls, loader)
} else {
new MutableURLClassLoader(urls, systemLoader)
new MutableURLClassLoader(urls, loader)
}
}

private def createClassLoaderWithStub(
urls: Array[URL],
binaryName: Seq[String]): MutableURLClassLoader = {
binaryName: Seq[String],
isDefaultSession: Boolean): MutableURLClassLoader = {
// SPARK-51537: The isolated session must *inherit* the classloader from the default session,
// which has already included the global JARs specified via --jars. For Spark plugins, we
// cannot simply add the plugin JARs to the classpath of the isolated session, as this may
// cause the plugin to be reloaded, leading to potential conflicts or unexpected behavior.
val loader = if (isDefaultSession) systemLoader else defaultSessionState.replClassLoader
if (userClassPathFirst) {
// user -> (sys -> stub)
val stubClassLoader =
StubClassLoader(systemLoader, binaryName)
StubClassLoader(loader, binaryName)
new ChildFirstURLClassLoader(urls, stubClassLoader)
} else {
// sys -> user -> stub
val stubClassLoader =
StubClassLoader(null, binaryName)
new ChildFirstURLClassLoader(urls, stubClassLoader, systemLoader)
new ChildFirstURLClassLoader(urls, stubClassLoader, loader)
}
}

Expand Down Expand Up @@ -1229,7 +1245,8 @@ private[spark] class Executor(
}
if (renewClassLoader) {
// Recreate the class loader to ensure all classes are updated.
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs,
useStub = true, isDefaultState(state.sessionUUID))
state.replClassLoader =
addReplClassLoaderIfNeeded(state.urlClassLoader, state.replClassDirUri, state.sessionUUID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,32 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

test("SPARK-51537 Executor isolation session classloader inherits from " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can also add a test (here or elsewhere) for the plugin issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we file a followup for this? For now, I've no idea about how to adding the plugin in this test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

"default session classloader") {
assume(jarURL2 != null)
sc = new SparkContext(new SparkConf()
.setAppName("test")
.setMaster("local")
.set("spark.jars", jar2))

// TestHelloV2's test method returns '2'
val artifactSetWithHelloV2 = new JobArtifactSet(
Some(JobArtifactState(uuid = "hello2", replClassDirUri = None)),
jars = Map.empty,
files = Map.empty,
archives = Map.empty
)

JobArtifactSet.withActiveJobArtifactState(artifactSetWithHelloV2.state.get) {
sc.parallelize(1 to 1).foreach { i =>
val cls = Utils.classForName("com.example.Hello$")
val module = cls.getField("MODULE$").get(null)
val result = cls.getMethod("test").invoke(module).asInstanceOf[Int]
if (result != 2) {
throw new RuntimeException("Unexpected result: " + result)
}
}
}
}
}