Skip to content

Commit 339aa41

Browse files
author
Alena Prokharchyk
committed
CLOUDSTACK-304: Synchronization support for createSnapshot command - don't sent more than "concurrent.snapshots.threshold.perhost" createSnapshots commands to the backend host.
Conflicts: core/src/com/cloud/async/AsyncJobVO.java core/src/com/cloud/async/SyncQueueVO.java server/src/com/cloud/api/ApiDispatcher.java server/src/com/cloud/api/ApiServer.java server/src/com/cloud/async/AsyncJobManagerImpl.java server/src/com/cloud/async/SyncQueueManager.java server/src/com/cloud/async/SyncQueueManagerImpl.java server/src/com/cloud/async/dao/SyncQueueDao.java server/src/com/cloud/storage/snapshot/SnapshotSchedulerImpl.java server/test/com/cloud/async/TestSyncQueueManager.java setup/db/create-schema.sql
1 parent c36744a commit 339aa41

20 files changed

Lines changed: 671 additions & 626 deletions

api/src/com/cloud/api/BaseAsyncCmd.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@
2525
* queryAsyncJobResult API command.
2626
*/
2727
public abstract class BaseAsyncCmd extends BaseCmd {
28+
2829
public static final String ipAddressSyncObject = "ipaddress";
2930
public static final String networkSyncObject = "network";
3031
public static final String vpcSyncObject = "vpc";
31-
32+
public static final String snapshotHostSyncObject = "snapshothost";
3233

3334
private AsyncJob job;
3435

api/src/com/cloud/api/commands/CreateSnapshotCmd.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.log4j.Logger;
2020

2121
import com.cloud.api.ApiConstants;
22+
import com.cloud.api.BaseAsyncCmd;
2223
import com.cloud.api.BaseAsyncCreateCmd;
2324
import com.cloud.api.BaseCmd;
2425
import com.cloud.api.IdentityMapper;
@@ -60,6 +61,8 @@ public class CreateSnapshotCmd extends BaseAsyncCreateCmd {
6061
@IdentityMapper(entityTableName="snapshot_policy")
6162
@Parameter(name = ApiConstants.POLICY_ID, type = CommandType.LONG, description = "policy id of the snapshot, if this is null, then use MANUAL_POLICY.")
6263
private Long policyId;
64+
65+
private String syncObjectType = BaseAsyncCmd.snapshotHostSyncObject;
6366

6467
// ///////////////////////////////////////////////////
6568
// ///////////////// Accessors ///////////////////////
@@ -88,7 +91,16 @@ public Long getPolicyId() {
8891
return Snapshot.MANUAL_POLICY_ID;
8992
}
9093
}
94+
95+
private Long getHostId() {
96+
Volume volume = _entityMgr.findById(Volume.class, getVolumeId());
97+
if (volume == null) {
98+
throw new InvalidParameterValueException("Unable to find volume by id");
99+
}
100+
return _snapshotService.getHostIdForSnapshotOperation(volume);
101+
}
91102

103+
92104
// ///////////////////////////////////////////////////
93105
// ///////////// API Implementation///////////////////
94106
// ///////////////////////////////////////////////////
@@ -161,4 +173,21 @@ public void execute() {
161173
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Failed to create snapshot due to an internal error creating snapshot for volume " + volumeId);
162174
}
163175
}
176+
177+
178+
@Override
179+
public String getSyncObjType() {
180+
if (getSyncObjId() != null) {
181+
return syncObjectType;
182+
}
183+
return null;
184+
}
185+
186+
@Override
187+
public Long getSyncObjId() {
188+
if (getHostId() != null) {
189+
return getHostId();
190+
}
191+
return null;
192+
}
164193
}

api/src/com/cloud/storage/snapshot/SnapshotService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.cloud.exception.PermissionDeniedException;
2727
import com.cloud.exception.ResourceAllocationException;
2828
import com.cloud.storage.Snapshot;
29+
import com.cloud.storage.Volume;
2930
import com.cloud.user.Account;
3031

