Skip to content

Commit 1adb5a5

Browse files
committed
Add max_attempts_at_message
1 parent 49c0408 commit 1adb5a5

File tree

12 files changed

+185
-10
lines changed

12 files changed

+185
-10
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ ignore = [
159159
"ANN401", # typing.Any are disallowed in `**kwargs
160160
"PLR0913", # Too many arguments for function call
161161
"D106", # Missing docstring in public nested class
162+
"D205", # 1 blank line required between summary line and description
162163
]
163164
exclude = [".venv/"]
164165
mccabe = { max-complexity = 10 }

taskiq/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Distributed task manager."""
2+
23
from importlib.metadata import version
34

45
from taskiq_dependencies import Depends as TaskiqDepends
@@ -8,7 +9,7 @@
89
from taskiq.abc.middleware import TaskiqMiddleware
910
from taskiq.abc.result_backend import AsyncResultBackend
1011
from taskiq.abc.schedule_source import ScheduleSource
11-
from taskiq.acks import AckableMessage
12+
from taskiq.acks import AckableMessage, AckableMessageWithDeliveryCount
1213
from taskiq.brokers.inmemory_broker import InMemoryBroker
1314
from taskiq.brokers.shared_broker import async_shared_broker
1415
from taskiq.brokers.zmq_broker import ZeroMQBroker
@@ -24,7 +25,7 @@
2425
TaskiqResultTimeoutError,
2526
)
2627
from taskiq.funcs import gather
27-
from taskiq.message import BrokerMessage, TaskiqMessage
28+
from taskiq.message import BrokerMessage, DeliveryCountMessage, TaskiqMessage
2829
from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware
2930
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
3031
from taskiq.result import TaskiqResult
@@ -53,6 +54,8 @@
5354
"NoResultError",
5455
"SendTaskError",
5556
"AckableMessage",
57+
"DeliveryCountMessage",
58+
"AckableMessageWithDeliveryCount",
5659
"InMemoryBroker",
5760
"ScheduleSource",
5861
"TaskiqScheduler",

taskiq/abc/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Abstract classes for taskiq."""
2+
23
from taskiq.abc.broker import AsyncBroker
34
from taskiq.abc.result_backend import AsyncResultBackend
45

taskiq/abc/broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def __init__(
7777
self,
7878
result_backend: "Optional[AsyncResultBackend[_T]]" = None,
7979
task_id_generator: Optional[Callable[[], str]] = None,
80+
max_attempts_at_message: Optional[int] = None,
8081
) -> None:
8182
if result_backend is None:
8283
result_backend = DummyResultBackend()
@@ -113,6 +114,7 @@ def __init__(
113114
self.state = TaskiqState()
114115
self.custom_dependency_context: Dict[Any, Any] = {}
115116
self.dependency_overrides: Dict[Any, Any] = {}
117+
self.max_attempts_at_message = max_attempts_at_message
116118
# True only if broker runs in worker process.
117119
self.is_worker_process: bool = False
118120
# True only if broker runs in scheduler process.

taskiq/acks.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import enum
22
from typing import Awaitable, Callable, Union
33

4-
from pydantic import BaseModel
4+
from taskiq.message import DeliveryCountMessage, WrappedMessage
55

66

77
@enum.unique
@@ -20,7 +20,7 @@ class AcknowledgeType(str, enum.Enum):
2020
WHEN_SAVED = "when_saved"
2121

2222

23-
class AckableMessage(BaseModel):
23+
class AckableMessage(WrappedMessage):
2424
"""
2525
Message that can be acknowledged.
2626
@@ -33,5 +33,8 @@ class AckableMessage(BaseModel):
3333
as a whole.
3434
"""
3535

36-
data: bytes
3736
ack: Callable[[], Union[None, Awaitable[None]]]
37+
38+
39+
class AckableMessageWithDeliveryCount(AckableMessage, DeliveryCountMessage):
40+
"""Message that can be acknowledged and has a delivery count."""

taskiq/cli/worker/run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
143143
ack_type=args.ack_type,
144144
max_tasks_to_execute=args.max_tasks_per_child,
145145
wait_tasks_timeout=args.wait_tasks_timeout,
146+
max_attempts_at_message=broker.max_attempts_at_message,
146147
**receiver_kwargs, # type: ignore
147148
)
148149
loop.run_until_complete(receiver.listen())

