Skip to content

Commit 441be43

Browse files
committed
CLOUDSTACK-5358: Bring back concurrency control in sync-queue management
1 parent d5dc6aa commit 441be43

6 files changed

Lines changed: 93 additions & 76 deletions

File tree

engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java

Lines changed: 49 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
4848
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
4949
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
50+
import org.apache.cloudstack.engine.subsystem.api.storage.PrimaryDataStoreInfo;
5051
import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
5152
import org.apache.cloudstack.framework.config.ConfigDepot;
5253
import org.apache.cloudstack.framework.config.ConfigKey;
@@ -769,7 +770,7 @@ public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object>
769770
Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
770771

771772
try {
772-
outcome.get();
773+
VirtualMachine vm = outcome.get();
773774
} catch (InterruptedException e) {
774775
throw new RuntimeException("Operation is interrupted", e);
775776
} catch (java.util.concurrent.ExecutionException e) {
@@ -1317,7 +1318,7 @@ public void advanceStop(String vmUuid, boolean cleanUpEvenIfUnableToStop)
13171318
Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
13181319

13191320
try {
1320-
outcome.get();
1321+
VirtualMachine vm = outcome.get();
13211322
} catch (InterruptedException e) {
13221323
throw new RuntimeException("Operation is interrupted", e);
13231324
} catch (java.util.concurrent.ExecutionException e) {
@@ -1626,7 +1627,7 @@ public void storageMigration(String vmUuid, StoragePool destPool) {
16261627
Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
16271628

16281629
try {
1629-
outcome.get();
1630+
VirtualMachine vm = outcome.get();
16301631
} catch (InterruptedException e) {
16311632
throw new RuntimeException("Operation is interrupted", e);
16321633
} catch (java.util.concurrent.ExecutionException e) {
@@ -1718,7 +1719,7 @@ public void migrate(String vmUuid, long srcHostId, DeployDestination dest)
17181719
Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
17191720

17201721
try {
1721-
outcome.get();
1722+
VirtualMachine vm = outcome.get();
17221723
} catch (InterruptedException e) {
17231724
throw new RuntimeException("Operation is interrupted", e);
17241725
} catch (java.util.concurrent.ExecutionException e) {
@@ -2001,7 +2002,7 @@ public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, M
20012002
Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
20022003

20032004
try {
2004-
outcome.get();
2005+
VirtualMachine vm = outcome.get();
20052006
} catch (InterruptedException e) {
20062007
throw new RuntimeException("Operation is interrupted", e);
20072008
} catch (java.util.concurrent.ExecutionException e) {
@@ -2014,7 +2015,7 @@ public void migrateWithStorage(String vmUuid, long srcHostId, long destHostId, M
20142015
throw (ResourceUnavailableException)jobException;
20152016
else if (jobException instanceof ConcurrentOperationException)
20162017
throw (ConcurrentOperationException)jobException;
2017-
}
2018+
}
20182019
}
20192020
}
20202021

@@ -2296,7 +2297,7 @@ public void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object
22962297
Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
22972298

22982299
try {
2299-
outcome.get();
2300+
VirtualMachine vm = outcome.get();
23002301
} catch (InterruptedException e) {
23012302
throw new RuntimeException("Operation is interrupted", e);
23022303
} catch (java.util.concurrent.ExecutionException e) {
@@ -2994,10 +2995,10 @@ public void processConnect(Host agent, StartupCommand cmd, boolean forRebalance)
29942995
return;
29952996
}
29962997

2997-
if (s_logger.isDebugEnabled())
2998+
if(s_logger.isDebugEnabled())
29982999
s_logger.debug("Received startup command from hypervisor host. host id: " + agent.getId());
29993000

3000-
if (VmJobEnabled.value()) {
3001+
if(VmJobEnabled.value()) {
30013002
_syncMgr.resetHostSyncState(agent.getId());
30023003
}
30033004

@@ -3589,7 +3590,7 @@ public void migrateForScale(String vmUuid, long srcHostId, DeployDestination des
35893590
Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
35903591

35913592
try {
3592-
outcome.get();
3593+
VirtualMachine vm = outcome.get();
35933594
} catch (InterruptedException e) {
35943595
throw new RuntimeException("Operation is interrupted", e);
35953596
} catch (java.util.concurrent.ExecutionException e) {
@@ -3793,7 +3794,7 @@ public boolean plugNic(Network network, NicTO nic, VirtualMachineTO vm, Reservat
37933794
}
37943795

37953796
public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
3796-
ResourceUnavailableException {
3797+
ResourceUnavailableException {
37973798

37983799
boolean result = true;
37993800
VMInstanceVO router = _vmDao.findById(vm.getId());
@@ -3828,7 +3829,7 @@ public boolean unplugNic(Network network, NicTO nic, VirtualMachineTO vm, Reserv
38283829
@Override
38293830
public VMInstanceVO reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering,
38303831
boolean reconfiguringOnExistingHost)
3831-
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
3832+
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {
38323833

38333834
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
38343835
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
@@ -3974,8 +3975,8 @@ private void HandlePowerStateReport(String subject, String senderAddress, Object
39743975
handlePowerOffReportWithNoPendingJobsOnVM(vm);
39753976
break;
39763977

3977-
// PowerUnknown shouldn't be reported, it is a derived
3978-
// VM power state from host state (host un-reachable)
3978+
// PowerUnknown shouldn't be reported, it is a derived
3979+
// VM power state from host state (host un-reachable)
39793980
case PowerUnknown:
39803981
default:
39813982
assert (false);
@@ -4009,7 +4010,7 @@ private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
40094010
// we need to alert admin or user about this risky state transition
40104011
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
40114012
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName()
4012-
+ ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
4013+
+ ") state is sync-ed (Starting -> Running) from out-of-context transition. VM network environment may need to be reset");
40134014
break;
40144015

40154016
case Running:
@@ -4031,7 +4032,7 @@ private void handlePowerOnReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
40314032
}
40324033
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
40334034
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
4034-
+ " -> Running) from out-of-context transition. VM network environment may need to be reset");
4035+
+ " -> Running) from out-of-context transition. VM network environment may need to be reset");
40354036
break;
40364037

40374038
case Destroyed:
@@ -4074,7 +4075,7 @@ private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
40744075
}
40754076
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
40764077
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState()
4077-
+ " -> Stopped) from out-of-context transition.");
4078+
+ " -> Stopped) from out-of-context transition.");
40784079
// TODO: we need to forcely release all resource allocation
40794080
break;
40804081

@@ -4101,7 +4102,7 @@ private void scanStalledVMInTransitionStateOnUpHost(long hostId) {
41014102
// however, if VM is missing from the host report (it may happen in out of band changes
41024103
// or from designed behave of XS/KVM), the VM may not get a chance to run the state-sync logic
41034104
//
4104-
// Therefor, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
4105+
// Therefore, we will scan thoses VMs on UP host based on last update timestamp, if the host is UP
41054106
// and a VM stalls for status update, we will consider them to be powered off
41064107
// (which is relatively safe to do so)
41074108

@@ -4134,7 +4135,7 @@ private void scanStalledVMInTransitionStateOnDisconnectedHosts() {
41344135
// We now only alert administrator about this situation
41354136
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
41364137
VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") is stuck in " + vm.getState()
4137-
+ " state and its host is unreachable for too long");
4138+
+ " state and its host is unreachable for too long");
41384139
}
41394140
}
41404141

@@ -4332,7 +4333,7 @@ public Object[] doInTransaction(TransactionStatus status) {
43324333
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
43334334
}
43344335

4335-
return new Object[] {workJob, workJob.getId()};
4336+
return new Object[] {workJob, new Long(workJob.getId())};
43364337
}
43374338
});
43384339

