Skip to content

Commit

Permalink
Create DualMessageSequences abstraction
Browse files Browse the repository at this point in the history
This makes `CommandMessage` generic for any command with two sequences. The new abstraction adds:

* The sequence identifiers for each sequence.
* A field name validator for both sequences.
* A `List<BsonElement>`` for any extra elements required by the splitting logic, so that `txnNumber` doesn't
  have to be treated specially.

Make `SplittablePayload` extend `OpMsgSequence`.
This brings SplittablePayload closer in design to `DualMessageSequences`,
reducing a potential source of confusion for future readers.

JAVA-5529

Co-authored-by: Jeff Yemin <[email protected]>
  • Loading branch information
stIncMale and jyemin committed Oct 22, 2024
1 parent ea0633e commit b8734d1
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand;
import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand.OpsAndNsInfo.WritersProviderAndLimitsChecker;
import com.mongodb.internal.connection.DualMessageSequences.EncodeDocumentsResult;
import com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryWriter;
Expand All @@ -38,6 +38,9 @@
import java.util.List;

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
import static com.mongodb.internal.connection.MessageSettings.DOCUMENT_HEADROOM_SIZE;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
Expand Down Expand Up @@ -98,46 +101,46 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f
}

/**
* @return See {@link ClientBulkWriteCommand.OpsAndNsInfo#encode(WritersProviderAndLimitsChecker)}.
* @return See {@link DualMessageSequences#encodeDocuments(WritersProviderAndLimitsChecker)}.
*/
static ClientBulkWriteCommand.OpsAndNsInfo.EncodeResult writeOpsAndNsInfo(
final ClientBulkWriteCommand.OpsAndNsInfo opsAndNsInfo,
static EncodeDocumentsResult writeDocumentsOfDualMessageSequences(
final DualMessageSequences dualMessageSequences,
final int commandDocumentSizeInBytes,
final BsonOutput opsOut,
final BsonOutput nsInfoOut,
final BsonOutput firstOutput,
final BsonOutput secondOutput,
final MessageSettings messageSettings,
final boolean validateDocumentSizeLimits) {
BinaryOpsBsonWriters opsWriters = new BinaryOpsBsonWriters(
opsOut,
opsAndNsInfo.getFieldNameValidator(),
BinaryOrdinaryAndStoredBsonWriters firstWriters = new BinaryOrdinaryAndStoredBsonWriters(
firstOutput,
dualMessageSequences.getFirstFieldNameValidator(),
validateDocumentSizeLimits ? messageSettings : null);
BsonBinaryWriter nsInfoWriter = new BsonBinaryWriter(nsInfoOut, NoOpFieldNameValidator.INSTANCE);
BsonBinaryWriter secondWriter = new BsonBinaryWriter(secondOutput, dualMessageSequences.getSecondFieldNameValidator());
// the size of operation-agnostic command fields (a.k.a. extra elements) is counted towards `messageOverheadInBytes`
int messageOverheadInBytes = 1000;
int maxOpsAndNsInfoSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int opsStart = opsOut.getPosition();
int nsInfoStart = nsInfoOut.getPosition();
int maxSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int firstStart = firstOutput.getPosition();
int secondStart = secondOutput.getPosition();
int maxBatchCount = messageSettings.getMaxBatchCount();
return opsAndNsInfo.encode(write -> {
int opsBeforeWritePosition = opsOut.getPosition();
int nsInfoBeforeWritePosition = nsInfoOut.getPosition();
int batchCountAfterWrite = write.doAndGetBatchCount(opsWriters, nsInfoWriter);
return dualMessageSequences.encodeDocuments(write -> {
int firstBeforeWritePosition = firstOutput.getPosition();
int secondBeforeWritePosition = secondOutput.getPosition();
int batchCountAfterWrite = write.doAndGetBatchCount(firstWriters, secondWriter);
assertTrue(batchCountAfterWrite <= maxBatchCount);
int opsAndNsInfoSizeInBytes =
opsOut.getPosition() - opsStart
+ nsInfoOut.getPosition() - nsInfoStart;
if (opsAndNsInfoSizeInBytes < maxOpsAndNsInfoSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
} else if (opsAndNsInfoSizeInBytes > maxOpsAndNsInfoSizeInBytes) {
opsOut.truncateToPosition(opsBeforeWritePosition);
nsInfoOut.truncateToPosition(nsInfoBeforeWritePosition);
int writtenSizeInBytes =
firstOutput.getPosition() - firstStart
+ secondOutput.getPosition() - secondStart;
if (writtenSizeInBytes < maxSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return OK_LIMIT_NOT_REACHED;
} else if (writtenSizeInBytes > maxSizeInBytes) {
firstOutput.truncateToPosition(firstBeforeWritePosition);
secondOutput.truncateToPosition(secondBeforeWritePosition);
if (batchCountAfterWrite == 1) {
// we have failed to write a single model
// we have failed to write a single document
throw createBsonMaximumSizeExceededException(messageSettings.getMaxDocumentSize());
}
return WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
return FAIL_LIMIT_EXCEEDED;
} else {
return WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
return OK_LIMIT_REACHED;
}
});
}
Expand Down Expand Up @@ -233,14 +236,14 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m
private BsonWriterHelper() {
}

private static final class BinaryOpsBsonWriters implements WritersProviderAndLimitsChecker.OpsBsonWriters {
private static final class BinaryOrdinaryAndStoredBsonWriters implements WritersProviderAndLimitsChecker.OrdinaryAndStoredBsonWriters {
private final BsonBinaryWriter writer;
private final BsonWriter storedDocumentWriter;

/**
* @param messageSettings Non-{@code null} iff the document size limits must be validated.
*/
BinaryOpsBsonWriters(
BinaryOrdinaryAndStoredBsonWriters(
final BsonOutput out,
final FieldNameValidator validator,
@Nullable final MessageSettings messageSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences;
import com.mongodb.internal.operation.ClientBulkWriteOperation.ClientBulkWriteCommand;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand Down Expand Up @@ -57,7 +56,7 @@
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.connection.BsonWriterHelper.appendElementsToDocument;
import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
import static com.mongodb.internal.connection.BsonWriterHelper.writeOpsAndNsInfo;
import static com.mongodb.internal.connection.BsonWriterHelper.writeDocumentsOfDualMessageSequences;
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createList;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne;
Expand All @@ -81,11 +80,11 @@ public final class CommandMessage extends RequestMessage {
private final MessageSequences sequences;
private final boolean responseExpected;
/**
* {@code null} iff either {@link #sequences} is not of the {@link ClientBulkWriteCommand.OpsAndNsInfo} type,
* {@code null} iff either {@link #sequences} is not of the {@link DualMessageSequences} type,
* or it is of that type, but it has not been {@linkplain #encodeMessageBodyWithMetadata(ByteBufferBsonOutput, OperationContext) encoded}.
*/
@Nullable
private Boolean opsAndNsInfoRequireResponse;
private Boolean dualMessageSequencesRequireResponse;
private final ClusterConnectionMode clusterConnectionMode;
private final ServerApi serverApi;

Expand Down Expand Up @@ -122,7 +121,7 @@ public final class CommandMessage extends RequestMessage {
this.commandFieldNameValidator = commandFieldNameValidator;
this.readPreference = readPreference;
this.responseExpected = responseExpected;
opsAndNsInfoRequireResponse = null;
dualMessageSequencesRequireResponse = null;
this.exhaustAllowed = exhaustAllowed;
this.sequences = sequences;
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
Expand Down Expand Up @@ -206,12 +205,11 @@ boolean isResponseExpected() {
if (responseExpected) {
return true;
} else {
if (sequences instanceof ValidatableSplittablePayload) {
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
if (sequences instanceof SplittablePayload) {
SplittablePayload payload = (SplittablePayload) sequences;
return payload.isOrdered() && payload.hasAnotherSplit();
} else if (sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) {
return assertNotNull(opsAndNsInfoRequireResponse);
} else if (sequences instanceof DualMessageSequences) {
return assertNotNull(dualMessageSequencesRequireResponse);
} else if (!(sequences instanceof EmptyMessageSequences)) {
fail(sequences.toString());
}
Expand All @@ -233,38 +231,34 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOut
bsonOutput.writeByte(0); // payload type
commandStartPosition = bsonOutput.getPosition();
ArrayList<BsonElement> extraElements = getExtraElements(operationContext);
// `OpsAndNsInfo` requires validation only if no response is expected, otherwise we must rely on the server validation
boolean validateDocumentSizeLimits = !(sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) || !responseExpected;
// `DualMessageSequences` requires validation only if no response is expected, otherwise we must rely on the server validation
boolean validateDocumentSizeLimits = !(sequences instanceof DualMessageSequences) || !responseExpected;

int commandDocumentSizeInBytes = writeDocument(command, bsonOutput, commandFieldNameValidator, validateDocumentSizeLimits);
if (sequences instanceof ValidatableSplittablePayload) {
if (sequences instanceof SplittablePayload) {
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
ValidatableSplittablePayload validatableSplittablePayload = (ValidatableSplittablePayload) sequences;
SplittablePayload payload = validatableSplittablePayload.getSplittablePayload();
SplittablePayload payload = (SplittablePayload) sequences;
writeOpMsgSectionWithPayloadType1(bsonOutput, payload.getPayloadName(), () -> {
writePayload(
new BsonBinaryWriter(bsonOutput, validatableSplittablePayload.getFieldNameValidator()),
new BsonBinaryWriter(bsonOutput, payload.getFieldNameValidator()),
bsonOutput, getSettings(), messageStartPosition, payload, getSettings().getMaxDocumentSize()
);
return null;
});
} else if (sequences instanceof ClientBulkWriteCommand.OpsAndNsInfo) {
ClientBulkWriteCommand.OpsAndNsInfo opsAndNsInfo = (ClientBulkWriteCommand.OpsAndNsInfo) sequences;
} else if (sequences instanceof DualMessageSequences) {
DualMessageSequences dualMessageSequences = (DualMessageSequences) sequences;
try (ByteBufferBsonOutput.Branch bsonOutputBranch2 = bsonOutput.branch();
ByteBufferBsonOutput.Branch bsonOutputBranch1 = bsonOutput.branch()) {
ClientBulkWriteCommand.OpsAndNsInfo.EncodeResult opsAndNsInfoEncodeResult = writeOpMsgSectionWithPayloadType1(
bsonOutputBranch1, "ops", () ->
writeOpMsgSectionWithPayloadType1(bsonOutputBranch2, "nsInfo", () ->
writeOpsAndNsInfo(
opsAndNsInfo, commandDocumentSizeInBytes, bsonOutputBranch1,
DualMessageSequences.EncodeDocumentsResult encodeDocumentsResult = writeOpMsgSectionWithPayloadType1(
bsonOutputBranch1, dualMessageSequences.getFirstSequenceId(), () ->
writeOpMsgSectionWithPayloadType1(bsonOutputBranch2, dualMessageSequences.getSecondSequenceId(), () ->
writeDocumentsOfDualMessageSequences(
dualMessageSequences, commandDocumentSizeInBytes, bsonOutputBranch1,
bsonOutputBranch2, getSettings(), validateDocumentSizeLimits)
)
);
opsAndNsInfoRequireResponse = opsAndNsInfoEncodeResult.isServerResponseRequired();
Long txnNumber = opsAndNsInfoEncodeResult.getTxnNumber();
if (txnNumber != null) {
extraElements.add(new BsonElement(TXN_NUMBER_KEY, new BsonInt64(txnNumber)));
}
dualMessageSequencesRequireResponse = encodeDocumentsResult.isServerResponseRequired();
extraElements.addAll(encodeDocumentsResult.getExtraElements());
appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
}
} else if (sequences instanceof EmptyMessageSequences) {
Expand Down Expand Up @@ -391,6 +385,11 @@ private void addReadConcernDocument(final List<BsonElement> extraElements, final
}
}

/**
* @param sequenceId The identifier of the sequence contained in the {@code OP_MSG} section to be written.
* @param writeDocumentsAction The action that writes the documents of the sequence.
* @see <a href="https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.md">OP_MSG</a>
*/
private <R> R writeOpMsgSectionWithPayloadType1(
final ByteBufferBsonOutput bsonOutput,
final String sequenceId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.connection;

import org.bson.BsonElement;
import org.bson.BsonWriter;
import org.bson.FieldNameValidator;
import org.bson.io.BsonOutput;

import java.util.List;

/**
* Two sequences that may either be coupled or independent.
* <p>
* This class is not part of the public API and may be removed or changed at any time.</p>
*/
public abstract class DualMessageSequences extends MessageSequences {

private final String firstSequenceId;
private final FieldNameValidator firstFieldNameValidator;
private final String secondSequenceId;
private final FieldNameValidator secondFieldNameValidator;

protected DualMessageSequences(
final String firstSequenceId,
final FieldNameValidator firstFieldNameValidator,
final String secondSequenceId,
final FieldNameValidator secondFieldNameValidator) {
this.firstSequenceId = firstSequenceId;
this.firstFieldNameValidator = firstFieldNameValidator;
this.secondSequenceId = secondSequenceId;
this.secondFieldNameValidator = secondFieldNameValidator;
}

FieldNameValidator getFirstFieldNameValidator() {
return firstFieldNameValidator;
}

FieldNameValidator getSecondFieldNameValidator() {
return secondFieldNameValidator;
}

String getFirstSequenceId() {
return firstSequenceId;
}

String getSecondSequenceId() {
return secondSequenceId;
}

protected abstract EncodeDocumentsResult encodeDocuments(WritersProviderAndLimitsChecker writersProviderAndLimitsChecker);

/**
* @see #tryWrite(WriteAction)
*/
public interface WritersProviderAndLimitsChecker {
/**
* Provides writers to the specified {@link WriteAction},
* {@linkplain WriteAction#doAndGetBatchCount(OrdinaryAndStoredBsonWriters, BsonWriter) executes} it,
* checks the {@linkplain MessageSettings limits}.
* <p>
* May be called multiple times per {@link #encodeDocuments(WritersProviderAndLimitsChecker)}.</p>
*/
WriteResult tryWrite(WriteAction write);

/**
* @see #doAndGetBatchCount(OrdinaryAndStoredBsonWriters, BsonWriter)
*/
interface WriteAction {
/**
* Writes documents to the sequences using the provided writers.
*
* @return The resulting batch count since the beginning of {@link #encodeDocuments(WritersProviderAndLimitsChecker)}.
* It is generally allowed to be greater than {@link MessageSettings#getMaxBatchCount()}.
*/
// VAKOTODO pass OrdinaryAndStoredBsonWriters for both first and second?
int doAndGetBatchCount(OrdinaryAndStoredBsonWriters firstWriter, BsonWriter secondWriter);
}

interface OrdinaryAndStoredBsonWriters {
BsonWriter getWriter();

/**
* A {@link BsonWriter} to use for writing documents that are intended to be stored in a database.
* Must write to the same {@linkplain BsonOutput output} as {@link #getWriter()} does.
*/
BsonWriter getStoredDocumentWriter();
}

enum WriteResult {
FAIL_LIMIT_EXCEEDED,
OK_LIMIT_REACHED,
OK_LIMIT_NOT_REACHED
}
}

public static final class EncodeDocumentsResult {
private final boolean serverResponseRequired;
private final List<BsonElement> extraElements;

/**
* @param extraElements See {@link #getExtraElements()}.
*/
public EncodeDocumentsResult(final boolean serverResponseRequired, final List<BsonElement> extraElements) {
this.serverResponseRequired = serverResponseRequired;
this.extraElements = extraElements;
}

boolean isServerResponseRequired() {
return serverResponseRequired;
}

/**
* {@linkplain BsonElement Key/value pairs} to be added to the document contained in the {@code OP_MSG} section with payload type 0.
*/
List<BsonElement> getExtraElements() {
return extraElements;
}
}
}
Loading

0 comments on commit b8734d1

Please sign in to comment.