Skip to content

Commit c48532a

Browse files
Container Initialization for caches and connections (Azure#20075)
* starting client warmup code * moving warm up to container * adding strong query * adding test case * removing unwanted code * removing unwanted code * resolving comments * resolving comments * fix build error * removing atomic ref * moving sqlQuerySpec above the for loop * moving flux list inside mono * remove strong consistency * merge with master and adding check for multiple init call on container * updating documentation
1 parent 6939924 commit c48532a

File tree

5 files changed

+392
-5
lines changed

5 files changed

+392
-5
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,24 @@
4343
import com.azure.cosmos.models.FeedResponse;
4444
import com.azure.cosmos.models.ModelBridgeInternal;
4545
import com.azure.cosmos.models.PartitionKey;
46+
import com.azure.cosmos.models.SqlParameter;
4647
import com.azure.cosmos.models.SqlQuerySpec;
4748
import com.azure.cosmos.models.ThroughputProperties;
4849
import com.azure.cosmos.models.ThroughputResponse;
4950
import com.azure.cosmos.util.Beta;
5051
import com.azure.cosmos.util.CosmosPagedFlux;
5152
import com.azure.cosmos.util.UtilBridgeInternal;
53+
import com.fasterxml.jackson.databind.node.ObjectNode;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
5256
import reactor.core.publisher.Flux;
5357
import reactor.core.publisher.Mono;
5458

59+
import java.util.ArrayList;
60+
import java.util.Collections;
5561
import java.util.List;
62+
import java.util.UUID;
63+
import java.util.concurrent.atomic.AtomicBoolean;
5664
import java.util.function.Function;
5765
import java.util.stream.Collectors;
5866

@@ -67,6 +75,8 @@
6775
*/
6876
public class CosmosAsyncContainer {
6977

78+
private final static Logger logger = LoggerFactory.getLogger(CosmosAsyncContainer.class);
79+
7080
private final CosmosAsyncDatabase database;
7181
private final String id;
7282
private final String link;
@@ -87,6 +97,7 @@ public class CosmosAsyncContainer {
8797
private final String readAllConflictsSpanName;
8898
private final String queryConflictsSpanName;
8999
private final String batchSpanName;
100+
private final AtomicBoolean isInitialized;
90101
private CosmosAsyncScripts scripts;
91102

92103
CosmosAsyncContainer(String id, CosmosAsyncDatabase database) {
@@ -110,6 +121,7 @@ public class CosmosAsyncContainer {
110121
this.readAllConflictsSpanName = "readAllConflicts." + this.id;
111122
this.queryConflictsSpanName = "queryConflicts." + this.id;
112123
this.batchSpanName = "transactionalBatch." + this.id;
124+
this.isInitialized = new AtomicBoolean(false);
113125
}
114126

115127
/**
@@ -413,6 +425,47 @@ public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
413425
return queryItemsInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions(), classType);
414426
}
415427

428+
/**
429+
* Initializes the container by warming up the caches and connections for the current read region.
430+
*
431+
* <p><br>The execution of this method is expected to result in some RU charges to your account.
432+
* The number of RU consumed by this request varies, depending on data consistency, size of the overall data in the container,
433+
* item indexing, number of projections. For more information regarding RU considerations please visit
434+
* <a href="https://docs.microsoft.com/en-us/azure/cosmos-db/request-units#request-unit-considerations">https://docs.microsoft.com/en-us/azure/cosmos-db/request-units#request-unit-considerations</a>.
435+
* </p>
436+
*
437+
* <p>
438+
* <br>NOTE: This API ideally should be called only once during application initialization before any workload.
439+
* <br>In case of any transient error, caller should consume the error and continue the regular workload.
440+
* </p>
441+
*
442+
* @return Mono of Void
443+
*/
444+
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
445+
public Mono<Void> openConnectionsAndInitCaches() {
446+
if(isInitialized.compareAndSet(false, true)) {
447+
return this.getFeedRanges().flatMap(feedRanges -> {
448+
List<Flux<FeedResponse<ObjectNode>>> fluxList = new ArrayList<>();
449+
SqlQuerySpec querySpec = new SqlQuerySpec();
450+
querySpec.setQueryText("select * from c where c.id = @id");
451+
querySpec.setParameters(Collections.singletonList(new SqlParameter("@id",
452+
UUID.randomUUID().toString())));
453+
for (FeedRange feedRange : feedRanges) {
454+
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
455+
options.setFeedRange(feedRange);
456+
CosmosPagedFlux<ObjectNode> cosmosPagedFlux = this.queryItems(querySpec, options,
457+
ObjectNode.class);
458+
fluxList.add(cosmosPagedFlux.byPage());
459+
}
460+
Mono<List<FeedResponse<ObjectNode>>> listMono = Flux.merge(fluxList).collectList();
461+
return listMono.flatMap(objects -> Mono.empty());
462+
});
463+
} else {
464+
logger.warn("openConnectionsAndInitCaches is already called once on Container {}, no operation will take place in this call", this.getId());
465+
return Mono.empty();
466+
}
467+
}
468+
416469
/**
417470
* Query for items in the current container using a string.
418471
* <p>

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55

66
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
77
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
8-
import com.azure.cosmos.models.CosmosItemIdentity;
9-
import com.azure.cosmos.models.CosmosItemResponse;
108
import com.azure.cosmos.models.CosmosContainerProperties;
119
import com.azure.cosmos.models.CosmosContainerRequestOptions;
1210
import com.azure.cosmos.models.CosmosContainerResponse;
11+
import com.azure.cosmos.models.CosmosItemIdentity;
1312
import com.azure.cosmos.models.CosmosItemRequestOptions;
13+
import com.azure.cosmos.models.CosmosItemResponse;
1414
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
1515
import com.azure.cosmos.models.CosmosQueryRequestOptions;
1616
import com.azure.cosmos.models.FeedRange;
@@ -749,4 +749,37 @@ public void enableLocalThroughputControlGroup(ThroughputControlGroupConfig group
749749
public void enableGlobalThroughputControlGroup(ThroughputControlGroupConfig groupConfig, GlobalThroughputControlConfig globalControlConfig) {
750750
this.asyncContainer.enableGlobalThroughputControlGroup(groupConfig, globalControlConfig);
751751
}
752+
753+
/**
754+
* Initializes the container by warming up the caches and connections for the current read region.
755+
*
756+
* <p><br>The execution of this method is expected to result in some RU charges to your account.
757+
* The number of RU consumed by this request varies, depending on data consistency, size of the overall data in the container,
758+
* item indexing, number of projections. For more information regarding RU considerations please visit
759+
* <a href="https://docs.microsoft.com/en-us/azure/cosmos-db/request-units#request-unit-considerations">https://docs.microsoft.com/en-us/azure/cosmos-db/request-units#request-unit-considerations</a>.
760+
* </p>
761+
*
762+
* <p>
763+
* <br>NOTE: This API ideally should be called only once during application initialization before any workload.
764+
* <br>In case of any transient error, caller should consume the error and continue the regular workload.
765+
* </p>
766+
*
767+
*/
768+
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
769+
public void openConnectionsAndInitCaches() {
770+
blockVoidResponse(this.asyncContainer.openConnectionsAndInitCaches());
771+
}
772+
773+
private void blockVoidResponse(Mono<Void> voidMono) {
774+
try {
775+
voidMono.block();
776+
} catch (Exception ex) {
777+
final Throwable throwable = Exceptions.unwrap(ex);
778+
if (throwable instanceof CosmosException) {
779+
throw (CosmosException) throwable;
780+
} else {
781+
throw Exceptions.propagate(ex);
782+
}
783+
}
784+
}
752785
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public enum SinceVersion {
5858
/** v4.12.0 */
5959
V4_12_0,
6060
/** v4.13.0 */
61-
V4_13_0
61+
V4_13_0,
62+
/** v4.14.0 */
63+
V4_14_0
6264
}
6365
}

0 commit comments

Comments
 (0)