Skip to content

Commit d8699d5

Browse files
Sobes76rusAnton
and
Anton
authored
fix: worker hangs sometimes (#162)
Co-authored-by: Anton <[email protected]>
1 parent f323fcd commit d8699d5

File tree

2 files changed

+49
-23
lines changed

2 files changed

+49
-23
lines changed

taskiq/cli/worker/process_manager.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22
import signal
3+
from contextlib import suppress
34
from dataclasses import dataclass
4-
from multiprocessing import Process, Queue
5+
from multiprocessing import Event, Process, Queue, current_process
6+
from multiprocessing.synchronize import Event as EventType
57
from time import sleep
68
from typing import Any, Callable, List, Optional
79

@@ -54,7 +56,7 @@ def handle(
5456
self,
5557
workers: List[Process],
5658
args: WorkerArgs,
57-
worker_func: Callable[[WorkerArgs], None],
59+
worker_func: Callable[[WorkerArgs, EventType], None],
5860
) -> None:
5961
"""
6062
This action reloads a single process.
@@ -73,22 +75,31 @@ def handle(
7375
logger.debug(f"Process {worker.name} is already terminated.")
7476
# Waiting worker shutdown.
7577
worker.join()
78+
event: EventType = Event()
7679
new_process = Process(
7780
target=worker_func,
78-
kwargs={"args": args},
81+
kwargs={"args": args, "event": event},
7982
name=f"worker-{self.worker_num}",
8083
daemon=True,
8184
)
8285
new_process.start()
8386
logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}")
8487
workers[self.worker_num] = new_process
88+
_wait_for_worker_startup(new_process, event)
8589

8690

8791
@dataclass
8892
class ShutdownAction(ProcessActionBase):
8993
"""This action shuts down process manager loop."""
9094

9195

96+
def _wait_for_worker_startup(process: Process, event: EventType) -> None:
97+
while process.is_alive():
98+
with suppress(TimeoutError):
99+
event.wait(0.1)
100+
return
101+
102+
92103
def schedule_workers_reload(
93104
action_queue: "Queue[ProcessActionBase]",
94105
) -> None:
@@ -118,6 +129,9 @@ def get_signal_handler(
118129
"""
119130

120131
def _signal_handler(signum: int, _frame: Any) -> None:
132+
if current_process().name.startswith("worker"):
133+
raise KeyboardInterrupt
134+
121135
logger.debug(f"Got signal {signum}.")
122136
action_queue.put(ShutdownAction())
123137
logger.warn("Workers are scheduled for shutdown.")
@@ -137,8 +151,8 @@ class ProcessManager:
137151
def __init__(
138152
self,
139153
args: WorkerArgs,
140-
worker_function: Callable[[WorkerArgs], None],
141-
observer: Optional[Observer] = None,
154+
worker_function: Callable[[WorkerArgs, EventType], None],
155+
observer: Optional[Observer] = None, # type: ignore[valid-type]
142156
) -> None:
143157
self.worker_function = worker_function
144158
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
@@ -162,10 +176,12 @@ def __init__(
162176

163177
def prepare_workers(self) -> None:
164178
"""Spawn multiple processes."""
179+
events: List[EventType] = []
165180
for process in range(self.args.workers):
181+
event = Event()
166182
work_proc = Process(
167183
target=self.worker_function,
168-
kwargs={"args": self.args},
184+
kwargs={"args": self.args, "event": event},
169185
name=f"worker-{process}",
170186
daemon=True,
171187
)
@@ -176,6 +192,11 @@ def prepare_workers(self) -> None:
176192
work_proc.pid,
177193
)
178194
self.workers.append(work_proc)
195+
events.append(event)
196+
197+
# Wait for workers startup
198+
for worker, event in zip(self.workers, events):
199+
_wait_for_worker_startup(worker, event)
179200

180201
def start(self) -> None: # noqa: C901, WPS213
181202
"""

taskiq/cli/worker/run.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import signal
44
from concurrent.futures import ThreadPoolExecutor
5+
from multiprocessing.synchronize import Event
56
from typing import Any, Type
67

78
from taskiq.abc.broker import AsyncBroker
@@ -65,7 +66,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]:
6566
return receiver_type
6667

6768

68-
def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
69+
def start_listen(args: WorkerArgs, event: Event) -> None: # noqa: WPS210, WPS213
6970
"""
7071
This function starts actual listening process.
7172
@@ -76,25 +77,10 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
7677
field.
7778
7879
:param args: CLI arguments.
80+
:param event: Event for notification.
7981
:raises ValueError: if broker is not an AsyncBroker instance.
8082
:raises ValueError: if receiver is not a Receiver type.
8183
"""
82-
if uvloop is not None:
83-
logger.debug("UVLOOP found. Installing policy.")
84-
uvloop.install()
85-
# This option signals that current
86-
# broker is running as a worker.
87-
# We must set this field before importing tasks,
88-
# so broker will remember all tasks it's related to.
89-
AsyncBroker.is_worker_process = True
90-
broker = import_object(args.broker)
91-
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
92-
if not isinstance(broker, AsyncBroker):
93-
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
94-
95-
receiver_type = get_receiver_type(args)
96-
receiver_args = dict(args.receiver_arg)
97-
9884
# Here how we manage interruptions.
9985
# We have to remember shutting_down state,
10086
# because KeyboardInterrupt can be send multiple
@@ -122,6 +108,25 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
122108
signal.signal(signal.SIGINT, interrupt_handler)
123109
signal.signal(signal.SIGTERM, interrupt_handler)
124110

111+
# Notify parent process, worker is ready
112+
event.set()
113+
114+
if uvloop is not None:
115+
logger.debug("UVLOOP found. Installing policy.")
116+
uvloop.install()
117+
# This option signals that current
118+
# broker is running as a worker.
119+
# We must set this field before importing tasks,
120+
# so broker will remember all tasks it's related to.
121+
AsyncBroker.is_worker_process = True
122+
broker = import_object(args.broker)
123+
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
124+
if not isinstance(broker, AsyncBroker):
125+
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
126+
127+
receiver_type = get_receiver_type(args)
128+
receiver_args = dict(args.receiver_arg)
129+
125130
loop = asyncio.get_event_loop()
126131

127132
try:

0 commit comments

Comments
 (0)