Skip to content

Commit a95a68b

Browse files
committed
Wired JSR 356 implementation.
1 parent ed71c6b commit a95a68b

59 files changed

Lines changed: 1840 additions & 769 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cometd-demo/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,13 @@
126126
<artifactId>cometd-java-oort</artifactId>
127127
<version>${project.version}</version>
128128
</dependency>
129+
<!-- TODO: udpate dependency
129130
<dependency>
130131
<groupId>org.cometd.java</groupId>
131132
<artifactId>cometd-jetty-websocket-server</artifactId>
132133
<version>${project.version}</version>
133134
</dependency>
135+
-->
134136
<dependency>
135137
<groupId>org.cometd.java</groupId>
136138
<artifactId>cometd-java-server</artifactId>

cometd-java/cometd-java-annotations/src/test/java/org/cometd/annotation/ClientAnnotationProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static void stopServer() throws Exception
100100
@Before
101101
public void init()
102102
{
103-
bayeuxClient = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient));
103+
bayeuxClient = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient));
104104
bayeuxClient.setDebugEnabled(Boolean.getBoolean("debugTests"));
105105
processor = new ClientAnnotationProcessor(bayeuxClient);
106106
}

cometd-java/cometd-java-benchmark/cometd-java-benchmark-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
</dependency>
6868
<dependency>
6969
<groupId>org.cometd.java</groupId>
70-
<artifactId>cometd-jetty-websocket-client</artifactId>
70+
<artifactId>cometd-java-websocket-jetty-client</artifactId>
7171
<version>${project.version}</version>
7272
</dependency>
7373
<dependency>

cometd-java/cometd-java-benchmark/cometd-java-benchmark-client/src/main/java/org/cometd/benchmark/client/BayeuxLoadClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ private ClientTransport newClientTransport(ClientTransportType clientTransportTy
438438
Map<String, Object> options = new HashMap<>();
439439
options.put(ClientTransport.JSON_CONTEXT, new Jackson1JSONContextClient());
440440
options.put(JettyWebSocketTransport.IDLE_TIMEOUT_OPTION, 35000);
441-
return new JettyWebSocketTransport(options, webSocketClient, scheduler);
441+
return new JettyWebSocketTransport(options, scheduler, webSocketClient);
442442
}
443443
default:
444444
{

cometd-java/cometd-java-benchmark/cometd-java-benchmark-server/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
</dependency>
5656
<dependency>
5757
<groupId>org.cometd.java</groupId>
58-
<artifactId>cometd-jetty-websocket-server</artifactId>
58+
<artifactId>cometd-java-websocket-jetty-server</artifactId>
5959
<version>${project.version}</version>
6060
</dependency>
6161
<dependency>

cometd-java/cometd-java-benchmark/cometd-java-benchmark-server/src/main/java/org/cometd/benchmark/server/BayeuxLoadServer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ public void run() throws Exception
216216
MonitoringThreadPoolExecutor websocketThreadPool = new MonitoringThreadPoolExecutor(maxThreads, jettyThreadPool.getIdleTimeout(), TimeUnit.MILLISECONDS, new ThreadPoolExecutor.AbortPolicy());
217217

218218
LoadWebSocketTransport webSocketTransport = new LoadWebSocketTransport(bayeux, websocketThreadPool);
219-
webSocketTransport.init();
220219
bayeux.addTransport(webSocketTransport);
221220
bayeux.setAllowedTransports("websocket", "long-polling");
222221

cometd-java/cometd-java-client/src/main/java/org/cometd/client/transport/ClientTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,9 @@ protected String generateJSON(Message.Mutable[] messages)
126126
{
127127
return jsonContext.generate(messages);
128128
}
129+
130+
public interface Factory
131+
{
132+
public ClientTransport newClientTransport(Map<String, Object> options);
133+
}
129134
}

cometd-java/cometd-java-client/src/main/java/org/cometd/client/transport/LongPollingTransport.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -50,31 +50,6 @@ public class LongPollingTransport extends HttpClientTransport
5050
public static final String NAME = "long-polling";
5151
public static final String PREFIX = "long-polling.json";
5252