taskiq/message.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,15 @@ class BrokerMessage(BaseModel):
4242
task_name: str
4343
message: bytes
4444
labels: Dict[str, Any]
45+
46+
47+
class WrappedMessage(BaseModel):
48+
"""Abstraction for an incoming message in a wrapper."""
49+
50+
data: bytes
51+
52+
53+
class DeliveryCountMessage(WrappedMessage):
54+
"""Message with a present delivery count."""
55+
56+
delivery_count: Optional[int] = None

taskiq/receiver/receiver.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from taskiq.acks import AcknowledgeType
1414
from taskiq.context import Context
1515
from taskiq.exceptions import NoResultError
16-
from taskiq.message import TaskiqMessage
16+
from taskiq.message import DeliveryCountMessage, TaskiqMessage, WrappedMessage
1717
from taskiq.receiver.params_parser import parse_params
1818
from taskiq.result import TaskiqResult
1919
from taskiq.state import TaskiqState
@@ -58,6 +58,7 @@ def __init__(
5858
on_exit: Optional[Callable[["Receiver"], None]] = None,
5959
max_tasks_to_execute: Optional[int] = None,
6060
wait_tasks_timeout: Optional[float] = None,
61+
max_attempts_at_message: Optional[int] = None,
6162
) -> None:
6263
self.broker = broker
6364
self.executor = executor
@@ -72,6 +73,7 @@ def __init__(
7273
self.known_tasks: Set[str] = set()
7374
self.max_tasks_to_execute = max_tasks_to_execute
7475
self.wait_tasks_timeout = wait_tasks_timeout
76+
self.max_attempts_at_message = max_attempts_at_message
7577
for task in self.broker.get_all_tasks().values():
7678
self._prepare_task(task.task_name, task.original_func)
7779
self.sem: "Optional[asyncio.Semaphore]" = None
@@ -86,7 +88,7 @@ def __init__(
8688

8789
async def callback( # noqa: C901, PLR0912
8890
self,
89-
message: Union[bytes, AckableMessage],
91+
message: Union[bytes, WrappedMessage],
9092
raise_err: bool = False,
9193
) -> None:
9294
"""
@@ -101,7 +103,31 @@ async def callback( # noqa: C901, PLR0912
101103
:param raise_err: raise an error if cannot save result in
102104
result_backend.
103105
"""
104-
message_data = message.data if isinstance(message, AckableMessage) else message
106+
message_data = message.data if isinstance(message, WrappedMessage) else message
107+
108+
delivery_count = (
109+
message.delivery_count
110+
if isinstance(message, DeliveryCountMessage)
111+
else None
112+
)
113+
if (
114+
delivery_count
115+
and self.max_attempts_at_message
116+
and delivery_count >= self.max_attempts_at_message
117+
):
118+
logger.error(
119+
"Permitted number of attempts at processing message %s "
120+
"has been exhausted after %s attempts.",
121+
message_data,
122+
self.max_attempts_at_message,
123+
)
124+
if isinstance(
125+
message,
126+
AckableMessage,
127+
):
128+
await maybe_awaitable(message.ack())
129+
return
130+
105131
try:
106132
taskiq_msg = self.broker.formatter.loads(message=message_data)
107133
taskiq_msg.parse_labels()

taskiq/schedule_sources/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Package for schedule sources."""
2+
23
from taskiq.schedule_sources.label_based import LabelScheduleSource
34

45
__all__ = [

taskiq/scheduler/created_schedule.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ async def kiq(
3232
...
3333

3434
@overload
35-
async def kiq(self: "CreatedSchedule[_ReturnType]") -> AsyncTaskiqTask[_ReturnType]:
35+
async def kiq(
36+
self: "CreatedSchedule[_ReturnType]",
37+
) -> AsyncTaskiqTask[_ReturnType]:
3638
...
3739

3840
async def kiq(self) -> Any:

0 commit comments

Comments
 (0)