Skip to content

Commit

Permalink
Adding Streaming Transactions API (#31)
Browse files Browse the repository at this point in the history
* Adding support for streaming transactions

* Fixing error codes
  • Loading branch information
apetenchea authored Jan 1, 2025
1 parent 4e4ac99 commit c45a95b
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 4 deletions.
222 changes: 220 additions & 2 deletions arangoasync/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__all__ = [
"Database",
"StandardDatabase",
"TransactionDatabase",
]


Expand All @@ -24,14 +25,19 @@
PermissionResetError,
PermissionUpdateError,
ServerStatusError,
TransactionAbortError,
TransactionCommitError,
TransactionInitError,
TransactionListError,
TransactionStatusError,
UserCreateError,
UserDeleteError,
UserGetError,
UserListError,
UserReplaceError,
UserUpdateError,
)
from arangoasync.executor import ApiExecutor, DefaultApiExecutor
from arangoasync.executor import ApiExecutor, DefaultApiExecutor, TransactionApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
Expand Down Expand Up @@ -84,6 +90,16 @@ def deserializer(self) -> Deserializer[Json, Jsons]:
"""Return the deserializer."""
return self._executor.deserializer

@property
def context(self) -> str:
"""Return the API execution context.
Returns:
str: API execution context. Possible values are "default", "transaction".
:rtype: str
"""
return self._executor.context

