-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathtest_list_schedule_source.py
140 lines (123 loc) · 4.35 KB
/
test_list_schedule_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import datetime
import uuid
import pytest
from freezegun import freeze_time
from taskiq import ScheduledTask
from taskiq_redis.list_schedule_source import ListRedisScheduleSource
from taskiq_redis.schedule_source import RedisScheduleSource
@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_cron(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
cron="* * * * *",
)
await source.add_schedule(schedule)
scehdules = await source.get_schedules()
assert scehdules == [schedule]
@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_from_past(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
time=datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(minutes=4),
)
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
# schedules that are in the past.
schedules = await source.get_schedules()
assert schedules == [schedule]
for schedule in schedules:
await source.post_send(schedule)
# After getting the schedules for the second time,
# all the schedules in the past are ignored.
schedules = await source.get_schedules()
assert schedules == []
@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_schedule_removal(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
time=datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(minutes=4),
)
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
# schedules that are in the past.
scehdules = await source.get_schedules()
assert scehdules == []
# Assert that we will get the schedule after the time has passed.
with freeze_time("2025-01-01 00:04:00"):
scehdules = await source.get_schedules()
assert scehdules == [schedule]
@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_deletion(redis_url: str) -> None:
"""Test adding a cron schedule."""
prefix = uuid.uuid4().hex
source = ListRedisScheduleSource(redis_url, prefix=prefix)
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
time=datetime.datetime.now(datetime.timezone.utc),
)
await source.add_schedule(schedule)
# When running for the first time, the scheduler will get all the
# schedules that are in the past.
scehdules = await source.get_schedules()
assert scehdules == [schedule]
await source.delete_schedule(schedule.schedule_id)
scehdules = await source.get_schedules()
assert scehdules == []
@pytest.mark.anyio
@freeze_time("2025-01-01 00:00:00")
async def test_migration(redis_url: str) -> None:
"""Test adding a cron schedule."""
new_prefix = uuid.uuid4().hex
old_prefix = uuid.uuid4().hex
old_source = RedisScheduleSource(redis_url, prefix=old_prefix)
for i in range(30):
schedule = ScheduledTask(
task_name="test_task",
labels={},
args=[],
kwargs={},
time=datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(minutes=i),
)
await old_source.add_schedule(schedule)
old_schedules = await old_source.get_schedules()
source = ListRedisScheduleSource(
redis_url,
prefix=new_prefix,
skip_past_schedules=True,
).with_migrate_from(
old_source,
delete_schedules=True,
)
await source.startup()
assert await old_source.get_schedules() == []
for old_schedule in old_schedules:
with freeze_time(old_schedule.time):
assert await source.get_schedules() == [old_schedule]