@@ -23,12 +23,16 @@ import com.mongodb.client.model.MapReduceAction
23
23
import com .mongodb .reactivestreams .client .MapReducePublisher
24
24
import org .mongodb .scala .bson .conversions .Bson
25
25
import org .mongodb .scala .model .Collation
26
+ import org .reactivestreams .Subscriber
26
27
27
28
import scala .concurrent .duration .Duration
28
29
29
30
/**
30
31
* Observable for map reduce.
31
32
*
33
+ * By default, the [[MapReduceObservable ]] produces the results inline. You can write map-reduce output to a collection by using the
34
+ * [[collectionName ]] and [[toCollection ]] methods.
35
+ *
32
36
* @define docsRef https://www.mongodb.com/docs/manual/reference
33
37
*
34
38
* @tparam TResult The type of the result.
@@ -44,6 +48,7 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
44
48
*
45
49
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
46
50
* @return this
51
+ * @see [[toCollection ]]
47
52
*/
48
53
def collectionName (collectionName : String ): MapReduceObservable [TResult ] = {
49
54
wrapped.collectionName(collectionName)
@@ -214,11 +219,14 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
214
219
}
215
220
216
221
/**
217
- * Aggregates documents to a collection according to the specified map-reduce function with the given options, which must specify a
218
- * non-inline result.
222
+ * Aggregates documents to a collection according to the specified map-reduce function with the given options, which must not produce
223
+ * results inline. Calling this method and then subscribing to the returned [[SingleObservable ]] is a preferred alternative to
224
+ * subscribing to this [[MapReduceObservable ]].
219
225
*
220
226
* @return an Observable that indicates when the operation has completed
221
227
* [[https://www.mongodb.com/docs/manual/aggregation/ Aggregation ]]
228
+ * @throws java.lang.IllegalStateException if a collection name to write the results to has not been specified
229
+ * @see [[collectionName ]]
222
230
*/
223
231
def toCollection (): SingleObservable [Unit ] = wrapped.toCollection()
224
232
@@ -246,5 +254,21 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
246
254
*/
247
255
def first (): SingleObservable [TResult ] = wrapped.first()
248
256
257
+ /**
258
+ * Requests [[MapReduceObservable ]] to start streaming data according to the specified map-reduce function with the given options.
259
+ *
260
+ * - If the aggregation produces results inline, then finds all documents in the
261
+ * affected namespace and produces them. You may want to use [[toCollection ]] instead.
262
+ * - Otherwise, produces no elements.
263
+ */
249
264
override def subscribe (observer : Observer [_ >: TResult ]): Unit = wrapped.subscribe(observer)
265
+
266
+ /**
267
+ * Requests [[MapReduceObservable ]] to start streaming data according to the specified map-reduce function with the given options.
268
+ *
269
+ * - If the aggregation produces results inline, then finds all documents in the
270
+ * affected namespace and produces them. You may want to use [[toCollection ]] instead.
271
+ * - Otherwise, produces no elements.
272
+ */
273
+ override def subscribe (observer : Subscriber [_ >: TResult ]): Unit = wrapped.subscribe(observer)
250
274
}
0 commit comments