Skip to content

Commit

Permalink
AQL Support (#34)
Browse files Browse the repository at this point in the history
* Cursor creation works

* Cursor functionality is complete. More tests to be added.

* Completing cursor tests

* Fixing test issues
  • Loading branch information
apetenchea authored Jan 25, 2025
1 parent abe25bb commit dca77aa
Show file tree
Hide file tree
Showing 9 changed files with 1,428 additions and 2 deletions.
115 changes: 115 additions & 0 deletions arangoasync/aql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
__all__ = ["AQL"]


from typing import Optional

from arangoasync.cursor import Cursor
from arangoasync.exceptions import AQLQueryExecuteError
from arangoasync.executor import ApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import Json, Jsons, QueryProperties, Result


class AQL:
"""AQL (ArangoDB Query Language) API wrapper.
Allows you to execute, track, kill, explain, and validate queries written
in ArangoDB’s query language.
Args:
executor: API executor. Required to execute the API requests.
"""

def __init__(self, executor: ApiExecutor) -> None:
self._executor = executor

@property
def name(self) -> str:
"""Return the name of the current database."""
return self._executor.db_name

@property
def serializer(self) -> Serializer[Json]:
"""Return the serializer."""
return self._executor.serializer

@property
def deserializer(self) -> Deserializer[Json, Jsons]:
"""Return the deserializer."""
return self._executor.deserializer

def __repr__(self) -> str:
return f"<AQL in {self.name}>"

async def execute(
self,
query: str,
count: Optional[bool] = None,
batch_size: Optional[int] = None,
bind_vars: Optional[Json] = None,
cache: Optional[bool] = None,
memory_limit: Optional[int] = None,
ttl: Optional[int] = None,
allow_dirty_read: Optional[bool] = None,
options: Optional[QueryProperties | Json] = None,
) -> Result[Cursor]:
"""Execute the query and return the result cursor.
Args:
query (str): Query string to be executed.
count (bool | None): If set to `True`, the total document count is
calculated and included in the result cursor.
batch_size (int | None): Maximum number of result documents to be
transferred from the server to the client in one roundtrip.
bind_vars (dict | None): An object with key/value pairs representing
the bind parameters.
cache (bool | None): Flag to determine whether the AQL query results
cache shall be used.
memory_limit (int | None): Maximum memory (in bytes) that the query is
allowed to use.
ttl (int | None): The time-to-live for the cursor (in seconds). The cursor
will be removed on the server automatically after the specified amount
of time.
allow_dirty_read (bool | None): Allow reads from followers in a cluster.
options (QueryProperties | dict | None): Extra options for the query.
References:
- `create-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#create-a-cursor>`__
""" # noqa: E501
data: Json = dict(query=query)
if count is not None:
data["count"] = count
if batch_size is not None:
data["batchSize"] = batch_size
if bind_vars is not None:
data["bindVars"] = bind_vars
if cache is not None:
data["cache"] = cache
if memory_limit is not None:
data["memoryLimit"] = memory_limit
if ttl is not None:
data["ttl"] = ttl
if options is not None:
if isinstance(options, QueryProperties):
options = options.to_dict()
data["options"] = options

headers = dict()
if allow_dirty_read is not None:
headers["x-arango-allow-dirty-read"] = str(allow_dirty_read).lower()

request = Request(
method=Method.POST,
endpoint="/_api/cursor",
data=self.serializer.dumps(data),
headers=headers,
)

def response_handler(resp: Response) -> Cursor:
if not resp.is_success:
raise AQLQueryExecuteError(resp, request)
return Cursor(self._executor, self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)
262 changes: 262 additions & 0 deletions arangoasync/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
__all__ = ["Cursor"]


from collections import deque
from typing import Any, Deque, List, Optional

from arangoasync.errno import HTTP_NOT_FOUND
from arangoasync.exceptions import (
CursorCloseError,
CursorCountError,
CursorEmptyError,
CursorNextError,
CursorStateError,
)
from arangoasync.executor import ApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import (
Json,
Jsons,
QueryExecutionExtra,
QueryExecutionPlan,
QueryExecutionProfile,
QueryExecutionStats,
)


class Cursor:
"""Cursor API wrapper.
Cursors fetch query results from ArangoDB server in batches. Cursor objects
are *stateful* as they store the fetched items in-memory. They must not be
shared across threads without a proper locking mechanism.
Args:
executor: Required to execute the API requests.
data: Cursor initialization data. Returned by the server when the query
is created.
"""

def __init__(self, executor: ApiExecutor, data: Json) -> None:
self._executor = executor
self._cached: Optional[bool] = None
self._count: Optional[int] = None
self._extra = QueryExecutionExtra({})
self._has_more: Optional[bool] = None
self._id: Optional[str] = None
self._next_batch_id: Optional[str] = None
self._batch: Deque[Any] = deque()
self._update(data)

def __aiter__(self) -> "Cursor":
return self

async def __anext__(self) -> Any:
return await self.next()

async def __aenter__(self) -> "Cursor":
return self

async def __aexit__(self, *_: Any) -> None:
await self.close(ignore_missing=True)

def __len__(self) -> int:
if self._count is None:
raise CursorCountError("Cursor count not enabled")
return self._count

def __repr__(self) -> str:
return f"<Cursor {self._id}>" if self._id else "<Cursor>"

@property
def cached(self) -> Optional[bool]:
"""Whether the result was served from the query cache or not."""
return self._cached

@property
def count(self) -> Optional[int]:
"""The total number of result documents available."""
return self._count

@property
def extra(self) -> QueryExecutionExtra:
"""Extra information about the query execution."""
return self._extra

@property
def has_more(self) -> Optional[bool]:
"""Whether there are more results available on the server."""
return self._has_more

@property
def id(self) -> Optional[str]:
"""Cursor ID."""
return self._id

@property
def next_batch_id(self) -> Optional[str]:
"""ID of the batch after current one."""
return self._next_batch_id

@property
def batch(self) -> Deque[Any]:
"""Return the current batch of results."""
return self._batch

@property
def serializer(self) -> Serializer[Json]:
"""Return the serializer."""
return self._executor.serializer

@property
def deserializer(self) -> Deserializer[Json, Jsons]:
"""Return the deserializer."""
return self._executor.deserializer

@property
def statistics(self) -> QueryExecutionStats:
"""Query statistics."""
return self.extra.stats

@property
def profile(self) -> QueryExecutionProfile:
"""Query profiling information."""
return self.extra.profile

@property
def plan(self) -> QueryExecutionPlan:
"""Execution plan for the query."""
return self.extra.plan

@property
def warnings(self) -> List[Json]:
"""Warnings generated during query execution."""
return self.extra.warnings

def empty(self) -> bool:
"""Check if the current batch is empty."""
return len(self._batch) == 0

async def next(self) -> Any:
"""Retrieve and pop the next item.
If current batch is empty/depleted, an API request is automatically
sent to fetch the next batch from the server and update the cursor.
Returns:
Any: Next item.
Raises:
StopAsyncIteration: If there are no more items to retrieve.
CursorNextError: If the cursor failed to fetch the next batch.
CursorStateError: If the cursor ID is not set.
"""
if self.empty():
if not self.has_more:
raise StopAsyncIteration
await self.fetch()
return self.pop()

def pop(self) -> Any:
"""Pop the next item from the current batch.
If current batch is empty/depleted, an exception is raised. You must
call :func:`arangoasync.cursor.Cursor.fetch` to manually fetch the next
batch from server.
Returns:
Any: Next item from the current batch.
Raises:
CursorEmptyError: If the current batch is empty.
"""
try:
return self._batch.popleft()
except IndexError:
raise CursorEmptyError("Current batch is empty")

async def fetch(self, batch_id: Optional[str] = None) -> List[Any]:
"""Fetch the next batch from the server and update the cursor.
Args:
batch_id (str | None): ID of the batch to fetch. If not set, the
next batch after the current one is fetched.
Returns:
List[Any]: New batch results.
Raises:
CursorNextError: If the cursor is empty.
CursorStateError: If the cursor ID is not set.
References:
- `read-the-next-batch-from-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor>`__
- `read-a-batch-from-the-cursor-again <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-a-batch-from-the-cursor-again>`__
""" # noqa: E501
if self._id is None:
raise CursorStateError("Cursor ID is not set")

endpoint = f"/_api/cursor/{self._id}"
if batch_id is not None:
endpoint += f"/{batch_id}"

request = Request(
method=Method.POST,
endpoint=endpoint,
)

def response_handler(resp: Response) -> List[Any]:
if not resp.is_success:
raise CursorNextError(resp, request)
return self._update(self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)

async def close(self, ignore_missing: bool = False) -> bool:
"""Close the cursor and free any server resources associated with it.
Args:
ignore_missing (bool): Do not raise an exception on missing cursor.
Returns:
bool: `True` if the cursor was closed successfully. `False` if there
was no cursor to close. If there is no cursor associated with the
query, `False` is returned.
Raises:
CursorCloseError: If the cursor failed to close.
References:
- `delete-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#delete-a-cursor>`__
""" # noqa: E501
if self._id is None:
return False

request = Request(
method=Method.DELETE,
endpoint=f"/_api/cursor/{self._id}",
)

def response_handler(resp: Response) -> bool:
if resp.is_success:
return True
if resp.status_code == HTTP_NOT_FOUND and ignore_missing:
return False
raise CursorCloseError(resp, request)

return await self._executor.execute(request, response_handler)

def _update(self, data: Json) -> List[Any]:
"""Update the cursor with the new data."""
if "id" in data:
self._id = data.get("id")
self._cached = data.get("cached")
self._count = data.get("count")
self._extra = QueryExecutionExtra(data.get("extra", dict()))
self._has_more = data.get("hasMore")
self._next_batch_id = data.get("nextBatchId")
result: List[Any] = data.get("result", list())
self._batch.extend(result)
return result
Loading

0 comments on commit dca77aa

Please sign in to comment.