Skip to content

Commit

Permalink
AQL Query Management (#35)
Browse files Browse the repository at this point in the history
* Fixing error message

* Adding superuser fixture

* Adding asyncio setting explicitly

* Not raising when using JwtConnection

* Introducing AQL query management operations
  • Loading branch information
apetenchea authored Jan 26, 2025
1 parent dca77aa commit f6c1a34
Show file tree
Hide file tree
Showing 9 changed files with 694 additions and 7 deletions.
333 changes: 331 additions & 2 deletions arangoasync/aql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,30 @@
from typing import Optional

from arangoasync.cursor import Cursor
from arangoasync.exceptions import AQLQueryExecuteError
from arangoasync.errno import HTTP_NOT_FOUND
from arangoasync.exceptions import (
AQLQueryClearError,
AQLQueryExecuteError,
AQLQueryExplainError,
AQLQueryKillError,
AQLQueryListError,
AQLQueryRulesGetError,
AQLQueryTrackingGetError,
AQLQueryTrackingSetError,
AQLQueryValidateError,
)
from arangoasync.executor import ApiExecutor
from arangoasync.request import Method, Request
from arangoasync.response import Response
from arangoasync.serialization import Deserializer, Serializer
from arangoasync.typings import Json, Jsons, QueryProperties, Result
from arangoasync.typings import (
Json,
Jsons,
QueryExplainOptions,
QueryProperties,
QueryTrackingConfiguration,
Result,
)


class AQL:
Expand Down Expand Up @@ -75,6 +93,9 @@ async def execute(
allow_dirty_read (bool | None): Allow reads from followers in a cluster.
options (QueryProperties | dict | None): Extra options for the query.
Returns:
Cursor: Result cursor.
References:
- `create-a-cursor <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#create-a-cursor>`__
""" # noqa: E501
Expand Down Expand Up @@ -113,3 +134,311 @@ def response_handler(resp: Response) -> Cursor:
return Cursor(self._executor, self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)

async def tracking(self) -> Result[QueryTrackingConfiguration]:
"""Returns the current query tracking configuration.
Returns:
QueryTrackingConfiguration: Returns the current query tracking configuration.
Raises:
AQLQueryTrackingGetError: If retrieval fails.
References:
- `get-the-aql-query-tracking-configuration <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#get-the-aql-query-tracking-configuration>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query/properties")

def response_handler(resp: Response) -> QueryTrackingConfiguration:
if not resp.is_success:
raise AQLQueryTrackingGetError(resp, request)
return QueryTrackingConfiguration(self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)

async def set_tracking(
self,
enabled: Optional[bool] = None,
max_slow_queries: Optional[int] = None,
slow_query_threshold: Optional[int] = None,
max_query_string_length: Optional[int] = None,
track_bind_vars: Optional[bool] = None,
track_slow_queries: Optional[int] = None,
) -> Result[QueryTrackingConfiguration]:
"""Configure AQL query tracking properties.
Args:
enabled (bool | None): If set to `True`, then queries will be tracked.
If set to `False`, neither queries nor slow queries will be tracked.
max_slow_queries (int | None): Maximum number of slow queries to track. Oldest
entries are discarded first.
slow_query_threshold (int | None): Runtime threshold (in seconds) for treating a
query as slow.
max_query_string_length (int | None): The maximum query string length (in bytes)
to keep in the list of queries.
track_bind_vars (bool | None): If set to `True`, track bind variables used in
queries.
track_slow_queries (int | None): If set to `True`, then slow queries will be
tracked in the list of slow queries if their runtime exceeds the
value set in `slowQueryThreshold`.
Returns:
QueryTrackingConfiguration: Returns the updated query tracking configuration.
Raises:
AQLQueryTrackingSetError: If setting the configuration fails.
References:
- `update-the-aql-query-tracking-configuration <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#update-the-aql-query-tracking-configuration>`__
""" # noqa: E501
data: Json = dict()

if enabled is not None:
data["enabled"] = enabled
if max_slow_queries is not None:
data["maxSlowQueries"] = max_slow_queries
if max_query_string_length is not None:
data["maxQueryStringLength"] = max_query_string_length
if slow_query_threshold is not None:
data["slowQueryThreshold"] = slow_query_threshold
if track_bind_vars is not None:
data["trackBindVars"] = track_bind_vars
if track_slow_queries is not None:
data["trackSlowQueries"] = track_slow_queries

request = Request(
method=Method.PUT,
endpoint="/_api/query/properties",
data=self.serializer.dumps(data),
)

def response_handler(resp: Response) -> QueryTrackingConfiguration:
if not resp.is_success:
raise AQLQueryTrackingSetError(resp, request)
return QueryTrackingConfiguration(self.deserializer.loads(resp.raw_body))

return await self._executor.execute(request, response_handler)

async def queries(self, all_queries: bool = False) -> Result[Jsons]:
"""Return a list of currently running queries.
Args:
all_queries (bool): If set to `True`, will return the currently
running queries in all databases, not just the selected one.
Using the parameter is only allowed in the `_system` database
and with superuser privileges.
Returns:
list: List of currently running queries and their properties.
Raises:
AQLQueryListError: If retrieval fails.
References:
- `list-the-running-queries <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#list-the-running-queries>`__
""" # noqa: E501
request = Request(
method=Method.GET,
endpoint="/_api/query/current",
params={"all": all_queries},
)

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryListError(resp, request)
return self.deserializer.loads_many(resp.raw_body)

return await self._executor.execute(request, response_handler)

async def slow_queries(self, all_queries: bool = False) -> Result[Jsons]:
"""Returns a list containing the last AQL queries that are finished and
have exceeded the slow query threshold in the selected database.
Args:
all_queries (bool): If set to `True`, will return the slow queries
in all databases, not just the selected one. Using the parameter
is only allowed in the `_system` database and with superuser privileges.
Returns:
list: List of slow queries.
Raises:
AQLQueryListError: If retrieval fails.
References:
- `list-the-slow-aql-queries <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#list-the-slow-aql-queries>`__
""" # noqa: E501
request = Request(
method=Method.GET,
endpoint="/_api/query/slow",
params={"all": all_queries},
)

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryListError(resp, request)
return self.deserializer.loads_many(resp.raw_body)

return await self._executor.execute(request, response_handler)

async def clear_slow_queries(self, all_queries: bool = False) -> Result[None]:
"""Clears the list of slow queries.
Args:
all_queries (bool): If set to `True`, will clear the slow queries
in all databases, not just the selected one. Using the parameter
is only allowed in the `_system` database and with superuser privileges.
Returns:
dict: Empty dictionary.
Raises:
AQLQueryClearError: If retrieval fails.
References:
- `clear-the-list-of-slow-aql-queries <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#clear-the-list-of-slow-aql-queries>`__
""" # noqa: E501
request = Request(
method=Method.DELETE,
endpoint="/_api/query/slow",
params={"all": all_queries},
)

def response_handler(resp: Response) -> None:
if not resp.is_success:
raise AQLQueryClearError(resp, request)

return await self._executor.execute(request, response_handler)

async def kill(
self,
query_id: str,
ignore_missing: bool = False,
all_queries: bool = False,
) -> Result[bool]:
"""Kill a running query.
Args:
query_id (str): Thea ID of the query to kill.
ignore_missing (bool): If set to `True`, will not raise an exception
if the query is not found.
all_queries (bool): If set to `True`, will kill the query in all databases,
not just the selected one. Using the parameter is only allowed in the
`_system` database and with superuser privileges.
Returns:
bool: `True` if the query was killed successfully.
Raises:
AQLQueryKillError: If killing the query fails.
References:
- `kill-a-running-aql-query <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#kill-a-running-aql-query>`__
""" # noqa: E501
request = Request(
method=Method.DELETE,
endpoint=f"/_api/query/{query_id}",
params={"all": all_queries},
)

def response_handler(resp: Response) -> bool:
if resp.is_success:
return True
if resp.status_code == HTTP_NOT_FOUND and ignore_missing:
return False
raise AQLQueryKillError(resp, request)

return await self._executor.execute(request, response_handler)

async def explain(
self,
query: str,
bind_vars: Optional[Json] = None,
options: Optional[QueryExplainOptions | Json] = None,
) -> Result[Json]:
"""Inspect the query and return its metadata without executing it.
Args:
query (str): Query string to be explained.
bind_vars (dict | None): An object with key/value pairs representing
the bind parameters.
options (QueryExplainOptions | dict | None): Extra options for the query.
Returns:
dict: Query execution plan.
Raises:
AQLQueryExplainError: If retrieval fails.
References:
- `explain-an-aql-query <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#explain-an-aql-query>`__
""" # noqa: E501
data: Json = dict(query=query)
if bind_vars is not None:
data["bindVars"] = bind_vars
if options is not None:
if isinstance(options, QueryExplainOptions):
options = options.to_dict()
data["options"] = options

request = Request(
method=Method.POST,
endpoint="/_api/explain",
data=self.serializer.dumps(data),
)

def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLQueryExplainError(resp, request)
return self.deserializer.loads(resp.raw_body)

return await self._executor.execute(request, response_handler)

async def validate(self, query: str) -> Result[Json]:
"""Parse and validate the query without executing it.
Args:
query (str): Query string to be validated.
Returns:
dict: Query information.
Raises:
AQLQueryValidateError: If validation fails.
References:
- `parse-an-aql-query <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#parse-an-aql-query>`__
""" # noqa: E501
request = Request(
method=Method.POST,
endpoint="/_api/query",
data=self.serializer.dumps(dict(query=query)),
)

def response_handler(resp: Response) -> Json:
if not resp.is_success:
raise AQLQueryValidateError(resp, request)
return self.deserializer.loads(resp.raw_body)

return await self._executor.execute(request, response_handler)

async def query_rules(self) -> Result[Jsons]:
"""A list of all optimizer rules and their properties.
Returns:
list: Available optimizer rules.
Raises:
AQLQueryRulesGetError: If retrieval fails.
References:
- `list-all-aql-optimizer-rules <https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#list-all-aql-optimizer-rules>`__
""" # noqa: E501
request = Request(method=Method.GET, endpoint="/_api/query/rules")

def response_handler(resp: Response) -> Jsons:
if not resp.is_success:
raise AQLQueryRulesGetError(resp, request)
return self.deserializer.loads_many(resp.raw_body)

return await self._executor.execute(request, response_handler)
2 changes: 1 addition & 1 deletion arangoasync/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def needs_refresh(self, leeway: int = 0) -> bool:
def _validate(self) -> None:
"""Validate the token."""
if type(self._token) is not str:
raise TypeError("Token must be str or bytes")
raise TypeError("Token must be str")

jwt_payload = jwt.decode(
self._token,
Expand Down
2 changes: 0 additions & 2 deletions arangoasync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ async def send_request(self, request: Request) -> Response:
# If the token has expired, refresh it and retry the request
await self.refresh_token()
resp = await self.process_request(request)
self.raise_for_status(request, resp)
return resp


Expand Down Expand Up @@ -509,7 +508,6 @@ async def send_request(self, request: Request) -> Response:
self.compress_request(request)

resp = await self.process_request(request)
self.raise_for_status(request, resp)
return resp


Expand Down
Loading

0 comments on commit f6c1a34

Please sign in to comment.