Skip to content

Commit d44e25e

Browse files
committed
add unit test to message bus in master branch
1 parent 85e73d1 commit d44e25e

4 files changed

Lines changed: 284 additions & 15 deletions

File tree

framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBus.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public interface MessageBus {
2727

2828
void subscribe(String subject, MessageSubscriber subscriber);
2929
void unsubscribe(String subject, MessageSubscriber subscriber);
30+
void clearAll();
31+
void prune();
3032

3133
void publish(String senderAddress, String subject, PublishScope scope, Object args);
3234
}

framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java

Lines changed: 115 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public MessageBusBase() {
4040
_gate = new Gate();
4141
_pendingActions = new ArrayList<ActionRecord>();
4242

43-
_subscriberRoot = new SubscriptionNode("/", null);
43+
_subscriberRoot = new SubscriptionNode(null, "/", null);
4444
}
4545

4646
@Override
@@ -72,18 +72,62 @@ public void subscribe(String subject, MessageSubscriber subscriber) {
7272
@Override
7373
public void unsubscribe(String subject, MessageSubscriber subscriber) {
7474
if(_gate.enter()) {
75-
SubscriptionNode current = locate(subject, null, false);
76-
if(current != null)
77-
current.removeSubscriber(subscriber);
78-
75+
if(subject != null) {
76+
SubscriptionNode current = locate(subject, null, false);
77+
if(current != null)
78+
current.removeSubscriber(subscriber, false);
79+
} else {
80+
this._subscriberRoot.removeSubscriber(subscriber, true);
81+
}
7982
_gate.leave();
8083
} else {
8184
synchronized(_pendingActions) {
8285
_pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
8386
}
8487
}
8588
}
86-
89+
90+
@Override
91+
public void clearAll() {
92+
if(_gate.enter()) {
93+
_subscriberRoot.clearAll();
94+
doPrune();
95+
_gate.leave();
96+
} else {
97+
synchronized(_pendingActions) {
98+
_pendingActions.add(new ActionRecord(ActionType.ClearAll, null, null));
99+
}
100+
}
101+
}
102+
103+
@Override
104+
public void prune() {
105+
if(_gate.enter()) {
106+
doPrune();
107+
_gate.leave();
108+
} else {
109+
synchronized(_pendingActions) {
110+
_pendingActions.add(new ActionRecord(ActionType.Prune, null, null));
111+
}
112+
}
113+
}
114+
115+
private void doPrune() {
116+
List<SubscriptionNode> trimNodes = new ArrayList<SubscriptionNode>();
117+
_subscriberRoot.prune(trimNodes);
118+
119+
while(trimNodes.size() > 0) {
120+
SubscriptionNode node = trimNodes.remove(0);
121+
SubscriptionNode parent = node.getParent();
122+
if(parent != null) {
123+
parent.removeChild(node.getNodeKey());
124+
if(parent.isTrimmable()) {
125+
trimNodes.add(parent);
126+
}
127+
}
128+
}
129+
}
130+
87131
@Override
88132
public void publish(String senderAddress, String subject, PublishScope scope,
89133
Object args) {
@@ -119,12 +163,22 @@ private void onGateOpen() {
119163
break;
120164

121165
case Unsubscribe :
122-
{
166+
if(record.getSubject() != null) {
123167
SubscriptionNode current = locate(record.getSubject(), null, false);
124168
if(current != null)
125-
current.removeSubscriber(record.getSubscriber());
169+
current.removeSubscriber(record.getSubscriber(), false);
170+
} else {
171+
this._subscriberRoot.removeSubscriber(record.getSubscriber(), true);
126172
}
127173
break;
174+
175+
case ClearAll :
176+
_subscriberRoot.clearAll();
177+
break;
178+
179+
case Prune :
180+
doPrune();
181+
break;
128182

129183
default :
130184
assert(false);
@@ -136,11 +190,13 @@ private void onGateOpen() {
136190
}
137191
}
138192

139-
140193
private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
141194
boolean createPath) {
142195

143196
assert(subject != null);
197+
// "/" is special name for root node
198+
if(subject.equals("/"))
199+
return _subscriberRoot;
144200

145201
String[] subjectPathTokens = subject.split("\\.");
146202
return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
@@ -159,7 +215,7 @@ private static SubscriptionNode locate(String[] subjectPathTokens,
159215
SubscriptionNode next = current.getChild(subjectPathTokens[0]);
160216
if(next == null) {
161217
if(createPath) {
162-
next = new SubscriptionNode(subjectPathTokens[0], null);
218+
next = new SubscriptionNode(current, subjectPathTokens[0], null);
163219
current.addChild(subjectPathTokens[0], next);
164220
} else {
165221
return null;
@@ -180,7 +236,9 @@ private static SubscriptionNode locate(String[] subjectPathTokens,
180236
//
181237
private static enum ActionType {
182238
Subscribe,
183-
Unsubscribe
239+
Unsubscribe,
240+
ClearAll,
241+
Prune
184242
}
185243

186244
private static class ActionRecord {
@@ -262,13 +320,14 @@ public void leave() {
262320
}
263321

264322
private static class SubscriptionNode {
265-
@SuppressWarnings("unused")
266323
private String _nodeKey;
267324
private List<MessageSubscriber> _subscribers;
268325
private Map<String, SubscriptionNode> _children;
326+
private SubscriptionNode _parent;
269327

270-
public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
328+
public SubscriptionNode(SubscriptionNode parent, String nodeKey, MessageSubscriber subscriber) {
271329
assert(nodeKey != null);
330+
_parent = parent;
272331
_nodeKey = nodeKey;
273332
_subscribers = new ArrayList<MessageSubscriber>();
274333

@@ -278,16 +337,30 @@ public SubscriptionNode(String nodeKey, MessageSubscriber subscriber) {
278337
_children = new HashMap<String, SubscriptionNode>();
279338
}
280339

340+
public SubscriptionNode getParent() {
341+
return _parent;
342+
}
343+
344+
public String getNodeKey() {
345+
return _nodeKey;
346+
}
347+
281348
@SuppressWarnings("unused")
282349
public List<MessageSubscriber> getSubscriber() {
283350
return _subscribers;
284351
}
285352

286353
public void addSubscriber(MessageSubscriber subscriber) {
287-
_subscribers.add(subscriber);
354+
if(!_subscribers.contains(subscriber))
355+
_subscribers.add(subscriber);
288356
}
289357

290-
public void removeSubscriber(MessageSubscriber subscriber) {
358+
public void removeSubscriber(MessageSubscriber subscriber, boolean recursively) {
359+
if(recursively) {
360+
for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
361+
entry.getValue().removeSubscriber(subscriber, true);
362+
}
363+
}
291364
_subscribers.remove(subscriber);
292365
}
293366

@@ -299,10 +372,37 @@ public void addChild(String key, SubscriptionNode childNode) {
299372
_children.put(key, childNode);
300373
}
301374

375+
public void removeChild(String key) {
376+
_children.remove(key);
377+
}
378+
379+
public void clearAll() {
380+
// depth-first
381+
for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
382+
entry.getValue().clearAll();
383+
}
384+
_subscribers.clear();
385+
}
386+
387+
public void prune(List<SubscriptionNode> trimNodes) {
388+
assert(trimNodes != null);
389+
390+
for(Map.Entry<String, SubscriptionNode> entry : _children.entrySet()) {
391+
entry.getValue().prune(trimNodes);
392+
}
393+
394+
if(isTrimmable())
395+
trimNodes.add(this);
396+
}
397+
302398
public void notifySubscribers(String senderAddress, String subject, Object args) {
303399
for(MessageSubscriber subscriber : _subscribers) {
304400
subscriber.onPublishMessage(senderAddress, subject, args);
305401
}
306402
}
403+
404+
public boolean isTrimmable() {
405+
return _children.size() == 0 && _subscribers.size() == 0;
406+
}
307407
}
308408
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cloudstack.messagebus;
20+
21+
import javax.inject.Inject;
22+
23+
import junit.framework.TestCase;
24+
25+
import org.apache.cloudstack.framework.messagebus.MessageBus;
26+
import org.apache.cloudstack.framework.messagebus.MessageSubscriber;
27+
import org.apache.cloudstack.framework.messagebus.PublishScope;
28+
import org.junit.Assert;
29+
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.mockito.Mockito;
32+
import org.springframework.test.context.ContextConfiguration;
33+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
34+
35+
@RunWith(SpringJUnit4ClassRunner.class)
36+
@ContextConfiguration(locations="classpath:/MessageBusTestContext.xml")
37+
public class TestMessageBus extends TestCase {
38+
39+
@Inject MessageBus _messageBus;
40+
41+
@Test
42+
public void testExactSubjectMatch() {
43+
_messageBus.subscribe("Host", new MessageSubscriber() {
44+
45+
@Override
46+
public void onPublishMessage(String senderAddress, String subject, Object args) {
47+
Assert.assertEquals(subject, "Host");
48+
}
49+
});
50+
51+
_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
52+
_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
53+
_messageBus.clearAll();
54+
}
55+
56+
@Test
57+
public void testRootSubjectMatch() {
58+
_messageBus.subscribe("/", new MessageSubscriber() {
59+
60+
@Override
61+
public void onPublishMessage(String senderAddress, String subject, Object args) {
62+
Assert.assertTrue(subject.equals("Host") || subject.equals("VM"));
63+
}
64+
});
65+
66+
_messageBus.publish(null, "Host", PublishScope.LOCAL, null);
67+
_messageBus.publish(null, "VM", PublishScope.LOCAL, null);
68+
_messageBus.clearAll();
69+
}
70+
71+
@Test
72+
public void testMiscMatch() {
73+
MessageSubscriber subscriberAtParentLevel = new MessageSubscriber() {
74+
@Override
75+
public void onPublishMessage(String senderAddress, String subject, Object args) {
76+
Assert.assertTrue(subject.startsWith(("Host")) || subject.startsWith("VM"));
77+
}
78+
};
79+
80+
MessageSubscriber subscriberAtChildLevel = new MessageSubscriber() {
81+
@Override
82+
public void onPublishMessage(String senderAddress, String subject, Object args) {
83+
Assert.assertTrue(subject.equals("Host.123"));
84+
}
85+
};
86+
87+
subscriberAtParentLevel = Mockito.spy(subscriberAtParentLevel);
88+
subscriberAtChildLevel = Mockito.spy(subscriberAtChildLevel);
89+
90+
_messageBus.subscribe("Host", subscriberAtParentLevel);
91+
_messageBus.subscribe("VM", subscriberAtParentLevel);
92+
_messageBus.subscribe("Host.123", subscriberAtChildLevel);
93+
94+
_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
95+
_messageBus.publish(null, "Host.321", PublishScope.LOCAL, null);
96+
_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
97+
98+
Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.123", null);
99+
Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "Host.321", null);
100+
Mockito.verify(subscriberAtParentLevel).onPublishMessage(null, "VM.123", null);
101+
Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
102+
103+
Mockito.reset(subscriberAtParentLevel);
104+
Mockito.reset(subscriberAtChildLevel);
105+
106+
_messageBus.unsubscribe(null, subscriberAtParentLevel);
107+
_messageBus.publish(null, "Host.123", PublishScope.LOCAL, null);
108+
_messageBus.publish(null, "VM.123", PublishScope.LOCAL, null);
109+
110+
Mockito.verify(subscriberAtChildLevel).onPublishMessage(null, "Host.123", null);
111+
Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "Host.123", null);
112+
Mockito.verify(subscriberAtParentLevel, Mockito.times(0)).onPublishMessage(null, "VM.123", null);
113+
114+
_messageBus.clearAll();
115+
}
116+
}

0 commit comments

Comments
 (0)