Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* The current logic relies on hardcoded values which are not suitable for large hosts. The new logic takes into account the size of hosts and also tries to be more aggressive with misbehaving frames.

Prevent host from entering an OOM state where oom-killer might start killing important OS processes. The kill logic will kick in one of the following conditions is met:

Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available
A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved For frames that are using more than they had reserved but not above the threshold, negotiate expanding the reservations with other frames on the same host

(cherry picked from commit e88a5295f23bd927614de6d5af6a09d496d3e6ac)

* Frames killed for OOM should be retried

(cherry picked from commit b88f7bcb1ad43f83fb8357576c33483dc2bf4952)

* OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD can be deactivated with -1

(cherry picked from commit 647e75e2254c7a7ff68c544e438080f412bf04c1)

* Limit the number of kill retries

There's an error condition on rqd where a frame that cannot be killed will end up preventing the host from picking up new jobs. This logic limits the number of repeated killRequests to give host a chance to pick up new jobs. At the same time, blocked frames are logged to spcue.log to be handled manually.

(cherry picked from commit aea4864ef66aca494fb455a7c103e4a832b63d41)

* Fix merge conflicts

* Handle MR comments

* Minor improvements to the logic

Signed-off-by: Diego Tavares <[email protected]>

---------

Signed-off-by: Diego Tavares <[email protected]>
  • Loading branch information
DiegoTavares authored and carlosfelgarcia committed May 22, 2024
1 parent edf899e commit f834ee7
Show file tree
Hide file tree
Showing 25 changed files with 764 additions and 376 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down
7 changes: 7 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu
* @return
*/
boolean updateFrameCleared(FrameInterface frame);
/**
* Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return whether the frame has been updated
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Sets a frame to an unreserved waiting state.
Expand Down
9 changes: 0 additions & 9 deletions cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,6 @@ public interface HostDao {
*/
void updateThreadMode(HostInterface host, ThreadMode mode);

/**
* When a host is in kill mode that means its 256MB+ into the swap and the
* the worst memory offender is killed.
*
* @param h HostInterface
* @return boolean
*/
boolean isKillMode(HostInterface h);

/**
* Update the specified host's hardware information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) {
return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0;
}

private static final String UPDATE_FRAME_MEMORY_ERROR =
"UPDATE "+
"frame "+
"SET " +
"int_exit_status = ?, " +
"int_version = int_version + 1 " +
"WHERE " +
"frame.pk_frame = ? ";
@Override
public boolean updateFrameMemoryError(FrameInterface frame) {
int result = getJdbcTemplate().update(
UPDATE_FRAME_MEMORY_ERROR,
Dispatcher.EXIT_STATUS_MEMORY_FAILURE,
frame.getFrameId());

return result > 0;
}

private static final String UPDATE_FRAME_STARTED =
"UPDATE " +
"frame " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,6 @@ public void updateHostOs(HostInterface host, String os) {
os, host.getHostId());
}

@Override
public boolean isKillMode(HostInterface h) {
return getJdbcTemplate().queryForObject(
"SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " +
"AND int_swap_total - int_swap_free > ? AND int_mem_free < ?",
Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD,
Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0;
}

@Override
public int getStrandedCoreUnits(HostInterface h) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {
value, p.getProcId(), value) == 1;
} catch (Exception e) {
// check by trigger erify_host_resources
throw new ResourceReservationFailureException("failed to increase memory reserveration for proc "
throw new ResourceReservationFailureException("failed to increase memory reservation for proc "
+ p.getProcId() + " to " + value + ", proc does not have that much memory to spare.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*/
void clearFrame(DispatchFrame frame);

/**
* Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return whether the frame has been updated
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Update Memory usage data and LLU time for the given frame.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.dao.DataAccessException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -184,7 +185,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {

@Override
public boolean clearVirtualProcAssignement(ProcInterface proc) {
return procDao.clearVirtualProcAssignment(proc);
try {
return procDao.clearVirtualProcAssignment(proc);
} catch (DataAccessException e) {
return false;
}
}

@Transactional(propagation = Propagation.REQUIRED)
Expand Down Expand Up @@ -343,6 +348,12 @@ public void clearFrame(DispatchFrame frame) {
frameDao.updateFrameCleared(frame);
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public boolean updateFrameMemoryError(FrameInterface frame) {
return frameDao.updateFrameMemoryError(frame);
}

@Transactional(propagation = Propagation.SUPPORTS)
public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
int threads = proc.coresReserved / 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down Expand Up @@ -108,13 +107,8 @@ public interface Dispatcher {
// without being penalized for it.
public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2;

// The amount of swap that must be used before a host can go
// into kill mode.
public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128;

// When the amount of free memory drops below this point, the
// host can go into kill mode.
public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512;
// How long to keep track of a frame kill request
public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3;

// A higher number gets more deep booking but less spread on the cue.
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.imageworks.spcue.DispatchFrame;
import com.imageworks.spcue.DispatchHost;
import com.imageworks.spcue.DispatchJob;
import com.imageworks.spcue.FrameDetail;
import com.imageworks.spcue.JobDetail;
import com.imageworks.spcue.LayerDetail;
import com.imageworks.spcue.LayerInterface;
Expand Down Expand Up @@ -143,49 +144,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
}

try {

final VirtualProc proc;

try {

proc = hostManager.getVirtualProc(
report.getFrame().getResourceId());
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
return;
}

final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId());
final DispatchJob job = jobManager.getDispatchJob(proc.getJobId());
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);
final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();

if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
if (dispatcher.isTestMode()) {
// Database modifications on a threadpool cannot be captured by the test thread
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} else {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
}
}
}
});
});
}
}
else {
/*
Expand Down Expand Up @@ -222,6 +209,19 @@ public void run() {
}
}
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
}
catch (Exception e) {

/*
Expand Down Expand Up @@ -259,7 +259,7 @@ public void run() {
*/
public void handlePostFrameCompleteOperations(VirtualProc proc,
FrameCompleteReport report, DispatchJob job, DispatchFrame frame,
FrameState newFrameState) {
FrameState newFrameState, FrameDetail frameDetail) {
try {

/*
Expand Down Expand Up @@ -313,7 +313,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,
* specified in the show's service override, service or 2GB.
*/
if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
long increase = CueUtil.GB2;

// since there can be multiple services, just going for the
Expand Down
Loading

0 comments on commit f834ee7

Please sign in to comment.