@@ -40,7 +40,7 @@ public MessageBusBase() {
4040 _gate = new Gate ();
4141 _pendingActions = new ArrayList <ActionRecord >();
4242
43- _subscriberRoot = new SubscriptionNode ("/" , null );
43+ _subscriberRoot = new SubscriptionNode (null , "/" , null );
4444 }
4545
4646 @ Override
@@ -72,18 +72,62 @@ public void subscribe(String subject, MessageSubscriber subscriber) {
7272 @ Override
7373 public void unsubscribe (String subject , MessageSubscriber subscriber ) {
7474 if (_gate .enter ()) {
75- SubscriptionNode current = locate (subject , null , false );
76- if (current != null )
77- current .removeSubscriber (subscriber );
78-
75+ if (subject != null ) {
76+ SubscriptionNode current = locate (subject , null , false );
77+ if (current != null )
78+ current .removeSubscriber (subscriber , false );
79+ } else {
80+ this ._subscriberRoot .removeSubscriber (subscriber , true );
81+ }
7982 _gate .leave ();
8083 } else {
8184 synchronized (_pendingActions ) {
8285 _pendingActions .add (new ActionRecord (ActionType .Unsubscribe , subject , subscriber ));
8386 }
8487 }
8588 }
86-
89+
90+ @ Override
91+ public void clearAll () {
92+ if (_gate .enter ()) {
93+ _subscriberRoot .clearAll ();
94+ doPrune ();
95+ _gate .leave ();
96+ } else {
97+ synchronized (_pendingActions ) {
98+ _pendingActions .add (new ActionRecord (ActionType .ClearAll , null , null ));
99+ }
100+ }
101+ }
102+
103+ @ Override
104+ public void prune () {
105+ if (_gate .enter ()) {
106+ doPrune ();
107+ _gate .leave ();
108+ } else {
109+ synchronized (_pendingActions ) {
110+ _pendingActions .add (new ActionRecord (ActionType .Prune , null , null ));
111+ }
112+ }
113+ }
114+
115+ private void doPrune () {
116+ List <SubscriptionNode > trimNodes = new ArrayList <SubscriptionNode >();
117+ _subscriberRoot .prune (trimNodes );
118+
119+ while (trimNodes .size () > 0 ) {
120+ SubscriptionNode node = trimNodes .remove (0 );
121+ SubscriptionNode parent = node .getParent ();
122+ if (parent != null ) {
123+ parent .removeChild (node .getNodeKey ());
124+ if (parent .isTrimmable ()) {
125+ trimNodes .add (parent );
126+ }
127+ }
128+ }
129+ }
130+
87131 @ Override
88132 public void publish (String senderAddress , String subject , PublishScope scope ,
89133 Object args ) {
@@ -119,12 +163,22 @@ private void onGateOpen() {
119163 break ;
120164
121165 case Unsubscribe :
122- {
166+ if ( record . getSubject () != null ) {
123167 SubscriptionNode current = locate (record .getSubject (), null , false );
124168 if (current != null )
125- current .removeSubscriber (record .getSubscriber ());
169+ current .removeSubscriber (record .getSubscriber (), false );
170+ } else {
171+ this ._subscriberRoot .removeSubscriber (record .getSubscriber (), true );
126172 }
127173 break ;
174+
175+ case ClearAll :
176+ _subscriberRoot .clearAll ();
177+ break ;
178+
179+ case Prune :
180+ doPrune ();
181+ break ;
128182
129183 default :
130184 assert (false );
@@ -136,11 +190,13 @@ private void onGateOpen() {
136190 }
137191 }
138192
139-
140193 private SubscriptionNode locate (String subject , List <SubscriptionNode > chainFromTop ,
141194 boolean createPath ) {
142195
143196 assert (subject != null );
197+ // "/" is special name for root node
198+ if (subject .equals ("/" ))
199+ return _subscriberRoot ;
144200
145201 String [] subjectPathTokens = subject .split ("\\ ." );
146202 return locate (subjectPathTokens , _subscriberRoot , chainFromTop , createPath );
@@ -159,7 +215,7 @@ private static SubscriptionNode locate(String[] subjectPathTokens,
159215 SubscriptionNode next = current .getChild (subjectPathTokens [0 ]);
160216 if (next == null ) {
161217 if (createPath ) {
162- next = new SubscriptionNode (subjectPathTokens [0 ], null );
218+ next = new SubscriptionNode (current , subjectPathTokens [0 ], null );
163219 current .addChild (subjectPathTokens [0 ], next );
164220 } else {
165221 return null ;
@@ -180,7 +236,9 @@ private static SubscriptionNode locate(String[] subjectPathTokens,
180236 //
181237 private static enum ActionType {
182238 Subscribe ,
183- Unsubscribe
239+ Unsubscribe ,
240+ ClearAll ,
241+ Prune
184242 }
185243
186244 private static class ActionRecord {
@@ -262,13 +320,14 @@ public void leave() {
262320 }
263321
264322 private static class SubscriptionNode {
265- @ SuppressWarnings ("unused" )
266323 private String _nodeKey ;
267324 private List <MessageSubscriber > _subscribers ;
268325 private Map <String , SubscriptionNode > _children ;
326+ private SubscriptionNode _parent ;
269327
270- public SubscriptionNode (String nodeKey , MessageSubscriber subscriber ) {
328+ public SubscriptionNode (SubscriptionNode parent , String nodeKey , MessageSubscriber subscriber ) {
271329 assert (nodeKey != null );
330+ _parent = parent ;
272331 _nodeKey = nodeKey ;
273332 _subscribers = new ArrayList <MessageSubscriber >();
274333
@@ -278,16 +337,30 @@ public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
278337 _children = new HashMap <String , SubscriptionNode >();
279338 }
280339
340+ public SubscriptionNode getParent () {
341+ return _parent ;
342+ }
343+
344+ public String getNodeKey () {
345+ return _nodeKey ;
346+ }
347+
281348 @ SuppressWarnings ("unused" )
282349 public List <MessageSubscriber > getSubscriber () {
283350 return _subscribers ;
284351 }
285352
286353 public void addSubscriber (MessageSubscriber subscriber ) {
287- _subscribers .add (subscriber );
354+ if (!_subscribers .contains (subscriber ))
355+ _subscribers .add (subscriber );
288356 }
289357
290- public void removeSubscriber (MessageSubscriber subscriber ) {
358+ public void removeSubscriber (MessageSubscriber subscriber , boolean recursively ) {
359+ if (recursively ) {
360+ for (Map .Entry <String , SubscriptionNode > entry : _children .entrySet ()) {
361+ entry .getValue ().removeSubscriber (subscriber , true );
362+ }
363+ }
291364 _subscribers .remove (subscriber );
292365 }
293366
@@ -299,10 +372,37 @@ public void addChild(String key, SubscriptionNode childNode) {
299372 _children .put (key , childNode );
300373 }
301374
375+ public void removeChild (String key ) {
376+ _children .remove (key );
377+ }
378+
379+ public void clearAll () {
380+ // depth-first
381+ for (Map .Entry <String , SubscriptionNode > entry : _children .entrySet ()) {
382+ entry .getValue ().clearAll ();
383+ }
384+ _subscribers .clear ();
385+ }
386+
387+ public void prune (List <SubscriptionNode > trimNodes ) {
388+ assert (trimNodes != null );
389+
390+ for (Map .Entry <String , SubscriptionNode > entry : _children .entrySet ()) {
391+ entry .getValue ().prune (trimNodes );
392+ }
393+
394+ if (isTrimmable ())
395+ trimNodes .add (this );
396+ }
397+
302398 public void notifySubscribers (String senderAddress , String subject , Object args ) {
303399 for (MessageSubscriber subscriber : _subscribers ) {
304400 subscriber .onPublishMessage (senderAddress , subject , args );
305401 }
306402 }
403+
404+ public boolean isTrimmable () {
405+ return _children .size () == 0 && _subscribers .size () == 0 ;
406+ }
307407 }
308408}
0 commit comments