Skip to content

Commit fc16e1e

Browse files
committed
Finish RPC service server side implementation
1 parent 1d75063 commit fc16e1e

4 files changed

Lines changed: 136 additions & 14 deletions

File tree

framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public interface RpcClientCall {
3232
Object getContextParam(String key);
3333

3434
<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
35+
RpcClientCall setCallbackDispatcherTarget(Object target);
36+
3537
RpcClientCall setOneway();
3638

3739
void apply();

framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public class RpcClientCallImpl implements RpcClientCall {
3333
private boolean _oneway = false;
3434

3535
private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
36-
36+
private Object _callbackDispatcherTarget;
37+
3738
private RpcProvider _rpcProvider;
3839
private long _startTickInMs;
3940
private long _callTag;
@@ -95,6 +96,13 @@ public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener) {
9596
_callbackListeners.add(listener);
9697
return this;
9798
}
99+
100+
@Override
101+
public RpcClientCall setCallbackDispatcherTarget(Object target) {
102+
_callbackDispatcherTarget = target;
103+
return this;
104+
}
105+
98106

99107
@Override
100108
public RpcClientCall setOneway() {
@@ -189,11 +197,16 @@ public void complete(String result) {
189197
_responseDone = true;
190198
_responseLock.notifyAll();
191199
}
192-
193-
assert(_rpcProvider.getMessageSerializer() != null);
194-
Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
195-
for(RpcCallbackListener listener: _callbackListeners)
196-
listener.onSuccess(resultObject);
200+
201+
if(_callbackListeners.size() > 0) {
202+
assert(_rpcProvider.getMessageSerializer() != null);
203+
Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
204+
for(RpcCallbackListener listener: _callbackListeners)
205+
listener.onSuccess(resultObject);
206+
} else {
207+
if(_callbackDispatcherTarget != null)
208+
RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
209+
}
197210
}
198211

199212
public void complete(RpcException e) {
@@ -205,7 +218,12 @@ public void complete(RpcException e) {
205218
_responseLock.notifyAll();
206219
}
207220

208-
for(RpcCallbackListener listener: _callbackListeners)
209-
listener.onFailure(e);
221+
if(_callbackListeners.size() > 0) {
222+
for(RpcCallbackListener listener: _callbackListeners)
223+
listener.onFailure(e);
224+
} else {
225+
if(_callbackDispatcherTarget != null)
226+
RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
227+
}
210228
}
211229
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public void onTransportMessage(String senderEndpointAddress,
5050

5151
Object pdu = _messageSerializer.serializeFrom(message);
5252
if(pdu instanceof RpcCallRequestPdu) {
53-
handleCallRequestPdu((RpcCallRequestPdu)pdu);
53+
handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
5454
} else if(pdu instanceof RpcCallResponsePdu) {
55-
handleCallResponsePdu((RpcCallResponsePdu)pdu);
55+
handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
5656
} else {
5757
assert(false);
5858
}
@@ -81,7 +81,7 @@ public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
8181
_serviceEndpoints.remove(rpcEndpoint);
8282
}
8383
}
84-
84+
8585
@Override
8686
public RpcClientCall newCall(String sourceAddress, String targetAddress) {
8787
long callTag = getNextCallTag();
@@ -124,11 +124,42 @@ protected synchronized long getNextCallTag() {
124124
return tag;
125125
}
126126

127-
private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
128-
// ???
127+
private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
128+
try {
129+
RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
130+
131+
// TODO, we are trying to avoid locking when calling into callbacks
132+
// this can be optimized later
133+
List<RpcServiceEndpoint> endpoints = new ArrayList<RpcServiceEndpoint>();
134+
synchronized(_serviceEndpoints) {
135+
endpoints.addAll(_serviceEndpoints);
136+
}
137+
138+
for(RpcServiceEndpoint endpoint : endpoints) {
139+
if(RpcServiceDispatcher.dispatch(endpoint, call))
140+
return;
141+
}
142+
143+
RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
144+
responsePdu.setCommand(pdu.getCommand());
145+
responsePdu.setRequestStartTick(pdu.getRequestStartTick());
146+
responsePdu.setRequestTag(pdu.getRequestTag());
147+
responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
148+
sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
149+
150+
} catch (Throwable e) {
151+
152+
RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
153+
responsePdu.setCommand(pdu.getCommand());
154+
responsePdu.setRequestStartTick(pdu.getRequestStartTick());
155+
responsePdu.setRequestTag(pdu.getRequestTag());
156+
responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
157+
158+
sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
159+
}
129160
}
130161

131-
private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
162+
private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
132163
RpcClientCallImpl call = null;
133164

134165
synchronized(this) {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.framework.messaging;
20+
21+
public class RpcServerCallImpl implements RpcServerCall {
22+
23+
private RpcProvider _rpcProvider;
24+
private String _sourceAddress;
25+
private String _targetAddress;
26+
27+
private RpcCallRequestPdu _requestPdu;
28+
29+
public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress,
30+
RpcCallRequestPdu requestPdu) {
31+
32+
_rpcProvider = provider;
33+
_sourceAddress = sourceAddress;
34+
_targetAddress = targetAddress;
35+
_requestPdu = requestPdu;
36+
}
37+
38+
@Override
39+
public String getCommand() {
40+
assert(_requestPdu != null);
41+
return _requestPdu.getCommand();
42+
}
43+
44+
@Override
45+
public Object getCommandArgument() {
46+
if(_requestPdu.getSerializedCommandArg() == null)
47+
return null;
48+
49+
assert(_rpcProvider.getMessageSerializer() != null);
50+
return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
51+
}
52+
53+
@Override
54+
public void completeCall(Object returnObject) {
55+
assert(_sourceAddress != null);
56+
assert(_targetAddress != null);
57+
58+
RpcCallResponsePdu pdu = new RpcCallResponsePdu();
59+
pdu.setCommand(_requestPdu.getCommand());
60+
pdu.setRequestTag(_requestPdu.getRequestTag());
61+
pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
62+
pdu.setRequestStartTick(RpcCallResponsePdu.RESULT_SUCCESSFUL);
63+
if(returnObject != null) {
64+
assert(_rpcProvider.getMessageSerializer() != null);
65+
pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
66+
}
67+
68+
_rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress,
69+
_rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
70+
}
71+
}

0 commit comments

Comments
 (0)