Skip to content

Commit 521ac79

Browse files
committed
Move EventBus hookup on job framework to ApiServer to decouple job framework away from business logic related hookups. The decoupling is done through internal messaging facility provided inside management server.
1 parent fc04e4b commit 521ac79

3 files changed

Lines changed: 94 additions & 0 deletions

File tree

framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public enum JournalType {
3030
public static interface Topics {
3131
public static final String JOB_HEARTBEAT = "job.heartbeat";
3232
public static final String JOB_STATE = "job.state";
33+
public static final String JOB_EVENT_PUBLISH = "job.eventpublish";
3334
}
3435

3536
public static interface Constants {

framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.cloud.cluster.ClusterManagerListener;
6161
import com.cloud.cluster.ManagementServerHost;
6262
import com.cloud.utils.DateUtil;
63+
import com.cloud.utils.Pair;
6364
import com.cloud.utils.Predicate;
6465
import com.cloud.utils.component.ManagerBase;
6566
import com.cloud.utils.concurrency.NamedThreadFactory;
@@ -1009,4 +1010,8 @@ protected AsyncJobManagerImpl() {
10091010

10101011
}
10111012

1013+
private void publishOnEventBus(AsyncJob job, String jobEvent) {
1014+
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
1015+
new Pair<AsyncJob, String>(job, jobEvent));
1016+
}
10121017
}

server/src/com/cloud/api/ApiServer.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.http.protocol.ResponseDate;
8383
import org.apache.http.protocol.ResponseServer;
8484
import org.apache.log4j.Logger;
85+
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
8586
import org.springframework.stereotype.Component;
8687

8788
import org.apache.cloudstack.acl.APIChecker;
@@ -119,9 +120,14 @@
119120
import org.apache.cloudstack.context.CallContext;
120121
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
121122
import org.apache.cloudstack.framework.config.impl.ConfigurationVO;
123+
import org.apache.cloudstack.framework.events.EventBus;
124+
import org.apache.cloudstack.framework.events.EventBusException;
122125
import org.apache.cloudstack.framework.jobs.AsyncJob;
123126
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
124127
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
128+
import org.apache.cloudstack.framework.messagebus.MessageBus;
129+
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
130+
import org.apache.cloudstack.framework.messagebus.MessageHandler;
125131
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
126132

127133
import com.cloud.api.dispatch.DispatchChainFactory;
@@ -130,8 +136,10 @@
130136
import com.cloud.configuration.Config;
131137
import com.cloud.domain.Domain;
132138
import com.cloud.domain.DomainVO;
139+
import com.cloud.domain.dao.DomainDao;
133140
import com.cloud.event.ActionEventUtils;
134141
import com.cloud.event.EventTypes;
142+
import com.cloud.event.EventCategory;
135143
import com.cloud.exception.AccountLimitException;
136144
import com.cloud.exception.CloudAuthenticationException;
137145
import com.cloud.exception.InsufficientCapacityException;
@@ -181,6 +189,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
181189
private AccountManager _accountMgr;
182190
@Inject
183191
private DomainManager _domainMgr;
192+
@Inject
193+
private DomainDao _domainDao;
194+
184195
@Inject
185196
private AsyncJobManager _asyncMgr;
186197
@Inject
@@ -200,15 +211,92 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
200211

201212
private static ExecutorService s_executor = new ThreadPoolExecutor(10, 150, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
202213
"ApiServer"));
214+
@Inject
215+
MessageBus _messageBus;
203216

204217
public ApiServer() {
205218
}
206219

207220
@Override
208221
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
222+
_messageBus.subscribe(AsyncJob.Topics.JOB_EVENT_PUBLISH, MessageDispatcher.getDispatcher(this));
209223
return true;
210224
}
211225

226+
@MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
227+
private void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) {
228+
assert (args != null);
229+
230+
@SuppressWarnings("unchecked")
231+
Pair<AsyncJob, String> eventInfo = (Pair<AsyncJob, String>)args;
232+
AsyncJob job = eventInfo.first();
233+
String jobEvent = eventInfo.second();
234+
235+
if (s_logger.isTraceEnabled())
236+
s_logger.trace("Handle asyjob publish event " + jobEvent);
237+
238+
EventBus eventBus = null;
239+
try {
240+
eventBus = ComponentContext.getComponent(EventBus.class);
241+
} catch (NoSuchBeanDefinitionException nbe) {
242+
return; // no provider is configured to provide events bus, so just return
243+
}
244+
245+
if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
246+
return;
247+
}
248+
249+
User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
250+
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
251+
252+
// Get the event type from the cmdInfo json string
253+
String info = job.getCmdInfo();
254+
String cmdEventType;
255+
if (info == null) {
256+
cmdEventType = "unknown";
257+
} else {
258+
String marker = "\"cmdEventType\"";
259+
int begin = info.indexOf(marker);
260+
cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",", begin) - 1);
261+
}
262+
263+
// For some reason, the instanceType / instanceId are not abstract, which means we may get null values.
264+
org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event(
265+
"management-server",
266+
EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
267+
jobEvent,
268+
(job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"), null);
269+
270+
Map<String, String> eventDescription = new HashMap<String, String>();
271+
eventDescription.put("command", job.getCmd());
272+
eventDescription.put("user", userJobOwner.getUuid());
273+
eventDescription.put("account", jobOwner.getUuid());
274+
eventDescription.put("processStatus", "" + job.getProcessStatus());
275+
eventDescription.put("resultCode", "" + job.getResultCode());
276+
eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job));
277+
eventDescription.put("instanceType", (job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"));
278+
eventDescription.put("commandEventType", cmdEventType);
279+
eventDescription.put("jobId", job.getUuid());
280+
// If the event.accountinfo boolean value is set, get the human readable value for the username / domainname
281+
Map<String, String> configs = _configDao.getConfiguration("management-server", new HashMap<String, String>());
282+
if (Boolean.valueOf(configs.get("event.accountinfo"))) {
283+
DomainVO domain = _domainDao.findById(jobOwner.getDomainId());
284+
eventDescription.put("username", userJobOwner.getUsername());
285+
eventDescription.put("domainname", domain.getName());
286+
}
287+
event.setDescription(eventDescription);
288+
289+
try {
290+
eventBus.publish(event);
291+
} catch (EventBusException evx) {
292+
String errMsg = "F" +
293+
"" +
294+
"ailed to publish async job event on the the event bus.";
295+
s_logger.warn(errMsg, evx);
296+
throw new CloudRuntimeException(errMsg);
297+
}
298+
}
299+
212300
@Override
213301
public boolean start() {
214302
Integer apiPort = null; // api port, null by default

0 commit comments

Comments
 (0)