
Conversation

@EnricoMi EnricoMi commented Nov 26, 2024

What changes were proposed in this pull request?

Adds options to retry FileNotFoundExceptions when opening files migrated to the fallback storage.

  • STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_DELAY sets the allowed replication delay.
    The executor waits at most this long for the shuffle data file to appear on the fallback storage.
  • STORAGE_DECOMMISSION_FALLBACK_STORAGE_REPLICATION_WAIT sets the interval between re-attempts to find the file.

Why are the changes needed?

When a distributed filesystem is used as the fallback storage for migrating shuffle data on executor decommissioning, executors that attempt to read the migrated data might not yet see the file written by the decommissioned executor. This is called replication delay.

Currently, executors give up instantly, even though they know the data has been successfully migrated to the fallback storage, from which it does not migrate further. Having the executor wait for a defined time and re-attempt to open the file avoids a fetch failure and a re-computation of the migrated shuffle data.
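Below is a minimal sketch of the retry behaviour these two options enable. The names and structure are illustrative only, not the actual `FallbackStorage` implementation; `delayMs` corresponds to the replication delay budget and `waitMs` to the re-attempt interval.

```scala
import java.io.FileNotFoundException

// Illustrative only: keep re-attempting to open a migrated shuffle file until
// the replication-delay budget is exhausted, sleeping `waitMs` between attempts.
def openWithRetry[T](delayMs: Long, waitMs: Long)(open: => T): T = {
  val deadline = System.currentTimeMillis() + delayMs
  def attempt(): T =
    try open catch {
      case _: FileNotFoundException if System.currentTimeMillis() < deadline =>
        Thread.sleep(waitMs) // give the fallback storage time to catch up
        attempt()
    }
  attempt()
}
```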

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Nov 26, 2024
@EnricoMi EnricoMi force-pushed the fallback-storage-retry-file-not-found branch from e88431f to 9aaf53c on November 28, 2024 08:51
EnricoMi pushed a commit that referenced this pull request Jan 29, 2025
This is a trivial change to replace the loop index from `int` to `long`. Surprisingly, microbenchmark shows more than double performance uplift.

Analysis
--------
The hot loop of the `arrayEquals` method is simplified below. The loop index `i` is defined as an `int`, and it is compared with `length`, which is a `long`, to determine when the loop should end.
```
public static boolean arrayEquals(
    Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
  ......
  int i = 0;
  while (i <= length - 8) {
    if (Platform.getLong(leftBase, leftOffset + i) !=
        Platform.getLong(rightBase, rightOffset + i)) {
          return false;
    }
    i += 8;
  }
  ......
}
```

Strictly speaking, there is a code bug here: if `length` is greater than 2^31 + 8, this loop will never end, because `i`, as a 32-bit integer, is at most 2^31 - 1. But the compiler must treat this behaviour as intentional and generate code that strictly matches the logic, which prevents it from generating optimal code.

Defining the loop index `i` as a `long` corrects this issue. Besides making the code logic more accurate, it lets the JIT optimize this code much more aggressively. Microbenchmarks show that this trivial change improves performance significantly on both Arm and x86 platforms.

Benchmark
---------
Source code:
https://gist.github.com/cyb70289/258e261f388e22f47e4d961431786d1a

Result on Arm Neoverse N2:
```
Benchmark                             Mode  Cnt    Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  674.313 ± 0.213  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10  313.563 ± 2.338  ns/op
```

Result on Intel Cascade Lake:
```
Benchmark                             Mode  Cnt     Score   Error  Units
ArrayEqualsBenchmark.arrayEqualsInt   avgt   10  1130.695 ± 0.168  ns/op
ArrayEqualsBenchmark.arrayEqualsLong  avgt   10   461.979 ± 0.097  ns/op
```

Deep dive
---------
Diving down to the machine-code level, we can see where the big gap comes from. Listed below is the arm64 assembly generated by the OpenJDK 17 C2 compiler.

For `int i`, the machine code closely follows the source code, with no deep optimization. Safepoint polling is expensive in this short loop.
```
// jit c2 machine code snippet
  0x0000ffff81ba8904:   mov        w15, wzr              // int i = 0
  0x0000ffff81ba8908:   nop
  0x0000ffff81ba890c:   nop
loop:
  0x0000ffff81ba8910:   ldr        x10, [x13, w15, sxtw] // Platform.getLong(leftBase, leftOffset + i)
  0x0000ffff81ba8914:   ldr        x14, [x12, w15, sxtw] // Platform.getLong(rightBase, rightOffset + i)
  0x0000ffff81ba8918:   cmp        x10, x14
  0x0000ffff81ba891c:   b.ne       0x0000ffff81ba899c    // return false if not equal
  0x0000ffff81ba8920:   ldr        x14, [x28, #848]      // x14 -> safepoint
  0x0000ffff81ba8924:   add        w15, w15, #0x8        // i += 8
  0x0000ffff81ba8928:   ldr        wzr, [x14]            // safepoint polling
  0x0000ffff81ba892c:   sxtw       x10, w15              // extend i to long
  0x0000ffff81ba8930:   cmp        x10, x11
  0x0000ffff81ba8934:   b.le       0x0000ffff81ba8910    // if (i <= length - 8) goto loop
```

For `long i`, the JIT is able to do much more aggressive optimization. For example, the code snippet below unrolls the loop by a factor of four.
```
// jit c2 machine code snippet
unrolled_loop:
  0x0000ffff91de6fe0:   sxtw       x10, w7
  0x0000ffff91de6fe4:   add        x23, x22, x10
  0x0000ffff91de6fe8:   add        x24, x21, x10
  0x0000ffff91de6fec:   ldr        x13, [x23]          // unroll-1
  0x0000ffff91de6ff0:   ldr        x14, [x24]
  0x0000ffff91de6ff4:   cmp        x13, x14
  0x0000ffff91de6ff8:   b.ne       0x0000ffff91de70a8
  0x0000ffff91de6ffc:   ldr        x13, [x23, #8]      // unroll-2
  0x0000ffff91de7000:   ldr        x14, [x24, #8]
  0x0000ffff91de7004:   cmp        x13, x14
  0x0000ffff91de7008:   b.ne       0x0000ffff91de70b4
  0x0000ffff91de700c:   ldr        x13, [x23, #16]     // unroll-3
  0x0000ffff91de7010:   ldr        x14, [x24, #16]
  0x0000ffff91de7014:   cmp        x13, x14
  0x0000ffff91de7018:   b.ne       0x0000ffff91de70a4
  0x0000ffff91de701c:   ldr        x13, [x23, #24]     // unroll-4
  0x0000ffff91de7020:   ldr        x14, [x24, #24]
  0x0000ffff91de7024:   cmp        x13, x14
  0x0000ffff91de7028:   b.ne       0x0000ffff91de70b0
  0x0000ffff91de702c:   add        w7, w7, #0x20
  0x0000ffff91de7030:   cmp        w7, w11
  0x0000ffff91de7034:   b.lt       0x0000ffff91de6fe0
```

### What changes were proposed in this pull request?

A trivial change to replace loop index `i` of method `arrayEquals` from `int` to `long`.

### Why are the changes needed?

To improve performance and fix a possible bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49568 from cyb70289/arrayEquals.

Authored-by: Yibo Cai <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
@EnricoMi EnricoMi force-pushed the fallback-storage-retry-file-not-found branch from 9aaf53c to 6f7c8e6 on May 23, 2025 15:41
@EnricoMi EnricoMi force-pushed the fallback-storage-retry-file-not-found branch 3 times, most recently from 8f2d443 to 796b866 on June 12, 2025 14:06
@EnricoMi EnricoMi marked this pull request as ready for review June 13, 2025 06:00
@EnricoMi EnricoMi changed the title FallbackStorage retries FileNotFoundExceptions [SPARK-52508][K8S] FallbackStorage retries FileNotFoundExceptions Jun 17, 2025
xinrong-meng and others added 19 commits August 7, 2025 10:38
### What changes were proposed in this pull request?
Fix autocorr divide-by-zero error under ANSI mode

### Why are the changes needed?
Ensure pandas on Spark works well with ANSI mode on.
Part of https://issues.apache.org/jira/browse/SPARK-52169.

### Does this PR introduce _any_ user-facing change?
When ANSI is on,

FROM
```py
>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
...
25/08/04 13:25:13 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 33)
org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. SQLSTATE: 22012
== DataFrame ==
"corr" was called from
...
```

TO
```py
>>> s = ps.Series([1, 0, 0, 0])
>>> s.autocorr()
nan
```

### How was this patch tested?
Unit tests.

Commands below passed
```
 1004  SPARK_ANSI_SQL_MODE=true ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr"

 1009  SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.series.test_stat SeriesStatTests.test_autocorr
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51192 from xinrong-meng/autocorr.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
…s `SlowHiveTest`

### What changes were proposed in this pull request?

This PR aims to mark `DynamicPartitionPruningHive*Suite*` as `SlowHiveTest`.
- DynamicPartitionPruningHiveScanSuiteAEOff
- DynamicPartitionPruningHiveScanSuiteAEOn

### Why are the changes needed?

To balance Hive CIs.

| JOB NAME | BEFORE | AFTER |
|---|---|---|
| hive - slow | 46m | 51m |
| hive - other | 89m | 76m |

**BEFORE (master branch)**
<img width="627" height="99" alt="Screenshot 2025-08-06 at 15 52 52" src="https://github.com/user-attachments/assets/b2ea8930-c5f7-4700-b5cb-c7b75f040fac" />

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Check the CI result on this PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51888 from dongjoon-hyun/SPARK-53162.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to promote the existing exit code, 101, to the official `SparkExitCode` at Apache Spark 4.1.0.

### Why are the changes needed?

`SparkSubmit` has been exposing the exit code `101` in case of `ClassNotFoundException` or `NoClassDefFoundError`. We had better register it in the `SparkExitCode` class for consistency.

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1083

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51892 from dongjoon-hyun/SPARK-53165.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…es` object

### What changes were proposed in this pull request?

This PR aims to use `SparkExitCode.EXIT_FAILURE` instead of the magic number, `1`, in the source code.

```scala
- throw SparkUserAppException(1)
+ throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
```

### Why are the changes needed?

We had better use the predefined constant variable instead of magic numbers in the code:
- `SparkUserAppException` is designed to have `exitCode`.
- `SparkExitCode` defines `EXIT_FAILURE` for `1`.

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/common/utils/src/main/scala/org/apache/spark/SparkException.scala#L156

https://github.com/apache/spark/blob/46fd5258a16523637f7ac5fa7ece16f626816454/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala#L26

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51893 from dongjoon-hyun/SPARK-53166.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
We change the behavior of the `Murmur3Hash` and `XxHash64` catalyst expressions to be collation agnostic (i.e. collation-unaware). Also, we introduce two new internal catalyst expressions: `CollationAwareMurmur3Hash` and `CollationAwareXxHash64`, which are collation aware and take the collation of the string into consideration when hashing collated strings.

Furthermore, we replace `Murmur3Hash` and `XxHash64` in expressions where the hash expressions should be collation aware with `CollationAwareMurmur3Hash` and `CollationAwareXxHash64`. This is necessary for example when we do hash partitioning. Moreover, we change the way hashing is done for collated strings for the internal HiveHash expression to be consistent with the rest of the hashing expressions (the HiveHash expression is meant to always be collation-aware).

Finally, we add a kill switch (the SQL config is `COLLATION_AGNOSTIC_HASHING_ENABLED`) that allows recovering the previous behavior of `Murmur3Hash` and `XxHash64` as user-facing expressions. The kill switch has no effect on the new collation-aware hashing expressions, or on the HiveHash expression, which are internal and need to follow the new collation-aware behavior.

### Why are the changes needed?
The `Murmur3Hash` and `XxHash64` catalyst expressions, when applied to collated strings, currently always take the collation of the string into consideration; that is, they are collation-aware. This is not the correct behavior: these expressions should be collation-agnostic by default instead.
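To illustrate the distinction, here is a hedged Scala shell example; it assumes a `SparkSession` named `spark`, the built-in `hash` SQL function, and the `UTF8_LCASE` collation, and the observed result depends on the kill switch described above.

```scala
// 'a' and 'A' compare as equal under the case-insensitive UTF8_LCASE collation.
// With the collation-agnostic default, hash() works on the raw bytes, so the
// two values are expected to hash differently.
spark.sql(
  "SELECT hash('a' COLLATE UTF8_LCASE) = hash('A' COLLATE UTF8_LCASE) AS same_hash"
).show()
// Collation-aware hashing (kept internally, e.g. for hash partitioning) would
// instead hash them to the same value, keeping equal keys in the same partition.
```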

### Does this PR introduce _any_ user-facing change?
Yes, see the detailed explanation above.

### How was this patch tested?
Updated existing tests in relevant suites: CollationFactorySuite, DistributionSuite, and HashExpressionsSuite. Also verified that the CollationSuite suite passes.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51521 from uros-db/collation-hashing.

Lead-authored-by: Uros Bojanic <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
Prior to this change, `EXPLAIN FORMATTED` on a query such as:

```
SELECT *
FROM catalog.tbl1 t1
    JOIN catalog.tbl2 t2 ON t1.id1 = t2.id2
    JOIN catalog.tbl3 t3 ON t2.id2 = t3.id3
    JOIN catalog.tbl4 t4 ON t3.id3 = t4.id4;
```

looked like:
```
PushedJoins: [join_pushdown_catalog.tbl1, join_pushdown_catalog.tbl2, join_pushdown_catalog.tlb3, join_pushdown_catalog.tlb4]
```

With the change from this PR, the output of `EXPLAIN FORMATTED` becomes:

```
 == Physical Plan ==
   *(1) Project [ID#x, AMOUNT#x, ADDRESS#x, ID_1#x AS ID#x, NEXT_ID#x, SALARY#x, SURNAME#x, id_3#x AS id#x, id_1_2#x AS id_1#x, id_2#x, id_1_1#x, sid#x, id_4#x AS id#x, id_1_3#x AS id_1#x, id_2_2#x AS id_2#x, id_2_1#x, Sid_1#x AS Sid#x]
   +- *(1) Scan JDBC v1 Relation from v2 scan join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1[ID#x,AMOUNT#x,ADDRESS#x,ID_1#x,NEXT_ID#x,SALARY#x,SURNAME#x,id_3#x,id_1_2#x,id_2#x,id_1_1#x,sid#x,id_4#x,id_1_3#x,id_2_2#x,id_2_1#x,Sid_1#x] PushedFilters: [id_3 = (id_4 + 1)], PushedJoins:
   [L]: PushedFilters: [ID_1 = (id_3 + 1)]
        PushedJoins:
        [L]: PushedFilters: [ID = (ID_1 + 1)]
             PushedJoins:
             [L]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1
                  PushedFilters: [ID IS NOT NULL]
             [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_2
                  PushedFilters: [ID IS NOT NULL]
        [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_3
             PushedFilters: [id IS NOT NULL]
   [R]: Relation: join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_4
        PushedFilters: [id IS NOT NULL]
   , ReadSchema: struct<ID:int,AMOUNT:decimal(10,2),ADDRESS:string,ID_1:int,NEXT_ID:int,SALARY:decimal(10,2),SURNAME:string,id_3:int,id_1_2:int,id_2:int,id_1_1:int,sid:int,id_4:int,id_1_3:int,id_2_2:int,id_2_1:int,Sid_1:int>
```

PushedFilters on top of PushedJoins are actually join conditions.
Note that the relation name shown for `Scan JDBC v1 Relation from v2 scan` is only the first table (`catalog.tbl1`). This should be fixed as well, but it won't be part of this PR.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#51781 from PetarVasiljevic-DB/improve_explain_command_for_dsv2_join_pushdown.

Authored-by: Petar Vasiljevic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ad of `Files.toByteArray`

### What changes were proposed in this pull request?

This PR aims to use Java 9+ `java.nio.file.Files.readAllBytes` instead of `com.google.common.io.Files.toByteArray`.

In addition, a new Scalastyle rule is added to ban `Files.toByteArray` for consistency.

### Why are the changes needed?

The built-in Java method is as good as the 3rd-party library.
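For illustration, a before/after sketch of the replacement (the file path is hypothetical):

```scala
import java.nio.file.{Files, Path}

// Before: Guava helper
// val bytes = com.google.common.io.Files.toByteArray(new java.io.File("/tmp/data.bin"))

// After: built-in Java NIO API
val bytes: Array[Byte] = Files.readAllBytes(Path.of("/tmp/data.bin"))
```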

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51891 from dongjoon-hyun/SPARK-53164.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…arameter

### What changes were proposed in this pull request?

This PR aims to improve `SparkUserAppException` by additionally accepting a `cause` parameter.

```scala
-private[spark] case class SparkUserAppException(exitCode: Int)
+private[spark] case class SparkUserAppException(exitCode: Int, cause: Throwable = null)
```

### Why are the changes needed?

`SparkUserAppException` is used in many places: `SparkSubmit`, `PythonRunner`, `RRunner`, `SparkPipelines`. Although it is defined as `private[spark]`, the new `cause` parameter can be used to propagate richer information from the user application to upper layers internally.
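A hedged sketch of how an internal caller might use the new parameter; `SparkUserAppException` and `SparkExitCode` are `private[spark]`, so this only applies inside Spark itself, and the wrapper function is illustrative.

```scala
import org.apache.spark.SparkUserAppException
import org.apache.spark.util.SparkExitCode

// Illustrative: fail with the standard exit code while preserving the original
// exception from the user application as the cause.
def runAndWrap(runUserCode: () => Unit): Unit = {
  try runUserCode() catch {
    case e: Exception =>
      throw SparkUserAppException(SparkExitCode.EXIT_FAILURE, cause = e)
  }
}
```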

### Does this PR introduce _any_ user-facing change?

No, this is defined as `private[spark]`.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51897 from dongjoon-hyun/SPARK-53170.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This PR improves the UTF8String repeat logic for some special cases

### Why are the changes needed?
performance improvement

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passes existing UTs; benchmarked as follows:

```
scala> spark.time((1 to 10000000).foreach(_ => UTF8String.fromString("A").repeat(1024)))
Time taken: 784 ms
scala> spark.time((1 to 10000000).foreach(_ => repeat(UTF8String.fromString("A"), 1024)))
Time taken: 432 ms
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51900 from yaooqinn/SPARK-53171.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
Upgrade PyArrow to 21.0.0

### Why are the changes needed?
to test against the latest pyarrow

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51890 from zhengruifeng/pyarrow_21.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…s file

### What changes were proposed in this pull request?

This PR modifies the `SparkSubmitCommandBuilder` to use the "effective config" to evaluate `isRemote` instead of only the passed args. This makes `spark.remote` configured in the properties file (`spark-defaults.conf` or other files specified by `--properties-file`) effective.

### Why are the changes needed?

```
$ sbin/start-connect-server.sh
$ cat > conf/spark-connect.conf <<EOF
spark.remote=sc://localhost:15002
EOF
$ bin/spark-shell --properties-file conf/spark-connect.conf
```

```
25/08/07 13:35:56 ERROR Main: Failed to initialize Spark session.
org.apache.spark.SparkException: A master URL must be set in your configuration
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:421)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:3055)
	at org.apache.spark.sql.classic.SparkSession$Builder.$anonfun$build$2(SparkSession.scala:839)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.classic.SparkSession$Builder.build(SparkSession.scala:830)
	at org.apache.spark.sql.classic.SparkSession$Builder.getOrCreate(SparkSession.scala:859)
	at org.apache.spark.sql.classic.SparkSession$Builder.getOrCreate(SparkSession.scala:732)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:923)
	at org.apache.spark.repl.Main$.createSparkSession(Main.scala:116)
```

### Does this PR introduce _any_ user-facing change?

Yes, a kind of bug fix.

### How was this patch tested?

UT is added, plus manually test:

```
$ sbin/start-connect-server.sh
$ cat > conf/spark-connect.conf <<EOF
spark.remote=sc://localhost:15002
EOF
$ bin/spark-shell --properties-file conf/spark-connect.conf
```
The previously failed case now works.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51896 from pan3793/SPARK-53167.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…ith a project

### What changes were proposed in this pull request?

This patch fixes the optimization rule `RemoveRedundantAggregates`.

### Why are the changes needed?

The optimizer rule `RemoveRedundantAggregates` removes a redundant lower aggregation from a query plan and replaces it with a project of the referred non-aggregate expressions. However, if the removed aggregation is a global one, that is not correct, because a project differs from a global aggregation in semantics.

For example, if the input relation is empty, a project might be optimized to an empty relation, while a global aggregation will return a single row.
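For concreteness, a small illustration of that semantic difference (assuming a `SparkSession` named `spark`):

```scala
// A global aggregation over an empty input still produces exactly one row...
spark.range(0).selectExpr("count(*) AS c").show()
// +---+
// |  c|
// +---+
// |  0|
// +---+

// ...while a plain projection over the same empty input produces zero rows.
spark.range(0).selectExpr("id").show()
// +---+
// | id|
// +---+
// +---+
```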

### Does this PR introduce _any_ user-facing change?

Yes, this fixes a user-facing bug. Previously, a global aggregation under another aggregation might be treated as redundant and replaced with a project of non-aggregate expressions. If the input relation is empty, the replacement is incorrect and might produce incorrect results. This patch adds a new unit test to show the difference.

### How was this patch tested?

Unit test, manual test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51884 from viirya/fix_remove_redundant_agg.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
…memory size from resource profile

### What changes were proposed in this pull request?

Add two APIs to get overhead memory size and offheap memory size from resource profile.

### Why are the changes needed?

Simplify retrieving overhead memory size and off-heap memory size.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The existing tests with modifications.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51870 from PHILO-HE/add-rp-api.

Authored-by: PHILO-HE <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…r `level` for `SparkFunSuite#withLogAppender` from `None` to `Some(Level.INFO)`

### What changes were proposed in this pull request?
This PR changes the default value of the input parameter `level: Option[Level]` of the `SparkFunSuite#withLogAppender` function from `None` to `Some(Level.INFO)`, in order to decouple the relevant tests from the `rootLogger.level` configuration in `log4j2.properties`.

### Why are the changes needed?
Suppose, for some reason, we change the `rootLogger.level` configuration value in `sql/core/src/test/resources/log4j2.properties` from `info` to `warn`. Subsequently, when running unit tests, failures similar to the following may occur:

```
build/sbt clean "sql/testOnly org.apache.spark.sql.execution.QueryExecutionSuite"

[info] - Logging plan changes for execution *** FAILED *** (63 milliseconds)
[info]   testAppender.loggingEvents.exists(((x$10: org.apache.logging.log4j.core.LogEvent) => x$10.getMessage().getFormattedMessage().contains(expectedMsg))) was false (QueryExecutionSuite.scala:243)
[info]   org.scalatest.exceptions.TestFailedException:
...
```

Similar issues may also arise in other test cases such as `AdaptiveQueryExecSuite`. Therefore, this PR modifies the default value of the `level` parameter to avoid such test coupling problems.
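For context, a hedged sketch of a test pinning the level explicitly; the parameter names and the `LogAppender` helper follow the description above but are assumptions about the exact `SparkFunSuite` API, and `expectedMsg` is a placeholder.

```scala
import org.apache.logging.log4j.Level

// Illustrative test body: capturing at an explicit level makes the assertion
// independent of the rootLogger.level configured in log4j2.properties.
val testAppender = new LogAppender("capture plan-change logs")
withLogAppender(testAppender, level = Some(Level.INFO)) {
  sql("SELECT 1").collect() // anything that emits the expected INFO-level log
}
assert(testAppender.loggingEvents.exists(
  _.getMessage.getFormattedMessage.contains(expectedMsg)))
```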

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Passed Github Actions
- Manually verified that the aforementioned test cases no longer fail

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51895 from LuciferYang/SPARK-53168.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
…of File Appender to`" from `log4j2.properties`

### What changes were proposed in this pull request?
This pr removes the comments related to "`Set the logger level of File Appender to`" from `log4j2.properties`, as the relevant configuration is self-descriptive and the comments often do not match the actual configuration.

### Why are the changes needed?
Remove redundant comments.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass Github Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51894 from LuciferYang/minor-sql-log4j-config.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
….BaseEncoding`

### What changes were proposed in this pull request?

This PR aims to use Java `Base64` instead of `com.google.common.io.BaseEncoding`.

In addition, new Scalastyle and Checkstyle rules are added to ban `com.google.common.io.BaseEncoding` in order to prevent a future regression.

### Why are the changes needed?

The Java implementation is **over 2x faster** than the Google one.

```scala
scala> val s = "a".repeat(5_000_000).getBytes(java.nio.charset.StandardCharsets.UTF_8)

scala> spark.time(java.util.Base64.getDecoder().decode(java.util.Base64.getEncoder().encodeToString(s)).length)
Time taken: 18 ms
val res0: Int = 5000000

scala> spark.time(com.google.common.io.BaseEncoding.base64().decode(com.google.common.io.BaseEncoding.base64().encode(s)).length)
Time taken: 50 ms
val res1: Int = 5000000
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51904 from dongjoon-hyun/SPARK-53177.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…sted_arrays`

### What changes were proposed in this pull request?
This test is incorrect, but it happened to pass with a particular partitioning.

### Why are the changes needed?
To correct a test that fails in different environments.

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci and manually check with pyspark shell

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#51898 from zhengruifeng/fix_nest_array_test.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
… `CharStreams.toString`

### What changes were proposed in this pull request?

This PR aims to use `SparkStreamUtils.toString` instead of `CharStreams.toString`.

### Why are the changes needed?

`SparkStreamUtils.toString` is ***faster*** than `CharStreams.toString`.

```scala
scala> spark.time(org.apache.spark.util.SparkStreamUtils.toString(new java.io.FileInputStream("/tmp/1G.bin")).length)
Time taken: 322 ms
val res0: Int = 1073741824

scala> spark.time(com.google.common.io.CharStreams.toString(new java.io.InputStreamReader(java.nio.file.Files.newInputStream(Path.of("/tmp/1G.bin")))).length)
Time taken: 533 ms
val res1: Int = 1073741824
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51908 from dongjoon-hyun/SPARK-53179.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to upgrade `curator` to 5.9.0.

### Why are the changes needed?

To bring the latest improvement and bug fixes.

- https://github.com/apache/curator/releases/tag/apache-curator-5.9.0
- https://github.com/apache/curator/releases/tag/apache-curator-5.8.0

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51906 from dongjoon-hyun/SPARK-53178.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
JoshRosen and others added 9 commits September 1, 2025 10:32
…ly call System.exit after user code main method exits

This PR is based on apache#46889 authored by JoshRosen

### What changes were proposed in this pull request?
This PR adds a new SparkConf flag, `spark.submit.callSystemExitOnMainExit` (default false), which, when true, will cause SparkSubmit to call `System.exit()` in the JVM once the user code's main method has exited (for Java / Scala jobs) or once the user's Python or R script has exited.

### Why are the changes needed?
This is intended to address a longstanding issue where `spark-submit` runs might hang after user code has completed:

[According to Java’s java.lang.Runtime docs](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Runtime.html#shutdown):

> The Java Virtual Machine initiates the shutdown sequence in response to one of several events:
>
> 1. when the number of [live](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Thread.html#isAlive()) non-daemon threads drops to zero for the first time (see note below on the JNI Invocation API);
> 2. when the Runtime.exit or System.exit method is called for the first time; or
> 3. when some external event occurs, such as an interrupt or a signal is received from the operating system.

For Python and R programs, SparkSubmit’s PythonRunner and RRunner will call System.exit() if the user program exits with a non-zero exit code (see [python](https://github.com/apache/spark/blob/d5c33c6bfb5757b243fc8e1734daeaa4fe3b9b32/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L101-L104) and [R](https://github.com/apache/spark/blob/d5c33c6bfb5757b243fc8e1734daeaa4fe3b9b32/core/src/main/scala/org/apache/spark/deploy/RRunner.scala#L109-L111) runner code).

But for Java and Scala programs, plus any successful R or Python programs, Spark will not automatically call System.exit.

In those situations, the JVM will only shut down when, via event (1), all non-[daemon](https://stackoverflow.com/questions/2213340/what-is-a-daemon-thread-in-java) threads have exited (unless the job is cancelled and sent an external interrupt / kill signal, corresponding to event (3)).

Thus, non-daemon threads might cause logically-completed spark-submit jobs to hang rather than completing.

The non-daemon threads are not always under Spark's own control and may not necessarily be cleaned up by `SparkContext.stop()`.

Thus, it is useful to have opt-in functionality that makes SparkSubmit automatically call `System.exit()` upon main-method exit (which usually, but not always, corresponds to job completion): this option allows users and data platform operators to enforce `System.exit()` calls without having to modify individual jobs' code.
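A minimal illustration of the hang the option addresses; the application below is hypothetical, and the config key is the one introduced by this PR.

```scala
// Illustrative user application: main() returns, but a lingering non-daemon
// thread keeps the JVM (and therefore the spark-submit process) alive.
object LingeringApp {
  def main(args: Array[String]): Unit = {
    val t = new Thread(() => Thread.sleep(Long.MaxValue))
    t.setDaemon(false) // non-daemon: prevents JVM shutdown via event (1)
    t.start()
    println("main() finished")
  }
}
// Submitting with --conf spark.submit.callSystemExitOnMainExit=true makes
// SparkSubmit call System.exit() once main() returns, so the run terminates.
```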

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new user-facing configuration option for opting in to a behavior change.

### How was this patch tested?
New tests in `SparkSubmitSuite`, including one which hangs (failing with a timeout) unless the new option is set to `true`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52091 from pan3793/SPARK-48547.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
### What changes were proposed in this pull request?

There is a race condition between `CachedRDDBuilder.cachedColumnBuffers` and `CachedRDDBuilder.clearCache`: when they interleave, `cachedColumnBuffers` might return `null`.

This looks like a day-1 bug introduced from  apache@20ca208#diff-4068fce361a50e3d32af2ba2d4231905f500e7b2da9f46d5ddd99b758c30fd43

### Why are the changes needed?

The race condition might lead to NPE from [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L303) which is basically a null `RDD` returned from `CachedRDDBuilder.cachedColumnBuffers`

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Theoretically this race condition might be triggered whenever cache materialization and unpersistence happen on different threads, but there is no reliable way to construct a unit test for it.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#52174 from liuzqt/SPARK-53435.

Authored-by: ziqi liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…ysis

### What changes were proposed in this pull request?

Propagate `LogicalPlan.PLAN_ID_TAG` to the resolved logical plan during SDP analysis so when the whole plan is sent to Spark for analysis, it contains the correct plan id.

### Why are the changes needed?

Spark Connect attaches a plan id to each logical plan. In SDP, we take part of the logical plan and analyze it independently to resolve table references correctly. When this happens, the logical plan id is lost which causes resolution errors when the plan is sent to Spark for complete analysis.

For example, group by and rollup functions would fail with `sql.AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a"))`
```python3
from pyspark.sql.functions import col, sum, count

@dp.materialized_view
def groupby_result():
    return spark.read.table("src").groupBy("id").count()
```
This happens because we take the below unresolved logical plan:
```
'Aggregate ['id], ['id, 'count(1) AS count#7]
+- 'UnresolvedRelation [src], [], false
```
We then perform independent analysis on the `UnresolvedRelation` part to identify the table. During this analysis, the plan id is lost:
```
'Aggregate ['id], ['id, 'count(1) AS count#7]
+- SubqueryAlias spark_catalog.default.src
   +- Relation spark_catalog.default.src[id#9L] parquet
```
So when the above partially resolved logical plan is sent to Spark for analysis, Spark tries to resolve the `id` attribute in the aggregate operation with respect to the `SubqueryAlias` children, and fails because the children no longer carry the same plan id.
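A rough sketch of the kind of propagation described, assuming the `TreeNode` tag helpers `getTagValue`/`setTagValue`; the actual SDP analysis code may differ.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative: carry the Spark Connect plan id over to the independently
// resolved subtree so later full analysis can still match column references.
def preservePlanId(original: LogicalPlan, resolved: LogicalPlan): LogicalPlan = {
  original.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach { planId =>
    resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
  }
  resolved
}
```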

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tests

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#52121 from JiaqiWang18/SPARK-53377-sdp-groupBy-rollup-tests.

Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

When a user sends multiple artifacts with the `addArtifacts` API, we process each artifact one at a time on the server-side.

If the server detects the user attempting to modify an artifact (by overwriting an existing artifact of the same path with a different byte sequence), an exception is immediately thrown and the artifact addition process is terminated.

 Instead, the operation should be idempotent and the server should try to add as many artifacts as possible instead of returning early.

### Why are the changes needed?

As explained, if the server encounters an error while adding artifacts it will return immediately. This can be a bit wasteful as the server discards all other artifacts sent over the wire regardless of their own status. Thus, an improvement can be made to process all artifacts, catch any exceptions and rethrow them at the end.
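The general pattern is sketched below; this is illustrative, not the actual artifact-manager code.

```scala
// Process every artifact, collect failures, and rethrow at the end with the
// remaining failures attached as suppressed exceptions.
def addAll[A](artifacts: Seq[A])(addOne: A => Unit): Unit = {
  val failures = artifacts.flatMap { artifact =>
    try { addOne(artifact); None } catch { case e: Exception => Some(e) }
  }
  failures match {
    case Seq() => // every artifact was stored successfully
    case first +: rest =>
      rest.foreach(e => first.addSuppressed(e))
      throw first
  }
}
```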

### Does this PR introduce _any_ user-facing change?

This PR does not modify the existing API or the return codes. If the above scenario is triggered, the only user-facing change is that the server adds as many artifacts as possible. Therefore it should be fully backwards compatible. Additionally, if more than one artifact already existed, the additional exceptions are attached as suppressed exceptions. Currently, these suppressed exceptions are not serialized into the gRPC object and sent over the wire, however.

### How was this patch tested?

Unit tests and local testing.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52073 from HendrikHuebner/improve-add-artifact-exceptions.

Lead-authored-by: Hendrik Huebner <[email protected]>
Co-authored-by: Hendrik Huebner <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
Implement the `time_diff` function in Scala API.

### Why are the changes needed?
Expand API support for the `TimeDiff` expression.

### Does this PR introduce _any_ user-facing change?
Yes, the new function is now available in Scala API.

### How was this patch tested?
Added appropriate Scala function tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#51826 from uros-db/scala-time_diff.

Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…r YearMonthIntervalType field

### What changes were proposed in this pull request?

`InterpretedUnsafeProjection` should call `setNull4Bytes` for a `YearMonthIntervalType` field instead of `setNull8Bytes`.

### Why are the changes needed?

`YearMonthIntervalType` is 4 bytes long.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Passing CI

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52178 from yaooqinn/SPARK-53437.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
…ends

### What changes were proposed in this pull request?
Report a heartbeat on the driver when the application stops.

### Why are the changes needed?
When the application proactively terminates due to memory issues at the driver (Spark OOM, result size too large, etc.), metric sampling issues mean we will often miss this resourcing problem in the memory metrics and in the event log: we abort the job before we capture accurate metrics for the driver. If we report an additional heartbeat (metric collection at the driver) on application termination, then we will be able to better reflect the memory usage in the event log, SHS, etc.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested with a custom job that collected a large amount of data to the driver and otherwise had very low driver memory usage (a low number of partitions and no other data structures used at the driver). Without the change, the reported peak memory usage at the driver was low (<~100 MiB); with this change, the higher memory usage is reflected.
<img width="1723" height="230" alt="image" src="https://github.com/user-attachments/assets/fb442550-a262-453e-b6e2-f47e1e9f11b1" />

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51882 from ForVic/vsunderl/report_driver_heartbeat.

Lead-authored-by: ForVic <[email protected]>
Co-authored-by: Victor Sunderland <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
… 15.0.0

### What changes were proposed in this pull request?

This PR is a followup of apache#52172 that makes the test compatible with PyArrow versions older than 15.0.0.

### Why are the changes needed?

Tests fail with PyArrow older than 15.0.0: https://github.com/apache/spark/actions/runs/17355567623/job/49268068508

`StructArray.from_arrays(..., type=...)` is available from PyArrow 15.0.0 (https://arrow.apache.org/docs/18.0/python/generated/pyarrow.StructArray.html#pyarrow.StructArray.from_arrays).

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52186 from HyukjinKwon/SPARK-53433-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@EnricoMi EnricoMi force-pushed the fallback-storage-retry-file-not-found branch from 796b866 to 11f8c36 on September 2, 2025 05:02
EnricoMi pushed a commit that referenced this pull request Sep 2, 2025
…ingBuilder`

### What changes were proposed in this pull request?

This PR aims to improve `toString` by using `JEP-280` instead of `ToStringBuilder`. In addition, `Scalastyle` and `Checkstyle` rules are added to prevent a future regression.

### Why are the changes needed?

Since Java 9, `String Concatenation` has been handled better by default.

| ID | DESCRIPTION |
| - | - |
| JEP-280 | [Indify String Concatenation](https://openjdk.org/jeps/280) |

For example, this PR improves `OpenBlocks` like the following. Both Java source code and byte code are simplified a lot by utilizing JEP-280 properly.

**CODE CHANGE**
```java

- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-   .append("appId", appId)
-   .append("execId", execId)
-   .append("blockIds", Arrays.toString(blockIds))
-   .toString();
+ return "OpenBlocks[appId=" + appId + ",execId=" + execId + ",blockIds=" +
+     Arrays.toString(blockIds) + "]";
```

**BEFORE**
```
  public java.lang.String toString();
    Code:
       0: new           #39                 // class org/apache/commons/lang3/builder/ToStringBuilder
       3: dup
       4: aload_0
       5: getstatic     #41                 // Field org/apache/commons/lang3/builder/ToStringStyle.SHORT_PREFIX_STYLE:Lorg/apache/commons/lang3/builder/ToStringStyle;
       8: invokespecial #47                 // Method org/apache/commons/lang3/builder/ToStringBuilder."<init>":(Ljava/lang/Object;Lorg/apache/commons/lang3/builder/ToStringStyle;)V
      11: ldc           #50                 // String appId
      13: aload_0
      14: getfield      #7                  // Field appId:Ljava/lang/String;
      17: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      20: ldc           #55                 // String execId
      22: aload_0
      23: getfield      #13                 // Field execId:Ljava/lang/String;
      26: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      29: ldc           #56                 // String blockIds
      31: aload_0
      32: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      35: invokestatic  #57                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      38: invokevirtual #51                 // Method org/apache/commons/lang3/builder/ToStringBuilder.append:(Ljava/lang/String;Ljava/lang/Object;)Lorg/apache/commons/lang3/builder/ToStringBuilder;
      41: invokevirtual #61                 // Method org/apache/commons/lang3/builder/ToStringBuilder.toString:()Ljava/lang/String;
      44: areturn
```

**AFTER**
```
  public java.lang.String toString();
    Code:
       0: aload_0
       1: getfield      #7                  // Field appId:Ljava/lang/String;
       4: aload_0
       5: getfield      #13                 // Field execId:Ljava/lang/String;
       8: aload_0
       9: getfield      #16                 // Field blockIds:[Ljava/lang/String;
      12: invokestatic  #39                 // Method java/util/Arrays.toString:([Ljava/lang/Object;)Ljava/lang/String;
      15: invokedynamic #43,  0             // InvokeDynamic #0:makeConcatWithConstants:(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;
      20: areturn
```

### Does this PR introduce _any_ user-facing change?

No. This is a `toString` implementation improvement.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51572 from dongjoon-hyun/SPARK-52880.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>