Skip to content

Commit 4083a32

Browse files
simorenohgahl-levykushagraThapar
authored
[Cosmos] Add max_integrated_cache_staleness param option to item methods (Azure#22946)
* consistency level gets set to default found in database account * async client default change * updated docs based on finding and updated samples to reflect best practices * Update CHANGELOG.md * Update README.md * Update README.md * Update README.md * Update CHANGELOG.md * formatting * formatting * updated consistency for first request to Eventual (lowest latency) * pylint * from_connection_string methods * from_connection_string2 * Update sdk/cosmos/azure-cosmos/README.md Co-authored-by: Gahl Levy <[email protected]> * Apply suggestions from code review Co-authored-by: Gahl Levy <[email protected]> * Update README.md * removed forceful header usage, changed setup to only check for Session consistency to start client session * need to set header if Session consistency for updating session if needed (thanks Jake!) * Apply suggestions from code review Kushagra improved documentation and comments Co-authored-by: Kushagra Thapar <[email protected]> * added test for session token * Update CHANGELOG.md * Update _cosmos_client_connection_async.py * added max_integrated_cache_staleness to item methods in containers * added validation and provisional comments * pylint * only applied to read-only operations * Update container.py * Update CHANGELOG.md * Apply suggestions from code review Co-authored-by: Kushagra Thapar <[email protected]> * Update _base.py * updated param comments to mention integrated cache configuration * moved to kwargs * added tests to verify functionality * Update test_integrated_cache.py * Update test_integrated_cache.py * updates to test to ensure it works with setup * added headers test and new way to track client headers before sending out these changes will also likely be used for creating the diagnostics later on * Update test_integrated_cache.py * Create test_axq.py * Added mocking tests for max integrated cache staleness. Fixed issue with int value being false * upgrade version for release Co-authored-by: Gahl Levy <[email protected]> Co-authored-by: Kushagra Thapar <[email protected]> Co-authored-by: Kushagra Thapar <[email protected]>
1 parent 25479bb commit 4083a32

File tree

13 files changed

+337
-89
lines changed

13 files changed

+337
-89
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
> for more details on consistency levels, or the README section on this change [here](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos#note-on-client-consistency-levels).
1313
1414
#### Features Added
15-
- Added support for split-proof queries for the async client.
15+
- Added new **provisional** `max_integrated_cache_staleness_in_ms` parameter to read item and query items APIs in order
16+
to make use of the **preview** CosmosDB integrated cache functionality.
17+
Please see [Azure Cosmos DB integrated cache](https://docs.microsoft.com/azure/cosmos-db/integrated-cache) for more details.
18+
- Added support for split-proof queries for the async client
1619

1720
### Bugs fixed
18-
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
19-
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
20-
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
21-
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
21+
- Default consistency level for the sync and async clients is no longer `Session` and will instead be set to the
22+
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
23+
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency
24+
will need to pass it explicitly if their account consistency is different than `Session`.
25+
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
2226
- Fixed invalid request body being sent when passing in `serverScript` body parameter to replace operations for trigger, sproc and udf resources.
2327
- Moved `is_system_key` logic in async client.
2428
- Fixed TypeErrors not being thrown when passing in invalid connection retry policies to the client.

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
'query_version': 'queryVersion'
7272
}
7373

74+
7475
def _get_match_headers(kwargs):
7576
# type: (Dict[str, Any]) -> Tuple(Optional[str], Optional[str])
7677
if_match = kwargs.pop('if_match', None)
@@ -112,14 +113,14 @@ def build_options(kwargs):
112113

113114

114115
def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
115-
cosmos_client_connection,
116-
default_headers,
117-
verb,
118-
path,
119-
resource_id,
120-
resource_type,
121-
options,
122-
partition_key_range_id=None,
116+
cosmos_client_connection,
117+
default_headers,
118+
verb,
119+
path,
120+
resource_id,
121+
resource_type,
122+
options,
123+
partition_key_range_id=None,
123124
):
124125
"""Gets HTTP request headers.
125126
@@ -292,6 +293,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
292293
if options.get("populateQuotaInfo"):
293294
headers[http_constants.HttpHeaders.PopulateQuotaInfo] = options["populateQuotaInfo"]
294295

296+
if options.get("maxIntegratedCacheStaleness"):
297+
headers[http_constants.HttpHeaders.DedicatedGatewayCacheStaleness] = options["maxIntegratedCacheStaleness"]
298+
295299
return headers
296300

297301

@@ -638,7 +642,7 @@ def ParsePaths(paths):
638642
newIndex += 1
639643

640644
# This will extract the token excluding the quote chars
641-
token = path[currentIndex + 1 : newIndex]
645+
token = path[currentIndex + 1: newIndex]
642646
tokens.append(token)
643647
currentIndex = newIndex + 1
644648
else:
@@ -657,3 +661,9 @@ def ParsePaths(paths):
657661
tokens.append(token)
658662

659663
return tokens
664+
665+
666+
def validate_cache_staleness_value(max_integrated_cache_staleness):
667+
int(max_integrated_cache_staleness) # Will throw error if data type cant be converted to int
668+
if max_integrated_cache_staleness <= 0:
669+
raise ValueError("Parameter 'max_integrated_cache_staleness_in_ms' can only be a positive integer.")

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def __init__(
141141
http_constants.HttpHeaders.IsContinuationExpected: False,
142142
}
143143

144-
# Keeps the latest response headers from server.
144+
# Keeps the latest response headers from the server.
145145
self.last_response_headers = None
146146

147147
self._useMultipleWriteLocations = False

sdk/cosmos/azure-cosmos/azure/cosmos/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@
1919
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2020
# SOFTWARE.
2121

22-
VERSION = "4.3.0b2"
22+
VERSION = "4.3.0b3"

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def __init__(
141141
if consistency_level is not None:
142142
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
143143

144-
# Keeps the latest response headers from server.
144+
# Keeps the latest response headers from the server.
145145
self.last_response_headers = None
146146

147147
self._useMultipleWriteLocations = False

sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py

Lines changed: 88 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore
3030

3131
from ._cosmos_client_connection_async import CosmosClientConnection
32-
from .._base import build_options as _build_options
32+
from .._base import build_options as _build_options, validate_cache_staleness_value
3333
from ..exceptions import CosmosResourceNotFoundError
3434
from ..http_constants import StatusCodes
3535
from ..offer import Offer
@@ -38,6 +38,7 @@
3838

3939
__all__ = ("ContainerProxy",)
4040

41+
4142
# pylint: disable=protected-access
4243
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs
4344

@@ -112,10 +113,10 @@ async def _set_partition_key(self, partition_key):
112113

113114
@distributed_trace_async
114115
async def read(
115-
self,
116-
populate_partition_key_range_statistics=None, # type: Optional[bool]
117-
populate_quota_info=None, # type: Optional[bool]
118-
**kwargs # type: Any
116+
self,
117+
populate_partition_key_range_statistics=None, # type: Optional[bool]
118+
populate_quota_info=None, # type: Optional[bool]
119+
**kwargs # type: Any
119120
):
120121
# type: (...) -> Dict[str, Any]
121122
"""Read the container properties.
@@ -150,9 +151,9 @@ async def read(
150151

151152
@distributed_trace_async
152153
async def create_item(
153-
self,
154-
body, # type: Dict[str, Any]
155-
**kwargs # type: Any
154+
self,
155+
body, # type: Dict[str, Any]
156+
**kwargs # type: Any
156157
):
157158
# type: (...) -> Dict[str, Any]
158159
"""Create an item in the container.
@@ -198,10 +199,10 @@ async def create_item(
198199

199200
@distributed_trace_async
200201
async def read_item(
201-
self,
202-
item, # type: Union[str, Dict[str, Any]]
203-
partition_key, # type: Any
204-
**kwargs # type: Any
202+
self,
203+
item, # type: Union[str, Dict[str, Any]]
204+
partition_key, # type: Any
205+
**kwargs # type: Any
205206
):
206207
# type: (...) -> Dict[str, Any]
207208
"""Get the item identified by `item`.
@@ -211,6 +212,11 @@ async def read_item(
211212
:keyword str session_token: Token for use with Session consistency.
212213
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
213214
:keyword Callable response_hook: A callable invoked with the response metadata.
215+
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
216+
:keyword int max_integrated_cache_staleness_in_ms:
217+
The max cache staleness for the integrated cache in milliseconds.
218+
For accounts configured to use the integrated cache, using Session or Eventual consistency,
219+
responses are guaranteed to be no staler than this value.
214220
:returns: Dict representing the item to be retrieved.
215221
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved.
216222
:rtype: dict[str, Any]
@@ -230,6 +236,10 @@ async def read_item(
230236
response_hook = kwargs.pop('response_hook', None)
231237
if partition_key is not None:
232238
request_options["partitionKey"] = await self._set_partition_key(partition_key)
239+
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
240+
if max_integrated_cache_staleness_in_ms is not None:
241+
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
242+
request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
233243

234244
result = await self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs)
235245
if response_hook:
@@ -238,9 +248,9 @@ async def read_item(
238248

239249
@distributed_trace
240250
def read_all_items(
241-
self,
242-
max_item_count=None, # type: Optional[int]
243-
**kwargs # type: Any
251+
self,
252+
max_item_count=None, # type: Optional[int]
253+
**kwargs # type: Any
244254
):
245255
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
246256
"""List all the items in the container.
@@ -249,13 +259,22 @@ def read_all_items(
249259
:keyword str session_token: Token for use with Session consistency.
250260
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
251261
:keyword Callable response_hook: A callable invoked with the response metadata.
262+
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
263+
:keyword int max_integrated_cache_staleness_in_ms:
264+
The max cache staleness for the integrated cache in milliseconds.
265+
For accounts configured to use the integrated cache, using Session or Eventual consistency,
266+
responses are guaranteed to be no staler than this value.
252267
:returns: An AsyncItemPaged of items (dicts).
253268
:rtype: AsyncItemPaged[Dict[str, Any]]
254269
"""
255270
feed_options = _build_options(kwargs)
256271
response_hook = kwargs.pop('response_hook', None)
257272
if max_item_count is not None:
258273
feed_options["maxItemCount"] = max_item_count
274+
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
275+
if max_integrated_cache_staleness_in_ms:
276+
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
277+
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
259278

260279
if hasattr(response_hook, "clear"):
261280
response_hook.clear()
@@ -269,14 +288,14 @@ def read_all_items(
269288

270289
@distributed_trace
271290
def query_items(
272-
self,
273-
query, # type: str
274-
parameters=None, # type: Optional[List[Dict[str, Any]]]
275-
partition_key=None, # type: Optional[Any]
276-
max_item_count=None, # type: Optional[int]
277-
enable_scan_in_query=None, # type: Optional[bool]
278-
populate_query_metrics=None, # type: Optional[bool]
279-
**kwargs # type: Any
291+
self,
292+
query, # type: str
293+
parameters=None, # type: Optional[List[Dict[str, Any]]]
294+
partition_key=None, # type: Optional[Any]
295+
max_item_count=None, # type: Optional[int]
296+
enable_scan_in_query=None, # type: Optional[bool]
297+
populate_query_metrics=None, # type: Optional[bool]
298+
**kwargs # type: Any
280299
):
281300
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
282301
"""Return all results matching the given `query`.
@@ -299,6 +318,11 @@ def query_items(
299318
:keyword str session_token: Token for use with Session consistency.
300319
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
301320
:keyword Callable response_hook: A callable invoked with the response metadata.
321+
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
322+
:keyword int max_integrated_cache_staleness_in_ms:
323+
The max cache staleness for the integrated cache in milliseconds.
324+
For accounts configured to use the integrated cache, using Session or Eventual consistency,
325+
responses are guaranteed to be no staler than this value.
302326
:returns: An AsyncItemPaged of items (dicts).
303327
:rtype: AsyncItemPaged[Dict[str, Any]]
304328
@@ -332,6 +356,10 @@ def query_items(
332356
feed_options["partitionKey"] = self._set_partition_key(partition_key)
333357
else:
334358
feed_options["enableCrossPartitionQuery"] = True
359+
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
360+
if max_integrated_cache_staleness_in_ms:
361+
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
362+
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms
335363

336364
if hasattr(response_hook, "clear"):
337365
response_hook.clear()
@@ -350,12 +378,12 @@ def query_items(
350378

351379
@distributed_trace
352380
def query_items_change_feed(
353-
self,
354-
partition_key_range_id=None, # type: Optional[str]
355-
is_start_from_beginning=False, # type: bool
356-
continuation=None, # type: Optional[str]
357-
max_item_count=None, # type: Optional[int]
358-
**kwargs # type: Any
381+
self,
382+
partition_key_range_id=None, # type: Optional[str]
383+
is_start_from_beginning=False, # type: bool
384+
continuation=None, # type: Optional[str]
385+
max_item_count=None, # type: Optional[int]
386+
**kwargs # type: Any
359387
):
360388
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
361389
"""Get a sorted list of items that were changed, in the order in which they were modified.
@@ -397,11 +425,11 @@ def query_items_change_feed(
397425

398426
@distributed_trace_async
399427
async def upsert_item(
400-
self,
401-
body, # type: Dict[str, Any]
402-
pre_trigger_include=None, # type: Optional[str]
403-
post_trigger_include=None, # type: Optional[str]
404-
**kwargs # type: Any
428+
self,
429+
body, # type: Dict[str, Any]
430+
pre_trigger_include=None, # type: Optional[str]
431+
post_trigger_include=None, # type: Optional[str]
432+
**kwargs # type: Any
405433
):
406434
# type: (...) -> Dict[str, Any]
407435
"""Insert or update the specified item.
@@ -442,12 +470,12 @@ async def upsert_item(
442470

443471
@distributed_trace_async
444472
async def replace_item(
445-
self,
446-
item, # type: Union[str, Dict[str, Any]]
447-
body, # type: Dict[str, Any]
448-
pre_trigger_include=None, # type: Optional[str]
449-
post_trigger_include=None, # type: Optional[str]
450-
**kwargs # type: Any
473+
self,
474+
item, # type: Union[str, Dict[str, Any]]
475+
body, # type: Dict[str, Any]
476+
pre_trigger_include=None, # type: Optional[str]
477+
post_trigger_include=None, # type: Optional[str]
478+
**kwargs # type: Any
451479
):
452480
# type: (...) -> Dict[str, Any]
453481
"""Replaces the specified item if it exists in the container.
@@ -487,12 +515,12 @@ async def replace_item(
487515

488516
@distributed_trace_async
489517
async def delete_item(
490-
self,
491-
item, # type: Union[str, Dict[str, Any]]
492-
partition_key, # type: Any
493-
pre_trigger_include=None, # type: Optional[str]
494-
post_trigger_include=None, # type: Optional[str]
495-
**kwargs # type: Any
518+
self,
519+
item, # type: Union[str, Dict[str, Any]]
520+
partition_key, # type: Any
521+
pre_trigger_include=None, # type: Optional[str]
522+
post_trigger_include=None, # type: Optional[str]
523+
**kwargs # type: Any
496524
):
497525
# type: (...) -> None
498526
"""Delete the specified item from the container.
@@ -617,12 +645,12 @@ def list_conflicts(self, max_item_count=None, **kwargs):
617645

618646
@distributed_trace
619647
def query_conflicts(
620-
self,
621-
query, # type: str
622-
parameters=None, # type: Optional[List[Dict[str, Any]]]
623-
partition_key=None, # type: Optional[Any]
624-
max_item_count=None, # type: Optional[int]
625-
**kwargs # type: Any
648+
self,
649+
query, # type: str
650+
parameters=None, # type: Optional[List[Dict[str, Any]]]
651+
partition_key=None, # type: Optional[Any]
652+
max_item_count=None, # type: Optional[int]
653+
**kwargs # type: Any
626654
):
627655
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
628656
"""Return all conflicts matching a given `query`.
@@ -657,10 +685,10 @@ def query_conflicts(
657685

658686
@distributed_trace_async
659687
async def read_conflict(
660-
self,
661-
conflict, # type: Union[str, Dict[str, Any]]
662-
partition_key, # type: Any
663-
**kwargs # type: Any
688+
self,
689+
conflict, # type: Union[str, Dict[str, Any]]
690+
partition_key, # type: Any
691+
**kwargs # type: Any
664692
):
665693
# type: (Union[str, Dict[str, Any]], Any, Any) -> Dict[str, Any]
666694
"""Get the conflict identified by `conflict`.
@@ -686,10 +714,10 @@ async def read_conflict(
686714

687715
@distributed_trace_async
688716
async def delete_conflict(
689-
self,
690-
conflict, # type: Union[str, Dict[str, Any]]
691-
partition_key, # type: Any
692-
**kwargs # type: Any
717+
self,
718+
conflict, # type: Union[str, Dict[str, Any]]
719+
partition_key, # type: Any
720+
**kwargs # type: Any
693721
):
694722
# type: (Union[str, Dict[str, Any]], Any, Any) -> None
695723
"""Delete a specified conflict from the container.

0 commit comments

Comments
 (0)