From f834ee7cc4324720bfedd6ffe30e066f0dc78efc Mon Sep 17 00:00:00 2001 From: Diego Tavares da Silva Date: Wed, 8 Nov 2023 09:05:14 -0800 Subject: [PATCH] Oom protection (#1321) * The current logic relies on hardcoded values which are not suitable for large hosts. The new logic takes into account the size of hosts and also tries to be more aggressive with misbehaving frames. Prevent host from entering an OOM state where oom-killer might start killing important OS processes. The kill logic will kick in one of the following conditions is met: Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved For frames that are using more than they had reserved but not above the threshold, negotiate expanding the reservations with other frames on the same host (cherry picked from commit e88a5295f23bd927614de6d5af6a09d496d3e6ac) * Frames killed for OOM should be retried (cherry picked from commit b88f7bcb1ad43f83fb8357576c33483dc2bf4952) * OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD can be deactivated with -1 (cherry picked from commit 647e75e2254c7a7ff68c544e438080f412bf04c1) * Limit the number of kill retries There's an error condition on rqd where a frame that cannot be killed will end up preventing the host from picking up new jobs. This logic limits the number of repeated killRequests to give host a chance to pick up new jobs. At the same time, blocked frames are logged to spcue.log to be handled manually. (cherry picked from commit aea4864ef66aca494fb455a7c103e4a832b63d41) * Fix merge conflicts * Handle MR comments * Minor improvements to the logic Signed-off-by: Diego Tavares --------- Signed-off-by: Diego Tavares --- .../com/imageworks/spcue/FrameInterface.java | 1 - .../com/imageworks/spcue/dao/FrameDao.java | 7 + .../com/imageworks/spcue/dao/HostDao.java | 9 - .../spcue/dao/postgres/FrameDaoJdbc.java | 18 + .../spcue/dao/postgres/HostDaoJdbc.java | 9 - .../spcue/dao/postgres/ProcDaoJdbc.java | 2 +- .../spcue/dispatcher/DispatchSupport.java | 8 + .../dispatcher/DispatchSupportService.java | 13 +- .../spcue/dispatcher/Dispatcher.java | 10 +- .../dispatcher/FrameCompleteHandler.java | 73 +-- .../spcue/dispatcher/HostReportHandler.java | 519 +++++++++++------- .../commands/DispatchRqdKillFrame.java | 20 +- .../commands/DispatchRqdKillFrameMemory.java | 78 +++ .../imageworks/spcue/service/HostManager.java | 9 - .../spcue/service/HostManagerService.java | 6 - .../spring/applicationContext-service.xml | 1 - cuebot/src/main/resources/opencue.properties | 16 +- .../spcue/test/dao/postgres/HostDaoTests.java | 18 - .../dispatcher/FrameCompleteHandlerTests.java | 6 +- .../dispatcher/HostReportHandlerGpuTests.java | 10 +- .../dispatcher/HostReportHandlerTests.java | 243 ++++++-- .../conf/jobspec/jobspec_multiple_frames.xml | 48 ++ cuebot/src/test/resources/opencue.properties | 13 +- rqd/rqd/rqdservicers.py | 2 + rqd/rqd/rqnetwork.py | 1 + 25 files changed, 764 insertions(+), 376 deletions(-) create mode 100644 cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java create mode 100644 cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml diff --git a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java index eff22768f..945685444 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java +++ b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java index 7c2e3c050..4dbb0e987 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu * @return */ boolean updateFrameCleared(FrameInterface frame); + /** + * Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + * @return whether the frame has been updated + */ + boolean updateFrameMemoryError(FrameInterface frame); /** * Sets a frame to an unreserved waiting state. diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java index 5ed18947e..94ba316b1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java @@ -252,15 +252,6 @@ public interface HostDao { */ void updateThreadMode(HostInterface host, ThreadMode mode); - /** - * When a host is in kill mode that means its 256MB+ into the swap and the - * the worst memory offender is killed. - * - * @param h HostInterface - * @return boolean - */ - boolean isKillMode(HostInterface h); - /** * Update the specified host's hardware information. * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 3c752ad96..21c197a3b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) { return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0; } + private static final String UPDATE_FRAME_MEMORY_ERROR = + "UPDATE "+ + "frame "+ + "SET " + + "int_exit_status = ?, " + + "int_version = int_version + 1 " + + "WHERE " + + "frame.pk_frame = ? "; + @Override + public boolean updateFrameMemoryError(FrameInterface frame) { + int result = getJdbcTemplate().update( + UPDATE_FRAME_MEMORY_ERROR, + Dispatcher.EXIT_STATUS_MEMORY_FAILURE, + frame.getFrameId()); + + return result > 0; + } + private static final String UPDATE_FRAME_STARTED = "UPDATE " + "frame " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 6fe898b44..f703416b2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -612,15 +612,6 @@ public void updateHostOs(HostInterface host, String os) { os, host.getHostId()); } - @Override - public boolean isKillMode(HostInterface h) { - return getJdbcTemplate().queryForObject( - "SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " + - "AND int_swap_total - int_swap_free > ? AND int_mem_free < ?", - Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD, - Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0; - } - @Override public int getStrandedCoreUnits(HostInterface h) { try { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index 5af292fb3..8f6322690 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { value, p.getProcId(), value) == 1; } catch (Exception e) { // check by trigger erify_host_resources - throw new ResourceReservationFailureException("failed to increase memory reserveration for proc " + throw new ResourceReservationFailureException("failed to increase memory reservation for proc " + p.getProcId() + " to " + value + ", proc does not have that much memory to spare."); } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index 4accd0f8d..aa20e6266 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -415,6 +415,14 @@ List findNextDispatchFrames(LayerInterface layer, VirtualProc pro */ void clearFrame(DispatchFrame frame); + /** + * Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + * @return whether the frame has been updated + */ + boolean updateFrameMemoryError(FrameInterface frame); + /** * Update Memory usage data and LLU time for the given frame. * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 0b1082801..bde4732b9 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -42,6 +42,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.dao.DataAccessException; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -184,7 +185,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { @Override public boolean clearVirtualProcAssignement(ProcInterface proc) { - return procDao.clearVirtualProcAssignment(proc); + try { + return procDao.clearVirtualProcAssignment(proc); + } catch (DataAccessException e) { + return false; + } } @Transactional(propagation = Propagation.REQUIRED) @@ -343,6 +348,12 @@ public void clearFrame(DispatchFrame frame) { frameDao.updateFrameCleared(frame); } + @Override + @Transactional(propagation = Propagation.REQUIRED) + public boolean updateFrameMemoryError(FrameInterface frame) { + return frameDao.updateFrameMemoryError(frame); + } + @Transactional(propagation = Propagation.SUPPORTS) public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { int threads = proc.coresReserved / 100; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java index 6ac703a41..072b04113 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * @@ -108,13 +107,8 @@ public interface Dispatcher { // without being penalized for it. public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2; - // The amount of swap that must be used before a host can go - // into kill mode. - public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128; - - // When the amount of free memory drops below this point, the - // host can go into kill mode. - public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512; + // How long to keep track of a frame kill request + public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3; // A higher number gets more deep booking but less spread on the cue. public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java index 55336aaf4..c405a9e31 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java @@ -33,6 +33,7 @@ import com.imageworks.spcue.DispatchFrame; import com.imageworks.spcue.DispatchHost; import com.imageworks.spcue.DispatchJob; +import com.imageworks.spcue.FrameDetail; import com.imageworks.spcue.JobDetail; import com.imageworks.spcue.LayerDetail; import com.imageworks.spcue.LayerInterface; @@ -143,49 +144,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) { } try { - - final VirtualProc proc; - - try { - - proc = hostManager.getVirtualProc( - report.getFrame().getResourceId()); - } - catch (EmptyResultDataAccessException e) { - /* - * Do not propagate this exception to RQD. This - * usually means the cue lost connectivity to - * the host and cleared out the record of the proc. - * If this is propagated back to RQD, RQD will - * keep retrying the operation forever. - */ - logger.info("failed to acquire data needed to " + - "process completed frame: " + - report.getFrame().getFrameName() + " in job " + - report.getFrame().getJobName() + "," + e); - return; - } - + final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId()); final DispatchJob job = jobManager.getDispatchJob(proc.getJobId()); final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId()); + final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); final FrameState newFrameState = determineFrameState(job, layer, frame, report); final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() + "_" + report.getFrame().getFrameId(); + if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(), report.getFrame().getMaxRss())) { - dispatchQueue.execute(new KeyRunnable(key) { - @Override - public void run() { - try { - handlePostFrameCompleteOperations(proc, report, job, frame, - newFrameState); - } catch (Exception e) { - logger.warn("Exception during handlePostFrameCompleteOperations " + - "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + if (dispatcher.isTestMode()) { + // Database modifications on a threadpool cannot be captured by the test thread + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } else { + dispatchQueue.execute(new KeyRunnable(key) { + @Override + public void run() { + try { + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } catch (Exception e) { + logger.warn("Exception during handlePostFrameCompleteOperations " + + "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + } } - } - }); + }); + } } else { /* @@ -222,6 +209,19 @@ public void run() { } } } + catch (EmptyResultDataAccessException e) { + /* + * Do not propagate this exception to RQD. This + * usually means the cue lost connectivity to + * the host and cleared out the record of the proc. + * If this is propagated back to RQD, RQD will + * keep retrying the operation forever. + */ + logger.info("failed to acquire data needed to " + + "process completed frame: " + + report.getFrame().getFrameName() + " in job " + + report.getFrame().getJobName() + "," + e); + } catch (Exception e) { /* @@ -259,7 +259,7 @@ public void run() { */ public void handlePostFrameCompleteOperations(VirtualProc proc, FrameCompleteReport report, DispatchJob job, DispatchFrame frame, - FrameState newFrameState) { + FrameState newFrameState, FrameDetail frameDetail) { try { /* @@ -313,7 +313,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc, * specified in the show's service override, service or 2GB. */ if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE - || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { + || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE + || frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { long increase = CueUtil.GB2; // since there can be multiple services, just going for the diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 2adef34fb..997e32fd4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -21,12 +21,16 @@ import java.sql.Timestamp; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.imageworks.spcue.JobInterface; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.beans.factory.annotation.Autowired; @@ -50,6 +54,7 @@ import com.imageworks.spcue.dispatcher.commands.DispatchBookHostLocal; import com.imageworks.spcue.dispatcher.commands.DispatchHandleHostReport; import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrame; +import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrameMemory; import com.imageworks.spcue.grpc.host.HardwareState; import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.grpc.report.BootReport; @@ -63,10 +68,11 @@ import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; import com.imageworks.spcue.service.JobManager; -import com.imageworks.spcue.service.JobManagerSupport; import com.imageworks.spcue.util.CueExceptionUtil; import com.imageworks.spcue.util.CueUtil; +import static com.imageworks.spcue.dispatcher.Dispatcher.*; + public class HostReportHandler { private static final Logger logger = LogManager.getLogger(HostReportHandler.class); @@ -81,7 +87,6 @@ public class HostReportHandler { private Dispatcher localDispatcher; private RqdClient rqdClient; private JobManager jobManager; - private JobManagerSupport jobManagerSupport; private JobDao jobDao; private LayerDao layerDao; @Autowired @@ -93,6 +98,13 @@ public class HostReportHandler { "space on the temporary directory (mcp)"; private static final String CUEBOT_COMMENT_USER = "cuebot"; + // A cache to store kill requests and count the number of occurrences. + // The cache expires after write to avoid growing unbounded. If a request for a host-frame doesn't appear + // for a period of time, the entry will be removed. + Cache killRequestCounterCache = CacheBuilder.newBuilder() + .expireAfterWrite(FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES, TimeUnit.MINUTES) + .build(); + /** * Boolean to toggle if this class is accepting data or not. */ @@ -143,7 +155,6 @@ public void queueHostReport(HostReport report) { reportQueue.execute(new DispatchHandleHostReport(report, this)); } - public void handleHostReport(HostReport report, boolean isBoot) { long startTime = System.currentTimeMillis(); try { @@ -211,9 +222,9 @@ public void handleHostReport(HostReport report, boolean isBoot) { killTimedOutFrames(report); /* - * Increase/decreased reserved memory. + * Prevent OOM (Out-Of-Memory) issues on the host and manage frame reserved memory */ - handleMemoryReservations(host, report); + handleMemoryUsage(host, report); /* * The checks are done in order of least CPU intensive to @@ -456,103 +467,279 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) { } /** - * Handle memory reservations for the given host. This will re-balance memory - * reservations on the machine and kill and frames that are out of control. - * + * Prevent host from entering an OOM state where oom-killer might start killing important OS processes. + * The kill logic will kick in one of the following conditions is met: + * - Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available + * - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved + * For frames that are using more than they had reserved but not above the threshold, negotiate expanding + * the reservations with other frames on the same host * @param host * @param report */ - private void handleMemoryReservations(final DispatchHost host, final HostReport report) { + private void handleMemoryUsage(final DispatchHost host, final HostReport report) { + // Don't keep memory balances on nimby hosts + if (host.isNimby) { + return; + } - // TODO: GPU: Need to keep frames from growing into space reserved for GPU frames - // However all this is done in the database without a chance to edit the values here + final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_max_safe_used_memory_threshold", Double.class); + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); + RenderHost renderHost = report.getHost(); + List runningFrames = report.getFramesList(); + + boolean memoryWarning = renderHost.getTotalMem() > 0 && + ((double)renderHost.getFreeMem()/renderHost.getTotalMem() < + (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + + if (memoryWarning) { + long memoryAvailable = renderHost.getFreeMem(); + long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + // Only allow killing up to 10 frames at a time + int killAttemptsRemaining = 10; + VirtualProc killedProc = null; + do { + killedProc = killWorstMemoryOffender(host); + killAttemptsRemaining -= 1; + if (killedProc != null) { + memoryAvailable = memoryAvailable + killedProc.memoryUsed; + } + } while (killAttemptsRemaining > 0 && + memoryAvailable < minSafeMemoryAvailable && + killedProc != null); + } else { + // When no mass cleaning was required, check for frames going overboard + // if frames didn't go overboard, manage its reservations trying to increase + // them accordingly + for (final RunningFrameInfo frame : runningFrames) { + if (OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD > 0 && isFrameOverboard(frame)) { + if (!killFrameOverusingMemory(frame, host.getName())) { + logger.warn("Frame " + frame.getJobName() + "." + frame.getFrameName() + + " is overboard but could not be killed"); + } + } else { + handleMemoryReservations(frame); + } + } + } + } - /* - * Check to see if we enable kill mode to free up memory. - */ - boolean killMode = hostManager.isSwapping(host); + public enum KillCause { + FrameOverboard("This frame is using more memory than it had reserved."), + HostUnderOom("Frame killed by host under OOM pressure"), + FrameTimedOut("Frame timed out"), + FrameLluTimedOut("Frame LLU timed out"), + FrameVerificationFailure("Frame failed to be verified on the database"); + private final String message; - for (final RunningFrameInfo f: report.getFramesList()) { + private KillCause(String message) { + this.message = message; + } + @Override + public String toString() { + return message; + } + } - VirtualProc proc = null; + private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname) { + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); + + // Don't mess with localDispatch procs + if (proc.isLocalDispatch) { + return false; + } + + logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + + ", using too much memory."); + return killProcForMemory(proc, hostname, KillCause.FrameOverboard); + } catch (EmptyResultDataAccessException e) { + return false; + } + } + + private boolean getKillClearance(String hostname, String frameId) { + String cacheKey = hostname + "-" + frameId; + final int FRAME_KILL_RETRY_LIMIT = + env.getRequiredProperty("dispatcher.frame_kill_retry_limit", Integer.class); + + // Cache frame+host receiving a killRequest and count how many times the request is being retried + // meaning rqd is probably failing at attempting to kill the related proc + long cachedCount; + try { + cachedCount = 1 + killRequestCounterCache.get(cacheKey, () -> 0L); + } catch (ExecutionException e) { + return false; + } + killRequestCounterCache.put(cacheKey, cachedCount); + if (cachedCount > FRAME_KILL_RETRY_LIMIT) { + FrameInterface frame = jobManager.getFrame(frameId); + JobInterface job = jobManager.getJob(frame.getJobId()); + + logger.warn("KillRequest blocked for " + job.getName() + "." + frame.getName() + + " blocked for host " + hostname + ". The kill retry limit has been reached."); + return false; + } + return true; + } + + private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, proc.frameId)) { + return false; + } + + FrameInterface frame = jobManager.getFrame(proc.frameId); + if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, + dispatchSupport, dispatcher.isTestMode())).run(); + } else { try { - proc = hostManager.getVirtualProc(f.getResourceId()); + killQueue.execute(new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, + dispatchSupport, dispatcher.isTestMode())); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); + return false; + } + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; + } - // TODO: handle memory management for local dispatches - // Skip local dispatches for now. - if (proc.isLocalDispatch) { - continue; - } + private boolean killFrame(String frameId, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, frameId)) { + return false; + } + if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrame(hostname, frameId, killCause.toString(), rqdClient)).run(); + } else { + try { + killQueue.execute(new DispatchRqdKillFrame(hostname, + frameId, + killCause.toString(), + rqdClient)); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); + } + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; + } - if (f.getRss() > host.memory) { - try{ - logger.info("Killing frame " + f.getJobName() + "/" + f.getFrameName() + ", " - + proc.getName() + " was OOM"); - try { - killQueue.execute(new DispatchRqdKillFrame(proc, "The frame required " + - CueUtil.KbToMb(f.getRss()) + " but the machine only has " + - CueUtil.KbToMb(host.memory), rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - DispatchSupport.killedOomProcs.incrementAndGet(); - } catch (Exception e) { - logger.info("failed to kill frame on " + proc.getName() + - "," + e); - } - } + /** + * Kill proc with the worst user/reserved memory ratio. + * + * @param host + * @return killed proc, or null if none could be found or failed to be killed + */ + private VirtualProc killWorstMemoryOffender(final DispatchHost host) { + try { + VirtualProc proc = hostManager.getWorstMemoryOffender(host); + logger.info("Killing frame on " + proc.getName() + ", host is under stress."); - if (dispatchSupport.increaseReservedMemory(proc, f.getRss())) { - proc.memoryReserved = f.getRss(); - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() - + " increased its reserved memory to " + - CueUtil.KbToMb(f.getRss())); - } + if (!killProcForMemory(proc, host.getName(), KillCause.HostUnderOom)) { + proc = null; + } + return proc; + } + catch (EmptyResultDataAccessException e) { + logger.error(host.name + " is under OOM and no proc is running on it."); + return null; + } + } + + /** + * Check frame memory usage comparing the amount used with the amount it had reserved + * @param frame + * @return + */ + private boolean isFrameOverboard(final RunningFrameInfo frame) { + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); + + if (OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD < 0) { + return false; + } + + double rss = (double)frame.getRss(); + double maxRss = (double)frame.getMaxRss(); + final double MAX_RSS_OVERBOARD_THRESHOLD = OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD * 2; + final double RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER = 0.1; + + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); + double reserved = (double)proc.memoryReserved; + + // Last memory report is higher than the threshold + if (isOverboard(rss, reserved, OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)) { + return true; + } + // If rss is not overboard, handle the situation where the frame might be going overboard from + // time to time but the last report wasn't during a spike. For this case, consider a combination + // of rss and maxRss. maxRss > 2 * threshold and rss > 0.9 + else { + return isOverboard(maxRss, reserved, MAX_RSS_OVERBOARD_THRESHOLD) && + isOverboard(rss, reserved, -RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER); + } + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler(isFrameOverboard): Virtual proc for frame " + + frame.getFrameName() + " on job " + frame.getJobName() + " doesn't exist on the database"); + // Not able to mark the frame overboard is it couldn't be found on the db. + // Proc accounting (verifyRunningProc) should take care of it + return false; + } + } - } catch (ResourceReservationFailureException e) { + private boolean isOverboard(double value, double total, double threshold) { + return value/total >= (1 + threshold); + } - long memNeeded = f.getRss() - proc.memoryReserved; + /** + * Handle memory reservations for the given frame + * + * @param frame + */ + private void handleMemoryReservations(final RunningFrameInfo frame) { + VirtualProc proc = null; + try { + proc = hostManager.getVirtualProc(frame.getResourceId()); - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() + if (proc.isLocalDispatch) { + return; + } + + if (dispatchSupport.increaseReservedMemory(proc, frame.getRss())) { + proc.memoryReserved = frame.getRss(); + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + " increased its reserved memory to " + + CueUtil.KbToMb(frame.getRss())); + } + } catch (ResourceReservationFailureException e) { + if (proc != null) { + long memNeeded = frame.getRss() - proc.memoryReserved; + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + "was unable to reserve an additional " + CueUtil.KbToMb(memNeeded) + "on proc " + proc.getName() + ", " + e); - try { if (dispatchSupport.balanceReservedMemory(proc, memNeeded)) { - proc.memoryReserved = f.getRss(); + proc.memoryReserved = frame.getRss(); logger.info("was able to balance host: " + proc.getName()); - } - else { + } else { logger.info("failed to balance host: " + proc.getName()); } } catch (Exception ex) { logger.warn("failed to balance host: " + proc.getName() + ", " + e); } - } catch (EmptyResultDataAccessException e) { - logger.info("HostReportHandler: frame " + f.getFrameName() + - " on job " + f.getJobName() + - " was unable be processed" + - " because the proc could not be found"); - } - } - - if (killMode) { - VirtualProc proc; - try { - proc = hostManager.getWorstMemoryOffender(host); - } - catch (EmptyResultDataAccessException e) { - logger.info(host.name + " is swapping and no proc is running on it."); - return; + } else { + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + "was unable to reserve an additional memory. Proc could not be found"); } - - logger.info("Killing frame on " + - proc.getName() + ", host is distressed."); - - DispatchSupport.killedOffenderProcs.incrementAndGet(); - jobManagerSupport.kill(proc, new Source( - "The host was dangerously low on memory and swapping.")); + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler: Memory reservations for frame " + frame.getFrameName() + + " on job " + frame.getJobName() + " proc could not be found"); } } @@ -562,7 +749,6 @@ private void handleMemoryReservations(final DispatchHost host, final HostReport * @param rFrames */ private void killTimedOutFrames(HostReport report) { - final Map layers = new HashMap(5); for (RunningFrameInfo frame: report.getFramesList()) { @@ -570,36 +756,16 @@ private void killTimedOutFrames(HostReport report) { LayerDetail layer = layerDao.getLayerDetail(layerId); long runtimeMinutes = ((System.currentTimeMillis() - frame.getStartTime()) / 1000l) / 60; - if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - } + String hostname = report.getHost().getName(); - if (layer.timeout_llu == 0){ - continue; - } - - if (frame.getLluTime() == 0){ - continue; - } - - long r = System.currentTimeMillis() / 1000; - long lastUpdate = (r - frame.getLluTime()) / 60; + if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameTimedOut); + } else if (layer.timeout_llu != 0 && frame.getLluTime() != 0) { + long r = System.currentTimeMillis() / 1000; + long lastUpdate = (r - frame.getLluTime()) / 60; - if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu -1)){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it LLU timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu - 1)){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameLluTimedOut); } } } @@ -725,98 +891,59 @@ public void verifyRunningFrameInfo(HostReport report) { continue; } + if (hostManager.verifyRunningProc(runningFrame.getResourceId(), runningFrame.getFrameId())) { + runningFrames.add(runningFrame); + continue; + } - if (!hostManager.verifyRunningProc(runningFrame.getResourceId(), - runningFrame.getFrameId())) { - - /* - * The frame this proc is running is no longer - * assigned to this proc. Don't ever touch - * the frame record. If we make it here that means - * the proc has been running for over 2 min. - */ - - String msg; - VirtualProc proc = null; + /* + * The frame this proc is running is no longer + * assigned to this proc. Don't ever touch + * the frame record. If we make it here that means + * the proc has been running for over 2 min. + */ + String msg; + VirtualProc proc = null; - try { - proc = hostManager.getVirtualProc(runningFrame.getResourceId()); - msg = "Virutal proc " + proc.getProcId() + + try { + proc = hostManager.getVirtualProc(runningFrame.getResourceId()); + msg = "Virtual proc " + proc.getProcId() + "is assigned to " + proc.getFrameId() + " not " + runningFrame.getFrameId(); - } - catch (Exception e) { - /* - * This will happen if the host goes off line and then - * comes back. In this case, we don't touch the frame - * since it might already be running somewhere else. We - * do however kill the proc. - */ - msg = "Virtual proc did not exist."; - } - - logger.info("warning, the proc " + - runningFrame.getResourceId() + " on host " + - report.getHost().getName() + " was running for " + - (runtimeSeconds / 60.0f) + " minutes " + - runningFrame.getJobName() + "/" + runningFrame.getFrameName() + - "but the DB did not " + - "reflect this " + - msg); - - DispatchSupport.accountingErrors.incrementAndGet(); - - try { - /* - * If the proc did exist unbook it if we can't - * verify its running something. - */ - boolean rqd_kill = false; - if (proc != null) { - - /* - * Check to see if the proc is an orphan. - */ - if (hostManager.isOprhan(proc)) { - dispatchSupport.clearVirtualProcAssignement(proc); - dispatchSupport.unbookProc(proc); - rqd_kill = true; - } - } - else { - /* Proc doesn't exist so a kill won't hurt */ - rqd_kill = true; - } - - if (rqd_kill) { - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - runningFrame.getFrameId(), - "OpenCue could not verify this frame.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - } + } + catch (Exception e) { + /* + * This will happen if the host goes offline and then + * comes back. In this case, we don't touch the frame + * since it might already be running somewhere else. We + * do however kill the proc. + */ + msg = "Virtual proc did not exist."; + } - } catch (RqdClientException rqde) { - logger.warn("failed to kill " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " when trying to clear a failed " + - " frame verification, " + rqde); - - } catch (Exception e) { - CueExceptionUtil.logStackTrace("failed", e); - logger.warn("failed to verify " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " was running but the frame was " + - " unable to be killed, " + e); - } + DispatchSupport.accountingErrors.incrementAndGet(); + if (proc != null && hostManager.isOprhan(proc)) { + dispatchSupport.clearVirtualProcAssignement(proc); + dispatchSupport.unbookProc(proc); + proc = null; } - else { - runningFrames.add(runningFrame); + if (proc == null) { + if (killFrame(runningFrame.getFrameId(), + report.getHost().getName(), + KillCause.FrameVerificationFailure)) { + logger.info("FrameVerificationError, the proc " + + runningFrame.getResourceId() + " on host " + + report.getHost().getName() + " was running for " + + (runtimeSeconds / 60.0f) + " minutes " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName() + + "but the DB did not " + + "reflect this. " + + msg); + } else { + logger.warn("FrameStuckWarning: frameId=" + runningFrame.getFrameId() + + " render_node=" + report.getHost().getName() + " - " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName()); + } } } } @@ -877,14 +1004,6 @@ public void setJobManager(JobManager jobManager) { this.jobManager = jobManager; } - public JobManagerSupport getJobManagerSupport() { - return jobManagerSupport; - } - - public void setJobManagerSupport(JobManagerSupport jobManagerSupport) { - this.jobManagerSupport = jobManagerSupport; - } - public JobDao getJobDao() { return jobDao; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java index 61258a824..fe9bde60e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java @@ -31,9 +31,7 @@ public class DispatchRqdKillFrame extends KeyRunnable { private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrame.class); - private VirtualProc proc = null; private String message; - private String hostname; private String frameId; @@ -47,28 +45,14 @@ public DispatchRqdKillFrame(String hostname, String frameId, String message, Rqd this.rqdClient = rqdClient; } - public DispatchRqdKillFrame(VirtualProc proc, String message, RqdClient rqdClient) { - super("disp_rqd_kill_frame_" + proc.getProcId() + "_" + rqdClient.toString()); - this.proc = proc; - this.hostname = proc.hostName; - this.message = message; - this.rqdClient = rqdClient; - } - @Override public void run() { long startTime = System.currentTimeMillis(); try { - if (proc != null) { - rqdClient.killFrame(proc, message); - } - else { - rqdClient.killFrame(hostname, frameId, message); - } + rqdClient.killFrame(hostname, frameId, message); } catch (RqdClientException e) { logger.info("Failed to contact host " + hostname + ", " + e); - } - finally { + } finally { long elapsedTime = System.currentTimeMillis() - startTime; logger.info("RQD communication with " + hostname + " took " + elapsedTime + "ms"); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java new file mode 100644 index 000000000..301f77479 --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java @@ -0,0 +1,78 @@ + +/* + * Copyright Contributors to the OpenCue Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package com.imageworks.spcue.dispatcher.commands; + +import com.imageworks.spcue.FrameInterface; +import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.rqd.RqdClient; +import com.imageworks.spcue.rqd.RqdClientException; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +/** + * A runnable to communicate with rqd requesting for a frame to be killed due to memory issues. + *

+ * Before killing a frame, the database is updated to mark the frame status as EXIT_STATUS_MEMORY_FAILURE, + * this allows the FrameCompleteHandler to possibly retry the frame after increasing its memory requirements + */ +public class DispatchRqdKillFrameMemory extends KeyRunnable { + + private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrameMemory.class); + + private String message; + private String hostname; + private DispatchSupport dispatchSupport; + private final RqdClient rqdClient; + private final boolean isTestMode; + + private FrameInterface frame; + + public DispatchRqdKillFrameMemory(String hostname, FrameInterface frame, String message, RqdClient rqdClient, + DispatchSupport dispatchSupport, boolean isTestMode) { + super("disp_rqd_kill_frame_" + frame.getFrameId() + "_" + rqdClient.toString()); + this.frame = frame; + this.hostname = hostname; + this.message = message; + this.rqdClient = rqdClient; + this.dispatchSupport = dispatchSupport; + this.isTestMode = isTestMode; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + try { + if (dispatchSupport.updateFrameMemoryError(frame) && !isTestMode) { + rqdClient.killFrame(hostname, frame.getFrameId(), message); + } else { + logger.warn("Could not update frame " + frame.getFrameId() + + " status to EXIT_STATUS_MEMORY_FAILURE. Canceling kill request!"); + } + } catch (RqdClientException e) { + logger.warn("Failed to contact host " + hostname + ", " + e); + } finally { + long elapsedTime = System.currentTimeMillis() - startTime; + logger.info("RQD communication with " + hostname + + " took " + elapsedTime + "ms"); + } + } +} + diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index aaf401688..e62d8647b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -70,15 +70,6 @@ public interface HostManager { */ void setHostFreeTempDir(HostInterface host, Long freeTempDir); - /** - * Return true if the host is swapping hard enough - * that killing frames will save the entire machine. - * - * @param host - * @return - */ - boolean isSwapping(HostInterface host); - DispatchHost createHost(HostReport report); DispatchHost createHost(RenderHost host); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index a1533d695..36de34a1c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -98,12 +98,6 @@ public void setHostFreeTempDir(HostInterface host, Long freeTempDir) { hostDao.updateHostFreeTempDir(host, freeTempDir); } - @Override - @Transactional(propagation = Propagation.REQUIRED, readOnly=true) - public boolean isSwapping(HostInterface host) { - return hostDao.isKillMode(host); - } - public void rebootWhenIdle(HostInterface host) { try { hostDao.updateHostState(host, HardwareState.REBOOT_WHEN_IDLE); diff --git a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml index 35bd5cbb8..5aedc91b3 100644 --- a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml +++ b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml @@ -384,7 +384,6 @@ - diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 6b2875899..b7f2a23ff 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -130,7 +130,21 @@ dispatcher.booking_queue.max_pool_size=6 # Queue capacity for booking. dispatcher.booking_queue.queue_capacity=1000 -# Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success +# Percentage of used memory to consider a risk for triggering oom-killer +dispatcher.oom_max_safe_used_memory_threshold=0.95 + +# How much can a frame exceed its reserved memory. +# - 0.5 means 50% above reserve +# - -1.0 makes the feature inactive +# This feature is being kept inactive for now as we work on improving the +# frame retry logic (See commit comment for more details). +dispatcher.oom_frame_overboard_allowed_threshold=-1.0 + +# How many times should cuebot send a kill request for the same frame-host before reporting +# the frame as stuck +dispatcher.frame_kill_retry_limit=3 + +# Whether to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success depend.satisfy_only_on_frame_success=true # Jobs will be archived to the history tables after being completed for this long. diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java index a6261e464..918b43679 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java @@ -331,24 +331,6 @@ public void testIsHostLocked() { assertEquals(hostDao.isHostLocked(host),true); } - @Test - @Transactional - @Rollback(true) - public void testIsKillMode() { - hostDao.insertRenderHost(buildRenderHost(TEST_HOST), - hostManager.getDefaultAllocationDetail(), - false); - - HostEntity host = hostDao.findHostDetail(TEST_HOST); - assertFalse(hostDao.isKillMode(host)); - - jdbcTemplate.update( - "UPDATE host_stat SET int_swap_free = ?, int_mem_free = ? WHERE pk_host = ?", - CueUtil.MB256, CueUtil.MB256, host.getHostId()); - - assertTrue(hostDao.isKillMode(host)); - } - @Test @Transactional @Rollback(true) diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java index d313c5293..1f452e92a 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java @@ -310,10 +310,11 @@ private void executeDepend( DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, frameState, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, frameState); + report, dispatchJob, dispatchFrame, frameState, frameDetail); assertTrue(jobManager.isLayerComplete(layerFirst)); assertFalse(jobManager.isLayerComplete(layerSecond)); @@ -401,10 +402,11 @@ private void executeMinMemIncrease(int expected, boolean override) { DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, FrameState.DEAD, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, FrameState.WAITING); + report, dispatchJob, dispatchFrame, FrameState.WAITING, frameDetail); assertFalse(jobManager.isLayerComplete(layer)); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java index ce1e98ae1..120c620a1 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java @@ -83,12 +83,12 @@ private static RenderHost getRenderHost() { .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. .setFreeMcp(CueUtil.GB) - .setFreeMem(53500) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) .setTotalMcp(CueUtil.GB4) - .setTotalMem(1048576L * 4096) - .setTotalSwap(20960) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -115,7 +115,7 @@ public void testHandleHostReport() { hostReportHandler.handleHostReport(report, true); DispatchHost host = getHost(); assertEquals(host.lockState, LockState.OPEN); - assertEquals(host.memory, 4294443008L); + assertEquals(host.memory, CueUtil.GB8 - 524288); assertEquals(host.gpus, 64); assertEquals(host.idleGpus, 64); assertEquals(host.gpuMemory, 1048576L * 2048); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index 970b97a95..971df8d14 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -21,10 +21,15 @@ import java.io.File; import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import javax.annotation.Resource; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.dispatcher.HostReportQueue; +import com.imageworks.spcue.dispatcher.FrameCompleteHandler; +import com.imageworks.spcue.grpc.job.FrameState; import org.junit.Before; import org.junit.Test; import org.springframework.test.annotation.Rollback; @@ -44,6 +49,7 @@ import com.imageworks.spcue.grpc.report.HostReport; import com.imageworks.spcue.grpc.report.RenderHost; import com.imageworks.spcue.grpc.report.RunningFrameInfo; +import com.imageworks.spcue.grpc.report.FrameCompleteReport; import com.imageworks.spcue.service.AdminManager; import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; @@ -52,6 +58,7 @@ import com.imageworks.spcue.test.TransactionalTest; import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.LayerDetail; import java.util.UUID; @@ -70,6 +77,9 @@ public class HostReportHandlerTests extends TransactionalTest { @Resource HostReportHandler hostReportHandler; + @Resource + FrameCompleteHandler frameCompleteHandler; + @Resource Dispatcher dispatcher; @@ -99,9 +109,9 @@ public void setTestMode() { public void createHost() { hostname = UUID.randomUUID().toString().substring(0, 8); hostname2 = UUID.randomUUID().toString().substring(0, 8); - hostManager.createHost(getRenderHost(hostname, HardwareState.UP), + hostManager.createHost(getRenderHost(hostname), adminManager.findAllocationDetail("spi","general")); - hostManager.createHost(getRenderHost(hostname2, HardwareState.UP), + hostManager.createHost(getRenderHost(hostname2), adminManager.findAllocationDetail("spi","general")); } @@ -118,52 +128,32 @@ private DispatchHost getHost(String hostname) { return hostManager.findDispatchHost(hostname); } - private static RenderHost getRenderHost(String hostname, HardwareState state) { + private static RenderHost.Builder getRenderHostBuilder(String hostname) { return RenderHost.newBuilder() .setName(hostname) .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. .setFreeMcp(CueUtil.GB) - .setFreeMem((int) CueUtil.GB8) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) .setTotalMcp(CueUtil.GB4) .setTotalMem(CueUtil.GB8) .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) - .setNumProcs(2) + .setNumProcs(16) .setCoresPerProc(100) .addTags("test") - .setState(state) + .setState(HardwareState.UP) .setFacility("spi") .putAttributes("SP_OS", "Linux") - .setFreeGpuMem((int) CueUtil.MB512) - .setTotalGpuMem((int) CueUtil.MB512) - .build(); + .setNumGpus(0) + .setFreeGpuMem(0) + .setTotalGpuMem(0); } - private static RenderHost getRenderHost(String hostname, HardwareState state, Long freeTempDir) { - return RenderHost.newBuilder() - .setName(hostname) - .setBootTime(1192369572) - // The minimum amount of free space in the temporary directory to book a host. - .setFreeMcp(freeTempDir) - .setFreeMem((int) CueUtil.GB8) - .setFreeSwap(20760) - .setLoad(0) - .setTotalMcp(freeTempDir * 4) - .setTotalMem(CueUtil.GB8) - .setTotalSwap(CueUtil.GB2) - .setNimbyEnabled(false) - .setNumProcs(2) - .setCoresPerProc(100) - .addTags("test") - .setState(state) - .setFacility("spi") - .putAttributes("SP_OS", "Linux") - .setFreeGpuMem((int) CueUtil.MB512) - .setTotalGpuMem((int) CueUtil.MB512) - .build(); + private static RenderHost getRenderHost(String hostname) { + return getRenderHostBuilder(hostname).build(); } private static RenderHost getNewRenderHost(String tags) { @@ -172,12 +162,12 @@ private static RenderHost getNewRenderHost(String tags) { .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. .setFreeMcp(CueUtil.GB) - .setFreeMem(53500) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) - .setTotalMcp(CueUtil.GB4) - .setTotalMem(8173264) - .setTotalSwap(20960) + .setTotalMcp(195430) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -196,15 +186,15 @@ private static RenderHost getNewRenderHost(String tags) { public void testHandleHostReport() throws InterruptedException { CoreDetail cores = getCoreDetail(200, 200, 0, 0); HostReport report1 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .build(); HostReport report2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname2, HardwareState.UP)) + .setHost(getRenderHost(hostname2)) .setCoreInfo(cores) .build(); HostReport report1_2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(getCoreDetail(200, 200, 100, 0)) .build(); @@ -314,7 +304,7 @@ public void testHandleHostReportWithFullTemporaryDirectories() { * */ // Create HostReport HostReport report1 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1024L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(1024L).build()) .setCoreInfo(cores) .build(); // Call handleHostReport() => Create the comment with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and change the @@ -356,7 +346,7 @@ public void testHandleHostReportWithFullTemporaryDirectories() { * */ // Set the host freeTempDir to the minimum size required = 1GB (1048576 KB) HostReport report2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1048576L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) .setCoreInfo(cores) .build(); // Call handleHostReport() => Delete the comment with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and change the @@ -397,7 +387,7 @@ public void testHandleHostReportWithHardwareStateRepairNotRelatedToFullTempDir() * */ // Create HostReport HostReport report = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1048576L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) .setCoreInfo(cores) .build(); // Get host @@ -451,7 +441,7 @@ public void testMemoryAndLlu() { .setMaxRss(420000) .build(); HostReport report = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .addFrames(info) .build(); @@ -462,5 +452,170 @@ public void testMemoryAndLlu() { assertEquals(frame.dateLLU, new Timestamp(now / 1000 * 1000)); assertEquals(420000, frame.maxRss); } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionRss() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + // 1.6 = 1 + dispatcher.oom_frame_overboard_allowed_threshold + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * 1.6); + + // Test rss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss(memoryOverboard) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + .build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMaxRss() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + // 0.6 = dispatcher.oom_frame_overboard_allowed_threshold + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * + (1.0 + (2 * 0.6))); + + // Test rss>90% and maxRss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss((long)Math.ceil(0.95 * proc.memoryReserved)) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + .build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMemoryWarning() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_multiple_frames.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(3, procs.size()); + VirtualProc proc1 = procs.get(0); + VirtualProc proc2 = procs.get(1); + VirtualProc proc3 = procs.get(2); + + // Ok + RunningFrameInfo info1 = RunningFrameInfo.newBuilder() + .setJobId(proc1.getJobId()) + .setLayerId(proc1.getLayerId()) + .setFrameId(proc1.getFrameId()) + .setResourceId(proc1.getProcId()) + .setRss(CueUtil.GB2) + .setMaxRss(CueUtil.GB2) + .build(); + + // Overboard Rss + RunningFrameInfo info2 = RunningFrameInfo.newBuilder() + .setJobId(proc2.getJobId()) + .setLayerId(proc2.getLayerId()) + .setFrameId(proc2.getFrameId()) + .setResourceId(proc2.getProcId()) + .setRss(CueUtil.GB4) + .setMaxRss(CueUtil.GB4) + .build(); + + // Overboard Rss + long memoryUsedProc3 = CueUtil.GB8; + RunningFrameInfo info3 = RunningFrameInfo.newBuilder() + .setJobId(proc3.getJobId()) + .setLayerId(proc3.getLayerId()) + .setFrameId(proc3.getFrameId()) + .setResourceId(proc3.getProcId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .build(); + + RenderHost hostAfterUpdate = getRenderHostBuilder(hostname).setFreeMem(0).build(); + + HostReport report = HostReport.newBuilder() + .setHost(hostAfterUpdate) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addAllFrames(Arrays.asList(info1, info2, info3)) + .build(); + + // Get layer state before report gets sent + LayerDetail layerBeforeIncrease = jobManager.getLayerDetail(proc3.getLayerId()); + + // In this case, killing one job should be enough to ge the machine to a safe state + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + + // Confirm the frame will be set to retry after it's completion has been processed + + RunningFrameInfo runningFrame = RunningFrameInfo.newBuilder() + .setFrameId(proc3.getFrameId()) + .setFrameName("frame_name") + .setLayerId(proc3.getLayerId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .setResourceId(proc3.id) + .build(); + FrameCompleteReport completeReport = FrameCompleteReport.newBuilder() + .setHost(hostAfterUpdate) + .setFrame(runningFrame) + .setExitSignal(9) + .setRunTime(1) + .setExitStatus(1) + .build(); + + frameCompleteHandler.handleFrameCompleteReport(completeReport); + FrameDetail killedFrame = jobManager.getFrameDetail(proc3.getFrameId()); + LayerDetail layer = jobManager.getLayerDetail(proc3.getLayerId()); + assertEquals(FrameState.WAITING, killedFrame.state); + // Memory increases are processed in two different places one will set the new value to proc.reserved + 2GB + // and the other will set to the maximum reported proc.maxRss the end value will be whoever is higher. + // In this case, proc.maxRss + assertEquals(Math.max(memoryUsedProc3, layerBeforeIncrease.getMinimumMemory() + CueUtil.GB2), + layer.getMinimumMemory()); + } } diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml new file mode 100644 index 000000000..3baa0b22b --- /dev/null +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml @@ -0,0 +1,48 @@ + + + + + + + + + spi + pipe + default + testuser + 9860 + + + False + 2 + False + + + + echo hello + 1-3 + 1 + 2gb + + + shell + + + + + + diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index 00d0c4463..10516f881 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -38,7 +38,7 @@ dispatcher.job_query_max=20 dispatcher.job_lock_expire_seconds=2 dispatcher.job_lock_concurrency_level=3 dispatcher.frame_query_max=10 -dispatcher.job_frame_dispatch_max=2 +dispatcher.job_frame_dispatch_max=3 dispatcher.host_frame_dispatch_max=12 dispatcher.launch_queue.core_pool_size=1 @@ -64,9 +64,8 @@ dispatcher.kill_queue.queue_capacity=1000 dispatcher.booking_queue.core_pool_size=6 dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 - -# The minimum amount of free space in the temporary directory (mcp) to book a host. -# E.g: 1G = 1048576 kB => dispatcher.min_bookable_free_temp_dir_kb=1048576 -# Default = 1G = 1048576 kB -# If equals to -1, it means the feature is turned off -dispatcher.min_bookable_free_temp_dir_kb=1048576 \ No newline at end of file +dispatcher.min_bookable_free_temp_dir_kb=1048576 +dispatcher.min_bookable_free_mcp_kb=1048576 +dispatcher.oom_max_safe_used_memory_threshold=0.95 +dispatcher.oom_frame_overboard_allowed_threshold=0.6 +dispatcher.frame_kill_retry_limit=3 \ No newline at end of file diff --git a/rqd/rqd/rqdservicers.py b/rqd/rqd/rqdservicers.py index 98ab358ac..b736ef43b 100644 --- a/rqd/rqd/rqdservicers.py +++ b/rqd/rqd/rqdservicers.py @@ -67,6 +67,8 @@ def KillRunningFrame(self, request, context): frame = self.rqCore.getRunningFrame(request.frame_id) if frame: frame.kill(message=request.message) + else: + log.warning("Wasn't able to find frame(%s) to kill", request.frame_id) return rqd.compiled_proto.rqd_pb2.RqdStaticKillRunningFrameResponse() def ShutdownRqdNow(self, request, context): diff --git a/rqd/rqd/rqnetwork.py b/rqd/rqd/rqnetwork.py index 9f33e64a2..9d3591fdd 100644 --- a/rqd/rqd/rqnetwork.py +++ b/rqd/rqd/rqnetwork.py @@ -162,6 +162,7 @@ def kill(self, message=""): else: os.killpg(self.pid, rqd.rqconstants.KILL_SIGNAL) finally: + log.warning("kill() successfully killed frameId=%s pid=%s", self.frameId, self.pid) rqd.rqutil.permissionsLow() except OSError as e: log.warning(