Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import signal
from dataclasses import dataclass
from multiprocessing import Process, Queue
from multiprocessing import Process, Queue, current_process
from time import sleep
from typing import Any, Callable, List, Optional

Expand Down Expand Up @@ -82,6 +82,7 @@ def handle(
new_process.start()
logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}")
workers[self.worker_num] = new_process
sleep(0.1)


@dataclass
Expand Down Expand Up @@ -118,6 +119,9 @@ def get_signal_handler(
"""

def _signal_handler(signum: int, _frame: Any) -> None:
if current_process().name.startswith("worker"):
raise KeyboardInterrupt

logger.debug(f"Got signal {signum}.")
action_queue.put(ShutdownAction())
logger.warn("Workers are scheduled for shutdown.")
Expand Down
32 changes: 16 additions & 16 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,6 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
:raises ValueError: if broker is not an AsyncBroker instance.
:raises ValueError: if receiver is not a Receiver type.
"""
if uvloop is not None:
logger.debug("UVLOOP found. Installing policy.")
uvloop.install()
# This option signals that current
# broker is running as a worker.
# We must set this field before importing tasks,
# so broker will remember all tasks it's related to.
AsyncBroker.is_worker_process = True
broker = import_object(args.broker)
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
if not isinstance(broker, AsyncBroker):
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")

receiver_type = get_receiver_type(args)
receiver_args = dict(args.receiver_arg)

# Here how we manage interruptions.
# We have to remember shutting_down state,
# because KeyboardInterrupt can be send multiple
Expand Down Expand Up @@ -122,6 +106,22 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
signal.signal(signal.SIGINT, interrupt_handler)
signal.signal(signal.SIGTERM, interrupt_handler)

if uvloop is not None:
logger.debug("UVLOOP found. Installing policy.")
uvloop.install()
# This option signals that current
# broker is running as a worker.
# We must set this field before importing tasks,
# so broker will remember all tasks it's related to.
AsyncBroker.is_worker_process = True
broker = import_object(args.broker)
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
if not isinstance(broker, AsyncBroker):
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")

receiver_type = get_receiver_type(args)
receiver_args = dict(args.receiver_arg)

loop = asyncio.get_event_loop()

try:
Expand Down