forked from apache/spark
[SPARK-52509][K8S] Cleanup shuffles from fallback storage #90
Open
EnricoMi wants to merge 1,222 commits into master from fallback-storage-cleanup
Conversation
…ndas_api` ### What changes were proposed in this pull request? Reenable doctest for `DataFrame.pandas_api` ### Why are the changes needed? for test coverage, the doctest will be run when pandas and pyarrow are installed ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52383 from zhengruifeng/doc_test_pandas_api. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…ureSuite ### What changes were proposed in this pull request? As title. ### Why are the changes needed? Address comments apache#52269 (review) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR adds new UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52381 from pan3793/SPARK-53523-followup. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…on guide ### What changes were proposed in this pull request? Add invalid mixed-type operations to ANSI migration guide ### Why are the changes needed? Smooth user migration since ANSI is on by default for Pandas API on Spark ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests; manual verification as: <img width="571" height="735" alt="image" src="https://github.com/user-attachments/assets/9bb2a652-34b5-4a52-8ee6-f95c38e30c7a" /> ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52376 from xinrong-meng/ansi_guide_2. Authored-by: Xinrong Meng <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This PR logically reverts SPARK-51311. ### Why are the changes needed? HADOOP-19152 has already been included in Hadoop 3.4.2 via apache/hadoop#7439, so the previous workaround is no longer necessary. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I manually tested it in a kerberized cluster with a SparkPi. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52390 from pan3793/SPARK-53637. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…ddress ApplyCharTypePadding incompatibility ### What changes were proposed in this pull request? Modify streaming MicrobatchExecution to propagate metadata columns through projections to resolve an incompatibility with the ApplyCharTypePadding rule (applied by default in serverless), which previously resulted in an `assertion failed: Invalid batch: ACTV_IND#130290,_metadata#130291 != ACTV_IND#130307` error. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52375 from liviazhu/liviazhu-db/col-metadata. Authored-by: Livia Zhu <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
…toConverter ### What changes were proposed in this pull request? This PR simplifies data type handling in the Spark Connect `LiteralValueProtoConverter` by consolidating type information into a single `data_type` field at the root level of the `Literal` message, rather than having separate type fields within nested structures. **Key changes:** 1. **Protobuf Schema Simplification:** - Added a new `data_type` field (field 100) to the root `Expression.Literal` message - Removed redundant type fields from nested messages (`Array.data_type`, `Map.data_type`, `Struct.data_type_struct`) 2. **Array Type Handling Enhancement:** - Added special handling for `ByteType` arrays to convert them to `Binary` type in the protobuf representation - This addresses a specific edge case where byte arrays should be represented as binary data ### Why are the changes needed? The current data type handling in Spark Connect has several issues: 1. **Redundancy and Complexity:** Type information is scattered across multiple fields in nested messages, making the protobuf schema unnecessarily complex and error-prone. 2. **Limited Extensibility:** Without this data_type field, it is difficult to add type information for literal types. For example, it's challenging to include detailed type metadata for types like `String` (with collation information), `YearMonthInterval`, `DayTimeInterval`, and other types that may require additional type-specific attributes. ### Does this PR introduce _any_ user-facing change? **No.** This is an internal refactoring of the Spark Connect protobuf schema and converter logic. ### How was this patch tested? `build/sbt "connect/testOnly *LiteralExpressionProtoConverterSuite"` `SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "connect-client-jvm/testOnly org.apache.spark.sql.PlanGenerationTestSuite"` `build/sbt "connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite"` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 1.5.11 Closes apache#52342 from heyihong/SPARK-53578. Authored-by: Yihong He <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? `CatalogColumnStat.readLargeTableProp` is an O(N) operation. Considering a table can have a lot of table properties, this effectively becomes an O(N^2) operation, which can be very slow for tables with a lot of table properties. This PR improves the algorithmic complexity to O(N) by only constructing the large table properties if numParts exists. ### Why are the changes needed? To fix a performance issue that was unintentionally introduced earlier. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. A previous patch already tested the side effect of this change apache#52355 ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52374 from yeshengm/improve-read-large-prop. Authored-by: Yesheng Ma <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
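A minimal sketch of the access pattern being described, with hypothetical names (`readLargeProp` and the `.numParts`/`.part.N` key scheme) standing in for Spark's actual implementation:
```
// Hypothetical sketch: a large value may be split across `<key>.part.N` entries,
// announced by `<key>.numParts`. Only stitch the parts together when that marker
// is present, so the common case is a constant number of map lookups per key
// instead of a scan over all table properties.
def readLargeProp(props: Map[String, String], key: String): Option[String] =
  props.get(key).orElse {
    props.get(s"$key.numParts").map { n =>
      (0 until n.toInt).map(i => props(s"$key.part.$i")).mkString
    }
  }
```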
…egisterShuffle ### What changes were proposed in this pull request? This PR fixes the thread-safety issue in `SortShuffleManager.unregisterShuffle` by synchronizing on `mapTaskIds` during its iteration. Besides, this PR also addresses the [concern](apache#52337 (comment)) to enforce the type of `taskIdMapsForShuffle` as `ConcurrentHashMap` to ensure its thread-safety. ### Why are the changes needed? Fix the potential thread-safety issue as pointed out at apache#52337 (comment) and also the [concern](apache#52337 (comment)). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52386 from Ngone51/fix. Authored-by: Yi Wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
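A minimal sketch of the locking pattern described above, using illustrative types rather than Spark's exact fields:
```
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Illustrative only: the outer map is a ConcurrentHashMap so registration and
// unregistration are thread-safe, but the per-shuffle set of map task ids still
// needs a lock while it is iterated, because writers may add ids concurrently.
val taskIdMapsForShuffle = new ConcurrentHashMap[Int, mutable.HashSet[Long]]()

def unregisterShuffle(shuffleId: Int): Unit = {
  val mapTaskIds = taskIdMapsForShuffle.remove(shuffleId)
  if (mapTaskIds != null) {
    mapTaskIds.synchronized {
      // iterate under the lock; the real code would delete the shuffle output
      // written by each map task id here
      mapTaskIds.foreach(id => println(s"cleaning shuffle $shuffleId, map task $id"))
    }
  }
}
```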
…e NULLs ### What changes were proposed in this pull request? As a follow-up of apache#52655, add NULL handling in approx_top_k_accumulate/estimate/combine. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit tests on null handling for accumulate, combine and estimate. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52673 from yhuang-db/accumulate_estimate_count_null. Authored-by: yhuang-db <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request? As title. ### Why are the changes needed? This release fixes a regression that causes decompression failures in certain cases - luben/zstd-jni@1941118 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GHA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52684 from pan3793/SPARK-53971. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? Drop temporary functions in Arrow UDF tests ### Why are the changes needed? to avoid the env being polluted ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52682 from zhengruifeng/with_temp_func_arrow. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? As title, bump Avro to the latest patched version (contains security fixes). ### Why are the changes needed? Release Notes are available at apache/avro#3518 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GHA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52664 from pan3793/SPARK-53954. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…o PySpark API ### What changes were proposed in this pull request? Introduce two new geospatial data types to PySpark API: - `GeographyType` - `GeometryType` This PR also adds appropriate JSON serialization logic for the new types in PySpark. Note that the GEOMETRY and GEOGRAPHY logical types were recently included to Spark SQL as part of: apache#52491. ### Why are the changes needed? Expanding on GEOMETRY and GEOGRAPHY type support across all of the supported APIs. ### Does this PR introduce _any_ user-facing change? Yes, two new data types are now available to users of the PySpark API. ### How was this patch tested? Added new tests to: - `test_geographytype.py` - `test_geometrytype.py` Also, added appropriate test cases to: - `test_types.py` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52627 from uros-db/geo-python-types. Authored-by: Uros Bojanic <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…ailure ### What changes were proposed in this pull request? When an option of AvroOptions requires a boolean value, but a value that cannot be cast to boolean is passed in, classify the error. ### Why are the changes needed? The error should be classified to give a better user experience. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52686 from siying/avro_boolean_option. Authored-by: Siying Dong <[email protected]> Signed-off-by: Anish Shrigondekar <[email protected]>
…ests ### What changes were proposed in this pull request? Drop temporary functions in Pandas UDF tests ### Why are the changes needed? for isolation of testing envs ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52690 from zhengruifeng/with_temp_func_pd. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
### What changes were proposed in this pull request? Jackson 2.20 was released on August 28, 2025. Release Notes: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.20 Release Blog: https://cowtowncoder.medium.com/jackson-2-20-0-released-0cc58ed1ea9f ### Why are the changes needed? Bump Jackson to the latest 2.x version; Jackson 3.0.0 was released in October, so 2.20 is probably the last feature version of the 2.x series. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GHA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52687 from pan3793/SPARK-53974. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Kousuke Saruta <[email protected]>
…pressions ### What changes were proposed in this pull request? Currently, arithmetic expressions such as `Add` and `Multiply` use the configuration `spark.sql.decimalOperations.allowPrecisionLoss` to determine their output type when working with decimal values. This approach is problematic because if the expression is transformed or copied, its return type could change depending on the active configuration value. This issue can happen during view resolution; we can use one value of the config during analysis and different one during query optimization. If a referenced expression changes type and that reference is reused elsewhere in the plan it will trigger a plan validation error. ### Why are the changes needed? To address this, we should follow a similar approach to what was done for ANSI mode: store the relevant context directly within the expression as part of its state. This ensures the expression remains stable and unaffected by configuration changes when it’s copied or transformed. To make this transition smooth, I’ve generalized the existing EvalMode used for ANSI so that it can be extended to multiple configuration dimensions. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a new unit test in SQLViewSuite which was failing with plan validation error before. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52681 from stefankandic/fixViewDec. Authored-by: Stefan Kandic <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
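A rough sketch of the idea of freezing a config-dependent mode into the expression's state at construction time; the names below (`DecimalPrecisionMode`, `modeFromConf`, `AddExpr`) are illustrative, not Spark's actual EvalMode code:
```
// Illustrative sketch only. The mode is resolved once, when the expression is
// built, and travels with the expression through copies and transformations,
// so its result type no longer depends on the config value at evaluation time.
sealed trait DecimalPrecisionMode
case object AllowPrecisionLoss extends DecimalPrecisionMode
case object StrictPrecision    extends DecimalPrecisionMode

def modeFromConf(allowPrecisionLoss: Boolean): DecimalPrecisionMode =
  if (allowPrecisionLoss) AllowPrecisionLoss else StrictPrecision

// `mode` is part of the expression state; `copy()` keeps it unchanged.
final case class AddExpr(left: String, right: String, mode: DecimalPrecisionMode)

val e = AddExpr("a", "b", modeFromConf(allowPrecisionLoss = true))
```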
### What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-53917 #### Problem description LocalRelation is a Catalyst logical operator used to represent a dataset of rows inline as part of the LogicalPlan. LocalRelations represent dataframes created directly from Python and Scala objects, e.g., Python and Scala lists, pandas dataframes, csv files loaded in memory, etc. In Spark Connect, local relations are transferred over gRPC using LocalRelation (for relations under 64MB) and CachedLocalRelation (larger relations over 64MB) messages. CachedLocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB. #### Design In Spark Connect, the client needs to serialize the local relation before transferring it to the server. It serializes data via an Arrow IPC stream as a single record batch and schema as a json string. It then embeds data and schema as LocalRelation{schema,data} proto message. Small local relations (under 64MB) are sent directly as part of the ExecutePlanRequest. <img width="1398" height="550" alt="image" src="https://github.com/user-attachments/assets/c176f4cd-1a8f-4d72-8217-5a3bc221ace9" /> Larger local relations are first sent to the server via addArtifact and stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is sent containing CachedLocalRelation{hash}, where hash is the artifact hash. The server retrieves the cached LocalRelation from the BlockManager via the hash, deserializes it, adds it to the LogicalPlan and then executes it. <img width="1401" height="518" alt="image" src="https://github.com/user-attachments/assets/51352194-5439-4559-9d43-fc19dfe81437" /> The server reads the data from the BlockManager as a stream and tries to create proto.LocalRelation via ``` proto.Relation .newBuilder() .getLocalRelation .getParserForType .parseFrom(blockData.toInputStream()) ``` This fails, because java protobuf library has a 2GB limit on deserializing protobuf messages from a string. ``` org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException) CodedInputStream encountered an embedded string or message which claimed to have negative size. ``` <img width="1396" height="503" alt="image" src="https://github.com/user-attachments/assets/60da9441-f4cc-45d5-b028-57573a0175c2" /> To fix this, I propose avoiding the protobuf layer during the serialization on the client and deserialization on the server. Instead of caching the full protobuf LocalRelation message, we cache the data and schema as separate artifacts, send two hashes {data_hash, schema_hash} to the server, load them both from BlockManager directly and create a LocalRelation on the server based on the unpacked data and schema. <img width="1397" height="515" alt="image" src="https://github.com/user-attachments/assets/e44558de-df64-43b0-8813-d03de6689810" /> After creating a prototype with the new proto message, I discovered that there are additional limits for CachedLocalRelations. Both the Scala Client and the Server store the data in a single Java Array[Byte], which has a 2GB size limit in Java. To avoid this limit, I propose transferring data in chunks. The Python and Scala clients will split data into multiple Arrow batches and upload them separately to the server. Each batch will be uploaded and stored a separate artifact. The Server will then load and process each batch separately. 
We will keep batch sizes around 16MB (TBD), well below the 2GB limit. This way we will avoid 2GB limits on both clients and on the server. <img width="1395" height="569" alt="image" src="https://github.com/user-attachments/assets/16fac7b2-d247-42a6-9ac3-decb48df023d" /> The final proto message looks like this: ``` message ChunkedCachedLocalRelation { repeated string dataHashes = 1; optional string schemaHash = 2; } ``` ### Why are the changes needed? LocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New python and scala tests for large local relations. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52613 from khakhlyuk/largelocalrelations. Authored-by: Alex Khakhlyuk <[email protected]> Signed-off-by: Herman van Hovell <[email protected]>
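A rough sketch of the chunking step only (the 16MB figure comes from the description above; the names are illustrative and this is not the actual client code):
```
// Illustrative: slice serialized relation data into ~16MB pieces so that no
// single artifact or Array[Byte] approaches the 2GB JVM / protobuf limits.
// Each slice would be uploaded as its own artifact and referenced by hash.
val chunkSizeBytes: Int = 16 * 1024 * 1024

def chunkData(data: Array[Byte]): Seq[Array[Byte]] =
  data.grouped(chunkSizeBytes).toSeq
```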
### What changes were proposed in this pull request? In the PR, I propose to extend the `try_make_timestamp_ltz` function, and accept a date and time fields. **Syntax** ``` try_make_timestamp_ltz(date[, time]) ``` **Arguments** - `date`: A date expression. - `time`: A time expression. **Returns** A `TIMESTAMP_LTZ`. Examples ``` > SELECT try_make_timestamp_ltz(DATE'2014-12-28', TIME'6:30:45.887'); 2014-12-28 06:30:45.887 ``` ### Why are the changes needed? Users will be able to create a timestamp with local time zone by combining a time and a date. ### Does this PR introduce _any_ user-facing change? Yes, this extends `try_make_timestamp_ltz` to accept additional kinds of inputs. ### How was this patch tested? Added new e2e SQL tests in corresponding golden files. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52063 from uros-db/try_make_timestamp_ltz. Lead-authored-by: Uros Bojanic <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? This PR proposes to introduce WATERMARK clause in SQL statement. WATERMARK clause is to define the watermark against the relation, especially streaming relation where STREAM keyword is added to the relation (or table valued function). Please refer to the SQL reference doc for WATERMARK clause about its definition. https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-watermark The PR also contains new tests which show how to use it. ### Why are the changes needed? This is needed to unblock the stateful workload in Streaming Table & Flow being described as SQL statement. ### Does this PR introduce _any_ user-facing change? Yes, users can define the watermark for stateful workloads with SQL statement. ### How was this patch tested? New UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52428 from HeartSaVioR/SPARK-53687. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…assic pyspark ### What changes were proposed in this pull request? We found a performance regression in recentProgress in pyspark in assigned cluster after version 4.0. The direct cause of the regression in this [commit](apache@22eb6c4#diff-4d4ed29d139877b160de444add7ee63cfa7a7577d849ab2686f1aa2d5b4aae64), it changes how we load recentProgress `fromJson` to `fromJObject`, and this commit is only included in 4.0. However, when constructing a dict from `JObject`, py4j needs to make multiple RPCs to JVM which result in a long time ### Proposed Fix: Use fromJson to load StraemingQueryProgress instead. This will be aligned with the [recentProgress in pyConnect](https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/streaming/query.py#L114-L130) as well We are also fixing `lastProgress` to load StreamingQueryProgress from json. ### Why are the changes needed? Otherwise there is a performance regression in recentProgress. Here is the log of the time it takes to load 70 recent progress: ``` [2025-10-22 06:28:37]Total Time: 0.12269 seconds for getting 1 progress [2025-10-22 06:28:38]Total Time: 0.199387 seconds for getting 2 progress [2025-10-22 06:28:39]Total Time: 0.384784 seconds for getting 4 progress ... [2025-10-22 06:29:27]Total Time: 4.089001 seconds for getting 48 progress [2025-10-22 06:29:32]Total Time: 4.571433 seconds for getting 53 progress [2025-10-22 06:29:38]Total Time: 5.024825 seconds for getting 58 progress [2025-10-22 06:29:45]Total Time: 5.520222 seconds for getting 64 progress [2025-10-22 06:29:52]Total Time: 6.071674 seconds for getting 71 progress ``` This is generated by adding the following test to test_streaming.py locally ``` def test_recent_progress_regression(self): df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() q = df.writeStream.format("noop").start() print("begin waiting for progress") numProgress = len(q.recentProgress) while numProgress < 70 and q.exception() is None: time.sleep(1) beforeTime = datetime.now() rep = q.recentProgress numProgress = len(rep) afterTime = datetime.now() print("[" + afterTime.strftime("%Y-%m-%d %H:%M:%S") + "]" + "Total Time: " + str((afterTime - beforeTime).total_seconds()) + " seconds for getting " + str(numProgress) + " progress") q.stop() q.awaitTermination() assert(q.exception() is None) assert(numProgress == 300) # wrong statement so that we can see the logs ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Run local script with the changes and here is the output log. ``` 2025-10-22 06:23:35]Total Time: 0.001411 seconds for getting 1 progress [2025-10-22 06:23:36]Total Time: 0.002293 seconds for getting 2 progress [2025-10-22 06:23:37]Total Time: 0.003163 seconds for getting 3 progress [2025-10-22 06:23:38]Total Time: 0.003583 seconds for getting 4 progress [2025-10-22 06:23:39]Total Time: 0.004025 seconds for getting 5 progress [2025-10-22 06:23:40]Total Time: 0.004954 seconds for getting 6 progress ... [2025-10-22 06:24:43]Total Time: 0.012729 seconds for getting 69 progress [2025-10-22 06:24:44]Total Time: 0.01289 seconds for getting 70 progress ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52688 from zifeif2/recent-progress-bug. Authored-by: Ubuntu <[email protected]> Signed-off-by: Anish Shrigondekar <[email protected]>
… in PySpark ### What changes were proposed in this pull request? Implement the support for TIME type in `try_make_timestamp` function in PySpark API. ### Why are the changes needed? Expand API support for the `TryMakeTimestamp` expression. ### Does this PR introduce _any_ user-facing change? Yes, the new function is now available in PySpark API. ### How was this patch tested? Added appropriate Python functions tests and examples. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52666 from uros-db/python-try-make_timestamp. Authored-by: Uros Bojanic <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…PySpark ### What changes were proposed in this pull request? Implement the support for TIME type in `make_timestamp` function in PySpark API. ### Why are the changes needed? Expand API support for the `MakeTimestamp` expression. ### Does this PR introduce _any_ user-facing change? Yes, the new function is now available in PySpark API. ### How was this patch tested? Added appropriate Python functions tests and examples. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52648 from uros-db/python-try_make_timestamp. Authored-by: Uros Bojanic <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
…ng Spark Connect query execution ### What changes were proposed in this pull request? This PR introduces a new protobuf message, PipelineAnalysisContext, in ect/common/src/main/protobuf/spark/connect/pipelines.proto. ### Why are the changes needed? Special handling is needed for spark.sql in certain contexts. This proto provides a foundation for passing such context in future. ### Does this PR introduce _any_ user-facing change? No, it only adds an internal protobuf message. ### How was this patch tested? Verified through protobuf compilation and existing test coverage. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52685 from cookiedough77/jessie.luo-data/add-analysis-context-proto. Authored-by: Jessie Luo <[email protected]> Signed-off-by: Sandy Ryza <[email protected]>
### What changes were proposed in this pull request? Upgrade Netty to 4.2.7.Final ### Why are the changes needed? Dependency updates with fixes - https://github.com/netty/netty/milestone/326?closed=1 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? Passing CI ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52695 from yaooqinn/SPARK-53981. Authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
### What changes were proposed in this pull request? Introduce two new physical types to Spark: - `PhysicalGeographyType` - `PhysicalGeometryType` This PR also adds appropriate mapping from the logical geospatial types (introduced in: apache#52491) to the new physical types. ### Why are the changes needed? Extending the implementation of GEOMETRY and GEOGRAPHY types in Spark, laying the groundwork for full geospatial data type support. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new tests to: - `GeographyValSuite` - `GeometryValSuite` Also, added appropriate test cases to: - `GeographyTypeSuite` - `GeometryTypeSuite` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52629 from uros-db/geo-physical-types. Authored-by: Uros Bojanic <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…ava API ### What changes were proposed in this pull request? Introduce two new geospatial data types to Java API: - `GeographyType` - `GeometryType` Note that the GEOMETRY and GEOGRAPHY logical types were recently included to Spark SQL as part of: apache#52491. ### Why are the changes needed? Expanding on GEOMETRY and GEOGRAPHY type support across all of the supported APIs. ### Does this PR introduce _any_ user-facing change? Yes, two new data types are now available to users of the Java API. ### How was this patch tested? Added new tests to: - `JavaGeographyTypeSuite` - `JavaGeometryTypeSuite` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52623 from uros-db/geo-java-types. Authored-by: Uros Bojanic <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? I apologize for making a silly mistake. https://github.com/apache/spark/actions/runs/18718856740/job/53385398202 ``` [ERROR] Could not find the selected project in the reactor: sql/connect/client/jdbc [ERROR] Could not find the selected project in the reactor: sql/connect/client/jdbc -> [Help 1] ``` The issue is, ``` elif [[ "$MODULES_TO_TEST" == "connect" && "$INPUT_BRANCH" == "branch-4.0" ]]; then ... ``` should be defined before ``` elif [[ "$MODULES_TO_TEST" == "connect" ]]; then ... ``` ### Why are the changes needed? Fix branch-4.0 daily maven test ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Monitor daily Maven CI stability. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52707 from pan3793/SPARK-53914-followup. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Kousuke Saruta <[email protected]>
### What changes were proposed in this pull request? This PR aims to add utility functions to detect JVM GCs. ### Why are the changes needed? To provide a general capability to optimize based on the GC types. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual review. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#52678 from wankunde/zgc. Authored-by: WanKun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
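One conventional way to detect the running collector, shown as an illustrative sketch (not necessarily the utility functions added by this commit), is to inspect the registered `GarbageCollectorMXBean` names:
```
import java.lang.management.ManagementFactory
import scala.jdk.CollectionConverters._

// Illustrative sketch: GC bean names identify the collector in use, e.g.
// "G1 Young Generation"/"G1 Old Generation" for G1, or "ZGC Cycles" for ZGC.
val gcNames = ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getName)
val usesG1  = gcNames.exists(_.startsWith("G1"))
val usesZGC = gcNames.exists(_.contains("ZGC"))
```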
### What changes were proposed in this pull request? This PR adds Native KQUEUE via JNI support for transport, such as shuffle, file, and rpc procedures ### Why are the changes needed? Feature parity between Linux and MacOS/BSD platforms ### Does this PR introduce _any_ user-facing change? Yes, a new option for io.mode ### How was this patch tested? new unit tests ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#52703 from yaooqinn/SPARK-53999. Authored-by: Kent Yao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…tionSuite.Cancellation APIs in SparkSession are isolated` ### What changes were proposed in this pull request? This PR aims to reenable `SparkSessionJobTaggingAndCancellationSuite.Cancellation APIs in SparkSession are isolated`. apache#48736 disabled this test because of it was flaky. In this test, futures ran on threads managed by `ForkJoinPool`. Each future invokes `SparkSession#addTag` and `SparkSession#getTag`, and tags are implemented using `InheritableThreadLocal`. So the root cause of this issue is same as apache#52417. But apache#48906 replaced `ForkJoinPool` with `Executors.newFixedThreadPool(3)` so I believe this issue no longer occurs. In fact, this issue can be reproduced by replacing `Executors.newFixedThreadPool(3)` with `new ForkJoinPool(3)` and inserting a sleep like as follows. ``` // global ExecutionContext has only 2 threads in Apache Spark CI // create own thread pool for four Futures used in this test - val threadPool = Executors.newFixedThreadPool(3) + val threadPool = new ForkJoinPool(3) ... + Thread.sleep(1000) val jobB = Future { sessionB = globalSession.cloneSession() import globalSession.implicits._ ``` Then, run the test as follows. ``` $ build/sbt 'sql/testOnly org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite -- -z "Cancellation APIs in Spark\ Session are isolated"' ``` ``` info] - Cancellation APIs in SparkSession are isolated *** FAILED *** (2 seconds, 726 milliseconds) [info] ArrayBuffer({"spark.app.startTime"="1761192376305", "spark.rdd.scope"="{"id":"3","name":"Exchange"}", "spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K", "spark.hadoop.hadoop.caller.context.enabled"="true", "spark.memory.debugFill"="true", "spark.master.rest.enabled"="false", "spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse", "spark.master"="local[2]", "spark.job.interruptOnCancel"="true", "spark.app.name"="test", "spark.driver.host"="192.168.1.109", "spark.app.id"="local-1761192376735", "spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9d-3a55-4d1f-b47d-418fc2ed05e4-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-8c09c25f-089c-41ee-add1-1de463658349,spark-session-8c09c25f-089c-41ee-add1-1de463658349-thread-6b832f9d-3a55-4d1f-b47d-418fc2ed05e4-two", "spark.unsafe.exceptionOnMemoryLeak"="true", "spark.sql.execution.root.id"="0", "spark.ui.showConsoleProgress"="false", "spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false 
-Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972", "spark.testing"="true", "spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M", "spark.sql.execution.id"="1", "spark.rdd.scope.noOverride"="true", "spark.executor.id"="driver", "spark.port.maxRetries"="100", "spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.test.home"="/Users/sarutak/oss/spark", "spark.ui.enabled"="false"}, {"spark.app.startTime"="1761192376305", "spark.rdd.scope"="{"id":"5","name":"Exchange"}", "spark.hadoop.fs.s3a.vectored.read.min.seek.size"="128K", "spark.hadoop.hadoop.caller.context.enabled"="true", "spark.memory.debugFill"="true", "spark.master.rest.enabled"="false", "spark.sql.warehouse.dir"="file:/Users/sarutak/oss/spark/sql/core/spark-warehouse", "spark.master"="local[2]", "spark.job.interruptOnCancel"="true", "spark.app.name"="test", "spark.driver.host"="192.168.1.109", "spark.app.id"="local-1761192376735", "spark.job.tags"="spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-execution-root-id-0,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c-thread-a4b5b347-6e56-4416-b3a5-37a312bdfe34-one,spark-session-e2dd839b-2170-43c9-a8c9-1c8a24fe583c", "spark.unsafe.exceptionOnMemoryLeak"="true", "spark.sql.execution.root.id"="0", "spark.ui.showConsoleProgress"="false", "spark.driver.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE 
--enable-native-access=ALL-UNNAMED", "spark.driver.port"="56972", "spark.testing"="true", "spark.hadoop.fs.s3a.vectored.read.max.merged.size"="2M", "spark.sql.execution.id"="0", "spark.rdd.scope.noOverride"="true", "spark.executor.id"="driver", "spark.port.maxRetries"="100", "spark.executor.extraJavaOptions"="-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.type=pooled -Dio.netty.handler.ssl.defaultEndpointVerificationAlgorithm=NONE --enable-native-access=ALL-UNNAMED", "spark.test.home"="/Users/sarutak/oss/spark", "spark.ui.enabled"="false"}) had size 2 instead of expected size 1 (SparkSessionJobTaggingAndCancellationSuite.scala:229) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$13(SparkSessionJobTaggingAndCancellationSuite.scala:229) [info] at scala.collection.immutable.List.foreach(List.scala:323) [info] at org.apache.spark.sql.SparkSessionJobTaggingAndCancellationSuite.$anonfun$new$6(SparkSessionJobTaggingAndCancellationSuite.scala:226) [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) [info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282) [info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) [info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:68) [info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:154) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:226) [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) [info] at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:68) [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) [info] at scala.collection.immutable.List.foreach(List.scala:323) [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) [info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564) [info] at org.scalatest.Suite.run(Suite.scala:1114) [info] at org.scalatest.Suite.run$(Suite.scala:1096) [info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) [info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68) [info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68) [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321) [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517) [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414) [info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [info] at java.base/java.lang.Thread.run(Thread.java:840) ``` On the other hand, if inserting sleep but leaving `Executors.newFixedThreadPool(3)` as it is, this test always seems to pass. So, we can now reenable this test. ### Why are the changes needed? For better test coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The test always passes on my dev environment even if inserting sleep like explained above. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#52704 from sarutak/SPARK-50205. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
With this change, shuffle data of individual shuffles is deleted from the fallback storage during regular shuffle cleanup.
Why are the changes needed?
Currently, shuffle data is only removed from the fallback storage on Spark context shutdown. Long-running Spark jobs accumulate shuffle data in the fallback storage even though Spark no longer uses it. Those shuffles should be cleaned up while the Spark context is running.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests and a manual test via the reproduction example below.
Run the reproduction example without the `<<< "$scala"` part. In the Spark shell, execute this code:
This writes some data of shuffle 0 to the fallback storage. Invoking `System.gc()` then removes that shuffle from the fallback storage.
Was this patch authored or co-authored using generative AI tooling?
No.