async def properties(self) -> Result[DatabaseProperties]:
"""Return database properties.
Expand Down Expand Up @@ -1065,7 +1081,209 @@ def response_handler(resp: Response) -> Json:


class StandardDatabase(Database):
"""Standard database API wrapper."""
"""Standard database API wrapper.
Args:
connection (Connection): Connection object to be used by the API executor.
"""

def __init__(self, connection: Connection) -> None:
super().__init__(DefaultApiExecutor(connection))

def __repr__(self) -> str:
return f"<StandardDatabase {self.name}>"

async def begin_transaction(
self,
read: Optional[str | Sequence[str]] = None,
write: Optional[str | Sequence[str]] = None,
exclusive: Optional[str | Sequence[str]] = None,
wait_for_sync: Optional[bool] = None,
allow_implicit: Optional[bool] = None,
lock_timeout: Optional[int] = None,
max_transaction_size: Optional[int] = None,
allow_dirty_read: Optional[bool] = None,
skip_fast_lock_round: Optional[bool] = None,
) -> "TransactionDatabase":
"""Begin a Stream Transaction.
Args:
read (str | list | None): Name(s) of collections read during transaction.
Read-only collections are added lazily but should be declared if
possible to avoid deadlocks.
write (str | list | None): Name(s) of collections written to during
transaction with shared access.
exclusive (str | list | None): Name(s) of collections written to during
transaction with exclusive access.
wait_for_sync (bool | None): If `True`, will force the transaction to write
all data to disk before returning
allow_implicit (bool | None): Allow reading from undeclared collections.
lock_timeout (int | None): Timeout for waiting on collection locks. Setting
it to 0 will make ArangoDB not time out waiting for a lock.
max_transaction_size (int | None): Transaction size limit in bytes.
allow_dirty_read (bool | None): If `True`, allows the Coordinator to ask any
shard replica for the data, not only the shard leader. This may result
in “dirty reads”. This setting decides about dirty reads for the entire
transaction. Individual read operations, that are performed as part of
the transaction, cannot override it.
skip_fast_lock_round (bool | None): Whether to disable fast locking for
write operations.
Returns:
TransactionDatabase: Database API wrapper specifically tailored for
transactions.
Raises:
TransactionInitError: If the operation fails on the server side.
"""
collections = dict()
if read is not None:
collections["read"] = read
if write is not None:
collections["write"] = write
if exclusive is not None:
collections["exclusive"] = exclusive

data: Json = dict(collections=collections)
if wait_for_sync is not None:
data["waitForSync"] = wait_for_sync
if allow_implicit is not None:
data["allowImplicit"] = allow_implicit
if lock_timeout is not None:
data["lockTimeout"] = lock_timeout
if max_transaction_size is not None:
data["maxTransactionSize"] = max_transaction_size
if skip_fast_lock_round is not None:
data["skipFastLockRound"] = skip_fast_lock_round

headers = dict()
if allow_dirty_read is not None:
headers["x-arango-allow-dirty-read"] = str(allow_dirty_read).lower()

request = Request(
method=Method.POST,
endpoint="/_api/transaction/begin",
data=self.serializer.dumps(data),
headers=headers,
)

def response_handler(resp: Response) -> str:
if not resp.is_success:
raise TransactionInitError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)["result"]
return cast(str, result["id"])

transaction_id = await self._executor.execute(request, response_handler)
return TransactionDatabase(self.connection, transaction_id)

def fetch_transaction(self, transaction_id: str) -> "TransactionDatabase":
"""Fetch an existing transaction.
Args:
transaction_id (str): Transaction ID.
Returns:
TransactionDatabase: Database API wrapper specifically tailored for
transactions.
"""
return TransactionDatabase(self.connection, transaction_id)

async def list_transactions(self) -> Result[Jsons]:
"""List all currently running stream transactions.
Returns:
list: List of transactions, with each transaction containing
an "id" and a "state" field.
Raises:
TransactionListError: If the operation fails on the server side.
"""
request = Request(method=Method.GET, endpoint="/_api/transaction")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise TransactionListError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)
return cast(Jsons, result["transactions"])

return await self._executor.execute(request, response_handler)


class TransactionDatabase(Database):
"""Database API tailored specifically for
`Stream Transactions <https://docs.arangodb.com/stable/develop/http-api/transactions/stream-transactions/>`__.
It allows you start a transaction, run multiple operations (eg. AQL queries) over a short period of time,
and then commit or abort the transaction.
See :func:`arangoasync.database.StandardDatabase.begin_transaction`.
Args:
connection (Connection): Connection object to be used by the API executor.
transaction_id (str): Transaction ID.
""" # noqa: E501

def __init__(self, connection: Connection, transaction_id: str) -> None:
super().__init__(TransactionApiExecutor(connection, transaction_id))
self._standard_executor = DefaultApiExecutor(connection)
self._transaction_id = transaction_id

def __repr__(self) -> str:
return f"<TransactionDatabase {self.name}>"

@property
def transaction_id(self) -> str:
"""Transaction ID."""
return self._transaction_id

async def transaction_status(self) -> str:
"""Get the status of the transaction.
Returns:
str: Transaction status: one of "running", "committed" or "aborted".
Raises:
TransactionStatusError: If the transaction is not found.
"""
request = Request(
method=Method.GET,
endpoint=f"/_api/transaction/{self.transaction_id}",
)

def response_handler(resp: Response) -> str:
if not resp.is_success:
raise TransactionStatusError(resp, request)
result: Json = self.deserializer.loads(resp.raw_body)["result"]
return cast(str, result["status"])

return await self._executor.execute(request, response_handler)

async def commit_transaction(self) -> None:
"""Commit the transaction.
Raises:
TransactionCommitError: If the operation fails on the server side.
"""
request = Request(
method=Method.PUT,
endpoint=f"/_api/transaction/{self.transaction_id}",
)

def response_handler(resp: Response) -> None:
if not resp.is_success:
raise TransactionCommitError(resp, request)

await self._executor.execute(request, response_handler)

async def abort_transaction(self) -> None:
"""Abort the transaction."""
request = Request(
method=Method.DELETE,
endpoint=f"/_api/transaction/{self.transaction_id}",
)

def response_handler(resp: Response) -> None:
if not resp.is_success:
raise TransactionAbortError(resp, request)

await self._executor.execute(request, response_handler)
20 changes: 20 additions & 0 deletions arangoasync/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,26 @@ class ServerStatusError(ArangoServerError):
"""Failed to retrieve server status."""


class TransactionAbortError(ArangoServerError):
"""Failed to abort transaction."""


class TransactionCommitError(ArangoServerError):
"""Failed to commit transaction."""


class TransactionInitError(ArangoServerError):
"""Failed to initialize transaction."""


class TransactionListError(ArangoServerError):
"""Failed to retrieve transactions."""


class TransactionStatusError(ArangoServerError):
"""Failed to retrieve transaction status."""


class UserCreateError(ArangoServerError):
"""Failed to create user."""

Expand Down
37 changes: 36 additions & 1 deletion arangoasync/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,39 @@ async def execute(
return response_handler(response)


ApiExecutor = DefaultApiExecutor
class TransactionApiExecutor(DefaultApiExecutor):
"""Executes transaction API requests.
Args:
connection: HTTP connection.
transaction_id: str: Transaction ID generated by the server.
"""

def __init__(self, connection: Connection, transaction_id: str) -> None:
super().__init__(connection)
self._id = transaction_id

@property
def context(self) -> str:
return "transaction"

@property
def id(self) -> str:
"""Return the transaction ID."""
return self._id

async def execute(
self, request: Request, response_handler: Callable[[Response], T]
) -> T:
"""Execute the request and handle the response.
Args:
request: HTTP request.
response_handler: HTTP response handler.
"""
request.headers["x-arango-trx-id"] = self.id
response = await self._conn.send_request(request)
return response_handler(response)


ApiExecutor = DefaultApiExecutor | TransactionApiExecutor
2 changes: 1 addition & 1 deletion arangoasync/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ class IndexProperties(JsonWrapper):
}
References:
- `get-an-index <https://docs.arangodb.com/3.12/develop/http-api/indexes/#get-an-index>`__
- `get-an-index <https://docs.arangodb.com/stable/develop/http-api/indexes/#get-an-index>`__
""" # noqa: E501

def __init__(self, data: Json) -> None:
Expand Down
Loading

0 comments on commit c45a95b

Please sign in to comment.