|
31 | 31 | import java.util.concurrent.Future; |
32 | 32 | import java.util.concurrent.ScheduledExecutorService; |
33 | 33 | import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
34 | 35 |
|
35 | 36 | import com.cloud.network.Network; |
36 | 37 | import com.cloud.usage.dao.UsageNetworksDao; |
@@ -179,6 +180,7 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna |
179 | 180 | private final List<UsageVmDiskVO> usageVmDisks = new ArrayList<UsageVmDiskVO>(); |
180 | 181 |
|
181 | 182 | private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job")); |
| 183 | + private final AtomicBoolean isParsingJobRunning = new AtomicBoolean(false); |
182 | 184 | private final ScheduledExecutorService _heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB")); |
183 | 185 | private final ScheduledExecutorService _sanityExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Sanity")); |
184 | 186 | private Future _scheduledFuture = null; |
@@ -354,7 +356,12 @@ public void run() { |
354 | 356 | (new ManagedContextRunnable() { |
355 | 357 | @Override |
356 | 358 | protected void runInContext() { |
357 | | - runInContextInternal(); |
| 359 | + isParsingJobRunning.set(true); |
| 360 | + try { |
| 361 | + runInContextInternal(); |
| 362 | + } finally { |
| 363 | + isParsingJobRunning.set(false); |
| 364 | + } |
358 | 365 | } |
359 | 366 | }).run(); |
360 | 367 | } |
@@ -2172,9 +2179,14 @@ protected void runInContext() { |
2172 | 2179 |
|
2173 | 2180 | if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) { |
2174 | 2181 | if (timeToJob > (aggregationDurationMillis / 2)) { |
2175 | | - logger.debug("it's been {} ms since last usage job and {} ms until next job, scheduling an immediate job to catch up (aggregation duration is {} minutes)" |
2176 | | - , timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
2177 | | - scheduleParse(); |
| 2182 | + logger.debug("Heartbeat: it's been {} ms since last finished usage job and {} ms until next job (aggregation duration is {} minutes)", |
| 2183 | + timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
| 2184 | + if (isParsingJobRunning.get()) { |
| 2185 | + logger.debug("Heartbeat: A parsing job is already running"); |
| 2186 | + } else { |
| 2187 | + logger.debug("Heartbeat: Scheduling an immediate job to catch up"); |
| 2188 | + scheduleParse(); |
| 2189 | + } |
2178 | 2190 | } |
2179 | 2191 | } |
2180 | 2192 |
|
|
0 commit comments