8282import org .apache .http .protocol .ResponseDate ;
8383import org .apache .http .protocol .ResponseServer ;
8484import org .apache .log4j .Logger ;
85+ import org .springframework .beans .factory .NoSuchBeanDefinitionException ;
8586import org .springframework .stereotype .Component ;
8687
8788import org .apache .cloudstack .acl .APIChecker ;
119120import org .apache .cloudstack .context .CallContext ;
120121import org .apache .cloudstack .framework .config .dao .ConfigurationDao ;
121122import org .apache .cloudstack .framework .config .impl .ConfigurationVO ;
123+ import org .apache .cloudstack .framework .events .EventBus ;
124+ import org .apache .cloudstack .framework .events .EventBusException ;
122125import org .apache .cloudstack .framework .jobs .AsyncJob ;
123126import org .apache .cloudstack .framework .jobs .AsyncJobManager ;
124127import org .apache .cloudstack .framework .jobs .impl .AsyncJobVO ;
128+ import org .apache .cloudstack .framework .messagebus .MessageBus ;
129+ import org .apache .cloudstack .framework .messagebus .MessageDispatcher ;
130+ import org .apache .cloudstack .framework .messagebus .MessageHandler ;
125131import org .apache .cloudstack .managed .context .ManagedContextRunnable ;
126132
127133import com .cloud .api .dispatch .DispatchChainFactory ;
130136import com .cloud .configuration .Config ;
131137import com .cloud .domain .Domain ;
132138import com .cloud .domain .DomainVO ;
139+ import com .cloud .domain .dao .DomainDao ;
133140import com .cloud .event .ActionEventUtils ;
134141import com .cloud .event .EventTypes ;
142+ import com .cloud .event .EventCategory ;
135143import com .cloud .exception .AccountLimitException ;
136144import com .cloud .exception .CloudAuthenticationException ;
137145import com .cloud .exception .InsufficientCapacityException ;
@@ -181,6 +189,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
181189 private AccountManager _accountMgr ;
182190 @ Inject
183191 private DomainManager _domainMgr ;
192+ @ Inject
193+ private DomainDao _domainDao ;
194+
184195 @ Inject
185196 private AsyncJobManager _asyncMgr ;
186197 @ Inject
@@ -200,15 +211,92 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
200211
201212 private static ExecutorService s_executor = new ThreadPoolExecutor (10 , 150 , 60 , TimeUnit .SECONDS , new LinkedBlockingQueue <Runnable >(), new NamedThreadFactory (
202213 "ApiServer" ));
214+ @ Inject
215+ MessageBus _messageBus ;
203216
204217 public ApiServer () {
205218 }
206219
207220 @ Override
208221 public boolean configure (final String name , final Map <String , Object > params ) throws ConfigurationException {
222+ _messageBus .subscribe (AsyncJob .Topics .JOB_EVENT_PUBLISH , MessageDispatcher .getDispatcher (this ));
209223 return true ;
210224 }
211225
226+ @ MessageHandler (topic = AsyncJob .Topics .JOB_EVENT_PUBLISH )
227+ private void handleAsyncJobPublishEvent (String subject , String senderAddress , Object args ) {
228+ assert (args != null );
229+
230+ @ SuppressWarnings ("unchecked" )
231+ Pair <AsyncJob , String > eventInfo = (Pair <AsyncJob , String >)args ;
232+ AsyncJob job = eventInfo .first ();
233+ String jobEvent = eventInfo .second ();
234+
235+ if (s_logger .isTraceEnabled ())
236+ s_logger .trace ("Handle asyjob publish event " + jobEvent );
237+
238+ EventBus eventBus = null ;
239+ try {
240+ eventBus = ComponentContext .getComponent (EventBus .class );
241+ } catch (NoSuchBeanDefinitionException nbe ) {
242+ return ; // no provider is configured to provide events bus, so just return
243+ }
244+
245+ if (!job .getDispatcher ().equalsIgnoreCase ("ApiAsyncJobDispatcher" )) {
246+ return ;
247+ }
248+
249+ User userJobOwner = _accountMgr .getUserIncludingRemoved (job .getUserId ());
250+ Account jobOwner = _accountMgr .getAccount (userJobOwner .getAccountId ());
251+
252+ // Get the event type from the cmdInfo json string
253+ String info = job .getCmdInfo ();
254+ String cmdEventType ;
255+ if (info == null ) {
256+ cmdEventType = "unknown" ;
257+ } else {
258+ String marker = "\" cmdEventType\" " ;
259+ int begin = info .indexOf (marker );
260+ cmdEventType = info .substring (begin + marker .length () + 2 , info .indexOf ("," , begin ) - 1 );
261+ }
262+
263+ // For some reason, the instanceType / instanceId are not abstract, which means we may get null values.
264+ org .apache .cloudstack .framework .events .Event event = new org .apache .cloudstack .framework .events .Event (
265+ "management-server" ,
266+ EventCategory .ASYNC_JOB_CHANGE_EVENT .getName (),
267+ jobEvent ,
268+ (job .getInstanceType () != null ? job .getInstanceType ().toString () : "unknown" ), null );
269+
270+ Map <String , String > eventDescription = new HashMap <String , String >();
271+ eventDescription .put ("command" , job .getCmd ());
272+ eventDescription .put ("user" , userJobOwner .getUuid ());
273+ eventDescription .put ("account" , jobOwner .getUuid ());
274+ eventDescription .put ("processStatus" , "" + job .getProcessStatus ());
275+ eventDescription .put ("resultCode" , "" + job .getResultCode ());
276+ eventDescription .put ("instanceUuid" , ApiDBUtils .findJobInstanceUuid (job ));
277+ eventDescription .put ("instanceType" , (job .getInstanceType () != null ? job .getInstanceType ().toString () : "unknown" ));
278+ eventDescription .put ("commandEventType" , cmdEventType );
279+ eventDescription .put ("jobId" , job .getUuid ());
280+ // If the event.accountinfo boolean value is set, get the human readable value for the username / domainname
281+ Map <String , String > configs = _configDao .getConfiguration ("management-server" , new HashMap <String , String >());
282+ if (Boolean .valueOf (configs .get ("event.accountinfo" ))) {
283+ DomainVO domain = _domainDao .findById (jobOwner .getDomainId ());
284+ eventDescription .put ("username" , userJobOwner .getUsername ());
285+ eventDescription .put ("domainname" , domain .getName ());
286+ }
287+ event .setDescription (eventDescription );
288+
289+ try {
290+ eventBus .publish (event );
291+ } catch (EventBusException evx ) {
292+ String errMsg = "F" +
293+ "" +
294+ "ailed to publish async job event on the the event bus." ;
295+ s_logger .warn (errMsg , evx );
296+ throw new CloudRuntimeException (errMsg );
297+ }
298+ }
299+
212300 @ Override
213301 public boolean start () {
214302 Integer apiPort = null ; // api port, null by default
0 commit comments