From f9498b188b94921ad9d2d48920d8204177517814 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 22 Dec 2025 13:18:45 +0300 Subject: [PATCH] allow routers in Timers init, more tests --- redis_timers/timers.py | 22 ++++++++--- tests/test_timers.py | 85 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 9 deletions(-) diff --git a/redis_timers/timers.py b/redis_timers/timers.py index 7ac9476..1d8a34f 100644 --- a/redis_timers/timers.py +++ b/redis_timers/timers.py @@ -1,6 +1,5 @@ import asyncio import contextlib -import dataclasses import datetime import logging import random @@ -26,11 +25,21 @@ TIMEZONE = zoneinfo.ZoneInfo("UTC") -@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) class Timers: - redis_client: "aioredis.Redis[str]" - context: dict[str, typing.Any] = dataclasses.field(default_factory=dict) - handlers_by_topics: dict[str, Handler[typing.Any]] = dataclasses.field(default_factory=dict, init=False) + __slots__ = "context", "handlers_by_topics", "redis_client" + + def __init__( + self, + *, + redis_client: "aioredis.Redis[str]", + context: dict[str, typing.Any], + routers: list[Router] | None = None, + ) -> None: + self.handlers_by_topics: dict[str, Handler[typing.Any]] = {} + self.redis_client = redis_client + self.context = context + if routers: + self.include_routers(*routers) def include_router(self, router: Router) -> None: for h in router.handlers: @@ -83,6 +92,7 @@ async def handle_ready_timers(self) -> None: key=timer_key, ) if await lock.locked(): + logger.debug(f"Timer is locked, {timer_key=}") continue with contextlib.suppress(LockError): @@ -90,7 +100,7 @@ async def handle_ready_timers(self) -> None: await self._handle_one_timer(timer_key) await self._remove_timer_by_key(timer_key) - async def run_forever(self) -> None: + async def run_forever(self) -> None: # pragma: no cover while True: try: await self.handle_ready_timers() diff --git a/tests/test_timers.py b/tests/test_timers.py index 7d37d66..c52cac4 100644 --- a/tests/test_timers.py +++ b/tests/test_timers.py @@ -1,4 +1,5 @@ import datetime +import logging import os import typing from collections.abc import AsyncGenerator @@ -7,7 +8,7 @@ import pytest from redis import asyncio as aioredis -from redis_timers import Timers, settings +from redis_timers import Timers, consume_lock, settings from redis_timers.router import Router @@ -70,8 +71,7 @@ async def another_handler(data: AnotherPayloadModel, context: dict[str, typing.A handler_results.add_result(data) assert context["some_key"] == "some_value" - timers = Timers(redis_client=redis_client, context={"some_key": "some_value"}) - timers.include_router(router1) + timers = Timers(redis_client=redis_client, context={"some_key": "some_value"}, routers=[router1]) timers.include_routers(router2) return timers @@ -104,6 +104,17 @@ async def test_set_and_remove_timer(timers_instance: Timers) -> None: assert not payloads_dict +async def test_set_undefined_timer(timers_instance: Timers) -> None: + payload = SomePayloadModel(message="test", count=1) + with pytest.raises(RuntimeError, match="Handler is not found, topic='wrong_topic'"): + await timers_instance.set_timer( + topic="wrong_topic", + timer_id="test_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=1), + ) + + async def test_handle_ready_timers(timers_instance: Timers, handler_results: HandlerResults) -> None: payload = SomePayloadModel(message="ready_timer", count=42) await timers_instance.set_timer( @@ -129,6 +140,74 @@ async def test_handle_ready_timers(timers_instance: Timers, handler_results: Han assert not payloads_dict +async def test_handle_ready_timer_no_handler(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.INFO) + payload = SomePayloadModel(message="ready_timer", count=42) + await timers_instance.set_timer( + topic="some_topic", + timer_id="ready_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=0), # Ready immediately + ) + del timers_instance.handlers_by_topics["some_topic"] + await timers_instance.handle_ready_timers() + + assert len(caplog.records) == 1 + assert "Handler is not found" in caplog.records[0].message + + +async def test_handle_ready_timer_no_payload(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.INFO) + payload = SomePayloadModel(message="ready_timer", count=42) + await timers_instance.set_timer( + topic="some_topic", + timer_id="ready_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=0), # Ready immediately + ) + await timers_instance.redis_client.hdel(settings.TIMERS_PAYLOADS_KEY, "some_topic--ready_timer_1") + await timers_instance.handle_ready_timers() + + assert len(caplog.records) == 1 + assert "No payload found" in caplog.records[0].message + + +async def test_handle_ready_timer_invalid_payload(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.INFO) + payload = SomePayloadModel(message="ready_timer", count=42) + await timers_instance.set_timer( + topic="some_topic", + timer_id="ready_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=0), # Ready immediately + ) + await timers_instance.redis_client.hset(settings.TIMERS_PAYLOADS_KEY, "some_topic--ready_timer_1", "{}") + await timers_instance.handle_ready_timers() + + assert len(caplog.records) == 1 + assert "Failed to parse payload" in caplog.records[0].message + + +async def test_handle_ready_timer_locked(timers_instance: Timers, caplog: pytest.LogCaptureFixture) -> None: + caplog.set_level(logging.DEBUG) + payload = SomePayloadModel(message="ready_timer", count=42) + await timers_instance.set_timer( + topic="some_topic", + timer_id="ready_timer_1", + payload=payload, + activation_period=datetime.timedelta(seconds=0), # Ready immediately + ) + lock = consume_lock( + redis_client=timers_instance.redis_client, + key="some_topic--ready_timer_1", + ) + async with lock: + await timers_instance.handle_ready_timers() + + assert len(caplog.records) == 1 + assert "Timer is locked" in caplog.records[0].message + + async def test_handle_multiple_ready_timers(timers_instance: Timers, handler_results: HandlerResults) -> None: payload1 = SomePayloadModel(message="timer_1", count=1) payload2 = AnotherPayloadModel(value="timer_2")