53-
public static LongPollingTransport create(Map<String, Object> options)
54-
{
55-
HttpClient httpClient = new HttpClient();
56-
httpClient.setIdleTimeout(5000);
57-
httpClient.setMaxConnectionsPerDestination(32768);
58-
return create(options, httpClient);
59-
}
60-
61-
public static LongPollingTransport create(Map<String, Object> options, HttpClient httpClient)
62-
{
63-
LongPollingTransport transport = new LongPollingTransport(options, httpClient);
64-
if (!httpClient.isStarted())
65-
{
66-
try
67-
{
68-
httpClient.start();
69-
}
70-
catch (Exception x)
71-
{
72-
throw new RuntimeException(x);
73-
}
74-
}
75-
return transport;
76-
}
77-
7853
private final HttpClient _httpClient;
7954
private final List<Request> _requests = new ArrayList<>();
8055
private volatile boolean _aborted;
@@ -290,4 +265,20 @@ public void onComplete(Result result)
290265
protected void customize(Request request)
291266
{
292267
}
268+
269+
public static class Factory implements ClientTransport.Factory
270+
{
271+
private final HttpClient httpClient;
272+
273+
public Factory(HttpClient httpClient)
274+
{
275+
this.httpClient = httpClient;
276+
}
277+
278+
@Override
279+
public ClientTransport newClientTransport(Map<String, Object> options)
280+
{
281+
return new LongPollingTransport(options, httpClient);
282+
}
283+
}
293284
}