@@ -4383,7 +4384,7 @@ public Object[] doInTransaction(TransactionStatus status) {
43834384
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
43844385
}
43854386

4386-
return new Object[] {workJob, workJob.getId()};
4387+
return new Object[] {workJob, new Long(workJob.getId())};
43874388
}
43884389
});
43894390

@@ -4436,7 +4437,7 @@ public Object[] doInTransaction(TransactionStatus status) {
44364437
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
44374438
}
44384439

4439-
return new Object[] {workJob, workJob.getId()};
4440+
return new Object[] {workJob, new Long(workJob.getId())};
44404441
}
44414442
});
44424443

@@ -4458,6 +4459,8 @@ public Outcome<VirtualMachine> migrateVmThroughJobQueue(final String vmUuid, fin
44584459
@Override
44594460
public Object[] doInTransaction(TransactionStatus status) {
44604461

4462+
_vmDao.lockRow(vm.getId(), true);
4463+
44614464
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
44624465
VirtualMachine.Type.Instance, vm.getId(),
44634466
VmWorkMigrate.class.getName());
@@ -4485,7 +4488,7 @@ public Object[] doInTransaction(TransactionStatus status) {
44854488

44864489
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
44874490
}
4488-
return new Object[] {workJob, workJob.getId()};
4491+
return new Object[] {workJob, new Long(workJob.getId())};
44894492
}
44904493
});
44914494

