77import com .azure .cosmos .CosmosException ;
88import com .azure .cosmos .implementation .apachecommons .lang .StringUtils ;
99import com .azure .cosmos .implementation .caches .RxClientCollectionCache ;
10+ import com .azure .cosmos .implementation .caches .RxPartitionKeyRangeCache ;
1011import com .azure .cosmos .implementation .directconnectivity .DirectBridgeInternal ;
12+ import com .azure .cosmos .implementation .directconnectivity .GatewayServiceConfigurationReader ;
1113import com .azure .cosmos .implementation .directconnectivity .HttpUtils ;
14+ import com .azure .cosmos .implementation .directconnectivity .RequestHelper ;
1215import com .azure .cosmos .implementation .directconnectivity .StoreResponse ;
1316import com .azure .cosmos .implementation .directconnectivity .WebExceptionUtility ;
1417import com .azure .cosmos .implementation .http .HttpClient ;
1518import com .azure .cosmos .implementation .http .HttpHeaders ;
1619import com .azure .cosmos .implementation .http .HttpRequest ;
1720import com .azure .cosmos .implementation .http .HttpResponse ;
1821import com .azure .cosmos .implementation .http .ReactorNettyRequestRecord ;
22+ import com .azure .cosmos .implementation .routing .PartitionKeyInternal ;
23+ import com .azure .cosmos .implementation .routing .PartitionKeyInternalHelper ;
1924import com .azure .cosmos .implementation .throughputControl .ThroughputControlStore ;
2025import io .netty .handler .codec .http .HttpMethod ;
2126import io .netty .handler .codec .http .HttpResponseStatus ;
@@ -53,6 +58,9 @@ class RxGatewayStoreModel implements RxStoreModel {
5358 private ConsistencyLevel defaultConsistencyLevel ;
5459 private ISessionContainer sessionContainer ;
5560 private ThroughputControlStore throughputControlStore ;
61+ private boolean useMultipleWriteLocations ;
62+ private RxPartitionKeyRangeCache partitionKeyRangeCache ;
63+ private GatewayServiceConfigurationReader gatewayServiceConfigurationReader ;
5664 private RxClientCollectionCache collectionCache ;
5765
5866 public RxGatewayStoreModel (
@@ -94,7 +102,35 @@ public RxGatewayStoreModel(
94102 this .sessionContainer = sessionContainer ;
95103 }
96104
97- void setCollectionCache (RxClientCollectionCache collectionCache ) {
105+ void setGatewayServiceConfigurationReader (GatewayServiceConfigurationReader gatewayServiceConfigurationReader ) {
106+ this .gatewayServiceConfigurationReader = gatewayServiceConfigurationReader ;
107+ }
108+
109+ public void setPartitionKeyRangeCache (RxPartitionKeyRangeCache partitionKeyRangeCache ) {
110+ this .partitionKeyRangeCache = partitionKeyRangeCache ;
111+ }
112+
113+ public void setUseMultipleWriteLocations (boolean useMultipleWriteLocations ) {
114+ this .useMultipleWriteLocations = useMultipleWriteLocations ;
115+ }
116+
117+ boolean isUseMultipleWriteLocations () {
118+ return useMultipleWriteLocations ;
119+ }
120+
121+ RxPartitionKeyRangeCache getPartitionKeyRangeCache () {
122+ return partitionKeyRangeCache ;
123+ }
124+
125+ GatewayServiceConfigurationReader getGatewayServiceConfigurationReader () {
126+ return gatewayServiceConfigurationReader ;
127+ }
128+
129+ RxClientCollectionCache getCollectionCache () {
130+ return collectionCache ;
131+ }
132+
133+ public void setCollectionCache (RxClientCollectionCache collectionCache ) {
98134 this .collectionCache = collectionCache ;
99135 }
100136
@@ -473,11 +509,8 @@ public Mono<RxDocumentServiceResponse> processMessage(RxDocumentServiceRequest r
473509
474510 return Mono .error (dce );
475511 }
476- ).map (response ->
477- {
478- this .captureSessionToken (request , response .getResponseHeaders ());
479- return response ;
480- }
512+ ).flatMap (response ->
513+ this .captureSessionTokenAndHandlePartitionSplit (request , response .getResponseHeaders ()).then (Mono .just (response ))
481514 );
482515 }
483516
@@ -503,12 +536,29 @@ private void captureSessionToken(RxDocumentServiceRequest request, Map<String, S
503536 }
504537 }
505538
539+ private Mono <Void > captureSessionTokenAndHandlePartitionSplit (RxDocumentServiceRequest request ,
540+ Map <String , String > responseHeaders ) {
541+ this .captureSessionToken (request , responseHeaders );
542+ if (request .requestContext .resolvedPartitionKeyRange != null &&
543+ StringUtils .isNotEmpty (request .requestContext .resolvedCollectionRid ) &&
544+ StringUtils .isNotEmpty (responseHeaders .get (HttpConstants .HttpHeaders .PARTITION_KEY_RANGE_ID )) &&
545+ !responseHeaders .get (HttpConstants .HttpHeaders .PARTITION_KEY_RANGE_ID ).equals (request .requestContext .resolvedPartitionKeyRange .getId ())) {
546+ return this .partitionKeyRangeCache .refreshAsync (BridgeInternal .getMetaDataDiagnosticContext (request .requestContext .cosmosDiagnostics ), request .requestContext .resolvedCollectionRid )
547+ .flatMap (collectionRoutingMapValueHolder -> Mono .empty ());
548+ }
549+ return Mono .empty ();
550+ }
551+
506552 private Mono <Void > addIntendedCollectionRidAndSessionToken (RxDocumentServiceRequest request ) {
507- applySessionToken (request );
508- if (this .collectionCache != null && request .getResourceType ().equals (ResourceType .Document )) {
553+ return applySessionToken (request ).then (addIntendedCollectionRid (request ));
554+ }
555+
556+ private Mono <Void > addIntendedCollectionRid (RxDocumentServiceRequest request ) {
557+ if (this .collectionCache != null && request .getResourceType ().equals (ResourceType .Document )) {
509558 return this .collectionCache .resolveCollectionAsync (BridgeInternal .getMetaDataDiagnosticContext (request .requestContext .cosmosDiagnostics ), request ).flatMap (documentCollectionValueHolder -> {
510- if (StringUtils .isEmpty (request .getHeaders ().get (INTENDED_COLLECTION_RID_HEADER ))) {
511- request .getHeaders ().put (INTENDED_COLLECTION_RID_HEADER , request .requestContext .resolvedCollectionRid );
559+ if (StringUtils .isEmpty (request .getHeaders ().get (INTENDED_COLLECTION_RID_HEADER ))) {
560+ request .getHeaders ().put (INTENDED_COLLECTION_RID_HEADER ,
561+ request .requestContext .resolvedCollectionRid );
512562 } else {
513563 request .intendedCollectionRidPassedIntoSDK = true ;
514564 }
@@ -518,40 +568,105 @@ private Mono<Void> addIntendedCollectionRidAndSessionToken(RxDocumentServiceRequ
518568 return Mono .empty ();
519569 }
520570
521- private void applySessionToken (RxDocumentServiceRequest request ) {
571+ private Mono < Void > applySessionToken (RxDocumentServiceRequest request ) {
522572 Map <String , String > headers = request .getHeaders ();
523573 Objects .requireNonNull (headers , "RxDocumentServiceRequest::headers is required and cannot be null" );
524574
525- String requestConsistencyLevel = headers .get (HttpConstants .HttpHeaders .CONSISTENCY_LEVEL );
575+ // Master resource operations don't require session token.
576+ if (isMasterOperation (request .getResourceType (), request .getOperationType ())) {
577+ if (!Strings .isNullOrEmpty (request .getHeaders ().get (HttpConstants .HttpHeaders .SESSION_TOKEN ))) {
578+ request .getHeaders ().remove (HttpConstants .HttpHeaders .SESSION_TOKEN );
579+ }
580+ return Mono .empty ();
581+ }
526582
527- boolean sessionTokenApplicable =
528- Strings .areEqual (requestConsistencyLevel , ConsistencyLevel .SESSION .toString ()) ||
529- (this .defaultConsistencyLevel == ConsistencyLevel .SESSION &&
530- // skip applying the session token when Eventual Consistency is explicitly requested
531- // on request-level for data plane operations.
532- // The session token is ignored on teh backend/gateway in this case anyway
533- // and the session token can be rather large (even run in the 16 KB header length problem
534- // on the gateway - so not worth sending when not needed
535- (!request .isReadOnlyRequest () ||
536- request .getResourceType () != ResourceType .Document ||
537- !Strings .areEqual (requestConsistencyLevel , ConsistencyLevel .EVENTUAL .toString ())));
583+ boolean sessionConsistency = RequestHelper .getConsistencyLevelToUse (this .gatewayServiceConfigurationReader ,
584+ request ) == ConsistencyLevel .SESSION ;
538585
539586 if (!Strings .isNullOrEmpty (request .getHeaders ().get (HttpConstants .HttpHeaders .SESSION_TOKEN ))) {
540- if (!sessionTokenApplicable || isMasterOperation (request .getResourceType (), request .getOperationType ())) {
587+ if (!sessionConsistency ||
588+ (!request .isReadOnlyRequest () && request .getOperationType () != OperationType .Batch && !this .useMultipleWriteLocations )){
541589 request .getHeaders ().remove (HttpConstants .HttpHeaders .SESSION_TOKEN );
542590 }
543- return ; //User is explicitly controlling the session.
591+ return Mono . empty () ; //User is explicitly controlling the session.
544592 }
545593
546- if (!sessionTokenApplicable || isMasterOperation (request .getResourceType (), request .getOperationType ())) {
547- return ; // Only apply the session token in case of session consistency and when resource is not a master resource
594+ if (!sessionConsistency ||
595+ (!request .isReadOnlyRequest () && request .getOperationType () != OperationType .Batch && !this .useMultipleWriteLocations )) {
596+ return Mono .empty ();
597+ // Only apply the session token in case of session consistency and if request is read only,
598+ // apply token for write request only if batch operation or multi master
548599 }
549600
550- //Apply the ambient session.
551- String sessionToken = this .sessionContainer .resolveGlobalSessionToken (request );
601+ if (this .collectionCache != null && this .partitionKeyRangeCache != null ) {
602+ return this .collectionCache .resolveCollectionAsync (BridgeInternal .getMetaDataDiagnosticContext (request .requestContext .cosmosDiagnostics ), request ).
603+ flatMap (collectionValueHolder -> {
552604
553- if (!Strings .isNullOrEmpty (sessionToken )) {
554- headers .put (HttpConstants .HttpHeaders .SESSION_TOKEN , sessionToken );
605+ if (collectionValueHolder == null || collectionValueHolder .v == null ) {
606+ //Apply the ambient session.
607+ String sessionToken = this .sessionContainer .resolveGlobalSessionToken (request );
608+
609+ if (!Strings .isNullOrEmpty (sessionToken )) {
610+ headers .put (HttpConstants .HttpHeaders .SESSION_TOKEN , sessionToken );
611+ }
612+ return Mono .empty ();
613+ }
614+ return partitionKeyRangeCache .tryLookupAsync (BridgeInternal .getMetaDataDiagnosticContext (request .requestContext .cosmosDiagnostics ),
615+ collectionValueHolder .v .getResourceId (),
616+ null ,
617+ null ).flatMap (collectionRoutingMapValueHolder -> {
618+ if (collectionRoutingMapValueHolder == null || collectionRoutingMapValueHolder .v == null ) {
619+ //Apply the ambient session.
620+ String sessionToken = this .sessionContainer .resolveGlobalSessionToken (request );
621+
622+ if (!Strings .isNullOrEmpty (sessionToken )) {
623+ headers .put (HttpConstants .HttpHeaders .SESSION_TOKEN , sessionToken );
624+ }
625+ return Mono .empty ();
626+ }
627+ String partitionKeyRangeId =
628+ request .getHeaders ().get (HttpConstants .HttpHeaders .PARTITION_KEY_RANGE_ID );
629+ PartitionKeyInternal partitionKeyInternal = request .getPartitionKeyInternal ();
630+
631+ if (StringUtils .isNotEmpty (partitionKeyRangeId )) {
632+ PartitionKeyRange range =
633+ collectionRoutingMapValueHolder .v .getRangeByPartitionKeyRangeId (partitionKeyRangeId );
634+ request .requestContext .resolvedPartitionKeyRange = range ;
635+ if (request .requestContext .resolvedPartitionKeyRange == null ) {
636+ SessionTokenHelper .setPartitionLocalSessionToken (request , partitionKeyRangeId ,
637+ sessionContainer );
638+ } else {
639+ SessionTokenHelper .setPartitionLocalSessionToken (request , sessionContainer );
640+ }
641+ } else if (partitionKeyInternal != null ) {
642+ String effectivePartitionKeyString = PartitionKeyInternalHelper
643+ .getEffectivePartitionKeyString (
644+ partitionKeyInternal ,
645+ collectionValueHolder .v .getPartitionKey ());
646+ PartitionKeyRange range =
647+ collectionRoutingMapValueHolder .v .getRangeByEffectivePartitionKey (effectivePartitionKeyString );
648+ request .requestContext .resolvedPartitionKeyRange = range ;
649+ SessionTokenHelper .setPartitionLocalSessionToken (request , sessionContainer );
650+ } else {
651+ //Apply the ambient session.
652+ String sessionToken = this .sessionContainer .resolveGlobalSessionToken (request );
653+
654+ if (!Strings .isNullOrEmpty (sessionToken )) {
655+ headers .put (HttpConstants .HttpHeaders .SESSION_TOKEN , sessionToken );
656+ }
657+ }
658+
659+ return Mono .empty ();
660+ });
661+ });
662+ } else {
663+ //Apply the ambient session.
664+ String sessionToken = this .sessionContainer .resolveGlobalSessionToken (request );
665+
666+ if (!Strings .isNullOrEmpty (sessionToken )) {
667+ headers .put (HttpConstants .HttpHeaders .SESSION_TOKEN , sessionToken );
668+ }
669+ return Mono .empty ();
555670 }
556671 }
557672
0 commit comments