Skip to content

Commit 5dd4fb2

Browse files
committed
Remove cancelled jobs from job monitoring; correct the miscalculated time unit (minute values were scaled by 1000 instead of 60000 to get milliseconds) in job expiration and job cancellation.
1 parent bbf5a91 commit 5dd4fb2

2 files changed

Lines changed: 35 additions & 10 deletions

File tree

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
8585
private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", "1440",
8686
"Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l);
8787
private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", "60",
88-
"Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l);
88+
"Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l);
8989

9090
private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
9191

9292
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
93-
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds
9493

9594
private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
9695
private static final int HEARTBEAT_INTERVAL = 2000;
@@ -706,30 +705,36 @@ public void reallyRun() {
706705
try {
707706
s_logger.trace("Begin cleanup expired async-jobs");
708707

709-
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000);
708+
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
710709

711710
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
712711
// hopefully this will be fast enough to balance potential growth of job table
713712
//1) Expire unfinished jobs that weren't processed yet
714713
List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
715714
for (AsyncJobVO job : l) {
716-
s_logger.trace("Expunging unfinished job " + job);
715+
s_logger.info("Expunging unfinished job " + job);
716+
717+
_jobMonitor.unregisterByJobId(job.getId());
717718
expungeAsyncJob(job);
718719
}
719720

720721
//2) Expunge finished jobs
721722
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
722723
for (AsyncJobVO job : completedJobs) {
723724
s_logger.trace("Expunging completed job " + job);
725+
724726
expungeAsyncJob(job);
725727
}
726728

727729
// forcefully cancel blocking queue items if they've been staying there for too long
728-
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false);
730+
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
729731
if (blockItems != null && blockItems.size() > 0) {
730732
for (SyncQueueItemVO item : blockItems) {
731733
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
734+
s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
732735
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
736+
737+
_jobMonitor.unregisterByJobId(item.getContentId());
733738
}
734739

735740
// purge the item and resume queue processing

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.cloudstack.framework.jobs.impl;
1818

1919
import java.util.HashMap;
20+
import java.util.Iterator;
2021
import java.util.Map;
2122
import java.util.Timer;
2223
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,8 +39,7 @@
3839
public class AsyncJobMonitor extends ManagerBase {
3940
public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class);
4041

41-
@Inject
42-
private MessageBus _messageBus;
42+
@Inject private MessageBus _messageBus;
4343

4444
private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>();
4545
private final Timer _timer = new Timer();
@@ -86,15 +86,16 @@ private void heartbeat() {
8686
synchronized (this) {
8787
for (Map.Entry<Long, ActiveTaskRecord> entry : _activeTasks.entrySet()) {
8888
if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) {
89-
s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 +
90-
" seconds");
89+
s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for "
90+
+ entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds");
9191
}
9292
}
9393
}
9494
}
9595

9696
@Override
97-
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
97+
public boolean configure(String name, Map<String, Object> params)
98+
throws ConfigurationException {
9899

99100
_messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this));
100101
_timer.scheduleAtFixedRate(new ManagedContextTimerTask() {
@@ -141,6 +142,25 @@ public void unregisterActiveTask(long runNumber) {
141142
}
142143
}
143144

145+
public void unregisterByJobId(long jobId) {
146+
synchronized (this) {
147+
Iterator<Map.Entry<Long, ActiveTaskRecord>> it = _activeTasks.entrySet().iterator();
148+
while (it.hasNext()) {
149+
Map.Entry<Long, ActiveTaskRecord> entry = it.next();
150+
if (entry.getValue().getJobId() == jobId) {
151+
s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling");
152+
153+
if (entry.getValue().isPoolThread())
154+
_activePoolThreads.decrementAndGet();
155+
else
156+
_activeInplaceThreads.decrementAndGet();
157+
158+
it.remove();
159+
}
160+
}
161+
}
162+
}
163+
144164
public int getActivePoolThreads() {
145165
return _activePoolThreads.get();
146166
}

0 commit comments

Comments
 (0)