Skip to content

CSHARP-4706: Support for $out to Time-series collections #1223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/MongoDB.Driver.Core/Core/Misc/Feature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Feature
private static readonly Feature __aggregateLet = new Feature("AggregateLet", WireVersion.Server36);
private static readonly Feature __aggregateMerge = new Feature("AggregateMerge", WireVersion.Server42);
private static readonly Feature __aggregateOut = new Feature("AggregateOut", WireVersion.Server26);
private static readonly Feature __aggregateOutTimeSeries = new Feature("AggregateOutTimeSeries", WireVersion.Server70);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alphabetical order (field and property).

private static readonly Feature __aggregateOutOnSecondary = new Feature("AggregateOutOnSecondary", WireVersion.Server50);
private static readonly Feature __aggregateOutToDifferentDatabase = new Feature("AggregateOutToDifferentDatabase", WireVersion.Server44);
private static readonly Feature __aggregateToString = new Feature("AggregateToString", WireVersion.Server40);
Expand Down Expand Up @@ -237,7 +238,12 @@ public class Feature
public static Feature AggregateOut => __aggregateOut;

/// <summary>
/// Gets the aggregate out on secondary feature,
/// Gets the aggregate out to time series feature.
/// </summary>
public static Feature AggregateOutTimeSeries => __aggregateOutTimeSeries;

/// <summary>
/// Gets the aggregate out on secondary feature.
/// </summary>
public static Feature AggregateOutOnSecondary => __aggregateOutOnSecondary;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private IReadOnlyList<BsonDocument> SimplifyOutStageIfOutputDatabaseIsSameAsInpu
{
var lastStage = pipeline.Last();
var lastStageName = lastStage.GetElement(0).Name;
if (lastStageName == "$out" && lastStage["$out"] is BsonDocument outDocument)
if (lastStageName == "$out" && lastStage["$out"] is BsonDocument outDocument && !outDocument.Contains("timeseries"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have this tested directly or indirectly somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tested indirectly through the Aggregate_out_to_time_series_collection_on_secondary_should_work test and the AggregateToCollection time series tests. Without this, the simplification of the out stage will just remove the timeseries options resulting in the time series tests failing.

{
if (outDocument.TryGetValue("db", out var db) && db.IsString &&
outDocument.TryGetValue("coll", out var coll) && coll.IsString)
Expand Down
28 changes: 28 additions & 0 deletions src/MongoDB.Driver/AggregateFluent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ public override IAsyncCursor<TResult> Out(string collectionName, CancellationTok
return Out(outputCollection, cancellationToken);
}

public override IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
var aggregate = WithPipeline(_pipeline.Out(outputCollection, timeSeriesOptions));
return aggregate.ToCursor(cancellationToken);
}

public override IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
Ensure.IsNotNull(collectionName, nameof(collectionName));
var outputCollection = Database.GetCollection<TResult>(collectionName);
return Out(outputCollection, timeSeriesOptions, cancellationToken);
}

public override Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
{
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
Expand All @@ -224,6 +238,20 @@ public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, Canc
return OutAsync(outputCollection, cancellationToken);
}

public override Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
var aggregate = WithPipeline(_pipeline.Out(outputCollection, timeSeriesOptions));
return aggregate.ToCursorAsync(cancellationToken);
}

public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
Ensure.IsNotNull(collectionName, nameof(collectionName));
var outputCollection = Database.GetCollection<TResult>(collectionName);
return OutAsync(outputCollection, timeSeriesOptions, cancellationToken);
}

public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection)
{
return WithPipeline(_pipeline.Project(projection));
Expand Down
21 changes: 21 additions & 0 deletions src/MongoDB.Driver/AggregateFluentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public virtual IAsyncCursor<TResult> Out(string collectionName, CancellationToke
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
{
Expand All @@ -201,6 +213,15 @@ public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> ou
/// <inheritdoc />
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken);

/// <inheritdoc />
public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken);

/// <inheritdoc />
public abstract IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);

