Skip to content

Commit

Permalink
Create DualMessageSequences abstraction
Browse files Browse the repository at this point in the history
This makes `CommandMessage` generic for any command with two sequences. The new abstraction adds:

* The sequence identifiers for each sequence.
* A field name validator for both sequences.
* A `List<BsonElement>`` for any extra elements required by the splitting logic, so that `txnNumber` doesn't
  have to be treated specially.

Make `SplittablePayload` extend `OpMsgSequence`.
This brings SplittablePayload closer in design to `DualMessageSequences`,
reducing a potential source of confusion for future readers.

JAVA-5529

Co-authored-by: Jeff Yemin <[email protected]>
  • Loading branch information
stIncMale and jyemin committed Oct 22, 2024
1 parent ea0633e commit 9fe35e4
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 232 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand;
import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand.OpsAndNsInfo.WritersProviderAndLimitsChecker;
import com.mongodb.internal.connection.DualMessageSequences.EncodeDocumentsResult;
import com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryWriter;
Expand All @@ -38,6 +38,9 @@
import java.util.List;

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
import static com.mongodb.internal.connection.MessageSettings.DOCUMENT_HEADROOM_SIZE;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
Expand Down Expand Up @@ -98,46 +101,49 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f
}

/**
* @return See {@link ClientBulkWriteCommand.OpsAndNsInfo#encode(WritersProviderAndLimitsChecker)}.
* @return See {@link DualMessageSequences#encodeDocuments(WritersProviderAndLimitsChecker)}.
*/
static ClientBulkWriteCommand.OpsAndNsInfo.EncodeResult writeOpsAndNsInfo(
final ClientBulkWriteCommand.OpsAndNsInfo opsAndNsInfo,
static EncodeDocumentsResult writeDocumentsOfDualMessageSequences(
final DualMessageSequences dualMessageSequences,
final int commandDocumentSizeInBytes,
final BsonOutput opsOut,
final BsonOutput nsInfoOut,
final BsonOutput firstOutput,
final BsonOutput secondOutput,
final MessageSettings messageSettings,
final boolean validateDocumentSizeLimits) {
BinaryOpsBsonWriters opsWriters = new BinaryOpsBsonWriters(
opsOut,
opsAndNsInfo.getFieldNameValidator(),
BinaryOrdinaryAndStoredBsonWriters firstWriters = new BinaryOrdinaryAndStoredBsonWriters(
firstOutput,
dualMessageSequences.getFirstFieldNameValidator(),
validateDocumentSizeLimits ? messageSettings : null);
BinaryOrdinaryAndStoredBsonWriters secondWriters = new BinaryOrdinaryAndStoredBsonWriters(
secondOutput,
dualMessageSequences.getSecondFieldNameValidator(),
validateDocumentSizeLimits ? messageSettings : null);
BsonBinaryWriter nsInfoWriter = new BsonBinaryWriter(nsInfoOut, NoOpFieldNameValidator.INSTANCE);
// the size of operation-agnostic command fields (a.k.a. extra elements) is counted towards `messageOverheadInBytes`
int messageOverheadInBytes = 1000;
int maxOpsAndNsInfoSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int opsStart = opsOut.getPosition();
int nsInfoStart = nsInfoOut.getPosition();
int maxSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int firstStart = firstOutput.getPosition();
int secondStart = secondOutput.getPosition();
int maxBatchCount = messageSettings.getMaxBatchCount();
return opsAndNsInfo.encode(write -> {
int opsBeforeWritePosition = opsOut.getPosition();
int nsInfoBeforeWritePosition = nsInfoOut.getPosition();
int batchCountAfterWrite = write.doAndGetBatchCount(opsWriters, nsInfoWriter);
return dualMessageSequences.encodeDocuments(write -> {
int firstBeforeWritePosition = firstOutput.getPosition();
int secondBeforeWritePosition = secondOutput.getPosition();
int batchCountAfterWrite = write.doAndGetBatchCount(firstWriters, secondWriters);
assertTrue(batchCountAfterWrite <= maxBatchCount);
int opsAndNsInfoSizeInBytes =
opsOut.getPosition() - opsStart
+ nsInfoOut.getPosition() - nsInfoStart;
if (opsAndNsInfoSizeInBytes < maxOpsAndNsInfoSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
} else if (opsAndNsInfoSizeInBytes > maxOpsAndNsInfoSizeInBytes) {
opsOut.truncateToPosition(opsBeforeWritePosition);
nsInfoOut.truncateToPosition(nsInfoBeforeWritePosition);
int writtenSizeInBytes =
firstOutput.getPosition() - firstStart
+ secondOutput.getPosition() - secondStart;
if (writtenSizeInBytes < maxSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return OK_LIMIT_NOT_REACHED;
} else if (writtenSizeInBytes > maxSizeInBytes) {
firstOutput.truncateToPosition(firstBeforeWritePosition);
secondOutput.truncateToPosition(secondBeforeWritePosition);
if (batchCountAfterWrite == 1) {
// we have failed to write a single model
// we have failed to write a single document
throw createBsonMaximumSizeExceededException(messageSettings.getMaxDocumentSize());
}
return WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
return FAIL_LIMIT_EXCEEDED;
} else {
return WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
return OK_LIMIT_REACHED;
}
});
}
Expand Down Expand Up @@ -233,14 +239,14 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m
private BsonWriterHelper() {
}

private static final class BinaryOpsBsonWriters implements WritersProviderAndLimitsChecker.OpsBsonWriters {
private static final class BinaryOrdinaryAndStoredBsonWriters implements WritersProviderAndLimitsChecker.OrdinaryAndStoredBsonWriters {
private final BsonBinaryWriter writer;
private final BsonWriter storedDocumentWriter;

/**
* @param messageSettings Non-{@code null} iff the document size limits must be validated.
*/
BinaryOpsBsonWriters(
BinaryOrdinaryAndStoredBsonWriters(
final BsonOutput out,
final FieldNameValidator validator,
@Nullable final MessageSettings messageSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences;
import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand Down Expand Up @@ -57,7 +56,7 @@
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.connection.BsonWriterHelper.appendElementsToDocument;
import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
import static com.mongodb.internal.connection.BsonWriterHelper.writeOpsAndNsInfo;
import static com.mongodb.internal.connection.BsonWriterHelper.writeDocumentsOfDualMessageSequences;
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createList;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne;
Expand All @@ -81,11 +80,11 @@ public final class CommandMessage extends RequestMessage {
private final MessageSequences sequences;
private final boolean responseExpected;
/**
* {@code null} iff either {@link #sequences} is not of the {@link ClientBulkWriteCommand.OpsAndNsInfo} type,
* {@code null} iff either {@link #sequences} is not of the {@link DualMessageSequences} type,
* or it is of that type, but it has not been {@linkplain #encodeMessageBodyWithMetadata(ByteBufferBsonOutput, OperationContext) encoded}.
*/
@Nullable
private Boolean opsAndNsInfoRequireResponse;
private Boolean dualMessageSequencesRequireResponse;
private final ClusterConnectionMode clusterConnectionMode;
private final ServerApi serverApi;

Expand Down Expand Up @@ -122,7 +121,7 @@ public final class CommandMessage extends RequestMessage {
this.commandFieldNameValidator = commandFieldNameValidator;
this.readPreference = readPreference;
this.responseExpected = responseExpected;
opsAndNsInfoRequireResponse = null;
dualMessageSequencesRequireResponse = null;
this.exhaustAllowed = exhaustAllowed;
this.sequences = sequences;
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
Expand Down Expand Up @@ -206,12 +205,11 @@ boolean isResponseExpected() {
if (responseExpected) {
return true;
} else {
if (sequences instanceof ValidatableSplittablePayload) {
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
if (sequences instanceof SplittablePayload) {
SplittablePayload payload = (SplittablePayload) sequences;
return payload.isOrdered() && payload.hasAnotherSplit();
} else if (sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) {
return assertNotNull(opsAndNsInfoRequireResponse);
} else if (sequences instanceof DualMessageSequences) {
return assertNotNull(dualMessageSequencesRequireResponse);
} else if (!(sequences instanceof EmptyMessageSequences)) {
fail(sequences.toString());
}
Expand All @@ -233,38 +231,34 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOut
bsonOutput.writeByte(0); // payload type
commandStartPosition = bsonOutput.getPosition();
ArrayList<BsonElement> extraElements = getExtraElements(operationContext);
// `OpsAndNsInfo` requires validation only if no response is expected, otherwise we must rely on the server validation
boolean validateDocumentSizeLimits = !(sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) || !responseExpected;
// `DualMessageSequences` requires validation only if no response is expected, otherwise we must rely on the server validation
boolean validateDocumentSizeLimits = !(sequences instanceof DualMessageSequences) || !responseExpected;

int commandDocumentSizeInBytes = writeDocument(command, bsonOutput, commandFieldNameValidator, validateDocumentSizeLimits);
if (sequences instanceof ValidatableSplittablePayload) {
if (sequences instanceof SplittablePayload) {
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
SplittablePayload payload = (SplittablePayload) sequences;
writeOpMsgSectionWithPayloadType1(bsonOutput, payload.getPayloadName(), () -> {
writePayload(
new BsonBinaryWriter(bsonOutput, validatableSplittablePayload.getFieldNameValidator()),
new BsonBinaryWriter(bsonOutput, payload.getFieldNameValidator()),
bsonOutput, getSettings(), messageStartPosition, payload, getSettings().getMaxDocumentSize()
);
return null;
});
} else if (sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) {
ClientBulkWriteCommand.OpsAndNsInfo opsAndNsInfo = (ClientBulkWriteCommand.OpsAndNsInfo) sequences;
} else if (sequences instanceof DualMessageSequences) {
DualMessageSequences dualMessageSequences = (DualMessageSequences) sequences;
try (ByteBufferBsonOutput.Branch bsonOutputBranch2 = bsonOutput.branch();
ByteBufferBsonOutput.Branch bsonOutputBranch1 = bsonOutput.branch()) {
ClientBulkWriteCommand.OpsAndNsInfo.EncodeResult opsAndNsInfoEncodeResult = writeOpMsgSectionWithPayloadType1(
bsonOutputBranch1, "ops", () ->
writeOpMsgSectionWithPayloadType1(bsonOutputBranch2, "nsInfo", () ->
writeOpsAndNsInfo(
opsAndNsInfo, commandDocumentSizeInBytes, bsonOutputBranch1,
DualMessageSequences.EncodeDocumentsResult encodeDocumentsResult = writeOpMsgSectionWithPayloadType1(
bsonOutputBranch1, dualMessageSequences.getFirstSequenceId(), () ->
writeOpMsgSectionWithPayloadType1(bsonOutputBranch2, dualMessageSequences.getSecondSequenceId(), () ->
writeDocumentsOfDualMessageSequences(
dualMessageSequences, commandDocumentSizeInBytes, bsonOutputBranch1,
bsonOutputBranch2, getSettings(), validateDocumentSizeLimits)
)
);
opsAndNsInfoRequireResponse = opsAndNsInfoEncodeResult.isServerResponseRequired();
Long txnNumber = opsAndNsInfoEncodeResult.getTxnNumber();
if (txnNumber != null) {
extraElements.add(new BsonElement(TXN_NUMBER_KEY, new BsonInt64(txnNumber)));
}
dualMessageSequencesRequireResponse = encodeDocumentsResult.isServerResponseRequired();
extraElements.addAll(encodeDocumentsResult.getExtraElements());
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
}
} else if (sequences instanceof EmptyMessageSequences) {
Expand Down Expand Up @@ -391,6 +385,11 @@ private void addReadConcernDocument(final List<BsonElement> extraElements, final
}
}

/**
* @param sequenceId The identifier of the sequence contained in the {@code OP_MSG} section to be written.
* @param writeDocumentsAction The action that writes the documents of the sequence.
* @see <a href="https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md">OP_MSG</a>
*/
private <R> R writeOpMsgSectionWithPayloadType1(
final ByteBufferBsonOutput bsonOutput,
final String sequenceId,
Expand Down
Loading

0 comments on commit 9fe35e4

Please sign in to comment.