From 669be2dd40f3ceae51397173bea78b45310435b0 Mon Sep 17 00:00:00 2001 From: Kern Attila GERMAIN <5556461+kernattila@users.noreply.github.com> Date: Fri, 25 Aug 2023 16:10:58 +0200 Subject: [PATCH] feat: Added function and property `canHandleNegativeCoresRequest` doc: added some comments doc: Added some documentation doc: fix docstrings and parameters doc: added debug message doc: explain why we allow negative value doc: update debug message for rqd --- .../com/imageworks/spcue/DispatchHost.java | 41 +++++++++++-------- .../imageworks/spcue/LocalHostAssignment.java | 26 +++++++----- .../com/imageworks/spcue/VirtualProc.java | 19 ++------- .../com/imageworks/spcue/dao/LayerDao.java | 29 +++++++------ .../spcue/dispatcher/CoreUnitDispatcher.java | 13 ++---- .../spcue/service/JobManagerService.java | 4 +- .../com/imageworks/spcue/service/JobSpec.java | 16 +------- cuebot/src/main/resources/opencue.properties | 8 ++-- cuegui/cuegui/FilterDialog.py | 2 +- rqd/rqd/rqcore.py | 4 +- 10 files changed, 72 insertions(+), 90 deletions(-) diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index 6f55235bf..687e906ac 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -80,33 +80,42 @@ public String getAllocationId() { public String getFacilityId() { return facilityId; } - public boolean canHandleNegativeCoresRequirement(int minCores) { - if (minCores > 0) { - logger.debug(getName() + " can handle the job with " + minCores + " cores."); + + public boolean canHandleNegativeCoresRequest(int requestedCores) { + // Request is positive, no need to test further. + if (requestedCores > 0) { + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); return true; } + // All cores are available, validate the request. 
if (cores == idleCores) { - logger.debug(getName() + " can handle the job with " + minCores + " cores."); + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); return true; } - logger.debug(getName() + " cannot handle the job with " + minCores + " cores."); + // Some or all cores are busy, avoid booking again. + logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores."); return false; } - public int handleNegativeCoresRequirement(int minCores) { - // Do not process positive requests - logger.debug("requested minCores:" + minCores); - if (minCores > 0) { - return minCores; + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; } - // If request is negative but cores are already used, return 0 - if (minCores <=0 && idleCores < cores) { + if (requestedCores <=0 && idleCores < cores) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. 
+ logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); return 0; } - int requestedCores = idleCores + minCores; - logger.debug("Requested core number is " + minCores + " <= 0, " + - "matching up to max number with difference " + idleCores + " > " + requestedCores); - return requestedCores; + // Book all cores minus the request + int totalCores = idleCores + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCores + " cores are free, booking " + totalCores + " cores"); + return totalCores; } @Override diff --git a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java index 93371834b..aa944e297 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java +++ b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java @@ -67,19 +67,25 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu this.maxGpuMemory = maxGpuMemory; } - public int handleNegativeCoresRequirement(int minCores) { - // Do not process positive requests - if (minCores > 0) { - return minCores; + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; } - // If request is negative but cores are already used, return 0 - if (minCores <=0 && idleCoreUnits < threads) { + if (requestedCores <=0 && idleCoreUnits < threads) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. 
+ logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); return 0; } - int requestedCores = idleCoreUnits + minCores; - logger.debug("Requested core number is " + minCores + " <= 0, " + - "matching up to max number with difference " + idleCoreUnits + " > " + requestedCores); - return requestedCores; + // Book all cores minus the request + int totalCores = idleCoreUnits + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCoreUnits + " cores are free, booking " + totalCores + " cores"); + return totalCores; } @Override diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index 29d1df396..b8dd11131 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -36,7 +36,7 @@ public class VirtualProc extends FrameEntity implements ProcInterface { public String os; public byte[] childProcesses; - public boolean canLaunch; + public boolean canHandleNegativeCoresRequest; public int coresReserved; public long memoryReserved; public long memoryUsed; @@ -100,7 +100,6 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame) { proc.unbooked = false; proc.isLocalDispatch = host.isLocalDispatch; -// proc.canLaunch = host.canHandleNegativeCoresRequirement(frame.minCores); proc.coresReserved = frame.minCores; proc.memoryReserved = frame.minMemory; proc.gpusReserved = frame.minGpus; @@ -115,18 +114,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame) { */ if (host.strandedCores > 0) { - logger.debug("host.strandedCores > 0 : " + host.strandedCores); proc.coresReserved = proc.coresReserved + host.strandedCores; } - proc.canLaunch = host.canHandleNegativeCoresRequirement(proc.coresReserved); + proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved); if 
(proc.coresReserved == 0) { logger.debug("Reserving all cores"); proc.coresReserved = host.cores; } else if (proc.coresReserved < 0) { - logger.debug("Reserving all cores " + proc.coresReserved); + logger.debug("Reserving all cores minus " + proc.coresReserved); proc.coresReserved = host.cores + proc.coresReserved; } else if (proc.coresReserved >= 100) { @@ -148,7 +146,6 @@ else if (proc.coresReserved >= 100) { // CueUtil.isDayTime()) { if (host.threadMode == ThreadMode.ALL_VALUE) { proc.coresReserved = wholeCores * 100; - logger.debug("host.threadMode == ThreadMode.ALL_VALUE : proc.coresReserved=" + proc.coresReserved); } else { if (frame.threadable) { if (host.idleMemory - frame.minMemory @@ -156,7 +153,6 @@ else if (proc.coresReserved >= 100) { proc.coresReserved = wholeCores * 100; } else { proc.coresReserved = getCoreSpan(host, frame.minMemory); - logger.debug("proc.coresReserved = getCoreSpan(host, frame.minMemory):" + proc.coresReserved); } if (host.threadMode == ThreadMode.VARIABLE_VALUE @@ -183,7 +179,6 @@ else if (proc.coresReserved >= 100) { * original. */ if (proc.coresReserved < originalCores) { - logger.debug("proc.coresReserved < originalCores: " + proc.coresReserved + " < " + originalCores); proc.coresReserved = originalCores; } @@ -191,12 +186,10 @@ else if (proc.coresReserved >= 100) { * Check to ensure we haven't exceeded max cores. 
*/ if (frame.maxCores > 0 && proc.coresReserved >= frame.maxCores) { - logger.debug("frame.maxCores > 0 && proc.coresReserved >= frame.maxCores"); proc.coresReserved = frame.maxCores; } if (proc.coresReserved > host.idleCores) { - logger.debug("proc.coresReserved > host.idleCores"); if (host.threadMode == ThreadMode.VARIABLE_VALUE && frame.threadable && wholeCores == 1) { throw new JobDispatchException( @@ -204,7 +197,6 @@ else if (proc.coresReserved >= 100) { } proc.coresReserved = wholeCores * 100; } - logger.debug("finally, proc.coresReserved = " + proc.coresReserved); } /* @@ -265,19 +257,14 @@ public static final VirtualProc build(DispatchHost host, */ public static int getCoreSpan(DispatchHost host, long minMemory) { int totalCores = (int) (Math.floor(host.cores / 100.0)); - logger.debug("getCoreSpan() -> totalCores = " + totalCores); int idleCores = (int) (Math.floor(host.idleCores / 100.0)); - logger.debug("getCoreSpan() -> idleCores = " + idleCores); if (idleCores < 1) { return 100; } long memPerCore = host.idleMemory / totalCores; - logger.debug("getCoreSpan() -> memPerCore = " + memPerCore); double procs = minMemory / (double) memPerCore; - logger.debug("getCoreSpan() -> procs = " + procs); int reserveCores = (int) (Math.round(procs)) * 100; - logger.debug("getCoreSpan() -> reserveCores = " + reserveCores); return reserveCores; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java index cdcda5d03..c4b07edf9 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java @@ -59,7 +59,7 @@ public interface LayerDao { public List getLayerDetails(JobInterface job); /** - * Returns true if supplied layer is compelte. + * Returns true if supplied layer is complete. 
* * @param layer * @return boolean @@ -82,7 +82,7 @@ public interface LayerDao { void insertLayerDetail(LayerDetail l); /** - * gets a layer detail from an object that implments layer + * gets a layer detail from an object that implements layer * * @param layer * @return LayerDetail @@ -167,7 +167,7 @@ public interface LayerDao { void updateLayerTags(LayerInterface layer, Set tags); /** - * Insert a key/valye pair into the layer environment + * Insert a key/value pair into the layer environment * * @param layer * @param key @@ -282,7 +282,7 @@ public interface LayerDao { /** * Update all layers of the set type in the specified job - * with the new max cores requirement. + * with the new min cores requirement. * * @param job * @param cores @@ -292,7 +292,7 @@ public interface LayerDao { /** * Update all layers of the set type in the specified job - * with the new min cores requirement. + * with the new min gpu requirement. * * @param job * @param gpus @@ -304,9 +304,8 @@ public interface LayerDao { * Update a layer's max cores value, which limits how * much threading can go on. * - * @param job - * @param cores - * @param type + * @param layer + * @param threadable */ void updateThreadable(LayerInterface layer, boolean threadable); @@ -314,7 +313,7 @@ public interface LayerDao { * Update a layer's timeout value, which limits how * much the frame can run on a host. * - * @param job + * @param layer * @param timeout */ void updateTimeout(LayerInterface layer, int timeout); @@ -323,8 +322,8 @@ public interface LayerDao { * Update a layer's LLU timeout value, which limits how * much the frame can run on a host without updates in the log file. * - * @param job - * @param timeout + * @param layer + * @param timeout_llu */ void updateTimeoutLLU(LayerInterface layer, int timeout_llu); @@ -341,7 +340,7 @@ public interface LayerDao { /** * Appends a tag to the current set of tags. If the tag - * already exists than nothing happens. + * already exists then nothing happens. 
* * @param layer * @param val @@ -363,8 +362,9 @@ public interface LayerDao { * Update layer usage with processor time usage. * This happens when the proc has completed or failed some work. * - * @param proc + * @param layer * @param newState + * @param exitStatus */ void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus); @@ -387,6 +387,9 @@ public interface LayerDao { /** * Enable/disable memory optimizer. + * + * @param layer + * @param state */ void enableMemoryOptimizer(LayerInterface layer, boolean state); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java index c344681f4..8dc770131 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java @@ -258,14 +258,9 @@ public List dispatchHost(DispatchHost host, JobInterface job) { host.getName() + " " + host.idleCores + "/" + host.idleMemory + " on job " + job.getName()); - logger.debug("Frames summary before dispatch:"); for (DispatchFrame frame: frames) { - logger.debug("frame.minCores: " + frame.minCores + ", frame.command: " + frame.command); - } - for (DispatchFrame frame: frames) { - VirtualProc proc = VirtualProc.build(host, frame); - if (frame.minCores <= 0 && !proc.canLaunch) { + if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) { logger.debug("Cannot dispatch job, host is busy."); break; } @@ -273,8 +268,7 @@ public List dispatchHost(DispatchHost host, JobInterface job) { host.idleMemory < frame.minMemory || host.idleGpus < frame.minGpus || host.idleGpuMemory < frame.minGpuMemory) { - logger.debug("Cannot dispatch, host.idleCores < host.handleNegativeCoresRequirement(frame.minCores)"); - logger.debug(host.idleCores + " < " + host.handleNegativeCoresRequirement(frame.minCores) + " : frame.minCores"); + logger.debug("Cannot dispatch, insufficient 
resources."); break; } @@ -290,7 +284,8 @@ public List dispatchHost(DispatchHost host, JobInterface job) { boolean success = new DispatchFrameTemplate(proc, job, frame, false) { public void wrapDispatchFrame() { - logger.debug("Dispatching frame with minCores: " + frame.minCores + " on proc with coresReserved= " + proc.coresReserved); + logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " + + proc.coresReserved + " coresReserved"); dispatch(frame, proc); dispatchSummary(proc, frame, "Booking"); return; diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java index 5d49caa24..27dc82021 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java @@ -275,9 +275,7 @@ public JobDetail createJob(BuildableJob buildableJob) { } if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { - logger.debug("layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN"); - logger.debug(layer.minimumCores + " < " +Dispatcher.CORE_POINTS_RESERVED_MIN); - // layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN; + layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN; } logger.info("creating layer " + layer.name + " range: " + layer.range); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java index 3ed18ac49..c64afa5d3 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java @@ -606,7 +606,6 @@ private void determineMinimumCores(Element layerTag, LayerDetail layer) { String cores = layerTag.getChildTextTrim("cores"); if (cores == null) { - logger.debug("cores == null"); return; } @@ -614,27 +613,17 @@ private void 
determineMinimumCores(Element layerTag, LayerDetail layer) { if (cores.contains(".")) { if (cores.contains("-")) { - logger.debug("cores is negative : " + cores); corePoints = (int) (Double.valueOf(cores) * 100 - .5); } else { - logger.debug("cores is positive : " + cores); corePoints = (int) (Double.valueOf(cores) * 100 + .5); } } else { - logger.debug("cores is an integer : " + cores); corePoints = Integer.valueOf(cores); } - logger.debug("submission cores : " + cores); - logger.debug("layer.minimumCores : " + layer.minimumCores); - logger.debug("corePoints : " + corePoints); - logger.debug("Dispatcher.CORE_POINTS_RESERVED_MIN : " + Dispatcher.CORE_POINTS_RESERVED_MIN); - if (corePoints > 0 && corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) { - logger.debug("corePoints > 0 && corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN"); - //corePoints = Dispatcher.CORE_POINTS_RESERVED_DEFAULT; + corePoints = Dispatcher.CORE_POINTS_RESERVED_DEFAULT; } - logger.debug("corePoints after : " + corePoints); layer.minimumCores = corePoints; } @@ -668,14 +657,11 @@ private void determineThreadable(Element layerTag, LayerDetail layer) { // Must have at least 1 core to thread. 
if (layer.minimumCores > 0 && layer.minimumCores < 100) { layer.isThreadable = false; - logger.debug("not threadable : " + layer.minimumCores); } else if (layerTag.getChildTextTrim("threadable") != null) { layer.isThreadable = Convert.stringToBool( layerTag.getChildTextTrim("threadable")); - logger.debug("layerTag.getChildTextTrim('threadable') : " + layerTag.getChildTextTrim("threadable")); } - logger.debug("layer.isThreadable : " + layer.isThreadable); } private void determineResourceDefaults(Element layerTag, diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 69e8ac892..a08522eb1 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -65,15 +65,15 @@ log.frame-log-root.default_os=${CUE_FRAME_LOG_DIR:/shots} dispatcher.job_query_max=20 # Number of seconds before waiting to book the same job from a different host. # "0" disables the job_lock -dispatcher.job_lock_expire_seconds=10 +dispatcher.job_lock_expire_seconds=20 # Concurrency level to allow on the job lock cache dispatcher.job_lock_concurrency_level=14 # Maximum number of frames to query from the DB to attempt to dispatch. -dispatcher.frame_query_max=2 +dispatcher.frame_query_max=20 # Maximum number of frames to book at one time on the same host. -dispatcher.job_frame_dispatch_max=4 +dispatcher.job_frame_dispatch_max=8 # Maximum number of frames to dispatch from a host at one time. -dispatcher.host_frame_dispatch_max=4 +dispatcher.host_frame_dispatch_max=12 # Choose between different scheduling strategies: # - PRIORITY_ONLY: Sort by priority only # - FIFO: Whether or not to enable FIFO scheduling in the same priority. 
diff --git a/cuegui/cuegui/FilterDialog.py b/cuegui/cuegui/FilterDialog.py index 1b60c1d1f..31d7a08d7 100644 --- a/cuegui/cuegui/FilterDialog.py +++ b/cuegui/cuegui/FilterDialog.py @@ -454,7 +454,7 @@ def createAction(self): "Create Action", "What value should this property be set to?", 0, - -8, + -8, # Allow minimum core value to be negative, booking all host cores minus this value. 50000, 2) value = float(value) diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index 8e8f4348a..25ccbc197 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -908,9 +908,7 @@ def launchFrame(self, runFrame): err = "Not launching, numCores must be > 0, got {}".format(cores_to_reserve) log.warning(err) raise rqd.rqexceptions.CoreReservationFailureException(err) - - log.info("Requested core number is negative {}, " - "matching up to max number with difference {} > {}".format( + log.info("Requested {} cores <= 0, {} cores are free, booking {} cores".format( runFrame.num_cores, self.cores.idle_cores, cores_to_reserve)