diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index 557a2857..dab6327f 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -1,7 +1,9 @@ import logging import signal +from contextlib import suppress from dataclasses import dataclass -from multiprocessing import Process, Queue +from multiprocessing import Event, Process, Queue, current_process +from multiprocessing.synchronize import Event as EventType from time import sleep from typing import Any, Callable, List, Optional @@ -54,7 +56,7 @@ def handle( self, workers: List[Process], args: WorkerArgs, - worker_func: Callable[[WorkerArgs], None], + worker_func: Callable[[WorkerArgs, EventType], None], ) -> None: """ This action reloads a single process. @@ -73,15 +75,17 @@ def handle( logger.debug(f"Process {worker.name} is already terminated.") # Waiting worker shutdown. worker.join() + event: EventType = Event() new_process = Process( target=worker_func, - kwargs={"args": args}, + kwargs={"args": args, "event": event}, name=f"worker-{self.worker_num}", daemon=True, ) new_process.start() logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}") workers[self.worker_num] = new_process + _wait_for_worker_startup(new_process, event) @dataclass @@ -89,6 +93,13 @@ class ShutdownAction(ProcessActionBase): """This action shuts down process manager loop.""" +def _wait_for_worker_startup(process: Process, event: EventType) -> None: + while process.is_alive(): + with suppress(TimeoutError): + event.wait(0.1) + return + + def schedule_workers_reload( action_queue: "Queue[ProcessActionBase]", ) -> None: @@ -118,6 +129,9 @@ def get_signal_handler( """ def _signal_handler(signum: int, _frame: Any) -> None: + if current_process().name.startswith("worker"): + raise KeyboardInterrupt + logger.debug(f"Got signal {signum}.") action_queue.put(ShutdownAction()) logger.warn("Workers are scheduled for shutdown.") @@ -137,8 +151,8 @@ class ProcessManager: def __init__( self, args: WorkerArgs, - worker_function: Callable[[WorkerArgs], None], - observer: Optional[Observer] = None, + worker_function: Callable[[WorkerArgs, EventType], None], + observer: Optional[Observer] = None, # type: ignore[valid-type] ) -> None: self.worker_function = worker_function self.action_queue: "Queue[ProcessActionBase]" = Queue(-1) @@ -162,10 +176,12 @@ def __init__( def prepare_workers(self) -> None: """Spawn multiple processes.""" + events: List[EventType] = [] for process in range(self.args.workers): + event = Event() work_proc = Process( target=self.worker_function, - kwargs={"args": self.args}, + kwargs={"args": self.args, "event": event}, name=f"worker-{process}", daemon=True, ) @@ -176,6 +192,11 @@ def prepare_workers(self) -> None: work_proc.pid, ) self.workers.append(work_proc) + events.append(event) + + # Wait for workers startup + for worker, event in zip(self.workers, events): + _wait_for_worker_startup(worker, event) def start(self) -> None: # noqa: C901, WPS213 """ diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 466886fc..93582686 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -2,6 +2,7 @@ import logging import signal from concurrent.futures import ThreadPoolExecutor +from multiprocessing.synchronize import Event from typing import Any, Type from taskiq.abc.broker import AsyncBroker @@ -65,7 +66,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]: return receiver_type -def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213 +def start_listen(args: WorkerArgs, event: Event) -> None: # noqa: WPS210, WPS213 """ This function starts actual listening process. @@ -76,25 +77,10 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213 field. :param args: CLI arguments. + :param event: Event for notification. :raises ValueError: if broker is not an AsyncBroker instance. :raises ValueError: if receiver is not a Receiver type. """ - if uvloop is not None: - logger.debug("UVLOOP found. Installing policy.") - uvloop.install() - # This option signals that current - # broker is running as a worker. - # We must set this field before importing tasks, - # so broker will remember all tasks it's related to. - AsyncBroker.is_worker_process = True - broker = import_object(args.broker) - import_tasks(args.modules, args.tasks_pattern, args.fs_discover) - if not isinstance(broker, AsyncBroker): - raise ValueError("Unknown broker type. Please use AsyncBroker instance.") - - receiver_type = get_receiver_type(args) - receiver_args = dict(args.receiver_arg) - # Here how we manage interruptions. # We have to remember shutting_down state, # because KeyboardInterrupt can be send multiple @@ -122,6 +108,25 @@ def interrupt_handler(signum: int, _frame: Any) -> None: signal.signal(signal.SIGINT, interrupt_handler) signal.signal(signal.SIGTERM, interrupt_handler) + # Notify parent process, worker is ready + event.set() + + if uvloop is not None: + logger.debug("UVLOOP found. Installing policy.") + uvloop.install() + # This option signals that current + # broker is running as a worker. + # We must set this field before importing tasks, + # so broker will remember all tasks it's related to. + AsyncBroker.is_worker_process = True + broker = import_object(args.broker) + import_tasks(args.modules, args.tasks_pattern, args.fs_discover) + if not isinstance(broker, AsyncBroker): + raise ValueError("Unknown broker type. Please use AsyncBroker instance.") + + receiver_type = get_receiver_type(args) + receiver_args = dict(args.receiver_arg) + loop = asyncio.get_event_loop() try: