Skip to content

Commit 33d356a

Browse files
authoredDec 7, 2023
CSHARP-4706: Support for $out to Time-series collections (#1223)
1 parent 3f09282 commit 33d356a

13 files changed

+268
-20
lines changed
 

‎src/MongoDB.Driver.Core/Core/Misc/Feature.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class Feature
4141
private static readonly Feature __aggregateMerge = new Feature("AggregateMerge", WireVersion.Server42);
4242
private static readonly Feature __aggregateOut = new Feature("AggregateOut", WireVersion.Server26);
4343
private static readonly Feature __aggregateOutOnSecondary = new Feature("AggregateOutOnSecondary", WireVersion.Server50);
44+
private static readonly Feature __aggregateOutTimeSeries = new Feature("AggregateOutTimeSeries", WireVersion.Server70);
4445
private static readonly Feature __aggregateOutToDifferentDatabase = new Feature("AggregateOutToDifferentDatabase", WireVersion.Server44);
4546
private static readonly Feature __aggregateToString = new Feature("AggregateToString", WireVersion.Server40);
4647
private static readonly Feature __aggregateUnionWith = new Feature("AggregateUnionWith", WireVersion.Server44);
@@ -237,10 +238,15 @@ public class Feature
237238
public static Feature AggregateOut => __aggregateOut;
238239

239240
/// <summary>
240-
/// Gets the aggregate out on secondary feature,
241+
/// Gets the aggregate out on secondary feature.
241242
/// </summary>
242243
public static Feature AggregateOutOnSecondary => __aggregateOutOnSecondary;
243244

245+
/// <summary>
246+
/// Gets the aggregate out to time series feature.
247+
/// </summary>
248+
public static Feature AggregateOutTimeSeries => __aggregateOutTimeSeries;
249+
244250
/// <summary>
245251
/// Gets the aggregate out to a different database feature.
246252
/// </summary>

‎src/MongoDB.Driver.Core/Core/Operations/AggregateToCollectionOperation.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ private IReadOnlyList<BsonDocument> SimplifyOutStageIfOutputDatabaseIsSameAsInpu
330330
{
331331
var lastStage = pipeline.Last();
332332
var lastStageName = lastStage.GetElement(0).Name;
333-
if (lastStageName == "$out" && lastStage["$out"] is BsonDocument outDocument)
333+
if (lastStageName == "$out" && lastStage["$out"] is BsonDocument outDocument && !outDocument.Contains("timeseries"))
334334
{
335335
if (outDocument.TryGetValue("db", out var db) && db.IsString &&
336336
outDocument.TryGetValue("coll", out var coll) && coll.IsString)

‎src/MongoDB.Driver/AggregateFluent.cs

+28
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,20 @@ public override IAsyncCursor<TResult> Out(string collectionName, CancellationTok
210210
return Out(outputCollection, cancellationToken);
211211
}
212212

213+
public override IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
214+
{
215+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
216+
var aggregate = WithPipeline(_pipeline.Out(outputCollection, timeSeriesOptions));
217+
return aggregate.ToCursor(cancellationToken);
218+
}
219+
220+
public override IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
221+
{
222+
Ensure.IsNotNull(collectionName, nameof(collectionName));
223+
var outputCollection = Database.GetCollection<TResult>(collectionName);
224+
return Out(outputCollection, timeSeriesOptions, cancellationToken);
225+
}
226+
213227
public override Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
214228
{
215229
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
@@ -224,6 +238,20 @@ public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, Canc
224238
return OutAsync(outputCollection, cancellationToken);
225239
}
226240

241+
public override Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
242+
{
243+
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
244+
var aggregate = WithPipeline(_pipeline.Out(outputCollection, timeSeriesOptions));
245+
return aggregate.ToCursorAsync(cancellationToken);
246+
}
247+
248+
public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
249+
{
250+
Ensure.IsNotNull(collectionName, nameof(collectionName));
251+
var outputCollection = Database.GetCollection<TResult>(collectionName);
252+
return OutAsync(outputCollection, timeSeriesOptions, cancellationToken);
253+
}
254+
227255
public override IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection)
228256
{
229257
return WithPipeline(_pipeline.Project(projection));

‎src/MongoDB.Driver/AggregateFluentBase.cs

+21
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,18 @@ public virtual IAsyncCursor<TResult> Out(string collectionName, CancellationToke
192192
throw new NotImplementedException();
193193
}
194194

195+
/// <inheritdoc />
196+
public virtual IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
197+
{
198+
throw new NotImplementedException();
199+
}
200+
201+
/// <inheritdoc />
202+
public virtual IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
203+
{
204+
throw new NotImplementedException();
205+
}
206+
195207
/// <inheritdoc />
196208
public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, CancellationToken cancellationToken)
197209
{
@@ -201,6 +213,15 @@ public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> ou
201213
/// <inheritdoc />
202214
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken);
203215

216+
/// <inheritdoc />
217+
public virtual Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken)
218+
{
219+
throw new NotImplementedException();
220+
}
221+
222+
/// <inheritdoc />
223+
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken);
224+
204225
/// <inheritdoc />
205226
public abstract IAggregateFluent<TNewResult> Project<TNewResult>(ProjectionDefinition<TResult, TNewResult> projection);
206227

