Skip to content

Include error messages for stale primary #1714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (isStalePrimary(newDescription)) {
invalidatePotentialPrimary(newDescription);
//TODO change exception type
invalidatePotentialPrimary(newDescription, new Throwable("Primary marked stale due to electionId/setVersion mismatch"));
return false;
}

Expand Down Expand Up @@ -297,12 +298,13 @@ private boolean isStalePrimary(final ServerDescription description) {
}
}

private void invalidatePotentialPrimary(final ServerDescription newDescription) {
private void invalidatePotentialPrimary(final ServerDescription newDescription, final Throwable cause) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();

addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting(cause);
}

/**
Expand Down Expand Up @@ -377,7 +379,8 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress()));
}
serverTuple.server.invalidate();
//TODO use specific exception type
serverTuple.server.invalidate(new Throwable("Primary marked stale due to discovery of newer primary"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ interface ClusterableServer extends Server {
/**
* Reset server description to connecting state
*/
void resetToConnecting();
void resetToConnecting(Throwable cause);

/**
* Invalidate the description of this server. Implementation of this method should not block, but rather trigger an asynchronous
* attempt to connect with the server in order to determine its current status.
*/
void invalidate();
void invalidate(Throwable cause);

/**
* <p>Closes the server. Instances that have been closed will no longer be available for use.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ private void operationEnd() {
}

@Override
public void resetToConnecting() {
sdam.update(unknownConnectingServerDescription(serverId, null));
public void resetToConnecting(final Throwable cause) {
sdam.update(unknownConnectingServerDescription(serverId, cause));
}

@Override
public void invalidate() {
public void invalidate(final Throwable cause) {
if (!isClosed()) {
sdam.handleExceptionAfterHandshake(SdamIssue.unspecified(sdam.context()));
sdam.handleExceptionAfterHandshake(SdamIssue.specific(cause, sdam.context()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ public LoadBalancedServer(final ServerId serverId, final ConnectionPool connecti
}

@Override
public void resetToConnecting() {
public void resetToConnecting(final Throwable cause) {
// no op
}

@Override
public void invalidate() {
public void invalidate(final Throwable cause) {
// no op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class DefaultServerSpecification extends Specification {

def 'invalidate should invoke server listeners'() {
given:
def serverListener = Mock(ServerListener)
def serverListener = new TestServerListener()
def connectionPool = Mock(ConnectionPool)
def sdamProvider = SameObjectProvider.<SdamServerDescriptionManager>uninitialized()
def serverMonitor = new TestServerMonitor(sdamProvider)
Expand All @@ -151,7 +151,7 @@ class DefaultServerSpecification extends Specification {
.build())

when:
server.invalidate()
server.invalidate(new Throwable())

then:
1 * serverListener.serverDescriptionChanged(_)
Expand All @@ -170,7 +170,7 @@ class DefaultServerSpecification extends Specification {
server.close()

when:
server.invalidate()
server.invalidate(new Throwable())

then:
0 * connectionPool.invalidate(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static com.mongodb.internal.connection.ClusterDescriptionHelper.getPrimaries;
import static com.mongodb.internal.event.EventListenerHelper.NO_OP_CLUSTER_LISTENER;
import static com.mongodb.internal.event.EventListenerHelper.NO_OP_SERVER_LISTENER;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -116,6 +117,14 @@ private void assertServer(final String serverName, final BsonDocument expectedSe
assertNotNull(serverDescription);
assertEquals(getServerType(expectedServerDescriptionDocument.getString("type").getValue()), serverDescription.getType());

if(expectedServerDescriptionDocument.containsKey("error")){
String expectedErrorMessage = expectedServerDescriptionDocument.getString("error").getValue();

Throwable exception = serverDescription.getException();
assertNotNull(format("Expected exception with message \"%s\" in cluster description", expectedErrorMessage), exception);
String actualErrorMessage = exception.getMessage();
assertEquals("Expected exception message is not equal to actual one", expectedErrorMessage, actualErrorMessage);
}
if (expectedServerDescriptionDocument.isObjectId("electionId")) {
assertNotNull(serverDescription.getElectionId());
assertEquals(expectedServerDescriptionDocument.getObjectId("electionId").getValue(), serverDescription.getElectionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public TestServer(final ServerAddress serverAddress, final Cluster cluster, fina
this.cluster = cluster;
this.serverListener = serverListener;
this.description = ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build();
invalidate();
sendNotification(ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build());
}

public void sendNotification(final ServerDescription newDescription) {
Expand All @@ -55,13 +55,13 @@ public void sendNotification(final ServerDescription newDescription) {
}

@Override
public void resetToConnecting() {
this.description = ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build();
public void resetToConnecting(final Throwable cause) {
this.description = ServerDescription.builder().state(CONNECTING).exception(cause).address(serverId.getAddress()).build();
}

@Override
public void invalidate() {
sendNotification(ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build());
public void invalidate(final Throwable cause) {
sendNotification(ServerDescription.builder().state(CONNECTING).exception(cause).address(serverId.getAddress()).build());
}

@Override
Expand Down