Skip to content

ListShceduleSource now always try to fetch tasks from the past. #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion taskiq_redis/list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ def __init__(
skip_past_schedules: bool = False,
**connection_kwargs: Any,
) -> None:
"""
Create a new schedule source.

:param url: Redis URL
:param prefix: Prefix for all the keys
:param max_connection_pool_size: Maximum size of the connection pool
:param serializer: Serializer to use for the schedules
:param buffer_size: Buffer size for getting schedules
:param skip_past_schedules: Skip schedules that are in the past.
:param connection_kwargs: Additional connection kwargs
"""
super().__init__()
self._prefix = prefix
self._buffer_size = buffer_size
Expand Down Expand Up @@ -179,7 +190,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
current_time = datetime.datetime.now(datetime.timezone.utc)
timed: list[bytes] = []
# Only during first run, we need to get previous time schedules
if self._is_first_run and not self._skip_past_schedules:
if not self._skip_past_schedules:
timed = await self._get_previous_time_schedules()
self._is_first_run = False
async with Redis(connection_pool=self._connection_pool) as redis:
Expand Down
7 changes: 7 additions & 0 deletions taskiq_redis/schedule_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import warnings
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional, Tuple

Expand Down Expand Up @@ -53,6 +54,12 @@ def __init__(
serializer: Optional[TaskiqSerializer] = None,
**connection_kwargs: Any,
) -> None:
warnings.warn(
"RedisScheduleSource is deprecated. "
"Please switch to ListRedisScheduleSource",
DeprecationWarning,
stacklevel=2,
)
self.prefix = prefix
self.connection_pool: _BlockingConnectionPool = BlockingConnectionPool.from_url(
url=url,
Expand Down
10 changes: 6 additions & 4 deletions tests/test_list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ async def test_schedule_from_past(redis_url: str) -> None:
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
# schedules that are in the past.
scehdules = await source.get_schedules()
assert scehdules == [schedule]
schedules = await source.get_schedules()
assert schedules == [schedule]
for schedule in schedules:
await source.post_send(schedule)
# After getting the schedules for the second time,
# all the schedules in the past are ignored.
scehdules = await source.get_schedules()
assert scehdules == []
schedules = await source.get_schedules()
assert schedules == []


@pytest.mark.anyio
Expand Down