Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement splitting and encoding ops, nsInfo as separate OP_MSG sections, implement prose tests #1495

Merged
merged 104 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 97 commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
48b9614
Create and document Java sync improved bulk write API
stIncMale Jul 23, 2024
af854ed
Remove the type parameter from `ClientWriteModel`
stIncMale Jul 23, 2024
1eb7466
Remove `ClientBulkWriteException.create` as we can get by with the co…
stIncMale Jul 25, 2024
5567324
Merge branch 'master' into JAVA-5527
stIncMale Jul 27, 2024
644d561
Merge branch 'master' into JAVA-5527
stIncMale Aug 13, 2024
f566303
Do minor improvements
stIncMale Aug 15, 2024
3d13ffd
Make changes needed for the implementation
stIncMale Aug 15, 2024
cf46b46
Fix formatting in ClientUpdateManyModel
stIncMale Aug 15, 2024
e049234
Make a few minor changes
stIncMale Aug 15, 2024
0e16427
Merge branch 'master' into JAVA-5527
stIncMale Aug 15, 2024
0d518d8
Add more info to the API docs, add `ClientWriteModel` subtypes
stIncMale Aug 22, 2024
6b77f78
Add `ClientWriteModelWithNamespace`
stIncMale Aug 22, 2024
2ed8e87
Implement
stIncMale Aug 8, 2024
07bac95
Sync spec tests
stIncMale Aug 15, 2024
16f100b
Implement required test runner changes
stIncMale Aug 15, 2024
9f8ce2c
Improve how `indexedNamespaces` are computed
stIncMale Aug 22, 2024
fdb90d2
Remove `throws` declarations from the API
stIncMale Aug 22, 2024
39386fe
Make wording on `ClientWriteModel` methods consistent with that on `C…
stIncMale Aug 22, 2024
f67af3f
Move constructor methods to `ClientNamespacedWriteModel`
stIncMale Aug 22, 2024
53f6883
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 22, 2024
9a7b668
Fix errors caused by the merge
stIncMale Aug 22, 2024
395af7a
Use `Optional` to express verbose/summary results
stIncMale Aug 22, 2024
8f504b0
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 22, 2024
b02a638
Fix errors caused by the merge
stIncMale Aug 22, 2024
a11e5f6
Make an API doc improvement
stIncMale Aug 27, 2024
bfbc1cc
Refactor `shouldAttemptToRetryWriteAndAddRetryableLabel`
stIncMale Aug 27, 2024
1c3c590
Improve `CrudProseTest.insertMustGenerateIdAtMostOnce`
stIncMale Aug 27, 2024
1c9c19e
Move `MixedBulkWriteOperation.validateAndGetEffectiveWriteConcern`/`c…
stIncMale Aug 27, 2024
061e605
Add a comment in `toClientNamespacedWriteModel`
stIncMale Aug 27, 2024
8320216
Use `CommandResultDocumentCodec` in `ClientBulkWriteOperation`
stIncMale Aug 28, 2024
f41ed59
Use `Integer` for indexes in `ClientBulkWriteResult.Verbose`. Make `C…
stIncMale Aug 28, 2024
4846e0b
Use `Integer` for indexes in `ClientBulkWriteException`.
stIncMale Aug 28, 2024
dbf9a26
Take `ClientBulkWriteException` into account in `OperationExecutor.ex…
stIncMale Aug 28, 2024
a0005cd
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 28, 2024
35eee96
Fixes after the merge
stIncMale Aug 28, 2024
21bb22e
Make `ClientInsertOneResult.getInsertedId` return `Optional`
stIncMale Aug 28, 2024
182b2f9
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 28, 2024
fb32fde
Fixes after the merge
stIncMale Aug 28, 2024
f026ee3
Update the documentation of `ClientBulkWriteException` and remove a TODO
stIncMale Aug 28, 2024
7372f13
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 28, 2024
0fb65d4
Synchronize unified tests with https://github.com/mongodb/specificati…
stIncMale Aug 29, 2024
e36898f
Refactor `CrudProseTest` to support the reactive client
stIncMale Aug 29, 2024
70bb422
Add methods for all prose tests. Where possible, implement them
stIncMale Aug 30, 2024
3584de5
Move results to `com.mongodb.client.model.bulk`
stIncMale Aug 30, 2024
5a17ea0
Merge branch 'master' into JAVA-5527
stIncMale Aug 30, 2024
8e1d770
Move internal results to `com.mongodb.internal.client.model.bulk`
stIncMale Aug 30, 2024
e973615
Document that `bulkWrite` is not supported by serverless instances
stIncMale Aug 30, 2024
33ec764
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Aug 30, 2024
ee893f2
Fixes after the merge
stIncMale Aug 30, 2024
e33f592
Add subtypes of `ClientNamespacedWriteModel` and hide `ClientWriteModel`
stIncMale Sep 3, 2024
f94ad58
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Sep 3, 2024
310426a
Fixes after the merge
stIncMale Sep 3, 2024
301a2ba
Remove a garbage comment
stIncMale Sep 3, 2024
1a57fb6
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Sep 3, 2024
6f01e61
Replace `ConcreteClientNamespacedWriteModel` with multiple more speci…
stIncMale Sep 9, 2024
c3224e9
Rearrange `ClientWriteModel` inheritance: "one" should extend "many",…
stIncMale Sep 9, 2024
c057390
Make internal `AbstractClientNamespacedWriteModel` public
stIncMale Sep 9, 2024
dbb6ec8
Rearrange `ClientWriteModel` inheritance: neither "one" nor "many" sh…
stIncMale Sep 9, 2024
cb11c44
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Sep 9, 2024
8bd7a6a
Fixes after the merge
stIncMale Sep 9, 2024
f781bca
Make internal `AbstractClientUpdateModel`, `AbstractClientDeleteModel…
stIncMale Sep 9, 2024
ec88c02
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Sep 9, 2024
1d4a1d3
Implement branching in `ByteBufferBsonOutput` and use this type expli…
stIncMale Sep 3, 2024
14bb86e
Improve code comments in `ClientBulkWriteOperation`
stIncMale Sep 10, 2024
2127031
Introduce `OpMsgSequences` that encompasses `SplittablePayload`
stIncMale Sep 4, 2024
bcbde1e
Implement batching, limit checking, retryability per batch
stIncMale Sep 6, 2024
fcbfe08
Implement prose tests
stIncMale Sep 10, 2024
752fcbe
Merge branch 'JAVA-5528' into JAVA-5529
stIncMale Sep 13, 2024
764d7de
Address code walkthrough concerns
stIncMale Sep 16, 2024
f186b31
Fix `server-selection/logging/operation-id.json:Failed client bulkWri…
stIncMale Sep 16, 2024
3442a73
Rename an incorrectly named variable
stIncMale Sep 17, 2024
ffd4f75
Assert that `CryptConnection` observes only either `ValidatableSplitt…
stIncMale Sep 17, 2024
3fc86bb
Stop linking to `Filters` from the documentation of `ClientNamespaced…
stIncMale Sep 17, 2024
f9c960c
Merge branch 'JAVA-5528' into JAVA-5529
stIncMale Sep 18, 2024
86e5234
Update driver-core/src/main/com/mongodb/client/model/bulk/ClientBulkW…
stIncMale Sep 18, 2024
fb134f8
Update driver-core/src/main/com/mongodb/client/model/bulk/ClientDelet…
stIncMale Sep 18, 2024
a4bf4d0
Fix typos in API docs
stIncMale Sep 18, 2024
2837235
Change code to always refer to `ClientBulkWriteResult.Verbose` with t…
stIncMale Sep 18, 2024
b6702a6
Rename `ClientBulkWriteResult.Verbose` to `ClientBulkWriteResult.Verb…
stIncMale Sep 23, 2024
24ce6db
Improve partial result API documentation wording
stIncMale Sep 24, 2024
ca5d19a
Merge branch 'JAVA-5527' into JAVA-5528
stIncMale Sep 24, 2024
6d8a3e6
Fixes after the merge
stIncMale Sep 24, 2024
076d39a
Merge branch 'master' into JAVA-5528
stIncMale Sep 24, 2024
6b2b7a8
Merge branch 'JAVA-4586_bulk-write' into JAVA-5528
stIncMale Sep 24, 2024
96733a7
Merge branch 'JAVA-5528' into JAVA-5529
stIncMale Sep 24, 2024
8bda529
Fixes after the merge
stIncMale Sep 24, 2024
11653f0
Merge branch 'JAVA-4586_bulk-write' into JAVA-5529
stIncMale Sep 24, 2024
8d545ff
Fix logic for determining whether to populate `ClientBulkWriteExcepti…
stIncMale Sep 25, 2024
e2aaa2a
Update `client-bulkWrite-partialResults.json` to match the specification
stIncMale Sep 25, 2024
52bb622
Fix how `ClientBulkWriteOperation.ClientBulkWriteCommand.OpsAndNsInfo…
stIncMale Sep 27, 2024
70fd9f1
Make the changes needed for tests to pass despite async/Kotlin APIs n…
stIncMale Sep 27, 2024
038fafa
Add a DRIVERS-2997 workaround to `AbstractClientSideOperationsTimeout…
stIncMale Oct 3, 2024
f430b7e
Double the timeouts in `AbstractClientSideOperationsTimeoutProseTest.…
stIncMale Oct 3, 2024
cf175d4
Make the changes needed in `UnifiedTest` for tests to pass despite as…
stIncMale Oct 4, 2024
ea0633e
Replace lazy command document with non-lazy one
stIncMale Oct 21, 2024
9fe35e4
Create `DualMessageSequences` abstraction
stIncMale Oct 22, 2024
056d411
Get rid of `OrdinaryAndStoredBsonWriters`
stIncMale Nov 5, 2024
6f68c0e
Add `CommandMessageTest.getCommandDocumentFromClientBulkWrite`
stIncMale Nov 5, 2024
ee3073e
Remove the BSON document size validation requirement for the client b…
stIncMale Nov 14, 2024
0e78b67
Update driver-core/src/main/com/mongodb/internal/connection/CommandMe…
stIncMale Nov 21, 2024
38c880d
Address simple review concerns
stIncMale Nov 21, 2024
0156135
Extract `writeOpMsg`, `writeOpQuery` from `CommandMessage.encodeMessa…
stIncMale Nov 21, 2024
f07ff6f
Refactor `CommandMessage`
stIncMale Nov 22, 2024
3c6c09f
Refactor `BsonWriterHelper.appendElementsToDocument`
stIncMale Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bson/src/main/org/bson/AbstractBsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,15 @@ protected void throwInvalidState(final String methodName, final State... validSt
methodName, validStatesString, state));
}

