|
10 | 10 |
|
11 | 11 | import argparse
|
12 | 12 | import errno
|
13 |
| -import json |
14 | 13 | import logging
|
15 |
| -import math |
16 | 14 | import sys
|
17 | 15 |
|
18 | 16 | import flux
|
| 17 | +from flux.queue import QueueList |
19 | 18 | from flux.util import AltField, FilterActionSetUpdate, UtilConfig, parse_fsd
|
20 | 19 |
|
21 | 20 |
|
@@ -144,161 +143,78 @@ def validate(self, path, conf):
|
144 | 143 | raise ValueError(f"{path}: invalid key {key}")
|
145 | 144 |
|
146 | 145 |
|
147 |
| -class QueueLimitsJobSizeInfo: |
148 |
| - def __init__(self, name, config, minormax): |
149 |
| - self.name = name |
150 |
| - self.config = config |
151 |
| - self.minormax = minormax |
152 |
| - |
153 |
| - def get_limit(self, key): |
154 |
| - try: |
155 |
| - val = self.config["queues"][self.name]["policy"]["limits"]["job-size"][ |
156 |
| - self.minormax |
157 |
| - ][key] |
158 |
| - except KeyError: |
159 |
| - try: |
160 |
| - val = self.config["policy"]["limits"]["job-size"][self.minormax][key] |
161 |
| - except KeyError: |
162 |
| - val = math.inf if self.minormax == "max" else 0 |
163 |
| - if val < 0: |
164 |
| - val = math.inf |
165 |
| - return val |
166 |
| - |
167 |
| - @property |
168 |
| - def nnodes(self): |
169 |
| - return self.get_limit("nnodes") |
170 |
| - |
171 |
| - @property |
172 |
| - def ncores(self): |
173 |
| - return self.get_limit("ncores") |
174 |
| - |
175 |
| - @property |
176 |
| - def ngpus(self): |
177 |
| - return self.get_limit("ngpus") |
178 |
| - |
179 |
| - |
180 |
| -class QueueLimitsRangeInfo: |
181 |
| - def __init__(self, name, min, max): |
182 |
| - self.name = name |
183 |
| - self.min = min |
184 |
| - self.max = max |
185 |
| - |
186 |
| - @property |
187 |
| - def nnodes(self): |
188 |
| - return f"{self.min.nnodes}-{self.max.nnodes}" |
189 |
| - |
190 |
| - @property |
191 |
| - def ncores(self): |
192 |
| - return f"{self.min.ncores}-{self.max.ncores}" |
193 |
| - |
194 |
| - @property |
195 |
| - def ngpus(self): |
196 |
| - return f"{self.min.ngpus}-{self.max.ngpus}" |
| 146 | +class QueueLimitsRange: |
| 147 | + """ |
| 148 | + QueueLimits wrapper which supports a string range "{min}-{max}" |
| 149 | + """ |
197 | 150 |
|
| 151 | + def __init__(self, limits): |
| 152 | + for item in ("nnodes", "ncores", "ngpus"): |
| 153 | + minimum = getattr(limits.min, item) |
| 154 | + maximum = getattr(limits.max, item) |
| 155 | + setattr(self, item, f"{minimum}-{maximum}") |
198 | 156 |
|
199 |
| -class QueueLimitsInfo: |
200 |
| - def __init__(self, name, config): |
201 |
| - self.name = name |
202 |
| - self.config = config |
203 |
| - self.min = QueueLimitsJobSizeInfo(name, config, "min") |
204 |
| - self.max = QueueLimitsJobSizeInfo(name, config, "max") |
205 |
| - self.range = QueueLimitsRangeInfo(name, self.min, self.max) |
206 | 157 |
|
207 |
| - @property |
208 |
| - def timelimit(self): |
209 |
| - try: |
210 |
| - duration = self.config["queues"][self.name]["policy"]["limits"]["duration"] |
211 |
| - except KeyError: |
212 |
| - try: |
213 |
| - duration = self.config["policy"]["limits"]["duration"] |
214 |
| - except KeyError: |
215 |
| - duration = "inf" |
216 |
| - t = parse_fsd(duration) |
217 |
| - return t |
| 158 | +class QueueLimitsWrapper: |
| 159 | + def __init__(self, info): |
| 160 | + self.__limits = info.limits |
| 161 | + self.range = QueueLimitsRange(info.limits) |
218 | 162 |
|
219 |
| - |
220 |
| -class QueueDefaultsInfo: |
221 |
| - def __init__(self, name, config): |
222 |
| - self.name = name |
223 |
| - self.config = config |
224 |
| - |
225 |
| - @property |
226 |
| - def timelimit(self): |
227 |
| - try: |
228 |
| - duration = self.config["queues"][self.name]["policy"]["jobspec"][ |
229 |
| - "defaults" |
230 |
| - ]["system"]["duration"] |
231 |
| - except KeyError: |
232 |
| - try: |
233 |
| - duration = self.config["policy"]["jobspec"]["defaults"]["system"][ |
234 |
| - "duration" |
235 |
| - ] |
236 |
| - except KeyError: |
237 |
| - duration = "inf" |
238 |
| - t = parse_fsd(duration) |
239 |
| - return t |
| 163 | + def __getattr__(self, attr): |
| 164 | + # Forward most attribute lookups to underlying QueueLimits instance |
| 165 | + return getattr(self.__limits, attr) |
240 | 166 |
|
241 | 167 |
|
242 | 168 | class QueueInfo:
|
243 |
| - def __init__(self, name, config, enabled, started): |
244 |
| - self.name = name |
245 |
| - self.config = config |
246 |
| - self.limits = QueueLimitsInfo(name, config) |
247 |
| - self.defaults = QueueDefaultsInfo(name, config) |
248 |
| - self.scheduling = "started" if started else "stopped" |
249 |
| - self.submission = "enabled" if enabled else "disabled" |
250 |
| - self._enabled = enabled |
251 |
| - self._started = started |
| 169 | + def __init__(self, queue_info): |
| 170 | + self.__qinfo = queue_info |
| 171 | + self.is_started = queue_info.started |
| 172 | + self.is_enabled = queue_info.enabled |
| 173 | + self.limits = QueueLimitsWrapper(queue_info) |
252 | 174 |
|
253 | 175 | def __getattr__(self, attr):
|
254 | 176 | try:
|
255 |
| - return getattr(self, attr) |
| 177 | + return getattr(self.__qinfo, attr) |
256 | 178 | except (KeyError, AttributeError):
|
257 | 179 | raise AttributeError("invalid QueueInfo attribute '{}'".format(attr))
|
258 | 180 |
|
| 181 | + @property |
| 182 | + def scheduling(self): |
| 183 | + return "started" if self.is_started else "stopped" |
| 184 | + |
| 185 | + @property |
| 186 | + def submission(self): |
| 187 | + return "enabled" if self.is_enabled else "disabled" |
| 188 | + |
259 | 189 | @property
|
260 | 190 | def queue(self):
|
261 |
| - return self.name if self.name else "" |
| 191 | + return self.name |
262 | 192 |
|
263 | 193 | @property
|
264 | 194 | def queuem(self):
|
265 |
| - try: |
266 |
| - defaultq = self.config["policy"]["jobspec"]["defaults"]["system"]["queue"] |
267 |
| - except KeyError: |
268 |
| - defaultq = "" |
269 |
| - q = self.queue + ("*" if defaultq and self.queue == defaultq else "") |
270 |
| - return q |
| 195 | + if self.name and self.is_default: |
| 196 | + return f"{self.name}*" |
| 197 | + return self.name |
271 | 198 |
|
272 | 199 | @property
|
273 | 200 | def color_enabled(self):
|
274 |
| - return "\033[01;32m" if self._enabled else "\033[01;31m" |
| 201 | + return "\033[01;32m" if self.is_enabled else "\033[01;31m" |
275 | 202 |
|
276 | 203 | @property
|
277 | 204 | def color_off(self):
|
278 | 205 | return "\033[0;0m"
|
279 | 206 |
|
280 | 207 | @property
|
281 | 208 | def enabled(self):
|
282 |
| - return AltField("✔", "y") if self._enabled else AltField("✗", "n") |
| 209 | + return AltField("✔", "y") if self.is_enabled else AltField("✗", "n") |
283 | 210 |
|
284 | 211 | @property
|
285 | 212 | def color_started(self):
|
286 |
| - return "\033[01;32m" if self._started else "\033[01;31m" |
| 213 | + return "\033[01;32m" if self.is_started else "\033[01;31m" |
287 | 214 |
|
288 | 215 | @property
|
289 | 216 | def started(self):
|
290 |
| - return AltField("✔", "y") if self._started else AltField("✗", "n") |
291 |
| - |
292 |
| - |
293 |
| -def fetch_all_queue_status(handle, queues=None): |
294 |
| - if handle is None: |
295 |
| - # Return fake payload if handle is not open (e.g. during testing) |
296 |
| - return {"enable": True, "start": True} |
297 |
| - topic = "job-manager.queue-status" |
298 |
| - if queues is None: |
299 |
| - return handle.rpc(topic, {}).get() |
300 |
| - rpcs = {x: handle.rpc(topic, {"name": x}) for x in queues} |
301 |
| - return {x: rpcs[x].get() for x in rpcs} |
| 217 | + return AltField("✔", "y") if self.is_started else AltField("✗", "n") |
302 | 218 |
|
303 | 219 |
|
304 | 220 | def list(args):
|
@@ -326,42 +242,10 @@ def list(args):
|
326 | 242 | "limits.min.ngpus": "MINGPUS",
|
327 | 243 | "limits.max.ngpus": "MAXGPUS",
|
328 | 244 | }
|
329 |
| - config = None |
330 |
| - handle = flux.Flux() |
331 |
| - |
332 |
| - future = handle.rpc("config.get") |
333 |
| - try: |
334 |
| - config = future.get() |
335 |
| - except Exception as e: |
336 |
| - LOGGER.warning("Could not get flux config: " + str(e)) |
337 |
| - |
338 | 245 | fmt = FluxQueueConfig("list").load().get_format_string(args.format)
|
339 | 246 | formatter = flux.util.OutputFormat(fmt, headings=headings)
|
340 | 247 |
|
341 |
| - # Build queue_config from args.queue, or config["queue"] if --queue |
342 |
| - # was unused: |
343 |
| - queue_config = {} |
344 |
| - if args.queue: |
345 |
| - for queue in args.queue: |
346 |
| - try: |
347 |
| - queue_config[queue] = config["queues"][queue] |
348 |
| - except KeyError: |
349 |
| - raise ValueError(f"No such queue: {queue}") |
350 |
| - elif config and "queues" in config: |
351 |
| - queue_config = config["queues"] |
352 |
| - |
353 |
| - queues = [] |
354 |
| - if config and "queues" in config: |
355 |
| - status = fetch_all_queue_status(handle, queue_config.keys()) |
356 |
| - for key, value in queue_config.items(): |
357 |
| - queues.append( |
358 |
| - QueueInfo(key, config, status[key]["enable"], status[key]["start"]) |
359 |
| - ) |
360 |
| - else: |
361 |
| - # single anonymous queue |
362 |
| - status = fetch_all_queue_status(handle) |
363 |
| - queues.append(QueueInfo(None, config, status["enable"], status["start"])) |
364 |
| - |
| 248 | + queues = [QueueInfo(x) for x in QueueList(flux.Flux(), queues=args.queue)] |
365 | 249 | formatter.print_items(queues, no_header=args.no_header)
|
366 | 250 |
|
367 | 251 |
|
|
0 commit comments