Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Change OOM protection logic #1317

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down
7 changes: 7 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu
* @return
*/
boolean updateFrameCleared(FrameInterface frame);
/**
* Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Sets a frame to an unreserved waiting state.
Expand Down
9 changes: 0 additions & 9 deletions cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,6 @@ public interface HostDao {
*/
void updateThreadMode(HostInterface host, ThreadMode mode);

/**
* When a host is in kill mode that means its 256MB+ into the swap and the
* the worst memory offender is killed.
*
* @param h HostInterface
* @return boolean
*/
boolean isKillMode(HostInterface h);

/**
* Update the specified host's hardware information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) {
return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0;
}

private static final String UPDATE_FRAME_MEMORY_ERROR =
"UPDATE "+
"frame "+
"SET " +
"int_exit_status = ?, " +
"int_version = int_version + 1 " +
"WHERE " +
"frame.pk_frame = ? ";
@Override
public boolean updateFrameMemoryError(FrameInterface frame) {
int result = getJdbcTemplate().update(
UPDATE_FRAME_MEMORY_ERROR,
Dispatcher.EXIT_STATUS_MEMORY_FAILURE,
frame.getFrameId());

return result > 0;
}

private static final String UPDATE_FRAME_STARTED =
"UPDATE " +
"frame " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,6 @@ public void updateHostOs(HostInterface host, String os) {
os, host.getHostId());
}

@Override
public boolean isKillMode(HostInterface h) {
return getJdbcTemplate().queryForObject(
"SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " +
"AND int_swap_total - int_swap_free > ? AND int_mem_free < ?",
Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD,
Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0;
}

@Override
public int getStrandedCoreUnits(HostInterface h) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {
value, p.getProcId(), value) == 1;
} catch (Exception e) {
// check by trigger erify_host_resources
throw new ResourceReservationFailureException("failed to increase memory reserveration for proc "
throw new ResourceReservationFailureException("failed to increase memory reservation for proc "
+ p.getProcId() + " to " + value + ", proc does not have that much memory to spare.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*/
void clearFrame(DispatchFrame frame);

/**
* Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Update Memory usage data and LLU time for the given frame.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


import com.imageworks.spcue.AllocationInterface;
import com.imageworks.spcue.DispatchFrame;
import com.imageworks.spcue.DispatchHost;
Expand All @@ -42,6 +43,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.dao.DataAccessException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -184,7 +186,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {

@Override
public boolean clearVirtualProcAssignement(ProcInterface proc) {
return procDao.clearVirtualProcAssignment(proc);
try {
return procDao.clearVirtualProcAssignment(proc);
} catch (DataAccessException e) {
return false;
}
}

@Transactional(propagation = Propagation.REQUIRED)
Expand Down Expand Up @@ -343,6 +349,12 @@ public void clearFrame(DispatchFrame frame) {
frameDao.updateFrameCleared(frame);
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public boolean updateFrameMemoryError(FrameInterface frame) {
return frameDao.updateFrameMemoryError(frame);
}

@Transactional(propagation = Propagation.SUPPORTS)
public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
int threads = proc.coresReserved / 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down Expand Up @@ -108,13 +107,8 @@ public interface Dispatcher {
// without being penalized for it.
public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2;

// The amount of swap that must be used before a host can go
// into kill mode.
public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128;

// When the amount of free memory drops below this point, the
// host can go into kill mode.
public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512;
// How long to keep track of a frame kill request
public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3;

// A higher number gets more deep booking but less spread on the cue.
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.imageworks.spcue.DispatchJob;
import com.imageworks.spcue.JobDetail;
import com.imageworks.spcue.LayerDetail;
import com.imageworks.spcue.FrameDetail;
import com.imageworks.spcue.LayerInterface;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.VirtualProc;
Expand Down Expand Up @@ -143,49 +144,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
}

try {

final VirtualProc proc;

try {

proc = hostManager.getVirtualProc(
report.getFrame().getResourceId());
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
return;
}

final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId());
final DispatchJob job = jobManager.getDispatchJob(proc.getJobId());
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);
final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();

if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
if (dispatcher.isTestMode()) {
// Database modifications on a threadpool cannot be captured by the test thread
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} else {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
}
}
}
});
});
}
}
else {
/*
Expand Down Expand Up @@ -222,6 +209,19 @@ public void run() {
}
}
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
}
catch (Exception e) {

/*
Expand Down Expand Up @@ -259,7 +259,7 @@ public void run() {
*/
public void handlePostFrameCompleteOperations(VirtualProc proc,
FrameCompleteReport report, DispatchJob job, DispatchFrame frame,
FrameState newFrameState) {
FrameState newFrameState, FrameDetail frameDetail) {
try {

/*
Expand Down Expand Up @@ -313,7 +313,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,
* specified in the show's service override, service or 2GB.
*/
if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
long increase = CueUtil.GB2;

// since there can be multiple services, just going for the
Expand Down
Loading