@@ -4540,7 +4543,7 @@ public Object[] doInTransaction(TransactionStatus status) {
45404543

45414544
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
45424545
}
4543-
return new Object[] {workJob, workJob.getId()};
4546+
return new Object[] {workJob, new Long(workJob.getId())};
45444547
}
45454548
});
45464549

@@ -4564,6 +4567,8 @@ public Outcome<VirtualMachine> migrateVmForScaleThroughJobQueue(
45644567
@Override
45654568
public Object[] doInTransaction(TransactionStatus status) {
45664569

4570+
_vmDao.lockRow(vm.getId(), true);
4571+
45674572
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
45684573
VirtualMachine.Type.Instance, vm.getId(),
45694574
VmWorkMigrateForScale.class.getName());
@@ -4593,7 +4598,7 @@ public Object[] doInTransaction(TransactionStatus status) {
45934598
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
45944599
}
45954600

4596-
return new Object[] {workJob, workJob.getId()};
4601+
return new Object[] {workJob, new Long(workJob.getId())};
45974602
}
45984603
});
45994604

@@ -4616,6 +4621,8 @@ public Outcome<VirtualMachine> migrateVmStorageThroughJobQueue(
46164621
@Override
46174622
public Object[] doInTransaction(TransactionStatus status) {
46184623

4624+
_vmDao.lockRow(vm.getId(), true);
4625+
46194626
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
46204627
VirtualMachine.Type.Instance, vm.getId(),
46214628
VmWorkStorageMigration.class.getName());
@@ -4639,13 +4646,13 @@ public Object[] doInTransaction(TransactionStatus status) {
46394646

46404647
// save work context info (there are some duplications)
46414648
VmWorkStorageMigration workInfo = new VmWorkStorageMigration(user.getId(), account.getId(), vm.getId(),
4642-
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool);
4649+
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, destPool.getId());
46434650
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
46444651

46454652
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
46464653
}
46474654

4648-
return new Object[] {workJob, workJob.getId()};
4655+
return new Object[] {workJob, new Long(workJob.getId())};
46494656
}
46504657
});
46514658

@@ -4666,6 +4673,8 @@ public Outcome<VirtualMachine> addVmToNetworkThroughJobQueue(
46664673
@Override
46674674
public Object[] doInTransaction(TransactionStatus status) {
46684675

4676+
_vmDao.lockRow(vm.getId(), true);
4677+
46694678
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
46704679
VirtualMachine.Type.Instance, vm.getId(),
46714680
VmWorkAddVmToNetwork.class.getName());
@@ -4694,7 +4703,7 @@ public Object[] doInTransaction(TransactionStatus status) {
46944703

46954704
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
46964705
}
4697-
return new Object[] {workJob, workJob.getId()};
4706+
return new Object[] {workJob, new Long(workJob.getId())};
46984707
}
46994708
});
47004709

