diff --git a/docs/dax.rst b/docs/dax.rst new file mode 100644 index 000000000..e082593f3 --- /dev/null +++ b/docs/dax.rst @@ -0,0 +1,31 @@ +.. _dax: + +Use DAX +==================== + +Amazon DynamoDB Accelerator (DAX) is a write-through caching service that is designed to simplify the process of adding a cache to DynamoDB tables. + + +.. note:: + + 'query' and 'scan' requests will not hit DAX due to serious consistent issues. + + Because DAX operates separately from DynamoDB, it is important that you understand the consistency models of both DAX and DynamoDB to ensure that your applications behave as you expect. + See + `the documentation for more information `__. + + +.. code-block:: python + + from pynamodb.models import Model + from pynamodb.attributes import UnicodeAttribute + + + class Thread(Model): + class Meta: + table_name = "Thread" + dax_read_endpoints = ['xxxx:8111'] + dax_write_endpoints = ['xxxx:8111'] + + forum_name = UnicodeAttribute(hash_key=True) + diff --git a/docs/index.rst b/docs/index.rst index e52f36db4..377778ef6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,6 +19,7 @@ Features * Batch operations with automatic pagination * Iterators for working with Query and Scan operations * `Fully tested `_ +* Dax support Topics ====== @@ -47,6 +48,7 @@ Topics contributing release_notes versioning + dax API docs ======== diff --git a/pynamodb/connection/base.py b/pynamodb/connection/base.py index 57caef319..1e9769bb3 100644 --- a/pynamodb/connection/base.py +++ b/pynamodb/connection/base.py @@ -48,6 +48,7 @@ TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist, VerboseClientError, TransactGetError, TransactWriteError) +from amazondax.DaxError import DaxClientError from pynamodb.expressions.condition import Condition from pynamodb.expressions.operand import Path from pynamodb.expressions.projection import create_projection_expression @@ -55,6 +56,7 @@ from pynamodb.settings import get_settings_value from pynamodb.signals import pre_dynamodb_send, post_dynamodb_send from pynamodb.types import HASH, RANGE +from pynamodb.connection.dax import OP_READ, OP_WRITE, DaxClient from pynamodb.util import snake_to_camel_case BOTOCORE_EXCEPTIONS = (BotoCoreError, ClientError) @@ -249,7 +251,9 @@ def __init__(self, max_retry_attempts: Optional[int] = None, base_backoff_ms: Optional[int] = None, max_pool_connections: Optional[int] = None, - extra_headers: Optional[Mapping[str, str]] = None): + extra_headers: Optional[Mapping[str, str]] = None, + dax_write_endpoints: Optional[List[str]] = None, + dax_read_endpoints: Optional[List[str]] = None): self._tables: Dict[str, MetaTable] = {} self.host = host self._local = local() @@ -288,6 +292,16 @@ def __init__(self, self._extra_headers = extra_headers else: self._extra_headers = get_settings_value('extra_headers') + if dax_write_endpoints is not None: + self.dax_write_endpoints = dax_write_endpoints + else: + self.dax_write_endpoints = get_settings_value('dax_write_endpoints') + if dax_read_endpoints is not None: + self.dax_read_endpoints = dax_read_endpoints + else: + self.dax_read_endpoints = get_settings_value('dax_read_endpoints') + self._dax_write_client: Optional[DaxClient] = None + self._dax_read_client: Optional[DaxClient] = None def __repr__(self) -> str: return "Connection<{}>".format(self.client.meta.endpoint_url) @@ -360,6 +374,13 @@ def _make_api_call(self, operation_name, operation_kwargs): 1. It's faster to avoid using botocore's response parsing 2. It provides a place to monkey patch HTTP requests for unit testing """ + try: + if operation_name in OP_WRITE and self.dax_write_endpoints: + return self.dax_write_client.dispatch(operation_name, operation_kwargs) + elif operation_name in OP_READ and self.dax_read_endpoints: + return self.dax_read_client.dispatch(operation_name, operation_kwargs) + except DaxClientError: + raise operation_model = self.client._service_model.operation_model(operation_name) request_dict = self.client._convert_to_request_dict( operation_kwargs, @@ -532,6 +553,24 @@ def client(self): self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config) return self._client + @property + def dax_write_client(self): + if self._dax_write_client is None: + self._dax_write_client = DaxClient( + endpoints=self.dax_write_endpoints, + region_name=self.region + ) + return self._dax_write_client + + @property + def dax_read_client(self): + if self._dax_read_client is None: + self._dax_read_client = DaxClient( + endpoints=self.dax_read_endpoints, + region_name=self.region + ) + return self._dax_read_client + def get_meta_table(self, table_name: str, refresh: bool = False): """ Returns a MetaTable diff --git a/pynamodb/connection/dax.py b/pynamodb/connection/dax.py new file mode 100644 index 000000000..6bbf54b9e --- /dev/null +++ b/pynamodb/connection/dax.py @@ -0,0 +1,37 @@ +from amazondax import AmazonDaxClient + + +OP_WRITE = { + 'PutItem': 'put_item', + 'DeleteItem': 'delete_item', + 'UpdateItem': 'update_item', + 'BatchWriteItem': 'batch_write_item', + 'TransactWriteItems': 'transact_write_items', + +} + +OP_READ = { + 'GetItem': 'get_item', + 'BatchGetItem': 'batch_get_item', + # query and scan has a serious consistency issue + # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAX.consistency.html#DAX.consistency.query-cache + # 'Query': 'query', + # 'Scan': 'scan', + 'TransactGetItems': 'transact_get_items', +} + +OP_NAME_TO_METHOD = OP_WRITE.copy() +OP_NAME_TO_METHOD.update(OP_READ) + + +class DaxClient(object): + + def __init__(self, endpoints, region_name): + self.connection = AmazonDaxClient( + endpoints=endpoints, + region_name=region_name + ) + + def dispatch(self, operation_name, kwargs): + method = getattr(self.connection, OP_NAME_TO_METHOD[operation_name]) + return method(**kwargs) diff --git a/pynamodb/connection/table.py b/pynamodb/connection/table.py index eebd153e0..638d24c4c 100644 --- a/pynamodb/connection/table.py +++ b/pynamodb/connection/table.py @@ -3,7 +3,7 @@ ~~~~~~~~~~~~~~~~~~~~~~~~~~~ """ -from typing import Any, Dict, Mapping, Optional, Sequence +from typing import Any, Dict, Mapping, Optional, Sequence, List from pynamodb.connection.base import Connection, MetaTable from pynamodb.constants import DEFAULT_BILLING_MODE, KEY @@ -30,10 +30,16 @@ def __init__( aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, + dax_write_endpoints: Optional[List[str]] = None, + dax_read_endpoints: Optional[List[str]] = None ) -> None: self._hash_keyname = None self._range_keyname = None self.table_name = table_name + if not dax_read_endpoints: + dax_read_endpoints = [] + if not dax_write_endpoints: + dax_write_endpoints = [] self.connection = Connection(region=region, host=host, connect_timeout_seconds=connect_timeout_seconds, @@ -41,7 +47,9 @@ def __init__( max_retry_attempts=max_retry_attempts, base_backoff_ms=base_backoff_ms, max_pool_connections=max_pool_connections, - extra_headers=extra_headers) + extra_headers=extra_headers, + dax_write_endpoints=dax_write_endpoints, + dax_read_endpoints=dax_read_endpoints) if aws_access_key_id and aws_secret_access_key: self.connection.session.set_credentials(aws_access_key_id, diff --git a/pynamodb/models.py b/pynamodb/models.py index 3faa0d2e3..f4ff3512f 100644 --- a/pynamodb/models.py +++ b/pynamodb/models.py @@ -192,6 +192,8 @@ class MetaModel(AttributeContainerMeta): aws_session_token: Optional[str] billing_mode: Optional[str] stream_view_type: Optional[str] + dax_write_endpoints: Optional[List[str]] + dax_read_endpoints: Optional[List[str]] """ Model meta class @@ -242,6 +244,10 @@ def __init__(self, name: str, bases: Any, attrs: Dict[str, Any]) -> None: setattr(attr_obj, 'aws_secret_access_key', None) if not hasattr(attr_obj, 'aws_session_token'): setattr(attr_obj, 'aws_session_token', None) + if not hasattr(attr_obj, 'dax_write_endpoints'): + setattr(attr_obj, 'dax_write_endpoints', get_settings_value('dax_write_endpoints')) + if not hasattr(attr_obj, 'dax_read_endpoints'): + setattr(attr_obj, 'dax_read_endpoints', get_settings_value('dax_read_endpoints')) elif isinstance(attr_obj, Index): attr_obj.Meta.model = cls if not hasattr(attr_obj.Meta, "index_name"): @@ -1108,7 +1114,9 @@ def _get_connection(cls) -> TableConnection: extra_headers=cls.Meta.extra_headers, aws_access_key_id=cls.Meta.aws_access_key_id, aws_secret_access_key=cls.Meta.aws_secret_access_key, - aws_session_token=cls.Meta.aws_session_token) + aws_session_token=cls.Meta.aws_session_token, + dax_write_endpoints=cls.Meta.dax_write_endpoints, + dax_read_endpoints=cls.Meta.dax_read_endpoints) return cls._connection def _deserialize(self, attrs): diff --git a/pynamodb/settings.py b/pynamodb/settings.py index e1ccd6549..c675f8662 100644 --- a/pynamodb/settings.py +++ b/pynamodb/settings.py @@ -16,6 +16,8 @@ 'region': 'us-east-1', 'max_pool_connections': 10, 'extra_headers': None, + 'dax_write_endpoints': [], + 'dax_read_endpoints': [] } OVERRIDE_SETTINGS_PATH = getenv('PYNAMODB_CONFIG', '/etc/pynamodb/global_default_settings.py') diff --git a/requirements-dev.txt b/requirements-dev.txt index faac6c1ed..c2b8409c6 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,7 @@ pytest>=5 pytest-env pytest-mock +amazon-dax-client>=1.1.7 # Due to https://github.com/boto/botocore/issues/1872. Remove after botocore fixes. python-dateutil==2.8.0 diff --git a/setup.py b/setup.py index bbf7e1e65..63fda0afc 100644 --- a/setup.py +++ b/setup.py @@ -4,6 +4,7 @@ install_requires = [ 'botocore>=1.12.54', 'python-dateutil>=2.1,<3.0.0', + 'amazon-dax-client>=1.1.7' ] setup(