Skip to content

fix: worker hangs sometimes #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import signal
from contextlib import suppress
from dataclasses import dataclass
from multiprocessing import Process, Queue
from multiprocessing import Event, Process, Queue, current_process
from multiprocessing.synchronize import Event as EventType
from time import sleep
from typing import Any, Callable, List, Optional

Expand Down Expand Up @@ -54,7 +56,7 @@ def handle(
self,
workers: List[Process],
args: WorkerArgs,
worker_func: Callable[[WorkerArgs], None],
worker_func: Callable[[WorkerArgs, EventType], None],
) -> None:
"""
This action reloads a single process.
Expand All @@ -73,22 +75,31 @@ def handle(
logger.debug(f"Process {worker.name} is already terminated.")
# Waiting worker shutdown.
worker.join()
event: EventType = Event()
new_process = Process(
target=worker_func,
kwargs={"args": args},
kwargs={"args": args, "event": event},
name=f"worker-{self.worker_num}",
daemon=True,
)
new_process.start()
logger.info(f"Process {new_process.name} restarted with pid {new_process.pid}")
workers[self.worker_num] = new_process
_wait_for_worker_startup(new_process, event)


@dataclass
class ShutdownAction(ProcessActionBase):
    """Marker action telling the manager loop to stop all workers and exit."""


def _wait_for_worker_startup(process: Process, event: EventType) -> None:
while process.is_alive():
with suppress(TimeoutError):
event.wait(0.1)
return


def schedule_workers_reload(
action_queue: "Queue[ProcessActionBase]",
) -> None:
Expand Down Expand Up @@ -118,6 +129,9 @@ def get_signal_handler(
"""

def _signal_handler(signum: int, _frame: Any) -> None:
    """
    Translate OS signals into manager actions.

    Worker processes inherit this handler from the parent; for them the
    signal is converted into a KeyboardInterrupt so the worker's own
    interrupt handling performs a graceful shutdown. In the manager
    process, a ShutdownAction is enqueued for the main loop instead.

    :param signum: number of the received signal.
    :param _frame: current stack frame (unused).
    :raises KeyboardInterrupt: when running inside a worker process.
    """
    if current_process().name.startswith("worker"):
        raise KeyboardInterrupt

    logger.debug(f"Got signal {signum}.")
    action_queue.put(ShutdownAction())
    # logger.warn is a deprecated alias; use logger.warning.
    logger.warning("Workers are scheduled for shutdown.")
Expand All @@ -137,8 +151,8 @@ class ProcessManager:
def __init__(
self,
args: WorkerArgs,
worker_function: Callable[[WorkerArgs], None],
observer: Optional[Observer] = None,
worker_function: Callable[[WorkerArgs, EventType], None],
observer: Optional[Observer] = None, # type: ignore[valid-type]
) -> None:
self.worker_function = worker_function
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
Expand All @@ -162,10 +176,12 @@ def __init__(

def prepare_workers(self) -> None:
"""Spawn multiple processes."""
events: List[EventType] = []
for process in range(self.args.workers):
event = Event()
work_proc = Process(
target=self.worker_function,
kwargs={"args": self.args},
kwargs={"args": self.args, "event": event},
name=f"worker-{process}",
daemon=True,
)
Expand All @@ -176,6 +192,11 @@ def prepare_workers(self) -> None:
work_proc.pid,
)
self.workers.append(work_proc)
events.append(event)

# Wait for workers startup
for worker, event in zip(self.workers, events):
_wait_for_worker_startup(worker, event)

def start(self) -> None: # noqa: C901, WPS213
"""
Expand Down
39 changes: 22 additions & 17 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import signal
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.synchronize import Event
from typing import Any, Type

from taskiq.abc.broker import AsyncBroker
Expand Down Expand Up @@ -65,7 +66,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]:
return receiver_type


def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
def start_listen(args: WorkerArgs, event: Event) -> None: # noqa: WPS210, WPS213
"""
This function starts actual listening process.

Expand All @@ -76,25 +77,10 @@ def start_listen(args: WorkerArgs) -> None: # noqa: WPS210, WPS213
field.

:param args: CLI arguments.
:param event: Event for notification.
:raises ValueError: if broker is not an AsyncBroker instance.
:raises ValueError: if receiver is not a Receiver type.
"""
if uvloop is not None:
logger.debug("UVLOOP found. Installing policy.")
uvloop.install()
# This option signals that current
# broker is running as a worker.
# We must set this field before importing tasks,
# so broker will remember all tasks it's related to.
AsyncBroker.is_worker_process = True
broker = import_object(args.broker)
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
if not isinstance(broker, AsyncBroker):
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")

receiver_type = get_receiver_type(args)
receiver_args = dict(args.receiver_arg)

# Here how we manage interruptions.
# We have to remember shutting_down state,
# because KeyboardInterrupt can be send multiple
Expand Down Expand Up @@ -122,6 +108,25 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
signal.signal(signal.SIGINT, interrupt_handler)
signal.signal(signal.SIGTERM, interrupt_handler)

# Notify parent process, worker is ready
event.set()

if uvloop is not None:
logger.debug("UVLOOP found. Installing policy.")
uvloop.install()
# This option signals that current
# broker is running as a worker.
# We must set this field before importing tasks,
# so broker will remember all tasks it's related to.
AsyncBroker.is_worker_process = True
broker = import_object(args.broker)
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
if not isinstance(broker, AsyncBroker):
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")

receiver_type = get_receiver_type(args)
receiver_args = dict(args.receiver_arg)

loop = asyncio.get_event_loop()

try:
Expand Down