|
14 | 14 | import com.azure.cosmos.implementation.RequestOptions; |
15 | 15 | import com.azure.cosmos.implementation.TracerProvider; |
16 | 16 | import com.azure.cosmos.implementation.Utils; |
| 17 | +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; |
| 18 | +import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; |
17 | 19 | import com.azure.cosmos.implementation.query.QueryInfo; |
18 | 20 | import com.azure.cosmos.models.CosmosConflictProperties; |
19 | 21 | import com.azure.cosmos.models.CosmosContainerProperties; |
|
28 | 30 | import com.azure.cosmos.models.SqlQuerySpec; |
29 | 31 | import com.azure.cosmos.models.ThroughputProperties; |
30 | 32 | import com.azure.cosmos.models.ThroughputResponse; |
| 33 | +import com.azure.cosmos.util.Beta; |
31 | 34 | import com.azure.cosmos.util.CosmosPagedFlux; |
32 | 35 | import com.azure.cosmos.util.UtilBridgeInternal; |
| 36 | + |
33 | 37 | import reactor.core.publisher.Flux; |
34 | 38 | import reactor.core.publisher.Mono; |
35 | 39 |
|
@@ -530,6 +534,102 @@ public <T> Mono<CosmosItemResponse<T>> readItem( |
530 | 534 | return withContext(context -> readItemInternal(itemId, requestOptions, itemType, context)); |
531 | 535 | } |
532 | 536 |
|
| 537 | + /** |
| 538 | + * Reads many documents. |
| 539 | + * |
| 540 | + * @param <T> the type parameter |
| 541 | + * @param itemKeyList document id and partition key pair that needs to be read |
| 542 | + * @param classType class type |
| 543 | + * @return a Mono with feed response of cosmos items |
| 544 | + */ |
| 545 | + @Beta(Beta.SinceVersion.V4_4_0) |
| 546 | + public <T> Mono<FeedResponse<T>> readMany( |
| 547 | + List<Pair<String, PartitionKey>> itemKeyList, |
| 548 | + Class<T> classType) { |
| 549 | + |
| 550 | + return this.readMany(itemKeyList, null, classType); |
| 551 | + } |
| 552 | + |
| 553 | + /** |
| 554 | + * Reads many documents. |
| 555 | + * |
| 556 | + * @param <T> the type parameter |
| 557 | + * @param itemKeyList document id and partition key pair that needs to be read |
| 558 | + * @param sessionToken the optional Session token - null if the read can be made without specific session token |
| 559 | + * @param classType class type |
| 560 | + * @return a Mono with feed response of cosmos items |
| 561 | + */ |
| 562 | + @Beta(Beta.SinceVersion.V4_4_0) |
| 563 | + public <T> Mono<FeedResponse<T>> readMany( |
| 564 | + List<Pair<String, PartitionKey>> itemKeyList, |
| 565 | + String sessionToken, |
| 566 | + Class<T> classType) { |
| 567 | + |
| 568 | + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); |
| 569 | + |
| 570 | + if (!StringUtils.isNotEmpty(sessionToken)) { |
| 571 | + options = options.setSessionToken(sessionToken); |
| 572 | + } |
| 573 | + |
| 574 | + options.setMaxDegreeOfParallelism(-1); |
| 575 | + return CosmosBridgeInternal |
| 576 | + .getAsyncDocumentClient(this.getDatabase()) |
| 577 | + .readMany(itemKeyList, BridgeInternal.getLink(this), options, classType); |
| 578 | + } |
| 579 | + |
| 580 | + /** |
| 581 | + * Reads all the items of a logical partition |
| 582 | + * <p> |
| 583 | + * After subscription the operation will be performed. The {@link CosmosPagedFlux} will |
| 584 | + * contain one or several feed responses of the read Cosmos items. In case of |
| 585 | + * failure the {@link CosmosPagedFlux} will error. |
| 586 | + * |
| 587 | + * @param <T> the type parameter. |
| 588 | + * @param partitionKey the partition key value of the documents that need to be read |
| 589 | + * @param classType the class type. |
| 590 | + * @return a {@link CosmosPagedFlux} containing one or several feed response pages |
| 591 | + * of the read Cosmos items or an error. |
| 592 | + */ |
| 593 | + @Beta(Beta.SinceVersion.V4_4_0) |
| 594 | + public <T> CosmosPagedFlux<T> readAllItems( |
| 595 | + PartitionKey partitionKey, |
| 596 | + Class<T> classType) { |
| 597 | + |
| 598 | + return this.readAllItems(partitionKey, new CosmosQueryRequestOptions(), classType); |
| 599 | + } |
| 600 | + |
| 601 | + /** |
| 602 | + * Reads all the items of a logical partition |
| 603 | + * <p> |
| 604 | + * After subscription the operation will be performed. The {@link CosmosPagedFlux} will |
| 605 | + * contain one or several feed responses of the read Cosmos items. In case of |
| 606 | + * failure the {@link CosmosPagedFlux} will error. |
| 607 | + * |
| 608 | + * @param <T> the type parameter. |
| 609 | + * @param partitionKey the partition key value of the documents that need to be read |
| 610 | + * @param options the feed options. |
| 611 | + * @param classType the class type. |
| 612 | + * @return a {@link CosmosPagedFlux} containing one or several feed response pages |
| 613 | + * of the read Cosmos items or an error. |
| 614 | + */ |
| 615 | + @Beta(Beta.SinceVersion.V4_4_0) |
| 616 | + public <T> CosmosPagedFlux<T> readAllItems( |
| 617 | + PartitionKey partitionKey, |
| 618 | + CosmosQueryRequestOptions options, |
| 619 | + Class<T> classType) { |
| 620 | + |
| 621 | + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { |
| 622 | + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), |
| 623 | + this.readAllItemsSpanName, |
| 624 | + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); |
| 625 | + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); |
| 626 | + return getDatabase() |
| 627 | + .getDocClientWrapper() |
| 628 | + .readAllDocuments(getLink(), partitionKey, options) |
| 629 | + .map(response -> prepareFeedResponse(response, classType)); |
| 630 | + }); |
| 631 | + } |
| 632 | + |
533 | 633 | /** |
534 | 634 | * Replaces an item with the passed in item. |
535 | 635 | * <p> |
|
0 commit comments