Skip to content

Remove unfinished usage job entries of the host on start #10848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 4.19
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public interface UsageJobDao extends GenericDao<UsageJobVO, Long> {
UsageJobVO isOwner(String hostname, int pid);

void updateJobSuccess(Long jobId, long startMillis, long endMillis, long execTime, boolean success);

void removeLastOpenJobsOwned(String hostname, int pid);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;


import org.apache.commons.collections.CollectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -116,7 +117,7 @@
public UsageJobVO isOwner(String hostname, int pid) {
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
try {
if ((hostname == null) || (pid <= 0)) {
if (hostname == null || pid <= 0) {
return null;
}

Expand Down Expand Up @@ -176,7 +177,7 @@
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
sc.addAnd("jobType", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_TYPE_SINGLE));
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(0));
sc.addAnd("scheduled", SearchCriteria.Op.EQ, Integer.valueOf(UsageJobVO.JOB_NOT_SCHEDULED));

Check warning on line 180 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L180

Added line #L180 was not covered by tests
List<UsageJobVO> jobs = search(sc, filter);

if ((jobs == null) || jobs.isEmpty()) {
Expand All @@ -196,4 +197,36 @@
}
return jobs.get(0).getHeartbeat();
}

private List<UsageJobVO> getLastOpenJobsOwned(String hostname, int pid) {
SearchCriteria<UsageJobVO> sc = createSearchCriteria();
sc.addAnd("endMillis", SearchCriteria.Op.EQ, Long.valueOf(0));
sc.addAnd("host", SearchCriteria.Op.EQ, hostname);

Check warning on line 204 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L201-L204

Added lines #L201 - L204 were not covered by tests
if (pid > 0) {
sc.addAnd("pid", SearchCriteria.Op.EQ, Integer.valueOf(pid));

Check warning on line 206 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L206

Added line #L206 was not covered by tests
}
return listBy(sc);
}

Check warning on line 209 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L208-L209

Added lines #L208 - L209 were not covered by tests

@Override
public void removeLastOpenJobsOwned(String hostname, int pid) {

Check warning on line 212 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L212

Added line #L212 was not covered by tests
if (hostname == null) {
return;

Check warning on line 214 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L214

Added line #L214 was not covered by tests
}

TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
try {
List<UsageJobVO> jobs = getLastOpenJobsOwned(hostname, pid);

Check warning on line 219 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L217-L219

Added lines #L217 - L219 were not covered by tests
if (CollectionUtils.isNotEmpty(jobs)) {
s_logger.info(String.format("Found %s opens job, to remove", jobs.size()));

Check warning on line 221 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L221

Added line #L221 was not covered by tests
for (UsageJobVO job : jobs) {
s_logger.debug(String.format("Removing job - id: %d, pid: %d, job type: %d, scheduled: %d, heartbeat: %s",
job.getId(), job.getPid(), job.getJobType(), job.getScheduled(), job.getHeartbeat()));
remove(job.getId());
}
}

Check warning on line 227 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L223-L227

Added lines #L223 - L227 were not covered by tests
} finally {
txn.close();

Check warning on line 229 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L229

Added line #L229 was not covered by tests
}
}

Check warning on line 231 in engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/usage/dao/UsageJobDaoImpl.java#L231

Added line #L231 was not covered by tests
}
23 changes: 10 additions & 13 deletions usage/src/main/java/com/cloud/usage/UsageManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@
s_logger.info("Starting Usage Manager");
}

_usageJobDao.removeLastOpenJobsOwned(_hostname, 0);
Runtime.getRuntime().addShutdownHook(new AbandonJob());

Check warning on line 323 in usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java#L322-L323

Added lines #L322 - L323 were not covered by tests

// use the configured exec time and aggregation duration for scheduling the job
_scheduledFuture =
_executor.scheduleAtFixedRate(this, _jobExecTime.getTimeInMillis() - System.currentTimeMillis(), _aggregationDuration * 60 * 1000, TimeUnit.MILLISECONDS);
Expand All @@ -331,7 +334,6 @@
_sanity = _sanityExecutor.scheduleAtFixedRate(new SanityCheck(), 1, _sanityCheckInterval, TimeUnit.DAYS);
}

Runtime.getRuntime().addShutdownHook(new AbandonJob());
TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
try {
if (_heartbeatLock.lock(3)) { // 3 second timeout
Expand Down Expand Up @@ -2197,17 +2199,17 @@
// the aggregation range away from executing the next job
long now = System.currentTimeMillis();
long timeToJob = _jobExecTime.getTimeInMillis() - now;
long timeSinceJob = 0;
long timeSinceLastSuccessJob = 0;
long aggregationDurationMillis = _aggregationDuration * 60L * 1000L;
long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
if (lastSuccess > 0) {
timeSinceJob = now - lastSuccess;
timeSinceLastSuccessJob = now - lastSuccess;
}

if ((timeSinceJob > 0) && (timeSinceJob > (aggregationDurationMillis - 100))) {
if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) {

Check warning on line 2209 in usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java#L2209

Added line #L2209 was not covered by tests
if (timeToJob > (aggregationDurationMillis / 2)) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("it's been " + timeSinceJob + " ms since last usage job and " + timeToJob +
s_logger.debug("it's been " + timeSinceLastSuccessJob + " ms since last usage job and " + timeToJob +

Check warning on line 2212 in usage/src/main/java/com/cloud/usage/UsageManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

usage/src/main/java/com/cloud/usage/UsageManagerImpl.java#L2212

Added line #L2212 was not covered by tests
" ms until next job, scheduling an immediate job to catch up (aggregation duration is " + _aggregationDuration + " minutes)");
}
scheduleParse();
Expand Down Expand Up @@ -2294,17 +2296,12 @@
}
}
}

private class AbandonJob extends Thread {
@Override
public void run() {
s_logger.info("exitting Usage Manager");
deleteOpenjob();
}
private void deleteOpenjob() {
UsageJobVO job = _usageJobDao.isOwner(_hostname, _pid);
if (job != null) {
_usageJobDao.remove(job.getId());
}
s_logger.info("exiting Usage Manager");
_usageJobDao.removeLastOpenJobsOwned(_hostname, _pid);
}
}
}
Loading