Skip to content

Commit 8396d21

Browse files
committed
Fixes COMETD-447 (Client-annotated services do not restore
subscriptions on re-handshake). Also fixed memory leaks in case of deprocessing, where listeners, subscriptions and sessions were not removed from maps.
1 parent 14983ef commit 8396d21

4 files changed

Lines changed: 143 additions & 29 deletions

File tree

cometd-java/cometd-java-annotations/src/main/java/org/cometd/annotation/ClientAnnotationProcessor.java

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
*/
6161
public class ClientAnnotationProcessor extends AnnotationProcessor
6262
{
63+
private final ConcurrentMap<Object, ClientSessionChannel.MessageListener> handshakeListeners = new ConcurrentHashMap<Object, ClientSessionChannel.MessageListener>();
6364
private final ConcurrentMap<Object, List<ListenerCallback>> listeners = new ConcurrentHashMap<Object, List<ListenerCallback>>();
6465
private final ConcurrentMap<Object, List<SubscriptionCallback>> subscribers = new ConcurrentHashMap<Object, List<SubscriptionCallback>>();
6566
private final ClientSession clientSession;
@@ -83,12 +84,24 @@ public ClientAnnotationProcessor(ClientSession clientSession, Object... injectab
8384
*/
8485
public boolean process(Object bean)
8586
{
87+
processMetaHandshakeListener(bean);
8688
boolean result = processDependencies(bean);
8789
result |= processCallbacks(bean);
8890
result |= processPostConstruct(bean);
8991
return result;
9092
}
9193

94+
private void processMetaHandshakeListener(Object bean)
95+
{
96+
if (bean != null)
97+
{
98+
MetaHandshakeListener listener = new MetaHandshakeListener(bean);
99+
ClientSessionChannel.MessageListener existing = handshakeListeners.putIfAbsent(bean, listener);
100+
if (existing == null)
101+
clientSession.getChannel(Channel.META_HANDSHAKE).addListener(listener);
102+
}
103+
}
104+
92105
/**
93106
* Processes lifecycle methods annotated with {@link PostConstruct}.
94107
* @param bean the annotated service instance
@@ -128,11 +141,19 @@ private boolean processCallbacks(Object bean)
128141
*/
129142
public boolean deprocess(Object bean)
130143
{
144+
deprocessMetaHandshakeListener(bean);
131145
boolean result = deprocessCallbacks(bean);
132146
result |= processPreDestroy(bean);
133147
return result;
134148
}
135149

150+
private void deprocessMetaHandshakeListener(Object bean)
151+
{
152+
ClientSessionChannel.MessageListener listener = handshakeListeners.remove(bean);
153+
if (listener != null)
154+
clientSession.getChannel(Channel.META_HANDSHAKE).removeListener(listener);
155+
}
156+
136157
/**
137158
* Deconfigures callbacks annotated with {@link Listener} and {@link Subscription}.
138159
* @param bean the annotated service instance
@@ -252,7 +273,7 @@ private boolean processListener(Object bean)
252273
private boolean deprocessListener(Object bean)
253274
{
254275
boolean result = false;
255-
List<ListenerCallback> callbacks = listeners.get(bean);
276+
List<ListenerCallback> callbacks = listeners.remove(bean);
256277
if (callbacks != null)
257278
{
258279
for (ListenerCallback callback : callbacks)
@@ -289,8 +310,6 @@ private boolean processSubscription(Object bean)
289310
// We should delay the subscription if the client session did not complete the handshake
290311
if (clientSession.isHandshook())
291312
clientSession.getChannel(channel).subscribe(subscriptionCallback);
292-
else
293-
clientSession.getChannel(Channel.META_HANDSHAKE).addListener(subscriptionCallback);
294313

295314
List<SubscriptionCallback> callbacks = subscribers.get(bean);
296315
if (callbacks == null)
@@ -313,7 +332,7 @@ private boolean processSubscription(Object bean)
313332
private boolean deprocessSubscription(Object bean)
314333
{
315334
boolean result = false;
316-
List<SubscriptionCallback> callbacks = subscribers.get(bean);
335+
List<SubscriptionCallback> callbacks = subscribers.remove(bean);
317336
if (callbacks != null)
318337
{
319338
for (SubscriptionCallback callback : callbacks)
@@ -327,7 +346,7 @@ private boolean deprocessSubscription(Object bean)
327346

328347
private static class ListenerCallback implements ClientSessionChannel.MessageListener
329348
{
330-
private static final Class<?>[] signature = new Class[]{Message.class};
349+
private static final Class<?>[] signature = new Class<?>[]{Message.class};
331350
private final Object target;
332351
private final Method method;
333352
private final String channel;
@@ -368,7 +387,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
368387

369388
private static class SubscriptionCallback implements ClientSessionChannel.MessageListener
370389
{
371-
private static final Class<?>[] signature = new Class[]{Message.class};
390+
private static final Class<?>[] signature = new Class<?>[]{Message.class};
372391
private final ClientSession clientSession;
373392
private final Object target;
374393
private final Method method;
@@ -388,25 +407,6 @@ public SubscriptionCallback(ClientSession clientSession, Object target, Method m
388407
}
389408

390409
public void onMessage(ClientSessionChannel channel, Message message)
391-
{
392-
if (Channel.META_HANDSHAKE.equals(channel.getId()))
393-
{
394-
if (message.isSuccessful())
395-
subscribe();
396-
}
397-
else
398-
{
399-
forward(message);
400-
}
401-
}
402-
403-
private void subscribe()
404-
{
405-
clientSession.getChannel(channel).subscribe(this);
406-
clientSession.getChannel(Channel.META_HANDSHAKE).removeListener(this);
407-
}
408-
409-
private void forward(Message message)
410410
{
411411
try
412412
{
@@ -426,5 +426,39 @@ private void forward(Message message)
426426
throw new RuntimeException(x);
427427
}
428428
}
429+
430+
private void subscribe()
431+
{
432+
clientSession.getChannel(channel).subscribe(this);
433+
}
434+
}
435+
436+
private class MetaHandshakeListener implements ClientSessionChannel.MessageListener
437+
{
438+
private final Object bean;
439+
440+
public MetaHandshakeListener(Object bean)
441+
{
442+
this.bean = bean;
443+
}
444+
445+
public void onMessage(ClientSessionChannel channel, Message message)
446+
{
447+
if (message.isSuccessful())
448+
{
449+
final List<SubscriptionCallback> subscriptions = subscribers.get(bean);
450+
if (subscriptions != null)
451+
{
452+
clientSession.batch(new Runnable()
453+
{
454+
public void run()
455+
{
456+
for (SubscriptionCallback subscription : subscriptions)
457+
subscription.subscribe();
458+
}
459+
});
460+
}
461+
}
462+
}
429463
}
430464
}

cometd-java/cometd-java-annotations/src/main/java/org/cometd/annotation/ServerAnnotationProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,9 +290,17 @@ public boolean deprocessCallbacks(Object bean)
290290

291291
boolean result = deprocessListener(bean);
292292
result |= deprocessSubscription(bean);
293+
destroyLocalSession(bean);
293294
return result;
294295
}
295296

297+
private void destroyLocalSession(Object bean)
298+
{
299+
LocalSession session = sessions.remove(bean);
300+
if (session != null)
301+
session.disconnect();
302+
}
303+
296304
/**
297305
* Processes lifecycle methods annotated with {@link PreDestroy}.
298306
*
@@ -416,7 +424,7 @@ private boolean processListener(Object bean, LocalSession localSession)
416424
private boolean deprocessListener(Object bean)
417425
{
418426
boolean result = false;
419-
List<ListenerCallback> callbacks = listeners.get(bean);
427+
List<ListenerCallback> callbacks = listeners.remove(bean);
420428
if (callbacks != null)
421429
{
422430
for (ListenerCallback callback : callbacks)
@@ -473,7 +481,7 @@ private boolean processSubscription(Object bean, LocalSession localSession)
473481
private boolean deprocessSubscription(Object bean)
474482
{
475483
boolean result = false;
476-
List<SubscriptionCallback> callbacks = subscribers.get(bean);
484+
List<SubscriptionCallback> callbacks = subscribers.remove(bean);
477485
if (callbacks != null)
478486
{
479487
for (SubscriptionCallback callback : callbacks)
@@ -487,7 +495,7 @@ private boolean deprocessSubscription(Object bean)
487495

488496
private static class ListenerCallback implements ServerChannel.MessageListener
489497
{
490-
private static final Class<?>[] signature = new Class[]{ServerSession.class, ServerMessage.Mutable.class};
498+
private static final Class<?>[] signature = new Class<?>[]{ServerSession.class, ServerMessage.Mutable.class};
491499
private final LocalSession localSession;
492500
private final Object target;
493501
private final Method method;
@@ -534,7 +542,7 @@ public boolean onMessage(ServerSession from, ServerChannel channel, ServerMessag
534542

535543
private static class SubscriptionCallback implements ClientSessionChannel.MessageListener
536544
{
537-
private static final Class<?>[] signature = new Class[]{Message.class};
545+
private static final Class<?>[] signature = new Class<?>[]{Message.class};
538546
private final LocalSession localSession;
539547
private final Object target;
540548
private final Method method;

cometd-java/cometd-java-annotations/src/test/java/org/cometd/annotation/ClientAnnotationProcessorTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,4 +415,75 @@ public static class InjectablesService
415415
@Inject
416416
private Injectable i;
417417
}
418+
419+
@Test
420+
public void testResubscribeOnRehandshake() throws Exception
421+
{
422+
AtomicReference<CountDownLatch> messageLatch = new AtomicReference<CountDownLatch>();
423+
ResubscribeOnRehandshakeService s = new ResubscribeOnRehandshakeService(messageLatch);
424+
boolean processed = processor.process(s);
425+
assertTrue(processed);
426+
427+
final CountDownLatch subscribeLatch = new CountDownLatch(1);
428+
bayeuxClient.getChannel(Channel.META_SUBSCRIBE).addListener(new ClientSessionChannel.MessageListener()
429+
{
430+
public void onMessage(ClientSessionChannel channel, Message message)
431+
{
432+
subscribeLatch.countDown();
433+
}
434+
});
435+
436+
bayeuxClient.handshake();
437+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
438+
assertTrue(subscribeLatch.await(5, TimeUnit.SECONDS));
439+
440+
messageLatch.set(new CountDownLatch(1));
441+
bayeuxClient.getChannel("/foo").publish(new HashMap());
442+
assertTrue(messageLatch.get().await(5, TimeUnit.SECONDS));
443+
444+
bayeuxClient.disconnect();
445+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.DISCONNECTED));
446+
447+
// Rehandshake
448+
bayeuxClient.handshake();
449+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
450+
451+
// Republish, it must have resubscribed
452+
messageLatch.set(new CountDownLatch(1));
453+
bayeuxClient.getChannel("/foo").publish(new HashMap());
454+
assertTrue(messageLatch.get().await(5, TimeUnit.SECONDS));
455+
456+
bayeuxClient.disconnect();
457+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.DISCONNECTED));
458+
459+
boolean deprocessed = processor.deprocess(s);
460+
assertTrue(deprocessed);
461+
462+
// Rehandshake
463+
bayeuxClient.handshake();
464+
assertTrue(bayeuxClient.waitFor(1000, BayeuxClient.State.CONNECTED));
465+
466+
// Republish, it must not have resubscribed
467+
messageLatch.set(new CountDownLatch(1));
468+
bayeuxClient.getChannel("/foo").publish(new HashMap());
469+
assertFalse(messageLatch.get().await(1, TimeUnit.SECONDS));
470+
}
471+
472+
@Service
473+
public static class ResubscribeOnRehandshakeService
474+
{
475+
private final AtomicReference<CountDownLatch> messageLatch;
476+
477+
public ResubscribeOnRehandshakeService(AtomicReference<CountDownLatch> messageLatch)
478+
{
479+
this.messageLatch = messageLatch;
480+
}
481+
482+
@Subscription("/foo")
483+
public void foo(Message message)
484+
{
485+
if (message.getData() != null)
486+
messageLatch.get().countDown();
487+
}
488+
}
418489
}

cometd-java/cometd-java-annotations/src/test/resources/log4j.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ log4j.appender.CONSOLE.layout.ConversionPattern=%d %t [%5p][%c{1}] %m%n
88
log4j.appender.CONSOLE.target=System.err
99

1010
# Level tuning
11+
log4j.logger.org.springframework=INFO
1112
log4j.logger.org.eclipse.jetty=INFO
1213
log4j.logger.org.cometd=INFO

0 commit comments

Comments
 (0)