Skip to content

Commit bfa6645

Browse files
committed
Make changes for timing related test failures
1 parent 2d78a06 commit bfa6645

2 files changed

Lines changed: 49 additions & 42 deletions

File tree

spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,10 @@ public BrokerAvailabilityEvent(boolean brokerAvailable, Object source) {
4545
public boolean isBrokerAvailable() {
4646
return this.brokerAvailable;
4747
}
48+
49+
@Override
50+
public String toString() {
51+
return "BrokerAvailabilityEvent=" + this.brokerAvailable;
52+
}
53+
4854
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -68,33 +68,32 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
6868

6969
private int port;
7070

71+
7172
@Before
7273
public void setUp() throws Exception {
7374

7475
this.port = SocketUtils.findAvailableTcpPort(61613);
7576

76-
createAndStartBroker();
77-
7877
this.responseChannel = new ExecutorSubscribableChannel();
7978
this.responseHandler = new ExpectationMatchingMessageHandler();
8079
this.responseChannel.subscribe(this.responseHandler);
81-
8280
this.eventPublisher = new ExpectationMatchingEventPublisher();
8381

82+
startActiveMqBroker();
8483
createAndStartRelay();
8584
}
8685

87-
private void createAndStartBroker() throws Exception {
86+
private void startActiveMqBroker() throws Exception {
8887
this.activeMQBroker = new BrokerService();
89-
this.activeMQBroker.addConnector("stomp://localhost:" + port);
88+
this.activeMQBroker.addConnector("stomp://localhost:" + this.port);
9089
this.activeMQBroker.setStartAsync(false);
9190
this.activeMQBroker.setDeleteAllMessagesOnStartup(true);
9291
this.activeMQBroker.start();
9392
}
9493

9594
private void createAndStartRelay() throws InterruptedException {
9695
this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/"));
97-
this.relay.setRelayPort(port);
96+
this.relay.setRelayPort(this.port);
9897
this.relay.setApplicationEventPublisher(this.eventPublisher);
9998
this.relay.setSystemHeartbeatReceiveInterval(0);
10099
this.relay.setSystemHeartbeatSendInterval(0);
@@ -110,10 +109,28 @@ public void tearDown() throws Exception {
110109
this.relay.stop();
111110
}
112111
finally {
113-
stopBrokerAndAwait();
112+
stopActiveMqBrokerAndAwait();
114113
}
115114
}
116115

116+
private void stopActiveMqBrokerAndAwait() throws Exception {
117+
logger.debug("Stopping ActiveMQ broker and will await shutdown");
118+
if (!this.activeMQBroker.isStarted()) {
119+
logger.debug("Broker not running");
120+
return;
121+
}
122+
final CountDownLatch latch = new CountDownLatch(1);
123+
this.activeMQBroker.addShutdownHook(new Runnable() {
124+
public void run() {
125+
latch.countDown();
126+
}
127+
});
128+
this.activeMQBroker.stop();
129+
assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
130+
logger.debug("Broker stopped");
131+
}
132+
133+
117134
// When TCP client is behind interface and configurable:
118135
// test "host" header (virtualHost property)
119136
// test "/user/.." destination is excluded
@@ -122,23 +139,22 @@ public void tearDown() throws Exception {
122139
public void publishSubscribe() throws Exception {
123140

124141
String sess1 = "sess1";
125-
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
126-
this.relay.handleMessage(conn1.message);
127-
this.responseHandler.expect(conn1);
128-
129142
String sess2 = "sess2";
143+
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
130144
MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
131-
this.relay.handleMessage(conn2.message);
132-
this.responseHandler.expect(conn2);
145+
this.responseHandler.expect(conn1, conn2);
133146

147+
this.relay.handleMessage(conn1.message);
148+
this.relay.handleMessage(conn2.message);
134149
this.responseHandler.awaitAndAssert();
135150

136151
String subs1 = "subs1";
137152
String destination = "/topic/test";
138153

139154
MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
140-
this.relay.handleMessage(subscribe.message);
141155
this.responseHandler.expect(subscribe);
156+
157+
this.relay.handleMessage(subscribe.message);
142158
this.responseHandler.awaitAndAssert();
143159

144160
MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
@@ -151,7 +167,7 @@ public void publishSubscribe() throws Exception {
151167
@Test
152168
public void brokerUnvailableErrorFrameOnConnect() throws Exception {
153169

154-
stopBrokerAndAwait();
170+
stopActiveMqBrokerAndAwait();
155171

156172
MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build();
157173
this.responseHandler.expect(connect);
@@ -162,7 +178,7 @@ public void brokerUnvailableErrorFrameOnConnect() throws Exception {
162178

163179
@Test(expected=MessageDeliveryException.class)
164180
public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception {
165-
stopBrokerAndAwait();
181+
stopActiveMqBrokerAndAwait();
166182
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
167183
this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build());
168184
}
@@ -175,20 +191,19 @@ public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
175191
this.responseHandler.expect(connect);
176192

177193
this.relay.handleMessage(connect.message);
178-
179194
this.responseHandler.awaitAndAssert();
180195

181196
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
182197

183-
stopBrokerAndAwait();
198+
stopActiveMqBrokerAndAwait();
184199

185200
this.responseHandler.awaitAndAssert();
186201
}
187202

188203
@Test
189204
public void brokerAvailabilityEventWhenStopped() throws Exception {
190205
this.eventPublisher.expectAvailabilityStatusChanges(false);
191-
stopBrokerAndAwait();
206+
stopActiveMqBrokerAndAwait();
192207
this.eventPublisher.awaitAndAssert();
193208
}
194209

@@ -198,6 +213,7 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
198213
String sess1 = "sess1";
199214
MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
200215
this.responseHandler.expect(conn1);
216+
201217
this.relay.handleMessage(conn1.message);
202218
this.responseHandler.awaitAndAssert();
203219

@@ -212,15 +228,15 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
212228

213229
this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build());
214230

215-
stopBrokerAndAwait();
231+
stopActiveMqBrokerAndAwait();
216232

217233
this.responseHandler.awaitAndAssert();
218234

219235
this.eventPublisher.expectAvailabilityStatusChanges(false);
220236
this.eventPublisher.awaitAndAssert();
221237

222238
this.eventPublisher.expectAvailabilityStatusChanges(true);
223-
createAndStartBroker();
239+
startActiveMqBroker();
224240
this.eventPublisher.awaitAndAssert();
225241

226242
// TODO The event publisher assertions show that the broker's back up and the system relay session
@@ -231,14 +247,15 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
231247

232248
@Test
233249
public void disconnectClosesRelaySessionCleanly() throws Exception {
250+
234251
MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
235252
this.responseHandler.expect(connect);
253+
236254
this.relay.handleMessage(connect.message);
237255
this.responseHandler.awaitAndAssert();
238256

239257
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
240258
headers.setSessionId("sess1");
241-
242259
this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build());
243260

244261
Thread.sleep(2000);
@@ -248,24 +265,6 @@ public void disconnectClosesRelaySessionCleanly() throws Exception {
248265
}
249266

250267

251-
private void stopBrokerAndAwait() throws Exception {
252-
logger.debug("Stopping ActiveMQ broker and will await shutdown");
253-
if (!this.activeMQBroker.isStarted()) {
254-
logger.debug("Broker not running");
255-
return;
256-
}
257-
final CountDownLatch latch = new CountDownLatch(1);
258-
this.activeMQBroker.addShutdownHook(new Runnable() {
259-
public void run() {
260-
latch.countDown();
261-
}
262-
});
263-
this.activeMQBroker.stop();
264-
assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
265-
logger.debug("Broker stopped");
266-
}
267-
268-
269268
/**
270269
* Handles messages by matching them to expectations including a latch to wait for
271270
* the completion of expected messages.
@@ -408,6 +407,7 @@ public static MessageExchangeBuilder connect(String sessionId) {
408407
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
409408
headers.setSessionId(sessionId);
410409
headers.setAcceptVersion("1.1,1.2");
410+
headers.setHeartbeat(0, 0);
411411
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
412412

413413
MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
@@ -595,8 +595,8 @@ public void expectAvailabilityStatusChanges(Boolean... expected) {
595595

596596
public void awaitAndAssert() throws InterruptedException {
597597
synchronized(this.monitor) {
598-
long endTime = System.currentTimeMillis() + 6000;
599-
while (this.expected.size() != this.actual.size() && System.currentTimeMillis() < endTime) {
598+
long endTime = System.currentTimeMillis() + 10000;
599+
while ((this.expected.size() != this.actual.size()) && (System.currentTimeMillis() < endTime)) {
600600
this.monitor.wait(500);
601601
}
602602
assertEquals(this.expected, this.actual);
@@ -605,6 +605,7 @@ public void awaitAndAssert() throws InterruptedException {
605605

606606
@Override
607607
public void publishEvent(ApplicationEvent event) {
608+
logger.debug("Processing ApplicationEvent " + event);
608609
if (event instanceof BrokerAvailabilityEvent) {
609610
synchronized(this.monitor) {
610611
this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());

0 commit comments

Comments
 (0)