Skip to content

Commit ba56210

Browse files
committed
Deal with concurrent state update for VM and Host objects.
1 parent 14a4dd1 commit ba56210

5 files changed

Lines changed: 80 additions & 95 deletions

File tree

api/src/com/cloud/vm/VirtualMachine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public static StateMachine2<State, VirtualMachine.Event, VirtualMachine> getStat
102102
s_fsm.addTransition(State.Running, VirtualMachine.Event.StopRequested, State.Stopping);
103103
s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportShutdowned, State.Stopped);
104104
s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportMigrated, State.Running);
105+
s_fsm.addTransition(State.Running, VirtualMachine.Event.OperationSucceeded, State.Running);
105106
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationRequested, State.Migrating);
106107
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationSucceeded, State.Running);
107108
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationFailed, State.Running);

engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@
3636
import java.util.concurrent.ScheduledExecutorService;
3737
import java.util.concurrent.TimeUnit;
3838

39-
import javax.ejb.Local;
4039
import javax.inject.Inject;
4140
import javax.naming.ConfigurationException;
4241

4342
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
44-
import org.apache.cloudstack.context.CallContext;
4543
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
4644
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
4745
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
@@ -51,19 +49,9 @@
5149
import org.apache.cloudstack.framework.config.ConfigKey;
5250
import org.apache.cloudstack.framework.config.Configurable;
5351
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
54-
import org.apache.cloudstack.framework.jobs.AsyncJob;
55-
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
56-
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
57-
import org.apache.cloudstack.framework.jobs.Outcome;
58-
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
59-
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
60-
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
61-
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
6252
import org.apache.cloudstack.framework.messagebus.MessageBus;
6353
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
6454
import org.apache.cloudstack.framework.messagebus.MessageHandler;
65-
import org.apache.cloudstack.jobs.JobInfo;
66-
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
6755
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
6856
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
6957
import org.apache.cloudstack.storage.to.VolumeObjectTO;
@@ -291,7 +279,7 @@ public List<HostAllocator> getHostAllocators() {
291279
}
292280

293281
public void setHostAllocators(List<HostAllocator> hostAllocators) {
294-
this.hostAllocators = hostAllocators;
282+
hostAllocators = hostAllocators;
295283
}
296284

