@container
class PlanTask(tmt.queue.GuestlessTask[None]):
    """
    A task to run a set of plans.
    """

    #: Plans to be executed by this task.
    plans: list[Plan]

    #: Plan that was executed. Set on yielded outcome instances, ``None``
    #: on the template task before any plan ran.
    plan: Optional[Plan]

    # Custom yet trivial `__init__` is necessary, see note in `tmt.queue.Task`.
    def __init__(self, logger: tmt.log.Logger, plans: list[Plan], **kwargs: Any) -> None:
        super().__init__(logger, **kwargs)

        self.plans = plans
        self.plan = None

    @property
    def name(self) -> str:
        return cast(str, fmf.utils.listed([plan.name for plan in self.plans]))

    def go(self) -> Iterator['PlanTask']:
        """
        Perform the task.

        Called by :py:class:`Queue` machinery to accomplish the task. Each
        plan is executed in its own thread via
        :py:func:`tmt.queue.execute_units`.

        :yields: instances of the same class, describing invocations of the
            task and their outcome. For each plan, one instance would be
            yielded.
        """

        def inject_logger(task: 'PlanTask', plan: Plan, logger: tmt.log.Logger) -> None:
            plan.inject_logger(logger)

            # Steps hold loggers derived from the plan's logger; re-derive
            # them from the freshly injected one as well.
            for step_name in tmt.steps.STEPS:
                getattr(plan, step_name).inject_logger(plan.step_logger(step_name))

        yield from tmt.queue.execute_units(
            self,
            self.plans,
            lambda task, plan: plan.name,
            inject_logger,
            lambda task, plan, logger, executor: executor.submit(plan.go),
            lambda task, plan, logger, result: dataclasses.replace(
                self, result=result, exc=None, requested_exit=None, plan=plan
            ),
            lambda task, plan, logger, exc: dataclasses.replace(
                self, result=None, exc=exc, requested_exit=None, plan=plan
            ),
            lambda task, plan, logger, exc: dataclasses.replace(
                self, result=None, exc=None, requested_exit=exc, plan=plan
            ),
            self.logger,
        )
