Skip to content

Commit

Permalink
Update internal metrics handling
Browse files Browse the repository at this point in the history
Make sure `afterClosed` is called whenever a pooled connection is closed. In addition, handle reauth pipelining error.
  • Loading branch information
injectives committed Feb 5, 2025
1 parent 31b8f96 commit 385d38e
Showing 1 changed file with 53 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
import org.neo4j.driver.internal.bolt.api.DatabaseName;
import org.neo4j.driver.internal.bolt.api.ListenerEvent;
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
import org.neo4j.driver.internal.bolt.api.MetricsListener;
import org.neo4j.driver.internal.bolt.api.NotificationConfig;
Expand Down Expand Up @@ -158,16 +159,16 @@ public CompletionStage<BoltConnection> connect(
return;
}

var beforeAcquiringOrCreatingEvent = metricsListener.createListenerEvent();
metricsListener.beforeAcquiringOrCreating(poolId, beforeAcquiringOrCreatingEvent);
var acquireEvent = metricsListener.createListenerEvent();
metricsListener.beforeAcquiringOrCreating(poolId, acquireEvent);
acquisitionFuture.whenComplete((connection, throwable) -> {
throwable = FutureUtil.completionExceptionCause(throwable);
if (throwable != null) {
if (throwable instanceof TimeoutException) {
metricsListener.afterTimedOutToAcquireOrCreate(poolId);
}
} else {
metricsListener.afterAcquiredOrCreated(poolId, beforeAcquiringOrCreatingEvent);
metricsListener.afterAcquiredOrCreated(poolId, acquireEvent);
}
metricsListener.afterAcquiringOrCreating(poolId);
});
Expand Down Expand Up @@ -312,24 +313,28 @@ private void connect(
purge(entry);
metricsListener.afterConnectionReleased(poolId, inUseEvent);
});
reauthStage(entryWithMetadata, authToken).whenComplete((ignored2, throwable2) -> {
if (!acquisitionFuture.complete(pooledConnection)) {
// acquisition timed out
CompletableFuture<PooledBoltConnection> pendingAcquisition;
synchronized (this) {
pendingAcquisition = pendingAcquisitions.poll();
if (pendingAcquisition == null) {
// nothing pending, just make the entry available
entry.available = true;
}
}
if (pendingAcquisition != null) {
if (pendingAcquisition.complete(pooledConnection)) {
metricsListener.afterConnectionCreated(poolId, inUseEvent);
}
}
reauthStage(entryWithMetadata, authToken).whenComplete((ignored2, reauthThrowable) -> {
if (reauthThrowable != null) {
// reauth pipelining failed, purge the connection and try again
purge(entry);
connect(
acquisitionFuture,
securityPlan,
databaseName,
authToken,
authTokenStageSupplier,
mode,
bookmarks,
impersonatedUser,
minVersion,
notificationConfig);
} else {
metricsListener.afterConnectionCreated(poolId, inUseEvent);
if (!acquisitionFuture.complete(pooledConnection)) {
// acquisition timed out
findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent);
} else {
metricsListener.afterConnectionCreated(poolId, inUseEvent);
}
}
});
}
Expand Down Expand Up @@ -408,6 +413,25 @@ private void connect(
}
}

private void findAndCompletePendingAcquisition(
ConnectionEntry entry, PooledBoltConnection pooledConnection, ListenerEvent<?> inUseEvent) {
CompletableFuture<PooledBoltConnection> pendingAcquisition;
synchronized (this) {
pendingAcquisition = pendingAcquisitions.poll();
if (pendingAcquisition == null) {
// nothing pending, just make the entry available
entry.available = true;
}
}
if (pendingAcquisition != null) {
if (pendingAcquisition.complete(pooledConnection)) {
metricsListener.afterConnectionCreated(poolId, inUseEvent);
} else {
findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent);
}
}
}

private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
AuthToken authToken, BoltProtocolVersion minVersion) {
ConnectionEntryWithMetadata connectionEntryWithMetadata = null;
Expand All @@ -425,6 +449,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
if (connection.state() != BoltConnectionState.OPEN) {
connection.close();
iterator.remove();
metricsListener.afterClosed(poolId);
continue;
}

Expand Down Expand Up @@ -463,6 +488,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
throwable.getClass().getCanonicalName());
}
});
metricsListener.afterClosed(poolId);
continue;
}
}
Expand All @@ -483,15 +509,7 @@ private CompletionStage<Void> reauthStage(
.connection
.logoff()
.thenCompose(conn -> conn.logon(authToken))
.handle((ignored, throwable) -> {
if (throwable != null) {
connectionEntryWithMetadata.connectionEntry.connection.close();
synchronized (this) {
pooledConnectionEntries.remove(connectionEntryWithMetadata.connectionEntry);
}
}
return null;
});
.thenApply(ignored -> null);
} else {
stage = CompletableFuture.completedStage(null);
}
Expand Down Expand Up @@ -612,9 +630,12 @@ public CompletionStage<Void> close() {
var iterator = pooledConnectionEntries.iterator();
while (iterator.hasNext()) {
var entry = iterator.next();
if (entry.connection != null && entry.connection.state() == BoltConnectionState.OPEN) {
if (entry.connection != null) {
this.closeStage = this.closeStage.thenCompose(
ignored -> entry.connection.close().exceptionally(throwable -> null));
ignored -> entry.connection.close().handle((ignored1, ignored2) -> {
metricsListener.afterClosed(poolId);
return null;
}));
}
iterator.remove();
}
Expand Down Expand Up @@ -677,8 +698,8 @@ private void purge(ConnectionEntry entry) {
synchronized (this) {
pooledConnectionEntries.remove(entry);
}
metricsListener.afterClosed(poolId);
entry.connection.close();
metricsListener.afterClosed(poolId);
log.log(System.Logger.Level.DEBUG, "Connection purged from the pool.");
}

Expand Down

0 comments on commit 385d38e

Please sign in to comment.