Expand Down
36 changes: 36 additions & 0 deletions src/MongoDB.Driver/IAggregateFluent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,24 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
/// <returns>A cursor.</returns>
IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
/// </summary>
/// <param name="outputCollection">The output collection.</param>
/// <param name="timeSeriesOptions">The time series options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A cursor.</returns>
IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
/// </summary>
/// <param name="collectionName">Name of the collection.</param>
/// <param name="timeSeriesOptions">The time series options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A cursor.</returns>
IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
/// </summary>
Expand All @@ -319,6 +337,24 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
/// <returns>A Task whose result is a cursor.</returns>
Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
/// </summary>
/// <param name="outputCollection">The output collection.</param>
/// <param name="timeSeriesOptions">The time series options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A Task whose result is a cursor.</returns>
Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
/// </summary>
/// <param name="collectionName">Name of the collection.</param>
/// <param name="timeSeriesOptions">The time series options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A Task whose result is a cursor.</returns>
Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Appends a project stage to the pipeline.
/// </summary>
Expand Down
21 changes: 20 additions & 1 deletion src/MongoDB.Driver/PipelineDefinitionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,25 @@ public static PipelineDefinition<TInput, TOutput> Out<TInput, TOutput>(
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Out<TOutput>(outputCollection));
}

/// <summary>
/// Appends a $out stage to the pipeline.
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
/// <param name="pipeline">The pipeline.</param>
/// <param name="outputCollection">The output collection.</param>
/// <param name="timeSeriesOptions">The time-series options</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time-series => time series

And add period at end of the param description.

/// <returns>A new pipeline with an additional stage.</returns>
/// <exception cref="System.NotSupportedException"></exception>
public static PipelineDefinition<TInput, TOutput> Out<TInput, TOutput>(
this PipelineDefinition<TInput, TOutput> pipeline,
IMongoCollection<TOutput> outputCollection,
TimeSeriesOptions timeSeriesOptions)
{
Ensure.IsNotNull(pipeline, nameof(pipeline));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should either merge overloads (add timeSeriesOptions = null parameter) or forbid null here.
I personally would prefer having less overloads for simplicity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional parameters are problematic and I think we should avoid them in our public API. We can use them internally where it makes sense, but not in our public API.

Reason 1: The optional value is inserted into the calling code at compile-time for the calling code. So if you change your default value, but don't recompile the calling code, you'll still use the same value.

Reason 2: Even more insidious is that the C# compiler doesn't enforce consistency among default values. The following is perfectly valid:

var oops1 = new EvilClass();
var oops2 = oops1 as IOops;

Console.WriteLine(oops1.GetValue());
Console.WriteLine(oops2.GetValue());

interface IOops
{
    int GetValue(int x = 42);
}

class EvilClass : IOops
{
    public int GetValue(int x = 99) => x;
}

The output is:

99
42

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we are already doing this elsewhere in our Fluent Aggregate API. Sigh. I guess we can use TimeSeriesOptions timeSeriesOptions = null here to reduce the number of overloads.

return pipeline.AppendStage(PipelineStageDefinitionBuilder.Out<TOutput>(outputCollection, timeSeriesOptions));
}

