Skip to content

Commit

Permalink
mypy (#4)
Browse files Browse the repository at this point in the history
Introduce mypy for type checking
  • Loading branch information
MeshanKhosla authored Jan 11, 2024
1 parent ecb0535 commit 761a214
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 90 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pytest = "^7.4.3"
black = "^23.12.1"
python-dotenv = "^1.0.0"
pytest-asyncio = "^0.23.3"
mypy = "^1.8.0"

[build-system]
requires = ["poetry-core"]
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
Asynchronous QStash client.
To use the blocking version, use the upstash_qstash client instead.
"""
self.http = HttpClient(token, retry, base_url)
self.http = HttpClient(token, retry, base_url or DEFAULT_BASE_URL)

async def publish(self, req: PublishRequest):
"""
Expand Down
17 changes: 9 additions & 8 deletions upstash_qstash/asyncio/dlq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.dlq import ListMessagesOpts, ListMessageResponse
from upstash_qstash.qstash_types import UpstashRequest


class DLQ:
Expand All @@ -13,14 +14,14 @@ async def list_messages(
"""
Asynchronously list messages in the dlq
"""
cursor = opts.get("cursor") if opts else None
return await self.http.request_async(
{
"path": ["v2", "dlq"],
"method": "GET",
"query": {"cursor": cursor},
}
)
req: UpstashRequest = {
"path": ["v2", "dlq"],
"method": "GET",
}
if opts is not None and opts.get("cursor") is not None:
req["query"] = {"cursor": opts["cursor"]}

return await self.http.request_async(req)

async def delete(self, dlq_message_id: str):
"""
Expand Down
4 changes: 2 additions & 2 deletions upstash_qstash/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ async def get(
"""
Asynchronously retrieve logs.
"""
query: Dict[str, int] = {}
query = {}
if req is not None and req.get("cursor") is not None and req["cursor"] > 0:
query["cursor"] = req["cursor"]
query["cursor"] = str(req["cursor"])

return await http.request_async(
{
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/asyncio/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def publish_async(

return await http.request_async(
{
"path": ["v2", "publish", req.get("url") or req.get("topic")],
"path": ["v2", "publish", req.get("url") or req["topic"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
3 changes: 2 additions & 1 deletion upstash_qstash/asyncio/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ async def create(self, req: CreateScheduleRequest) -> CreateScheduleResponse:
:return: A dictionary containing the 'scheduleId' of the created schedule.
:raises UpstashError: If required headers are missing.
"""
SyncSchedules._validate_schedule_request(req)
headers = SyncSchedules._prepare_headers(req)

return await self.http.request_async(
{
"path": ["v2", "schedules", req.get("destination")],
"path": ["v2", "schedules", req["destination"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
3 changes: 1 addition & 2 deletions upstash_qstash/asyncio/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
Topic,
Topics as SyncTopics,
)
from upstash_qstash.error import QstashException
import json


Expand Down Expand Up @@ -37,7 +36,7 @@ async def remove_endpoints(self, req: RemoveEndpointsRequest):
:param req: An instance of RemoveEndpointsRequest containing the name and endpoints
"""
self._validate_topic_request(req)
SyncTopics._validate_topic_request(req)
await self.http.request_async(
{
"path": ["v2", "topics", req["name"], "endpoints"],
Expand Down
2 changes: 1 addition & 1 deletion upstash_qstash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(
Synchronous QStash client.
To use the blocking version, use the upstash_qstash client instead.
"""
self.http = HttpClient(token, retry, base_url)
self.http = HttpClient(token, retry, base_url or DEFAULT_BASE_URL)

def publish(self, req: PublishRequest):
"""
Expand Down
20 changes: 10 additions & 10 deletions upstash_qstash/dlq.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional, TypedDict, List, Dict
from upstash_qstash.qstash_types import Method
from upstash_qstash.qstash_types import Method, UpstashRequest
from upstash_qstash.upstash_http import HttpClient

DlqMessage = TypedDict(
Expand All @@ -23,7 +23,7 @@
ListMessagesOpts = TypedDict(
"ListMessagesOpts",
{
"cursor": Optional[str],
"cursor": str,
},
)

Expand All @@ -46,14 +46,14 @@ def list_messages(
"""
List messages in the dlq
"""
cursor = opts.get("cursor") if opts else None
return self.http.request(
{
"path": ["v2", "dlq"],
"method": "GET",
"query": {"cursor": cursor},
}
)
req: UpstashRequest = {
"path": ["v2", "dlq"],
"method": "GET",
}
if opts is not None and opts.get("cursor") is not None:
req["query"] = {"cursor": opts["cursor"]}

return self.http.request(req)

def delete(self, dlq_message_id: str):
"""
Expand Down
6 changes: 3 additions & 3 deletions upstash_qstash/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class State(Enum):
EventsRequest = TypedDict(
"EventsRequest",
{
"cursor": str,
"cursor": int,
},
)

Expand All @@ -48,9 +48,9 @@ def get(http: HttpClient, req: Optional[EventsRequest] = None) -> GetEventsRespo
"""
Retrieve logs.
"""
query: Dict[str, int] = {}
query = {}
if req is not None and req.get("cursor") is not None and req["cursor"] > 0:
query["cursor"] = req["cursor"]
query["cursor"] = str(req["cursor"])

return http.request(
{
Expand Down
29 changes: 15 additions & 14 deletions upstash_qstash/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@
{
"url": str,
"body": Any,
"headers": Optional[Dict[Any, Any]],
"delay": Optional[int],
"not_before": Optional[int],
"deduplication_id": Optional[str],
"content_based_deduplication": Optional[bool],
"retries": Optional[int],
"callback": Optional[str],
"failure_callback": Optional[str],
"method": Optional[Method],
"topic": Optional[str],
"headers": Dict[Any, Any],
"delay": int,
"not_before": int,
"deduplication_id": str,
"content_based_deduplication": bool,
"retries": int,
"callback": str,
"failure_callback": str,
"method": Method,
"topic": str,
},
total=False,
)

PublishToUrlResponse = TypedDict(
"PublishResponse",
"PublishToUrlResponse",
{"messageId": str, "deduplicated": Optional[bool]},
)

Expand Down Expand Up @@ -54,8 +55,8 @@ def _prepare_headers(req: PublishRequest) -> UpstashHeaders:
"""
Prepare and return headers for the publish request.
"""
headers: UpstashHeaders = req.get("headers") or {}
prefix_headers(headers)
init_headers = req.get("headers") or {}
headers = prefix_headers(init_headers)

headers["Upstash-Method"] = req.get("method") or "POST"

Expand Down Expand Up @@ -94,7 +95,7 @@ def publish(

return http.request(
{
"path": ["v2", "publish", req.get("url") or req.get("topic")],
"path": ["v2", "publish", req.get("url") or req["topic"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
55 changes: 25 additions & 30 deletions upstash_qstash/qstash_types.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,43 @@
from typing import Optional, Dict, Any, List, TypedDict
from enum import Enum
from typing import Optional, Dict, Any, List, TypedDict, Callable, Literal, Union


class Method(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
PATCH = "PATCH"
Method = Union[
Literal["GET"], Literal["POST"], Literal["PUT"], Literal["DELETE"], Literal["PATCH"]
]

HeaderKey = Union[
Literal["Upstash-Method"],
Literal["Upstash-Delay"],
Literal["Upstash-Not-Before"],
Literal["Upstash-Deduplication-Id"],
Literal["Upstash-Content-Based-Deduplication"],
Literal["Upstash-Retries"],
Literal["Upstash-Callback"],
Literal["Upstash-Failure-Callback"],
Literal["Upstash-Cron"],
str, # This allows for any other string as a key (Upstash-Forward-*)
]

UpstashHeaders = TypedDict(
"UpstashHeaders",
{
"Upstash-Method": Method,
"Upstash-Delay": Optional[str],
"Upstash-Not-Before": Optional[str],
"Upstash-Deduplication-Id": Optional[str],
"Upstash-Content-Based-Deduplication": Optional[str],
"Upstash-Retries": Optional[str],
"Upstash-Callback": Optional[str],
"Upstash-Failure-Callback": Optional[str],
"Upstash-Cron": Optional[str],
# Other headers are prefixed with Upstash-Forward-
},
)
UpstashHeaders = Dict[HeaderKey, Optional[Union[str, Method]]]

UpstashRequest = TypedDict(
"UpstashRequest",
{
"path": List[str],
"body": Optional[Any],
"headers": Optional[UpstashHeaders],
"keepalive": Optional[bool],
"method": Optional[Method],
"query": Optional[Dict[str, str]],
"parse_response_as_json": Optional[bool],
"body": Any,
"headers": UpstashHeaders,
"keepalive": bool,
"method": Method,
"query": Dict[str, str],
"parse_response_as_json": bool,
},
total=False,
)

RetryConfig = TypedDict(
"RetryConfig",
{
"attempts": int,
"backoff": callable,
"backoff": Callable,
},
)
17 changes: 13 additions & 4 deletions upstash_qstash/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from upstash_qstash.qstash_types import Method, UpstashHeaders
from upstash_qstash.upstash_http import HttpClient
from upstash_qstash.utils import prefix_headers
from upstash_qstash.error import QstashException

Schedule = TypedDict(
"Schedule",
Expand Down Expand Up @@ -50,8 +51,8 @@ def __init__(self, http: HttpClient):

@staticmethod
def _prepare_headers(req: CreateScheduleRequest) -> UpstashHeaders:
headers: UpstashHeaders = req.get("headers") or {}
prefix_headers(headers)
init_headers = req.get("headers") or {}
headers = prefix_headers(init_headers)

headers["Upstash-Method"] = req.get("method") or "POST"

Expand All @@ -76,22 +77,30 @@ def _prepare_headers(req: CreateScheduleRequest) -> UpstashHeaders:
headers["Upstash-Callback"] = req.get("callback")

if req.get("failure_callback") is not None:
headers["Upstash-Failure-Callback"] = req.get("failureCallback")
headers["Upstash-Failure-Callback"] = req.get("failure_callback")

return headers

@staticmethod
def _validate_schedule_request(req: CreateScheduleRequest) -> None:
if req.get("cron") is None:
raise QstashException("Cron is required")
if req.get("destination") is None:
raise QstashException("Destination is required")

def create(self, req: CreateScheduleRequest) -> CreateScheduleResponse:
"""
Create a new schedule with the specified parameters.
:param req: A dictionary with the details of the schedule to create.
:return: A dictionary containing the 'scheduleId' of the created schedule.
:raises UpstashError: If required headers are missing.
"""
Schedules._validate_schedule_request(req)
headers = Schedules._prepare_headers(req)

return self.http.request(
{
"path": ["v2", "schedules", req.get("destination")],
"path": ["v2", "schedules", req["destination"]],
"body": req.get("body"),
"headers": headers,
"method": "POST",
Expand Down
4 changes: 3 additions & 1 deletion upstash_qstash/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def delete(self, name: str):
)

@staticmethod
def _validate_topic_request(req: dict):
def _validate_topic_request(
req: Union[AddEndpointsRequest, RemoveEndpointsRequest]
):
"""
Ensure that the request contains a valid topic name and valid endpoints
"""
Expand Down
10 changes: 6 additions & 4 deletions upstash_qstash/upstash_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import aiohttp
import asyncio
from typing import Union
from typing import Union, Optional
from urllib.parse import urlencode
from upstash_qstash.qstash_types import UpstashRequest, RetryConfig, UpstashHeaders
from upstash_qstash.error import QstashException, QstashRateLimitException
Expand All @@ -16,7 +16,9 @@


class HttpClient:
def __init__(self, token: str, retry: Union[RetryConfig, bool], base_url: str):
def __init__(
self, token: str, retry: Optional[Union[RetryConfig, bool]], base_url: str
):
"""
Initializes the HttpClient.
Expand Down Expand Up @@ -66,7 +68,7 @@ def request(self, req: UpstashRequest):
for i in range(self.retry["attempts"]):
try:
res = requests.request(
method=req.get("method"),
method=req["method"],
url=url,
headers=headers,
stream=req.get("keepalive", False),
Expand Down Expand Up @@ -113,7 +115,7 @@ async def request_async(self, req: UpstashRequest):
try:
async with aiohttp.ClientSession() as session:
async with session.request(
method=req.get("method"),
method=req["method"],
url=url,
headers=headers,
data=req.get("body"),
Expand Down
Loading

0 comments on commit 761a214

Please sign in to comment.