3132
public interface SnapshotService {
@@ -97,4 +98,10 @@ public interface SnapshotService {
9798
* @return the Snapshot that was created
9899
*/
99100
Snapshot createSnapshot(Long volumeId, Long policyId, Long snapshotId, Account snapshotOwner);
101+
102+
/**
103+
* @param vol
104+
* @return
105+
*/
106+
Long getHostIdForSnapshotOperation(Volume vol);
100107
}

core/src/com/cloud/async/AsyncJobVO.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,28 +119,31 @@ public class AsyncJobVO implements AsyncJob {
119119
@Transient
120120
private boolean fromPreviousSession = false;
121121

122-
public AsyncJobVO() {
123-
this.uuid = UUID.randomUUID().toString();
124-
}
125122

126-
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo) {
127-
this.userId = userId;
128-
this.accountId = accountId;
129-
this.cmd = cmd;
130-
this.cmdInfo = cmdInfo;
123+
public AsyncJobVO() {
124+
this.uuid = UUID.randomUUID().toString();
125+
}
126+
127+
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo, Long instanceId, Type instanceType) {
128+
this.userId = userId;
129+
this.accountId = accountId;
130+
this.cmd = cmd;
131+
this.cmdInfo = cmdInfo;
131132
this.callbackType = CALLBACK_POLLING;
132133
this.uuid = UUID.randomUUID().toString();
133-
}
134-
135-
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo,
136-
int callbackType, String callbackAddress) {
137-
138-
this(userId, accountId, cmd, cmdInfo);
139-
this.callbackType = callbackType;
140-
this.callbackAddress = callbackAddress;
134+
this.instanceId = instanceId;
135+
}
136+
137+
public AsyncJobVO(long userId, long accountId, String cmd, String cmdInfo,
138+
int callbackType, String callbackAddress, Long instanceId, Type instanceType) {
139+
140+
this(userId, accountId, cmd, cmdInfo, instanceId, instanceType);
141+
this.callbackType = callbackType;
142+
this.callbackAddress = callbackAddress;
141143
this.uuid = UUID.randomUUID().toString();
142144
}
143-
145+
146+
144147
@Override
145148
public Long getId() {
146149
return id;

core/src/com/cloud/async/SyncQueueItemVO.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import javax.persistence.GenerationType;
2525
import javax.persistence.Id;
2626
import javax.persistence.Table;
27+
import javax.persistence.Temporal;
28+
import javax.persistence.TemporalType;
2729

2830
@Entity
2931
@Table(name="sync_queue_item")
@@ -49,6 +51,10 @@ public class SyncQueueItemVO implements SyncQueueItem{
4951
@Column(name="queue_proc_number")
5052
private Long lastProcessNumber;
5153

54+
@Column(name="queue_proc_time")
55+
@Temporal(TemporalType.TIMESTAMP)
56+
private Date lastProcessTime;
57+
5258
@Column(name="created")
5359
private Date created;
5460

@@ -93,7 +99,7 @@ public Long getLastProcessMsid() {
9399
public void setLastProcessMsid(Long lastProcessMsid) {
94100
this.lastProcessMsid = lastProcessMsid;
95101
}
96-
102+
97103
public Long getLastProcessNumber() {
98104
return lastProcessNumber;
99105
}
@@ -109,16 +115,25 @@ public Date getCreated() {
109115
public void setCreated(Date created) {
110116
this.created = created;
111117
}
112-
118+
113119
public String toString() {
114120
StringBuffer sb = new StringBuffer();
115121
sb.append("SyncQueueItemVO {id:").append(getId()).append(", queueId: ").append(getQueueId());
116122
sb.append(", contentType: ").append(getContentType());
117123
sb.append(", contentId: ").append(getContentId());
118124
sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
119125
sb.append(", lastprocessNumber: ").append(getLastProcessNumber());
126+
sb.append(", lastProcessTime: ").append(getLastProcessTime());
120127
sb.append(", created: ").append(getCreated());
121128
sb.append("}");
122129
return sb.toString();
123130
}
131+
132+
public Date getLastProcessTime() {
133+
return lastProcessTime;
134+
}
135+
136+
public void setLastProcessTime(Date lastProcessTime) {
137+
this.lastProcessTime = lastProcessTime;
138+
}
124139
}

core/src/com/cloud/async/SyncQueueVO.java

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.cloud.async;
1919

2020
import java.util.Date;
21-
2221
import javax.persistence.Column;
2322
import javax.persistence.Entity;
2423
import javax.persistence.GeneratedValue;
@@ -40,36 +39,31 @@ public class SyncQueueVO {
4039
@Column(name="sync_objtype")
4140

4241
private String syncObjType;
43-
42+
4443
@Column(name="sync_objid")
4544
private Long syncObjId;
46-
45+
4746
@Column(name="queue_proc_number")
4847
private Long lastProcessNumber;
49-
50-
@Column(name="queue_proc_time")
51-
@Temporal(TemporalType.TIMESTAMP)
52-
private Date lastProcessTime;
53-
54-
@Column(name="queue_proc_msid")
55-
private Long lastProcessMsid;
56-
48+
5749
@Column(name="created")
5850
@Temporal(TemporalType.TIMESTAMP)
5951
private Date created;
60-
52+
6153
@Column(name="last_updated")
6254
@Temporal(TemporalType.TIMESTAMP)
6355
private Date lastUpdated;
64-
56+
57+
@Column(name="queue_size")
58+
private long queueSize = 0;
59+
60+
@Column(name="queue_size_limit")
61+
private long queueSizeLimit = 0;
62+
6563
public Long getId() {
6664
return id;
6765
}
6866

69-
public void setId(Long id) {
70-
this.id = id;
71-
}
72-
7367
public String getSyncObjType() {
7468
return syncObjType;
7569
}
@@ -94,22 +88,6 @@ public void setLastProcessNumber(Long number) {
9488
lastProcessNumber = number;
9589
}
9690

97-
public Date getLastProcessTime() {
98-
return lastProcessTime;
99-
}
100-
101-
public void setLastProcessTime(Date lastProcessTime) {
102-
this.lastProcessTime = lastProcessTime;
103-
}
104-
105-
public Long getLastProcessMsid() {
106-
return lastProcessMsid;
107-
}
108-
109-
public void setLastProcessMsid(Long lastProcessMsid) {
110-
this.lastProcessMsid = lastProcessMsid;
111-
}
112-
11391
public Date getCreated() {
11492
return created;
11593
}
@@ -125,18 +103,33 @@ public Date getLastUpdated() {
125103
public void setLastUpdated(Date lastUpdated) {
126104
this.lastUpdated = lastUpdated;
127105
}
128-
106+
129107
public String toString() {
130108
StringBuffer sb = new StringBuffer();
131109
sb.append("SyncQueueVO {id:").append(getId());
132110
sb.append(", syncObjType: ").append(getSyncObjType());
133111
sb.append(", syncObjId: ").append(getSyncObjId());
134-
sb.append(", lastProcessMsid: ").append(getLastProcessMsid());
135112
sb.append(", lastProcessNumber: ").append(getLastProcessNumber());
136-
sb.append(", lastProcessTime: ").append(getLastProcessTime());
137113
sb.append(", lastUpdated: ").append(getLastUpdated());
138114
sb.append(", created: ").append(getCreated());
115+
sb.append(", count: ").append(getQueueSize());
139116
sb.append("}");
140117
return sb.toString();
141118
}
119+
120+
public long getQueueSize() {
121+
return queueSize;
122+
}
123+
124+
public void setQueueSize(long queueSize) {
125+
this.queueSize = queueSize;
126+
}
127+
128+
public long getQueueSizeLimit() {
129+
return queueSizeLimit;
130+
}
131+
132+
public void setQueueSizeLimit(long queueSizeLimit) {
133+
this.queueSizeLimit = queueSizeLimit;
134+
}
142135
}

server/src/com/cloud/api/ApiDispatcher.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.cloud.api.commands.ListEventsCmd;
3434
import com.cloud.async.AsyncCommandQueued;
3535
import com.cloud.async.AsyncJobManager;
36+
import com.cloud.configuration.Config;
37+
import com.cloud.configuration.dao.ConfigurationDao;
3638
import com.cloud.exception.AccountLimitException;
3739
import com.cloud.exception.InsufficientCapacityException;
3840
import com.cloud.exception.InvalidParameterValueException;
@@ -44,6 +46,8 @@
4446
import com.cloud.user.Account;
4547
import com.cloud.user.UserContext;
4648
import com.cloud.utils.DateUtil;
49+
import com.cloud.utils.IdentityProxy;
50+
import com.cloud.utils.NumbersUtil;
4751
import com.cloud.utils.component.ComponentLocator;
4852
import com.cloud.utils.component.PluggableService;
4953
import com.cloud.utils.exception.CSExceptionErrorCode;
@@ -59,6 +63,7 @@ public class ApiDispatcher {
5963
ComponentLocator _locator;
6064
AsyncJobManager _asyncMgr;
6165
IdentityDao _identityDao;
66+
long _createSnapshotQueueSizeLimit;
6267

6368
// singleton class
6469
private static ApiDispatcher s_instance = new ApiDispatcher();
@@ -71,6 +76,9 @@ private ApiDispatcher() {
7176
_locator = ComponentLocator.getLocator(ManagementServer.Name);
7277
_asyncMgr = _locator.getManager(AsyncJobManager.class);
7378
_identityDao = _locator.getDao(IdentityDao.class);
79+
ConfigurationDao configDao = _locator.getDao(ConfigurationDao.class);
80+
Map<String, String> configs = configDao.getConfiguration();
81+
_createSnapshotQueueSizeLimit = NumbersUtil.parseInt(configs.get(Config.ConcurrentSnapshotsThresholdPerHost.key()), 10);
7482
}
7583

7684
public void dispatchCreateCmd(BaseAsyncCreateCmd cmd, Map<String, String> params) {
@@ -130,8 +138,14 @@ public void dispatch(BaseCmd cmd, Map<String, String> params) {
130138
ctx.setStartEventId(Long.valueOf(startEventId));
131139

132140
// Synchronise job on the object if needed
141+
133142
if (asyncCmd.getJob() != null && asyncCmd.getSyncObjId() != null && asyncCmd.getSyncObjType() != null) {
134-
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(), asyncCmd.getSyncObjId().longValue());
143+
long queueSizeLimit = 1;
144+
if (asyncCmd.getSyncObjType() != null && asyncCmd.getSyncObjType().equalsIgnoreCase(BaseAsyncCmd.snapshotHostSyncObject)) {
145+
queueSizeLimit = _createSnapshotQueueSizeLimit;
146+
}
147+
_asyncMgr.syncAsyncJobExecution(asyncCmd.getJob(), asyncCmd.getSyncObjType(),
148+
asyncCmd.getSyncObjId().longValue(), queueSizeLimit);
135149
}
136150
}
137151

server/src/com/cloud/api/ApiServer.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import com.cloud.utils.exception.CSExceptionErrorCode;
117117
import com.cloud.uuididentity.dao.IdentityDao;
118118

119+
119120
public class ApiServer implements HttpRequestHandler {
120121
private static final Logger s_logger = Logger.getLogger(ApiServer.class.getName());
121122
private static final Logger s_accessLogger = Logger.getLogger("apiserver." + ApiServer.class.getName());
@@ -299,7 +300,7 @@ public void init(String[] apiConfig) {
299300
if (apiPort != null) {
300301
ListenerThread listenerThread = new ListenerThread(this, apiPort);
301302
listenerThread.start();
302-
}
303+
}
303304
}
304305

305306
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -515,14 +516,9 @@ private String queueCommand(BaseCmd cmdObj, Map<String, String> params) {
515516

516517
ctx.setAccountId(asyncCmd.getEntityOwnerId());
517518

518-
AsyncJobVO job = new AsyncJobVO();
519-
job.setInstanceId((objectId == null) ? asyncCmd.getInstanceId() : objectId);
520-
job.setInstanceType(asyncCmd.getInstanceType());
521-
job.setUserId(callerUserId);
522-
job.setAccountId(caller.getId());
523-
524-
job.setCmd(cmdObj.getClass().getName());
525-
job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params));
519+
Long instanceId = (objectId == null) ? asyncCmd.getInstanceId() : objectId;
520+
AsyncJobVO job = new AsyncJobVO(callerUserId, caller.getId(), cmdObj.getClass().getName(),
521+
ApiGsonHelper.getBuilder().create().toJson(params), instanceId, asyncCmd.getInstanceType());
526522

527523
long jobId = _asyncMgr.submitAsyncJob(job);
528524

0 commit comments

Comments
 (0)