297285
protected List<StoragePoolAllocator> _storagePoolAllocators;
@@ -3243,9 +3231,9 @@ protected class AgentVmInfo {
32433231

32443232
/**
 * Creates an {@code AgentVmInfo} holding the supplied values.
 *
 * @param name  value stored in the {@code name} field
 * @param vm    value stored in the {@code vm} field
 * @param state value stored in the {@code state} field
 * @param host  value stored in the {@code hostUuid} field
 */
@SuppressWarnings("unchecked")
public AgentVmInfo(String name, VMInstanceVO vm, State state, String host) {
    // name, state and vm shadow the fields of the same name, so each assignment
    // needs the "this." qualifier; without it the parameter is assigned to itself
    // and the field keeps its default value.
    this.name = name;
    this.state = state;
    this.vm = vm;
    // "host" does not shadow anything, so the unqualified field name is fine here.
    hostUuid = host;
}
32513239

@@ -4100,7 +4088,7 @@ private void HandlePowerStateReport(String subject, String senderAddress, Object
41004088

41014089
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
41024090
VirtualMachine.Type.Instance, vmId);
4103-
if (pendingWorkJobs.size() == 0 || !_haMgr.hasPendingHaWork(vmId)) {
4091+
if (pendingWorkJobs.size() == 0 && !_haMgr.hasPendingHaWork(vmId)) {
41044092
// there is no pending operation job
41054093
VMInstanceVO vm = _vmDao.findById(vmId);
41064094
if (vm != null) {
@@ -4407,7 +4395,8 @@ public VmStateSyncOutcome(final AsyncJob job, final PowerState desiredPowerState
44074395
@Override
44084396
public boolean checkCondition() {
44094397
VMInstanceVO instance = _vmDao.findById(vmId);
4410-
if (instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && srcHostIdForMigration.equals(instance.getPowerHostId())))
4398+
if ((instance.getPowerState() == desiredPowerState && srcHostIdForMigration == null) ||
4399+
(instance.getPowerState() == desiredPowerState && (srcHostIdForMigration != null && instance.getPowerHostId() != srcHostIdForMigration)))
44114400
return true;
44124401
return false;
44134402
}

engine/schema/src/com/cloud/host/dao/HostDaoImpl.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -953,46 +953,49 @@ public boolean updateState(Status oldStatus, Event event, Status newStatus, Host
953953
int result = update(ub, sc, null);
954954
assert result <= 1 : "How can this update " + result + " rows? ";
955955

956-
if (status_logger.isDebugEnabled() && result == 0) {
956+
if (result == 0) {
957957
HostVO ho = findById(host.getId());
958958
assert ho != null : "How how how? : " + host.getId();
959959

960-
StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
961-
str.append(". Name=").append(host.getName());
962-
str.append("; New=[status=")
963-
.append(newStatus.toString())
964-
.append(":msid=")
965-
.append(newStatus.lostConnection() ? "null" : host.getManagementServerId())
966-
.append(":lastpinged=")
967-
.append(host.getLastPinged())
968-
.append("]");
969-
str.append("; Old=[status=")
970-
.append(oldStatus.toString())
971-
.append(":msid=")
972-
.append(host.getManagementServerId())
973-
.append(":lastpinged=")
974-
.append(oldPingTime)
975-
.append("]");
976-
str.append("; DB=[status=")
977-
.append(vo.getStatus().toString())
978-
.append(":msid=")
979-
.append(vo.getManagementServerId())
980-
.append(":lastpinged=")
981-
.append(vo.getLastPinged())
982-
.append(":old update count=")
983-
.append(oldUpdateCount)
984-
.append("]");
985-
status_logger.debug(str.toString());
986-
} else {
987-
StringBuilder msg = new StringBuilder("Agent status update: [");
988-
msg.append("id = " + host.getId());
989-
msg.append("; name = " + host.getName());
990-
msg.append("; old status = " + oldStatus);
991-
msg.append("; event = " + event);
992-
msg.append("; new status = " + newStatus);
993-
msg.append("; old update count = " + oldUpdateCount);
994-
msg.append("; new update count = " + newUpdateCount + "]");
995-
status_logger.debug(msg.toString());
960+
if (status_logger.isDebugEnabled()) {
961+
962+
StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
963+
str.append(". Name=").append(host.getName());
964+
str.append("; New=[status=")
965+
.append(newStatus.toString())
966+
.append(":msid=")
967+
.append(newStatus.lostConnection() ? "null" : host.getManagementServerId())
968+
.append(":lastpinged=")
969+
.append(host.getLastPinged())
970+
.append("]");
971+
str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(host.getManagementServerId()).append(":lastpinged=").append(oldPingTime)
972+
.append("]");
973+
str.append("; DB=[status=")
974+
.append(vo.getStatus().toString())
975+
.append(":msid=")
976+
.append(vo.getManagementServerId())
977+
.append(":lastpinged=")
978+
.append(vo.getLastPinged())
979+
.append(":old update count=")
980+
.append(oldUpdateCount)
981+
.append("]");
982+
status_logger.debug(str.toString());
983+
} else {
984+
StringBuilder msg = new StringBuilder("Agent status update: [");
985+
msg.append("id = " + host.getId());
986+
msg.append("; name = " + host.getName());
987+
msg.append("; old status = " + oldStatus);
988+
msg.append("; event = " + event);
989+
msg.append("; new status = " + newStatus);
990+
msg.append("; old update count = " + oldUpdateCount);
991+
msg.append("; new update count = " + newUpdateCount + "]");
992+
status_logger.debug(msg.toString());
993+
}
994+
995+
if (ho.getState() == newStatus) {
996+
status_logger.debug("Host " + ho.getName() + " state has already been updated to " + newStatus);
997+
return true;
998+
}
996999
}
9971000

9981001
return result > 0;

engine/schema/src/com/cloud/vm/dao/VMInstanceDaoImpl.java

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -452,41 +452,29 @@ public boolean updateState(State oldState, Event event, State newState, VirtualM
452452
ub.set(vmi, _updateTimeAttr, new Date());
453453

454454
int result = update(vmi, sc);
455-
if (result == 0 && s_logger.isDebugEnabled()) {
456-
455+
if (result == 0) {
457456
VMInstanceVO vo = findByIdIncludingRemoved(vm.getId());
458457

459-
if (vo != null) {
460-
StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString());
461-
str.append(": DB Data={Host=")
462-
.append(vo.getHostId())
463-
.append("; State=")
464-
.append(vo.getState().toString())
465-
.append("; updated=")
466-
.append(vo.getUpdated())
467-
.append("; time=")
468-
.append(vo.getUpdateTime());
469-
str.append("} New Data: {Host=")
470-
.append(vm.getHostId())
471-
.append("; State=")
472-
.append(vm.getState().toString())
473-
.append("; updated=")
474-
.append(vmi.getUpdated())
475-
.append("; time=")
476-
.append(vo.getUpdateTime());
477-
str.append("} Stale Data: {Host=")
478-
.append(oldHostId)
479-
.append("; State=")
480-
.append(oldState)
481-
.append("; updated=")
482-
.append(oldUpdated)
483-
.append("; time=")
484-
.append(oldUpdateDate)
485-
.append("}");
486-
s_logger.debug(str.toString());
487-
488-
} else {
489-
s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed");
458+
if (s_logger.isDebugEnabled()) {
459+
if (vo != null) {
460+
StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString());
461+
str.append(": DB Data={Host=").append(vo.getHostId()).append("; State=").append(vo.getState().toString()).append("; updated=").append(vo.getUpdated())
462+
.append("; time=").append(vo.getUpdateTime());
463+
str.append("} New Data: {Host=").append(vm.getHostId()).append("; State=").append(vm.getState().toString()).append("; updated=").append(vmi.getUpdated())
464+
.append("; time=").append(vo.getUpdateTime());
465+
str.append("} Stale Data: {Host=").append(oldHostId).append("; State=").append(oldState).append("; updated=").append(oldUpdated).append("; time=")
466+
.append(oldUpdateDate).append("}");
467+
s_logger.debug(str.toString());
468+
469+
} else {
470+
s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed");
471+
}
472+
}
473+
474+
if (vo != null && vo.getState() == newState) {
475+
// allow for concurrent update if target state has already been matched
476+
s_logger.debug("VM " + vo.getInstanceName() + " state has been already been updated to " + newState);
477+
return true;
490478
}
491479
}
492480
return result > 0;

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,17 @@ public List<Long> doInTransaction(TransactionStatus status) {
256256
}
257257
});
258258

259-
for (Long id : wakeupList) {
260-
// TODO, we assume that all jobs in this category is API job only
261-
AsyncJobVO jobToWakeup = _jobDao.findById(id);
262-
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
263-
scheduleExecution(jobToWakeup, false);
264-
}
265-
259+
//
260+
// disable wakeup scheduling now, since all API jobs are currently using block-waiting for sub-jobs
261+
//
262+
/*
263+
for (Long id : wakeupList) {
264+
// TODO, we assume that all jobs in this category is API job only
265+
AsyncJobVO jobToWakeup = _jobDao.findById(id);
266+
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
267+
scheduleExecution(jobToWakeup, false);
268+
}
269+
*/
266270
_messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
267271
}
268272

0 commit comments

Comments
 (0)