|
| 1 | +__all__ = [ |
| 2 | + "HTTPClient", |
| 3 | + "AioHTTPClient", |
| 4 | + "DefaultHTTPClient", |
| 5 | +] |
| 6 | + |
| 7 | +from abc import ABC, abstractmethod |
| 8 | +from typing import Any, Optional |
| 9 | + |
| 10 | +from aiohttp import BaseConnector, BasicAuth, ClientSession, ClientTimeout, TCPConnector |
| 11 | + |
| 12 | +from arangoasync.request import Request |
| 13 | +from arangoasync.response import Response |
| 14 | + |
| 15 | + |
| 16 | +class HTTPClient(ABC): # pragma: no cover |
| 17 | + """Abstract base class for HTTP clients. |
| 18 | + Custom HTTP clients should inherit from this class. |
| 19 | + """ |
| 20 | + |
| 21 | + @abstractmethod |
| 22 | + def create_session(self, host: str) -> Any: |
| 23 | + """Return a new session given the base host URL. |
| 24 | +
|
| 25 | + This method must be overridden by the user. |
| 26 | +
|
| 27 | + :param host: ArangoDB host URL. |
| 28 | + :type host: str |
| 29 | + :returns: Requests session object. |
| 30 | + :rtype: Any |
| 31 | + """ |
| 32 | + raise NotImplementedError |
| 33 | + |
| 34 | + @abstractmethod |
| 35 | + async def send_request( |
| 36 | + self, |
| 37 | + session: Any, |
| 38 | + request: Request, |
| 39 | + ) -> Response: |
| 40 | + """Send an HTTP request. |
| 41 | +
|
| 42 | + This method must be overridden by the user. |
| 43 | +
|
| 44 | + :param session: Session object. |
| 45 | + :type session: Any |
| 46 | + :param request: HTTP request. |
| 47 | + :type request: arangoasync.request.Request |
| 48 | + :returns: HTTP response. |
| 49 | + :rtype: arangoasync.response.Response |
| 50 | + """ |
| 51 | + raise NotImplementedError |
| 52 | + |
| 53 | + |
| 54 | +class AioHTTPClient(HTTPClient): |
| 55 | + """HTTP client implemented on top of [aiohttp](https://docs.aiohttp.org/en/stable/). |
| 56 | +
|
| 57 | + :param connector: Supports connection pooling. |
| 58 | + By default, 100 simultaneous connections are supported, with a 60-second timeout |
| 59 | + for connection reusing after release. |
| 60 | + :type connector: aiohttp.BaseConnector | None |
| 61 | + :param timeout: Timeout settings. |
| 62 | + 300s total timeout by default for a complete request/response operation. |
| 63 | + :type timeout: aiohttp.ClientTimeout | None |
| 64 | + :param read_bufsize: Size of read buffer (64KB default). |
| 65 | + :type read_bufsize: int |
| 66 | + :param auth: HTTP authentication helper. |
| 67 | + Should be used for specifying authorization data in client API. |
| 68 | + :type auth: aiohttp.BasicAuth | None |
| 69 | + :param compression_threshold: Will compress requests to the server if |
| 70 | + the size of the request body (in bytes) is at least the value of this |
| 71 | + option. |
| 72 | + :type compression_threshold: int |
| 73 | + """ |
| 74 | + |
| 75 | + def __init__( |
| 76 | + self, |
| 77 | + connector: Optional[BaseConnector] = None, |
| 78 | + timeout: Optional[ClientTimeout] = None, |
| 79 | + read_bufsize: int = 2**16, |
| 80 | + auth: Optional[BasicAuth] = None, |
| 81 | + compression_threshold: int = 1024, |
| 82 | + ) -> None: |
| 83 | + self._connector = connector or TCPConnector( |
| 84 | + keepalive_timeout=60, # timeout for connection reusing after releasing |
| 85 | + limit=100, # total number simultaneous connections |
| 86 | + ) |
| 87 | + self._timeout = timeout or ClientTimeout( |
| 88 | + total=300, # total number of seconds for the whole request |
| 89 | + connect=60, # max number of seconds for acquiring a pool connection |
| 90 | + ) |
| 91 | + self._read_bufsize = read_bufsize |
| 92 | + self._auth = auth |
| 93 | + self._compression_threshold = compression_threshold |
| 94 | + |
| 95 | + def create_session(self, host: str) -> ClientSession: |
| 96 | + """Return a new session given the base host URL. |
| 97 | +
|
| 98 | + :param host: ArangoDB host URL. Typically, the address and port of a coordinator |
| 99 | + (e.g. "http://127.0.0.1:8529"). |
| 100 | + :type host: str |
| 101 | + :returns: Session object. |
| 102 | + :rtype: aiohttp.ClientSession |
| 103 | + """ |
| 104 | + return ClientSession( |
| 105 | + base_url=host, |
| 106 | + connector=self._connector, |
| 107 | + timeout=self._timeout, |
| 108 | + auth=self._auth, |
| 109 | + read_bufsize=self._read_bufsize, |
| 110 | + ) |
| 111 | + |
| 112 | + async def send_request( |
| 113 | + self, |
| 114 | + session: ClientSession, |
| 115 | + request: Request, |
| 116 | + ) -> Response: |
| 117 | + """Send an HTTP request. |
| 118 | +
|
| 119 | + :param session: Session object. |
| 120 | + :type session: aiohttp.ClientSession |
| 121 | + :param request: HTTP request. |
| 122 | + :type request: arangoasync.request.Request |
| 123 | + :returns: HTTP response. |
| 124 | + :rtype: arangoasync.response.Response |
| 125 | + """ |
| 126 | + method = request.method |
| 127 | + endpoint = request.endpoint |
| 128 | + headers = request.headers |
| 129 | + params = request.params |
| 130 | + data = request.data |
| 131 | + compress = data is not None and len(data) >= self._compression_threshold |
| 132 | + |
| 133 | + async with session.request( |
| 134 | + method.name, |
| 135 | + endpoint, |
| 136 | + headers=headers, |
| 137 | + params=params, |
| 138 | + data=data, |
| 139 | + compress=compress, |
| 140 | + ) as response: |
| 141 | + raw_body = await response.read() |
| 142 | + return Response( |
| 143 | + method=method, |
| 144 | + url=str(response.real_url), |
| 145 | + headers=response.headers, |
| 146 | + status_code=response.status, |
| 147 | + status_text=response.reason, |
| 148 | + raw_body=raw_body, |
| 149 | + ) |
| 150 | + |
| 151 | + |
| 152 | +DefaultHTTPClient = AioHTTPClient |
0 commit comments