|
31 | 31 | import java.util.concurrent.Future; |
32 | 32 | import java.util.concurrent.ScheduledExecutorService; |
33 | 33 | import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
34 | 35 |
|
35 | 36 | import com.cloud.network.Network; |
36 | 37 | import com.cloud.usage.dao.UsageNetworksDao; |
@@ -179,6 +180,7 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna |
179 | 180 | private final List<UsageVmDiskVO> usageVmDisks = new ArrayList<UsageVmDiskVO>(); |
180 | 181 |
|
181 | 182 | private final ScheduledExecutorService _executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job")); |
| 183 | + private final AtomicBoolean isParsingJobRunning = new AtomicBoolean(false); |
182 | 184 | private final ScheduledExecutorService _heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB")); |
183 | 185 | private final ScheduledExecutorService _sanityExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Sanity")); |
184 | 186 | private Future _scheduledFuture = null; |
@@ -354,7 +356,12 @@ public void run() { |
354 | 356 | (new ManagedContextRunnable() { |
355 | 357 | @Override |
356 | 358 | protected void runInContext() { |
357 | | - runInContextInternal(); |
| 359 | + isParsingJobRunning.set(true); |
| 360 | + try { |
| 361 | + runInContextInternal(); |
| 362 | + } finally { |
| 363 | + isParsingJobRunning.set(false); |
| 364 | + } |
358 | 365 | } |
359 | 366 | }).run(); |
360 | 367 | } |
@@ -2177,9 +2184,14 @@ protected void runInContext() { |
2177 | 2184 |
|
2178 | 2185 | if ((timeSinceLastSuccessJob > 0) && (timeSinceLastSuccessJob > (aggregationDurationMillis - 100))) { |
2179 | 2186 | if (timeToJob > (aggregationDurationMillis / 2)) { |
2180 | | - logger.debug("it's been {} ms since last usage job and {} ms until next job, scheduling an immediate job to catch up (aggregation duration is {} minutes)" |
2181 | | - , timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
2182 | | - scheduleParse(); |
| 2187 | + logger.debug("Heartbeat: it's been {} ms since last finished usage job and {} ms until next job (aggregation duration is {} minutes)", |
| 2188 | + timeSinceLastSuccessJob, timeToJob, _aggregationDuration); |
| 2189 | + if (isParsingJobRunning.get()) { |
| 2190 | + logger.debug("Heartbeat: A parsing job is already running"); |
| 2191 | + } else { |
| 2192 | + logger.debug("Heartbeat: Scheduling an immediate job to catch up"); |
| 2193 | + scheduleParse(); |
| 2194 | + } |
2183 | 2195 | } |
2184 | 2196 | } |
2185 | 2197 |
|
|
0 commit comments