‎src/MongoDB.Driver/IAggregateFluent.cs

+36
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,24 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
303303
/// <returns>A cursor.</returns>
304304
IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken = default(CancellationToken));
305305

306+
/// <summary>
307+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
308+
/// </summary>
309+
/// <param name="outputCollection">The output collection.</param>
310+
/// <param name="timeSeriesOptions">The time series options.</param>
311+
/// <param name="cancellationToken">The cancellation token.</param>
312+
/// <returns>A cursor.</returns>
313+
IAsyncCursor<TResult> Out(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));
314+
315+
/// <summary>
316+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
317+
/// </summary>
318+
/// <param name="collectionName">Name of the collection.</param>
319+
/// <param name="timeSeriesOptions">The time series options.</param>
320+
/// <param name="cancellationToken">The cancellation token.</param>
321+
/// <returns>A cursor.</returns>
322+
IAsyncCursor<TResult> Out(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));
323+
306324
/// <summary>
307325
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
308326
/// </summary>
@@ -319,6 +337,24 @@ IAggregateFluent<TNewResult> Lookup<TForeignDocument, TAsElement, TAs, TNewResul
319337
/// <returns>A Task whose result is a cursor.</returns>
320338
Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken = default(CancellationToken));
321339

340+
/// <summary>
341+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
342+
/// </summary>
343+
/// <param name="outputCollection">The output collection.</param>
344+
/// <param name="timeSeriesOptions">The time series options.</param>
345+
/// <param name="cancellationToken">The cancellation token.</param>
346+
/// <returns>A Task whose result is a cursor.</returns>
347+
Task<IAsyncCursor<TResult>> OutAsync(IMongoCollection<TResult> outputCollection, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));
348+
349+
/// <summary>
350+
/// Appends an out stage to the pipeline and executes it, and then returns a cursor to read the contents of the output collection.
351+
/// </summary>
352+
/// <param name="collectionName">Name of the collection.</param>
353+
/// <param name="timeSeriesOptions">The time series options.</param>
354+
/// <param name="cancellationToken">The cancellation token.</param>
355+
/// <returns>A Task whose result is a cursor.</returns>
356+
Task<IAsyncCursor<TResult>> OutAsync(string collectionName, TimeSeriesOptions timeSeriesOptions, CancellationToken cancellationToken = default(CancellationToken));
357+
322358
/// <summary>
323359
/// Appends a project stage to the pipeline.
324360
/// </summary>

‎src/MongoDB.Driver/PipelineDefinitionBuilder.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -960,14 +960,16 @@ public static PipelineDefinition<TInput, TOutput> Merge<TInput, TIntermediate, T
960960
/// <typeparam name="TOutput">The type of the output documents.</typeparam>
961961
/// <param name="pipeline">The pipeline.</param>
962962
/// <param name="outputCollection">The output collection.</param>
963+
/// <param name="timeSeriesOptions">The time series options.</param>
963964
/// <returns>A new pipeline with an additional stage.</returns>
964965
/// <exception cref="System.NotSupportedException"></exception>
965966
public static PipelineDefinition<TInput, TOutput> Out<TInput, TOutput>(
966967
this PipelineDefinition<TInput, TOutput> pipeline,
967-
IMongoCollection<TOutput> outputCollection)
968+
IMongoCollection<TOutput> outputCollection,
969+
TimeSeriesOptions timeSeriesOptions = null)
968970
{
969971
Ensure.IsNotNull(pipeline, nameof(pipeline));
970-
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Out<TOutput>(outputCollection));
972+
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Out<TOutput>(outputCollection, timeSeriesOptions));
971973
}
972974

