3636import com .microsoft .java .debug .core .protocol .Events .DebugEvent ;
3737
3838import io .reactivex .disposables .Disposable ;
39+ import io .reactivex .schedulers .Schedulers ;
3940import io .reactivex .subjects .PublishSubject ;
4041
4142public abstract class AbstractProtocolServer implements IProtocolServer {
@@ -55,19 +56,30 @@ public abstract class AbstractProtocolServer implements IProtocolServer {
5556 private AtomicInteger sequenceNumber = new AtomicInteger (1 );
5657
5758 private PublishSubject <Messages .Response > responseSubject = PublishSubject .<Messages .Response >create ();
59+ private PublishSubject <Messages .Request > requestSubject = PublishSubject .<Messages .Request >create ();
5860
5961 /**
60- * Constructs a protocol server instance based on the given input stream and output stream.
62+ * Constructs a protocol server instance based on the given input stream and
63+ * output stream.
64+ *
6165 * @param input
62- * the input stream
66+ * the input stream
6367 * @param output
64- * the output stream
68+ * the output stream
6569 */
6670 public AbstractProtocolServer (InputStream input , OutputStream output ) {
6771 this .reader = new BufferedReader (new InputStreamReader (input , PROTOCOL_ENCODING ));
6872 this .writer = new PrintWriter (new BufferedWriter (new OutputStreamWriter (output , PROTOCOL_ENCODING )));
6973 this .contentLength = -1 ;
7074 this .rawData = new ByteBuffer ();
75+
76+ requestSubject .observeOn (Schedulers .newThread ()).subscribe (request -> {
77+ try {
78+ this .dispatchRequest (request );
79+ } catch (Exception e ) {
80+ logger .log (Level .SEVERE , String .format ("Dispatch debug protocol error: %s" , e .toString ()), e );
81+ }
82+ });
7183 }
7284
7385 /**
@@ -91,16 +103,18 @@ public void run() {
91103 }
92104
93105 /**
94- * Sets terminateSession flag to true. And the dispatcher loop will be terminated after current dispatching operation finishes.
106+ * Sets terminateSession flag to true. And the dispatcher loop will be
107+ * terminated after current dispatching operation finishes.
95108 */
96109 public void stop () {
97110 this .terminateSession = true ;
98111 }
99112
100113 /**
101114 * Send a request/response/event to the DA.
115+ *
102116 * @param message
103- * the message.
117+ * the message.
104118 */
105119 private void sendMessage (Messages .ProtocolMessage message ) {
106120 message .seq = this .sequenceNumber .getAndIncrement ();
@@ -152,13 +166,18 @@ public CompletableFuture<Messages.Response> sendRequest(Messages.Request request
152166 CompletableFuture <Messages .Response > future = new CompletableFuture <>();
153167 Timer timer = new Timer ();
154168 Disposable [] disposable = new Disposable [1 ];
155- disposable [0 ] = responseSubject .filter (response -> response .request_seq == request .seq ).take (1 ).subscribe ((response ) -> {
156- timer .cancel ();
157- future .complete (response );
158- if (disposable [0 ] != null ) {
159- disposable [0 ].dispose ();
160- }
161- });
169+ disposable [0 ] = responseSubject .filter (response -> response .request_seq == request .seq ).take (1 )
170+ .observeOn (Schedulers .newThread ()).subscribe ((response ) -> {
171+ try {
172+ timer .cancel ();
173+ future .complete (response );
174+ if (disposable [0 ] != null ) {
175+ disposable [0 ].dispose ();
176+ }
177+ } catch (Exception e ) {
178+ logger .log (Level .SEVERE , String .format ("Handle response error: %s" , e .toString ()), e );
179+ }
180+ });
162181 sendMessage (request );
163182 if (timeout > 0 ) {
164183 try {
@@ -181,46 +200,47 @@ public void run() {
181200 private void processData () {
182201 while (true ) {
183202 /**
184- * In vscode debug protocol, the content length represents the message's byte length with utf8 format.
203+ * In vscode debug protocol, the content length represents the
204+ * message's byte length with utf8 format.
185205 */
186206 if (this .contentLength >= 0 ) {
187207 if (this .rawData .length () >= this .contentLength ) {
188208 byte [] buf = this .rawData .removeFirst (this .contentLength );
189209 this .contentLength = -1 ;
190210 String messageData = new String (buf , PROTOCOL_ENCODING );
191- Messages .ProtocolMessage message = JsonUtils .fromJson (messageData , Messages .ProtocolMessage .class );
211+ try {
212+ Messages .ProtocolMessage message = JsonUtils .fromJson (messageData , Messages .ProtocolMessage .class );
192213
193- logger .fine (String .format ("\n [%s]\n %s" , message .type , messageData ));
214+ logger .fine (String .format ("\n [%s]\n %s" , message .type , messageData ));
194215
195- if (message .type .equals ("request" )) {
196- try {
197- this .dispatchRequest (JsonUtils .fromJson (messageData , Messages .Request .class ));
198- } catch (Exception e ) {
199- logger .log (Level .SEVERE , String .format ("Dispatch debug protocol error: %s" , e .toString ()), e );
200- }
201- } else if (message .type .equals ("response" )) {
202- try {
216+ if (message .type .equals ("request" )) {
217+ Messages .Request request = JsonUtils .fromJson (messageData , Messages .Request .class );
218+ requestSubject .onNext (request );
219+ } else if (message .type .equals ("response" )) {
203220 Messages .Response response = JsonUtils .fromJson (messageData , Messages .Response .class );
204221 responseSubject .onNext (response );
205- } catch (Exception e ) {
206- logger .log (Level .SEVERE , String .format ("Handle response error: %s" , e .toString ()), e );
207222 }
223+ } catch (Exception ex ) {
224+ logger .log (Level .SEVERE , String .format ("Error parsing message: %s" , ex .toString ()), ex );
208225 }
226+
209227 continue ;
210228 }
211- } else {
212- String rawMessage = this .rawData .getString (PROTOCOL_ENCODING );
213- int idx = rawMessage .indexOf (TWO_CRLF );
214- if (idx != -1 ) {
215- Matcher matcher = CONTENT_LENGTH_MATCHER .matcher (rawMessage );
216- if (matcher .find ()) {
217- this .contentLength = Integer .parseInt (matcher .group (1 ));
218- int headerByteLength = rawMessage .substring (0 , idx + TWO_CRLF .length ()).getBytes (PROTOCOL_ENCODING ).length ;
219- this .rawData .removeFirst (headerByteLength ); // Remove the header from the raw message.
220- continue ;
221- }
229+ }
230+
231+ String rawMessage = this .rawData .getString (PROTOCOL_ENCODING );
232+ int idx = rawMessage .indexOf (TWO_CRLF );
233+ if (idx != -1 ) {
234+ Matcher matcher = CONTENT_LENGTH_MATCHER .matcher (rawMessage );
235+ if (matcher .find ()) {
236+ this .contentLength = Integer .parseInt (matcher .group (1 ));
237+ int headerByteLength = rawMessage .substring (0 , idx + TWO_CRLF .length ())
238+ .getBytes (PROTOCOL_ENCODING ).length ;
239+ this .rawData .removeFirst (headerByteLength ); // Remove the header from the raw message.
240+ continue ;
222241 }
223242 }
243+
224244 break ;
225245 }
226246 }
0 commit comments