Skip to content
Merged
Next Next commit
On branch edburns/dd-2758695-virtual-threads Add **Shared `ScheduledE…
…xecutorService`** for timeouts

## CopilotSession.java

- Added `ScheduledExecutorService` import.
- New field `timeoutScheduler`: shared single-thread scheduler, daemon thread named `sendAndWait-timeout`.
- Initialized in 3-arg constructor.
- `sendAndWait()`: replaced per-call `Executors.newSingleThreadScheduledExecutor()` with `timeoutScheduler.schedule()`. Cleanup calls `timeoutTask.cancel(false)` instead of `scheduler.shutdown()`.
- `close()`: added `timeoutScheduler.shutdownNow()` before the blocking `session.destroy` RPC call so stale timeouts cannot fire after close.

## TimeoutEdgeCaseTest.java (new)

- `testTimeoutDoesNotFireAfterSessionClose`: proves close() cancels pending timeouts (future not completed by stale TimeoutException).
- `testSendAndWaitReusesTimeoutThread`: proves two sendAndWait calls share one scheduler thread instead of spawning two.
- Uses reflection to construct a hanging `JsonRpcClient` (blocking InputStream, sink OutputStream).

Signed-off-by: Ed Burns <edburns@microsoft.com>
  • Loading branch information
edburns committed Mar 27, 2026
commit 3c405b75a4a9f17e26f516e80042c51023d4397d
21 changes: 12 additions & 9 deletions src/main/java/com/github/copilot/sdk/CopilotSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -121,6 +122,7 @@ public final class CopilotSession implements AutoCloseable {
private volatile EventErrorHandler eventErrorHandler;
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
private final ScheduledExecutorService timeoutScheduler;

/** Tracks whether this session instance has been terminated via close(). */
private volatile boolean isTerminated = false;
Expand Down Expand Up @@ -157,6 +159,11 @@ public final class CopilotSession implements AutoCloseable {
this.sessionId = sessionId;
this.rpc = rpc;
this.workspacePath = workspacePath;
this.timeoutScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
}

/**
Expand Down Expand Up @@ -407,17 +414,11 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
return null;
});

// Set up timeout with daemon thread so it doesn't prevent JVM exit
var scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
scheduler.schedule(() -> {
// Schedule timeout on the shared session-level scheduler
var timeoutTask = timeoutScheduler.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
}
scheduler.shutdown();
}, timeoutMs, TimeUnit.MILLISECONDS);

var result = new CompletableFuture<AssistantMessageEvent>();
Expand All @@ -429,7 +430,7 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
} catch (IOException e) {
LOG.log(Level.SEVERE, "Error closing subscription", e);
}
scheduler.shutdown();
timeoutTask.cancel(false);
if (!result.isDone()) {
if (ex != null) {
result.completeExceptionally(ex);
Expand Down Expand Up @@ -1303,6 +1304,8 @@ public void close() {
isTerminated = true;
}

timeoutScheduler.shutdownNow();

try {
rpc.invoke("session.destroy", Map.of("sessionId", sessionId), Void.class).get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Expand Down
148 changes: 148 additions & 0 deletions src/test/java/com/github/copilot/sdk/TimeoutEdgeCaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.json.MessageOptions;

/**
* Tests for timeout edge cases in {@link CopilotSession#sendAndWait}.
* <p>
* These tests prove two defects in the current per-call
* {@code ScheduledExecutorService} approach:
* <ol>
* <li>A timeout fires after {@code close()}, leaking a {@code TimeoutException}
* onto the returned future.</li>
* <li>Each {@code sendAndWait} call spawns a new OS thread (~1 MB stack),
* instead of reusing a shared scheduler thread.</li>
* </ol>
Comment thread
edburns marked this conversation as resolved.
*/
public class TimeoutEdgeCaseTest {

/**
* Creates a {@link JsonRpcClient} whose {@code invoke()} returns futures that
* never complete. The reader thread blocks forever on the input stream, and
* writes go to a no-op output stream.
*/
private JsonRpcClient createHangingRpcClient() throws Exception {
InputStream blockingInput = new InputStream() {
@Override
public int read() throws IOException {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
}
return -1;
}
};
ByteArrayOutputStream sinkOutput = new ByteArrayOutputStream();

var ctor = JsonRpcClient.class.getDeclaredConstructor(
InputStream.class, java.io.OutputStream.class, Socket.class, Process.class);
ctor.setAccessible(true);
return (JsonRpcClient) ctor.newInstance(blockingInput, sinkOutput, null, null);
}

/**
* After {@code close()}, the future returned by {@code sendAndWait} must NOT be
* completed by a stale timeout.
* <p>
* Current buggy behavior: the per-call scheduler is not cancelled by
* {@code close()}, so its 2-second timeout fires during the 5-second
* {@code session.destroy} RPC wait, completing the future with
* {@code TimeoutException}.
* <p>
* Expected behavior after fix: {@code close()} cancels pending timeouts before
* the blocking RPC call, so the future remains incomplete.
*/
@Test
void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
JsonRpcClient rpc = createHangingRpcClient();
try {
CopilotSession session = new CopilotSession("test-timeout-id", rpc);

CompletableFuture<AssistantMessageEvent> result = session.sendAndWait(
new MessageOptions().setPrompt("hello"), 2000);

assertFalse(result.isDone(), "Future should be pending before timeout fires");

// close() blocks up to 5s on session.destroy RPC. The 2s timeout
// fires during that window with the current per-call scheduler.
session.close();

assertFalse(result.isDone(),
"Future should not be completed by a timeout after session is closed. "
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
} finally {
rpc.close();
}
}

/**
* A shared scheduler should reuse a single thread across multiple
* {@code sendAndWait} calls, rather than spawning a new OS thread per call.
* <p>
* Current buggy behavior: two calls create two {@code sendAndWait-timeout}
* threads.
* <p>
* Expected behavior after fix: two calls still use only one scheduler thread.
*/
@Test
void testSendAndWaitReusesTimeoutThread() throws Exception {
JsonRpcClient rpc = createHangingRpcClient();
try {
CopilotSession session = new CopilotSession("test-thread-count-id", rpc);

long baselineCount = countTimeoutThreads();

CompletableFuture<AssistantMessageEvent> result1 = session.sendAndWait(
new MessageOptions().setPrompt("hello1"), 30000);

Thread.sleep(100);
long afterFirst = countTimeoutThreads();
assertTrue(afterFirst >= baselineCount + 1,
"Expected at least one new sendAndWait-timeout thread after first call. "
+ "Baseline: " + baselineCount + ", after: " + afterFirst);

CompletableFuture<AssistantMessageEvent> result2 = session.sendAndWait(
new MessageOptions().setPrompt("hello2"), 30000);

Thread.sleep(100);
long afterSecond = countTimeoutThreads();
assertTrue(afterSecond == afterFirst,
"Shared scheduler should reuse the same thread — no new threads after second call. "
+ "After first: " + afterFirst + ", after second: " + afterSecond);

result1.cancel(true);
result2.cancel(true);
session.close();
} finally {
rpc.close();
}
}

/**
* Counts the number of live threads whose name contains "sendAndWait-timeout".
*/
private long countTimeoutThreads() {
return Thread.getAllStackTraces().keySet().stream()
.filter(t -> t.getName().contains("sendAndWait-timeout"))
.filter(Thread::isAlive)
.count();
}
}
Loading