From 3ac3fcef337b8654283305a24851c971feddf889 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sat, 19 Apr 2025 01:36:07 +0200 Subject: [PATCH] ListShceduleSource now always try to fetch tasks from the past. --- taskiq_redis/list_schedule_source.py | 13 ++++++++++++- taskiq_redis/schedule_source.py | 7 +++++++ tests/test_list_schedule_source.py | 10 ++++++---- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/taskiq_redis/list_schedule_source.py b/taskiq_redis/list_schedule_source.py index 1350f6e..97d5169 100644 --- a/taskiq_redis/list_schedule_source.py +++ b/taskiq_redis/list_schedule_source.py @@ -25,6 +25,17 @@ def __init__( skip_past_schedules: bool = False, **connection_kwargs: Any, ) -> None: + """ + Create a new schedule source. + + :param url: Redis URL + :param prefix: Prefix for all the keys + :param max_connection_pool_size: Maximum size of the connection pool + :param serializer: Serializer to use for the schedules + :param buffer_size: Buffer size for getting schedules + :param skip_past_schedules: Skip schedules that are in the past. + :param connection_kwargs: Additional connection kwargs + """ super().__init__() self._prefix = prefix self._buffer_size = buffer_size @@ -179,7 +190,7 @@ async def get_schedules(self) -> List["ScheduledTask"]: current_time = datetime.datetime.now(datetime.timezone.utc) timed: list[bytes] = [] # Only during first run, we need to get previous time schedules - if self._is_first_run and not self._skip_past_schedules: + if not self._skip_past_schedules: timed = await self._get_previous_time_schedules() self._is_first_run = False async with Redis(connection_pool=self._connection_pool) as redis: diff --git a/taskiq_redis/schedule_source.py b/taskiq_redis/schedule_source.py index 1527ed1..2779cd7 100644 --- a/taskiq_redis/schedule_source.py +++ b/taskiq_redis/schedule_source.py @@ -1,4 +1,5 @@ import sys +import warnings from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional, Tuple @@ -53,6 +54,12 @@ def __init__( serializer: Optional[TaskiqSerializer] = None, **connection_kwargs: Any, ) -> None: + warnings.warn( + "RedisScheduleSource is deprecated. " + "Please switch to ListRedisScheduleSource", + DeprecationWarning, + stacklevel=2, + ) self.prefix = prefix self.connection_pool: _BlockingConnectionPool = BlockingConnectionPool.from_url( url=url, diff --git a/tests/test_list_schedule_source.py b/tests/test_list_schedule_source.py index bb5bdd5..1038fed 100644 --- a/tests/test_list_schedule_source.py +++ b/tests/test_list_schedule_source.py @@ -44,12 +44,14 @@ async def test_schedule_from_past(redis_url: str) -> None: await source.add_schedule(schedule) # When running for the first time, the scheduler will get all the # schedules that are in the past. - scehdules = await source.get_schedules() - assert scehdules == [schedule] + schedules = await source.get_schedules() + assert schedules == [schedule] + for schedule in schedules: + await source.post_send(schedule) # After getting the schedules for the second time, # all the schedules in the past are ignored. - scehdules = await source.get_schedules() - assert scehdules == [] + schedules = await source.get_schedules() + assert schedules == [] @pytest.mark.anyio