Core: lazy init workerPool #12427

Open · wants to merge 2 commits into main

Conversation

@abstractdog commented Feb 28, 2025

For code paths where an ExecutorService can be provided, the default call to ThreadPools.getWorkerPool() can be deferred until the point of actual usage.
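In other words, a minimal sketch of the intended pattern (illustrative field/method names, not the exact diff):

  // illustrative only: the default pool is looked up lazily, at the point of use
  private ExecutorService workerPool; // may be set by the caller, otherwise stays null

  protected ExecutorService workerPool() {
    // touch ThreadPools.getWorkerPool() only when no pool was supplied
    return workerPool != null ? workerPool : ThreadPools.getWorkerPool();
  }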

@github-actions github-actions bot added the core label Feb 28, 2025
@pvary (Contributor) commented Feb 28, 2025

Hi Laci,
Long time, no see...
What is the actual issue where you found this problematic?

@abstractdog (Author) commented Feb 28, 2025

Hi Laci, Long time, no see... What is the actual issue where you found this problematic?
Hey, long time no see Peter!

the actual use case was HIVE-28759, where I'm about to supply an ExecutorService to SnapshotProducer so that I can submit some jobs during HS2 shutdown

even though the API supports scanManifestsWith, I wasn't able to achieve my goal, because the static final variable is initialized when the class is first loaded and used, see:

at org.apache.iceberg.SnapshotProducer.<init>(SnapshotProducer.java:102) 

...so this is an edge case, which happens when:

  1. HS2 starts
  2. there are some records in the query history queue that need to be flushed (any simple non-Iceberg stuff, see 3.)
  3. no Iceberg operations using ThreadPools were running
  4. HS2 shuts down

I got an exception like:

Caused by: java.lang.IllegalStateException: Shutdown in progress
	at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66) ~[?:1.8.0_292]
	at java.lang.Runtime.addShutdownHook(Runtime.java:211) ~[?:1.8.0_292]
	at org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors$Application.addShutdownHook(MoreExecutors.java:289) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors$Application.addDelayedShutdownHook(MoreExecutors.java:266) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors$Application.getExitingExecutorService(MoreExecutors.java:241) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors$Application.getExitingExecutorService(MoreExecutors.java:246) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors.getExitingExecutorService(MoreExecutors.java:129) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.ThreadPools.newWorkerPool(ThreadPools.java:89) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.ThreadPools.newWorkerPool(ThreadPools.java:85) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.ThreadPools.<clinit>(ThreadPools.java:45) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.SnapshotProducer.<init>(SnapshotProducer.java:102) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.MergingSnapshotProducer.<init>(MergingSnapshotProducer.java:103) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.MergeAppend.<init>(MergeAppend.java:32) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.BaseTable.newAppend(BaseTable.java:180) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitWrite(HiveIcebergOutputCommitter.java:555) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitTable(HiveIcebergOutputCommitter.java:494) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.lambda$commitJobs$4(HiveIcebergOutputCommitter.java:292) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter.commitJobs(HiveIcebergOutputCommitter.java:286) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.iceberg.mr.hive.HiveIcebergStorageHandler.storageHandlerCommit(HiveIcebergStorageHandler.java:828) ~[hive-iceberg-handler-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.hadoop.hive.ql.queryhistory.repository.IcebergRepository.flush(IcebergRepository.java:134) ~[hive-exec-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at org.apache.hadoop.hive.ql.queryhistory.QueryHistoryService.lambda$doFlush$1(QueryHistoryService.java:205) ~[hive-exec-3.1.3000.2025.0.20.0-SNAPSHOT.jar:3.1.3000.2025.0.20.0-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_292]
	... 4 more
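For reference, a standalone sketch (plain Java, not Iceberg code) of the underlying JVM behaviour: registering a new shutdown hook while shutdown is already in progress throws exactly this IllegalStateException:

  // demo: adding a shutdown hook from inside a running shutdown sequence fails,
  // which is what ThreadPools.<clinit> runs into in the stack trace above
  public class ShutdownHookDemo {
    public static void main(String[] args) {
      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
          // stands in for the query-history flush that touches ThreadPools and
          // therefore tries to register another hook during shutdown
          Runtime.getRuntime().addShutdownHook(new Thread(() -> {}));
        } catch (IllegalStateException e) {
          System.err.println(e); // java.lang.IllegalStateException: Shutdown in progress
        }
      }));
    }
  }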

even though this is just an edge case, the lazy init might look better in general

@pvary (Contributor) commented Feb 28, 2025

I have seen a similar issue here: #12220

@pvary (Contributor) commented Feb 28, 2025

So let me recap:

  • Hive has a task which needs to access some Iceberg tables during the shutdown
  • During the shutdown the general thread pools are terminated
  • Hive could provide its own thread pools to access the Iceberg tables
  • And the issue is that ThreadPools.getWorkerPool() is always called (and throws an exception) even if it is not used (see the sketch below)
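A rough sketch of the pre-change shape that causes this, assuming the eager lookup sits in SnapshotProducer's initialization (based on the stack trace above, SnapshotProducer.java:102):

  // illustrative: the default pool is fetched eagerly when the producer is created,
  // forcing ThreadPools.<clinit> (and its shutdown hook) even if the caller later
  // swaps in its own pool via scanManifestsWith()
  private ExecutorService workerPool = ThreadPools.getWorkerPool();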

@abstractdog (Author)

I have seen a similar issue here: #12220

thanks, it looks like the very same problem: by using the default worker pool, we need to accept the default "exiting" behavior, which is fine as long as users can use their own pools instead; that's 99% satisfied by the scanManifestsWith interface in my case
however, for that to work, Iceberg should unblock it by using the worker pool lazily, otherwise the "Shutdown in progress" cannot be avoided
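Roughly what the Hive side looks like with its own pool; the helper below is hypothetical, but scanManifestsWith is the existing SnapshotUpdate API mentioned above:

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.Table;

  // hypothetical Hive-side helper: commit an append with Hive's own pool; with the
  // lazy init from this PR, the shared default pool is never touched on this path
  class HiveSideAppend {
    static void appendWithOwnPool(Table table, DataFile dataFile) {
      ExecutorService hivePool = Executors.newFixedThreadPool(4);
      try {
        table.newAppend()
            .scanManifestsWith(hivePool) // supply the pool through the existing API
            .appendFile(dataFile)
            .commit();
      } finally {
        hivePool.shutdown();
      }
    }
  }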

@abstractdog (Author)

So let me recap:

  • Hive has a task which needs to access some Iceberg tables during the shutdown
  • During the shutdown the general thread pools are terminated
  • Hive could provide its own thread pools to access the Iceberg tables
  • And the issue is that ThreadPools.getWorkerPool() is always called (and throws an exception) even if it is not used

exactly!

Comment on lines 200 to +201
  protected ExecutorService workerPool() {
-   return this.workerPool;
+   return Optional.ofNullable(workerPool).orElseGet(ThreadPools::getWorkerPool);
Contributor

I have checked the code, and there are some places where the workerPool is accessed directly.
Is this a fancy way of saying:

if (workerPool == null) {
    this.workerPool = ThreadPools.getWorkerPool();
}

return workerPool;

Author

yeah, I think it's a cool one-liner, assuming that repeatedly calling ThreadPools::getWorkerPool is not expensive, which is true because from the second call on it just returns the already-initialized pool:

  public static ExecutorService getWorkerPool() {
    return WORKER_POOL;
  }
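And a tiny standalone check of the orElseGet semantics relied on here: the supplier only runs for a null value, so a supplied pool never triggers the default lookup (illustrative types, not Iceberg code):

  import java.util.Optional;
  import java.util.function.Supplier;

  public class OrElseGetDemo {
    public static void main(String[] args) {
      Supplier<String> defaultPool = () -> {
        System.out.println("default pool initialized"); // only printed in the null case
        return "default";
      };
      System.out.println(Optional.ofNullable("custom").orElseGet(defaultPool)); // custom
      System.out.println(Optional.ofNullable((String) null).orElseGet(defaultPool)); // default
    }
  }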

@abstractdog (Author) Feb 28, 2025

btw, looks like I forgot to replace the direct accesses with workerPool(), let me do that quickly
FIXED in e50b767
