Skip to content

Commit 3e1d253

Browse files
committed
Merged branch '2.7.x' into 'master'.
2 parents b6a6e75 + 61619fd commit 3e1d253

18 files changed

Lines changed: 966 additions & 273 deletions

File tree

cometd-java/cometd-java-annotations/src/main/java/org/cometd/annotation/ClientAnnotationProcessor.java

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
*/
6161
public class ClientAnnotationProcessor extends AnnotationProcessor
6262
{
63+
private final ConcurrentMap<Object, ClientSessionChannel.MessageListener> handshakeListeners = new ConcurrentHashMap<>();
6364
private final ConcurrentMap<Object, List<ListenerCallback>> listeners = new ConcurrentHashMap<>();
6465
private final ConcurrentMap<Object, List<SubscriptionCallback>> subscribers = new ConcurrentHashMap<>();
6566
private final ClientSession clientSession;
@@ -83,12 +84,24 @@ public ClientAnnotationProcessor(ClientSession clientSession, Object... injectab
8384
*/
8485
public boolean process(Object bean)
8586
{
87+
processMetaHandshakeListener(bean);
8688
boolean result = processDependencies(bean);
8789
result |= processCallbacks(bean);
8890
result |= processPostConstruct(bean);
8991
return result;
9092
}
9193

94+
private void processMetaHandshakeListener(Object bean)
95+
{
96+
if (bean != null)
97+
{
98+
MetaHandshakeListener listener = new MetaHandshakeListener(bean);
99+
ClientSessionChannel.MessageListener existing = handshakeListeners.putIfAbsent(bean, listener);
100+
if (existing == null)
101+
clientSession.getChannel(Channel.META_HANDSHAKE).addListener(listener);
102+
}
103+
}
104+
92105
/**
93106
* Processes lifecycle methods annotated with {@link PostConstruct}.
94107
* @param bean the annotated service instance
@@ -128,11 +141,19 @@ private boolean processCallbacks(Object bean)
128141
*/
129142
public boolean deprocess(Object bean)
130143
{
144+
deprocessMetaHandshakeListener(bean);
131145
boolean result = deprocessCallbacks(bean);
132146
result |= processPreDestroy(bean);
133147
return result;
134148
}
135149

150+
private void deprocessMetaHandshakeListener(Object bean)
151+
{
152+
ClientSessionChannel.MessageListener listener = handshakeListeners.remove(bean);
153+
if (listener != null)
154+
clientSession.getChannel(Channel.META_HANDSHAKE).removeListener(listener);
155+
}
156+
136157
/**
137158
* Deconfigures callbacks annotated with {@link Listener} and {@link Subscription}.
138159
* @param bean the annotated service instance
@@ -252,7 +273,7 @@ private boolean processListener(Object bean)
252273
private boolean deprocessListener(Object bean)
253274
{
254275
boolean result = false;
255-
List<ListenerCallback> callbacks = listeners.get(bean);
276+
List<ListenerCallback> callbacks = listeners.remove(bean);
256277
if (callbacks != null)
257278
{
258279
for (ListenerCallback callback : callbacks)
@@ -289,8 +310,6 @@ private boolean processSubscription(Object bean)
289310
// We should delay the subscription if the client session did not complete the handshake
290311
if (clientSession.isHandshook())
291312
clientSession.getChannel(channel).subscribe(subscriptionCallback);
292-
else
293-
clientSession.getChannel(Channel.META_HANDSHAKE).addListener(subscriptionCallback);
294313

295314
List<SubscriptionCallback> callbacks = subscribers.get(bean);
296315
if (callbacks == null)
@@ -313,7 +332,7 @@ private boolean processSubscription(Object bean)
313332
private boolean deprocessSubscription(Object bean)
314333
{
315334
boolean result = false;
316-
List<SubscriptionCallback> callbacks = subscribers.get(bean);
335+
List<SubscriptionCallback> callbacks = subscribers.remove(bean);
317336
if (callbacks != null)
318337
{
319338
for (SubscriptionCallback callback : callbacks)
@@ -388,25 +407,6 @@ public SubscriptionCallback(ClientSession clientSession, Object target, Method m
388407
}
389408

390409
public void onMessage(ClientSessionChannel channel, Message message)
391-
{
392-
if (Channel.META_HANDSHAKE.equals(channel.getId()))
393-
{
394-
if (message.isSuccessful())
395-
subscribe();
396-
}
397-
else
398-
{
399-
forward(message);
400-
}
401-
}
402-
403-
private void subscribe()
404-
{
405-
clientSession.getChannel(channel).subscribe(this);
406-
clientSession.getChannel(Channel.META_HANDSHAKE).removeListener(this);
407-
}
408-
409-
private void forward(Message message)
410410
{
411411
try
412412
{
@@ -426,5 +426,39 @@ private void forward(Message message)
426426
throw new RuntimeException(x);
427427
}
428428
}
429+
430+
private void subscribe()
431+
{
432+
clientSession.getChannel(channel).subscribe(this);
433+
}
434+
}
435+
436+
private class MetaHandshakeListener implements ClientSessionChannel.MessageListener
437+
{
438+
private final Object bean;
439+
440+
public MetaHandshakeListener(Object bean)
441+
{
442+
this.bean = bean;
443+
}
444+
445+
public void onMessage(ClientSessionChannel channel, Message message)
446+
{
447+
if (message.isSuccessful())
448+
{
449+
final List<SubscriptionCallback> subscriptions = subscribers.get(bean);
450+
if (subscriptions != null)
451+
{
452+
clientSession.batch(new Runnable()
453+
{
454+
public void run()
455+
{
456+
for (SubscriptionCallback subscription : subscriptions)
457+
subscription.subscribe();
458+
}
459+
});
460+
}
461+
}
462+
}
429463
}
430464
}

cometd-java/cometd-java-annotations/src/main/java/org/cometd/annotation/ServerAnnotationProcessor.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,17 @@ public boolean deprocessCallbacks(Object bean)
290290

291291
boolean result = deprocessListener(bean);
292292
result |= deprocessSubscription(bean);
293+
destroyLocalSession(bean);
293294
return result;
294295
}
295296

297+
private void destroyLocalSession(Object bean)
298+
{
299+
LocalSession session = sessions.remove(bean);
300+
if (session != null)
301+
session.disconnect();
302+
}
303+
296304
/**
297305
* Processes lifecycle methods annotated with {@link PreDestroy}.
298306
*
@@ -416,7 +424,7 @@ private boolean processListener(Object bean, LocalSession localSession)
416424
private boolean deprocessListener(Object bean)
417425
{
418426
boolean result = false;
419-
List<ListenerCallback> callbacks = listeners.get(bean);
427+
List<ListenerCallback> callbacks = listeners.remove(bean);
420428
if (callbacks != null)
421429
{
422430
for (ListenerCallback callback : callbacks)
@@ -473,7 +481,7 @@ private boolean processSubscription(Object bean, LocalSession localSession)
473481
private boolean deprocessSubscription(Object bean)
474482
{
475483
boolean result = false;
476-
List<SubscriptionCallback> callbacks = subscribers.get(bean);
484+
List<SubscriptionCallback> callbacks = subscribers.remove(bean);
477485
if (callbacks != null)
478486
{
479487
for (SubscriptionCallback callback : callbacks)

cometd-java/cometd-java-annotations/src/test/java/org/cometd/annotation/ClientAnnotationProcessorTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,4 +414,75 @@ public static class InjectablesService
414414
@Inject
415415
private Injectable i;
416416
}
417+
418+
@Test
419+
public void testResubscribeOnRehandshake() throws Exception
420+
{
421+
AtomicReference<CountDownLatch> messageLatch = new AtomicReference<CountDownLatch>();
422+
ResubscribeOnRehandshakeService s = new ResubscribeOnRehandshakeService(messageLatch);
423+
boolean processed = processor.process(s);
424+
assertTrue(processed);
425+
426+
final CountDownLatch subscribeLatch = new CountDownLatch(1);
427+
bayeuxClient.getChannel(Channel.META_SUBSCRIBE).addListener(new ClientSessionChannel.MessageListener()
428+
{
429+
public void onMessage(ClientSessionChannel channel, Message message)
430+
{
431+
subscribeLatch.countDown();
432+
}
433+
});
434+
435+
bayeuxClient.handshake();
436+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
437+
assertTrue(subscribeLatch.await(5, TimeUnit.SECONDS));
438+
439+
messageLatch.set(new CountDownLatch(1));
440+
bayeuxClient.getChannel("/foo").publish(new HashMap());
441+
assertTrue(messageLatch.get().await(5, TimeUnit.SECONDS));
442+
443+
bayeuxClient.disconnect();
444+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.DISCONNECTED));
445+
446+
// Rehandshake
447+
bayeuxClient.handshake();
448+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
449+
450+
// Republish, it must have resubscribed
451+
messageLatch.set(new CountDownLatch(1));
452+
bayeuxClient.getChannel("/foo").publish(new HashMap());
453+
assertTrue(messageLatch.get().await(5, TimeUnit.SECONDS));
454+
455+
bayeuxClient.disconnect();
456+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.DISCONNECTED));
457+
458+
boolean deprocessed = processor.deprocess(s);
459+
assertTrue(deprocessed);
460+
461+
// Rehandshake
462+
bayeuxClient.handshake();
463+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
464+
465+
// Republish, it must not have resubscribed
466+
messageLatch.set(new CountDownLatch(1));
467+
bayeuxClient.getChannel("/foo").publish(new HashMap());
468+
assertFalse(messageLatch.get().await(1, TimeUnit.SECONDS));
469+
}
470+
471+
@Service
472+
public static class ResubscribeOnRehandshakeService
473+
{
474+
private final AtomicReference<CountDownLatch> messageLatch;
475+
476+
public ResubscribeOnRehandshakeService(AtomicReference<CountDownLatch> messageLatch)
477+
{
478+
this.messageLatch = messageLatch;
479+
}
480+
481+
@Subscription("/foo")
482+
public void foo(Message message)
483+
{
484+
if (message.getData() != null)
485+
messageLatch.get().countDown();
486+
}
487+
}
417488
}

cometd-java/cometd-java-annotations/src/test/resources/log4j.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d %t [%5p][%c{1}] %m%n
88
log4j.appender.CONSOLE.target=System.err
99

1010
# Level tuning
11+
log4j.logger.org.springframework=INFO
1112
log4j.logger.org.eclipse.jetty=INFO
1213
log4j.logger.org.cometd=INFO
1314
log4j.logger.org.springframework=INFO

cometd-java/cometd-java-client/src/test/java/org/cometd/client/JacksonCustomSerializationTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public void testJacksonCustomSerialization() throws Exception
7676
startServer(serverOptions);
7777

7878
String channelName = "/data";
79-
final String content = "random";
79+
final String dataContent = "random";
80+
final long extraContent = 13;
8081
final CountDownLatch latch = new CountDownLatch(1);
8182

8283
LocalSession service = bayeux.newLocalSession("custom_serialization");
@@ -86,25 +87,25 @@ public void testJacksonCustomSerialization() throws Exception
8687
public void onMessage(ClientSessionChannel channel, Message message)
8788
{
8889
Data data = (Data)message.getData();
89-
Assert.assertEquals(content, data.content);
90+
Assert.assertEquals(dataContent, data.content);
9091
Map<String, Object> ext = message.getExt();
9192
Assert.assertNotNull(ext);
9293
Extra extra = (Extra)ext.get("extra");
93-
Assert.assertEquals(content, extra.content);
94+
Assert.assertEquals(extraContent, extra.content);
9495
latch.countDown();
9596
}
9697
});
9798

9899
BayeuxClient client = new BayeuxClient(cometdURL, new LongPollingTransport(clientOptions, httpClient));
99100
client.setDebugEnabled(debugTests());
100-
client.addExtension(new ExtraExtension(content));
101+
client.addExtension(new ExtraExtension(extraContent));
101102

102103
client.handshake();
103104
Assert.assertTrue(client.waitFor(5000, BayeuxClient.State.CONNECTED));
104105
// Wait for the connect to establish
105106
Thread.sleep(1000);
106107

107-
client.getChannel(channelName).publish(new Data(content));
108+
client.getChannel(channelName).publish(new Data(dataContent));
108109
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
109110

110111
disconnectBayeuxClient(client);
@@ -124,7 +125,7 @@ public void testParserGenerator() throws Exception
124125

125126
JSONContext.Client jsonContext = (JSONContext.Client)getClass().getClassLoader().loadClass(jacksonContextClientClassName).newInstance();
126127
Data data1 = new Data("data");
127-
Extra extra1 = new Extra("extra");
128+
Extra extra1 = new Extra(42L);
128129
Map<String, Object> map1 = new HashMap<>();
129130
map1.put("data", data1);
130131
map1.put("extra", extra1);
@@ -138,9 +139,9 @@ public void testParserGenerator() throws Exception
138139

139140
private static class ExtraExtension extends ClientSession.Extension.Adapter
140141
{
141-
private final String content;
142+
private final long content;
142143

143-
public ExtraExtension(String content)
144+
public ExtraExtension(long content)
144145
{
145146
this.content = content;
146147
}
@@ -207,14 +208,14 @@ private static class Extra
207208
{
208209
@com.fasterxml.jackson.annotation.JsonProperty
209210
@org.codehaus.jackson.annotate.JsonProperty
210-
private String content;
211+
private long content;
211212

212213
private Extra()
213214
{
214215
// Needed by Jackson
215216
}
216217

217-
private Extra(String content)
218+
private Extra(long content)
218219
{
219220
this.content = content;
220221
}

0 commit comments

Comments
 (0)