@@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
8585 private static final ConfigKey <Long > JobExpireMinutes = new ConfigKey <Long >(Long .class , "job.expire.minutes" , "Advanced" , "1440" ,
8686 "Time (in minutes) for async-jobs to be kept in system" , true , ConfigKey .Scope .Global , 60l );
8787 private static final ConfigKey <Long > JobCancelThresholdMinutes = new ConfigKey <Long >(Long .class , "job.cancel.threshold.minutes" , "Advanced" , "60" ,
88- "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long" , true , ConfigKey .Scope .Global , 60l );
88+ "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long" , true , ConfigKey .Scope .Global , 240l );
8989
9090 private static final Logger s_logger = Logger .getLogger (AsyncJobManagerImpl .class );
9191
9292 private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3 ; // 3 seconds
93- private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60 ; // 60 seconds
9493
9594 private static final int MAX_ONETIME_SCHEDULE_SIZE = 50 ;
9695 private static final int HEARTBEAT_INTERVAL = 2000 ;
@@ -706,30 +705,36 @@ public void reallyRun() {
706705 try {
707706 s_logger .trace ("Begin cleanup expired async-jobs" );
708707
709- Date cutTime = new Date (DateUtil .currentGMTTime ().getTime () - JobExpireMinutes .value () * 1000 );
708+ Date cutTime = new Date (DateUtil .currentGMTTime ().getTime () - JobExpireMinutes .value () * 60000 );
710709
711710 // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
712711 // hopefully this will be fast enough to balance potential growth of job table
713712 //1) Expire unfinished jobs that weren't processed yet
714713 List <AsyncJobVO > l = _jobDao .getExpiredUnfinishedJobs (cutTime , 100 );
715714 for (AsyncJobVO job : l ) {
716- s_logger .trace ("Expunging unfinished job " + job );
715+ s_logger .info ("Expunging unfinished job " + job );
716+
717+ _jobMonitor .unregisterByJobId (job .getId ());
717718 expungeAsyncJob (job );
718719 }
719720
720721 //2) Expunge finished jobs
721722 List <AsyncJobVO > completedJobs = _jobDao .getExpiredCompletedJobs (cutTime , 100 );
722723 for (AsyncJobVO job : completedJobs ) {
723724 s_logger .trace ("Expunging completed job " + job );
725+
724726 expungeAsyncJob (job );
725727 }
726728
727729 // forcefully cancel blocking queue items if they've been staying there for too long
728- List <SyncQueueItemVO > blockItems = _queueMgr .getBlockedQueueItems (JobCancelThresholdMinutes .value () * 1000 , false );
730+ List <SyncQueueItemVO > blockItems = _queueMgr .getBlockedQueueItems (JobCancelThresholdMinutes .value () * 60000 , false );
729731 if (blockItems != null && blockItems .size () > 0 ) {
730732 for (SyncQueueItemVO item : blockItems ) {
731733 if (item .getContentType ().equalsIgnoreCase (SyncQueueItem .AsyncJobContentType )) {
734+ s_logger .info ("Remove Job-" + item .getContentId () + " from Queue-" + item .getId () + " since it has been blocked for too long" );
732735 completeAsyncJob (item .getContentId (), JobInfo .Status .FAILED , 0 , "Job is cancelled as it has been blocking others for too long" );
736+
737+ _jobMonitor .unregisterByJobId (item .getContentId ());
733738 }
734739
735740 // purge the item and resume queue processing
0 commit comments