/// <summary>
/// Appends a $project stage to the pipeline.
/// </summary>
Expand Down Expand Up @@ -1111,7 +1130,7 @@ public static PipelineDefinition<TInput, TOutput> ReplaceWith<TInput, TIntermedi
/// </param>
/// <param name="scoreDetails">
/// Flag that specifies whether to return a detailed breakdown
/// of the score for each document in the result.
/// of the score for each document in the result.
/// </param>
/// <returns>A new pipeline with an additional stage.</returns>
public static PipelineDefinition<TInput, TOutput> Search<TInput, TOutput>(
Expand Down
25 changes: 22 additions & 3 deletions src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,17 +1244,36 @@ public static PipelineStageDefinition<TInput, TOutput> OfType<TInput, TOutput>(
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <param name="outputCollection">The output collection.</param>
/// <param name="timeSeriesOptions">The time series options</param>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Period at end of sentence.

/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TInput> Out<TInput>(
IMongoCollection<TInput> outputCollection)
IMongoCollection<TInput> outputCollection,
TimeSeriesOptions timeSeriesOptions)
{
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
var outputDatabaseName = outputCollection.Database.DatabaseNamespace.DatabaseName;
var outputCollectionName = outputCollection.CollectionNamespace.CollectionName;
var outDocument = new BsonDocument { { "db", outputDatabaseName }, { "coll", outputCollectionName } };
var outDocument = new BsonDocument
{
{ "db", outputDatabaseName },
{ "coll", outputCollectionName },
{ "timeseries", () => timeSeriesOptions.ToBsonDocument(), timeSeriesOptions != null}
};
return new BsonDocumentPipelineStageDefinition<TInput, TInput>(new BsonDocument("$out", outDocument));
}

/// <summary>
/// Creates a $out stage.
/// </summary>
/// <typeparam name="TInput">The type of the input documents.</typeparam>
/// <param name="outputCollection">The output collection.</param>
/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TInput> Out<TInput>(
IMongoCollection<TInput> outputCollection)
{
return Out(outputCollection, null);
}

/// <summary>
/// Creates a $project stage.
/// </summary>
Expand Down Expand Up @@ -1331,7 +1350,7 @@ public static PipelineStageDefinition<TInput, TOutput> Project<TInput, TOutput>(
/// </param>
/// <param name="scoreDetails">
/// Flag that specifies whether to return a detailed breakdown
/// of the score for each document in the result.
/// of the score for each document in the result.
/// </param>
/// <returns>The stage.</returns>
public static PipelineStageDefinition<TInput, TInput> Search<TInput>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,43 @@ public void Execute_should_return_expected_result(
result.Should().HaveCount(1);
}

[Theory]
[ParameterAttributeData]
public void Execute_to_time_series_collection_should_work(
[Values(false, true)] bool usingDifferentOutputDatabase,
[Values(false, true)] bool async)
{
RequireServer.Check().Supports(Feature.AggregateOutTimeSeries);

var pipeline = new List<BsonDocument> { BsonDocument.Parse("{ $match : { _id : 1 } }") };
var inputDatabaseName = _databaseNamespace.DatabaseName;
var inputCollectionName = _collectionNamespace.CollectionName;
var outputDatabaseName = usingDifferentOutputDatabase ? $"{inputDatabaseName}-outputdatabase-timeseries" : inputDatabaseName;
var outputCollectionName = $"{inputCollectionName}-outputcollection-timeseries";

pipeline.Add(new BsonDocument { {"$set", new BsonDocument { {"time", DateTime.Now } } } } );
pipeline.Add(BsonDocument.Parse($"{{ $out : {{ db : '{outputDatabaseName}', coll : '{outputCollectionName}', timeseries: {{ timeField: 'time' }} }} }}"));

EnsureTestData();
if (usingDifferentOutputDatabase)
{
EnsureDatabaseExists(outputDatabaseName);
}
var subject = new AggregateToCollectionOperation(_collectionNamespace, pipeline, _messageEncoderSettings);

ExecuteOperation(subject, async);

var databaseNamespace = new DatabaseNamespace(outputDatabaseName);
var result = ReadAllFromCollection(new CollectionNamespace(databaseNamespace, outputCollectionName), async);

result.Should().NotBeNull();
result.Should().HaveCount(1);

var output = ListCollections(databaseNamespace);
output["cursor"]["firstBatch"][0][0].ToString().Should().Be($"{outputCollectionName}"); // checking name of collection
output["cursor"]["firstBatch"][0][1].ToString().Should().Be("timeseries"); // checking type of collection
}

[Theory]
[ParameterAttributeData]
public void Execute_should_return_expected_result_when_AllowDiskUse_is_set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ protected void KillOpenTransactions()
}
}

protected BsonDocument ListCollections(DatabaseNamespace databaseNamespace)
{
var listCollectionsCommand = new BsonDocument
{
{ "listCollections", 1 }, { "filter", new BsonDocument { { "type", "timeseries" } } }
};

var runCommandOperation = new ReadCommandOperation<BsonDocument>(databaseNamespace, listCollectionsCommand,
BsonDocumentSerializer.Instance, _messageEncoderSettings);

return ExecuteOperation(runCommandOperation);
}

protected Profiler Profile(DatabaseNamespace databaseNamespace)
{
var op = new WriteCommandOperation<BsonDocument>(
Expand Down
Loading