@@ -4715,6 +4724,8 @@ public Outcome<VirtualMachine> removeNicFromVmThroughJobQueue(
47154724
@Override
47164725
public Object[] doInTransaction(TransactionStatus status) {
47174726

4727+
_vmDao.lockRow(vm.getId(), true);
4728+
47184729
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
47194730
VirtualMachine.Type.Instance, vm.getId(),
47204731
VmWorkRemoveNicFromVm.class.getName());
@@ -4743,7 +4754,7 @@ public Object[] doInTransaction(TransactionStatus status) {
47434754

47444755
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
47454756
}
4746-
return new Object[] {workJob, workJob.getId()};
4757+
return new Object[] {workJob, new Long(workJob.getId())};
47474758
}
47484759
});
47494760

@@ -4764,6 +4775,8 @@ public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
47644775
@Override
47654776
public Object[] doInTransaction(TransactionStatus status) {
47664777

4778+
_vmDao.lockRow(vm.getId(), true);
4779+
47674780
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
47684781
VirtualMachine.Type.Instance, vm.getId(),
47694782
VmWorkRemoveVmFromNetwork.class.getName());
@@ -4792,7 +4805,7 @@ public Object[] doInTransaction(TransactionStatus status) {
47924805

47934806
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
47944807
}
4795-
return new Object[] {workJob, workJob.getId()};
4808+
return new Object[] {workJob, new Long(workJob.getId())};
47964809
}
47974810
});
47984811

@@ -4815,6 +4828,8 @@ public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(
48154828
@Override
48164829
public Object[] doInTransaction(TransactionStatus status) {
48174830

4831+
_vmDao.lockRow(vm.getId(), true);
4832+
48184833
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
48194834
VirtualMachine.Type.Instance, vm.getId(),
48204835
VmWorkReconfigure.class.getName());
@@ -4843,7 +4858,7 @@ public Object[] doInTransaction(TransactionStatus status) {
48434858

48444859
_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
48454860
}
4846-
return new Object[] {workJob, workJob.getId()};
4861+
return new Object[] {workJob, new Long(workJob.getId())};
48474862
}
48484863
});
48494864

@@ -4980,7 +4995,9 @@ private Pair<JobInfo.Status, String> orchestrateStorageMigration(VmWorkStorageMi
49804995
s_logger.info("Unable to find vm " + work.getVmId());
49814996
}
49824997
assert (vm != null);
4983-
orchestrateStorageMigration(vm.getUuid(), work.getDestStoragePool());
4998+
StoragePool pool = (PrimaryDataStoreInfo)dataStoreMgr.getPrimaryDataStore(work.getDestStoragePoolId());
4999+
orchestrateStorageMigration(vm.getUuid(), pool);
5000+
49845001
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
49855002
}
49865003

engine/orchestration/src/com/cloud/vm/VmWorkStorageMigration.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,18 @@
1616
// under the License.
1717
package com.cloud.vm;
1818

19-
import com.cloud.storage.StoragePool;
20-
2119
public class VmWorkStorageMigration extends VmWork {
2220
private static final long serialVersionUID = -8677979691741157474L;
2321

24-
StoragePool destPool;
22+
Long destPoolId;
2523

26-
public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, StoragePool destPool) {
24+
public VmWorkStorageMigration(long userId, long accountId, long vmId, String handlerName, Long destPoolId) {
2725
super(userId, accountId, vmId, handlerName);
2826

29-
this.destPool = destPool;
27+
this.destPoolId = destPoolId;
3028
}
3129

32-
public StoragePool getDestStoragePool() {
33-
return destPool;
30+
public Long getDestStoragePoolId() {
31+
return destPoolId;
3432
}
3533
}

framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDao.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
2626
public SyncQueueItemVO getNextQueueItem(long queueId);
27+
public int getActiveQueueItemCount(long queueId);
2728

2829
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
2930

0 commit comments

Comments
 (0)