diff --git a/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java b/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java index c684b777c..b887a682e 100644 --- a/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java +++ b/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java @@ -45,6 +45,7 @@ import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.api.DatabaseName; +import org.neo4j.driver.internal.bolt.api.ListenerEvent; import org.neo4j.driver.internal.bolt.api.LoggingProvider; import org.neo4j.driver.internal.bolt.api.MetricsListener; import org.neo4j.driver.internal.bolt.api.NotificationConfig; @@ -158,8 +159,8 @@ public CompletionStage connect( return; } - var beforeAcquiringOrCreatingEvent = metricsListener.createListenerEvent(); - metricsListener.beforeAcquiringOrCreating(poolId, beforeAcquiringOrCreatingEvent); + var acquireEvent = metricsListener.createListenerEvent(); + metricsListener.beforeAcquiringOrCreating(poolId, acquireEvent); acquisitionFuture.whenComplete((connection, throwable) -> { throwable = FutureUtil.completionExceptionCause(throwable); if (throwable != null) { @@ -167,7 +168,7 @@ public CompletionStage connect( metricsListener.afterTimedOutToAcquireOrCreate(poolId); } } else { - metricsListener.afterAcquiredOrCreated(poolId, beforeAcquiringOrCreatingEvent); + metricsListener.afterAcquiredOrCreated(poolId, acquireEvent); } metricsListener.afterAcquiringOrCreating(poolId); }); @@ -312,24 +313,28 @@ private void connect( purge(entry); metricsListener.afterConnectionReleased(poolId, inUseEvent); }); - reauthStage(entryWithMetadata, authToken).whenComplete((ignored2, throwable2) -> { - if (!acquisitionFuture.complete(pooledConnection)) { - // acquisition timed out - CompletableFuture pendingAcquisition; - synchronized (this) { - pendingAcquisition = pendingAcquisitions.poll(); - if (pendingAcquisition == null) { - // nothing pending, just make the entry available - entry.available = true; - } - } - if (pendingAcquisition != null) { - if (pendingAcquisition.complete(pooledConnection)) { - metricsListener.afterConnectionCreated(poolId, inUseEvent); - } - } + reauthStage(entryWithMetadata, authToken).whenComplete((ignored2, reauthThrowable) -> { + if (reauthThrowable != null) { + // reauth pipelining failed, purge the connection and try again + purge(entry); + connect( + acquisitionFuture, + securityPlan, + databaseName, + authToken, + authTokenStageSupplier, + mode, + bookmarks, + impersonatedUser, + minVersion, + notificationConfig); } else { - metricsListener.afterConnectionCreated(poolId, inUseEvent); + if (!acquisitionFuture.complete(pooledConnection)) { + // acquisition timed out + findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent); + } else { + metricsListener.afterConnectionCreated(poolId, inUseEvent); + } } }); } @@ -408,6 +413,25 @@ private void connect( } } + private void findAndCompletePendingAcquisition( + ConnectionEntry entry, PooledBoltConnection pooledConnection, ListenerEvent inUseEvent) { + CompletableFuture pendingAcquisition; + synchronized (this) { + pendingAcquisition = pendingAcquisitions.poll(); + if (pendingAcquisition == null) { + // nothing pending, just make the entry available + entry.available = true; + } + } + if (pendingAcquisition != null) { + if (pendingAcquisition.complete(pooledConnection)) { + metricsListener.afterConnectionCreated(poolId, inUseEvent); + } else { + findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent); + } + } + } + private synchronized ConnectionEntryWithMetadata acquireExistingEntry( AuthToken authToken, BoltProtocolVersion minVersion) { ConnectionEntryWithMetadata connectionEntryWithMetadata = null; @@ -425,6 +449,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry( if (connection.state() != BoltConnectionState.OPEN) { connection.close(); iterator.remove(); + metricsListener.afterClosed(poolId); continue; } @@ -463,6 +488,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry( throwable.getClass().getCanonicalName()); } }); + metricsListener.afterClosed(poolId); continue; } } @@ -483,15 +509,7 @@ private CompletionStage reauthStage( .connection .logoff() .thenCompose(conn -> conn.logon(authToken)) - .handle((ignored, throwable) -> { - if (throwable != null) { - connectionEntryWithMetadata.connectionEntry.connection.close(); - synchronized (this) { - pooledConnectionEntries.remove(connectionEntryWithMetadata.connectionEntry); - } - } - return null; - }); + .thenApply(ignored -> null); } else { stage = CompletableFuture.completedStage(null); } @@ -612,9 +630,12 @@ public CompletionStage close() { var iterator = pooledConnectionEntries.iterator(); while (iterator.hasNext()) { var entry = iterator.next(); - if (entry.connection != null && entry.connection.state() == BoltConnectionState.OPEN) { + if (entry.connection != null) { this.closeStage = this.closeStage.thenCompose( - ignored -> entry.connection.close().exceptionally(throwable -> null)); + ignored -> entry.connection.close().handle((ignored1, ignored2) -> { + metricsListener.afterClosed(poolId); + return null; + })); } iterator.remove(); } @@ -677,8 +698,8 @@ private void purge(ConnectionEntry entry) { synchronized (this) { pooledConnectionEntries.remove(entry); } - metricsListener.afterClosed(poolId); entry.connection.close(); + metricsListener.afterClosed(poolId); log.log(System.Logger.Level.DEBUG, "Connection purged from the pool."); }