
[SPARK-51596][SS] Fix concurrent StateStoreProvider maintenance and closing #50391


Closed

Conversation

liviazhu
Contributor

What changes were proposed in this pull request?

Moves the unload operation off the task thread and onto the maintenance thread. To ensure unloading still occurs as soon as possible (rather than potentially waiting for the maintenance interval), as introduced by https://issues.apache.org/jira/browse/SPARK-33827, we immediately trigger the maintenance thread to perform the unload.

This has the additional benefit that unloading other providers no longer blocks the task thread. To capitalize on this, unload() should not hold the loadedProviders lock the entire time (which would block other task threads); instead, it releases the lock once it has removed the unloading providers from the map, and then closes those providers without the lock held.
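The locking pattern described above can be sketched roughly as follows. This is a minimal illustration with hypothetical names (`UnloadSketch`, `Provider`), not the actual StateStore code: the entry is removed from the shared map while holding the lock, and `close()` runs only after the lock is released.

```scala
import scala.collection.mutable

// Minimal sketch (hypothetical names, not the real StateStore code) of
// unloading without holding the loadedProviders lock while closing.
object UnloadSketch {
  trait Provider { def close(): Unit }

  private val loadedProviders = mutable.HashMap[String, Provider]()

  def load(id: String, provider: Provider): Unit =
    loadedProviders.synchronized { loadedProviders.put(id, provider) }

  def unload(id: String): Unit = {
    // Hold the lock only long enough to remove the entry from the map...
    val removed = loadedProviders.synchronized { loadedProviders.remove(id) }
    // ...then close the provider with the lock released, so other task
    // threads can still load/unload concurrently.
    removed.foreach(_.close())
  }
}
```

The key design point is that `close()`, which may be slow, happens outside the critical section, so the lock is only held for a constant-time map removal.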

Why are the changes needed?

Currently, both the task thread and the maintenance thread can call unload() on a provider. This leads to a race condition where the maintenance thread could be performing maintenance on a provider at the same time the task thread is closing it, leading to unexpected behavior.
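The essence of the fix can be sketched as funneling both maintenance work and provider close through a single maintenance executor, so the two can never run concurrently for the same provider. This is a simplified illustration with hypothetical names, not the actual patch:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Minimal sketch (hypothetical names) of the fix: maintenance work and
// provider close both run on a single-threaded maintenance executor,
// so they are serialized rather than racing each other.
object MaintenanceSketch {
  private val maintenanceExecutor = Executors.newSingleThreadExecutor()

  def scheduleMaintenance(doMaintenance: () => Unit): Unit =
    maintenanceExecutor.execute(new Runnable { def run(): Unit = doMaintenance() })

  // The task thread no longer closes the provider itself; it submits the
  // close to the maintenance executor so it happens as soon as possible,
  // but serialized with any in-flight maintenance.
  def triggerUnload(closeProvider: () => Unit): Unit =
    maintenanceExecutor.execute(new Runnable { def run(): Unit = closeProvider() })

  // Drain the queue (useful for tests).
  def shutdownAndWait(): Unit = {
    maintenanceExecutor.shutdown()
    maintenanceExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```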

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit test

Was this patch authored or co-authored using generative AI tooling?

No

@hasnain-db (Contributor) left a comment:


Doing some drive-by reviews as I learn more about Spark, excuse me if these are dumb comments :)

```scala
    )
    // Make maintenance interval very large (30s) so that task thread runs before maintenance.
    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 30000L)
    // Use the `MaintenanceErrorOnCertainPartitionsProvider` to run the test
```

nit - this seems copied from below -- this should now be FakeStateStoreProviderTracksCloseThread right?


```scala
withSpark(SparkContext.getOrCreate(conf)) { sc =>
  withCoordinatorRef(sc) { coordinatorRef =>
    val rootLocation = s"${Utils.createTempDir().getAbsolutePath}/spark-48997"
```
nit

Suggested change:

```diff
-val rootLocation = s"${Utils.createTempDir().getAbsolutePath}/spark-48997"
+val rootLocation = s"${Utils.createTempDir().getAbsolutePath}/spark-51596"
```

@liviazhu (Contributor, Author):

Replicated by #50595

@liviazhu liviazhu closed this Apr 25, 2025