-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathfunctions_client.py
132 lines (111 loc) · 4.2 KB
/
functions_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from typing import Any, Dict, Literal, Optional, Union
from warnings import warn
from httpx import HTTPError, Response
from ..errors import FunctionsHttpError, FunctionsRelayError
from ..utils import (
FunctionRegion,
SyncClient,
is_http_url,
is_valid_jwt,
is_valid_str_arg,
)
from ..version import __version__
class SyncFunctionsClient:
def __init__(
self,
url: str,
headers: Dict,
timeout: int,
verify: bool = True,
proxy: Optional[str] = None,
):
if not is_http_url(url):
raise ValueError("url must be a valid HTTP URL string")
self.url = url
self.headers = {
"User-Agent": f"supabase-py/functions-py v{__version__}",
**headers,
}
self._client = SyncClient(
base_url=self.url,
headers=self.headers,
verify=bool(verify),
timeout=int(abs(timeout)),
proxy=proxy,
follow_redirects=True,
http2=True,
)
def _request(
self,
method: Literal["GET", "OPTIONS", "HEAD", "POST", "PUT", "PATCH", "DELETE"],
url: str,
headers: Optional[Dict[str, str]] = None,
json: Optional[Dict[Any, Any]] = None,
) -> Response:
user_data = {"data": json} if isinstance(json, str) else {"json": json}
response = self._client.request(method, url, **user_data, headers=headers)
try:
response.raise_for_status()
except HTTPError as exc:
status_code = None
if hasattr(response, "status_code"):
status_code = response.status_code
raise FunctionsHttpError(
response.json().get("error")
or f"An error occurred while requesting your edge function at {exc.request.url!r}.",
status_code,
) from exc
return response
def set_auth(self, token: str) -> None:
"""Updates the authorization header
Parameters
----------
token : str
the new jwt token sent in the authorization header
"""
if not is_valid_jwt(token):
raise ValueError("token must be a valid JWT authorization token string.")
self.headers["Authorization"] = f"Bearer {token}"
def invoke(
self, function_name: str, invoke_options: Optional[Dict] = None
) -> Union[Dict, bytes]:
"""Invokes a function
Parameters
----------
function_name : the name of the function to invoke
invoke_options : object with the following properties
`headers`: object representing the headers to send with the request
`body`: the body of the request
`responseType`: how the response should be parsed. The default is `json`
"""
if not is_valid_str_arg(function_name):
raise ValueError("function_name must a valid string value.")
headers = self.headers
body = None
response_type = "text/plain"
if invoke_options is not None:
headers.update(invoke_options.get("headers", {}))
response_type = invoke_options.get("responseType", "text/plain")
region = invoke_options.get("region")
if region:
if not isinstance(region, FunctionRegion):
warn(f"Use FunctionRegion({region})")
region = FunctionRegion(region)
if region.value != "any":
headers["x-region"] = region.value
body = invoke_options.get("body")
if isinstance(body, str):
headers["Content-Type"] = "text/plain"
elif isinstance(body, dict):
headers["Content-Type"] = "application/json"
response = self._request(
"POST", f"{self.url}/{function_name}", headers=headers, json=body
)
is_relay_error = response.headers.get("x-relay-header")
if is_relay_error and is_relay_error == "true":
raise FunctionsRelayError(response.json().get("error"))
if response_type == "json":
data = response.json()
else:
data = response.content
return data