Skip to content

Commit 6fd6b38

Browse files
committed
Add AsyncMethod support
1 parent 1b91641 commit 6fd6b38

17 files changed

Lines changed: 422 additions & 80 deletions
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cloudstack.framework.messaging;
21+
22+
import java.lang.reflect.InvocationTargetException;
23+
import java.lang.reflect.Method;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
public class AsyncCallbackDispatcher {
28+
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
29+
30+
public static boolean dispatch(Object target, AsyncCompletionCallback callback) {
31+
assert(callback != null);
32+
assert(target != null);
33+
34+
Method handler = resolveHandler(target.getClass(), callback.getOperationName());
35+
if(handler == null)
36+
return false;
37+
38+
try {
39+
handler.invoke(target, callback);
40+
} catch (IllegalArgumentException e) {
41+
throw new RuntimeException("IllegalArgumentException when invoking RPC callback for command: " + callback.getOperationName());
42+
} catch (IllegalAccessException e) {
43+
throw new RuntimeException("IllegalAccessException when invoking RPC callback for command: " + callback.getOperationName());
44+
} catch (InvocationTargetException e) {
45+
throw new RuntimeException("InvocationTargetException when invoking RPC callback for command: " + callback.getOperationName());
46+
}
47+
48+
return true;
49+
}
50+
51+
public static Method resolveHandler(Class<?> handlerClz, String operationName) {
52+
synchronized(s_handlerCache) {
53+
Method handler = s_handlerCache.get(handlerClz);
54+
if(handler != null)
55+
return handler;
56+
57+
for(Method method : handlerClz.getMethods()) {
58+
AsyncCallbackHandler annotation = method.getAnnotation(AsyncCallbackHandler.class);
59+
if(annotation != null) {
60+
if(annotation.operationName().equals(operationName)) {
61+
s_handlerCache.put(handlerClz, method);
62+
return method;
63+
}
64+
}
65+
}
66+
}
67+
68+
return null;
69+
}
70+
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java renamed to framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818
*/
1919
package org.apache.cloudstack.framework.messaging;
2020

21-
public interface ComponentContainer {
22-
ComponentEndpoint wire(ComponentEndpoint endpoint, String predefinedAddress);
21+
public interface AsyncCallbackDriver {
22+
public void performCompletionCallback(AsyncCompletionCallback callback);
2323
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.messaging;
20+
21+
public @interface AsyncCallbackHandler {
22+
String operationName();
23+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.messaging;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
public class AsyncCompletionCallback {
25+
private Map<String, Object> _contextMap = new HashMap<String, Object>();
26+
private String _operationName;
27+
private Object _targetObject;
28+
29+
public AsyncCompletionCallback(Object target) {
30+
_targetObject = target;
31+
}
32+
33+
public AsyncCompletionCallback setContextParam(String key, Object param) {
34+
// ???
35+
return this;
36+
}
37+
38+
public AsyncCompletionCallback attachDriver(AsyncCallbackDriver driver) {
39+
// ???
40+
return this;
41+
}
42+
43+
public AsyncCompletionCallback setOperationName(String name) {
44+
_operationName = name;
45+
return this;
46+
}
47+
48+
public String getOperationName() {
49+
return _operationName;
50+
}
51+
52+
public <T> T getContextParam(String key) {
53+
// ???
54+
return null;
55+
}
56+
57+
public void complete(Object resultObject) {
58+
///
59+
}
60+
61+
public <T> T getResult() {
62+
63+
// ???
64+
return null;
65+
}
66+
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java

Lines changed: 0 additions & 65 deletions
This file was deleted.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cloudstack.framework.messaging;
21+
22+
public class EventBusEndpoint {
23+
private EventBus _eventBus;
24+
private String _sender;
25+
private PublishScope _scope;
26+
27+
public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) {
28+
_eventBus = eventBus;
29+
_sender = sender;
30+
_scope = scope;
31+
}
32+
33+
public EventBusEndpoint setEventBus(EventBus eventBus) {
34+
_eventBus = eventBus;
35+
return this;
36+
}
37+
38+
public EventBusEndpoint setScope(PublishScope scope) {
39+
_scope = scope;
40+
return this;
41+
}
42+
43+
public PublishScope getScope() {
44+
return _scope;
45+
}
46+
47+
public EventBusEndpoint setSender(String sender) {
48+
_sender = sender;
49+
return this;
50+
}
51+
52+
public String getSender() {
53+
return _sender;
54+
}
55+
56+
public void Publish(String subject, Object args) {
57+
assert(_eventBus != null);
58+
_eventBus.publish(_sender, subject, _scope, args);
59+
}
60+
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,39 @@
2323
import java.util.HashMap;
2424
import java.util.Map;
2525

26-
public class EventDispatcher {
26+
public class EventDispatcher implements Subscriber {
2727
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
2828

29+
private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>();
30+
private Object _targetObject;
31+
32+
public EventDispatcher(Object targetObject) {
33+
_targetObject = targetObject;
34+
}
35+
36+
@Override
37+
public void onPublishEvent(String senderAddress, String subject, Object args) {
38+
dispatch(_targetObject, subject, senderAddress, args);
39+
}
40+
41+
public static EventDispatcher getDispatcher(Object targetObject) {
42+
EventDispatcher dispatcher;
43+
synchronized(s_targetMap) {
44+
dispatcher = s_targetMap.get(targetObject);
45+
if(dispatcher == null) {
46+
dispatcher = new EventDispatcher(targetObject);
47+
s_targetMap.put(targetObject, dispatcher);
48+
}
49+
}
50+
return dispatcher;
51+
}
52+
53+
public static void removeDispatcher(Object targetObject) {
54+
synchronized(s_targetMap) {
55+
s_targetMap.remove(targetObject);
56+
}
57+
}
58+
2959
public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
3060
assert(subject != null);
3161
assert(target != null);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.messaging;
20+
21+
public class InplaceAsyncCallbackDriver implements AsyncCallbackDriver {
22+
23+
@Override
24+
public void performCompletionCallback(AsyncCompletionCallback callback) {
25+
// TODO Auto-generated method stub
26+
}
27+
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,10 @@ public RpcClientCall setContextParam(String key, Object param) {
8686
return this;
8787
}
8888

89+
@SuppressWarnings("unchecked")
8990
@Override
90-
public Object getContextParam(String key) {
91-
return _contextParams.get(key);
91+
public <T> T getContextParam(String key) {
92+
return (T)_contextParams.get(key);
9293
}
9394

9495
@Override

0 commit comments

Comments
 (0)