Skip to content

Conversation

@EnricoMi
Copy link

@EnricoMi EnricoMi commented Jun 12, 2025

What changes were proposed in this pull request?

On the presence of a fallback storage, ShuffleBlockFetcherIterator can optimistically try to read a block from the fallback storage, as it might have been migrated from a decommissioned executor to the fallback storage.

Note: This optimistic attempt to find the missing shuffle data on the fallback storage would collide with some replication delay handled in #16.

Why are the changes needed?

In a kubernetes environment, executors may be decommissioned. With a fallback storage configured, shuffle data will be migrated to other executors or the fallback storage. Tasks that start during a decommissioning phase of another executor might read blocks from that executor after it has been decommissioned. The task does not know the new location of the migrated block. Given a fallback storage is configured, it could optimistically try to read the block from the fallback storage.

This avoids a stage retry, which otherwise is an expensive way to fetch the current block address after a block migration.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test and manual testing in a kubernetes setup.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Jun 12, 2025
@EnricoMi EnricoMi changed the title Attempt to read missing block from fallback storage [SPARK-52507][K8S] Attempt to read missing block from fallback storage Jun 17, 2025
dongjoon-hyun and others added 28 commits August 6, 2025 19:44
…s `SlowHiveTest`

### What changes were proposed in this pull request?

This PR aims to mark `DynamicPartitionPruningHive*Suite*` as `SlowHiveTest`.
- DynamicPartitionPruningHiveScanSuiteAEOff
- DynamicPartitionPruningHiveScanSuiteAEOn

### Why are the changes needed?

To balance Hive CIs.

| JOB NAME | BEFORE | AFTER |
|---|---|---|
| hive - slow | 46m | 51m |
| hive - other | 89m | 76m |

**BEFORE (master branch)**
<img width="627" height="99" alt="Screenshot 2025-08-06 at 15 52 52" src="https://github.com/user-attachments/assets/b2ea8930-c5f7-4700-b5cb-c7b75f040fac" />

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Check the CI result on this PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51888 from dongjoon-hyun/SPARK-53162.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to promote the existing exit code, 101, to the official `SparkExitCode` at Apache Spark 4.1.0.

### Why are the changes needed?

`SparkSubmit` has been exposing the exit code, `101`, in case of `ClassNotFoundException` or `NoClassDefFoundError`. We had better register this as `SparkExitCode` class for consistency.

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1083

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51892 from dongjoon-hyun/SPARK-53165.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…es` object

### What changes were proposed in this pull request?

This PR aims to use `SparkExitCode.EXIT_FAILURE` instead of the magic number, `1`, in the source code.

```scala
- throw SparkUserAppException(1)
+ throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
```

### Why are the changes needed?

We had better use the predefined constant variable instead of magic numbers in the code:
- `SparkUserAppException` is designed to have `exitCode`.
- `SparkExitCode` defines `EXIT_FAILURE` for `1`.

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/common/utils/src/main/scala/org/apache/spark/SparkException.scala#L156

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala#L26

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51893 from dongjoon-hyun/SPARK-53166.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
We change the behavior of the `Murmur3Hash` and `XxHash64` catalyst expressions to be collation agnostic (i.e. collation-unaware). Also, we introduce two new internal catalyst expressions: `CollationAwareMurmur3Hash` and `CollationAwareXxHash64`, which are collation aware and take the collation of the string into consideration when hashing collated strings.

Furthermore, we replace `Murmur3Hash` and `XxHash64` in expressions where the hash expressions should be collation aware with `CollationAwareMurmur3Hash` and `CollationAwareXxHash64`. This is necessary for example when we do hash partitioning. Moreover, we change the way hashing is done for collated strings for the internal HiveHash expression to be consistent with the rest of the hashing expressions (the HiveHash expression is meant to always be collation-aware).

Finally, we add a kill switch (the SQL config is `COLLATION_AGNOSTIC_HASHING_ENABLED`) that allows to recover the previous behavior of `Murmur3Hash` and `XxHash64` as user-facing expressions. The kill switch has no effect on the new collation aware hashing expressions, or the HiveHash expression, which are internal and need to follow the new collation aware behavior.

### Why are the changes needed?
The `Murmur3Hash` and `XxHash64` catalyst expressions, when applied to collated strings, currently always take into consideration the collation of the string, that is they are collation aware. This is not the correct behavior, and these expressions should be collation agnostic by default instead.

### Does this PR introduce _any_ user-facing change?
Yes, see the detailed explanation above.

### How was this patch tested?
Updated existing tests in relevant suites: CollationFactorySuite, DistributionSuite, and HashExpressionsSuite. Also verified that the CollationSuite suite passes.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51521 from uros-db/collation-hashing.

Lead-authored-by: Uros Bojanic <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
Prior to this change, `EXPLAIN FORMATTED` on query e.g.:

```
SELECT *
FROM catalog.tbl1 t1
    JOIN catalog.tbl2 t2 ON t1.id1 = t2.id2
    JOIN catalog.tbl3 t3 ON t2.id2 = t3.id3
    JOIN catalog.tbl4 t4 ON t3.id3 = t4.id4;
