[Fix][Connector-doris] DorisStreamLoad loading state mismanagement causes RecordBuffer infinite loop during shutdown #10060
Conversation
Good job.
CI is not successful; you can refer to https://github.com/apache/seatunnel/pull/10060/checks?check_run_id=55259688662.
CI takes a few hours per run ^_^
Can you add an E2E test for enabling 2PC?
Thanks for pointing this out. The flushing flag should be reset to false immediately after the flush operation completes. I'll include the fix for resetting the flushing state along with the new E2E test in the next commit. FYI, I noticed the CI failure was due to the job exceeding the 10-minute timeout limit.
Good. You can also help fix CI ^_^ By the way, I think we can have a more in-depth conversation to help you get familiar with SeaTunnel. Feel free to contact me on LinkedIn (David Zollo) or WeChat (taskflow). When adding me, please let me know your GitHub ID.
```java
    }
} catch (Exception e) {
    throw new RuntimeException(e);
} finally {
```
In Spark 2.4, the DataWriter interface does not extend Closeable, so when the test case runs on the Spark 2.4 engine and the job fails, the close() method of the DorisSinkWriter is never invoked. As a result, the threads inside the DorisSinkWriter remain alive and prevent the SeaTunnel job from terminating. Therefore, the resources are released here.
I'm not sure if this is a good fix.
@davidzollo I made some new changes, please help review it when you have time, thank you.
It is not allowed to perform close in abortPrepare, and other connectors do not have such an implementation.
How about implementing Closeable in Spark2.4?
It is impossible to implement Closeable in Spark 2.4; we cannot modify the execution engine's interface.
The code differences between Spark 2 and Spark 3 are as follows.
WriteToDataSourceV2Exec.scala:
- spark2: https://github.com/apache/spark/blob/4be566062defa249435c4d72eb106fe7b933e023/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L146-L153
- spark3: https://github.com/apache/spark/blob/2f3e4e36017d16d67086fd4ecaf39636a2fb4b7c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L477

DataWriter.java:
- spark2: https://github.com/apache/spark/blob/4be566062defa249435c4d72eb106fe7b933e023/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java#L59
- spark3: https://github.com/apache/spark/blob/2f3e4e36017d16d67086fd4ecaf39636a2fb4b7c/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java#L63
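Because Spark 2.4's engine never calls close() on the writer after a failure, the cleanup has to happen in abort() itself. A minimal sketch of that try/finally pattern (the SinkWriter interface below is a stand-in for illustration, not the actual SeaTunnel type):

```java
import java.io.IOException;

public class AbortCloseSketch {

    // Hypothetical stand-in for SeaTunnel's sink writer abstraction.
    interface SinkWriter {
        void abortPrepare() throws IOException;
        void close() throws IOException;
    }

    // abort() must release resources itself on Spark 2.4: the finally
    // block guarantees close() runs even when abortPrepare() throws.
    static boolean abortAndClose(SinkWriter sinkWriter) {
        boolean closed = false;
        try {
            sinkWriter.abortPrepare();
        } catch (IOException e) {
            // Real code would log the abort failure here.
        } finally {
            try {
                sinkWriter.close();
                closed = true;
            } catch (IOException ignored) {
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        // Even if abortPrepare throws, close() still runs.
        boolean closed = abortAndClose(new SinkWriter() {
            public void abortPrepare() throws IOException {
                throw new IOException("abort failed");
            }
            public void close() {}
        });
        System.out.println(closed); // prints true
    }
}
```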
```java
public RespContent stopLoad() throws IOException {
    loading = false;
    flushing = true;
```
Why add a separate "flushing" instead of just using "loading"?
FYI, you can take a look at the description of the root cause first: #10059 (comment).
As shown in the code below, the error message in the HTTP response is only obtained while in the loading state, and loading is reset to false during flush. Therefore, if the flush action executes before the HTTP response is returned, errorMessage will always be null, an infinite loop occurs, and the SeaTunnel task cannot stop.
Lines 194 to 210 in de9085d
```java
public String getLoadFailedMsg() {
    if (!loading) {
        return null;
    }
    if (this.getPendingLoadFuture() != null && this.getPendingLoadFuture().isDone()) {
        String errorMessage;
        try {
            errorMessage = handlePreCommitResponse(pendingLoadFuture.get()).getMessage();
        } catch (Exception e) {
            errorMessage = ExceptionUtils.getMessage(e);
        }
        recordStream.setErrorMessageByStreamLoad(errorMessage);
        return errorMessage;
    } else {
        return null;
    }
}
```
Another way to fix it is to move the reset of loading to false until after endInput, i.e., the load is only considered finished once the stream is truly closed.
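The second plan, keeping loading true until the stream is truly closed, can be sketched with a minimal hypothetical class (names are illustrative, not the actual DorisStreamLoad API):

```java
public class LoadStateSketch {
    private volatile boolean loading;
    private volatile String errorMessage;

    public void startLoad() {
        loading = true;
    }

    // Simulates receiving the HTTP error response while the load is still open.
    public void onResponse(String error) {
        this.errorMessage = error;
    }

    // The failure message is only visible while loading is true, mirroring
    // the guard in getLoadFailedMsg() above.
    public String getLoadFailedMsg() {
        return loading ? errorMessage : null;
    }

    // Fix: reset `loading` only after the stream is truly closed, and do it
    // in a finally block so the state stays consistent even if closing throws.
    public void endInput() {
        try {
            // close the underlying record stream here
        } finally {
            loading = false;
        }
    }

    public static void main(String[] args) {
        LoadStateSketch s = new LoadStateSketch();
        s.startLoad();
        s.onResponse("ANALYSIS_ERROR: cast failed");
        // The error is visible before endInput, so the buffer can break its loop.
        System.out.println(s.getLoadFailedMsg()); // prints the error message
        s.endInput();
        System.out.println(s.getLoadFailedMsg()); // prints null
    }
}
```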
I approve of your second plan.
> I approve of your second plan.

Done
> I approve of your second plan.
@zhangshenghang I made some new changes, please help review it when you have time, thank you.
Pull request overview
This PR fixes a critical issue where DorisStreamLoad's loading state mismanagement caused RecordBuffer to enter an infinite loop during shutdown, particularly when Doris returns parsing errors (e.g., ANALYSIS_ERROR). The fix ensures proper cleanup and state management across multiple components.
Key Changes
- Moved the loading flag update to a finally block in DorisStreamLoad to ensure consistent state even when exceptions occur
- Added try-catch-finally blocks in DorisSinkWriter.close() and SparkDataWriter.abort() to guarantee resource cleanup
- Moved sinkWriter.close() from commit() to close() in Spark 3.3 DataWriter for proper lifecycle management
- Added E2E tests to verify graceful failure handling when Doris returns cast errors
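The close() hardening in DorisSinkWriter listed above can be sketched as follows; class and field names here are illustrative stand-ins, not the actual connector code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class CloseSketch {
    final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    boolean streamLoadClosed = false;

    // Simulates a final flush that fails because Doris already rejected the load.
    void flush() {
        throw new RuntimeException("load already failed");
    }

    // Even when flush() throws, the scheduler and the stream-load client
    // must still be shut down, or their threads keep the job alive.
    public void close() {
        try {
            flush();
        } catch (Exception e) {
            // Log instead of rethrowing so the cleanup below still runs.
        } finally {
            scheduler.shutdownNow();
            streamLoadClosed = true; // stands in for dorisStreamLoad.close()
        }
    }

    public static void main(String[] args) {
        CloseSketch w = new CloseSketch();
        w.close();
        System.out.println(w.scheduler.isShutdown() && w.streamLoadClosed); // prints true
    }
}
```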
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| DorisStreamLoad.java | Moved loading = false to finally block to prevent infinite loops in RecordBuffer when exceptions occur during stopLoad() |
| DorisSinkWriter.java | Added try-catch-finally to ensure scheduledExecutorService and dorisStreamLoad are closed even if flush() fails |
| SeaTunnelSparkDataWriter.java | Moved sinkWriter.close() and WriterCloseEvent from commit() to close() method for proper resource lifecycle |
| SparkDataWriter.java | Enhanced abort() with try-catch-finally to ensure sinkWriter.close() is called even when abort operations fail |
| doris_source_and_sink_with_cast_error.conf | Test configuration for cast error scenario with 2PC disabled |
| doris_source_and_sink_with_cast_error_2pc_true.conf | Test configuration for cast error scenario with 2PC enabled |
| DorisIT.java | Added testDorisCastError() to verify graceful failure when Doris returns type cast errors, and createTypeCastErrorSinkTableForTest() to create incompatible schema |
```java
        flush();
    }
} catch (Exception e) {
    log.error("Flush data failed when close doris writer.", e);
```
Copilot AI (Dec 5, 2025)
Grammar error in log message. Should be "when closing" instead of "when close".
```diff
- log.error("Flush data failed when close doris writer.", e);
+ log.error("Flush data failed when closing doris writer.", e);
```
```java
sinkWriter.close();
context.getEventListener().onEvent(new WriterCloseEvent());
```
@Hisoka-X Will there be a problem? Why was it closed here before?
It looks like this snippet was copied from the Spark 2 template and the author missed the subtle difference: the DataWriter interface in Spark 2 doesn't implement Closeable, so it had to be closed manually here. (¬_¬)
Purpose of this pull request
close #10059
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide