2121import com .azure .cosmos .implementation .routing .Range ;
2222import com .azure .cosmos .models .ModelBridgeInternal ;
2323import com .azure .cosmos .implementation .apachecommons .lang .StringUtils ;
24+ import org .slf4j .Logger ;
25+ import org .slf4j .LoggerFactory ;
2426import reactor .core .publisher .Flux ;
2527import reactor .core .publisher .Mono ;
2628
2729import java .time .Instant ;
2830import java .util .Collections ;
31+ import java .util .ConcurrentModificationException ;
2932import java .util .List ;
3033import java .util .Map ;
3134import java .util .UUID ;
35+ import java .util .concurrent .ConcurrentMap ;
3236
3337/**
3438 * While this class is public, but it is not part of our published public APIs.
3741public class DocumentQueryExecutionContextFactory {
3842
3943 private final static int PageSizeFactorForTop = 5 ;
40-
44+ private static final Logger logger = LoggerFactory .getLogger (DocumentQueryExecutionContextFactory .class );
45+ // Limiting cache size to 1000 for now. Can be updated in future based on need
46+ private static final int MAX_CACHE_SIZE = 1000 ;
4147 private static Mono <Utils .ValueHolder <DocumentCollection >> resolveCollection (DiagnosticsClientContext diagnosticsClientContext ,
4248 IDocumentQueryClient client ,
4349 ResourceType resourceTypeEnum ,
@@ -61,7 +67,8 @@ private static <T extends Resource> Mono<Pair<List<PartitionKeyRange>,QueryInfo>
6167 CosmosQueryRequestOptions cosmosQueryRequestOptions ,
6268 String resourceLink ,
6369 DocumentCollection collection ,
64- DefaultDocumentQueryExecutionContext <T > queryExecutionContext ) {
70+ DefaultDocumentQueryExecutionContext <T > queryExecutionContext , boolean queryPlanCachingEnabled ,
71+ ConcurrentMap <String , PartitionedQueryExecutionInfo > queryPlanCache ) {
6572
6673 // The partitionKeyRangeIdInternal is no more a public API on
6774 // FeedOptions, but have the below condition
@@ -78,49 +85,108 @@ private static <T extends Resource> Mono<Pair<List<PartitionKeyRange>,QueryInfo>
7885 }
7986
8087 Instant startTime = Instant .now ();
81- Mono <PartitionedQueryExecutionInfo > queryExecutionInfoMono =
82- QueryPlanRetriever
83- .getQueryPlanThroughGatewayAsync (diagnosticsClientContext , client , query , resourceLink );
88+ Mono <PartitionedQueryExecutionInfo > queryExecutionInfoMono ;
89+ if (queryPlanCachingEnabled &&
90+ isScopedToSinglePartition (cosmosQueryRequestOptions ) &&
91+ queryPlanCache .containsKey (query .getQueryText ())) {
92+ Instant endTime = Instant .now (); // endTime for query plan diagnostics
93+ PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = queryPlanCache .get (query .getQueryText ());
94+ if (partitionedQueryExecutionInfo != null ) {
95+ return getTargetRangesFromQueryPlan (cosmosQueryRequestOptions , collection , queryExecutionContext ,
96+ partitionedQueryExecutionInfo , startTime , endTime );
97+ }
98+ }
99+
100+ queryExecutionInfoMono = QueryPlanRetriever
101+ .getQueryPlanThroughGatewayAsync (diagnosticsClientContext , client , query ,
102+ resourceLink );
84103
85104 return queryExecutionInfoMono .flatMap (
86105 partitionedQueryExecutionInfo -> {
87106
88107 Instant endTime = Instant .now ();
89- QueryInfo queryInfo =
90- partitionedQueryExecutionInfo .getQueryInfo ();
91- queryInfo .setQueryPlanDiagnosticsContext (new QueryInfo .QueryPlanDiagnosticsContext (startTime , endTime ));
92-
93- List <Range <String >> queryRanges =
94- partitionedQueryExecutionInfo .getQueryRanges ();
95-
96- if (cosmosQueryRequestOptions != null
97- && cosmosQueryRequestOptions .getPartitionKey () != null
98- && cosmosQueryRequestOptions .getPartitionKey () != PartitionKey .NONE ) {
99- PartitionKeyInternal internalPartitionKey =
100- BridgeInternal .getPartitionKeyInternal (cosmosQueryRequestOptions .getPartitionKey ());
101- Range <String > range = Range
102- .getPointRange (internalPartitionKey
103- .getEffectivePartitionKeyString (internalPartitionKey , collection .getPartitionKey ()));
104- queryRanges = Collections .singletonList (range );
108+
109+ if (queryPlanCachingEnabled ) {
110+ tryCacheQueryPlan (query , partitionedQueryExecutionInfo , queryPlanCache );
105111 }
106- return
107- queryExecutionContext .getTargetPartitionKeyRanges (collection .getResourceId (), queryRanges )
108- .map (pkRanges -> Pair .of (
109- pkRanges ,
110- partitionedQueryExecutionInfo .getQueryInfo ()));
112+
113+ return getTargetRangesFromQueryPlan (cosmosQueryRequestOptions , collection , queryExecutionContext ,
114+ partitionedQueryExecutionInfo , startTime , endTime );
111115 });
112116 }
113117
118+ private static <T extends Resource > Mono <Pair <List <PartitionKeyRange >, QueryInfo >> getTargetRangesFromQueryPlan (
119+ CosmosQueryRequestOptions cosmosQueryRequestOptions , DocumentCollection collection ,
120+ DefaultDocumentQueryExecutionContext <T > queryExecutionContext ,
121+ PartitionedQueryExecutionInfo partitionedQueryExecutionInfo , Instant planFetchStartTime ,
122+ Instant planFetchEndTime ) {
123+ QueryInfo queryInfo =
124+ partitionedQueryExecutionInfo .getQueryInfo ();
125+ queryInfo .setQueryPlanDiagnosticsContext (new QueryInfo .QueryPlanDiagnosticsContext (planFetchStartTime ,
126+ planFetchEndTime ));
127+ List <Range <String >> queryRanges =
128+ partitionedQueryExecutionInfo .getQueryRanges ();
129+
130+ if (isScopedToSinglePartition (cosmosQueryRequestOptions )) {
131+ PartitionKeyInternal internalPartitionKey =
132+ BridgeInternal .getPartitionKeyInternal (cosmosQueryRequestOptions .getPartitionKey ());
133+ Range <String > range = Range
134+ .getPointRange (internalPartitionKey
135+ .getEffectivePartitionKeyString (internalPartitionKey ,
136+ collection
137+ .getPartitionKey ()));
138+ queryRanges = Collections .singletonList (range );
139+ }
140+ return
141+ queryExecutionContext .getTargetPartitionKeyRanges (collection .getResourceId (), queryRanges )
142+ .map (pkRanges -> Pair .of (
143+ pkRanges ,
144+ partitionedQueryExecutionInfo .getQueryInfo ()));
145+ }
146+
147+ private static void tryCacheQueryPlan (
148+ SqlQuerySpec query ,
149+ PartitionedQueryExecutionInfo partitionedQueryExecutionInfo ,
150+ ConcurrentMap <String , PartitionedQueryExecutionInfo > queryPlanCache ) {
151+ QueryInfo queryInfo = partitionedQueryExecutionInfo .getQueryInfo ();
152+ if (canCacheQuery (queryInfo ) && !queryPlanCache .containsKey (query .getQueryText ())) {
153+ if (queryPlanCache .size () > MAX_CACHE_SIZE ) {
154+ // Clearing query plan cache if size is above max size. This can be optimized in future by using
155+ // a threadsafe LRU cache
156+ queryPlanCache .clear ();
157+ }
158+ queryPlanCache .put (query .getQueryText (), partitionedQueryExecutionInfo );
159+ }
160+ }
161+
162+ private static boolean canCacheQuery (QueryInfo queryInfo ) {
163+ // Query plan will not be cached for the types below
164+ return !queryInfo .hasAggregates ()
165+ && !queryInfo .hasDistinct ()
166+ && !queryInfo .hasGroupBy ()
167+ && !queryInfo .hasLimit ()
168+ && !queryInfo .hasTop ()
169+ && !queryInfo .hasOffset ();
170+ }
171+
172+ private static boolean isScopedToSinglePartition (CosmosQueryRequestOptions cosmosQueryRequestOptions ) {
173+ return cosmosQueryRequestOptions != null
174+ && cosmosQueryRequestOptions .getPartitionKey () != null
175+ && cosmosQueryRequestOptions .getPartitionKey () != PartitionKey .NONE ;
176+ }
177+
114178 public static <T extends Resource > Flux <? extends IDocumentQueryExecutionContext <T >> createDocumentQueryExecutionContextAsync (
115- DiagnosticsClientContext diagnosticsClientContext ,
116- IDocumentQueryClient client ,
117- ResourceType resourceTypeEnum ,
118- Class <T > resourceType ,
119- SqlQuerySpec query ,
120- CosmosQueryRequestOptions cosmosQueryRequestOptions ,
121- String resourceLink ,
122- boolean isContinuationExpected ,
123- UUID correlatedActivityId ) {
179+ DiagnosticsClientContext diagnosticsClientContext ,
180+ IDocumentQueryClient client ,
181+ ResourceType resourceTypeEnum ,
182+ Class <T > resourceType ,
183+ SqlQuerySpec query ,
184+ CosmosQueryRequestOptions cosmosQueryRequestOptions ,
185+ String resourceLink ,
186+ boolean isContinuationExpected ,
187+ UUID correlatedActivityId ,
188+ boolean queryPlanCachingEnabled ,
189+ ConcurrentMap <String , PartitionedQueryExecutionInfo > queryPlanCache ) {
124190
125191 // return proxy
126192 Flux <Utils .ValueHolder <DocumentCollection >> collectionObs = Flux .just (new Utils .ValueHolder <>(null ));
@@ -146,12 +212,14 @@ public static <T extends Resource> Flux<? extends IDocumentQueryExecutionContext
146212
147213 return collectionObs .single ().flatMap (collectionValueHolder -> {
148214 Mono <Pair <List <PartitionKeyRange >, QueryInfo >> queryPlanTask = getPartitionKeyRangesAndQueryInfo (diagnosticsClientContext ,
149- client ,
150- query ,
151- cosmosQueryRequestOptions ,
152- resourceLink ,
153- collectionValueHolder .v ,
154- queryExecutionContext );
215+ client ,
216+ query ,
217+ cosmosQueryRequestOptions ,
218+ resourceLink ,
219+ collectionValueHolder .v ,
220+ queryExecutionContext ,
221+ queryPlanCachingEnabled ,
222+ queryPlanCache );
155223
156224 return queryPlanTask
157225 .flatMap (queryPlan -> createSpecializedDocumentQueryExecutionContextAsync (diagnosticsClientContext ,
0 commit comments