Skip to content

Commit 0aff3df

Browse files
Remove unfinished usage job entries for the host
1 parent f0838cd commit 0aff3df

File tree

3 files changed

+46
-14
lines changed

3 files changed

+46
-14
lines changed

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao<UsageJobVO, Long> {
3737
UsageJobVO isOwner(String hostname, int pid);
3838

3939
void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success);
40+
41+
void removeLastOpenJobsOwned(String hostname, int pid);
4042
}

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424

25+
import org.apache.commons.collections.CollectionUtils;
2526
import org.apache.log4j.Logger;
2627
import org.springframework.stereotype.Component;
2728

@@ -116,7 +117,7 @@ public Long checkHeartbeat(String hostname, int pid, int aggregationDuration) {
116117
public UsageJobVO isOwner(String hostname, int pid) {
117118
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
118119
try {
119-
if ((hostname == null) || (pid <= 0)) {
120+
if (hostname == null || pid <= 0) {
120121
return null;
121122
}
122123

@@ -176,7 +177,7 @@ public UsageJobVO getNextImmediateJob() {
176177
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
177178
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
178179
sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
179-
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
180+
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED));
180181
List<UsageJobVO> jobs = search(sc, filter);
181182

182183
if ((jobs == null) || jobs.isEmpty()) {
@@ -196,4 +197,36 @@ public Date getLastHeartbeat() {
196197
}
197198
return jobs.get(0).getHeartbeat();
198199
}
200+
201+
private List<UsageJobVO> getLastOpenJobsOwned(String hostname, int pid) {
202+
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
203+
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
204+
sc.addAnd("host", SearchCriteria.Op.EQ, hostname);
205+
if (pid > 0) {
206+
sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid));
207+
}
208+
return listBy(sc);
209+
}
210+
211+
@Override
212+
public void removeLastOpenJobsOwned(String hostname, int pid) {
213+
if (hostname == null) {
214+
return;
215+
}
216+
217+
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
218+
try {
219+
List<UsageJobVO> jobs = getLastOpenJobsOwned(hostname, pid);
220+
if (CollectionUtils.isNotEmpty(jobs)) {
221+
s_logger.info(String.format("Found %s opens job, to remove", jobs.size()));
222+
for (UsageJobVO job : jobs) {
223+
s_logger.debug(String.format("Removing job - id: %d, pid: %d, job type: %d, scheduled: %d, heartbeat: %s",
224+
job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat()));
225+
remove(job.getId());
226+
}
227+
}
228+
} finally {
229+
txn.close();
230+
}
231+
}
199232
}

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,9 @@ public boolean start() {
331331
_sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS);
332332
}
333333

334+
_usageJobDao.removeLastOpenJobsOwned(_hostname, 0);
334335
Runtime.getRuntime().addShutdownHook(new AbandonJob());
336+
335337
TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
336338
try {
337339
if (_heartbeatLock.lock(3)) { // 3 second timeout
@@ -2197,17 +2199,17 @@ protected void runInContext() {
21972199
// the aggregation range away from executing the next job
21982200
long now = System.currentTimeMillis();
21992201
long timeToJob = _jobExecTime.getTimeInMillis() - now;
2200-
long timeSinceJob = 0;
2202+
long timeSinceLastSuccessJob = 0;
22012203
long aggregationDurationMillis = _aggregationDuration * 60L * 1000L;
22022204
long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
22032205
if (lastSuccess > 0) {
2204-
timeSinceJob = now - lastSuccess;
2206+
timeSinceLastSuccessJob = now - lastSuccess;
22052207
}
22062208

2207-
if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) {
2209+
if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) {
22082210
if (timeToJob > (aggregationDurationMillis / 2)) {
22092211
if (s_logger.isDebugEnabled()) {
2210-
s_logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob +
2212+
s_logger.debug("it's been " + timeSinceLastSuccessJob + " ms since last usage job and " + timeToJob +
22112213
" ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)");
22122214
}
22132215
scheduleParse();
@@ -2294,17 +2296,12 @@ protected void runInContext() {
22942296
}
22952297
}
22962298
}
2299+
22972300
private class AbandonJob extends Thread {
22982301
@Override
22992302
public void run() {
2300-
s_logger.info("exitting Usage Manager");
2301-
deleteOpenjob();
2302-
}
2303-
private void deleteOpenjob() {
2304-
UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid);
2305-
if (job != null) {
2306-
_usageJobDao.remove(job.getId());
2307-
}
2303+
s_logger.info("exiting Usage Manager");
2304+
_usageJobDao.removeLastOpenJobsOwned(_hostname, _pid);
23082305
}
23092306
}
23102307
}

0 commit comments

Comments
 (0)