cometd-java/cometd-java-client/src/test/java/org/cometd/client/BayeuxClientConcurrentTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void startServer() throws Exception
4545
public void testHandshakeFailsConcurrentDisconnect() throws Exception
4646
{
4747
final CountDownLatch latch = new CountDownLatch(1);
48-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
48+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
4949
{
5050
@Override
5151
protected boolean sendHandshake()
@@ -68,7 +68,7 @@ protected boolean sendHandshake()
6868
public void testConnectFailsConcurrentDisconnect() throws Exception
6969
{
7070
final CountDownLatch latch = new CountDownLatch(1);
71-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
71+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
7272
{
7373
@Override
7474
protected boolean sendConnect()
@@ -90,7 +90,7 @@ protected boolean sendConnect()
9090
@Test
9191
public void testSubscribeFailsConcurrentDisconnect() throws Exception
9292
{
93-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
93+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
9494
{
9595
@Override
9696
protected void enqueueSend(Message.Mutable message)
@@ -127,7 +127,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
127127
public void testPublishFailsConcurrentDisconnect() throws Exception
128128
{
129129
final String channelName = "/test";
130-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
130+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
131131
{
132132
@Override
133133
protected void enqueueSend(Message.Mutable message)
@@ -185,7 +185,7 @@ public void testPublishFailsConcurrentNetworkDown() throws Exception
185185
{
186186
final String channelName = "/test";
187187
final AtomicInteger connects = new AtomicInteger();
188-
final BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
188+
final BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
189189
{
190190
@Override
191191
protected boolean sendConnect()
@@ -224,7 +224,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
224224
@Test
225225
public void testHandshakeListenersAreNotifiedBeforeConnectListeners() throws Exception
226226
{
227-
final BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient));
227+
final BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient));
228228
client.setDebugEnabled(debugTests());
229229
final int sleep = 1000;
230230
final AtomicBoolean handshaken = new AtomicBoolean();
@@ -263,7 +263,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
263263
public void testConcurrentHandshakeAndBatch() throws Exception
264264
{
265265
final CountDownLatch sendLatch = new CountDownLatch(1);
266-
final BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
266+
final BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
267267
{
268268
@Override
269269
protected boolean sendMessages(Message.Mutable... messages)

cometd-java/cometd-java-client/src/test/java/org/cometd/client/BayeuxClientTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ protected void customize(Request request)
205205
request.method(HttpMethod.PUT);
206206
}
207207
};
208-
final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
208+
final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
209209
BayeuxClient client = new BayeuxClient(cometdURL, transport)
210210
{
211211
@Override
@@ -278,7 +278,7 @@ public void testHandshakeFailsNoTransports() throws Exception
278278
final CountDownLatch handshakeLatch = new CountDownLatch(1);
279279
final CountDownLatch connectLatch = new CountDownLatch(1);
280280

281-
final BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
281+
final BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
282282
{
283283
@Override
284284
protected void processHandshake(Message.Mutable message)
@@ -326,7 +326,7 @@ public void testHandshakeRetries() throws Exception
326326

327327
final BlockingArrayQueue<Message> queue = new BlockingArrayQueue<>(100, 100);
328328
final AtomicBoolean connected = new AtomicBoolean(false);
329-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
329+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
330330
{
331331
@Override
332332
public void onFailure(Throwable x, Message[] messages)
@@ -404,7 +404,7 @@ public void testConnectRetries() throws Exception
404404
{
405405
final AtomicInteger connects = new AtomicInteger();
406406
final CountDownLatch attempts = new CountDownLatch(4);
407-
final BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
407+
final BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
408408
{
409409
@Override
410410
protected boolean scheduleConnect(long interval, long backoff)
@@ -581,7 +581,7 @@ public boolean onMessage(ServerSession from, ServerChannel channel, Mutable mess
581581
@Test
582582
public void testWaitFor() throws Exception
583583
{
584-
final BlockingArrayQueue<String> results = new BlockingArrayQueue<String>();
584+
final BlockingArrayQueue<String> results = new BlockingArrayQueue<>();
585585

586586
String channelName = "/chat/msg";
587587
MarkedReference<ServerChannel> channel = bayeux.createChannelIfAbsent(channelName);
@@ -650,7 +650,7 @@ public void testURLWithImplicitPort() throws Exception
650650
}
651651

652652
final CountDownLatch latch = new CountDownLatch(1);
653-
BayeuxClient client = new BayeuxClient("http://localhost/cometd", LongPollingTransport.create(null, httpClient))
653+
BayeuxClient client = new BayeuxClient("http://localhost/cometd", new LongPollingTransport(null, httpClient))
654654
{
655655
@Override
656656
public void onFailure(Throwable x, Message[] messages)
@@ -687,7 +687,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
687687
@Test
688688
public void testAbortNotifiesListeners() throws Exception
689689
{
690-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
690+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
691691
{
692692
@Override
693693
public void onFailure(Throwable x, Message[] messages)
@@ -724,7 +724,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
724724
public void testAbortThenRestart() throws Exception
725725
{
726726
final AtomicReference<CountDownLatch> connectLatch = new AtomicReference<>(new CountDownLatch(2));
727-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
727+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
728728
{
729729
@Override
730730
public void onSending(Message[] messages)
@@ -769,7 +769,7 @@ public void testAbortBeforePublishThenRestart() throws Exception
769769
final AtomicReference<CountDownLatch> connectLatch = new AtomicReference<>(new CountDownLatch(1));
770770
final CountDownLatch publishLatch = new CountDownLatch(1);
771771
final CountDownLatch failureLatch = new CountDownLatch(1);
772-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
772+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
773773
{
774774
@Override
775775
protected AbstractSessionChannel newChannel(ChannelId channelId)
@@ -846,7 +846,7 @@ public void testAbortAfterPublishThenRestart() throws Exception
846846
final AtomicBoolean abort = new AtomicBoolean(false);
847847
final AtomicReference<CountDownLatch> connectLatch = new AtomicReference<>(new CountDownLatch(1));
848848
final AtomicReference<CountDownLatch> publishLatch = new AtomicReference<>(new CountDownLatch(1));
849-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
849+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
850850
{
851851
@Override
852852
protected boolean sendMessages(Message.Mutable... messages)
@@ -905,7 +905,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
905905
@Test
906906
public void testRestart() throws Exception
907907
{
908-
BayeuxClient client = new BayeuxClient(cometdURL, LongPollingTransport.create(null, httpClient))
908+
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(null, httpClient))
909909
{
910910
@Override
911911
public void onFailure(Throwable x, Message[] messages)

0 commit comments

Comments
 (0)