Skip to content

Commit 11e9bac

Browse files
committed
Add server side transport driver
1 parent f529506 commit 11e9bac

4 files changed

Lines changed: 86 additions & 6 deletions

File tree

framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,25 @@
2424
import java.util.Map;
2525

2626
public class TransportEndpointSite {
27+
private TransportProvider _provider;
2728
private TransportEndpoint _endpoint;
2829
private TransportAddress _address;
2930

3031
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
3132
private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();
3233

33-
public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
34+
private int _outstandingSignalRequests;
35+
36+
public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, TransportAddress address) {
37+
assert(provider != null);
3438
assert(endpoint != null);
3539
assert(address != null);
3640

41+
_provider = provider;
3742
_endpoint = endpoint;
3843
_address = address;
44+
45+
_outstandingSignalRequests = 0;
3946
}
4047

4148
public TransportEndpoint getEndpoint() {
@@ -68,7 +75,7 @@ public void addOutputPdu(TransportPdu pdu) {
6875
_outputQueue.add(pdu);
6976
}
7077

71-
processOutput();
78+
signalOutputProcessRequest();
7279
}
7380

7481
public TransportPdu getNextOutputPdu() {
@@ -80,7 +87,7 @@ public TransportPdu getNextOutputPdu() {
8087
return null;
8188
}
8289

83-
private void processOutput() {
90+
public void processOutput() {
8491
TransportPdu pdu;
8592
TransportEndpoint endpoint = getEndpoint();
8693

@@ -104,4 +111,24 @@ private TransportMultiplexier getRoutedMultiplexier(String multiplexierName) {
104111

105112
return multiplexier;
106113
}
114+
115+
private void signalOutputProcessRequest() {
116+
boolean proceed = false;
117+
synchronized(this) {
118+
if(_outstandingSignalRequests == 0) {
119+
_outstandingSignalRequests++;
120+
proceed = true;
121+
}
122+
}
123+
124+
if(proceed)
125+
_provider.requestSiteOutput(this);
126+
}
127+
128+
public void ackOutputProcessSignal() {
129+
synchronized(this) {
130+
assert(_outstandingSignalRequests == 1);
131+
_outstandingSignalRequests--;
132+
}
133+
}
107134
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public interface TransportProvider {
2222
TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
2323
boolean detach(TransportEndpoint endpoint);
2424

25+
void requestSiteOutput(TransportEndpointSite site);
26+
2527
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
2628
String multiplexier, String message);
2729
}

framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ public boolean detach(TransportEndpoint endpoint) {
3737
return false;
3838
}
3939

40+
@Override
41+
public void requestSiteOutput(TransportEndpointSite site) {
42+
// ???
43+
}
44+
4045
@Override
4146
public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
4247
String multiplexier, String message) {

framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,57 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323
import java.util.Random;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
2426

2527
import org.apache.cloudstack.framework.messaging.TransportAddress;
2628
import org.apache.cloudstack.framework.messaging.TransportDataPdu;
2729
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
2830
import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
2931
import org.apache.cloudstack.framework.messaging.TransportPdu;
3032
import org.apache.cloudstack.framework.messaging.TransportProvider;
33+
import org.apache.log4j.Logger;
34+
35+
import com.cloud.utils.concurrency.NamedThreadFactory;
3136

3237
public class ServerTransportProvider implements TransportProvider {
38+
private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
39+
40+
public static final int DEFAULT_WORKER_POOL_SIZE = 5;
41+
3342
private String _nodeId;
3443

3544
private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
45+
private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
46+
private ExecutorService _executor;
3647

3748
private int _nextEndpointId = new Random().nextInt();
3849

3950
public ServerTransportProvider() {
4051
}
4152

42-
public String getNodeId() { return _nodeId; }
43-
public void setNodeId(String nodeId) {
53+
public String getNodeId() {
54+
return _nodeId;
55+
}
56+
57+
public ServerTransportProvider setNodeId(String nodeId) {
4458
_nodeId = nodeId;
59+
return this;
60+
}
61+
62+
public int getWorkerPoolSize() {
63+
return _poolSize;
64+
}
65+
66+
public ServerTransportProvider setWorkerPoolSize(int poolSize) {
67+
assert(poolSize > 0);
68+
69+
_poolSize = poolSize;
70+
return this;
71+
}
72+
73+
public void initialize() {
74+
_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
4575
}
4676

4777
@Override
@@ -64,7 +94,7 @@ public TransportEndpointSite attach(TransportEndpoint endpoint, String predefine
6494
// already attached
6595
return endpointSite;
6696
}
67-
endpointSite = new TransportEndpointSite(endpoint, transportAddress);
97+
endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
6898
_endpointMap.put(endpointId, endpointSite);
6999
}
70100

@@ -86,6 +116,22 @@ public boolean detach(TransportEndpoint endpoint) {
86116
return false;
87117
}
88118

119+
@Override
120+
public void requestSiteOutput(final TransportEndpointSite site) {
121+
_executor.execute(new Runnable() {
122+
123+
@Override
124+
public void run() {
125+
try {
126+
site.processOutput();
127+
site.ackOutputProcessSignal();
128+
} catch(Throwable e) {
129+
s_logger.error("Unhandled exception", e);
130+
}
131+
}
132+
});
133+
}
134+
89135
@Override
90136
public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress,
91137
String multiplexier, String message) {

0 commit comments

Comments
 (0)