diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 633cf85f..ac635e5b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -10,7 +10,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: self-hosted strategy: matrix: python-version: ["3.10"] @@ -34,4 +34,4 @@ jobs: # flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with pytest run: | - python -m pytest tests/ --reruns 5 + python -m pytest tests/ --reruns 5 -n 16 diff --git a/pyproject.toml b/pyproject.toml index 26bbe10a..946366a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "sampo" -version = "0.1.2" +version = "0.1.2.1" description = "Open-source framework for adaptive manufacturing processes scheduling" authors = ["iAirLab "] license = "BSD-3-Clause" diff --git a/sampo/scheduler/generic.py b/sampo/scheduler/generic.py index 211ca5b4..bdbae7d5 100644 --- a/sampo/scheduler/generic.py +++ b/sampo/scheduler/generic.py @@ -19,13 +19,10 @@ from sampo.utilities.validation import validate_schedule -# TODO Кажется, это не работает - лаги не учитываются def get_finish_time_default(node, worker_team, node2swork, spec, assigned_parent_time, timeline, work_estimator) -> Time: - return timeline.find_min_start_time(node, worker_team, node2swork, spec, - assigned_parent_time, work_estimator) \ - + calculate_working_time_cascade(node, worker_team, - work_estimator) # TODO Кажется, это не работает - лаги не учитываются + return timeline.find_min_start_time_with_additional(node, worker_team, node2swork, spec, + assigned_parent_time, work_estimator)[1] PRIORITIZATION_F = Callable[ @@ -33,7 +30,7 @@ def get_finish_time_default(node, worker_team, node2swork, spec, assigned_parent ] RESOURCE_OPTIMIZE_F = Callable[[GraphNode, list[Contractor], WorkSpec, WorkerContractorPool, dict[GraphNode, ScheduledWork], Time, Timeline, WorkTimeEstimator], - tuple[Time, Time, Contractor, list[Worker]]] + tuple[Time, Time, dict[GraphNode, Time], Contractor, list[Worker]]] class GenericScheduler(Scheduler): @@ -78,12 +75,12 @@ def ft_getter(worker_team) -> Time: return get_finish_time(node, worker_team, node2swork, spec, assigned_parent_time, timeline, work_estimator) - def run_with_contractor(contractor: Contractor) -> tuple[Time, Time, list[Worker]]: + def run_with_contractor(contractor: Contractor) -> tuple[Time, Time, dict[GraphNode, Time], list[Worker]]: min_count_worker_team, max_count_worker_team, workers \ = get_worker_borders(worker_pool, contractor, node.work_unit.worker_reqs) if len(workers) != len(node.work_unit.worker_reqs): - return assigned_parent_time, Time.inf(), [] + return assigned_parent_time, Time.inf(), {}, [] workers = [worker.copy() for worker in workers] @@ -95,9 +92,10 @@ def run_with_contractor(contractor: Contractor) -> tuple[Time, Time, list[Worker min_count_worker_team, max_count_worker_team, ft_getter)) - c_st, c_ft, _ = timeline.find_min_start_time_with_additional(node, workers, node2swork, spec, None, - assigned_parent_time, work_estimator) - return c_st, c_ft, workers + c_st, c_ft, exec_times = timeline.find_min_start_time_with_additional(node, workers, node2swork, spec, + None, assigned_parent_time, + None, work_estimator) + return c_st, c_ft, exec_times, workers return run_contractor_search(contractors, spec, run_with_contractor) @@ -164,11 +162,11 @@ def build_scheduler(self, work_unit = node.work_unit work_spec = spec.get_work_spec(work_unit.id) - start_time, finish_time, contractor, best_worker_team = self.optimize_resources(node, contractors, - work_spec, worker_pool, - node2swork, - assigned_parent_time, - timeline, work_estimator) + start_time, finish_time, exec_times, contractor, best_worker_team = self.optimize_resources(node, contractors, + work_spec, worker_pool, + node2swork, + assigned_parent_time, + timeline, work_estimator) # we are scheduling the work `start of the project` if index == 0: @@ -177,12 +175,15 @@ def build_scheduler(self, finish_time += start_time if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project` - finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + zone_finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + finish_time = max(finish_time, zone_finish_time) start_time = max(start_time, finish_time) # apply work to scheduling + # FIXME these (finish_time - start_time) contain lags! + # make Timeline#schedule receive `exec_times` instead of fixed time timeline.schedule(node, node2swork, best_worker_team, contractor, work_spec, - start_time, work_spec.assigned_time, assigned_parent_time, work_estimator) + start_time, assigned_parent_time, exec_times, work_estimator) if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project` node2swork[node].zones_pre = finalizing_zones diff --git a/sampo/scheduler/genetic/converter.py b/sampo/scheduler/genetic/converter.py index 4cea5297..1f1820b9 100644 --- a/sampo/scheduler/genetic/converter.py +++ b/sampo/scheduler/genetic/converter.py @@ -3,13 +3,11 @@ import numpy as np from sampo.api.genetic_api import ChromosomeType, ScheduleGenerationScheme -from sampo.base import SAMPO from sampo.scheduler.base import Scheduler from sampo.scheduler.timeline import JustInTimeTimeline, MomentumTimeline from sampo.scheduler.timeline.base import Timeline from sampo.scheduler.timeline.general_timeline import GeneralTimeline from sampo.scheduler.utils import WorkerContractorPool -from sampo.scheduler.utils.time_computaion import calculate_working_time_cascade from sampo.schemas import ZoneReq from sampo.schemas.contractor import Contractor from sampo.schemas.graph import GraphNode @@ -19,7 +17,7 @@ from sampo.schemas.schedule_spec import ScheduleSpec from sampo.schemas.time import Time from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator -from sampo.utilities.collections_util import reverse_dictionary +from sampo.utilities.inseparables import get_exec_times_from_assigned_time_for_chain, find_min_time_slot_size from sampo.utilities.linked_list import LinkedList @@ -174,11 +172,19 @@ def decode(work_index): .copy().with_count(worker_count) for worker_index, worker_count in enumerate(cur_resources) if worker_count > 0] + # apply worker spec + Scheduler.optimize_resources_using_spec(cur_node.work_unit, cur_worker_team, cur_work_spec) + + cur_inseparable_chain = cur_node.get_inseparable_chain_with_self() if cur_work_spec.assigned_time is not None: - cur_exec_time = cur_work_spec.assigned_time + cur_exec_times = get_exec_times_from_assigned_time_for_chain(cur_inseparable_chain, + cur_work_spec.assigned_time) else: - cur_exec_time = calculate_working_time_cascade(cur_node, cur_worker_team, work_estimator) - return cur_node, cur_worker_team, cur_contractor, cur_exec_time, cur_work_spec + cur_exec_times = {} + for dep_node in cur_inseparable_chain: + cur_exec_times[dep_node] = work_estimator.estimate_time(dep_node.work_unit, cur_worker_team) + + return cur_node, cur_worker_team, cur_contractor, cur_exec_times, cur_work_spec # account the remaining works enumerated_works_remaining = LinkedList(iterable=enumerate( @@ -191,23 +197,27 @@ def decode(work_index): prev_start_time = start_time - 1 def work_scheduled(args) -> bool: - idx, (work_idx, node, worker_team, contractor, exec_time, work_spec) = args + idx, (work_idx, node, worker_team, contractor, exec_times, work_spec) = args + + exec_time = find_min_time_slot_size(node.get_inseparable_chain_with_self(), node2swork, exec_times, start_time) if timeline.can_schedule_at_the_moment(node, worker_team, work_spec, node2swork, start_time, exec_time): - # apply worker spec - Scheduler.optimize_resources_using_spec(node.work_unit, worker_team, work_spec) + finish_time = start_time + exec_time st = start_time if idx == 0: # we are scheduling the work `start of the project` st = assigned_parent_time # this work should always have st = 0, so we just re-assign it if idx == len(works_order) - 1: # we are scheduling the work `end of the project` - finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + zone_finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + finish_time = max(finish_time, zone_finish_time) st = max(start_time, finish_time) + # assert timeline.can_schedule_at_the_moment(node, worker_team, work_spec, node2swork, st, exec_time) + # finish using time spec timeline.schedule(node, node2swork, worker_team, contractor, work_spec, - st, exec_time, assigned_parent_time, work_estimator) + st, assigned_parent_time, exec_times, work_estimator) if idx == len(works_order) - 1: # we are scheduling the work `end of the project` node2swork[node].zones_pre = finalizing_zones diff --git a/sampo/scheduler/heft/base.py b/sampo/scheduler/heft/base.py index e5b9fd1d..6c970641 100644 --- a/sampo/scheduler/heft/base.py +++ b/sampo/scheduler/heft/base.py @@ -48,4 +48,4 @@ def __init__(self, @staticmethod def get_finish_time(node, worker_team, node2swork, spec, assigned_parent_time, timeline, work_estimator) -> Time: return timeline.find_min_start_time_with_additional(node, worker_team, node2swork, spec, None, - assigned_parent_time, work_estimator)[1] + assigned_parent_time, None, work_estimator)[1] diff --git a/sampo/scheduler/lft/base.py b/sampo/scheduler/lft/base.py index 3732d9e2..24715ad6 100644 --- a/sampo/scheduler/lft/base.py +++ b/sampo/scheduler/lft/base.py @@ -158,17 +158,18 @@ def build_scheduler(self, timeline = self._timeline_type(worker_pool, landscape) for index, node in enumerate(ordered_nodes): - work_unit = node.work_unit work_spec = spec[node.id] # get assigned contractor and workers contractor, best_worker_team = self._node_id2workers[node.id] # find start time - start_time, finish_time, _ = timeline.find_min_start_time_with_additional(node, best_worker_team, - node2swork, work_spec, None, - assigned_parent_time, - work_estimator) + start_time, finish_time, exec_times = timeline.find_min_start_time_with_additional(node, best_worker_team, + node2swork, work_spec, + None, + assigned_parent_time, + None, + work_estimator) # we are scheduling the work `start of the project` if index == 0: # this work should always have start_time = 0, so we just re-assign it @@ -176,12 +177,13 @@ def build_scheduler(self, finish_time += start_time if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project` - finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + zone_finish_time, finalizing_zones = timeline.zone_timeline.finish_statuses() + finish_time = max(finish_time, zone_finish_time) start_time = max(start_time, finish_time) # apply work to scheduling timeline.schedule(node, node2swork, best_worker_team, contractor, work_spec, - start_time, work_spec.assigned_time, assigned_parent_time, work_estimator) + start_time, assigned_parent_time, exec_times, work_estimator) if index == len(ordered_nodes) - 1: # we are scheduling the work `end of the project` node2swork[node].zones_pre = finalizing_zones diff --git a/sampo/scheduler/timeline/base.py b/sampo/scheduler/timeline/base.py index f69e7369..b322fb81 100644 --- a/sampo/scheduler/timeline/base.py +++ b/sampo/scheduler/timeline/base.py @@ -21,17 +21,16 @@ class Timeline(ABC): def schedule(self, node: GraphNode, node2swork: dict[GraphNode, ScheduledWork], - passed_agents: list[Worker], + workers: list[Worker], contractor: Contractor, spec: WorkSpec, - assigned_start_time: Time | None = None, - assigned_time: Time | None = None, + assigned_start_time: Optional[Time] = None, assigned_parent_time: Time = Time(0), - work_estimator: WorkTimeEstimator = DefaultWorkEstimator()) -> Time: + exec_times: Optional[dict[GraphNode, Time]] = None, + work_estimator: WorkTimeEstimator = DefaultWorkEstimator()): """ - Schedules the given `GraphNode` using passed agents, spec and times. + Schedules the given `GraphNode` using passed workers, spec and times. If start time not passed, it should be computed as minimum work start time. - :return: scheduled finish time of given work """ ... @@ -41,6 +40,7 @@ def find_min_start_time(self, node2swork: dict[GraphNode, ScheduledWork], spec: WorkSpec, parent_time: Time = Time(0), + exec_times: dict[GraphNode, Time] = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()) -> Time: """ Computes start time, max parent time, contractor and exec times for given node. @@ -50,11 +50,12 @@ def find_min_start_time(self, :param node2swork: dictionary, that match GraphNode to ScheduleWork respectively :param spec: specification for given `GraphNode` :param parent_time: the minimum start time + :param exec_times: :param work_estimator: function that calculates execution time of the GraphNode :return: minimum time """ return self.find_min_start_time_with_additional(node, worker_team, node2swork, spec, None, - parent_time, work_estimator)[0] + parent_time, exec_times, work_estimator)[0] @abstractmethod def find_min_start_time_with_additional(self, @@ -64,8 +65,9 @@ def find_min_start_time_with_additional(self, spec: WorkSpec, assigned_start_time: Optional[Time] = None, assigned_parent_time: Time = Time(0), + exec_times: dict[GraphNode, Time] = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()) \ - -> tuple[Time, Time, dict[GraphNode, tuple[Time, Time]]]: + -> tuple[Time, Time, dict[GraphNode, Time]]: ... @abstractmethod diff --git a/sampo/scheduler/timeline/just_in_time_timeline.py b/sampo/scheduler/timeline/just_in_time_timeline.py index 7115b879..a39cc246 100644 --- a/sampo/scheduler/timeline/just_in_time_timeline.py +++ b/sampo/scheduler/timeline/just_in_time_timeline.py @@ -3,7 +3,6 @@ from sampo.scheduler.timeline.base import Timeline from sampo.scheduler.timeline.hybrid_supply_timeline import HybridSupplyTimeline from sampo.scheduler.timeline.zone_timeline import ZoneTimeline -from sampo.scheduler.timeline.utils import get_exec_times_from_assigned_time_for_chain from sampo.scheduler.utils import WorkerContractorPool from sampo.schemas import Contractor from sampo.schemas.graph import GraphNode @@ -13,6 +12,7 @@ from sampo.schemas.scheduled_work import ScheduledWork from sampo.schemas.time import Time from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator +from sampo.utilities.inseparables import calculate_exec_times, find_min_time_slot_size class JustInTimeTimeline(Timeline): @@ -32,28 +32,41 @@ def __init__(self, worker_pool: WorkerContractorPool, landscape: LandscapeConfig self._material_timeline = HybridSupplyTimeline(landscape) self.zone_timeline = ZoneTimeline(landscape.zone_config) - def find_min_start_time_with_additional(self, node: GraphNode, + def find_min_start_time_with_additional(self, + node: GraphNode, worker_team: list[Worker], node2swork: dict[GraphNode, ScheduledWork], spec: WorkSpec, assigned_start_time: Time | None = None, assigned_parent_time: Time = Time(0), + exec_times: dict[GraphNode, Time] = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()) \ - -> tuple[Time, Time, dict[GraphNode, tuple[Time, Time]]]: + -> tuple[Time, Time, dict[GraphNode, Time]]: """ Define the nearest possible start time for the current job. It is equal the max value from: 1. end time of all parent tasks, 2. time previous job off all needed workers to complete the current task. - :param assigned_parent_time: minimum start time - :param assigned_start_time: :param node: the GraphNode whose minimum time we are trying to find :param worker_team: the worker team under testing :param node2swork: dictionary, that match GraphNode to ScheduleWork respectively :param spec: given work specification + :param assigned_start_time: + :param assigned_parent_time: minimum start time + :param exec_times: :param work_estimator: function that calculates execution time of the GraphNode :return: start time, end time, None(exec_times not needed in this timeline) """ + # define the max worker time when all needed workers are off from previous tasks + cur_start_time = node.min_start_time(node2swork) + + inseparable_chain = node.get_inseparable_chain_with_self() + + # 2. calculating execution time of the task + + if not exec_times: + exec_times = calculate_exec_times(inseparable_chain, spec, worker_team, work_estimator) + # if current job is the first if not node2swork: max_material_time = self._material_timeline.find_min_material_time(node, @@ -62,29 +75,7 @@ def find_min_start_time_with_additional(self, node: GraphNode, max_zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, max_material_time, Time(0)) - return max_zone_time, max_zone_time, None - # define the max end time of all parent tasks - max_parent_time = max(node.min_start_time(node2swork), assigned_parent_time) - # define the max agents time when all needed workers are off from previous tasks - max_agent_time = Time(0) - cur_start_time = max_agent_time - - inseparable_chain = node.get_inseparable_chain_with_self() - - new_finish_time = cur_start_time - for dep_node in inseparable_chain: - # set start time as finish time of original work - # set finish time as finish time + working time of current node with identical resources - # (the same as in original work) - # set the same workers on it - # TODO Decide where this should be - dep_parent_time = dep_node.min_start_time(node2swork) - - dep_st = max(new_finish_time, dep_parent_time) - working_time = work_estimator.estimate_time(dep_node.work_unit, worker_team) - new_finish_time = dep_st + working_time - - exec_time = new_finish_time - cur_start_time + return max_zone_time, max_zone_time, exec_times found_earliest_time = False while not found_earliest_time: @@ -96,26 +87,32 @@ def find_min_start_time_with_additional(self, node: GraphNode, if material_time > cur_start_time: cur_start_time = material_time continue + # material_time <= cur_start_time, i.e. materials can be delivered to the `cur_start_time` moment + # now we are scheduling at the `cur_start_time`, so our `cur_exec_time` is still valid + exec_time = find_min_time_slot_size(inseparable_chain, node2swork, exec_times, start_time=cur_start_time) zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, cur_start_time, exec_time) + if zone_time > cur_start_time: cur_start_time = zone_time - else: + elif zone_time == cur_start_time: found_earliest_time = True c_st = cur_start_time - c_ft = c_st + exec_time - return c_st, c_ft, None - def _find_min_start_time(self, worker_team: list[Worker], _max_agent_time: Time, spec: WorkSpec): - max_agent_time = _max_agent_time + assert self.can_schedule_at_the_moment(node, worker_team, spec, node2swork, c_st, exec_time) + + return c_st, c_ft, exec_times + + def _find_min_start_time(self, worker_team: list[Worker], min_start_time: Time, spec: WorkSpec): + max_worker_time = min_start_time if spec.is_independent: # grab from the end for worker in worker_team: offer_stack = self._timeline[worker.get_agent_id()] - max_agent_time = max(max_agent_time, offer_stack[0][0]) + max_worker_time = max(max_worker_time, offer_stack[0][0]) else: # grab from whole sequence # for each resource type @@ -126,14 +123,14 @@ def _find_min_start_time(self, worker_team: list[Worker], _max_agent_time: Time, ind = len(offer_stack) - 1 while needed_count > 0: offer_time, offer_count = offer_stack[ind] - max_agent_time = max(max_agent_time, offer_time) + max_worker_time = max(max_worker_time, offer_time) if needed_count < offer_count: offer_count = needed_count needed_count -= offer_count ind -= 1 - return max_agent_time + return max_worker_time def can_schedule_at_the_moment(self, node: GraphNode, @@ -237,8 +234,8 @@ def schedule(self, contractor: Contractor, spec: WorkSpec, assigned_start_time: Optional[Time] = None, - assigned_time: Optional[Time] = None, assigned_parent_time: Time = Time(0), + exec_times: dict[GraphNode, Time] = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()): inseparable_chain = node.get_inseparable_chain_with_self() @@ -247,15 +244,11 @@ def schedule(self, node2swork, spec, assigned_parent_time, + exec_times, work_estimator) - if assigned_time is not None: - exec_times = get_exec_times_from_assigned_time_for_chain(inseparable_chain, assigned_time) - return self._schedule_with_inseparables(node, node2swork, workers, contractor, spec, - inseparable_chain, start_time, exec_times, work_estimator) - else: - return self._schedule_with_inseparables(node, node2swork, workers, contractor, spec, - inseparable_chain, start_time, {}, work_estimator) + return self._schedule_with_inseparables(node, node2swork, workers, contractor, spec, + inseparable_chain, start_time, exec_times, work_estimator) def _schedule_with_inseparables(self, node: GraphNode, @@ -265,7 +258,7 @@ def _schedule_with_inseparables(self, spec: WorkSpec, inseparable_chain: list[GraphNode], start_time: Time, - exec_times: dict[GraphNode, tuple[Time, Time]], + exec_times: dict[GraphNode, Time] | None = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()): """ Makes ScheduledWork object from `GraphNode` and worker list, assigned `start_end_time` @@ -283,6 +276,11 @@ def _schedule_with_inseparables(self, :return: """ + if not exec_times: + exec_times = calculate_exec_times(inseparable_chain, spec, workers, work_estimator) + + # assert self.can_schedule_at_the_moment(node, workers, spec, node2swork, start_time, exec_times[node]) + c_ft = start_time for dep_node in inseparable_chain: # set start time as finish time of original work @@ -292,14 +290,15 @@ def _schedule_with_inseparables(self, # TODO Decide where this should be max_parent_time = dep_node.min_start_time(node2swork) - if dep_node.is_inseparable_son(): - assert max_parent_time >= node2swork[dep_node.inseparable_parent].finish_time + # TODO Remove + # if dep_node.is_inseparable_son(): + # assert max_parent_time >= node2swork[dep_node.inseparable_parent].finish_time + + working_time = exec_times[dep_node] + + c_st = max(c_ft, max_parent_time) - if dep_node in exec_times: - lag, working_time = exec_times[dep_node] - else: - lag, working_time = 0, work_estimator.estimate_time(dep_node.work_unit, workers) - c_st = max(c_ft + lag, max_parent_time) + # assert self.can_schedule_at_the_moment(dep_node, workers, spec, node2swork, c_st, working_time), f'{dep_node.is_inseparable_son()}' deliveries, mat_del_time = self._material_timeline.deliver_resources(dep_node, c_st, diff --git a/sampo/scheduler/timeline/momentum_timeline.py b/sampo/scheduler/timeline/momentum_timeline.py index ae641687..8f73241a 100644 --- a/sampo/scheduler/timeline/momentum_timeline.py +++ b/sampo/scheduler/timeline/momentum_timeline.py @@ -6,7 +6,6 @@ from sampo.scheduler.timeline.base import Timeline from sampo.scheduler.timeline.hybrid_supply_timeline import HybridSupplyTimeline from sampo.scheduler.timeline.zone_timeline import ZoneTimeline -from sampo.scheduler.timeline.utils import get_exec_times_from_assigned_time_for_chain from sampo.scheduler.utils import WorkerContractorPool from sampo.schemas.contractor import Contractor from sampo.schemas.graph import GraphNode @@ -19,6 +18,7 @@ from sampo.schemas.time_estimator import WorkTimeEstimator, DefaultWorkEstimator from sampo.schemas.types import ScheduleEvent, EventType from sampo.utilities.collections_util import build_index +from sampo.utilities.inseparables import calculate_exec_times, find_min_time_slot_size class MomentumTimeline(Timeline): @@ -78,8 +78,9 @@ def find_min_start_time_with_additional(self, spec: WorkSpec, assigned_start_time: Optional[Time] = None, assigned_parent_time: Time = Time(0), + exec_times: dict[GraphNode, Time] | None = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()) \ - -> tuple[Time, Time, dict[GraphNode, tuple[Time, Time]]]: + -> tuple[Time, Time, dict[GraphNode, Time]]: """ Looking for an available time slot for given 'GraphNode' @@ -90,6 +91,7 @@ def find_min_start_time_with_additional(self, :param assigned_start_time: start time, that can be received from another algorithms of calculation the earliest start time :param assigned_parent_time: minimum start time + :param exec_times: :param work_estimator: function that calculates execution time of the GraphNode :return: start time, end time, time of execution """ @@ -104,23 +106,14 @@ def apply_time_spec(time: Time) -> Time: max_parent_time: Time = max(apply_time_spec(node.min_start_time(node2swork)), assigned_parent_time) - nodes_max_parent_times = {ins_node: max(apply_time_spec(ins_node.min_start_time(node2swork)), - assigned_parent_time) - for ins_node in inseparable_chain} - # 2. calculating execution time of the task - exec_time: Time = Time(0) - exec_times: dict[GraphNode, tuple[Time, Time]] = {} # node: (lag, exec_time) - for chain_node in inseparable_chain: - node_exec_time: Time = Time(0) if len(chain_node.work_unit.worker_reqs) == 0 else \ - work_estimator.estimate_time(chain_node.work_unit, worker_team) - - lag_req = nodes_max_parent_times[chain_node] - max_parent_time - exec_time - lag = lag_req if lag_req > 0 else 0 + if not exec_times: + exec_times = calculate_exec_times(inseparable_chain, spec, worker_team, work_estimator) - exec_times[chain_node] = lag, node_exec_time - exec_time += lag + node_exec_time + # if spec.assigned_time: + # assert spec.assigned_time == exec_time + exec_time = find_min_time_slot_size(inseparable_chain, node2swork, exec_times, start_time=max_parent_time) if len(worker_team) == 0: max_material_time = self._material_timeline.find_min_material_time(node, max_parent_time, @@ -140,17 +133,28 @@ def apply_time_spec(time: Time) -> Time: found_earliest_time = False while not found_earliest_time: cur_start_time = self._find_min_start_time(self._timeline[contractor_id], inseparable_chain, spec, - cur_start_time, exec_time, worker_team) + node2swork, cur_start_time, exec_times, worker_team) + # TODO Make `self._find_min_start_time` return it? + exec_time = find_min_time_slot_size(inseparable_chain, node2swork, exec_times, + start_time=cur_start_time) + + # assert self._validate(cur_start_time + exec_time, exec_time, worker_team) material_time = self._material_timeline.find_min_material_time(node, cur_start_time, node.work_unit.need_materials()) + + # assert self._validate(cur_start_time + exec_time, exec_time, worker_team) + if material_time > cur_start_time: cur_start_time = material_time continue zone_time = self.zone_timeline.find_min_start_time(node.work_unit.zone_reqs, cur_start_time, exec_time) + + # assert self._validate(cur_start_time + exec_time, exec_time, worker_team) + if zone_time > cur_start_time: cur_start_time = zone_time else: @@ -158,15 +162,19 @@ def apply_time_spec(time: Time) -> Time: st = cur_start_time - self._validate(st + exec_time, exec_time, worker_team) + # assert self._validate(st + exec_time, exec_time, worker_team) + + assert max_parent_time <= st + return st, st + exec_time, exec_times def _find_min_start_time(self, resource_timeline: dict[str, SortedList[ScheduleEvent]], inseparable_chain: list[GraphNode], spec: WorkSpec, + node2swork: dict[GraphNode, ScheduledWork], parent_time: Time, - exec_time: Time, + exec_times: dict[GraphNode, Time], passed_workers: list[Worker]) -> Time: """ Find start time for the whole 'GraphNode' @@ -211,6 +219,8 @@ def _find_min_start_time(self, type2count: dict[str, int] = build_index(passed_workers, lambda w: w.name, lambda w: w.count) + exec_time = find_min_time_slot_size(inseparable_chain, node2swork, exec_times, start_time=start) + i = 0 while len(queue) > 0: @@ -231,6 +241,10 @@ def _find_min_start_time(self, # In this case we need to add back all previously scheduled wreq-s into the queue # to be scheduled again with the new start time (e.g. found start). # This process should reach its termination at least at the very end of this contractor's schedule. + + # recalculate `exec_time` with new `start_time` + exec_time = find_min_time_slot_size(inseparable_chain, node2swork, exec_times, start_time=found_start) + queue.extend(scheduled_wreqs) scheduled_wreqs.clear() @@ -358,8 +372,9 @@ def update_timeline(self, task_index = self._task_index self._task_index += 1 - # experimental logics lightening. debugging showed its efficiency. + # assert self._validate(node, finish_time, exec_time, worker_team) + # experimental logics lightening. debugging showed its efficiency. start = finish_time - exec_time end = finish_time for w in worker_team: @@ -395,17 +410,21 @@ def schedule(self, contractor: Contractor, spec: WorkSpec, assigned_start_time: Optional[Time] = None, - assigned_time: Optional[Time] = None, assigned_parent_time: Time = Time(0), + exec_times: Optional[dict[GraphNode, Time]] = None, work_estimator: WorkTimeEstimator = DefaultWorkEstimator()): inseparable_chain = node.get_inseparable_chain_with_self() start_time, _, exec_times = \ self.find_min_start_time_with_additional(node, workers, node2swork, spec, assigned_start_time, - assigned_parent_time, work_estimator) - if assigned_time is not None: - exec_times = get_exec_times_from_assigned_time_for_chain(inseparable_chain, assigned_time) + assigned_parent_time, exec_times, work_estimator) + + if spec.assigned_time: + assert sum(exec_times.values()) == spec.assigned_time + + max_parent_time: Time = node.min_start_time(node2swork) + + assert start_time >= max_parent_time - # TODO Decide how to deal with exec_times(maybe we should remove using pre-computed exec_times) self._schedule_with_inseparables(node, node2swork, inseparable_chain, spec, workers, contractor, start_time, exec_times) @@ -417,27 +436,40 @@ def _schedule_with_inseparables(self, worker_team: list[Worker], contractor: Contractor, start_time: Time, - exec_times: dict[GraphNode, tuple[Time, Time]], - work_estimator: WorkTimeEstimator = DefaultWorkEstimator()): + exec_times: dict[GraphNode, Time]): # 6. create a schedule entry for the task # nodes_start_times = {ins_node: ins_node.min_start_time(node2swork) for ins_node in inseparable_chain} + for ft in exec_times.values(): + assert ft >= 0 + + def apply_time_spec(time: Time) -> Time: + return max(time, start_time) if start_time is not None else time + + nodes_max_parent_times = {ins_node: apply_time_spec(ins_node.min_start_time(node2swork)) + for ins_node in inseparable_chain} + curr_time = start_time for i, chain_node in enumerate(inseparable_chain): - if chain_node in exec_times: - node_lag, node_time = exec_times[chain_node] - else: - node_lag, node_time = 0, work_estimator.estimate_time(chain_node.work_unit, worker_team) + node_time = exec_times[chain_node] - # lag_req = nodes_start_times[chain_node] - curr_time - # node_lag = lag_req if lag_req > 0 else 0 + lag_req = nodes_max_parent_times[chain_node] - curr_time + node_lag = max(lag_req, 0) start_work = curr_time + node_lag + + # assert self._validate(start_work + node_time, node_time, worker_team), f'{i}' + + # assert self._material_timeline.can_schedule_at_the_moment(chain_node, + # start_work, + # chain_node.work_unit.need_materials()), f'{i}' + deliveries, mat_del_time = self._material_timeline.deliver_resources(chain_node, start_work, chain_node.work_unit.need_materials()) - start_work = max(start_work, mat_del_time) - # self._validate(start_work + node_time, node_time, worker_team) + + assert mat_del_time == start_work + swork = ScheduledWork( work_unit=chain_node.work_unit, start_end_time=(start_work, start_work + node_time), @@ -448,6 +480,10 @@ def _schedule_with_inseparables(self, curr_time = start_work + node_time node2swork[chain_node] = swork + max_parent_time: Time = node.min_start_time(node2swork) + + assert start_time >= max_parent_time + self.update_timeline(curr_time, curr_time - start_time, node, worker_team, spec) zones = [zone_req.to_zone() for zone_req in node.work_unit.zone_reqs] node2swork[node].zones_pre = self.zone_timeline.update_timeline(len(node2swork), zones, start_time, @@ -456,9 +492,9 @@ def _schedule_with_inseparables(self, def _validate(self, finish_time: Time, exec_time: Time, - worker_team: list[Worker]): + worker_team: list[Worker]) -> bool: if exec_time == 0: - return + return True start = finish_time - exec_time end = finish_time @@ -469,6 +505,10 @@ def _validate(self, available_workers_count = state[start_idx - 1].available_workers_count # updating all events in between the start and the end of our current task for event in state[start_idx: end_idx]: - assert event.available_workers_count >= w.count + if not (event.available_workers_count >= w.count): + return False - assert available_workers_count >= w.count + if not (available_workers_count >= w.count): + return False + + return True diff --git a/sampo/scheduler/timeline/utils.py b/sampo/scheduler/timeline/utils.py deleted file mode 100644 index 203a65be..00000000 --- a/sampo/scheduler/timeline/utils.py +++ /dev/null @@ -1,59 +0,0 @@ -from sampo.schemas.graph import GraphNode -from sampo.schemas.time import Time - - -def get_exec_times_from_assigned_time_for_chain(inseparable_chain: list[GraphNode], - assigned_time: Time) -> dict[GraphNode, tuple[Time, Time]]: - """ - Distributes a given total execution time among work nodes in an inseparable chain. - - The time distribution is proportional to each node's volume, ensuring that - the entire `assigned_time` is utilized. Any rounding discrepancies are - allocated to the last node in the chain. - - Args: - inseparable_chain: A list of nodes representing an inseparable sequence of work units. - assigned_time: The total `Time` allocated for the entire chain's execution. - - Returns: - A dictionary mapping each `GraphNode` to a tuple `(lag, node_execution_time)`. - `lag` is always `Time(0)` as the chain is inseparable, and - `node_execution_time` is the calculated execution time for that specific node. - """ - total_volume = sum(n.work_unit.volume for n in inseparable_chain) - - # Handle the edge case where total_volume is zero or negative. - # In this scenario, time is distributed equally among nodes, - # with any remainder allocated to the last node. - if total_volume <= 0: - node_time = assigned_time // len(inseparable_chain) - # Distribute time equally among all but the last node - exec_times = {n: (Time(0), node_time) for n in inseparable_chain[:-1]} - # Assign remaining time to the last node - exec_times[inseparable_chain[-1]] = Time(0), assigned_time - node_time * len(exec_times) - return exec_times - - exec_times: dict[GraphNode, tuple[Time, Time]] = {} - remaining_time = assigned_time # Initialize remaining time to distribute - - # Iterate through all nodes except the last one. - # Time is distributed based on the node's volume relative to the *remaining* total volume. - for i, n in enumerate(inseparable_chain[:-1]): - # Calculate the proportion of the current node's volume to the remaining total volume. - volume_proportion = 0 - if total_volume > 0: - volume_proportion = n.work_unit.volume / total_volume - - # Calculate execution time for the current node and convert to integer. - # This takes a portion of the *remaining* time. - exec_time = int(remaining_time.value * volume_proportion) - exec_times[n] = (Time(0), Time(exec_time)) - - # Deduct the current node's volume and allocated time from the totals - total_volume -= n.work_unit.volume - remaining_time -= exec_time - - # The last node receives all remaining time to account for any rounding errors - # during the distribution to previous nodes. - exec_times[inseparable_chain[-1]] = (Time(0), remaining_time) - return exec_times diff --git a/sampo/scheduler/utils/local_optimization.py b/sampo/scheduler/utils/local_optimization.py index 194e5fd1..a3ac8106 100644 --- a/sampo/scheduler/utils/local_optimization.py +++ b/sampo/scheduler/utils/local_optimization.py @@ -187,8 +187,8 @@ def recalc_schedule(self, # st = timeline.find_min_start_time(node, node_schedule.workers, node2swork_new) # ft = st + node_schedule.get_actual_duration(work_estimator) timeline.schedule(node, node2swork_new, node_schedule.workers, - id2contractor[node_schedule.contractor], work_spec, None, work_spec.assigned_time, - assigned_parent_time, work_estimator) + id2contractor[node_schedule.contractor], work_spec, None, + assigned_parent_time, None, work_estimator) # node_schedule.start_end_time = (st, ft) node2swork_new[node] = node_schedule @@ -289,7 +289,7 @@ def optimize(self, scheduled_works: dict[GraphNode, ScheduledWork], node_order: # candidate_schedule.start_time = my_schedule.start_time break - return self.recalc_schedule(reversed(node_order), contractors, landscape_config, spec, scheduled_works, + return self.recalc_schedule(node_order, contractors, landscape_config, spec, scheduled_works, worker_pool, assigned_parent_time, work_estimator) diff --git a/sampo/scheduler/utils/multi_contractor.py b/sampo/scheduler/utils/multi_contractor.py index 3ca2e91d..b8139936 100644 --- a/sampo/scheduler/utils/multi_contractor.py +++ b/sampo/scheduler/utils/multi_contractor.py @@ -3,6 +3,7 @@ import numpy as np from sampo.scheduler.utils import WorkerContractorPool +from sampo.schemas import GraphNode from sampo.schemas.contractor import Contractor from sampo.schemas.exceptions import NoSufficientContractorError from sampo.schemas.requirements import WorkerReq @@ -44,12 +45,13 @@ def get_worker_borders(agents: WorkerContractorPool, contractor: Contractor, wor def run_contractor_search(contractors: list[Contractor], work_spec: WorkSpec, - runner: Callable[[Contractor], tuple[Time, Time, list[Worker]]]) \ - -> tuple[Time, Time, Contractor, list[Worker]]: + runner: Callable[[Contractor], tuple[Time, Time, dict[GraphNode, Time], list[Worker]]]) \ + -> tuple[Time, Time, dict[GraphNode, Time], Contractor, list[Worker]]: """ Performs the best contractor search. :param contractors: contractors' list + :param work_spec: work spec :param runner: a runner function, should be inner of the calling code. Calculates Tuple[start time, finish time, worker team] from given contractor object. :return: start time, finish time, the best contractor, worker team with the best contractor @@ -59,18 +61,20 @@ def run_contractor_search(contractors: list[Contractor], # optimization metric best_finish_time = Time.inf() best_contractor = None + best_exec_times = None # heuristic: if contractors' finish times are equal, we prefer smaller one best_contractor_size = float('inf') contractors = work_spec.filter_contractors(contractors) for contractor in contractors: - start_time, finish_time, worker_team = runner(contractor) + start_time, finish_time, exec_times, worker_team = runner(contractor) contractor_size = sum(w.count for w in contractor.workers.values()) if not finish_time.is_inf() and (finish_time < best_finish_time or (finish_time == best_finish_time and contractor_size < best_contractor_size)): best_finish_time = finish_time + best_exec_times = exec_times best_contractor = contractor best_contractor_size = contractor_size @@ -78,6 +82,7 @@ def run_contractor_search(contractors: list[Contractor], raise NoSufficientContractorError(f'There is no contractor that can satisfy given search; contractors: ' f'{contractors}') - best_start_time, best_finish_time, best_worker_team = runner(best_contractor) + # TODO Consider removing this line; we shouldn't re-run it if cycle that we made upper is right + best_start_time, best_finish_time, best_exec_times, best_worker_team = runner(best_contractor) - return best_start_time, best_finish_time, best_contractor, best_worker_team + return best_start_time, best_finish_time, best_exec_times, best_contractor, best_worker_team diff --git a/sampo/structurator/base.py b/sampo/structurator/base.py index 444cc668..087df59d 100644 --- a/sampo/structurator/base.py +++ b/sampo/structurator/base.py @@ -192,6 +192,9 @@ def make_new_stage_node(volume_proportion: float, del wu_attrs['name'] # make new work unit for new stage node with updated attributes new_wu = WorkUnit(**wu_attrs) + + # TODO Make sampo work with material reqs in whole inseparable chain? + new_wu.material_reqs = [] # make new graph node for new stage with created work unit and with passed edge to previous stage node return GraphNode(new_wu, edge_with_prev_stage_node) @@ -221,7 +224,7 @@ def match_prev_edges_with_stage_nodes_id(restructuring_edges2new_nodes_id: dict[ # define mapper of requirements attribute names and classes reqs2classes = {'worker_reqs': WorkerReq, 'equipment_reqs': EquipmentReq, - 'object_reqs': ConstructionObjectReq, 'material_reqs': MaterialReq} + 'object_reqs': ConstructionObjectReq} # make mapper of work unit requirement names and copied class object attributes reqs2attrs = {reqs: [dict(req.__dict__) for req in getattr(wu, reqs)] for reqs in reqs2classes} # make mapper of work unit requirement names and accumulated amounts @@ -257,6 +260,9 @@ def match_prev_edges_with_stage_nodes_id(restructuring_edges2new_nodes_id: dict[ # make first stage node and add it to id2new_nodes id2new_nodes[stage_node_id] = make_new_stage_node(accum, [], wu_attrs, reqs2attrs) + # TODO Make sampo work with material reqs in whole inseparable chain? + id2new_nodes[stage_node_id].work_unit.material_reqs = wu.material_reqs + # initialize a list that stores the edges along which a stage node has already been created. # used only if the use_lag_edge_optimization is True, # since otherwise the matching of the edges and new node IDs is trivial diff --git a/sampo/utilities/inseparables.py b/sampo/utilities/inseparables.py new file mode 100644 index 00000000..3ab242f5 --- /dev/null +++ b/sampo/utilities/inseparables.py @@ -0,0 +1,109 @@ +from sampo.schemas import GraphNode, ScheduledWork, Time, Worker, WorkTimeEstimator +from sampo.schemas.schedule_spec import WorkSpec + + +def get_exec_times_from_assigned_time_for_chain(inseparable_chain: list[GraphNode], + assigned_time: Time) -> dict[GraphNode, Time]: + """ + Distributes a given total execution time among work nodes in an inseparable chain. + + The time distribution is proportional to each node's volume, ensuring that + the entire `assigned_time` is utilized. Any rounding discrepancies are + allocated to the last node in the chain. + + Args: + inseparable_chain: A list of nodes representing an inseparable sequence of work units. + assigned_time: The total `Time` allocated for the entire chain's execution. + + Returns: + A dictionary mapping each `GraphNode` to a tuple `(lag, node_execution_time)`. + `lag` is always `Time(0)` as the chain is inseparable, and + `node_execution_time` is the calculated execution time for that specific node. + """ + total_volume = sum(n.work_unit.volume for n in inseparable_chain) + + # Handle the edge case where total_volume is zero or negative. + # In this scenario, time is distributed equally among nodes, + # with any remainder allocated to the last node. + if total_volume <= 0: + node_time = assigned_time // len(inseparable_chain) + # Distribute time equally among all but the last node + exec_times = {n: node_time for n in inseparable_chain[:-1]} + # Assign remaining time to the last node + exec_times[inseparable_chain[-1]] = assigned_time - node_time * len(exec_times) + return exec_times + + exec_times: dict[GraphNode, Time] = {} + remaining_time = assigned_time # Initialize remaining time to distribute + + # Iterate through all nodes except the last one. + # Time is distributed based on the node's volume relative to the *remaining* total volume. + for i, n in enumerate(inseparable_chain[:-1]): + # Calculate the proportion of the current node's volume to the remaining total volume. + volume_proportion = 0 + if total_volume > 0: + volume_proportion = n.work_unit.volume / total_volume + + # Calculate execution time for the current node and convert to integer. + # This takes a portion of the *remaining* time. + exec_time = int(remaining_time.value * volume_proportion) + exec_times[n] = Time(exec_time) + + # Deduct the current node's volume and allocated time from the totals + total_volume -= n.work_unit.volume + remaining_time -= exec_time + + # The last node receives all remaining time to account for any rounding errors + # during the distribution to previous nodes. + exec_times[inseparable_chain[-1]] = remaining_time + return exec_times + + +def find_min_time_slot_size(inseparable_chain: list[GraphNode], + node2swork: dict[GraphNode, ScheduledWork], + exec_times: dict[GraphNode, Time], + start_time: Time) -> Time: + cur_finish_time = start_time + for dep_node in inseparable_chain: + # set start time as finish time of original work + # set finish time as finish time + working time of current node with identical resources + # (the same as in original work) + # set the same workers on it + # TODO Decide where this should be + dep_parent_time = dep_node.min_start_time(node2swork) + + dep_st = max(cur_finish_time, dep_parent_time) + + working_time = exec_times[dep_node] + + cur_finish_time = dep_st + working_time + + return cur_finish_time - start_time + + +def calculate_exec_times(inseparable_chain: list[GraphNode], + spec: WorkSpec, + worker_team: list[Worker], + work_estimator: WorkTimeEstimator) -> dict[GraphNode, Time]: + # TODO Refactor + spec_times = {} + if spec.assigned_time: + spec_times = get_exec_times_from_assigned_time_for_chain(inseparable_chain, spec.assigned_time) + + assert sum(spec_times.values()) == spec.assigned_time + + # 2. calculating execution time of the task + + exec_times: dict[GraphNode, Time] = {} # node: (lag, exec_time) + for chain_node in inseparable_chain: + # node_exec_time: Time = Time(0) if len(chain_node.work_unit.worker_reqs) == 0 else \ + # work_estimator.estimate_time(chain_node.work_unit, worker_team) + if spec.assigned_time: + node_exec_time = spec_times[chain_node] + else: + node_exec_time: Time = Time(0) if len(chain_node.work_unit.worker_reqs) == 0 else \ + work_estimator.estimate_time(chain_node.work_unit, worker_team) + + exec_times[chain_node] = node_exec_time + + return exec_times diff --git a/sampo/utilities/validation.py b/sampo/utilities/validation.py index 9eba876b..f3353a4a 100644 --- a/sampo/utilities/validation.py +++ b/sampo/utilities/validation.py @@ -2,9 +2,9 @@ from operator import attrgetter, itemgetter from sampo.schemas.contractor import Contractor -from sampo.schemas.graph import WorkGraph +from sampo.schemas.graph import WorkGraph, GraphNode from sampo.schemas.schedule import ScheduledWork, Schedule -from sampo.schemas.schedule_spec import ScheduleSpec +from sampo.schemas.schedule_spec import ScheduleSpec, WorkSpec from sampo.schemas.time import Time from sampo.utilities.collections_util import build_index @@ -55,9 +55,22 @@ def _check_parent_dependencies(schedule: Schedule, wg: WorkGraph) -> None: start, end = scheduled_works[node.work_unit.id].start_end_time for pnode in node.parents: pstart, pend = scheduled_works[pnode.work_unit.id].start_end_time + + assert pnode in node.parents_set + + if not (pstart <= pend <= start <= end): + print('cringe') assert pstart <= pend <= start <= end +def _check_swork_workers(swork: ScheduledWork, work_spec: WorkSpec): + assert len(work_spec.assigned_workers) == len(swork.workers) + + for actual_worker in swork.workers: + assert actual_worker.name in work_spec.assigned_workers + assert actual_worker.count == work_spec.assigned_workers[actual_worker.name] + + def _check_all_tasks_corresponds_to_spec(schedule: Schedule, wg: WorkGraph, spec: ScheduleSpec) -> None: scheduled_works: dict[str, ScheduledWork] = {work.id: work for work in schedule.works} @@ -67,7 +80,31 @@ def _check_all_tasks_corresponds_to_spec(schedule: Schedule, wg: WorkGraph, spec if work_spec.contractors: assert work_spec.is_contractor_enabled(scheduled_works[node.id].contractor_id) - # TODO Check other spec entries + # workers + for node in wg.nodes: + work_spec = spec[node.id] + if work_spec.assigned_workers: + for chain_node in node.get_inseparable_chain_with_self(): + _check_swork_workers(scheduled_works[chain_node.id], work_spec) + + # time + for node in wg.nodes: + work_spec = spec[node.id] + if work_spec.assigned_time: + accumulated_time = Time(0) + for chain_node in node.get_inseparable_chain_with_self(): + accumulated_time += scheduled_works[chain_node.id].duration + assert accumulated_time == work_spec.assigned_time, f'{accumulated_time}; {work_spec.assigned_time}' + + # # time + # for node in wg.nodes: + # work_spec = spec[node.id] + # if work_spec.assigned_time: + # chain = node.get_inseparable_chain_with_self() + # start_time = scheduled_works[chain[0].id].start_time + # end_time = scheduled_works[chain[-1].id].finish_time + # + # assert end_time - start_time == work_spec.assigned_time, f'{start_time}, {end_time}; {work_spec.assigned_time}' def _check_all_tasks_have_valid_duration(schedule: Schedule) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 7e8afdaa..b1f1d68e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,6 +11,7 @@ from sampo.scheduler.genetic.base import GeneticScheduler from sampo.scheduler.heft import HEFTScheduler, HEFTBetweenScheduler from sampo.scheduler.topological import TopologicalScheduler +from sampo.schemas import Time from sampo.schemas.contractor import Contractor from sampo.schemas.exceptions import NoSufficientContractorError from sampo.schemas.graph import WorkGraph, EdgeType @@ -145,46 +146,60 @@ def generate_wg_core(request, setup_sampler, setup_simple_synthetic) -> tuple[Wo def create_spec(wg: WorkGraph, contractors: list[Contractor], - rand: Random, - generate_contractor_spec: bool) -> ScheduleSpec: + rand: Random) -> ScheduleSpec: spec = ScheduleSpec() - if generate_contractor_spec: - for node in wg.nodes: - if not node.is_inseparable_son(): - selected_contractor_indices = rand.choices(list(range(len(contractors))), - k=rand.randint(1, len(contractors))) - spec.assign_contractors(node.id, {contractors[i].id for i in selected_contractor_indices}) + # contractors + for node in wg.nodes: + if not node.is_inseparable_son(): + selected_contractor_indices = rand.choices(list(range(len(contractors))), + k=rand.randint(1, len(contractors))) + spec.assign_contractors(node.id, {contractors[i].id for i in selected_contractor_indices}) + + # workers + contractor_min_capabilities = {worker: min(contractor.workers[worker].count + for contractor in contractors) + for worker in contractors[0].workers} + for node in wg.nodes: + if not node.is_inseparable_son(): + worker_dict = {wr.kind: rand.randint(wr.min_count, + min(wr.max_count, contractor_min_capabilities[wr.kind])) + for wr in node.work_unit.worker_reqs} + spec.assign_workers_dict(node.id, worker_dict) + + # time + for node in wg.nodes: + if not node.is_inseparable_son() and not node.work_unit.is_service_unit: + spec.set_exec_time(node.id, Time(rand.randint(1, 10))) return spec # TODO Make parametrization with different(specialized) contractors -@fixture(params=[(i, 5 * j, generate_contractors_spec) - for j in range(2) - for i in range(2, 3) - for generate_contractors_spec in [True, False]], - ids=[f'Contractors: count={i}, min_size={5 * j}, generate_contractor_spec={generate_contractors_spec}' - for j in range(2) - for i in range(2, 3) - for generate_contractors_spec in [True, False]], +@fixture(params=[(i, 5 * j, generate_spec) + for j in [1, 2, 4] + for i in [2] + for generate_spec in [True, False]], + ids=[f'Contractors: count={i}, min_size={5 * j}, generate_spec={generate_spec}' + for j in [1, 2, 4] + for i in [2] + for generate_spec in [True, False]], scope='module') def setup_scheduler_parameters(request, setup_wg_with_random, setup_simple_synthetic) \ -> tuple[WorkGraph, list[Contractor], LandscapeConfiguration | Any, ScheduleSpec, Random]: - num_contractors, contractor_min_resources, generate_contractors_spec = request.param + num_contractors, contractor_size, generate_spec = request.param wg, rand = setup_wg_with_random generate_landscape = False materials = [material for node in wg.nodes for material in node.work_unit.need_materials()] if len(materials) > 0: generate_landscape = True - resource_req: Dict[str, int] = {} - resource_req_count: Dict[str, int] = {} + resource_req: dict[str, int] = {} + resource_req_count: dict[str, int] = {} for node in wg.nodes: for req in node.work_unit.worker_reqs: - resource_req[req.kind] = max(contractor_min_resources, - resource_req.get(req.kind, 0) + (req.min_count + req.max_count) // 2) + resource_req[req.kind] = resource_req.get(req.kind, 0) + (req.min_count + req.max_count) // 2 resource_req_count[req.kind] = resource_req_count.get(req.kind, 0) + 1 for req in resource_req.keys(): @@ -201,14 +216,14 @@ def setup_scheduler_parameters(request, setup_wg_with_random, setup_simple_synth contractors.append(Contractor(id=contractor_id, name='OOO Berezka', workers={ - name: Worker(str(uuid4()), name, count * 100, contractor_id=contractor_id) + name: Worker(str(uuid4()), name, count, contractor_id=contractor_id) for name, count in resource_req.items()}, equipments={})) landscape = setup_simple_synthetic.synthetic_landscape(wg) \ if generate_landscape else LandscapeConfiguration() - spec = create_spec(wg, contractors, rand, generate_contractors_spec) + spec = create_spec(wg, contractors, rand) if generate_spec else ScheduleSpec() return wg, contractors, landscape, spec, rand @@ -261,7 +276,7 @@ def setup_schedule(setup_scheduler, setup_scheduler_parameters): return scheduler.schedule(setup_wg, setup_contractors, spec=spec, - validate=False, + validate=True, landscape=landscape)[0], scheduler.scheduler_type, setup_scheduler_parameters except NoSufficientContractorError: pytest.skip('Given contractor configuration can\'t support given work graph') diff --git a/tests/scheduler/material_scheduling_test.py b/tests/scheduler/material_scheduling_test.py index ae1149dd..598c45f8 100644 --- a/tests/scheduler/material_scheduling_test.py +++ b/tests/scheduler/material_scheduling_test.py @@ -82,8 +82,8 @@ def test_empty_node_find_start_time(setup_default_schedules): # raise AssertionError(f'Scheduler {scheduler} failed validation', e) -def test_momentum_scheduling_with_materials(setup_default_schedules): - setup_wg, setup_contractors, landscape, spec, _ = setup_default_schedules[0] +def test_momentum_scheduling_with_materials(setup_scheduler_parameters): + setup_wg, setup_contractors, landscape, spec, _ = setup_scheduler_parameters if setup_wg.vertex_count > 14: pytest.skip('Non-material graph') diff --git a/tests/scheduler/timeline/just_in_time_timeline_test.py b/tests/scheduler/timeline/just_in_time_timeline_test.py index ada32d61..8233fca0 100644 --- a/tests/scheduler/timeline/just_in_time_timeline_test.py +++ b/tests/scheduler/timeline/just_in_time_timeline_test.py @@ -56,7 +56,7 @@ def test_schedule(setup_timeline): nodes, node_id2parent_ids, node_id2child_ids = get_head_nodes_with_connections_mappings(setup_wg) ordered_nodes = prioritization(nodes, node_id2parent_ids, node_id2child_ids, DefaultWorkEstimator()) - node = ordered_nodes[-1] + node = ordered_nodes[0] reqs = build_index(node.work_unit.worker_reqs, attrgetter('kind')) worker_team = [list(cont2worker.values())[0].copy() for name, cont2worker in setup_worker_pool.items() if name in reqs] @@ -64,7 +64,7 @@ def test_schedule(setup_timeline): contractor_index = build_index(setup_contractors, attrgetter('id')) contractor = contractor_index[worker_team[0].contractor_id] if worker_team else None - node2swork: Dict[GraphNode, ScheduledWork] = {} + node2swork: dict[GraphNode, ScheduledWork] = {} setup_timeline.schedule(node, node2swork, worker_team, contractor, WorkSpec()) assert len(node2swork) == 1 diff --git a/tests/scheduler/timeline/material_timeline_test.py b/tests/scheduler/timeline/material_timeline_test.py index 5027d9c1..0795a29d 100644 --- a/tests/scheduler/timeline/material_timeline_test.py +++ b/tests/scheduler/timeline/material_timeline_test.py @@ -35,6 +35,37 @@ def test_supply_resources(setup_scheduler_parameters, setup_rand): delivery, delivery_time = timeline.deliver_resources(node, deadline, materials) + # FIXME WTF? should be delivery_time <= deadline... assert delivery_time >= deadline assert not delivery_time.is_inf() + + +def test_timeline_consistency(setup_scheduler_parameters, setup_rand): + wg, contractors, landscape, _, _ = setup_scheduler_parameters + if not landscape.platforms: + pytest.skip('Non-landscape test') + timeline = HybridSupplyTimeline(landscape) + + nodes, node_id2parent_ids, node_id2child_ids = get_head_nodes_with_connections_mappings(wg) + ordered_nodes = prioritization(nodes, node_id2parent_ids, node_id2child_ids, DefaultWorkEstimator()) + for node in ordered_nodes: + if node.work_unit.is_service_unit: + continue + node.platform = landscape.platforms[setup_rand.randint(0, len(landscape.platforms) - 1)] + + delivery_time = Time(0) + for node in ordered_nodes: + if node.work_unit.is_service_unit: + continue + materials = node.work_unit.need_materials() + deadline = delivery_time + + min_material_time = timeline.find_min_material_time(node, deadline, materials) + + delivery, delivery_time = timeline.deliver_resources(node, + min_material_time, + materials) + assert delivery_time == min_material_time + + assert not delivery_time.is_inf() diff --git a/tests/scheduler/timeline/zone_timeline_test.py b/tests/scheduler/timeline/zone_timeline_test.py index d5d16223..37eb5ba7 100644 --- a/tests/scheduler/timeline/zone_timeline_test.py +++ b/tests/scheduler/timeline/zone_timeline_test.py @@ -14,13 +14,18 @@ def setup_zoned_wg(setup_rand, setup_simple_synthetic) -> WorkGraph: wg = setup_simple_synthetic.work_graph(mode=SyntheticGraphType.PARALLEL, top_border=100) for node in wg.nodes: - node.work_unit.zone_reqs.append(ZoneReq(kind='zone1', required_status=setup_rand.randint(1, 2))) + if not node.work_unit.is_service_unit: + node.work_unit.zone_reqs.append(ZoneReq(kind='zone1', required_status=setup_rand.randint(1, 2))) return wg -@fixture(params=[(costs_mode, start_status_mode) for start_status_mode in range(3) for costs_mode in range(2)], - ids=[f'Costs mode: {costs_mode}, start status mode: {start_status_mode}' for start_status_mode in range(3) for costs_mode in range(2)]) +@fixture(params=[(costs_mode, start_status_mode) + for start_status_mode in range(3) + for costs_mode in range(2)], + ids=[f'Costs mode: {costs_mode}, start status mode: {start_status_mode}' + for start_status_mode in range(3) + for costs_mode in range(2)]) def setup_landscape_config(request) -> LandscapeConfiguration: costs_mode, start_status_mode = request.param diff --git a/tests/serialization_test.py b/tests/serialization_test.py index feb64891..91f6d27b 100644 --- a/tests/serialization_test.py +++ b/tests/serialization_test.py @@ -44,15 +44,15 @@ def setup_core_resources(request): @pytest.fixture(scope='function') -def setup_inherited_resources(request, setup_wg, setup_schedule): - schedule, _, _ = setup_schedule +def setup_inherited_resources(request, setup_schedule): + schedule, _, (setup_wg, _, _, _, _) = setup_schedule return { 'work_graph': setup_wg, 'schedule': schedule } -def perform_generalized_serializable_test(resource: S, name: str = None, verbose: bool = True) -> S: +def perform_generalized_serializable_test(resource: S, name: str = None, verbose: bool = False) -> S: serialized = resource._serialize() new_resource = type(resource)._deserialize(serialized) if verbose: