Skip to content

Commit 5310e66

Browse files
committed
Do not use row lock in sync-queue scheduling to work around mysql locking issues.
1 parent 7fa4715 commit 5310e66

3 files changed

Lines changed: 29 additions & 11 deletions

File tree

api/src/com/cloud/vm/VirtualMachine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ public static StateMachine2<State, VirtualMachine.Event, VirtualMachine> getStat
119119
s_fsm.addTransition(State.Error, VirtualMachine.Event.DestroyRequested, State.Expunging);
120120
s_fsm.addTransition(State.Error, VirtualMachine.Event.ExpungeOperation, State.Expunging);
121121

122+
s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
122123
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
123124
s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
124125
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
125126
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
127+
126128
s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
127129
s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
128130
s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,8 +659,24 @@ private void checkQueue(long queueId) {
659659

660660
private Runnable getHeartbeatTask() {
661661
return new ManagedContextRunnable() {
662+
662663
@Override
663664
protected void runInContext() {
665+
GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerHeartbeat");
666+
try {
667+
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
668+
try {
669+
reallyRun();
670+
} finally {
671+
scanLock.unlock();
672+
}
673+
}
674+
} finally {
675+
scanLock.releaseRef();
676+
}
677+
}
678+
679+
protected void reallyRun() {
664680
try {
665681
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
666682
if (l != null && l.size() > 0) {

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public SyncQueueItemVO dequeueFromOne(final long queueId, final Long msid) {
8383
return Transaction.execute(new TransactionCallback<SyncQueueItemVO>() {
8484
@Override
8585
public SyncQueueItemVO doInTransaction(TransactionStatus status) {
86-
SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true);
87-
if (queueVO == null) {
86+
SyncQueueVO queueVO = _syncQueueDao.findById(queueId);
87+
if(queueVO == null) {
8888
s_logger.error("Sync queue(id: " + queueId + ") does not exist");
8989
return null;
9090
}
@@ -139,11 +139,11 @@ public List<SyncQueueItemVO> dequeueFromAny(final Long msid, final int maxItems)
139139
@Override
140140
public void doInTransactionWithoutResult(TransactionStatus status) {
141141
List<SyncQueueItemVO> l = _syncQueueItemDao.getNextQueueItems(maxItems);
142-
if (l != null && l.size() > 0) {
143-
for (SyncQueueItemVO item : l) {
144-
SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true);
145-
SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true);
146-
if (queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
142+
if(l != null && l.size() > 0) {
143+
for(SyncQueueItemVO item : l) {
144+
SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId());
145+
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId());
146+
if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) {
147147
Long processNumber = queueVO.getLastProcessNumber();
148148
if (processNumber == null)
149149
processNumber = new Long(1);
@@ -184,8 +184,8 @@ public void purgeItem(final long queueItemId) {
184184
@Override
185185
public void doInTransactionWithoutResult(TransactionStatus status) {
186186
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
187-
if (itemVO != null) {
188-
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
187+
if(itemVO != null) {
188+
SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId());
189189

190190
_syncQueueItemDao.expunge(itemVO.getId());
191191

@@ -213,8 +213,8 @@ public void returnItem(final long queueItemId) {
213213
@Override
214214
public void doInTransactionWithoutResult(TransactionStatus status) {
215215
SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId);
216-
if (itemVO != null) {
217-
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
216+
if(itemVO != null) {
217+
SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId());
218218

219219
itemVO.setLastProcessMsid(null);
220220
itemVO.setLastProcessNumber(null);

0 commit comments

Comments
 (0)