/**
* {@inheritDoc}
* <p>
* The {@link #flush()} method of {@link AbstractBsonWriter} does nothing.</p>
*/
@Override
public void flush() {
}

@Override
public void close() {
closed = true;
Expand Down
5 changes: 0 additions & 5 deletions bson/src/main/org/bson/BSONCallbackAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ protected BSONCallbackAdapter(final BsonWriterSettings settings, final BSONCallb
this.bsonCallback = bsonCallback;
}

@Override
public void flush() {
//Looks like should be no-op?
}

@Override
public void doWriteStartDocument() {
BsonContextType contextType = getState() == State.SCOPE_DOCUMENT
Expand Down
4 changes: 0 additions & 4 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ public BsonBinaryWriterSettings getBinaryWriterSettings() {
return binaryWriterSettings;
}

@Override
public void flush() {
}

@Override
protected Context getContext() {
return (Context) super.getContext();
Expand Down
4 changes: 0 additions & 4 deletions bson/src/main/org/bson/BsonDocumentWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,6 @@ public void doWriteUndefined() {
write(new BsonUndefined());
}

@Override
public void flush() {
}

@Override
protected Context getContext() {
return (Context) super.getContext();
Expand Down
10 changes: 10 additions & 0 deletions bson/src/main/org/bson/io/OutputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ public void write(final byte[] b) {
public void close() {
}

/**
* {@inheritDoc}
* <p>
* The {@link #flush()} method of {@link OutputBuffer} does nothing.</p>
*/
@Override
public void flush() throws IOException {
super.flush();
}

@Override
public void write(final byte[] bytes, final int offset, final int length) {
writeBytes(bytes, offset, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ <T> void commandAsync(String database, BsonDocument command, FieldNameValidator

<T> void commandAsync(String database, BsonDocument command, FieldNameValidator commandFieldNameValidator,
@Nullable ReadPreference readPreference, Decoder<T> commandResultDecoder,
OperationContext operationContext, boolean responseExpected, @Nullable SplittablePayload payload,
@Nullable FieldNameValidator payloadFieldNameValidator, SingleResultCallback<T> callback);
OperationContext operationContext, boolean responseExpected, MessageSequences sequences, SingleResultCallback<T> callback);

void markAsPinned(Connection.PinningMode pinningMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,63 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.connection.DualMessageSequences.EncodeDocumentsResult;
import com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryWriter;
import org.bson.BsonBinaryWriterSettings;
import org.bson.BsonDocument;
import org.bson.BsonElement;
import org.bson.BsonMaximumSizeExceededException;
import org.bson.BsonValue;
import org.bson.BsonWriter;
import org.bson.BsonWriterSettings;
import org.bson.FieldNameValidator;
import org.bson.codecs.BsonValueCodecProvider;
import org.bson.codecs.Codec;
import org.bson.codecs.Encoder;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.io.BsonOutput;

import java.util.List;

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
import static com.mongodb.internal.connection.MessageSettings.DOCUMENT_HEADROOM_SIZE;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;

final class BsonWriterHelper {
private static final int DOCUMENT_HEADROOM = 1024 * 16;
/**
* This class is not part of the public API and may be removed or changed at any time.
*/
public final class BsonWriterHelper {
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
private static final EncoderContext ENCODER_CONTEXT = EncoderContext.builder().build();

static void writeElements(final BsonWriter writer, final List<BsonElement> bsonElements) {
for (BsonElement bsonElement : bsonElements) {
writer.writeName(bsonElement.getName());
getCodec(bsonElement.getValue()).encode(writer, bsonElement.getValue(), ENCODER_CONTEXT);
static void appendElementsToDocument(
final BsonOutput bsonOutputWithDocument,
final int documentStartPosition,
@Nullable final List<BsonElement> bsonElements) {
int bsonDocumentEndingSize = 1;
int appendFrom = bsonOutputWithDocument.getPosition() - bsonDocumentEndingSize;
BsonBinaryWriter writer = createBsonBinaryWriter(bsonOutputWithDocument, NoOpFieldNameValidator.INSTANCE, null);
// change `writer`s state so that we can append elements
writer.writeStartDocument();
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
bsonOutputWithDocument.truncateToPosition(appendFrom);
if (bsonElements != null) {
for (BsonElement element : bsonElements) {
String name = element.getName();
BsonValue value = element.getValue();
writer.writeName(name);
encodeUsingRegistry(writer, value);
}
}
// write the BSON document ending
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
bsonOutputWithDocument.writeByte(0);
backpatchLength(documentStartPosition, bsonOutputWithDocument);
}

static void writePayloadArray(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
Expand All @@ -65,16 +96,89 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f
}

if (payload.getPosition() == 0) {
throw new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.",
payloadSettings.getMaxDocumentSize()));
throw createBsonMaximumSizeExceededException(payloadSettings.getMaxDocumentSize());
}
}

/**
* @return See {@link DualMessageSequences#encodeDocuments(WritersProviderAndLimitsChecker)}.
*/
static EncodeDocumentsResult writeDocumentsOfDualMessageSequences(
final DualMessageSequences dualMessageSequences,
final int commandDocumentSizeInBytes,
final BsonOutput firstOutput,
final BsonOutput secondOutput,
final MessageSettings messageSettings,
final boolean validateDocumentSizeLimits) {
BsonBinaryWriter firstWriter = createBsonBinaryWriter(
firstOutput, dualMessageSequences.getFirstFieldNameValidator(), validateDocumentSizeLimits ? messageSettings : null);
BsonBinaryWriter secondWriter = createBsonBinaryWriter(
secondOutput, dualMessageSequences.getSecondFieldNameValidator(), validateDocumentSizeLimits ? messageSettings : null);
// the size of operation-agnostic command fields (a.k.a. extra elements) is counted towards `messageOverheadInBytes`
int messageOverheadInBytes = 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this limitation apply only to client bulk write?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if limitation is the right word here. But yes, this is unique to client bulk writes:

  1. A mixed bulk write does not mix different kinds of operations in a batch because it is technically impossible there. As a result, we know in advance what kind of operations is in a batch, thus knowing whether it supports retries or not. That allows us to encode extraElements before encoding the PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE section. Thus, when we are encoding the document sequence, we know exactly how much space we have left available before reaching MessageSettings.getMaxMessageSize.
  2. A client bulk write may mix different kinds of operations in a batch, which means that we know whether the batch supports retries only after encoding its PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE sections. That is, we may need to write something after writing those sections. That, in turn, means we can't know exactly how much space we have left when we encode document sequences. But whatever we write after writing the sequences, its size is bounded, and 1000 bytes is used in the spec as the value that is definitely not smaller than that bound.

int maxSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int firstStart = firstOutput.getPosition();
int secondStart = secondOutput.getPosition();
int maxBatchCount = messageSettings.getMaxBatchCount();
return dualMessageSequences.encodeDocuments(write -> {
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
int firstBeforeWritePosition = firstOutput.getPosition();
int secondBeforeWritePosition = secondOutput.getPosition();
int batchCountAfterWrite = write.doAndGetBatchCount(firstWriter, secondWriter);
assertTrue(batchCountAfterWrite <= maxBatchCount);
int writtenSizeInBytes =
firstOutput.getPosition() - firstStart
+ secondOutput.getPosition() - secondStart;
if (writtenSizeInBytes < maxSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return OK_LIMIT_NOT_REACHED;
} else if (writtenSizeInBytes > maxSizeInBytes) {
firstOutput.truncateToPosition(firstBeforeWritePosition);
secondOutput.truncateToPosition(secondBeforeWritePosition);
if (batchCountAfterWrite == 1) {
// we have failed to write a single document
throw createBsonMaximumSizeExceededException(messageSettings.getMaxDocumentSize());
}
return FAIL_LIMIT_EXCEEDED;
} else {
return OK_LIMIT_REACHED;
}
});
}

/**
* @param messageSettings Non-{@code null} iff the document size limit must be validated.
*/
static BsonBinaryWriter createBsonBinaryWriter(
final BsonOutput out,
final FieldNameValidator validator,
@Nullable final MessageSettings messageSettings) {
return new BsonBinaryWriter(
new BsonWriterSettings(),
messageSettings == null
? new BsonBinaryWriterSettings()
: new BsonBinaryWriterSettings(messageSettings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE),
out,
validator);
}

/**
* Backpatches the document/message/sequence length into the beginning of the document/message/sequence.
*
* @param startPosition The start position of the document/message/sequence in {@code bsonOutput}.
*/
static void backpatchLength(final int startPosition, final BsonOutput bsonOutput) {
int messageLength = bsonOutput.getPosition() - startPosition;
bsonOutput.writeInt32(startPosition, messageLength);
}

private static BsonMaximumSizeExceededException createBsonMaximumSizeExceededException(final int maxSize) {
return new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.", maxSize));
}

private static boolean writeDocument(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
final BsonDocument document, final int messageStartPosition, final int batchItemCount,
final int maxSplittableDocumentSize) {
int currentPosition = bsonOutput.getPosition();
getCodec(document).encode(writer, document, ENCODER_CONTEXT);
encodeUsingRegistry(writer, document);
int messageSize = bsonOutput.getPosition() - messageStartPosition;
int documentSize = bsonOutput.getPosition() - currentPosition;
if (exceedsLimits(settings, messageSize, documentSize, batchItemCount)
Expand All @@ -85,24 +189,25 @@ private static boolean writeDocument(final BsonWriter writer, final BsonOutput b
return true;
}

@SuppressWarnings({"unchecked"})
private static Codec<BsonValue> getCodec(final BsonValue bsonValue) {
return (Codec<BsonValue>) REGISTRY.get(bsonValue.getClass());
static void encodeUsingRegistry(final BsonWriter writer, final BsonValue value) {
@SuppressWarnings("unchecked")
Encoder<BsonValue> encoder = (Encoder<BsonValue>) REGISTRY.get(value.getClass());
encoder.encode(writer, value, ENCODER_CONTEXT);
}

private static MessageSettings getPayloadMessageSettings(final SplittablePayload.Type type, final MessageSettings settings) {
MessageSettings payloadMessageSettings = settings;
if (type != SplittablePayload.Type.INSERT) {
payloadMessageSettings = createMessageSettingsBuilder(settings)
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}
return payloadMessageSettings;
}

private static MessageSettings getDocumentMessageSettings(final MessageSettings settings) {
return createMessageSettingsBuilder(settings)
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}

Expand All @@ -126,8 +231,45 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m
return false;
}

/**
* @return {@code writer} if {@code maxDocumentSize} is {@code null}, otherwise decorates it.
*/
public static BsonWriter decorateWithDocumentSizeChecking(final BsonBinaryWriter writer, @Nullable final Integer maxDocumentSize) {
return maxDocumentSize == null ? writer : new DocumentSizeLimitCheckingBsonBinaryWriter(writer, maxDocumentSize);
}

private BsonWriterHelper() {
}

private static final class DocumentSizeLimitCheckingBsonBinaryWriter extends LevelCountingBsonWriter {
private final int maxStoredDocumentSize;
private final BsonOutput out;
private int documentStart;

DocumentSizeLimitCheckingBsonBinaryWriter(final BsonBinaryWriter writer, final int maxStoredDocumentSize) {
super(writer);
assertTrue(maxStoredDocumentSize > 0);
this.maxStoredDocumentSize = maxStoredDocumentSize;
this.out = writer.getBsonOutput();
}

@Override
public void writeStartDocument() {
if (getCurrentLevel() == INITIAL_LEVEL) {
documentStart = out.getPosition();
}
super.writeStartDocument();
}

@Override
public void writeEndDocument() throws BsonMaximumSizeExceededException {
super.writeEndDocument();
if (getCurrentLevel() == INITIAL_LEVEL) {
int documentSize = out.getPosition() - documentStart;
if (documentSize > maxStoredDocumentSize) {
throw createBsonMaximumSizeExceededException(maxStoredDocumentSize);
}
}
}
}
}
Loading