11package io .jooby ;
22
3- import io .jooby .internal .ArrayValue ;
43import okhttp3 .Headers ;
54import okhttp3 .OkHttpClient ;
65import okhttp3 .RequestBody ;
3029import java .util .concurrent .CountDownLatch ;
3130import java .util .concurrent .LinkedBlockingQueue ;
3231import java .util .concurrent .TimeUnit ;
32+ import java .util .concurrent .atomic .AtomicBoolean ;
3333import java .util .function .BiConsumer ;
3434import java .util .function .Consumer ;
3535
@@ -39,23 +39,30 @@ private class SyncWebSocketListener extends WebSocketListener {
3939
4040 private CountDownLatch opened = new CountDownLatch (1 );
4141
42- private CountDownLatch closed = new CountDownLatch (1 );
43-
44- private List <Throwable > errors = new ArrayList <>();
42+ private AtomicBoolean closed = new AtomicBoolean (false );
4543
4644 private BlockingQueue messages = new LinkedBlockingQueue ();
4745
46+ private String testName ;
47+
48+ public SyncWebSocketListener (String testName ) {
49+ this .testName = testName ;
50+ }
51+
4852 @ Override public void onOpen (@ NotNull WebSocket webSocket , @ NotNull Response response ) {
4953 opened .countDown ();
5054 }
5155
5256 @ Override public void onClosed (@ NotNull WebSocket webSocket , int code , @ NotNull String reason ) {
53- closed .countDown ( );
57+ closed .set ( true );
5458 }
5559
5660 @ Override public void onFailure (@ NotNull WebSocket webSocket , @ NotNull Throwable e ,
5761 @ Nullable Response response ) {
58- errors .add (e );
62+ if (!Server .connectionLost (e )) {
63+ System .err .println ("Unexpected web socket error: " + testName );
64+ e .printStackTrace ();
65+ }
5966 }
6067
6168 @ Override public void onMessage (@ NotNull WebSocket webSocket , @ NotNull String text ) {
@@ -64,7 +71,7 @@ private class SyncWebSocketListener extends WebSocketListener {
6471
6572 public String lastMessage () {
6673 try {
67- return (String ) messages .take ( );
74+ return (String ) messages .poll ( 10 , TimeUnit . SECONDS );
6875 } catch (Exception x ) {
6976 throw SneakyThrows .propagate (x );
7077 }
@@ -95,6 +102,12 @@ public String send(String message) {
95102 ws .send (message );
96103 return listener .lastMessage ();
97104 }
105+
106+ public void close () {
107+ if (listener .closed .compareAndSet (false , true )) {
108+ ws .close (WebSocketCloseStatus .NORMAL_CODE , WebSocketCloseStatus .NORMAL .getReason ());
109+ }
110+ }
98111 }
99112
100113 public class Request {
@@ -218,15 +231,17 @@ public void get(String path, SneakyThrows.Consumer<Response> callback) {
218231 get (path ).execute (callback );
219232 }
220233
221- public WebSocket syncWebSocket (String path , SneakyThrows .Consumer <BlockingWebSocket > consumer ) {
234+ public void syncWebSocket (String path , SneakyThrows .Consumer <BlockingWebSocket > consumer ) {
222235 okhttp3 .Request .Builder req = new okhttp3 .Request .Builder ();
223236 req .url ("ws://localhost:" + port + path );
224237 setRequestHeaders (req );
225238 okhttp3 .Request r = req .build ();
226- SyncWebSocketListener listener = new SyncWebSocketListener ();
239+ SyncWebSocketListener listener = new SyncWebSocketListener (
240+ System .getProperty ("___app_name__" ) + "(" + System .getProperty ("___server_name__" ) + ")" );
227241 WebSocket webSocket = client .newWebSocket (r , listener );
228- consumer .accept (new BlockingWebSocket (webSocket , listener ));
229- return webSocket ;
242+ BlockingWebSocket blockingWebSocket = new BlockingWebSocket (webSocket , listener );
243+ consumer .accept (blockingWebSocket );
244+ blockingWebSocket .close ();
230245 }
231246
232247 public Request options (String path ) {
0 commit comments