@@ -68,33 +68,32 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
6868
6969 private int port ;
7070
71+
7172 @ Before
7273 public void setUp () throws Exception {
7374
7475 this .port = SocketUtils .findAvailableTcpPort (61613 );
7576
76- createAndStartBroker ();
77-
7877 this .responseChannel = new ExecutorSubscribableChannel ();
7978 this .responseHandler = new ExpectationMatchingMessageHandler ();
8079 this .responseChannel .subscribe (this .responseHandler );
81-
8280 this .eventPublisher = new ExpectationMatchingEventPublisher ();
8381
82+ startActiveMqBroker ();
8483 createAndStartRelay ();
8584 }
8685
87- private void createAndStartBroker () throws Exception {
86+ private void startActiveMqBroker () throws Exception {
8887 this .activeMQBroker = new BrokerService ();
89- this .activeMQBroker .addConnector ("stomp://localhost:" + port );
88+ this .activeMQBroker .addConnector ("stomp://localhost:" + this . port );
9089 this .activeMQBroker .setStartAsync (false );
9190 this .activeMQBroker .setDeleteAllMessagesOnStartup (true );
9291 this .activeMQBroker .start ();
9392 }
9493
9594 private void createAndStartRelay () throws InterruptedException {
9695 this .relay = new StompBrokerRelayMessageHandler (this .responseChannel , Arrays .asList ("/queue/" , "/topic/" ));
97- this .relay .setRelayPort (port );
96+ this .relay .setRelayPort (this . port );
9897 this .relay .setApplicationEventPublisher (this .eventPublisher );
9998 this .relay .setSystemHeartbeatReceiveInterval (0 );
10099 this .relay .setSystemHeartbeatSendInterval (0 );
@@ -110,10 +109,28 @@ public void tearDown() throws Exception {
110109 this .relay .stop ();
111110 }
112111 finally {
113- stopBrokerAndAwait ();
112+ stopActiveMqBrokerAndAwait ();
114113 }
115114 }
116115
116+ private void stopActiveMqBrokerAndAwait () throws Exception {
117+ logger .debug ("Stopping ActiveMQ broker and will await shutdown" );
118+ if (!this .activeMQBroker .isStarted ()) {
119+ logger .debug ("Broker not running" );
120+ return ;
121+ }
122+ final CountDownLatch latch = new CountDownLatch (1 );
123+ this .activeMQBroker .addShutdownHook (new Runnable () {
124+ public void run () {
125+ latch .countDown ();
126+ }
127+ });
128+ this .activeMQBroker .stop ();
129+ assertTrue ("Broker did not stop" , latch .await (5 , TimeUnit .SECONDS ));
130+ logger .debug ("Broker stopped" );
131+ }
132+
133+
117134 // When TCP client is behind interface and configurable:
118135 // test "host" header (virtualHost property)
119136 // test "/user/.." destination is excluded
@@ -122,23 +139,22 @@ public void tearDown() throws Exception {
122139 public void publishSubscribe () throws Exception {
123140
124141 String sess1 = "sess1" ;
125- MessageExchange conn1 = MessageExchangeBuilder .connect (sess1 ).build ();
126- this .relay .handleMessage (conn1 .message );
127- this .responseHandler .expect (conn1 );
128-
129142 String sess2 = "sess2" ;
143+ MessageExchange conn1 = MessageExchangeBuilder .connect (sess1 ).build ();
130144 MessageExchange conn2 = MessageExchangeBuilder .connect (sess2 ).build ();
131- this .relay .handleMessage (conn2 .message );
132- this .responseHandler .expect (conn2 );
145+ this .responseHandler .expect (conn1 , conn2 );
133146
147+ this .relay .handleMessage (conn1 .message );
148+ this .relay .handleMessage (conn2 .message );
134149 this .responseHandler .awaitAndAssert ();
135150
136151 String subs1 = "subs1" ;
137152 String destination = "/topic/test" ;
138153
139154 MessageExchange subscribe = MessageExchangeBuilder .subscribeWithReceipt (sess1 , subs1 , destination , "r1" ).build ();
140- this .relay .handleMessage (subscribe .message );
141155 this .responseHandler .expect (subscribe );
156+
157+ this .relay .handleMessage (subscribe .message );
142158 this .responseHandler .awaitAndAssert ();
143159
144160 MessageExchange send = MessageExchangeBuilder .send (destination , "foo" ).andExpectMessage (sess1 , subs1 ).build ();
@@ -151,7 +167,7 @@ public void publishSubscribe() throws Exception {
151167 @ Test
152168 public void brokerUnvailableErrorFrameOnConnect () throws Exception {
153169
154- stopBrokerAndAwait ();
170+ stopActiveMqBrokerAndAwait ();
155171
156172 MessageExchange connect = MessageExchangeBuilder .connectWithError ("sess1" ).build ();
157173 this .responseHandler .expect (connect );
@@ -162,7 +178,7 @@ public void brokerUnvailableErrorFrameOnConnect() throws Exception {
162178
163179 @ Test (expected =MessageDeliveryException .class )
164180 public void messageDeliverExceptionIfSystemSessionForwardFails () throws Exception {
165- stopBrokerAndAwait ();
181+ stopActiveMqBrokerAndAwait ();
166182 StompHeaderAccessor headers = StompHeaderAccessor .create (StompCommand .SEND );
167183 this .relay .handleMessage (MessageBuilder .withPayload ("test" .getBytes ()).setHeaders (headers ).build ());
168184 }
@@ -175,20 +191,19 @@ public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
175191 this .responseHandler .expect (connect );
176192
177193 this .relay .handleMessage (connect .message );
178-
179194 this .responseHandler .awaitAndAssert ();
180195
181196 this .responseHandler .expect (MessageExchangeBuilder .error (sess1 ).build ());
182197
183- stopBrokerAndAwait ();
198+ stopActiveMqBrokerAndAwait ();
184199
185200 this .responseHandler .awaitAndAssert ();
186201 }
187202
188203 @ Test
189204 public void brokerAvailabilityEventWhenStopped () throws Exception {
190205 this .eventPublisher .expectAvailabilityStatusChanges (false );
191- stopBrokerAndAwait ();
206+ stopActiveMqBrokerAndAwait ();
192207 this .eventPublisher .awaitAndAssert ();
193208 }
194209
@@ -198,6 +213,7 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
198213 String sess1 = "sess1" ;
199214 MessageExchange conn1 = MessageExchangeBuilder .connect (sess1 ).build ();
200215 this .responseHandler .expect (conn1 );
216+
201217 this .relay .handleMessage (conn1 .message );
202218 this .responseHandler .awaitAndAssert ();
203219
@@ -212,15 +228,15 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
212228
213229 this .responseHandler .expect (MessageExchangeBuilder .error (sess1 ).build ());
214230
215- stopBrokerAndAwait ();
231+ stopActiveMqBrokerAndAwait ();
216232
217233 this .responseHandler .awaitAndAssert ();
218234
219235 this .eventPublisher .expectAvailabilityStatusChanges (false );
220236 this .eventPublisher .awaitAndAssert ();
221237
222238 this .eventPublisher .expectAvailabilityStatusChanges (true );
223- createAndStartBroker ();
239+ startActiveMqBroker ();
224240 this .eventPublisher .awaitAndAssert ();
225241
226242 // TODO The event publisher assertions show that the broker's back up and the system relay session
@@ -231,14 +247,15 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
231247
232248 @ Test
233249 public void disconnectClosesRelaySessionCleanly () throws Exception {
250+
234251 MessageExchange connect = MessageExchangeBuilder .connect ("sess1" ).build ();
235252 this .responseHandler .expect (connect );
253+
236254 this .relay .handleMessage (connect .message );
237255 this .responseHandler .awaitAndAssert ();
238256
239257 StompHeaderAccessor headers = StompHeaderAccessor .create (StompCommand .DISCONNECT );
240258 headers .setSessionId ("sess1" );
241-
242259 this .relay .handleMessage (MessageBuilder .withPayload (new byte [0 ]).setHeaders (headers ).build ());
243260
244261 Thread .sleep (2000 );
@@ -248,24 +265,6 @@ public void disconnectClosesRelaySessionCleanly() throws Exception {
248265 }
249266
250267
251- private void stopBrokerAndAwait () throws Exception {
252- logger .debug ("Stopping ActiveMQ broker and will await shutdown" );
253- if (!this .activeMQBroker .isStarted ()) {
254- logger .debug ("Broker not running" );
255- return ;
256- }
257- final CountDownLatch latch = new CountDownLatch (1 );
258- this .activeMQBroker .addShutdownHook (new Runnable () {
259- public void run () {
260- latch .countDown ();
261- }
262- });
263- this .activeMQBroker .stop ();
264- assertTrue ("Broker did not stop" , latch .await (5 , TimeUnit .SECONDS ));
265- logger .debug ("Broker stopped" );
266- }
267-
268-
269268 /**
270269 * Handles messages by matching them to expectations including a latch to wait for
271270 * the completion of expected messages.
@@ -408,6 +407,7 @@ public static MessageExchangeBuilder connect(String sessionId) {
408407 StompHeaderAccessor headers = StompHeaderAccessor .create (StompCommand .CONNECT );
409408 headers .setSessionId (sessionId );
410409 headers .setAcceptVersion ("1.1,1.2" );
410+ headers .setHeartbeat (0 , 0 );
411411 Message <?> message = MessageBuilder .withPayload (new byte [0 ]).setHeaders (headers ).build ();
412412
413413 MessageExchangeBuilder builder = new MessageExchangeBuilder (message );
@@ -595,8 +595,8 @@ public void expectAvailabilityStatusChanges(Boolean... expected) {
595595
596596 public void awaitAndAssert () throws InterruptedException {
597597 synchronized (this .monitor ) {
598- long endTime = System .currentTimeMillis () + 6000 ;
599- while (this .expected .size () != this .actual .size () && System .currentTimeMillis () < endTime ) {
598+ long endTime = System .currentTimeMillis () + 10000 ;
599+ while (( this .expected .size () != this .actual .size ()) && ( System .currentTimeMillis () < endTime ) ) {
600600 this .monitor .wait (500 );
601601 }
602602 assertEquals (this .expected , this .actual );
@@ -605,6 +605,7 @@ public void awaitAndAssert() throws InterruptedException {
605605
606606 @ Override
607607 public void publishEvent (ApplicationEvent event ) {
608+ logger .debug ("Processing ApplicationEvent " + event );
608609 if (event instanceof BrokerAvailabilityEvent ) {
609610 synchronized (this .monitor ) {
610611 this .actual .add (((BrokerAvailabilityEvent ) event ).isBrokerAvailable ());
0 commit comments