+ A list of plans remaining to be queued by run and executed. It is being populated via :py:attr:`plans`, but eventually, :py:meth:`go` will remove plans from it as they get processed. @@ -4020,7 +4084,7 @@ def swap_plans(self, plan: Plan, *others: Plan) -> None: """ plans = cast(list[Plan], self.plans) - plan_queue = cast(list[Plan], self.plan_queue) + plan_queue = cast(list[Plan], self.plan_staging_queue) if plan in plan_queue: plan_queue.remove(plan) @@ -4199,23 +4263,45 @@ def go(self) -> None: self.verbose(f"Found {listed(self.plans, 'plan')}.") self.save() - # Iterate over plans - crashed_plans: list[tuple[Plan, Exception]] = [] + queue: Queue[PlanTask] = Queue('plans', self._logger.descend(logger_name=f'{self}.queue')) - while self.plan_queue: - plan = cast(list[Plan], self.plan_queue).pop(0) + failed_tasks: list[PlanTask] = [] - try: - plan.go() + def _enqueue_new_plans() -> None: + staging_queue = self.plan_staging_queue[:] - except Exception as exc: - if self.opt('on-plan-error') == 'quit': - raise tmt.utils.GeneralError('plan failed', causes=[exc]) + if not staging_queue: + return + + queue.enqueue_task(PlanTask(self._logger, cast(list[Plan], staging_queue))) + + for plan in staging_queue: + cast(list[Plan], self.plan_staging_queue).remove(plan) + + _enqueue_new_plans() + + for outcome in queue.run(stop_on_error=False): + _enqueue_new_plans() + + if outcome.exc: + outcome.logger.fail(str(outcome.exc)) + + failed_tasks.append(outcome) + continue + + if failed_tasks: + raise tmt.utils.GeneralError( + 'plan failed', + causes=[outcome.exc for outcome in failed_tasks if outcome.exc is not None], + ) - crashed_plans.append((plan, exc)) + # crashed_plans: list[tuple[Plan, Exception]] = [] - if crashed_plans: - raise tmt.utils.GeneralError('plan failed', causes=[exc for _, exc in crashed_plans]) + # except Exception as exc: + # if self.opt('on-plan-error') == 'quit': + # raise tmt.utils.GeneralError( + # 'plan failed', + # causes=[exc]) # Update the last run id 
TaskT = TypeVar('TaskT', bound='Task')  # type: ignore[type-arg]
T = TypeVar('T', bound='tmt.utils.Common')


def execute_units(
    task: 'Task[TaskResultT]',
    units: list[T],
    get_label: Callable[['Task[TaskResultT]', T], str],
    inject_logger: Callable[['Task[TaskResultT]', T, Logger], None],
    submit: Callable[['Task[TaskResultT]', T, Logger, ThreadPoolExecutor], Future[TaskResultT]],
    on_success: Callable[['Task[TaskResultT]', T, Logger, TaskResultT], 'Task[TaskResultT]'],
    on_error: Callable[['Task[TaskResultT]', T, Logger, Exception], 'Task[TaskResultT]'],
    on_exit: Callable[['Task[TaskResultT]', T, Logger, SystemExit], 'Task[TaskResultT]'],
    logger: Logger,
) -> Iterator['Task[TaskResultT]']:
    """
    Run a task against a set of units in parallel, one thread per unit.

    :param task: the task being executed; passed back to every callback.
    :param units: objects to run the task against (guests, plans, ...).
    :param get_label: return a human-readable label of a unit, used to
        name per-unit loggers.
    :param inject_logger: install the given logger into a unit.
    :param submit: schedule the actual work for a unit in the executor,
        returning its future.
    :param on_success: build the outcome task from a unit's result.
    :param on_error: build the outcome task from a raised exception.
    :param on_exit: build the outcome task from a raised ``SystemExit``.
    :param logger: parent logger from which per-unit loggers are derived.
    :yields: one outcome per unit, as produced by ``on_success``,
        ``on_error`` or ``on_exit``.
    """

    multiple_units = len(units) > 1

    new_loggers = prepare_loggers(logger, [get_label(task, unit) for unit in units])
    old_loggers: dict[str, Logger] = {}

    with ThreadPoolExecutor(max_workers=len(units)) as executor:
        futures: dict[Future[TaskResultT], T] = {}

        for unit in units:
            # Swap the unit's logger for the one we prepared, with labels
            # and stuff.
            #
            # We can't do the same for phases - phase is shared among
            # units, its `self.$loggingmethod()` calls need to be
            # fixed to use a logger we pass to it through the executor.
            #
            # Possibly, the same thing should happen to unit methods as
            # well, then the phase would pass the given logger to unit
            # methods when it calls them, propagating the single logger we
            # prepared...
            old_loggers[get_label(task, unit)] = unit._logger
            new_logger = new_loggers[get_label(task, unit)]

            inject_logger(task, unit, new_logger)

            if multiple_units:
                new_logger.info('started', color='cyan')

            # Submit each task/unit combination (save the unit & logger
            # for later)...
            futures[submit(task, unit, new_logger, executor)] = unit

        # ... and then sit and wait as they get delivered to us as they
        # finish. Unpack the unit and logger, so we could preserve logging
        # and prepare the right outcome package.
        for future in as_completed(futures):
            unit = futures[future]

            old_logger = old_loggers[get_label(task, unit)]
            new_logger = new_loggers[get_label(task, unit)]

            if multiple_units:
                new_logger.info('finished', color='cyan')

            # `Future.result()` will either 1. reraise an exception the
            # callable raised, if any, or 2. return whatever the callable
            # returned. The outcome callbacks turn either into a task
            # instance we can yield. Note: the previous version also
            # yielded the bare template `task` here, producing a spurious
            # extra outcome with no result, exception or unit attached;
            # that yield has been removed.
            try:
                result = future.result()

            except SystemExit as exc:
                yield on_exit(task, unit, new_logger, exc)

            except Exception as exc:
                yield on_error(task, unit, new_logger, exc)

            else:
                yield on_success(task, unit, new_logger, result)

            # Don't forget to restore the original logger - for every unit,
            # as soon as its future completes.
            inject_logger(task, unit, old_logger)
logger: Logger @@ -86,9 +170,6 @@ def go(self) -> Iterator['Self']: raise NotImplementedError -TaskT = TypeVar('TaskT', bound='Task') # type: ignore[type-arg] - - def prepare_loggers(logger: Logger, labels: list[str]) -> dict[str, Logger]: """ Create loggers for a set of labels. @@ -197,7 +278,7 @@ def __init__(self, logger: Logger, guests: list['Guest'], **kwargs: Any) -> None def guest_ids(self) -> list[str]: return sorted([guest.multihost_name for guest in self.guests]) - def run_on_guest(self, guest: 'Guest', logger: Logger) -> None: + def run_on_guest(self, guest: 'Guest', logger: Logger) -> TaskResultT: """ Perform the task. @@ -210,7 +291,7 @@ def run_on_guest(self, guest: 'Guest', logger: Logger) -> None: raise NotImplementedError - def go(self) -> Iterator['Self']: + def go(self) -> Iterator['MultiGuestTask[TaskResultT]']: """ Perform the task. @@ -223,99 +304,55 @@ def go(self) -> Iterator['Self']: yielded. """ - multiple_guests = len(self.guests) > 1 - - new_loggers = prepare_loggers(self.logger, [guest.multihost_name for guest in self.guests]) - old_loggers: dict[str, Logger] = {} - - with ThreadPoolExecutor(max_workers=len(self.guests)) as executor: - futures: dict[Future[None], Guest] = {} - - for guest in self.guests: - # Swap guest's logger for the one we prepared, with labels - # and stuff. - # - # We can't do the same for phases - phase is shared among - # guests, its `self.$loggingmethod()` calls need to be - # fixed to use a logger we pass to it through the executor. - # - # Possibly, the same thing should happen to guest methods as - # well, then the phase would pass the given logger to guest - # methods when it calls them, propagating the single logger we - # prepared... 
- old_loggers[guest.multihost_name] = guest._logger - new_logger = new_loggers[guest.multihost_name] - - guest.inject_logger(new_logger) - - if multiple_guests: - new_logger.info('started', color='cyan') - - # Submit each task/guest combination (save the guest & logger - # for later)... - futures[executor.submit(self.run_on_guest, guest, new_logger)] = guest - - # ... and then sit and wait as they get delivered to us as they - # finish. Unpack the guest and logger, so we could preserve logging - # and prepare the right outcome package. - for future in as_completed(futures): - guest = futures[future] - - old_logger = old_loggers[guest.multihost_name] - new_logger = new_loggers[guest.multihost_name] - - if multiple_guests: - new_logger.info('finished', color='cyan') - - # `Future.result()` will either 1. reraise an exception the - # callable raised, if any, or 2. return whatever the callable - # returned - which is `None` in our case, therefore we can - # ignore the return value. - try: - result = future.result() - - except SystemExit as exc: - task = dataclasses.replace(self, result=None, exc=None, requested_exit=exc) - - except Exception as exc: - task = dataclasses.replace(self, result=None, exc=exc, requested_exit=None) - - else: - task = dataclasses.replace(self, result=result, exc=None, requested_exit=None) - - task.guest = guest + yield from execute_units( + self, + self.guests, + lambda task, guest: guest.multihost_name, + lambda task, guest, logger: guest.inject_logger(logger), + lambda task, guest, logger, executor: executor.submit( + self.run_on_guest, guest, logger + ), + lambda task, guest, logger, result: dataclasses.replace( + self, result=result, exc=None, requested_exit=None, guest=guest + ), + lambda task, guest, logger, exc: dataclasses.replace( + self, result=None, exc=exc, requested_exit=None, guest=guest + ), + lambda task, guest, logger, exc: dataclasses.replace( + self, result=None, exc=None, requested_exit=exc, guest=guest + ), + self.logger, 
+ ) - yield task - # Don't forget to restore the original logger. - guest.inject_logger(old_logger) +class Queue(queue.Queue[TaskT]): + """Queue class for running tasks""" - -class Queue(list[TaskT]): - """ - Queue class for running tasks - """ + _task_counter: 'itertools.count[int]' def __init__(self, name: str, logger: Logger) -> None: super().__init__() self.name = name self._logger = logger + self._task_counter = itertools.count(start=1) def enqueue_task(self, task: TaskT) -> None: """ Put new task into a queue """ - self.append(task) + task.id = next(self._task_counter) + + self.put(task) self._logger.info( - f'queued {self.name} task #{len(self)}', + f'queued {self.name} task #{task.id}', task.name, color='cyan', ) - def run(self) -> Iterator[TaskT]: + def run(self, stop_on_error: bool = True) -> Iterator[TaskT]: """ Start crunching the queued tasks. @@ -323,11 +360,17 @@ def run(self) -> Iterator[TaskT]: combination a :py:class:`Task` instance is yielded. """ - for i, task in enumerate(self): + while True: + try: + task = self.get_nowait() + + except queue.Empty: + return + self._logger.info('') self._logger.info( - f'{self.name} task #{i + 1}', + f'{self.name} task #{task.id}', task.name, color='cyan', ) @@ -341,5 +384,5 @@ def run(self) -> Iterator[TaskT]: yield outcome # TODO: make this optional - if failed_tasks: + if failed_tasks and stop_on_error: return diff --git a/tmt/steps/__init__.py b/tmt/steps/__init__.py index d19fe3cad7..9af9b127b5 100644 --- a/tmt/steps/__init__.py +++ b/tmt/steps/__init__.py @@ -2536,8 +2536,8 @@ def phase_name(self) -> str: def name(self) -> str: return f'{self.phase_name} on {fmf.utils.listed(self.guest_ids)}' - def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> None: - self.phase.go(guest=guest, logger=logger) + def run_on_guest(self, guest: 'Guest', logger: tmt.log.Logger) -> PluginReturnValueT: + return self.phase.go(guest=guest, logger=logger) class PhaseQueue(tmt.queue.Queue[Union[ActionTask, 
PluginTask[StepDataT, PluginReturnValueT]]]): diff --git a/tmt/steps/provision/__init__.py b/tmt/steps/provision/__init__.py index 770e9c3a5c..783bf5b8c9 100644 --- a/tmt/steps/provision/__init__.py +++ b/tmt/steps/provision/__init__.py @@ -2702,6 +2702,7 @@ def go(self) -> Iterator['ProvisionTask']: except SystemExit as exc: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=None, @@ -2712,6 +2713,7 @@ def go(self) -> Iterator['ProvisionTask']: except Exception as exc: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=None, @@ -2722,6 +2724,7 @@ def go(self) -> Iterator['ProvisionTask']: else: yield ProvisionTask( + id=None, logger=new_logger, result=None, guest=phase.guest(), @@ -2743,6 +2746,7 @@ class ProvisionQueue(tmt.queue.Queue[ProvisionTask]): def enqueue(self, *, phases: list[ProvisionPlugin[ProvisionStepData]], logger: Logger) -> None: self.enqueue_task( ProvisionTask( + id=None, logger=logger, result=None, guest=None,