973975
/// <summary>
@@ -1111,7 +1113,7 @@ public static PipelineDefinition<TInput, TOutput> ReplaceWith<TInput, TIntermedi
11111113
/// </param>
11121114
/// <param name="scoreDetails">
11131115
/// Flag that specifies whether to return a detailed breakdown
1114-
/// of the score for each document in the result.
1116+
/// of the score for each document in the result.
11151117
/// </param>
11161118
/// <returns>A new pipeline with an additional stage.</returns>
11171119
public static PipelineDefinition<TInput, TOutput> Search<TInput, TOutput>(

‎src/MongoDB.Driver/PipelineStageDefinitionBuilder.cs

+10-3
Original file line numberDiff line numberDiff line change
@@ -1244,14 +1244,21 @@ public static PipelineStageDefinition<TInput, TOutput> OfType<TInput, TOutput>(
12441244
/// </summary>
12451245
/// <typeparam name="TInput">The type of the input documents.</typeparam>
12461246
/// <param name="outputCollection">The output collection.</param>
1247+
/// <param name="timeSeriesOptions">The time series options.</param>
12471248
/// <returns>The stage.</returns>
12481249
public static PipelineStageDefinition<TInput, TInput> Out<TInput>(
1249-
IMongoCollection<TInput> outputCollection)
1250+
IMongoCollection<TInput> outputCollection,
1251+
TimeSeriesOptions timeSeriesOptions = null)
12501252
{
12511253
Ensure.IsNotNull(outputCollection, nameof(outputCollection));
12521254
var outputDatabaseName = outputCollection.Database.DatabaseNamespace.DatabaseName;
12531255
var outputCollectionName = outputCollection.CollectionNamespace.CollectionName;
1254-
var outDocument = new BsonDocument { { "db", outputDatabaseName }, { "coll", outputCollectionName } };
1256+
var outDocument = new BsonDocument
1257+
{
1258+
{ "db", outputDatabaseName },
1259+
{ "coll", outputCollectionName },
1260+
{ "timeseries", () => timeSeriesOptions.ToBsonDocument(), timeSeriesOptions != null}
1261+
};
12551262
return new BsonDocumentPipelineStageDefinition<TInput, TInput>(new BsonDocument("$out", outDocument));
12561263
}
12571264

@@ -1331,7 +1338,7 @@ public static PipelineStageDefinition<TInput, TOutput> Project<TInput, TOutput>(
13311338
/// </param>
13321339
/// <param name="scoreDetails">
13331340
/// Flag that specifies whether to return a detailed breakdown
1334-
/// of the score for each document in the result.
1341+
/// of the score for each document in the result.
13351342
/// </param>
13361343
/// <returns>The stage.</returns>
13371344
public static PipelineStageDefinition<TInput, TInput> Search<TInput>(

‎tests/MongoDB.Driver.Core.Tests/Core/Operations/AggregateToCollectionOperationTests.cs

+37
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,43 @@ public void Execute_should_return_expected_result(
607607
result.Should().HaveCount(1);
608608
}
609609

610+
[Theory]
611+
[ParameterAttributeData]
612+
public void Execute_to_time_series_collection_should_work(
613+
[Values(false, true)] bool usingDifferentOutputDatabase,
614+
[Values(false, true)] bool async)
615+
{
616+
RequireServer.Check().Supports(Feature.AggregateOutTimeSeries);
617+
618+
var pipeline = new List<BsonDocument> { BsonDocument.Parse("{ $match : { _id : 1 } }") };
619+
var inputDatabaseName = _databaseNamespace.DatabaseName;
620+
var inputCollectionName = _collectionNamespace.CollectionName;
621+
var outputDatabaseName = usingDifferentOutputDatabase ? $"{inputDatabaseName}-outputdatabase-timeseries" : inputDatabaseName;
622+
var outputCollectionName = $"{inputCollectionName}-outputcollection-timeseries";
623+
624+
pipeline.Add(new BsonDocument { {"$set", new BsonDocument { {"time", DateTime.Now } } } } );
625+
pipeline.Add(BsonDocument.Parse($"{{ $out : {{ db : '{outputDatabaseName}', coll : '{outputCollectionName}', timeseries: {{ timeField: 'time' }} }} }}"));
626+
627+
EnsureTestData();
628+
if (usingDifferentOutputDatabase)
629+
{
630+
EnsureDatabaseExists(outputDatabaseName);
631+
}
632+
var subject = new AggregateToCollectionOperation(_collectionNamespace, pipeline, _messageEncoderSettings);
633+
634+
ExecuteOperation(subject, async);
635+
636+
var databaseNamespace = new DatabaseNamespace(outputDatabaseName);
637+
var result = ReadAllFromCollection(new CollectionNamespace(databaseNamespace, outputCollectionName), async);
638+
639+
result.Should().NotBeNull();
640+
result.Should().HaveCount(1);
641+
642+
var output = ListCollections(databaseNamespace);
643+
output["cursor"]["firstBatch"][0][0].ToString().Should().Be($"{outputCollectionName}"); // checking name of collection
644+
output["cursor"]["firstBatch"][0][1].ToString().Should().Be("timeseries"); // checking type of collection
645+
}
646+
610647
[Theory]
611648
[ParameterAttributeData]
612649
public void Execute_should_return_expected_result_when_AllowDiskUse_is_set(

‎tests/MongoDB.Driver.Core.Tests/Core/Operations/OperationTestBase.cs

+13
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,19 @@ protected void KillOpenTransactions()
347347
}
348348
}
349349

350+
protected BsonDocument ListCollections(DatabaseNamespace databaseNamespace)
351+
{
352+
var listCollectionsCommand = new BsonDocument
353+
{
354+
{ "listCollections", 1 }, { "filter", new BsonDocument { { "type", "timeseries" } } }
355+
};
356+
357+
var runCommandOperation = new ReadCommandOperation<BsonDocument>(databaseNamespace, listCollectionsCommand,
358+
BsonDocumentSerializer.Instance, _messageEncoderSettings);
359+
360+
return ExecuteOperation(runCommandOperation);
361+
}
362+
350363
protected Profiler Profile(DatabaseNamespace databaseNamespace)
351364
{
352365
var op = new WriteCommandOperation<BsonDocument>(

0 commit comments

Comments
 (0)