From 6865c52f7197538d2adc5db309cd96ad7f386680 Mon Sep 17 00:00:00 2001
From: Yury Yarashevich <yura.yaroshevich@gmail.com>
Date: Wed, 17 Aug 2022 00:10:46 +0200
Subject: [PATCH] Use ForkJoinPool as executor service.

---
 .../src/main/java/io/objectbox/BoxStore.java  |  5 +-
 .../java/io/objectbox/BoxStoreBuilder.java    | 12 +++
 .../internal/ObjectBoxThreadPool.java         | 91 +++++++++++--------
 3 files changed, 71 insertions(+), 37 deletions(-)

diff --git a/objectbox-java/src/main/java/io/objectbox/BoxStore.java b/objectbox-java/src/main/java/io/objectbox/BoxStore.java
index bcc31d99..ce6950b9 100644
--- a/objectbox-java/src/main/java/io/objectbox/BoxStore.java
+++ b/objectbox-java/src/main/java/io/objectbox/BoxStore.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -227,7 +228,7 @@ public static boolean isSyncServerAvailable() {
     private final int[] allEntityTypeIds;
     private final Map<Class<?>, Box<?>> boxes = new ConcurrentHashMap<>();
     private final Set<Transaction> transactions = Collections.newSetFromMap(new WeakHashMap<>());
-    private final ExecutorService threadPool = new ObjectBoxThreadPool(this);
+    private final ExecutorService threadPool;
     private final ObjectClassPublisher objectClassPublisher;
     final boolean debugTxRead;
     final boolean debugTxWrite;
@@ -257,6 +258,8 @@ public static boolean isSyncServerAvailable() {
     private SyncClient syncClient;
 
     BoxStore(BoxStoreBuilder builder) {
+        threadPool = Executors.unconfigurableExecutorService(
+            new ObjectBoxThreadPool(this, builder.executorServiceParallelism));
         context = builder.context;
         relinker = builder.relinker;
         NativeLibraryLoader.ensureLoaded();
diff --git a/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java b/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java
index 1497f1f6..f156f117 100644
--- a/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java
+++ b/objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java
@@ -31,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -94,6 +95,8 @@ public class BoxStoreBuilder {
     int maxReaders;
     boolean noReaderThreadLocals;
 
+    int executorServiceParallelism = ForkJoinPool.getCommonPoolParallelism();
+
     int queryAttempts;
 
     /** For DebugCursor. */
@@ -319,6 +322,15 @@ public BoxStoreBuilder noReaderThreadLocals() {
         return this;
     }
 
+    /**
+     * Sets the maximum allowed level of parallelism allowed by executor service
+     * used by BoxStore. The default value is equal to {@ref ForkJoinPool#getCommonPoolParallelism())}
+     */
+    public BoxStoreBuilder executorServiceParallelism(int parallelism) {
+        this.executorServiceParallelism = parallelism;
+        return this;
+    }
+
     @Internal
     public void entity(EntityInfo<?> entityInfo) {
         entityInfoList.add(entityInfo);
diff --git a/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java b/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java
index 41b2ccdd..c51082b5 100644
--- a/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java
+++ b/objectbox-java/src/main/java/io/objectbox/internal/ObjectBoxThreadPool.java
@@ -17,65 +17,84 @@
 package io.objectbox.internal;
 
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import io.objectbox.BoxStore;
 import io.objectbox.annotation.apihint.Internal;
 
 /**
- * Custom thread pool similar to {@link Executors#newCachedThreadPool()} with the following adjustments:
+ * Custom executor service similar to {@link Executors#newWorkStealingPool()} with the following adjustments:
  * <ul>
- *     <li>Release thread local resources ({@link BoxStore#closeThreadResources()})</li>
- *     <li>Reduce keep-alive time for threads to 20 seconds</li>
- *     <li>Uses a ThreadFactory to name threads like "ObjectBox-1-Thread-1"</li>
+ *     <li>Release thread local resources ({@link BoxStore#closeThreadResources()}) after task execution</li>
+ *     <li>Uses a custom thread factory to name threads like "ObjectBox-ForkJoinPool-1-Thread-1"</li>
  * </ul>
  *
  */
 @Internal
-public class ObjectBoxThreadPool extends ThreadPoolExecutor {
+public final class ObjectBoxThreadPool extends AbstractExecutorService {
     private final BoxStore boxStore;
+    private final ExecutorService executorImpl;
 
-    public ObjectBoxThreadPool(BoxStore boxStore) {
-        super(0, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, new SynchronousQueue<>(),
-                new ObjectBoxThreadFactory());
+    public ObjectBoxThreadPool(BoxStore boxStore, int parallelism) {
         this.boxStore = boxStore;
+        this.executorImpl = Executors.unconfigurableExecutorService(
+            new ForkJoinPool(
+                parallelism, 
+                pool -> {
+                    ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    // Priority and daemon status are inherited from calling thread; ensure to reset if required
+                    if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                        thread.setPriority(Thread.NORM_PRIORITY);
+                    }
+                    if (thread.isDaemon()) {
+                        thread.setDaemon(false);
+                    }
+                    thread.setName("ObjectBox-" + thread.getName());
+                    return thread;
+                },
+                null,
+                false));
     }
 
+
     @Override
-    protected void afterExecute(Runnable runnable, Throwable throwable) {
-        super.afterExecute(runnable, throwable);
-        boxStore.closeThreadResources();
+    public void shutdown() {
+        executorImpl.shutdown();
     }
 
-    static class ObjectBoxThreadFactory implements ThreadFactory {
-        private static final AtomicInteger POOL_COUNT = new AtomicInteger();
+    @Override
+    public List<Runnable> shutdownNow() {
+        return executorImpl.shutdownNow();
+    }
 
-        private final ThreadGroup group;
-        private final String namePrefix = "ObjectBox-" + POOL_COUNT.incrementAndGet() + "-Thread-";
-        private final AtomicInteger threadCount = new AtomicInteger();
+    @Override
+    public boolean isShutdown() {
+        return executorImpl.isShutdown();
+    }
 
-        ObjectBoxThreadFactory() {
-            SecurityManager securityManager = System.getSecurityManager();
-            group = (securityManager != null) ? securityManager.getThreadGroup() :
-                    Thread.currentThread().getThreadGroup();
-        }
+    @Override
+    public boolean isTerminated() {
+        return executorImpl.isTerminated();
+    }
 
-        public Thread newThread(Runnable runnable) {
-            String name = namePrefix + threadCount.incrementAndGet();
-            Thread thread = new Thread(group, runnable, name);
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return executorImpl.awaitTermination(timeout, unit);
+    }
 
-            // Priority and daemon status are inherited from calling thread; ensure to reset if required
-            if (thread.getPriority() != Thread.NORM_PRIORITY) {
-                thread.setPriority(Thread.NORM_PRIORITY);
-            }
-            if (thread.isDaemon()) {
-                thread.setDaemon(false);
+    @Override
+    public void execute(Runnable command) {
+        executorImpl.execute(() -> {
+            try {
+                command.run();
+            } finally {
+                boxStore.closeThreadResources();
             }
-            return thread;
-        }
+        });
     }
 }