6060 */
6161public class ClientAnnotationProcessor extends AnnotationProcessor
6262{
63+ private final ConcurrentMap <Object , ClientSessionChannel .MessageListener > handshakeListeners = new ConcurrentHashMap <Object , ClientSessionChannel .MessageListener >();
6364 private final ConcurrentMap <Object , List <ListenerCallback >> listeners = new ConcurrentHashMap <Object , List <ListenerCallback >>();
6465 private final ConcurrentMap <Object , List <SubscriptionCallback >> subscribers = new ConcurrentHashMap <Object , List <SubscriptionCallback >>();
6566 private final ClientSession clientSession ;
@@ -83,12 +84,24 @@ public ClientAnnotationProcessor(ClientSession clientSession, Object... injectab
8384 */
8485 public boolean process (Object bean )
8586 {
87+ processMetaHandshakeListener (bean );
8688 boolean result = processDependencies (bean );
8789 result |= processCallbacks (bean );
8890 result |= processPostConstruct (bean );
8991 return result ;
9092 }
9193
94+ private void processMetaHandshakeListener (Object bean )
95+ {
96+ if (bean != null )
97+ {
98+ MetaHandshakeListener listener = new MetaHandshakeListener (bean );
99+ ClientSessionChannel .MessageListener existing = handshakeListeners .putIfAbsent (bean , listener );
100+ if (existing == null )
101+ clientSession .getChannel (Channel .META_HANDSHAKE ).addListener (listener );
102+ }
103+ }
104+
92105 /**
93106 * Processes lifecycle methods annotated with {@link PostConstruct}.
94107 * @param bean the annotated service instance
@@ -128,11 +141,19 @@ private boolean processCallbacks(Object bean)
128141 */
129142 public boolean deprocess (Object bean )
130143 {
144+ deprocessMetaHandshakeListener (bean );
131145 boolean result = deprocessCallbacks (bean );
132146 result |= processPreDestroy (bean );
133147 return result ;
134148 }
135149
150+ private void deprocessMetaHandshakeListener (Object bean )
151+ {
152+ ClientSessionChannel .MessageListener listener = handshakeListeners .remove (bean );
153+ if (listener != null )
154+ clientSession .getChannel (Channel .META_HANDSHAKE ).removeListener (listener );
155+ }
156+
136157 /**
137158 * Deconfigures callbacks annotated with {@link Listener} and {@link Subscription}.
138159 * @param bean the annotated service instance
@@ -252,7 +273,7 @@ private boolean processListener(Object bean)
252273 private boolean deprocessListener (Object bean )
253274 {
254275 boolean result = false ;
255- List <ListenerCallback > callbacks = listeners .get (bean );
276+ List <ListenerCallback > callbacks = listeners .remove (bean );
256277 if (callbacks != null )
257278 {
258279 for (ListenerCallback callback : callbacks )
@@ -289,8 +310,6 @@ private boolean processSubscription(Object bean)
289310 // We should delay the subscription if the client session did not complete the handshake
290311 if (clientSession .isHandshook ())
291312 clientSession .getChannel (channel ).subscribe (subscriptionCallback );
292- else
293- clientSession .getChannel (Channel .META_HANDSHAKE ).addListener (subscriptionCallback );
294313
295314 List <SubscriptionCallback > callbacks = subscribers .get (bean );
296315 if (callbacks == null )
@@ -313,7 +332,7 @@ private boolean processSubscription(Object bean)
313332 private boolean deprocessSubscription (Object bean )
314333 {
315334 boolean result = false ;
316- List <SubscriptionCallback > callbacks = subscribers .get (bean );
335+ List <SubscriptionCallback > callbacks = subscribers .remove (bean );
317336 if (callbacks != null )
318337 {
319338 for (SubscriptionCallback callback : callbacks )
@@ -327,7 +346,7 @@ private boolean deprocessSubscription(Object bean)
327346
328347 private static class ListenerCallback implements ClientSessionChannel .MessageListener
329348 {
330- private static final Class <?>[] signature = new Class []{Message .class };
349+ private static final Class <?>[] signature = new Class <?> []{Message .class };
331350 private final Object target ;
332351 private final Method method ;
333352 private final String channel ;
@@ -368,7 +387,7 @@ public void onMessage(ClientSessionChannel channel, Message message)
368387
369388 private static class SubscriptionCallback implements ClientSessionChannel .MessageListener
370389 {
371- private static final Class <?>[] signature = new Class []{Message .class };
390+ private static final Class <?>[] signature = new Class <?> []{Message .class };
372391 private final ClientSession clientSession ;
373392 private final Object target ;
374393 private final Method method ;
@@ -388,25 +407,6 @@ public SubscriptionCallback(ClientSession clientSession, Object target, Method m
388407 }
389408
390409 public void onMessage (ClientSessionChannel channel , Message message )
391- {
392- if (Channel .META_HANDSHAKE .equals (channel .getId ()))
393- {
394- if (message .isSuccessful ())
395- subscribe ();
396- }
397- else
398- {
399- forward (message );
400- }
401- }
402-
403- private void subscribe ()
404- {
405- clientSession .getChannel (channel ).subscribe (this );
406- clientSession .getChannel (Channel .META_HANDSHAKE ).removeListener (this );
407- }
408-
409- private void forward (Message message )
410410 {
411411 try
412412 {
@@ -426,5 +426,39 @@ private void forward(Message message)
426426 throw new RuntimeException (x );
427427 }
428428 }
429+
430+ private void subscribe ()
431+ {
432+ clientSession .getChannel (channel ).subscribe (this );
433+ }
434+ }
435+
436+ private class MetaHandshakeListener implements ClientSessionChannel .MessageListener
437+ {
438+ private final Object bean ;
439+
440+ public MetaHandshakeListener (Object bean )
441+ {
442+ this .bean = bean ;
443+ }
444+
445+ public void onMessage (ClientSessionChannel channel , Message message )
446+ {
447+ if (message .isSuccessful ())
448+ {
449+ final List <SubscriptionCallback > subscriptions = subscribers .get (bean );
450+ if (subscriptions != null )
451+ {
452+ clientSession .batch (new Runnable ()
453+ {
454+ public void run ()
455+ {
456+ for (SubscriptionCallback subscription : subscriptions )
457+ subscription .subscribe ();
458+ }
459+ });
460+ }
461+ }
462+ }
429463 }
430464}
0 commit comments