Skip to content

Commit

Permalink
Implement splitting and encoding ops, nsInfo as separate OP_MSG
Browse files Browse the repository at this point in the history
… sections, implement prose tests (#1495)

JAVA-5529
JAVA-5610
JAVA-5695

---------

Co-authored-by: Viacheslav Babanin <[email protected]>
Co-authored-by: Jeff Yemin <[email protected]>
  • Loading branch information
3 people authored Nov 27, 2024
1 parent fc65ad9 commit b0e2bdf
Show file tree
Hide file tree
Showing 51 changed files with 2,702 additions and 1,269 deletions.
9 changes: 9 additions & 0 deletions bson/src/main/org/bson/AbstractBsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,15 @@ protected void throwInvalidState(final String methodName, final State... validSt
methodName, validStatesString, state));
}

/**
* {@inheritDoc}
* <p>
* The {@link #flush()} method of {@link AbstractBsonWriter} does nothing.</p>
*/
@Override
public void flush() {
}

@Override
public void close() {
closed = true;
Expand Down
5 changes: 0 additions & 5 deletions bson/src/main/org/bson/BSONCallbackAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ protected BSONCallbackAdapter(final BsonWriterSettings settings, final BSONCallb
this.bsonCallback = bsonCallback;
}

@Override
public void flush() {
//Looks like should be no-op?
}

@Override
public void doWriteStartDocument() {
BsonContextType contextType = getState() == State.SCOPE_DOCUMENT
Expand Down
4 changes: 0 additions & 4 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ public BsonBinaryWriterSettings getBinaryWriterSettings() {
return binaryWriterSettings;
}

@Override
public void flush() {
}

@Override
protected Context getContext() {
return (Context) super.getContext();
Expand Down
4 changes: 0 additions & 4 deletions bson/src/main/org/bson/BsonDocumentWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,6 @@ public void doWriteUndefined() {
write(new BsonUndefined());
}

@Override
public void flush() {
}

@Override
protected Context getContext() {
return (Context) super.getContext();
Expand Down
10 changes: 10 additions & 0 deletions bson/src/main/org/bson/io/OutputBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ public void write(final byte[] b) {
public void close() {
}

/**
* {@inheritDoc}
* <p>
* The {@link #flush()} method of {@link OutputBuffer} does nothing.</p>
*/
@Override
public void flush() throws IOException {
super.flush();
}

@Override
public void write(final byte[] bytes, final int offset, final int length) {
writeBytes(bytes, offset, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ <T> void commandAsync(String database, BsonDocument command, FieldNameValidator

<T> void commandAsync(String database, BsonDocument command, FieldNameValidator commandFieldNameValidator,
@Nullable ReadPreference readPreference, Decoder<T> commandResultDecoder,
OperationContext operationContext, boolean responseExpected, @Nullable SplittablePayload payload,
@Nullable FieldNameValidator payloadFieldNameValidator, SingleResultCallback<T> callback);
OperationContext operationContext, boolean responseExpected, MessageSequences sequences, SingleResultCallback<T> callback);

void markAsPinned(Connection.PinningMode pinningMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,56 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.connection.DualMessageSequences.EncodeDocumentsResult;
import com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryWriter;
import org.bson.BsonBinaryWriterSettings;
import org.bson.BsonContextType;
import org.bson.BsonDocument;
import org.bson.BsonElement;
import org.bson.BsonMaximumSizeExceededException;
import org.bson.BsonValue;
import org.bson.BsonWriter;
import org.bson.BsonWriterSettings;
import org.bson.FieldNameValidator;
import org.bson.codecs.BsonValueCodecProvider;
import org.bson.codecs.Codec;
import org.bson.codecs.Encoder;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.io.BsonOutput;

import java.util.List;

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
import static com.mongodb.internal.connection.MessageSettings.DOCUMENT_HEADROOM_SIZE;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;

final class BsonWriterHelper {
private static final int DOCUMENT_HEADROOM = 1024 * 16;
/**
* This class is not part of the public API and may be removed or changed at any time.
*/
public final class BsonWriterHelper {
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
private static final EncoderContext ENCODER_CONTEXT = EncoderContext.builder().build();

static void writeElements(final BsonWriter writer, final List<BsonElement> bsonElements) {
for (BsonElement bsonElement : bsonElements) {
writer.writeName(bsonElement.getName());
getCodec(bsonElement.getValue()).encode(writer, bsonElement.getValue(), ENCODER_CONTEXT);
static void appendElementsToDocument(
final BsonOutput bsonOutputWithDocument,
final int documentStartPosition,
@Nullable final List<BsonElement> bsonElements) {
if ((bsonElements == null) || bsonElements.isEmpty()) {
return;
}
try (AppendingBsonWriter writer = new AppendingBsonWriter(bsonOutputWithDocument, documentStartPosition)) {
for (BsonElement element : bsonElements) {
String name = element.getName();
BsonValue value = element.getValue();
writer.writeName(name);
encodeUsingRegistry(writer, value);
}
}
}

Expand All @@ -65,16 +90,86 @@ static void writePayload(final BsonWriter writer, final BsonOutput bsonOutput, f
}

if (payload.getPosition() == 0) {
throw new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.",
payloadSettings.getMaxDocumentSize()));
throw createBsonMaximumSizeExceededException(payloadSettings.getMaxDocumentSize());
}
}

/**
* @return See {@link DualMessageSequences#encodeDocuments(WritersProviderAndLimitsChecker)}.
*/
static EncodeDocumentsResult writeDocumentsOfDualMessageSequences(
final DualMessageSequences dualMessageSequences,
final int commandDocumentSizeInBytes,
final BsonOutput firstOutput,
final BsonOutput secondOutput,
final MessageSettings messageSettings) {
BsonBinaryWriter firstWriter = createBsonBinaryWriter(firstOutput, dualMessageSequences.getFirstFieldNameValidator(), null);
BsonBinaryWriter secondWriter = createBsonBinaryWriter(secondOutput, dualMessageSequences.getSecondFieldNameValidator(), null);
// the size of operation-agnostic command fields (a.k.a. extra elements) is counted towards `messageOverheadInBytes`
int messageOverheadInBytes = 1000;
int maxSizeInBytes = messageSettings.getMaxMessageSize() - (messageOverheadInBytes + commandDocumentSizeInBytes);
int firstStart = firstOutput.getPosition();
int secondStart = secondOutput.getPosition();
int maxBatchCount = messageSettings.getMaxBatchCount();
return dualMessageSequences.encodeDocuments(writeAction -> {
int firstBeforeWritePosition = firstOutput.getPosition();
int secondBeforeWritePosition = secondOutput.getPosition();
int batchCountAfterWrite = writeAction.doAndGetBatchCount(firstWriter, secondWriter);
assertTrue(batchCountAfterWrite <= maxBatchCount);
int writtenSizeInBytes =
firstOutput.getPosition() - firstStart
+ secondOutput.getPosition() - secondStart;
if (writtenSizeInBytes < maxSizeInBytes && batchCountAfterWrite < maxBatchCount) {
return OK_LIMIT_NOT_REACHED;
} else if (writtenSizeInBytes > maxSizeInBytes) {
firstOutput.truncateToPosition(firstBeforeWritePosition);
secondOutput.truncateToPosition(secondBeforeWritePosition);
if (batchCountAfterWrite == 1) {
// we have failed to write a single document
throw createBsonMaximumSizeExceededException(messageSettings.getMaxDocumentSize());
}
return FAIL_LIMIT_EXCEEDED;
} else {
return OK_LIMIT_REACHED;
}
});
}

/**
* @param messageSettings Non-{@code null} iff the document size limit must be validated.
*/
static BsonBinaryWriter createBsonBinaryWriter(
final BsonOutput out,
final FieldNameValidator validator,
@Nullable final MessageSettings messageSettings) {
return new BsonBinaryWriter(
new BsonWriterSettings(),
messageSettings == null
? new BsonBinaryWriterSettings()
: new BsonBinaryWriterSettings(messageSettings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE),
out,
validator);
}

/**
* Backpatches the document/message/sequence length into the beginning of the document/message/sequence.
*
* @param startPosition The start position of the document/message/sequence in {@code bsonOutput}.
*/
static void backpatchLength(final int startPosition, final BsonOutput bsonOutput) {
int messageLength = bsonOutput.getPosition() - startPosition;
bsonOutput.writeInt32(startPosition, messageLength);
}

private static BsonMaximumSizeExceededException createBsonMaximumSizeExceededException(final int maxSize) {
return new BsonMaximumSizeExceededException(format("Payload document size is larger than maximum of %d.", maxSize));
}

private static boolean writeDocument(final BsonWriter writer, final BsonOutput bsonOutput, final MessageSettings settings,
final BsonDocument document, final int messageStartPosition, final int batchItemCount,
final int maxSplittableDocumentSize) {
int currentPosition = bsonOutput.getPosition();
getCodec(document).encode(writer, document, ENCODER_CONTEXT);
encodeUsingRegistry(writer, document);
int messageSize = bsonOutput.getPosition() - messageStartPosition;
int documentSize = bsonOutput.getPosition() - currentPosition;
if (exceedsLimits(settings, messageSize, documentSize, batchItemCount)
Expand All @@ -85,24 +180,25 @@ private static boolean writeDocument(final BsonWriter writer, final BsonOutput b
return true;
}

@SuppressWarnings({"unchecked"})
private static Codec<BsonValue> getCodec(final BsonValue bsonValue) {
return (Codec<BsonValue>) REGISTRY.get(bsonValue.getClass());
static void encodeUsingRegistry(final BsonWriter writer, final BsonValue value) {
@SuppressWarnings("unchecked")
Encoder<BsonValue> encoder = (Encoder<BsonValue>) REGISTRY.get(value.getClass());
encoder.encode(writer, value, ENCODER_CONTEXT);
}

private static MessageSettings getPayloadMessageSettings(final SplittablePayload.Type type, final MessageSettings settings) {
MessageSettings payloadMessageSettings = settings;
if (type != SplittablePayload.Type.INSERT) {
payloadMessageSettings = createMessageSettingsBuilder(settings)
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
.maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}
return payloadMessageSettings;
}

private static MessageSettings getDocumentMessageSettings(final MessageSettings settings) {
return createMessageSettingsBuilder(settings)
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
.maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}

Expand All @@ -126,8 +222,50 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m
return false;
}

/**
* A {@link BsonWriter} that allows appending key/value pairs to a document that has been fully written to a {@link BsonOutput}.
*/
private static final class AppendingBsonWriter extends LevelCountingBsonWriter implements AutoCloseable {
private static final int INITIAL_LEVEL = DEFAULT_INITIAL_LEVEL + 1;

private BsonWriterHelper() {
/**
* @param bsonOutputWithDocument A {@link BsonOutput} {@linkplain BsonOutput#getPosition() positioned}
* immediately after the end of the document.
* @param documentStartPosition The {@linkplain BsonOutput#getPosition() position} of the start of the document
* in {@code bsonOutputWithDocument}.
*/
AppendingBsonWriter(final BsonOutput bsonOutputWithDocument, final int documentStartPosition) {
super(
new InternalAppendingBsonBinaryWriter(bsonOutputWithDocument, documentStartPosition),
INITIAL_LEVEL);
}

@Override
public void writeEndDocument() {
assertTrue(getCurrentLevel() > INITIAL_LEVEL);
super.writeEndDocument();
}

@Override
public void close() {
try (InternalAppendingBsonBinaryWriter writer = (InternalAppendingBsonBinaryWriter) getBsonWriter()) {
writer.writeEndDocument();
}
}

private static final class InternalAppendingBsonBinaryWriter extends BsonBinaryWriter {
InternalAppendingBsonBinaryWriter(final BsonOutput bsonOutputWithDocument, final int documentStartPosition) {
super(bsonOutputWithDocument);
int documentEndPosition = bsonOutputWithDocument.getPosition();
int bsonDocumentEndingSize = 1;
int appendFromPosition = documentEndPosition - bsonDocumentEndingSize;
bsonOutputWithDocument.truncateToPosition(appendFromPosition);
setState(State.NAME);
setContext(new Context(null, BsonContextType.DOCUMENT, documentStartPosition));
}
}
}

private BsonWriterHelper() {
}
}
Loading

0 comments on commit b0e2bdf

Please sign in to comment.