```

looked like:
```
PushedJoins: [join_pushdown_catalog.tbl1, join_pushdown_catalog.tbl2, join_pushdown_catalog.tlb3, join_pushdown_catalog.tlb4]
```

With the change from PR, the output of `EXPLAIN FORMATTED` would be:

```
 == Physical Plan ==
   *(1) Project [ID#x, AMOUNT#x, ADDRESS#x, ID_1#x AS ID#x, NEXT_ID#x, SALARY#x, SURNAME#x, id_3#x AS id#x, id_1_2#x AS id_1#x, id_2#x, id_1_1#x, sid#x, id_4#x AS id#x, id_1_3#x AS id_1#x, id_2_2#x AS id_2#x, id_2_1#x, Sid_1#x AS Sid#x]
   +- *(1) Scan JDBC v1 Relation from v2 scan join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1[ID#x,AMOUNT#x,ADDRESS#x,ID_1#x,NEXT_ID#x,SALARY#x,SURNAME#x,id_3#x,id_1_2#x,id_2#x,id_1_1#x,sid#x,id_4#x,id_1_3#x,id_2_2#x,id_2_1#x,Sid_1#x] PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:
   [L]: PushedFilters: [ID_1 = (id_3 + 1)]
        PushedJoins:
        [L]: PushedFilters: [ID = (ID_1 + 1)]
             PushedJoins:
             [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
                  PushedFilters: [ID IS NOT NULL]
             [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2
                  PushedFilters: [ID IS NOT NULL]
        [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3
             PushedFilters: [id IS NOT NULL]
   [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4
        PushedFilters: [id IS NOT NULL]
   , ReadSchema: struct<ID:int,AMOUNT:decimal(10,2),ADDRESS:string,ID_1:int,NEXT_ID:int,SALARY:decimal(10,2),SURNAME:string,id_3:int,id_1_2:int,id_2:int,id_1_1:int,sid:int,id_4:int,id_1_3:int,id_2_2:int,id_2_1:int,Sid_1:int>
```

PushedFilters on top of PushedJoins are actually join conditions.
It can be seen that the name of `Scan JDBC v1 Relation from v2 scan` is ` catalog.tbl1`. This should be fixed as well, but it won't be a part of this PR.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#51781 from PetarVasiljevic-DB/improve_explain_command_for_dsv2_join_pushdown.

Authored-by: Petar Vasiljevic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ad of `Files.toByteArray`

### What changes were proposed in this pull request?

This PR aims to use Java 9+ `java.nio.file.Files.readAllBytes` instead of `com.google.common.io.Files.toByteArray`.

In addition, a new Scalastyle rule is added to ban `Files.toByteArray` for consistency.

### Why are the changes needed?

The built-in Java method is as good as 3rd party library.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51891 from dongjoon-hyun/SPARK-53164.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…arameter

### What changes were proposed in this pull request?

This PR aims to improve `SparkUserAppException` to have `cause` parameter additionally.

```scala
-private[spark] case class SparkUserAppException(exitCode: Int)
+private[spark] case class SparkUserAppException(exitCode: Int, cause: Throwable = null)
```

### Why are the changes needed?

`SparkUserAppException` is used in many places; `SparkSubmit`, `PythonRunner`, `RRunner`, `SparkPipelines`. Although this is defined as `private[spark]`, a new parameter `cause` can be used to enrich the information from the user application to upper layers internally.

### Does this PR introduce _any_ user-facing change?

No, this is defined as `private[spark]`.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51897 from dongjoon-hyun/SPARK-53170.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This PR improves the UTF8String repeat logic for some special cases

### Why are the changes needed?
performance improvement

### Does this PR introduce _any_ user-facing change?
now

### How was this patch tested?
Passing existing UT and benchmarked like

```
scala> spark.time((1 to 10000000).foreach(_ => UTF8String.fromStri
ng("A").repeat(1024)))
Time taken: 784 ms
scala> spark.time((1 to 10000000).foreach(_ => repeat(UTF8String.f
romString("A"), 1024)))
Time taken: 432 ms
````

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51900 from yaooqinn/SPARK-53171.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
Upgrade PyArrow to 21.0.0

### Why are the changes needed?
to test against the latest pyarrow

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51890 from zhengruifeng/pyarrow_21.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…s file

### What changes were proposed in this pull request?

This PR modifies the `SparkSubmitCommandBuilder` to use "effective config" to evaluate `isRemote` instead of the passed args only. This makes `spark.remote` configured in the properties file(`spark-defaults.conf` or other files specified by `--properties-file`) effective.

### Why are the changes needed?

```
$ sbin/start-connect-server.sh
$ cat > conf/spark-connect.conf <<EOF
spark.reomte=sc://localhost:15002
EOF
$ bin/spark-shell --properties-file conf/spark-connect.conf
```

```
25/08/07 13:35:56 ERROR Main: Failed to initialize Spark session.
org.apache.spark.SparkException: A master URL must be set in your configuration
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:421)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:3055)
	at org.apache.spark.sql.classic.SparkSession$Builder.$anonfun$build$2(SparkSession.scala:839)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.classic.SparkSession$Builder.build(SparkSession.scala:830)
	at org.apache.spark.sql.classic.SparkSession$Builder.getOrCreate(SparkSession.scala:859)
	at org.apache.spark.sql.classic.SparkSession$Builder.getOrCreate(SparkSession.scala:732)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:923)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:116)
```

### Does this PR introduce _any_ user-facing change?

Yes, a kind of bug fix.

### How was this patch tested?

UT is added, plus manually test:

```
$ sbin/start-connect-server.sh
$ cat > conf/spark-connect.conf <<EOF
spark.reomte=sc://localhost:15002
EOF
$ bin/spark-shell --properties-file conf/spark-connect.conf
```
The previously failed case now works.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51896 from pan3793/SPARK-53167.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…ith a project

### What changes were proposed in this pull request?

This patch fixes the optimization rule `RemoveRedundantAggregates`.

### Why are the changes needed?

The optimizer rule `RemoveRedundantAggregates` removes redundant lower aggregation from a query plan and replace it with a project of referred non-aggregate expressions. However, if the removed aggregation is a global one, that is not correct because a project is different with a global aggregation in semantics.

For example, if the input relation is empty, a project might be optimized to an empty relation, while a global aggregation will return a single row.

### Does this PR introduce _any_ user-facing change?

Yes, this fixes a user-facing bug. Previously, a global aggregation under another aggregation might be treated as redundant and replaced as a project with non-aggregation expressions. If the input relation is empty, the replacement is incorrect and might produce incorrect result. This patch adds a new unit test to show the difference.

### How was this patch tested?

Unit test, manual test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51884 from viirya/fix_remove_redundant_agg.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
…memory size from resource profile

### What changes were proposed in this pull request?

Add two APIs to get overhead memory size and offheap memory size from resource profile.

### Why are the changes needed?

Simplify retrieving overhead memory size and off-heap memory size.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The existing tests with modifications.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51870 from PHILO-HE/add-rp-api.

Authored-by: PHILO-HE <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…r `level` for `SparkFunSuite#withLogAppender` from `None` to `Some(Level.INFO)`

### What changes were proposed in this pull request?
This pr changes the default value of the input parameter `level: Option[Level]` for the `SparkFunSuite#withLogAppender` function from `None` to `Some(Level.INFO)`, in order to decouple the relevant tests from the `rootLogger.level` configuration in the `log4j2.properties` .

### Why are the changes needed?
Suppose, for some reason, we change the `rootLogger.level` configuration value in `sql/core/src/test/resources/log4j2.properties` from `info` to `warn`. Subsequently, when running unit tests, failures similar to the following may occur:

```
build/sbt clean "sql/testOnly org.apache.spark.sql.execution.QueryExecutionSuite"

[info] - Logging plan changes for execution *** FAILED *** (63 milliseconds)
[info]   testAppender.loggingEvents.exists(((x$10: org.apache.logging.log4j.core.LogEvent) => x$10.getMessage().getFormattedMessage().contains(expectedMsg))) was false (QueryExecutionSuite.scala:243)
[info]   org.scalatest.exceptions.TestFailedException:
...
```

Similar issues may also arise in other test cases such as `AdaptiveQueryExecSuite`. Therefore, this PR modifies the default value of the `level` parameter to avoid such test coupling problems.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Passed Github Actions
- Manually verified that the aforementioned test cases no longer fail

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51895 from LuciferYang/SPARK-53168.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
…of File Appender to`" from `log4j2.properties`

### What changes were proposed in this pull request?
This pr removes the comments related to "`Set the logger level of File Appender to`" from `log4j2.properties`, as the relevant configuration is self-descriptive and the comments often do not match the actual configuration.

### Why are the changes needed?
Remove redundant comments.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass Github Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51894 from LuciferYang/minor-sql-log4j-config.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
….BaseEncoding`

### What changes were proposed in this pull request?

This PR aims to use Java `Base64` instead of `com.google.common.io.BaseEncoding`.

In addition, new Scalastyle and Checkstyle rules are added to ban `com.google.common.io.BaseEncoding` in order to prevent a future regression.

### Why are the changes needed?

Java implementation is **over 2x faster** than Google one.

```scala
scala> val s = "a".repeat(5_000_000).getBytes(java.nio.charset.StandardCharsets.UTF_8)

scala> spark.time(java.util.Base64.getDecoder().decode(java.util.Base64.getEncoder().encodeToString(s)).length)
Time taken: 18 ms
val res0: Int = 5000000

scala> spark.time(com.google.common.io.BaseEncoding.base64().decode(com.google.common.io.BaseEncoding.base64().encode(s)).length)
Time taken: 50 ms
val res1: Int = 5000000
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51904 from dongjoon-hyun/SPARK-53177.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…sted_arrays`

### What changes were proposed in this pull request?
this test is incorrect, but it happened to pass with special partitioning

### Why are the changes needed?
to correct a test, it fails with different envs

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci and manually check with pyspark shell

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51898 from zhengruifeng/fix_nest_array_test.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
… `CharStreams.toString`

### What changes were proposed in this pull request?

This PR aims to se `SparkStreamUtils.toString` instead of `CharStreams.toString`.

### Why are the changes needed?

`SparkStreamUtils.toString` is ***faster*** than `CharStreams.toString`.

```scala
scala> spark.time(org.apache.spark.util.SparkStreamUtils.toString(new java.io.FileInputStream("/tmp/1G.bin")).length)
Time taken: 322 ms
val res0: Int = 1073741824

scala> spark.time(com.google.common.io.CharStreams.toString(new java.io.InputStreamReader(java.nio.file.Files.newInputStream(Path.of("/tmp/1G.bin")))).length)
Time taken: 533 ms
val res1: Int = 1073741824
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51908 from dongjoon-hyun/SPARK-53179.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to upgrade `curator` to 5.9.0.

### Why are the changes needed?

To bring the latest improvement and bug fixes.

- https://github.com/apache/curator/releases/tag/apache-curator-5.9.0
- https://github.com/apache/curator/releases/tag/apache-curator-5.8.0

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51906 from dongjoon-hyun/SPARK-53178.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…eStreams.skipFully`

### What changes were proposed in this pull request?

This PR aims to use Java `InputStream.skipNBytes` instead of `ByteStreams.skipFully`.

```scala
-    ByteStreams.skipFully(is, offset)
+    is.skipNBytes(offset)
```

### Why are the changes needed?

Java 12+ supports `skipNBytes` natively. We had better use this simple style.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51909 from dongjoon-hyun/SPARK-53180.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…ead of `ByteStreams.toByteArray`

### What changes were proposed in this pull request?

This PR aims to `SparkStreamUtils.toString` instead of `new String(ByteStreams.toByteArray(...))` pattern.

```scala
-  val decrypted = new String(ByteStreams.toByteArray(in), UTF_8)
-  assert(content === decrypted)
+  assert(content === Utils.toString(in))
```

### Why are the changes needed?

To simplify the code because we already have the same code.

https://github.com/apache/spark/blob/c77f316cae6032171936587a2dba1d0a633879ae/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala#L109-L111

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51912 from dongjoon-hyun/SPARK-53185.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
… input param/return type

### What changes were proposed in this pull request?
Fix Python UDF not accepting collated strings as input param/return type.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51688 from ilicmarkodb/fix_collated_string_as_input_of_python_udf.

Authored-by: ilicmarkodb <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…catalyst.util.fileToString`

### What changes were proposed in this pull request?

This PR aims to use Java 11+ `java.nio.file.Files.readString` instead of `o.a.s.sql.catalyst.util.fileToString`. In other words, this PR removes Spark's `fileToString` method from Spark code base.

### Why are the changes needed?

Since Java 11, `Files.readString` exists. So, we don't need to maintain `fileToString` method. Note that Apache Spark always uses the default value of `encoding`, `UTF-8`.

https://github.com/apache/spark/blob/c77f316cae6032171936587a2dba1d0a633879ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala#L51-L58

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

**BEFORE**
```
$ git grep fileToString | wc -l
      22
```

**AFTER**
```
$ git grep fileToString | wc -l
       0
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51911 from dongjoon-hyun/SPARK-53183.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…d `JavaUtils`

### What changes were proposed in this pull request?

This PR aims to support `readFully` in `SparkStreamUtils` and `JavaUtils` which is based on Java 9+ `readNBytes` API.

```java
public static void readFully(InputStream in, byte[] arr, int off, int len) throws IOException {
  if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
    throw new IllegalArgumentException("Invalid input argument");
  }
  if (len != in.readNBytes(arr, off, len)) {
    throw new EOFException("Fail to read " + len + " bytes.");
  }
}
```

### Why are the changes needed?

```scala
- ByteStreams.readFully(is, rowBuffer, 0, rowSize)
+ Utils.readFully(is, rowBuffer, 0, rowSize)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51915 from dongjoon-hyun/SPARK-53188.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…eStreams.copy`

### What changes were proposed in this pull request?

This PR aims to use Java 9+ API `InputStream.transferTo` instead of `ByteStreams.copy`.

Note that this improves `UnsafeShuffleWriter`.

### Why are the changes needed?

Java `transferTo` is **faster** than `ByteStreams.copy`.

```scala
scala> import java.io._
import java.io._

scala> spark.time(new FileInputStream("/tmp/4G.bin").transferTo(new FileOutputStream("/dev/null")))
Time taken: 5 ms
val res2: Long = 4294967296

scala> spark.time(com.google.common.io.ByteStreams.copy(new FileInputStream("/tmp/4G.bin"), new FileOutputStream("/dev/null")))
Time taken: 772 ms
val res3: Long = 4294967296
```

```scala
$ bin/spark-shell --driver-memory 12G
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
...
scala> spark.time(new java.io.FileInputStream("/tmp/4G.bin").transferTo(new java.io.FileOutputStream("/tmp/4G.bin.java")))
Time taken: 1209 ms
val res0: Long = 4294967296
```

```scala
$ bin/spark-shell --driver-memory 12G
...
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
...
scala> spark.time(com.google.common.io.ByteStreams.copy(new java.io.FileInputStream("/tmp/4G.bin"), new java.io.FileOutputStream("/tmp/4G.bin.google")))
Time taken: 1899 ms
val res0: Long = 4294967296
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51918 from dongjoon-hyun/SPARK-53190.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…tes` instead of `ByteStreams.toByteArray`

### What changes were proposed in this pull request?

This PR aims to use Java `InputStream.readAllBytes` instead of `ByteStreams.toByteArray` in order to improve the performance.

### Why are the changes needed?

Since Java 9+, we can use `readAllBytes` which is roughly 30% faster than `ByteStreams.toByteArray`.

**BEFORE (ByteStreams.toByteArray)**

```scala
scala> spark.time(com.google.common.io.ByteStreams.toByteArray(new java.io.FileInputStream("/tmp/1G.bin")).length)
Time taken: 386 ms
val res0: Int = 1073741824
```

**AFTER (InputStream.readAllBytes)**

```scala
scala> spark.time(new java.io.FileInputStream("/tmp/1G.bin").readAllBytes().length)
Time taken: 248 ms
val res0: Int = 1073741824
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51919 from dongjoon-hyun/SPARK-53191.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…tests

### What changes were proposed in this pull request?

Set -XX:ErrorFile to build/target directory for tests

### Why are the changes needed?

When a fatal error occurs, hs_err_pidXXX.log will be uploaded with other logs for troubleshooting, while we can only see the header section now

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test locally

```
# An error report file with more information is saved as:
# /Users/hzyaoqin/spark/sql/hive/target/hs_err_pid33707.log
#
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51922 from yaooqinn/SPARK-53194.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…eStreams.read`

### What changes were proposed in this pull request?

This PR aims to use Java 9+ `InputStream.readNBytes` API instead of `ByteStreams.read`.

### Why are the changes needed?

To simplify the code by using native Java API.

```scala
- var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
+ var numBytes = gzInputStream.readNBytes(buf, 0, bufSize)
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51923 from dongjoon-hyun/SPARK-53195.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…of `ByteStreams.nullOutputStream`

### What changes were proposed in this pull request?

This PR aims to use Java 11+ `OutputStream.nullOutputStream` instead of `ByteStreams.nullOutputStream`.

### Why are the changes needed?

We had better use native JDK APIs when it's available.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51924 from dongjoon-hyun/SPARK-53196.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
JoshRosen and others added 10 commits September 1, 2025 10:32
…ly call System.exit after user code main method exits

This PR is based on apache#46889 authored by JoshRosen

### What changes were proposed in this pull request?
This PR adds a new SparkConf flag option, `spark.submit.callSystemExitOnMainExit` (default false), which when true will cause SparkSubmit to call `System.exit()` in the JVM once the user code's main method has exited (for Java / Scala jobs) or once the user's Python or R script has exited.

### Why are the changes needed?
This is intended to address a longstanding issue where `spark-submit` runs might hang after user code has completed:

[According to Java’s java.lang.Runtime docs](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Runtime.html#shutdown):

> The Java Virtual Machine initiates the shutdown sequence in response to one of several events:
>
> 1. when the number of [live](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Thread.html#isAlive()) non-daemon threads drops to zero for the first time (see note below on the JNI Invocation API);
> 2. when the Runtime.exit or System.exit method is called for the first time; or
> 3. when some external event occurs, such as an interrupt or a signal is received from the operating system.

For Python and R programs, SparkSubmit’s PythonRunner and RRunner will call System.exit() if the user program exits with a non-zero exit code (see [python](https://github.com/apache/spark/blob/d5c33c6bfb5757b243fc8e1734daeaa4fe3b9b32/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L101-L104) and [R](https://github.com/apache/spark/blob/d5c33c6bfb5757b243fc8e1734daeaa4fe3b9b32/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L109-L111) runner code).

But for Java and Scala programs, plus any successful R or Python programs, Spark will not automatically call System.exit.

In those situation, the JVM will only shutdown when, via event (1), all non-[daemon](https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java) threads have exited (unless the job is cancelled and sent an external interrupt / kill signal, corresponding to event (3)).

Thus, non-daemon threads might cause logically-completed spark-submit jobs to hang rather than completing.

The non-daemon threads are not always under Spark's own control and may not necessarily be cleaned up by `SparkContext.stop()`.

Thus, it is useful to have an opt-in functionality to have SparkSubmit automatically call `System.exit()` upon main method exit (which usually, but not always, corresponds to job completion): this option will allow users and data platform operators to enforce System.exit() calls without having to modify individual jobs' code.

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new user-facing configuration option for opting in to a behavior change.

### How was this patch tested?
New tests in `SparkSubmitSuite`, including one which hangs (failing with a timeout) unless the new option is set to `true`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52091 from pan3793/SPARK-48547.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
### What changes were proposed in this pull request?

There is race condition between `CachedRDDBuilder.cachedColumnBuffers` and `CachedRDDBuilder.clearCache`: when they interleave each other, `cachedColumnBuffers` might return a `nullptr`.

This looks like a day-1 bug introduced from  apache@20ca208#diff-4068fce361a50e3d32af2ba2d4231905f500e7b2da9f46d5ddd99b758c30fd43

### Why are the changes needed?

The race condition might lead to NPE from [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L303) which is basically a null `RDD` returned from `CachedRDDBuilder.cachedColumnBuffers`

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Theoretically this race condition might be triggered as long as cache materialization and unpersistence happen on different thread. But there is no reliable way to construct unit test.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#52174 from liuzqt/SPARK-53435.

Authored-by: ziqi liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ysis

### What changes were proposed in this pull request?

Propagate `LogicalPlan.PLAN_ID_TAG` to the resolved logical plan during SDP analysis so when the whole plan is sent to Spark for analysis, it contains the correct plan id.

### Why are the changes needed?

Spark Connect attaches a plan id to each logical plan. In SDP, we take part of the logical plan and analyze it independently to resolve table references correctly. When this happens, the logical plan id is lost which causes resolution errors when the plan is sent to Spark for complete analysis.

For example, group by and rollup functions would fail with `sql.AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a"))`
```python3
from pyspark.sql.functions import col, sum, count

dp.materialized_view
def groupby_result():
    return spark.read.table("src").groupBy("id").count()
```
This happens because we take the below unresolved logical plan:
```
'Aggregate ['id], ['id, 'count(1) AS count#7]
+- 'UnresolvedRelation [src], [], false
```
Perform independent analysis on the `UnresolvedRelation` part to identify the table. During this analysis, the plan id is lost.
```
'Aggregate ['id], ['id, 'count(1) AS count#7]
+- SubqueryAlias spark_catalog.default.src
   +- Relation spark_catalog.default.src[id#9L] parquet
```
So when the above partially resolved logical plan is sent to Spark for analysis, it tries to resolve the `id` attribute in the aggregate operation with respect to the `SubqueryAlias` children, and fails because the children no longer contains the same plan id.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tests

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#52121 from JiaqiWang18/SPARK-53377-sdp-groupBy-rollup-tests.

Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

When a user sends multiple artifacts with the `addArtifacts` API, we process each artifact one at a time on the server-side.

If the server detects the user attempting to modify an artifact (by overwriting an existing artifact of the same path with a different byte sequence), an exception is immediately thrown and artifact addition process is terminated.

 Instead, the operation should be idempotent and the server should try to add as many artifacts as possible instead of returning early.

### Why are the changes needed?

As explained, if the server encounters an error while adding artifacts it will return immediately. This can be a bit wasteful as the server discards all other artifacts sent over the wire regardless of their own status. Thus, an improvement can be made to process all artifacts, catch any exceptions and rethrow them at the end.

### Does this PR introduce _any_ user-facing change?

This PR does not modify the existing API or the return codes. If the above scenario is triggered, the only user facing change is that the server adds as many artifacts as possible. Therefore it should be fully backwards compatible. Additionally, if more than one artifact already existed, its exception is added as a suppressed exception. Currently, these suppressed exceptions are not serialized into the grpc object and sent over the wire, however.

### How was this patch tested?

Unit tests and local testing.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52073 from HendrikHuebner/improve-add-artifact-exceptions.

Lead-authored-by: Hendrik Huebner <[email protected]>
Co-authored-by: Hendrik Huebner <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
Implement the `time_diff` function in Scala API.

### Why are the changes needed?
Expand API support for the `TimeDiff` expression.

### Does this PR introduce _any_ user-facing change?
Yes, the new function is now available in Scala API.

### How was this patch tested?
Added appropriate Scala function tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51826 from uros-db/scala-time_diff.

Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…r YearMonthIntervalType field

### What changes were proposed in this pull request?

InterpretedUnsafeProjection shall setNull4Bytes for a YearMonthIntervalType field instead of setNull8Bytes

### Why are the changes needed?

YearMonthIntervalType is 4-byte length

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Passing CI

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52178 from yaooqinn/SPARK-53437.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
…ends

### What changes were proposed in this pull request?
Report a heartbeat on the driver when the application stops.

### Why are the changes needed?
When the application proactively terminates due to some memory issues at the driver (SparkOOM, result size too large, etc...), due to metric sampling issues we will often miss this resourcing problem in the memory metrics and in the event log. We will abort the job before we capture accurate metrics for the driver. If we report an additional heartbeat (metric collection at the driver) on application termination than we will be able to better reflect the memory usage in the event log, shs, etc.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested
Tested with a custom job that collected a large amount of data to the driver, and otherwise had very low driver memory usage, (low # partitions no other data structures used at driver), without the change we witnessed that the peak memory usage at the driver was low <~100MiB, with this change we witness the higher memory usage reflected.
<img width="1723" height="230" alt="image" src="https://github.com/user-attachments/assets/fb442550-a262-453e-b6e2-f47e1e9f11b1" />

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51882 from ForVic/vsunderl/report_driver_heartbeat.

Lead-authored-by: ForVic <[email protected]>
Co-authored-by: Victor Sunderland <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
… 15.0.0

### What changes were proposed in this pull request?

This PR is a followup of apache#52172 that makes the test compatible with PyArrow 0.15.

### Why are the changes needed?

Tests fail with PyArrow 0.15 https://github.com/apache/spark/actions/runs/17355567623/job/49268068508

`StructArray.from_arrays(..., type=...)` is available from PyArrow 15.0.0 (https://arrow.apache.org/docs/18.0/python/generated/pyarrow.StructArray.html#pyarrow.StructArray.from_arrays).

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52186 from HyukjinKwon/SPARK-53433-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment