Skip to content

Improve the toCollection and related API documentation #1677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
public fun timeoutMode(timeoutMode: TimeoutMode): AggregateFlow<T> = apply { wrapped.timeoutMode(timeoutMode) }

/**
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out or $merge stage.
* Aggregates documents according to the specified aggregation pipeline, which must end with an `$out` or `$merge`
* stage. Calling this method is a preferred alternative to consuming this [AggregateFlow].
*
* @throws IllegalStateException if the pipeline does not end with a $out or $merge stage
* @throws IllegalStateException if the pipeline does not end with an `$out` or `$merge` stage
* @see [$out stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/out/)
* @see [$merge stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/)
*/
Expand Down Expand Up @@ -214,5 +215,11 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
explain(R::class.java, verbosity)

/**
* Requests [AggregateFlow] to start streaming data according to the specified aggregation pipeline.
* - If the aggregation pipeline ends with an `$out` or `$merge` stage, then finds all documents in the affected
* namespace and emits them. You may want to use [toCollection] instead.
* - Otherwise, emits no values.
*/
Comment on lines +218 to +223
Copy link
Collaborator

@katcharov katcharov Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to others that these docs and some others are new, and therefore non-trivial.

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import org.bson.conversions.Bson
/**
* Flow implementation for map reduce operations.
*
* By default, the [MapReduceFlow] emits the results inline. You can write map-reduce output to a collection by using
* the [collectionName] and [toCollection] methods.
*
* Note: Starting in MongoDB 5.0, map-reduce is deprecated, prefer Aggregation instead
*
* @param T The type of the result.
Expand Down Expand Up @@ -65,9 +68,10 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)

/**
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which
* must specify a non-inline result.
* must not emit results inline. Calling this method is a preferred alternative to consuming this [MapReduceFlow].
*
* @throws IllegalStateException if a collection name to write the results to has not been specified
* @see collectionName
*/
public suspend fun toCollection() {
wrapped.toCollection().awaitFirstOrNull()
Expand All @@ -80,6 +84,7 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
*
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
* @return this
* @see toCollection
*/
public fun collectionName(collectionName: String): MapReduceFlow<T> = apply {
wrapped.collectionName(collectionName)
Expand Down Expand Up @@ -205,5 +210,12 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
*/
public fun collation(collation: Collation?): MapReduceFlow<T> = apply { wrapped.collation(collation) }

/**
* Requests [MapReduceFlow] to start streaming data according to the specified map-reduce function with the given
* options.
* - If the aggregation produces results inline, then finds all documents in the affected namespace and emits them.
* You may want to use [toCollection] instead.
* - Otherwise, emits no values.
*/
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class AggregateFlowTest {

@Test
fun shouldHaveTheSameMethods() {
val jAggregatePublisherFunctions = AggregatePublisher::class.declaredFunctions.map { it.name }.toSet() - "first"
val jAggregatePublisherFunctions =
AggregatePublisher::class.declaredFunctions.map { it.name }.toSet() - "first" - "subscribe"
val kAggregateFlowFunctions = AggregateFlow::class.declaredFunctions.map { it.name }.toSet() - "collect"

assertEquals(jAggregatePublisherFunctions, kAggregateFlowFunctions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import reactor.core.publisher.Mono
class MapReduceFlowTest {
@Test
fun shouldHaveTheSameMethods() {
val jMapReducePublisherFunctions = MapReducePublisher::class.declaredFunctions.map { it.name }.toSet() - "first"
val jMapReducePublisherFunctions =
MapReducePublisher::class.declaredFunctions.map { it.name }.toSet() - "first" - "subscribe"
val kMapReduceFlowFunctions = MapReduceFlow::class.declaredFunctions.map { it.name }.toSet() - "collect"

assertEquals(jMapReducePublisherFunctions, kMapReduceFlowFunctions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,23 @@ public class AggregateIterable<T : Any>(private val wrapped: JAggregateIterable<
}

/**
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out or $merge stage.
* Aggregates documents according to the specified aggregation pipeline, which must end with an `$out` or `$merge`
* stage. This method is the preferred alternative to [cursor].
*
* @throws IllegalStateException if the pipeline does not end with a $out or $merge stage
* @throws IllegalStateException if the pipeline does not end with an `$out` or `$merge` stage
* @see [$out stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/out/)
* @see [$merge stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/)
*/
public fun toCollection(): Unit = wrapped.toCollection()

/**
* Aggregates documents according to the specified aggregation pipeline.
* - If the aggregation pipeline ends with an `$out` or `$merge` stage, then finds all documents in the affected
* namespace and returns a [MongoCursor] over them. You may want to use [toCollection] instead.
* - Otherwise, returns a [MongoCursor] producing no elements.
*/
public override fun cursor(): MongoCursor<T> = super.cursor()

/**
* Enables writing to temporary files. A null value indicates that it's unspecified.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class AggregateIterableTest {

@Test
fun shouldHaveTheSameMethods() {
val jAggregateIterableFunctions = JAggregateIterable::class.declaredFunctions.map { it.name }.toSet()
val jAggregateIterableFunctions =
JAggregateIterable::class.declaredFunctions.map { it.name }.toSet() - "iterator"
val kAggregateIterableFunctions = AggregateIterable::class.declaredFunctions.map { it.name }.toSet()

assertEquals(jAggregateIterableFunctions, kAggregateIterableFunctions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
package com.mongodb.reactivestreams.client;

import com.mongodb.ExplainVerbosity;
import com.mongodb.MongoNamespace;
import com.mongodb.annotations.Alpha;
import com.mongodb.annotations.Reason;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.MergeOptions;
import com.mongodb.lang.Nullable;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -83,13 +87,31 @@ public interface AggregatePublisher<TResult> extends Publisher<TResult> {
AggregatePublisher<TResult> bypassDocumentValidation(@Nullable Boolean bypassDocumentValidation);

/**
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out stage.
* Aggregates documents according to the specified aggregation pipeline, which must end with an
* {@link Aggregates#out(String, String) $out} or {@link Aggregates#merge(MongoNamespace, MergeOptions) $merge} stage.
* Calling this method and then {@linkplain Publisher#subscribe(Subscriber) subscribing} to the returned {@link Publisher}
* is a preferred alternative to {@linkplain #subscribe(Subscriber) subscribing} to this {@link AggregatePublisher}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we expand on why this is currently a preferred alternative? IMO It's unclear to the end user...

Maybe mention that in the next major we will remove the implicit find/cursor we return currently and instead relies on explicit call to toCollection if we want to consume the result of the aggregate?

*
* @throws IllegalStateException if the pipeline does not end with an {@code $out} or {@code $merge} stage
* @return an empty publisher that indicates when the operation has completed
* @mongodb.driver.manual aggregation/ Aggregation
*/
Publisher<Void> toCollection();

/**
* Requests {@link AggregatePublisher} to start streaming data according to the specified aggregation pipeline.
* <ul>
* <li>
* If the aggregation pipeline ends with an {@link Aggregates#out(String, String) $out} or
* {@link Aggregates#merge(MongoNamespace, MergeOptions) $merge} stage,
* then {@linkplain MongoCollection#find() finds all} documents in the affected namespace and produces them.
* You may want to use {@link #toCollection()} instead.</li>
* <li>
* Otherwise, produces no elements.</li>
* </ul>
*/
void subscribe(Subscriber<? super TResult> s);

/**
* Sets the collation options
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@

package com.mongodb.reactivestreams.client;


import com.mongodb.annotations.Alpha;
import com.mongodb.annotations.Reason;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
import com.mongodb.lang.Nullable;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.concurrent.TimeUnit;

/**
* Publisher for map reduce.
* <p>
* By default, the {@code MapReducePublisher} produces the results inline. You can write map-reduce output to a collection by using the
* {@link #collectionName(String)} and {@link #toCollection()} methods.</p>
*
* @param <TResult> The type of the result.
* @since 1.0
Expand All @@ -44,6 +47,7 @@ public interface MapReducePublisher<TResult> extends Publisher<TResult> {
*
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
* @return this
* @see #toCollection()
*/
MapReducePublisher<TResult> collectionName(String collectionName);

Expand Down Expand Up @@ -152,14 +156,29 @@ public interface MapReducePublisher<TResult> extends Publisher<TResult> {
MapReducePublisher<TResult> bypassDocumentValidation(@Nullable Boolean bypassDocumentValidation);

/**
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must specify a
* non-inline result.
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must not produce
* results inline. Calling this method and then {@linkplain Publisher#subscribe(Subscriber) subscribing} to the returned
* {@link Publisher} is a preferred alternative to {@linkplain #subscribe(Subscriber) subscribing} to this {@link MapReducePublisher}.
*
* @return an empty publisher that indicates when the operation has completed
* @throws IllegalStateException if a {@linkplain #collectionName(String) collection name} to write the results to has not been specified
* @see #collectionName(String)
* @mongodb.driver.manual aggregation/ Aggregation
*/
Publisher<Void> toCollection();

/**
* Requests {@link MapReducePublisher} to start streaming data according to the specified map-reduce function with the given options.
* <ul>
* <li>
* If the aggregation produces results inline, then {@linkplain MongoCollection#find() finds all} documents in the
* affected namespace and produces them. You may want to use {@link #toCollection()} instead.</li>
* <li>
* Otherwise, produces no elements.</li>
* </ul>
*/
void subscribe(Subscriber<? super TResult> s);

/**
* Sets the collation options
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.mongodb.scala.bson.BsonValue
import org.mongodb.scala.bson.DefaultHelper.DefaultsTo
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Collation
import org.reactivestreams.Subscriber

import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
Expand Down Expand Up @@ -192,9 +193,13 @@ case class AggregateObservable[TResult](private val wrapped: AggregatePublisher[
}

/**
* Aggregates documents according to the specified aggregation pipeline, which must end with a `\$out` stage.
* Aggregates documents according to the specified aggregation pipeline, which must end with an `\$out` or `\$merge` stage.
* Calling this method and then `subscribing` to the returned [[SingleObservable]]
* is a preferred alternative to subscribing to this [[AggregateObservable]].
*
* [[https://www.mongodb.com/docs/manual/aggregation/ Aggregation]]
*
* @throws java.lang.IllegalStateException if the pipeline does not end with an `\$out` or `\$merge` stage
* @return an Observable that indicates when the operation has completed.
*/
def toCollection(): SingleObservable[Unit] = wrapped.toCollection()
Expand Down Expand Up @@ -257,5 +262,23 @@ case class AggregateObservable[TResult](private val wrapped: AggregatePublisher[
)(implicit e: ExplainResult DefaultsTo Document, ct: ClassTag[ExplainResult]): SingleObservable[ExplainResult] =
wrapped.explain[ExplainResult](ct, verbosity)

/**
* Requests [[AggregateObservable]] to start streaming data according to the specified aggregation pipeline.
*
* - If the aggregation pipeline ends with an `\$out` or `\$merge` stage,
* then finds all documents in the affected namespace and produces them.
* You may want to use [[toCollection]] instead.
* - Otherwise, produces no elements.
*/
override def subscribe(observer: Observer[_ >: TResult]): Unit = wrapped.subscribe(observer)

/**
* Requests [[AggregateObservable]] to start streaming data according to the specified aggregation pipeline.
*
* - If the aggregation pipeline ends with an `\$out` or `\$merge` stage,
* then finds all documents in the affected namespace and produces them.
* You may want to use [[toCollection]] instead.
* - Otherwise, produces no elements.
*/
override def subscribe(observer: Subscriber[_ >: TResult]): Unit = wrapped.subscribe(observer)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import com.mongodb.client.model.MapReduceAction
import com.mongodb.reactivestreams.client.MapReducePublisher
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Collation
import org.reactivestreams.Subscriber

import scala.concurrent.duration.Duration

/**
* Observable for map reduce.
*
* By default, the [[MapReduceObservable]] produces the results inline. You can write map-reduce output to a collection by using the
* [[collectionName]] and [[toCollection]] methods.
*
* @define docsRef https://www.mongodb.com/docs/manual/reference
*
* @tparam TResult The type of the result.
Expand All @@ -44,6 +48,7 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
*
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
* @return this
* @see [[toCollection]]
*/
def collectionName(collectionName: String): MapReduceObservable[TResult] = {
wrapped.collectionName(collectionName)
Expand Down Expand Up @@ -214,11 +219,14 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
}

/**
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must specify a
* non-inline result.
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must not produce
* results inline. Calling this method and then subscribing to the returned [[SingleObservable]] is a preferred alternative to
* subscribing to this [[MapReduceObservable]].
*
* @return an Observable that indicates when the operation has completed
* [[https://www.mongodb.com/docs/manual/aggregation/ Aggregation]]
* @throws java.lang.IllegalStateException if a collection name to write the results to has not been specified
* @see [[collectionName]]
*/
def toCollection(): SingleObservable[Unit] = wrapped.toCollection()

Expand Down Expand Up @@ -246,5 +254,21 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
*/
def first(): SingleObservable[TResult] = wrapped.first()

/**
* Requests [[MapReduceObservable]] to start streaming data according to the specified map-reduce function with the given options.
*
* - If the aggregation produces results inline, then finds all documents in the
* affected namespace and produces them. You may want to use [[toCollection]] instead.
* - Otherwise, produces no elements.
*/
override def subscribe(observer: Observer[_ >: TResult]): Unit = wrapped.subscribe(observer)

/**
* Requests [[MapReduceObservable]] to start streaming data according to the specified map-reduce function with the given options.
*
* - If the aggregation produces results inline, then finds all documents in the
* affected namespace and produces them. You may want to use [[toCollection]] instead.
* - Otherwise, produces no elements.
*/
override def subscribe(observer: Subscriber[_ >: TResult]): Unit = wrapped.subscribe(observer)
}
Loading