From 8e2666522f82186ba7803d713b8efca236f241b3 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 29 Nov 2019 22:54:14 +0100 Subject: [PATCH 01/27] improved scheduling by using quantize for clustering --- lapis/monitor/general.py | 2 +- lapis/scheduler.py | 109 +++++++++++++++++++++++---------------- pyproject.toml | 2 +- 3 files changed, 67 insertions(+), 46 deletions(-) diff --git a/lapis/monitor/general.py b/lapis/monitor/general.py index be6d24d..f03a8ea 100644 --- a/lapis/monitor/general.py +++ b/lapis/monitor/general.py @@ -89,7 +89,7 @@ def job_statistics(scheduler: CondorJobScheduler) -> List[Dict]: :return: list of records for logging """ result = 0 - for cluster in scheduler.drone_cluster.copy(): + for cluster in (scheduler.drone_cluster.copy()).values(): for drone in cluster: result += drone.jobs return [ diff --git a/lapis/scheduler.py b/lapis/scheduler.py index dc564b9..5dac2ca 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,4 +1,7 @@ from typing import Dict + +from classad._functions import quantize +from classad._primitives import HTCInt from usim import Scope, interval, Resources from lapis.drone import Drone @@ -9,6 +12,13 @@ class JobQueue(list): pass +quantization_defaults = { + "memory": HTCInt(128 * 1024 * 1024), + "disk": HTCInt(1024 * 1024), + "cores": HTCInt(1), +} + + class CondorJobScheduler(object): """ Goal of the htcondor job scheduler is to have a scheduler that somehow @@ -28,7 +38,7 @@ class CondorJobScheduler(object): def __init__(self, job_queue): self._stream_queue = job_queue - self.drone_cluster = [] + self.drone_cluster = {} self.interval = 60 self.job_queue = JobQueue() self._collecting = True @@ -36,7 +46,7 @@ def __init__(self, job_queue): @property def drone_list(self): - for cluster in self.drone_cluster: + for cluster in self.drone_cluster.values(): for drone in cluster: yield drone @@ -44,41 +54,33 @@ def register_drone(self, drone: Drone): self._add_drone(drone) def unregister_drone(self, drone: 
Drone): - for cluster in self.drone_cluster: + for key in self.drone_cluster: try: - cluster.remove(drone) + self.drone_cluster[key].remove(drone) except ValueError: pass else: - if len(cluster) == 0: - self.drone_cluster.remove(cluster) + break + else: + # nothing was removed + return + if len(self.drone_cluster[key]) == 0: + del self.drone_cluster[key] + + def _clustering_key(self, resource_dict: Dict): + clustering_key = [] + for key, value in resource_dict.items(): + clustering_key.append( + int(quantize(value, quantization_defaults.get(key, 1))) + ) + return tuple(clustering_key) def _add_drone(self, drone: Drone, drone_resources: Dict = None): - minimum_distance_cluster = None - distance = float("Inf") - if len(self.drone_cluster) > 0: - for cluster in self.drone_cluster: - current_distance = 0 - for key in {*cluster[0].pool_resources, *drone.pool_resources}: - if drone_resources: - current_distance += abs( - cluster[0].theoretical_available_resources.get(key, 0) - - drone_resources.get(key, 0) - ) - else: - current_distance += abs( - cluster[0].theoretical_available_resources.get(key, 0) - - drone.theoretical_available_resources.get(key, 0) - ) - if current_distance < distance: - minimum_distance_cluster = cluster - distance = current_distance - if distance < 1: - minimum_distance_cluster.append(drone) - else: - self.drone_cluster.append([drone]) + if drone_resources: + clustering_key = self._clustering_key(drone_resources) else: - self.drone_cluster.append([drone]) + clustering_key = self._clustering_key(drone.theoretical_available_resources) + self.drone_cluster.setdefault(clustering_key, []).append(drone) def update_drone(self, drone: Drone): self.unregister_drone(drone) @@ -88,19 +90,25 @@ async def run(self): async with Scope() as scope: scope.do(self._collect_jobs()) async for _ in interval(self.interval): + job_drone_mapping = {} for job in self.job_queue: - best_match = self._schedule_job(job) + job_key = self._clustering_key(job.resources) + try: + 
drone_key = job_drone_mapping[job_key] + if drone_key is None: + continue + best_match = self._schedule_job( + job, self.drone_cluster[drone_key] + ) + except KeyError: + best_match = self._schedule_job(job) if best_match: - await best_match.schedule_job(job) - self.job_queue.remove(job) - await sampling_required.put(self.job_queue) - self.unregister_drone(best_match) - left_resources = best_match.theoretical_available_resources - left_resources = { - key: value - job.resources.get(key, 0) - for key, value in left_resources.items() - } - self._add_drone(best_match, left_resources) + job_drone_mapping[job_key] = self._clustering_key( + best_match.theoretical_available_resources + ) + await self._execute_job(job, best_match) + else: + job_drone_mapping[job_key] = None if ( not self._collecting and not self.job_queue @@ -109,6 +117,17 @@ async def run(self): break await sampling_required.put(self) + async def _execute_job(self, job, drone): + await drone.schedule_job(job) + self.job_queue.remove(job) + await sampling_required.put(self.job_queue) + self.unregister_drone(drone) + left_resources = { + key: value - job.resources.get(key, 0) + for key, value in drone.theoretical_available_resources.items() + } + self._add_drone(drone, left_resources) + async def _collect_jobs(self): async for job in self._stream_queue: self.job_queue.append(job) @@ -121,11 +140,13 @@ async def job_finished(self, job): if job.successful: await self._processing.decrease(jobs=1) else: - await self._stream_queue.put(job) + self.job_queue.append(job) - def _schedule_job(self, job) -> Drone: + def _schedule_job(self, job, cluster=None) -> Drone: priorities = {} - for cluster in self.drone_cluster: + if cluster and len(cluster) > 0: + return cluster[0] + for cluster in self.drone_cluster.values(): drone = cluster[0] cost = 0 resources = drone.theoretical_available_resources diff --git a/pyproject.toml b/pyproject.toml index 3217107..85fdc59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ 
-24,7 +24,7 @@ classifiers = [ "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", ] -requires = ["cobald", "usim == 0.4", "click"] +requires = ["cobald", "usim == 0.4", "click", "classad"] [tool.flit.metadata.requires-extra] test = [ From 3935d4a47f0c8507691f4e3f41e3064ce3a01ef5 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Thu, 12 Dec 2019 18:08:08 +0100 Subject: [PATCH 02/27] job as weakref and fixed small typo --- lapis/job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lapis/job.py b/lapis/job.py index c4627e0..ba50f22 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -24,6 +24,7 @@ class Job(object): "_name", "drone", "_success", + "__weakref__", ) def __init__( @@ -83,7 +84,7 @@ def successful(self) -> Optional[bool]: def waiting_time(self) -> float: """ The time the job spent in the simulators scheduling queue. `Inf` when - the job is still waitiing. + the job is still waiting. :return: Time in queue """ From ac5ee4aefd69fe444b8cad71840eafbde322b5f2 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Thu, 12 Dec 2019 18:10:06 +0100 Subject: [PATCH 03/27] adapted scheduler to work with classads --- lapis/monitor/general.py | 5 +- lapis/scheduler.py | 223 ++++++++++++++++++++++++++------------- 2 files changed, 151 insertions(+), 77 deletions(-) diff --git a/lapis/monitor/general.py b/lapis/monitor/general.py index f03a8ea..78919fc 100644 --- a/lapis/monitor/general.py +++ b/lapis/monitor/general.py @@ -89,9 +89,8 @@ def job_statistics(scheduler: CondorJobScheduler) -> List[Dict]: :return: list of records for logging """ result = 0 - for cluster in (scheduler.drone_cluster.copy()).values(): - for drone in cluster: - result += drone.jobs + for drone in scheduler.drone_list: + result += drone.jobs return [ { "pool_configuration": "None", diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 5dac2ca..9fe50d9 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,10 +1,14 @@ -from typing import 
Dict +from typing import Dict, Union +from weakref import WeakKeyDictionary +from classad import parse from classad._functions import quantize from classad._primitives import HTCInt +from classad._expression import ClassAd from usim import Scope, interval, Resources from lapis.drone import Drone +from lapis.job import Job from lapis.monitor import sampling_required @@ -19,6 +23,49 @@ class JobQueue(list): } +class WrappedClassAd(ClassAd): + + __slots__ = "_wrapped" + + _wrapped: Union[Job, Drone] + + def __init__(self, classad: ClassAd, wrapped: Union[Job, Drone]): + super(WrappedClassAd, self).__init__() + self._wrapped = wrapped + self._data = classad._data + + def __getitem__(self, item): + def access_wrapped(name, requested=True): + if isinstance(self._wrapped, Drone): + if requested: + return self._wrapped.theoretical_available_resources[name] + return self._wrapped.available_resources[name] + if requested: + return self._wrapped.resources[name] + return self._wrapped.used_resources[name] + + if "target" not in item: + if "requestcpus" in item: + return access_wrapped("cores", requested=True) + elif "requestmemory" in item: + return 0.000000953674316 * access_wrapped("memory", requested=True) + elif "requestdisk" in item: + return 0.0009765625 * access_wrapped("disk", requested=True) + elif "cpus" in item: + return access_wrapped("cores", requested=False) + elif "memory" in item: + return 0.000001 * access_wrapped("memory", requested=False) + elif "disk" in item: + return 0.0009765625 * access_wrapped("disk", requested=False) + return super(WrappedClassAd, self).__getitem__(item) + + def __repr__(self): + return f"<{self.__class__.__name__}>: {self._wrapped}" + + def __eq__(self, other): + return super().__eq__(other) and self._wrapped == other._wrapped + + class CondorJobScheduler(object): """ Goal of the htcondor job scheduler is to have a scheduler that somehow @@ -39,24 +86,41 @@ class CondorJobScheduler(object): def __init__(self, job_queue): 
self._stream_queue = job_queue self.drone_cluster = {} + self.job_cluster = {} # TODO: should be sorted self.interval = 60 self.job_queue = JobQueue() self._collecting = True self._processing = Resources(jobs=0) + # temporary solution + self._wrapped_classads = WeakKeyDictionary() + self._machine_classad = parse( + """ + requirements = target.requestcpus > my.cpus + """ + ) + self._job_classad = parse( + """ + requirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory + """ + ) + @property def drone_list(self): for cluster in self.drone_cluster.values(): for drone in cluster: - yield drone + yield drone._wrapped def register_drone(self, drone: Drone): - self._add_drone(drone) + wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone) + self._wrapped_classads[drone] = wrapped_drone + self._add_drone(wrapped_drone) def unregister_drone(self, drone: Drone): + drone_wrapper = self._wrapped_classads[drone] for key in self.drone_cluster: try: - self.drone_cluster[key].remove(drone) + self.drone_cluster[key].remove(drone_wrapper) except ValueError: pass else: @@ -75,40 +139,77 @@ def _clustering_key(self, resource_dict: Dict): ) return tuple(clustering_key) - def _add_drone(self, drone: Drone, drone_resources: Dict = None): + def _add_drone(self, drone: WrappedClassAd, drone_resources: Dict = None): + wrapped_drone = drone._wrapped if drone_resources: clustering_key = self._clustering_key(drone_resources) else: - clustering_key = self._clustering_key(drone.theoretical_available_resources) + # TODO: I think this should be available_resources + clustering_key = self._clustering_key( + wrapped_drone.theoretical_available_resources + ) self.drone_cluster.setdefault(clustering_key, []).append(drone) def update_drone(self, drone: Drone): self.unregister_drone(drone) - self._add_drone(drone) + self._add_drone(self._wrapped_classads[drone]) + + def _sort_drone_cluster(self): + return [[list(drones) for drones in 
self.drone_cluster.values()]] + + def _sort_job_cluster(self): + return list(self.job_cluster.values()) async def run(self): + def filter_drones(job, drone_bucket): + result = {} + for drones in drone_bucket: + drone = drones[0] + filtered = job.evaluate("requirements", my=job, target=drone) + if filtered: + rank = job.evaluate("rank", my=job, target=drone) + result.setdefault(rank, []).append(drones) + return result + + def pop_first(ranked_drones: Dict): + keys = sorted(ranked_drones.keys()) + if len(keys) == 0: + return None + values = ranked_drones.get(keys[0]) + result = values[0] + values.remove(result) + if len(values) == 0: + del ranked_drones[keys[0]] + return result[0] + async with Scope() as scope: scope.do(self._collect_jobs()) async for _ in interval(self.interval): - job_drone_mapping = {} - for job in self.job_queue: - job_key = self._clustering_key(job.resources) - try: - drone_key = job_drone_mapping[job_key] - if drone_key is None: - continue - best_match = self._schedule_job( - job, self.drone_cluster[drone_key] - ) - except KeyError: - best_match = self._schedule_job(job) - if best_match: - job_drone_mapping[job_key] = self._clustering_key( - best_match.theoretical_available_resources - ) - await self._execute_job(job, best_match) - else: - job_drone_mapping[job_key] = None + # TODO: get sorted job cluster [{Job, ...}, ...] + # TODO: get set of drone cluster {{PSlot, ...}, ...} + # TODO: get sorted drone clusters PreJob [{{PSlot, ...}, ...}, ...] 
+ # TODO: filter (Job.Requirements) and sort (Job.Rank) for job and drones => lazy + + all_drone_buckets = self._sort_drone_cluster().copy() + filtered_drones = {} + current_drone_bucket = 0 + for jobs in self._sort_job_cluster().copy(): + for job in jobs: + best_match = pop_first(filtered_drones) + while best_match is None: + # lazily evaluate more PSlots + try: + # TODO: sort filtered_drones + filtered_drones = filter_drones( + job, all_drone_buckets[current_drone_bucket] + ) + except IndexError: + break + current_drone_bucket += 1 + best_match = pop_first(filtered_drones) + else: + # TODO: update drone and check if it gets reinserted to filtered_drones + await self._execute_job(job=job, drone=best_match) if ( not self._collecting and not self.job_queue @@ -117,22 +218,33 @@ async def run(self): break await sampling_required.put(self) - async def _execute_job(self, job, drone): - await drone.schedule_job(job) + async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd): + wrapped_job = job._wrapped + wrapped_drone = drone._wrapped + await wrapped_drone.schedule_job(wrapped_job) self.job_queue.remove(job) + cluster_key = self._clustering_key(wrapped_job.resources) + self.job_cluster[cluster_key].remove(job) + if len(self.job_cluster[cluster_key]) == 0: + del self.job_cluster[cluster_key] await sampling_required.put(self.job_queue) - self.unregister_drone(drone) + self.unregister_drone(wrapped_drone) left_resources = { - key: value - job.resources.get(key, 0) - for key, value in drone.theoretical_available_resources.items() + key: value - wrapped_job.resources.get(key, 0) + for key, value in wrapped_drone.theoretical_available_resources.items() } self._add_drone(drone, left_resources) async def _collect_jobs(self): async for job in self._stream_queue: - self.job_queue.append(job) + wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job) + self._wrapped_classads[job] = wrapped_job + self.job_queue.append(wrapped_job) + cluster_key = 
self._clustering_key(job.resources) + self.job_cluster.setdefault(cluster_key, []).append(wrapped_job) await self._processing.increase(jobs=1) # TODO: logging happens with each job + # TODO: job queue to the outside now contains wrapped classads... await sampling_required.put(self.job_queue) self._collecting = False @@ -140,45 +252,8 @@ async def job_finished(self, job): if job.successful: await self._processing.decrease(jobs=1) else: - self.job_queue.append(job) - - def _schedule_job(self, job, cluster=None) -> Drone: - priorities = {} - if cluster and len(cluster) > 0: - return cluster[0] - for cluster in self.drone_cluster.values(): - drone = cluster[0] - cost = 0 - resources = drone.theoretical_available_resources - for resource_type in job.resources: - if resources.get(resource_type, 0) < job.resources[resource_type]: - # Inf for all job resources that a drone does not support - # and all resources that are too small to even be considered - cost = float("Inf") - break - else: - try: - cost += 1 / ( - resources[resource_type] // job.resources[resource_type] - ) - except KeyError: - pass - for additional_resource_type in [ - key for key in drone.pool_resources if key not in job.resources - ]: - cost += resources[additional_resource_type] - cost /= len((*job.resources, *drone.pool_resources)) - if cost <= 1: - # directly start job - return drone - try: - priorities[cost].append(drone) - except KeyError: - priorities[cost] = [drone] - try: - minimal_key = min(priorities) - if minimal_key < float("Inf"): - return priorities[minimal_key][0] - except ValueError: - pass - return None + self.job_queue.append(self._wrapped_classads[job]) + cluster_key = self._clustering_key(job.resources) + self.job_cluster.setdefault(cluster_key, []).append( + self._wrapped_classads[job] + ) From dd63690bb151e28364cccfb9f8c36c8c62395153 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 16 Dec 2019 15:58:10 +0100 Subject: [PATCH 04/27] removed redundant update of drone at scheduler 
--- lapis/drone.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lapis/drone.py b/lapis/drone.py index 48142c6..15da85b 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -143,7 +143,6 @@ async def _run_job(self, job: Job, kill: bool): except KeyError: # check is not relevant if the data is not stored pass - self.scheduler.update_drone(self) await job_execution.done except ResourcesUnavailable: await instant From b9a3d0a9ba2d63d41cac515f5edcabd9a067caf1 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 16 Dec 2019 16:01:28 +0100 Subject: [PATCH 05/27] current implementation of scheduling with classads --- lapis/scheduler.py | 90 +++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 9fe50d9..a89d931 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,9 +1,9 @@ -from typing import Dict, Union +from typing import Dict, Iterator, Union, Tuple, List, TypeVar, Generic, Optional from weakref import WeakKeyDictionary from classad import parse from classad._functions import quantize -from classad._primitives import HTCInt +from classad._primitives import HTCInt, Undefined from classad._expression import ClassAd from usim import Scope, interval, Resources @@ -22,14 +22,14 @@ class JobQueue(list): "cores": HTCInt(1), } +DJ = TypeVar("DJ", Drone, Job) -class WrappedClassAd(ClassAd): - __slots__ = "_wrapped" +class WrappedClassAd(ClassAd, Generic[DJ]): - _wrapped: Union[Job, Drone] + __slots__ = "_wrapped" - def __init__(self, classad: ClassAd, wrapped: Union[Job, Drone]): + def __init__(self, classad: ClassAd, wrapped: DJ): super(WrappedClassAd, self).__init__() self._wrapped = wrapped self._data = classad._data @@ -37,9 +37,7 @@ def __init__(self, classad: ClassAd, wrapped: Union[Job, Drone]): def __getitem__(self, item): def access_wrapped(name, requested=True): if isinstance(self._wrapped, Drone): - if requested: - return 
self._wrapped.theoretical_available_resources[name] - return self._wrapped.available_resources[name] + return self._wrapped.theoretical_available_resources[name] if requested: return self._wrapped.resources[name] return self._wrapped.used_resources[name] @@ -48,15 +46,15 @@ def access_wrapped(name, requested=True): if "requestcpus" in item: return access_wrapped("cores", requested=True) elif "requestmemory" in item: - return 0.000000953674316 * access_wrapped("memory", requested=True) + return (1 / 1024 / 1024) * access_wrapped("memory", requested=True) elif "requestdisk" in item: - return 0.0009765625 * access_wrapped("disk", requested=True) + return (1 / 1024) * access_wrapped("disk", requested=True) elif "cpus" in item: return access_wrapped("cores", requested=False) elif "memory" in item: - return 0.000001 * access_wrapped("memory", requested=False) + return (1 / 1000 / 1000) * access_wrapped("memory", requested=False) elif "disk" in item: - return 0.0009765625 * access_wrapped("disk", requested=False) + return (1 / 1024) * access_wrapped("disk", requested=False) return super(WrappedClassAd, self).__getitem__(item) def __repr__(self): @@ -66,6 +64,14 @@ def __eq__(self, other): return super().__eq__(other) and self._wrapped == other._wrapped +class Cluster(List[WrappedClassAd[DJ]], Generic[DJ]): + pass + + +class Bucket(List[Cluster[DJ]], Generic[DJ]): + pass + + class CondorJobScheduler(object): """ Goal of the htcondor job scheduler is to have a scheduler that somehow @@ -85,8 +91,8 @@ class CondorJobScheduler(object): def __init__(self, job_queue): self._stream_queue = job_queue - self.drone_cluster = {} - self.job_cluster = {} # TODO: should be sorted + self.drone_cluster: Dict[Tuple[float, ...], Cluster[WrappedClassAd[Drone]]] = {} + self.job_cluster: Dict[Tuple[float, ...], Cluster[WrappedClassAd[Job]]] = {} self.interval = 60 self.job_queue = JobQueue() self._collecting = True @@ -96,7 +102,7 @@ def __init__(self, job_queue): self._wrapped_classads = 
WeakKeyDictionary() self._machine_classad = parse( """ - requirements = target.requestcpus > my.cpus + requirements = target.requestcpus <= my.cpus """ ) self._job_classad = parse( @@ -106,7 +112,7 @@ def __init__(self, job_queue): ) @property - def drone_list(self): + def drone_list(self) -> Iterator[Drone]: for cluster in self.drone_cluster.values(): for drone in cluster: yield drone._wrapped @@ -131,7 +137,8 @@ def unregister_drone(self, drone: Drone): if len(self.drone_cluster[key]) == 0: del self.drone_cluster[key] - def _clustering_key(self, resource_dict: Dict): + @staticmethod + def _clustering_key(resource_dict: Dict): clustering_key = [] for key, value in resource_dict.items(): clustering_key.append( @@ -144,43 +151,46 @@ def _add_drone(self, drone: WrappedClassAd, drone_resources: Dict = None): if drone_resources: clustering_key = self._clustering_key(drone_resources) else: - # TODO: I think this should be available_resources - clustering_key = self._clustering_key( - wrapped_drone.theoretical_available_resources - ) - self.drone_cluster.setdefault(clustering_key, []).append(drone) + clustering_key = self._clustering_key(wrapped_drone.available_resources) + self.drone_cluster.setdefault(clustering_key, Cluster()).append(drone) def update_drone(self, drone: Drone): self.unregister_drone(drone) self._add_drone(self._wrapped_classads[drone]) def _sort_drone_cluster(self): - return [[list(drones) for drones in self.drone_cluster.values()]] + return [Bucket(self.drone_cluster.values())] def _sort_job_cluster(self): - return list(self.job_cluster.values()) + return Bucket(self.job_cluster.values()) async def run(self): - def filter_drones(job, drone_bucket): - result = {} + def filter_drones(job: WrappedClassAd[Job], drone_bucket: Bucket[Drone]): + result = {} # type: Dict[Union[Undefined, float], Bucket[Drone]] for drones in drone_bucket: - drone = drones[0] - filtered = job.evaluate("requirements", my=job, target=drone) - if filtered: + drone = drones[0] # 
type: WrappedClassAd[Drone] + if job.evaluate( + "requirements", my=job, target=drone + ) and drone.evaluate("requirements", my=drone, target=job): rank = job.evaluate("rank", my=job, target=drone) - result.setdefault(rank, []).append(drones) + result.setdefault(rank, Bucket()).append(drones) return result - def pop_first(ranked_drones: Dict): - keys = sorted(ranked_drones.keys()) - if len(keys) == 0: + def pop_first( + ranked_drones: Dict[Union[Undefined, float], Bucket[Drone]] + ) -> Optional[WrappedClassAd[Drone]]: + if not ranked_drones: return None - values = ranked_drones.get(keys[0]) + key = sorted(ranked_drones)[0] + values = ranked_drones[key] result = values[0] values.remove(result) - if len(values) == 0: - del ranked_drones[keys[0]] - return result[0] + if not values: + del ranked_drones[key] + try: + return result[0] + except IndexError: + return pop_first(ranked_drones) async with Scope() as scope: scope.do(self._collect_jobs()) @@ -190,10 +200,10 @@ def pop_first(ranked_drones: Dict): # TODO: get sorted drone clusters PreJob [{{PSlot, ...}, ...}, ...] 
# TODO: filter (Job.Requirements) and sort (Job.Rank) for job and drones => lazy - all_drone_buckets = self._sort_drone_cluster().copy() + all_drone_buckets = self._sort_drone_cluster() filtered_drones = {} - current_drone_bucket = 0 for jobs in self._sort_job_cluster().copy(): + current_drone_bucket = 0 for job in jobs: best_match = pop_first(filtered_drones) while best_match is None: From 10df7bcba9ad4707f5bea5f710255c07499bb974 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 31 Jan 2020 14:13:50 +0100 Subject: [PATCH 06/27] added base class for job scheduler --- lapis/scheduler.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index a89d931..f8ec109 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,3 +1,4 @@ +from abc import ABC from typing import Dict, Iterator, Union, Tuple, List, TypeVar, Generic, Optional from weakref import WeakKeyDictionary @@ -72,7 +73,30 @@ class Bucket(List[Cluster[DJ]], Generic[DJ]): pass -class CondorJobScheduler(object): +class JobScheduler(ABC): + __slots__ = () + + @property + def drone_list(self) -> Iterator[Drone]: + raise NotImplementedError + + def register_drone(self, drone: Drone): + raise NotImplementedError + + def unregister_drone(self, drone: Drone): + raise NotImplementedError + + def update_drone(self, drone: Drone): + raise NotImplementedError + + async def run(self): + raise NotImplementedError + + async def job_finished(self, job): + raise NotImplementedError + + +class CondorJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow mimics how htcondor does schedule jobs. 
From 2f8bec63afe4b7a8f07c310658a7f2b967764201 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 31 Jan 2020 14:51:28 +0100 Subject: [PATCH 07/27] added older scheduler and renamed classad scheduler to CondorClassadJobScheduler --- lapis/scheduler.py | 155 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 154 insertions(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index f8ec109..ec027e6 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -96,7 +96,160 @@ async def job_finished(self, job): raise NotImplementedError -class CondorJobScheduler(JobScheduler): +class CondorJobScheduler(object): + """ + Goal of the htcondor job scheduler is to have a scheduler that somehow + mimics how htcondor does schedule jobs. + Htcondor does scheduling based on a priority queue. The priorities itself + are managed by operators of htcondor. + So different instances can apparently behave very different. + In my case I am going to try building a priority queue that sorts job slots + by increasing cost. The cost itself is calculated based on the current + strategy that is used at GridKa. The scheduler checks if a job either + exactly fits a slot or if it does fit into it several times. The cost for + putting a job at a given slot is given by the amount of resources that + might remain unallocated. 
+ :return: + """ + + def __init__(self, job_queue): + self._stream_queue = job_queue + self.drone_cluster = [] + self.interval = 60 + self.job_queue = JobQueue() + self._collecting = True + self._processing = Resources(jobs=0) + + @property + def drone_list(self): + for cluster in self.drone_cluster: + for drone in cluster: + yield drone + + def register_drone(self, drone: Drone): + self._add_drone(drone) + + def unregister_drone(self, drone: Drone): + for cluster in self.drone_cluster: + try: + cluster.remove(drone) + except ValueError: + pass + else: + if len(cluster) == 0: + self.drone_cluster.remove(cluster) + + def _add_drone(self, drone: Drone, drone_resources: Dict = None): + minimum_distance_cluster = None + distance = float("Inf") + if len(self.drone_cluster) > 0: + for cluster in self.drone_cluster: + current_distance = 0 + for key in {*cluster[0].pool_resources, *drone.pool_resources}: + if drone_resources: + current_distance += abs( + cluster[0].theoretical_available_resources.get(key, 0) + - drone_resources.get(key, 0) + ) + else: + current_distance += abs( + cluster[0].theoretical_available_resources.get(key, 0) + - drone.theoretical_available_resources.get(key, 0) + ) + if current_distance < distance: + minimum_distance_cluster = cluster + distance = current_distance + if distance < 1: + minimum_distance_cluster.append(drone) + else: + self.drone_cluster.append([drone]) + else: + self.drone_cluster.append([drone]) + + def update_drone(self, drone: Drone): + self.unregister_drone(drone) + self._add_drone(drone) + + async def run(self): + async with Scope() as scope: + scope.do(self._collect_jobs()) + async for _ in interval(self.interval): + for job in self.job_queue.copy(): + best_match = self._schedule_job(job) + if best_match: + await best_match.schedule_job(job) + self.job_queue.remove(job) + await sampling_required.put(self.job_queue) + self.unregister_drone(best_match) + left_resources = best_match.theoretical_available_resources + 
left_resources = { + key: value - job.resources.get(key, 0) + for key, value in left_resources.items() + } + self._add_drone(best_match, left_resources) + if ( + not self._collecting + and not self.job_queue + and self._processing.levels.jobs == 0 + ): + break + await sampling_required.put(self) + + async def _collect_jobs(self): + async for job in self._stream_queue: + self.job_queue.append(job) + await self._processing.increase(jobs=1) + # TODO: logging happens with each job + await sampling_required.put(self.job_queue) + self._collecting = False + + async def job_finished(self, job): + if job.successful: + await self._processing.decrease(jobs=1) + else: + await self._stream_queue.put(job) + + def _schedule_job(self, job) -> Drone: + priorities = {} + for cluster in self.drone_cluster: + drone = cluster[0] + cost = 0 + resources = drone.theoretical_available_resources + for resource_type in job.resources: + if resources.get(resource_type, 0) < job.resources[resource_type]: + # Inf for all job resources that a drone does not support + # and all resources that are too small to even be considered + cost = float("Inf") + break + else: + try: + cost += 1 / ( + resources[resource_type] // job.resources[resource_type] + ) + except KeyError: + pass + for additional_resource_type in [ + key for key in drone.pool_resources if key not in job.resources + ]: + cost += resources[additional_resource_type] + cost /= len((*job.resources, *drone.pool_resources)) + if cost <= 1: + # directly start job + return drone + try: + priorities[cost].append(drone) + except KeyError: + priorities[cost] = [drone] + try: + minimal_key = min(priorities) + if minimal_key < float("Inf"): + return priorities[minimal_key][0] + except ValueError: + pass + return None + + +class CondorClassadJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow mimics how htcondor does schedule jobs. 
From 7722f12bd7700e6c74ed1a5af7220e565aca14cb Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 31 Jan 2020 15:02:29 +0100 Subject: [PATCH 08/27] CondorJobScheduler now implements interface of JobScheduler --- lapis/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index ec027e6..fb7a9c8 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -96,7 +96,7 @@ async def job_finished(self, job): raise NotImplementedError -class CondorJobScheduler(object): +class CondorJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow mimics how htcondor does schedule jobs. From 5fc237f501cd1613b439afbe64e148d0a67de22f Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 31 Jan 2020 16:05:02 +0100 Subject: [PATCH 09/27] added possibility to save temporary resources at drone wrapper --- lapis/scheduler.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index fb7a9c8..457c96a 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -28,12 +28,13 @@ class JobQueue(list): class WrappedClassAd(ClassAd, Generic[DJ]): - __slots__ = "_wrapped" + __slots__ = "_wrapped", "_temp" def __init__(self, classad: ClassAd, wrapped: DJ): super(WrappedClassAd, self).__init__() self._wrapped = wrapped self._data = classad._data + self._temp = {} def __getitem__(self, item): def access_wrapped(name, requested=True): @@ -51,13 +52,25 @@ def access_wrapped(name, requested=True): elif "requestdisk" in item: return (1 / 1024) * access_wrapped("disk", requested=True) elif "cpus" in item: - return access_wrapped("cores", requested=False) + try: + return self._temp["cores"] + except KeyError: + return access_wrapped("cores", requested=False) elif "memory" in item: - return (1 / 1000 / 1000) * access_wrapped("memory", requested=False) + try: + return self._temp["memory"] + except KeyError: + return (1 / 
1000 / 1000) * access_wrapped("memory", requested=False) elif "disk" in item: - return (1 / 1024) * access_wrapped("disk", requested=False) + try: + return self._temp["disk"] + except KeyError: + return (1 / 1024) * access_wrapped("disk", requested=False) return super(WrappedClassAd, self).__getitem__(item) + def clear_temporary_resources(self): + self._temp.clear() + def __repr__(self): return f"<{self.__class__.__name__}>: {self._wrapped}" From 6461b92ebb5d7703055f988aff008b772e76cdef Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Fri, 31 Jan 2020 16:50:42 +0100 Subject: [PATCH 10/27] added docstrings to scheduler base class --- lapis/scheduler.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 457c96a..1462aa5 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -91,21 +91,31 @@ class JobScheduler(ABC): @property def drone_list(self) -> Iterator[Drone]: + """Yields the registered drones""" raise NotImplementedError def register_drone(self, drone: Drone): + """Register a drone at the scheduler""" raise NotImplementedError def unregister_drone(self, drone: Drone): + """Unregister a drone at the scheduler""" raise NotImplementedError def update_drone(self, drone: Drone): + """Update parameters of a drone""" raise NotImplementedError async def run(self): + """Run method of the scheduler""" raise NotImplementedError async def job_finished(self, job): + """ + Declare a job as finished by a drone. This might even mean, that the job + has failed and that the scheduler needs to requeue the job for further + processing. 
+ """ raise NotImplementedError From 4589d888c22292c46513fa9cde31667a6db08340 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Wed, 12 Feb 2020 12:42:35 +0100 Subject: [PATCH 11/27] condor classad scheduler draft --- lapis/scheduler.py | 299 ++++++++++++++++++++++++++------------------- 1 file changed, 174 insertions(+), 125 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 457c96a..28483a6 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,8 +1,11 @@ from abc import ABC -from typing import Dict, Iterator, Union, Tuple, List, TypeVar, Generic, Optional +from typing import Dict, Iterator, Tuple, List, TypeVar, Generic, Set, NamedTuple from weakref import WeakKeyDictionary +from sortedcontainers import SortedDict + from classad import parse +from classad._base_expression import Expression from classad._functions import quantize from classad._primitives import HTCInt, Undefined from classad._expression import ClassAd @@ -23,6 +26,15 @@ class JobQueue(list): "cores": HTCInt(1), } +machine_ad_defaults = """ +requirements = target.requestcpus <= my.cpus +""".strip() + +job_ad_defaults = """ +requirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory +""" + +T = TypeVar("T") DJ = TypeVar("DJ", Drone, Job) @@ -262,6 +274,88 @@ def _schedule_job(self, job) -> Drone: return None +# HTCondor ClassAd Scheduler + +class NoMatch(Exception): + """A job could not be matched to any drone""" + + +class RankedClusterKey(NamedTuple): + rank: float + key: Tuple[float, ...] 
+ + +class RankedAutoClusters(Generic[DJ]): + """Automatically cluster similar jobs or drones""" + def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): + self._quantization = quantization + self._ranking = ranking + self._clusters: Dict[RankedClusterKey, Set[WrappedClassAd[DJ]]] = SortedDict() + self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {} + + def copy(self) -> 'RankedAutoClusters[DJ]': + """Copy the entire ranked auto clusters""" + clone = type(self)(quantization=self._quantization, ranking=self._ranking) + clone._clusters = SortedDict( + (key, value.copy()) + for key, value in self._clusters.items() + ) + clone._inverse = self._inverse.copy() + return clone + + def add(self, item: WrappedClassAd[DJ]): + """Add a new item""" + if item in self._inverse: + raise ValueError(f'{item!r} already stored; use `.update(item)` instead') + item_key = self._clustering_key(item) + try: + self._clusters[item_key].add(item) + except KeyError: + self._clusters[item_key] = {item} + self._inverse[item] = item_key + + def remove(self, item: WrappedClassAd[DJ]): + """Remove an existing item""" + item_key = self._inverse.pop(item) + cluster = self._clusters[item_key] + cluster.remove(item) + if not cluster: + del self._clusters[item_key] + + def update(self, item): + """Update an existing item with its current state""" + self.remove(item) + self.add(item) + + def _clustering_key(self, item: WrappedClassAd[DJ]): + # TODO: assert that order is consistent + quantization = self._quantization + return RankedClusterKey( + rank=self._ranking.evaluate(my=item), + key=tuple( + int(quantize(value, quantization.get(key, 1))) + for key, value in item._wrapped.available_resources.items() + ) + ) + + def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: + return iter(self._clusters.values()) + + def items(self) -> Iterator[Tuple[RankedClusterKey, Set[WrappedClassAd[DJ]]]]: + return iter(self._clusters.items()) + + def cluster_groups(self) -> 
Iterator[List[Set[WrappedClassAd[Drone]]]]: + """Group autoclusters by PreJobRank""" + group = [] + current_rank = 0 + for ranked_key, drones in self._clusters.items(): + if ranked_key.rank != current_rank and group: + current_rank = ranked_key.rank + yield group + group = [] + group.append(drones) + + class CondorClassadJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a scheduler that somehow @@ -279,169 +373,128 @@ class CondorClassadJobScheduler(JobScheduler): :return: """ - def __init__(self, job_queue): + def __init__( + self, + job_queue, + machine_ad: str = machine_ad_defaults, + job_ad: str = job_ad_defaults, + pre_job_rank: str = '0', + interval: float = 60, + ): self._stream_queue = job_queue - self.drone_cluster: Dict[Tuple[float, ...], Cluster[WrappedClassAd[Drone]]] = {} - self.job_cluster: Dict[Tuple[float, ...], Cluster[WrappedClassAd[Job]]] = {} - self.interval = 60 + self._drones: RankedAutoClusters[Drone] = RankedAutoClusters( + quantization=quantization_defaults, + ranking=parse(pre_job_rank), + ) + self.interval = interval self.job_queue = JobQueue() self._collecting = True self._processing = Resources(jobs=0) # temporary solution self._wrapped_classads = WeakKeyDictionary() - self._machine_classad = parse( - """ - requirements = target.requestcpus <= my.cpus - """ - ) - self._job_classad = parse( - """ - requirements = my.requestcpus <= target.cpus && my.requestmemory <= target.memory - """ - ) + self._machine_classad = parse(machine_ad) + self._job_classad = parse(job_ad) @property def drone_list(self) -> Iterator[Drone]: - for cluster in self.drone_cluster.values(): + for cluster in self._drones.clusters(): for drone in cluster: yield drone._wrapped def register_drone(self, drone: Drone): wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone) self._wrapped_classads[drone] = wrapped_drone - self._add_drone(wrapped_drone) + self._drones.add(wrapped_drone) def unregister_drone(self, drone: 
Drone): drone_wrapper = self._wrapped_classads[drone] - for key in self.drone_cluster: - try: - self.drone_cluster[key].remove(drone_wrapper) - except ValueError: - pass - else: - break - else: - # nothing was removed - return - if len(self.drone_cluster[key]) == 0: - del self.drone_cluster[key] - - @staticmethod - def _clustering_key(resource_dict: Dict): - clustering_key = [] - for key, value in resource_dict.items(): - clustering_key.append( - int(quantize(value, quantization_defaults.get(key, 1))) - ) - return tuple(clustering_key) - - def _add_drone(self, drone: WrappedClassAd, drone_resources: Dict = None): - wrapped_drone = drone._wrapped - if drone_resources: - clustering_key = self._clustering_key(drone_resources) - else: - clustering_key = self._clustering_key(wrapped_drone.available_resources) - self.drone_cluster.setdefault(clustering_key, Cluster()).append(drone) + self._drones.remove(drone_wrapper) def update_drone(self, drone: Drone): - self.unregister_drone(drone) - self._add_drone(self._wrapped_classads[drone]) - - def _sort_drone_cluster(self): - return [Bucket(self.drone_cluster.values())] - - def _sort_job_cluster(self): - return Bucket(self.job_cluster.values()) + drone_wrapper = self._wrapped_classads[drone] + self._drones.update(drone_wrapper) async def run(self): - def filter_drones(job: WrappedClassAd[Job], drone_bucket: Bucket[Drone]): - result = {} # type: Dict[Union[Undefined, float], Bucket[Drone]] - for drones in drone_bucket: - drone = drones[0] # type: WrappedClassAd[Drone] - if job.evaluate( - "requirements", my=job, target=drone - ) and drone.evaluate("requirements", my=drone, target=job): - rank = job.evaluate("rank", my=job, target=drone) - result.setdefault(rank, Bucket()).append(drones) - return result - - def pop_first( - ranked_drones: Dict[Union[Undefined, float], Bucket[Drone]] - ) -> Optional[WrappedClassAd[Drone]]: - if not ranked_drones: - return None - key = sorted(ranked_drones)[0] - values = ranked_drones[key] - 
result = values[0] - values.remove(result) - if not values: - del ranked_drones[key] - try: - return result[0] - except IndexError: - return pop_first(ranked_drones) - async with Scope() as scope: scope.do(self._collect_jobs()) async for _ in interval(self.interval): - # TODO: get sorted job cluster [{Job, ...}, ...] - # TODO: get set of drone cluster {{PSlot, ...}, ...} - # TODO: get sorted drone clusters PreJob [{{PSlot, ...}, ...}, ...] - # TODO: filter (Job.Requirements) and sort (Job.Rank) for job and drones => lazy - - all_drone_buckets = self._sort_drone_cluster() - filtered_drones = {} - for jobs in self._sort_job_cluster().copy(): - current_drone_bucket = 0 - for job in jobs: - best_match = pop_first(filtered_drones) - while best_match is None: - # lazily evaluate more PSlots - try: - # TODO: sort filtered_drones - filtered_drones = filter_drones( - job, all_drone_buckets[current_drone_bucket] - ) - except IndexError: - break - current_drone_bucket += 1 - best_match = pop_first(filtered_drones) - else: - # TODO: update drone and check if it gets reinserted to filtered_drones - await self._execute_job(job=job, drone=best_match) + await self._schedule_jobs() if ( not self._collecting and not self.job_queue and self._processing.levels.jobs == 0 ): break - await sampling_required.put(self) + + @staticmethod + def _match_job(job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]]): + if job['Requirements'] != Undefined: + pre_job_clusters = ( + [ + cluster for cluster in cluster_group + if job.evaluate('Requirements', my=job, target=next(iter(cluster))) + ] + for cluster_group in pre_job_clusters + ) + if job['Rank'] != Undefined: + pre_job_clusters = ( + sorted( + cluster_group, + key=lambda cluster: job.evaluate('Rank', my=job, target=next(iter(cluster))) + ) + for cluster_group in pre_job_clusters + ) + for cluster_group in pre_job_clusters: + # TODO: if we have POST_JOB_RANK, collect *all* matches of a group + for cluster in 
cluster_group: + for drone in cluster: + if ( + drone['Requirements'] == Undefined + or drone.evaluate('Requirements', my=drone, target=job) + ): + return drone + raise NoMatch() + + async def _schedule_jobs(self): + # Pre Job Rank is the same for all jobs + # Use a copy to allow temporary "remainder after match" estimates + pre_job_drones = self._drones.copy() + matches: List[Tuple[int, WrappedClassAd[Job], WrappedClassAd[Drone]]] = [] + for queue_index, candidate_job in enumerate(self.job_queue): + try: + matched_drone = self._match_job( + candidate_job, + pre_job_drones.cluster_groups() + ) + except NoMatch: + continue + else: + matches.append((queue_index, candidate_job, matched_drone)) + # TODO: deduct job-resources from matched drone + # and update instead of remove + pre_job_drones.remove(matched_drone) + if not matches: + return + # TODO: optimize for few matches, many matches, all matches + for queue_index, _, _ in reversed(matches): + del self.job_queue[queue_index] + for _, job, drone in matches: + await self._execute_job(job=job, drone=drone) + await sampling_required.put(self) + # NOTE: Is this correct? 
Triggers once instead of for each job + await sampling_required.put(self.job_queue) async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd): wrapped_job = job._wrapped wrapped_drone = drone._wrapped await wrapped_drone.schedule_job(wrapped_job) - self.job_queue.remove(job) - cluster_key = self._clustering_key(wrapped_job.resources) - self.job_cluster[cluster_key].remove(job) - if len(self.job_cluster[cluster_key]) == 0: - del self.job_cluster[cluster_key] - await sampling_required.put(self.job_queue) - self.unregister_drone(wrapped_drone) - left_resources = { - key: value - wrapped_job.resources.get(key, 0) - for key, value in wrapped_drone.theoretical_available_resources.items() - } - self._add_drone(drone, left_resources) async def _collect_jobs(self): async for job in self._stream_queue: wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job) self._wrapped_classads[job] = wrapped_job self.job_queue.append(wrapped_job) - cluster_key = self._clustering_key(job.resources) - self.job_cluster.setdefault(cluster_key, []).append(wrapped_job) await self._processing.increase(jobs=1) # TODO: logging happens with each job # TODO: job queue to the outside now contains wrapped classads... 
@@ -453,7 +506,3 @@ async def job_finished(self, job): await self._processing.decrease(jobs=1) else: self.job_queue.append(self._wrapped_classads[job]) - cluster_key = self._clustering_key(job.resources) - self.job_cluster.setdefault(cluster_key, []).append( - self._wrapped_classads[job] - ) From 3c3bbc6c1f0db799d180e63f58f33ead3a796ea8 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Wed, 12 Feb 2020 15:24:51 +0100 Subject: [PATCH 12/27] added hash for wrappedclassad --- lapis/scheduler.py | 44 +++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index bd20a57..ab4402d 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -89,6 +89,9 @@ def __repr__(self): def __eq__(self, other): return super().__eq__(other) and self._wrapped == other._wrapped + def __hash__(self): + return id(self._wrapped) + class Cluster(List[WrappedClassAd[DJ]], Generic[DJ]): pass @@ -286,6 +289,7 @@ def _schedule_job(self, job) -> Drone: # HTCondor ClassAd Scheduler + class NoMatch(Exception): """A job could not be matched to any drone""" @@ -297,18 +301,18 @@ class RankedClusterKey(NamedTuple): class RankedAutoClusters(Generic[DJ]): """Automatically cluster similar jobs or drones""" + def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._quantization = quantization self._ranking = ranking self._clusters: Dict[RankedClusterKey, Set[WrappedClassAd[DJ]]] = SortedDict() self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {} - def copy(self) -> 'RankedAutoClusters[DJ]': + def copy(self) -> "RankedAutoClusters[DJ]": """Copy the entire ranked auto clusters""" clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( - (key, value.copy()) - for key, value in self._clusters.items() + (key, value.copy()) for key, value in self._clusters.items() ) clone._inverse = self._inverse.copy() return clone @@ -316,7 +320,7 @@ def 
copy(self) -> 'RankedAutoClusters[DJ]': def add(self, item: WrappedClassAd[DJ]): """Add a new item""" if item in self._inverse: - raise ValueError(f'{item!r} already stored; use `.update(item)` instead') + raise ValueError(f"{item!r} already stored; use `.update(item)` instead") item_key = self._clustering_key(item) try: self._clusters[item_key].add(item) @@ -345,7 +349,7 @@ def _clustering_key(self, item: WrappedClassAd[DJ]): key=tuple( int(quantize(value, quantization.get(key, 1))) for key, value in item._wrapped.available_resources.items() - ) + ), ) def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: @@ -388,13 +392,12 @@ def __init__( job_queue, machine_ad: str = machine_ad_defaults, job_ad: str = job_ad_defaults, - pre_job_rank: str = '0', + pre_job_rank: str = "0", interval: float = 60, ): self._stream_queue = job_queue self._drones: RankedAutoClusters[Drone] = RankedAutoClusters( - quantization=quantization_defaults, - ranking=parse(pre_job_rank), + quantization=quantization_defaults, ranking=parse(pre_job_rank) ) self.interval = interval self.job_queue = JobQueue() @@ -438,20 +441,25 @@ async def run(self): break @staticmethod - def _match_job(job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]]): - if job['Requirements'] != Undefined: + def _match_job( + job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]] + ): + if job["Requirements"] != Undefined: pre_job_clusters = ( [ - cluster for cluster in cluster_group - if job.evaluate('Requirements', my=job, target=next(iter(cluster))) + cluster + for cluster in cluster_group + if job.evaluate("Requirements", my=job, target=next(iter(cluster))) ] for cluster_group in pre_job_clusters ) - if job['Rank'] != Undefined: + if job["Rank"] != Undefined: pre_job_clusters = ( sorted( cluster_group, - key=lambda cluster: job.evaluate('Rank', my=job, target=next(iter(cluster))) + key=lambda cluster: job.evaluate( + "Rank", my=job, target=next(iter(cluster)) + ), ) for 
cluster_group in pre_job_clusters ) @@ -459,9 +467,8 @@ def _match_job(job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[ # TODO: if we have POST_JOB_RANK, collect *all* matches of a group for cluster in cluster_group: for drone in cluster: - if ( - drone['Requirements'] == Undefined - or drone.evaluate('Requirements', my=drone, target=job) + if drone["Requirements"] == Undefined or drone.evaluate( + "Requirements", my=drone, target=job ): return drone raise NoMatch() @@ -474,8 +481,7 @@ async def _schedule_jobs(self): for queue_index, candidate_job in enumerate(self.job_queue): try: matched_drone = self._match_job( - candidate_job, - pre_job_drones.cluster_groups() + candidate_job, pre_job_drones.cluster_groups() ) except NoMatch: continue From ee779a4c59dcb9b12b607affa575cf61e37f6b28 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Wed, 12 Feb 2020 16:48:06 +0100 Subject: [PATCH 13/27] made scheduler working --- lapis/scheduler.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index ab4402d..6199222 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -361,13 +361,16 @@ def items(self) -> Iterator[Tuple[RankedClusterKey, Set[WrappedClassAd[DJ]]]]: def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: """Group autoclusters by PreJobRank""" group = [] - current_rank = 0 + current_rank = None for ranked_key, drones in self._clusters.items(): - if ranked_key.rank != current_rank and group: + if ranked_key.rank != current_rank: current_rank = ranked_key.rank - yield group - group = [] + if group: + yield group + group = [] group.append(drones) + if group: + yield group class CondorClassadJobScheduler(JobScheduler): @@ -444,7 +447,7 @@ async def run(self): def _match_job( job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]] ): - if job["Requirements"] != Undefined: + if job["Requirements"] != Undefined(): pre_job_clusters = ( [ 
cluster @@ -453,7 +456,7 @@ def _match_job( ] for cluster_group in pre_job_clusters ) - if job["Rank"] != Undefined: + if job["Rank"] != Undefined(): pre_job_clusters = ( sorted( cluster_group, @@ -467,7 +470,7 @@ def _match_job( # TODO: if we have POST_JOB_RANK, collect *all* matches of a group for cluster in cluster_group: for drone in cluster: - if drone["Requirements"] == Undefined or drone.evaluate( + if drone["Requirements"] == Undefined() or drone.evaluate( "Requirements", my=drone, target=job ): return drone From a2bf31666c53e9fc9575f184b78c9afa0317dbd2 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Wed, 12 Feb 2020 17:35:38 +0100 Subject: [PATCH 14/27] updating of available resources in drones, closes #82 --- lapis/scheduler.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 6199222..fe0a1cb 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -70,12 +70,12 @@ def access_wrapped(name, requested=True): return access_wrapped("cores", requested=False) elif "memory" in item: try: - return self._temp["memory"] + return (1 / 1000 / 1000) * self._temp["memory"] except KeyError: return (1 / 1000 / 1000) * access_wrapped("memory", requested=False) elif "disk" in item: try: - return self._temp["disk"] + return (1 / 1024) * self._temp["disk"] except KeyError: return (1 / 1024) * access_wrapped("disk", requested=False) return super(WrappedClassAd, self).__getitem__(item) @@ -490,15 +490,22 @@ async def _schedule_jobs(self): continue else: matches.append((queue_index, candidate_job, matched_drone)) - # TODO: deduct job-resources from matched drone - # and update instead of remove - pre_job_drones.remove(matched_drone) + for key, value in candidate_job._wrapped.resources.items(): + matched_drone._temp[key] = ( + matched_drone._temp.get( + key, + matched_drone._wrapped.theoretical_available_resources[key], + ) + - value + ) + pre_job_drones.update(matched_drone) if not matches: 
return # TODO: optimize for few matches, many matches, all matches for queue_index, _, _ in reversed(matches): del self.job_queue[queue_index] for _, job, drone in matches: + drone.clear_temporary_resources() await self._execute_job(job=job, drone=drone) await sampling_required.put(self) # NOTE: Is this correct? Triggers once instead of for each job From 079b979fdf3864cd948809add7113870bcd738e0 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Wed, 12 Feb 2020 17:42:07 +0100 Subject: [PATCH 15/27] gardening --- lapis/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index fe0a1cb..4c566a3 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -490,7 +490,7 @@ async def _schedule_jobs(self): continue else: matches.append((queue_index, candidate_job, matched_drone)) - for key, value in candidate_job._wrapped.resources.items(): + for key, value in enumerate(candidate_job._wrapped.resources): matched_drone._temp[key] = ( matched_drone._temp.get( key, From 420c74efa18b61184c6d2b326250415cc4f99ee3 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Thu, 13 Feb 2020 12:24:23 +0100 Subject: [PATCH 16/27] reversed gardening, sorry --- lapis/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 4c566a3..fe0a1cb 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -490,7 +490,7 @@ async def _schedule_jobs(self): continue else: matches.append((queue_index, candidate_job, matched_drone)) - for key, value in enumerate(candidate_job._wrapped.resources): + for key, value in candidate_job._wrapped.resources.items(): matched_drone._temp[key] = ( matched_drone._temp.get( key, From 307ba218ff08a932dcce5a4fecfe3b3d378e3ccd Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Thu, 13 Feb 2020 14:54:18 +0100 Subject: [PATCH 17/27] fixed calculation of clustering key and reversed pre_job_cluster --- lapis/scheduler.py | 7 ++++--- 1 file 
changed, 4 insertions(+), 3 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index fe0a1cb..a652714 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -345,10 +345,10 @@ def _clustering_key(self, item: WrappedClassAd[DJ]): # TODO: assert that order is consistent quantization = self._quantization return RankedClusterKey( - rank=self._ranking.evaluate(my=item), + rank=-self._ranking.evaluate(my=item), key=tuple( - int(quantize(value, quantization.get(key, 1))) - for key, value in item._wrapped.available_resources.items() + int(quantize(item[key], quantization.get(key, 1))) + for key in ("cpus", "memory", "disk") ), ) @@ -463,6 +463,7 @@ def _match_job( key=lambda cluster: job.evaluate( "Rank", my=job, target=next(iter(cluster)) ), + reverse=True, ) for cluster_group in pre_job_clusters ) From bd56e204e3a6c76c1b965fb09e4a52d9615de637 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Thu, 13 Feb 2020 15:06:05 +0100 Subject: [PATCH 18/27] shuffling cluster group to remove bias --- lapis/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index a652714..4e808d0 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,3 +1,4 @@ +import random from abc import ABC from typing import Dict, Iterator, Tuple, List, TypeVar, Generic, Set, NamedTuple from weakref import WeakKeyDictionary @@ -469,6 +470,7 @@ def _match_job( ) for cluster_group in pre_job_clusters: # TODO: if we have POST_JOB_RANK, collect *all* matches of a group + random.shuffle(cluster_group) # shuffle cluster to remove bias towards cpus for cluster in cluster_group: for drone in cluster: if drone["Requirements"] == Undefined() or drone.evaluate( From 0f562ee403a5db69f0f69497e2928acf96c938e9 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sun, 16 Feb 2020 10:41:52 +0100 Subject: [PATCH 19/27] fixed inversion of rank --- lapis/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/scheduler.py 
b/lapis/scheduler.py index 4e808d0..fde2a53 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -346,7 +346,7 @@ def _clustering_key(self, item: WrappedClassAd[DJ]): # TODO: assert that order is consistent quantization = self._quantization return RankedClusterKey( - rank=-self._ranking.evaluate(my=item), + rank=-1.0 * self._ranking.evaluate(my=item), key=tuple( int(quantize(item[key], quantization.get(key, 1))) for key in ("cpus", "memory", "disk") From 1562428fefa65ef13d812397fd9b22381aaf99c9 Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 17 Feb 2020 09:37:08 +0100 Subject: [PATCH 20/27] job-rank uses non-stable sorting --- lapis/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index fde2a53..3ca2bc8 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -461,16 +461,15 @@ def _match_job( pre_job_clusters = ( sorted( cluster_group, - key=lambda cluster: job.evaluate( + key=lambda cluster: (job.evaluate( "Rank", my=job, target=next(iter(cluster)) - ), + ), random.random()), reverse=True, ) for cluster_group in pre_job_clusters ) for cluster_group in pre_job_clusters: # TODO: if we have POST_JOB_RANK, collect *all* matches of a group - random.shuffle(cluster_group) # shuffle cluster to remove bias towards cpus for cluster in cluster_group: for drone in cluster: if drone["Requirements"] == Undefined() or drone.evaluate( From 1efab0de227fa4a288b7225bd3d7bd5d54885fea Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 17 Feb 2020 18:35:20 +0100 Subject: [PATCH 21/27] added fake auto-clusters --- lapis/scheduler.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 3ca2bc8..f77d4f7 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -374,6 +374,64 @@ def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: yield group +class RankedNonClusters(Generic[DJ]): + 
"""Automatically cluster jobs or drones by rank only""" + + def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): + self._quantization = quantization + self._ranking = ranking + self._clusters: Dict[float, Set[WrappedClassAd[DJ]]] = SortedDict() + self._inverse: Dict[WrappedClassAd[DJ], float] = {} + + def copy(self) -> "RankedNonClusters[DJ]": + """Copy the entire ranked auto clusters""" + clone = type(self)(quantization=self._quantization, ranking=self._ranking) + clone._clusters = SortedDict( + (key, value.copy()) for key, value in self._clusters.items() + ) + clone._inverse = self._inverse.copy() + return clone + + def add(self, item: WrappedClassAd[DJ]): + """Add a new item""" + if item in self._inverse: + raise ValueError(f"{item!r} already stored; use `.update(item)` instead") + item_key = self._clustering_key(item) + try: + self._clusters[item_key].add(item) + except KeyError: + self._clusters[item_key] = {item} + self._inverse[item] = item_key + + def remove(self, item: WrappedClassAd[DJ]): + """Remove an existing item""" + item_key = self._inverse.pop(item) + cluster = self._clusters[item_key] + cluster.remove(item) + if not cluster: + del self._clusters[item_key] + + def update(self, item): + """Update an existing item with its current state""" + self.remove(item) + self.add(item) + + def _clustering_key(self, item: WrappedClassAd[DJ]): + # TODO: assert that order is consistent + return -1.0 * self._ranking.evaluate(my=item) + + def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: + return iter(self._clusters.values()) + + def items(self) -> Iterator[Tuple[float, Set[WrappedClassAd[DJ]]]]: + return iter(self._clusters.items()) + + def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: + """Group autoclusters by PreJobRank""" + for ranked_key, drones in self._clusters.items(): + yield [{item} for item in drones] + + class CondorClassadJobScheduler(JobScheduler): """ Goal of the htcondor job scheduler is to have a 
scheduler that somehow From 6571c7a1590cac4a58b7fc2b7eaff9a14966709d Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 17 Feb 2020 18:50:11 +0100 Subject: [PATCH 22/27] added interface for ranked autoclusters --- lapis/scheduler.py | 69 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 20 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index f77d4f7..3cfa039 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,6 +1,6 @@ import random -from abc import ABC -from typing import Dict, Iterator, Tuple, List, TypeVar, Generic, Set, NamedTuple +from abc import ABC, abstractmethod +from typing import Dict, Iterator, Tuple, List, TypeVar, Generic, Set, NamedTuple, Any from weakref import WeakKeyDictionary from sortedcontainers import SortedDict @@ -300,7 +300,51 @@ class RankedClusterKey(NamedTuple): key: Tuple[float, ...] -class RankedAutoClusters(Generic[DJ]): +RC = TypeVar('RC', bound='RankedClusters') + + +class RankedClusters(Generic[DJ]): + """Automatically cluster drones by rank""" + + @abstractmethod + def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): + raise NotImplementedError + + @abstractmethod + def copy(self: RC[DJ]) -> RC[DJ]: + """Copy the entire ranked auto clusters""" + raise NotImplementedError + + @abstractmethod + def add(self, item: WrappedClassAd[DJ]) -> None: + """Add a new item""" + raise NotImplementedError + + @abstractmethod + def remove(self, item: WrappedClassAd[DJ]) -> None: + """Remove an existing item""" + raise NotImplementedError + + def update(self, item) -> None: + """Update an existing item with its current state""" + self.remove(item) + self.add(item) + + @abstractmethod + def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: + raise NotImplementedError + + @abstractmethod + def items(self) -> Iterator[Tuple[Any, Set[WrappedClassAd[DJ]]]]: + raise NotImplementedError + + @abstractmethod + def cluster_groups(self) -> 
Iterator[List[Set[WrappedClassAd[Drone]]]]: + """Group autoclusters by PreJobRank""" + raise NotImplementedError + + +class RankedAutoClusters(RankedClusters[DJ]): """Automatically cluster similar jobs or drones""" def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): @@ -310,7 +354,6 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {} def copy(self) -> "RankedAutoClusters[DJ]": - """Copy the entire ranked auto clusters""" clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() @@ -319,7 +362,6 @@ def copy(self) -> "RankedAutoClusters[DJ]": return clone def add(self, item: WrappedClassAd[DJ]): - """Add a new item""" if item in self._inverse: raise ValueError(f"{item!r} already stored; use `.update(item)` instead") item_key = self._clustering_key(item) @@ -330,18 +372,12 @@ def add(self, item: WrappedClassAd[DJ]): self._inverse[item] = item_key def remove(self, item: WrappedClassAd[DJ]): - """Remove an existing item""" item_key = self._inverse.pop(item) cluster = self._clusters[item_key] cluster.remove(item) if not cluster: del self._clusters[item_key] - def update(self, item): - """Update an existing item with its current state""" - self.remove(item) - self.add(item) - def _clustering_key(self, item: WrappedClassAd[DJ]): # TODO: assert that order is consistent quantization = self._quantization @@ -360,7 +396,6 @@ def items(self) -> Iterator[Tuple[RankedClusterKey, Set[WrappedClassAd[DJ]]]]: return iter(self._clusters.items()) def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: - """Group autoclusters by PreJobRank""" group = [] current_rank = None for ranked_key, drones in self._clusters.items(): @@ -374,7 +409,7 @@ def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: yield group -class 
RankedNonClusters(Generic[DJ]): +class RankedNonClusters(RankedClusters[DJ]): """Automatically cluster jobs or drones by rank only""" def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): @@ -384,7 +419,6 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._inverse: Dict[WrappedClassAd[DJ], float] = {} def copy(self) -> "RankedNonClusters[DJ]": - """Copy the entire ranked auto clusters""" clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() @@ -393,7 +427,6 @@ def copy(self) -> "RankedNonClusters[DJ]": return clone def add(self, item: WrappedClassAd[DJ]): - """Add a new item""" if item in self._inverse: raise ValueError(f"{item!r} already stored; use `.update(item)` instead") item_key = self._clustering_key(item) @@ -404,7 +437,6 @@ def add(self, item: WrappedClassAd[DJ]): self._inverse[item] = item_key def remove(self, item: WrappedClassAd[DJ]): - """Remove an existing item""" item_key = self._inverse.pop(item) cluster = self._clusters[item_key] cluster.remove(item) @@ -412,12 +444,10 @@ def remove(self, item: WrappedClassAd[DJ]): del self._clusters[item_key] def update(self, item): - """Update an existing item with its current state""" self.remove(item) self.add(item) def _clustering_key(self, item: WrappedClassAd[DJ]): - # TODO: assert that order is consistent return -1.0 * self._ranking.evaluate(my=item) def clusters(self) -> Iterator[Set[WrappedClassAd[DJ]]]: @@ -427,7 +457,6 @@ def items(self) -> Iterator[Tuple[float, Set[WrappedClassAd[DJ]]]]: return iter(self._clusters.items()) def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: - """Group autoclusters by PreJobRank""" for ranked_key, drones in self._clusters.items(): yield [{item} for item in drones] @@ -458,7 +487,7 @@ def __init__( interval: float = 60, ): self._stream_queue = job_queue - self._drones: 
RankedAutoClusters[Drone] = RankedAutoClusters( + self._drones: RankedClusters[Drone] = RankedAutoClusters( quantization=quantization_defaults, ranking=parse(pre_job_rank) ) self.interval = interval From 60a582f43e6fca1fd8a8af8d1043e0499d34f64a Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 24 Feb 2020 10:09:22 +0100 Subject: [PATCH 23/27] fixed signature of copy for RankedAutoClusters --- lapis/scheduler.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 3cfa039..f71c9a5 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -300,7 +300,7 @@ class RankedClusterKey(NamedTuple): key: Tuple[float, ...] -RC = TypeVar('RC', bound='RankedClusters') +RC = TypeVar("RC", bound="RankedClusters") class RankedClusters(Generic[DJ]): @@ -311,7 +311,7 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): raise NotImplementedError @abstractmethod - def copy(self: RC[DJ]) -> RC[DJ]: + def copy(self: "RankedAutoClusters[DJ]") -> "RankedAutoClusters[DJ]": """Copy the entire ranked auto clusters""" raise NotImplementedError @@ -548,9 +548,10 @@ def _match_job( pre_job_clusters = ( sorted( cluster_group, - key=lambda cluster: (job.evaluate( - "Rank", my=job, target=next(iter(cluster)) - ), random.random()), + key=lambda cluster: ( + job.evaluate("Rank", my=job, target=next(iter(cluster))), + random.random(), + ), reverse=True, ) for cluster_group in pre_job_clusters From f15937ecae19a6fdb2788ab8813eded603a2bec5 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 24 Feb 2020 10:10:22 +0100 Subject: [PATCH 24/27] added possibility to check for empty drones, closes #92 --- lapis/drone.py | 5 +++++ lapis/scheduler.py | 2 ++ 2 files changed, 7 insertions(+) diff --git a/lapis/drone.py b/lapis/drone.py index 15da85b..4bca01d 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -15,6 +15,7 @@ def __init__( pool_resources: dict, scheduling_duration: float, ignore_resources: list = 
None, + empty: callable = lambda drone: False, ): """ :param scheduler: @@ -41,6 +42,10 @@ def __init__( self._allocation = None self._utilisation = None self._job_queue = Queue() + self._empty = empty + + def empty(self): + return self._empty(self) @property def theoretical_available_resources(self): diff --git a/lapis/scheduler.py b/lapis/scheduler.py index f71c9a5..9d13192 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -399,6 +399,8 @@ def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: group = [] current_rank = None for ranked_key, drones in self._clusters.items(): + if next(iter(drones))._wrapped.empty(): + continue if ranked_key.rank != current_rank: current_rank = ranked_key.rank if group: From c802bee33a384c59eeedf3e88918fff7491c6bb6 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 24 Feb 2020 10:14:02 +0100 Subject: [PATCH 25/27] added underscore to ranked_key, as it is unused --- lapis/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 9d13192..527a36e 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -459,7 +459,7 @@ def items(self) -> Iterator[Tuple[float, Set[WrappedClassAd[DJ]]]]: return iter(self._clusters.items()) def cluster_groups(self) -> Iterator[List[Set[WrappedClassAd[Drone]]]]: - for ranked_key, drones in self._clusters.items(): + for _ranked_key, drones in self._clusters.items(): yield [{item} for item in drones] From 46398939772f2094bc2c3cc07fc1355a26eff1c3 Mon Sep 17 00:00:00 2001 From: Eileen Kuehn Date: Mon, 24 Feb 2020 14:02:01 +0100 Subject: [PATCH 26/27] skipping scheduling cycles when drones are all empty --- lapis/scheduler.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 527a36e..fdd1c1e 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -310,6 +310,11 @@ class RankedClusters(Generic[DJ]): def __init__(self, quantization: Dict[str, 
HTCInt], ranking: Expression): raise NotImplementedError + @abstractmethod + def empty(self) -> bool: + """Whether there are no resources available""" + raise NotImplementedError + @abstractmethod def copy(self: "RankedAutoClusters[DJ]") -> "RankedAutoClusters[DJ]": """Copy the entire ranked auto clusters""" @@ -353,6 +358,12 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._clusters: Dict[RankedClusterKey, Set[WrappedClassAd[DJ]]] = SortedDict() self._inverse: Dict[WrappedClassAd[DJ], RankedClusterKey] = {} + def empty(self) -> bool: + for drones in self._clusters.values(): + if not next(iter(drones))._wrapped.empty(): + return False + return True + def copy(self) -> "RankedAutoClusters[DJ]": clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() @@ -420,6 +431,12 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): self._clusters: Dict[float, Set[WrappedClassAd[DJ]]] = SortedDict() self._inverse: Dict[WrappedClassAd[DJ], float] = {} + def empty(self) -> bool: + for drones in self._clusters.values(): + if not next(iter(drones))._wrapped.empty(): + return False + return True + def copy(self) -> "RankedNonClusters[DJ]": clone = type(self)(quantization=self._quantization, ranking=self._ranking) clone._clusters = SortedDict( (key, value.copy()) for key, value in self._clusters.items() @@ -571,6 +588,8 @@ def _match_job( async def _schedule_jobs(self): # Pre Job Rank is the same for all jobs # Use a copy to allow temporary "remainder after match" estimates + if self._drones.empty(): + return pre_job_drones = self._drones.copy() matches: List[Tuple[int, WrappedClassAd[Job], WrappedClassAd[Drone]]] = [] for queue_index, candidate_job in enumerate(self.job_queue): From 37c38de845d12575aa54002a6a715b39fd54630e Mon Sep 17 00:00:00 2001 From: Max Fischer Date: Mon, 24 Feb 2020 17:15:26 +0100 Subject: [PATCH 27/27] unclustered drones are checked for emptiness individually --- lapis/scheduler.py | 5 +++-- 1 file changed, 3
insertions(+), 2 deletions(-) diff --git a/lapis/scheduler.py b/lapis/scheduler.py index fdd1c1e..716f1a3 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -433,8 +433,9 @@ def __init__(self, quantization: Dict[str, HTCInt], ranking: Expression): def empty(self) -> bool: for drones in self._clusters.values(): - if not next(iter(drones))._wrapped.empty(): - return False + for drone in drones: + if not drone._wrapped.empty(): + return False return True def copy(self) -> "RankedNonClusters[DJ]":