Skip to content

Commit 292d2da

Browse files
authored
Use proper scheduler to unblock request/response streams (#141)
* Use proper scheduler to unblock request/response streams * Schedule request processing on a new thread * Catch exception when parsing message data
1 parent f649bc6 commit 292d2da

File tree

1 file changed

+56
-36
lines changed

1 file changed

+56
-36
lines changed

com.microsoft.java.debug.core/src/main/java/com/microsoft/java/debug/core/protocol/AbstractProtocolServer.java

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.microsoft.java.debug.core.protocol.Events.DebugEvent;
3737

3838
import io.reactivex.disposables.Disposable;
39+
import io.reactivex.schedulers.Schedulers;
3940
import io.reactivex.subjects.PublishSubject;
4041

4142
public abstract class AbstractProtocolServer implements IProtocolServer {
@@ -55,19 +56,30 @@ public abstract class AbstractProtocolServer implements IProtocolServer {
5556
private AtomicInteger sequenceNumber = new AtomicInteger(1);
5657

5758
private PublishSubject<Messages.Response> responseSubject = PublishSubject.<Messages.Response>create();
59+
private PublishSubject<Messages.Request> requestSubject = PublishSubject.<Messages.Request>create();
5860

5961
/**
60-
* Constructs a protocol server instance based on the given input stream and output stream.
62+
* Constructs a protocol server instance based on the given input stream and
63+
* output stream.
64+
*
6165
* @param input
62-
* the input stream
66+
* the input stream
6367
* @param output
64-
* the output stream
68+
* the output stream
6569
*/
6670
public AbstractProtocolServer(InputStream input, OutputStream output) {
6771
this.reader = new BufferedReader(new InputStreamReader(input, PROTOCOL_ENCODING));
6872
this.writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(output, PROTOCOL_ENCODING)));
6973
this.contentLength = -1;
7074
this.rawData = new ByteBuffer();
75+
76+
requestSubject.observeOn(Schedulers.newThread()).subscribe(request -> {
77+
try {
78+
this.dispatchRequest(request);
79+
} catch (Exception e) {
80+
logger.log(Level.SEVERE, String.format("Dispatch debug protocol error: %s", e.toString()), e);
81+
}
82+
});
7183
}
7284

7385
/**
@@ -91,16 +103,18 @@ public void run() {
91103
}
92104

93105
/**
94-
* Sets terminateSession flag to true. And the dispatcher loop will be terminated after current dispatching operation finishes.
106+
* Sets terminateSession flag to true. And the dispatcher loop will be
107+
* terminated after current dispatching operation finishes.
95108
*/
96109
public void stop() {
97110
this.terminateSession = true;
98111
}
99112

100113
/**
101114
* Send a request/response/event to the DA.
115+
*
102116
* @param message
103-
* the message.
117+
* the message.
104118
*/
105119
private void sendMessage(Messages.ProtocolMessage message) {
106120
message.seq = this.sequenceNumber.getAndIncrement();
@@ -152,13 +166,18 @@ public CompletableFuture<Messages.Response> sendRequest(Messages.Request request
152166
CompletableFuture<Messages.Response> future = new CompletableFuture<>();
153167
Timer timer = new Timer();
154168
Disposable[] disposable = new Disposable[1];
155-
disposable[0] = responseSubject.filter(response -> response.request_seq == request.seq).take(1).subscribe((response) -> {
156-
timer.cancel();
157-
future.complete(response);
158-
if (disposable[0] != null) {
159-
disposable[0].dispose();
160-
}
161-
});
169+
disposable[0] = responseSubject.filter(response -> response.request_seq == request.seq).take(1)
170+
.observeOn(Schedulers.newThread()).subscribe((response) -> {
171+
try {
172+
timer.cancel();
173+
future.complete(response);
174+
if (disposable[0] != null) {
175+
disposable[0].dispose();
176+
}
177+
} catch (Exception e) {
178+
logger.log(Level.SEVERE, String.format("Handle response error: %s", e.toString()), e);
179+
}
180+
});
162181
sendMessage(request);
163182
if (timeout > 0) {
164183
try {
@@ -181,46 +200,47 @@ public void run() {
181200
private void processData() {
182201
while (true) {
183202
/**
184-
* In vscode debug protocol, the content length represents the message's byte length with utf8 format.
203+
* In vscode debug protocol, the content length represents the
204+
* message's byte length with utf8 format.
185205
*/
186206
if (this.contentLength >= 0) {
187207
if (this.rawData.length() >= this.contentLength) {
188208
byte[] buf = this.rawData.removeFirst(this.contentLength);
189209
this.contentLength = -1;
190210
String messageData = new String(buf, PROTOCOL_ENCODING);
191-
Messages.ProtocolMessage message = JsonUtils.fromJson(messageData, Messages.ProtocolMessage.class);
211+
try {
212+
Messages.ProtocolMessage message = JsonUtils.fromJson(messageData, Messages.ProtocolMessage.class);
192213

193-
logger.fine(String.format("\n[%s]\n%s", message.type, messageData));
214+
logger.fine(String.format("\n[%s]\n%s", message.type, messageData));
194215

195-
if (message.type.equals("request")) {
196-
try {
197-
this.dispatchRequest(JsonUtils.fromJson(messageData, Messages.Request.class));
198-
} catch (Exception e) {
199-
logger.log(Level.SEVERE, String.format("Dispatch debug protocol error: %s", e.toString()), e);
200-
}
201-
} else if (message.type.equals("response")) {
202-
try {
216+
if (message.type.equals("request")) {
217+
Messages.Request request = JsonUtils.fromJson(messageData, Messages.Request.class);
218+
requestSubject.onNext(request);
219+
} else if (message.type.equals("response")) {
203220
Messages.Response response = JsonUtils.fromJson(messageData, Messages.Response.class);
204221
responseSubject.onNext(response);
205-
} catch (Exception e) {
206-
logger.log(Level.SEVERE, String.format("Handle response error: %s", e.toString()), e);
207222
}
223+
} catch (Exception ex) {
224+
logger.log(Level.SEVERE, String.format("Error parsing message: %s", ex.toString()), ex);
208225
}
226+
209227
continue;
210228
}
211-
} else {
212-
String rawMessage = this.rawData.getString(PROTOCOL_ENCODING);
213-
int idx = rawMessage.indexOf(TWO_CRLF);
214-
if (idx != -1) {
215-
Matcher matcher = CONTENT_LENGTH_MATCHER.matcher(rawMessage);
216-
if (matcher.find()) {
217-
this.contentLength = Integer.parseInt(matcher.group(1));
218-
int headerByteLength = rawMessage.substring(0, idx + TWO_CRLF.length()).getBytes(PROTOCOL_ENCODING).length;
219-
this.rawData.removeFirst(headerByteLength); // Remove the header from the raw message.
220-
continue;
221-
}
229+
}
230+
231+
String rawMessage = this.rawData.getString(PROTOCOL_ENCODING);
232+
int idx = rawMessage.indexOf(TWO_CRLF);
233+
if (idx != -1) {
234+
Matcher matcher = CONTENT_LENGTH_MATCHER.matcher(rawMessage);
235+
if (matcher.find()) {
236+
this.contentLength = Integer.parseInt(matcher.group(1));
237+
int headerByteLength = rawMessage.substring(0, idx + TWO_CRLF.length())
238+
.getBytes(PROTOCOL_ENCODING).length;
239+
this.rawData.removeFirst(headerByteLength); // Remove the header from the raw message.
240+
continue;
222241
}
223242
}
243+
224244
break;
225245
}
226246
}

0 commit comments

Comments
 (0)