Skip to content

Commit 41e15ae

Browse files
authored
feat: make process output blocking-capable (coder#23312)
Replace the 200ms polling loop in chatd's execute and process_output tools with server-side blocking via sync.Cond on HeadTailBuffer. The agent's GET /{id}/output endpoint accepts ?wait=true to block until the process exits or a 5-minute server cap expires. The process_output tool blocks by default for 10s (overridable via wait_timeout), and falls back to a non-blocking snapshot on timeout. The execute tool's foreground path makes a single blocking call instead of polling. Related coder#23316
1 parent c8e5857 commit 41e15ae

10 files changed

Lines changed: 375 additions & 81 deletions

File tree

agent/agentproc/api.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package agentproc
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
78
"net/http"
89
"sort"
10+
"time"
911

1012
"github.com/go-chi/chi/v5"
1113
"github.com/google/uuid"
@@ -18,6 +20,13 @@ import (
1820
"github.com/coder/coder/v2/codersdk/workspacesdk"
1921
)
2022

23+
// maxWaitDuration is the server-side ceiling on how long a
// blocking process output request is allowed to wait. It
// applies no matter what timeout the client asks for.
const maxWaitDuration = 5 * time.Minute
29+
2130
// API exposes process-related operations through the agent.
2231
type API struct {
2332
logger slog.Logger
@@ -163,6 +172,30 @@ func (api *API) handleProcessOutput(rw http.ResponseWriter, r *http.Request) {
163172
}
164173
}
165174

175+
// Check for blocking mode via query params.
176+
waitStr := r.URL.Query().Get("wait")
177+
wantWait := waitStr == "true"
178+
179+
if wantWait {
180+
// Extend the write deadline so the HTTP server's
181+
// WriteTimeout does not kill the connection while
182+
// we block.
183+
rc := http.NewResponseController(rw)
184+
if err := rc.SetWriteDeadline(time.Now().Add(maxWaitDuration)); err != nil {
185+
api.logger.Error(ctx, "extend write deadline for blocking process output",
186+
slog.Error(err),
187+
)
188+
}
189+
190+
// Cap the wait at maxWaitDuration regardless of
191+
// client-supplied timeout.
192+
waitCtx, waitCancel := context.WithTimeout(ctx, maxWaitDuration)
193+
defer waitCancel()
194+
195+
_ = proc.waitForOutput(waitCtx)
196+
// Fall through to read snapshot below.
197+
}
198+
166199
output, truncated := proc.output()
167200
info := proc.info()
168201

agent/agentproc/api_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"runtime"
1212
"strings"
13+
"sync"
1314
"testing"
1415
"time"
1516

@@ -783,6 +784,133 @@ func TestProcessOutput(t *testing.T) {
783784
w2 := getOutput(t, handler, id)
784785
require.Equal(t, http.StatusOK, w2.Code)
785786
})
787+
788+
t.Run("WaitForExit", func(t *testing.T) {
789+
t.Parallel()
790+
791+
handler := newTestAPI(t)
792+
793+
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
794+
Command: "echo hello-wait && sleep 0.1",
795+
})
796+
797+
w := getOutputWithWait(t, handler, id)
798+
require.Equal(t, http.StatusOK, w.Code)
799+
800+
var resp workspacesdk.ProcessOutputResponse
801+
err := json.NewDecoder(w.Body).Decode(&resp)
802+
require.NoError(t, err)
803+
require.False(t, resp.Running)
804+
require.NotNil(t, resp.ExitCode)
805+
require.Equal(t, 0, *resp.ExitCode)
806+
require.Contains(t, resp.Output, "hello-wait")
807+
})
808+
809+
t.Run("WaitAlreadyExited", func(t *testing.T) {
810+
t.Parallel()
811+
812+
handler := newTestAPI(t)
813+
814+
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
815+
Command: "echo done",
816+
})
817+
818+
waitForExit(t, handler, id)
819+
820+
w := getOutputWithWait(t, handler, id)
821+
require.Equal(t, http.StatusOK, w.Code)
822+
823+
var resp workspacesdk.ProcessOutputResponse
824+
err := json.NewDecoder(w.Body).Decode(&resp)
825+
require.NoError(t, err)
826+
require.False(t, resp.Running)
827+
require.Contains(t, resp.Output, "done")
828+
})
829+
830+
t.Run("WaitTimeout", func(t *testing.T) {
831+
t.Parallel()
832+
833+
handler := newTestAPI(t)
834+
835+
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
836+
Command: "sleep 300",
837+
Background: true,
838+
})
839+
840+
ctx, cancel := context.WithTimeout(context.Background(), testutil.IntervalMedium)
841+
defer cancel()
842+
843+
w := getOutputWithWaitCtx(ctx, t, handler, id)
844+
require.Equal(t, http.StatusOK, w.Code)
845+
846+
var resp workspacesdk.ProcessOutputResponse
847+
err := json.NewDecoder(w.Body).Decode(&resp)
848+
require.NoError(t, err)
849+
require.True(t, resp.Running)
850+
851+
// Kill and wait for the process so cleanup does
852+
// not hang.
853+
postSignal(
854+
t, handler, id,
855+
workspacesdk.SignalProcessRequest{Signal: "kill"},
856+
)
857+
waitForExit(t, handler, id)
858+
})
859+
860+
t.Run("ConcurrentWaiters", func(t *testing.T) {
861+
t.Parallel()
862+
863+
handler := newTestAPI(t)
864+
865+
id := startAndGetID(t, handler, workspacesdk.StartProcessRequest{
866+
Command: "sleep 300",
867+
Background: true,
868+
})
869+
870+
var (
871+
wg sync.WaitGroup
872+
resps [2]workspacesdk.ProcessOutputResponse
873+
codes [2]int
874+
)
875+
for i := range 2 {
876+
wg.Add(1)
877+
go func() {
878+
defer wg.Done()
879+
w := getOutputWithWait(t, handler, id)
880+
codes[i] = w.Code
881+
_ = json.NewDecoder(w.Body).Decode(&resps[i])
882+
}()
883+
}
884+
885+
// Signal the process to exit so both waiters unblock.
886+
postSignal(
887+
t, handler, id,
888+
workspacesdk.SignalProcessRequest{Signal: "kill"},
889+
)
890+
891+
wg.Wait()
892+
893+
for i := range 2 {
894+
require.Equal(t, http.StatusOK, codes[i], "waiter %d", i)
895+
require.False(t, resps[i].Running, "waiter %d", i)
896+
}
897+
})
898+
}
899+
900+
func getOutputWithWait(t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder {
901+
t.Helper()
902+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
903+
defer cancel()
904+
return getOutputWithWaitCtx(ctx, t, handler, id)
905+
}
906+
907+
func getOutputWithWaitCtx(ctx context.Context, t *testing.T, handler http.Handler, id string) *httptest.ResponseRecorder {
908+
t.Helper()
909+
path := fmt.Sprintf("/%s/output?wait=true", id)
910+
req := httptest.NewRequestWithContext(ctx, http.MethodGet, path, nil)
911+
w := httptest.NewRecorder()
912+
handler.ServeHTTP(w, req)
913+
return w
786914
}
787915

788916
func TestSignalProcess(t *testing.T) {

agent/agentproc/headtail.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ const (
3939
// how much output is written.
4040
type HeadTailBuffer struct {
4141
mu sync.Mutex
42+
cond *sync.Cond
4243
head []byte
4344
tail []byte
4445
tailPos int
4546
tailFull bool
4647
headFull bool
48+
closed bool
4749
totalBytes int
4850
maxHead int
4951
maxTail int
@@ -52,20 +54,24 @@ type HeadTailBuffer struct {
5254
// NewHeadTailBuffer creates a new HeadTailBuffer with the
5355
// default head and tail sizes.
5456
func NewHeadTailBuffer() *HeadTailBuffer {
55-
return &HeadTailBuffer{
57+
b := &HeadTailBuffer{
5658
maxHead: MaxHeadBytes,
5759
maxTail: MaxTailBytes,
5860
}
61+
b.cond = sync.NewCond(&b.mu)
62+
return b
5963
}
6064

6165
// NewHeadTailBufferSized creates a HeadTailBuffer with custom
6266
// head and tail sizes. This is useful for testing truncation
6367
// logic with smaller buffers.
6468
func NewHeadTailBufferSized(maxHead, maxTail int) *HeadTailBuffer {
65-
return &HeadTailBuffer{
69+
b := &HeadTailBuffer{
6670
maxHead: maxHead,
6771
maxTail: maxTail,
6872
}
73+
b.cond = sync.NewCond(&b.mu)
74+
return b
6975
}
7076

7177
// Write implements io.Writer. It is safe for concurrent use.
@@ -296,6 +302,15 @@ func truncateLines(s string) string {
296302
return b.String()
297303
}
298304

305+
// Close marks the buffer as closed and wakes any waiters.
306+
// This is called when the process exits.
307+
func (b *HeadTailBuffer) Close() {
308+
b.mu.Lock()
309+
defer b.mu.Unlock()
310+
b.closed = true
311+
b.cond.Broadcast()
312+
}
313+
299314
// Reset clears the buffer, discarding all data.
300315
func (b *HeadTailBuffer) Reset() {
301316
b.mu.Lock()
@@ -305,5 +320,7 @@ func (b *HeadTailBuffer) Reset() {
305320
b.tailPos = 0
306321
b.tailFull = false
307322
b.headFull = false
323+
b.closed = false
308324
b.totalBytes = 0
325+
b.cond.Broadcast()
309326
}

agent/agentproc/process.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ func (m *manager) start(req workspacesdk.StartProcessRequest, chatID string) (*p
208208
proc.exitCode = &code
209209
proc.mu.Unlock()
210210

211+
// Wake any waiters blocked on new output or
212+
// process exit before closing the done channel.
213+
proc.buf.Close()
211214
close(proc.done)
212215
}()
213216

@@ -320,6 +323,36 @@ func (m *manager) Close() error {
320323
return nil
321324
}
322325

326+
// waitForOutput blocks until the buffer is closed (process
327+
// exited) or the context is canceled. Returns nil when the
328+
// buffer closed, ctx.Err() when the context expired.
329+
func (p *process) waitForOutput(ctx context.Context) error {
330+
p.buf.cond.L.Lock()
331+
defer p.buf.cond.L.Unlock()
332+
333+
nevermind := make(chan struct{})
334+
defer close(nevermind)
335+
go func() {
336+
select {
337+
case <-ctx.Done():
338+
// Acquire the lock before broadcasting to
339+
// guarantee the waiter has entered cond.Wait()
340+
// (which atomically releases the lock).
341+
// Without this, a Broadcast between the loop
342+
// predicate check and cond.Wait() is lost.
343+
p.buf.cond.L.Lock()
344+
defer p.buf.cond.L.Unlock()
345+
p.buf.cond.Broadcast()
346+
case <-nevermind:
347+
}
348+
}()
349+
350+
for ctx.Err() == nil && !p.buf.closed {
351+
p.buf.cond.Wait()
352+
}
353+
return ctx.Err()
354+
}
355+
323356
// resolveWorkDir returns the directory a process should start in.
324357
// Priority: explicit request dir > agent configured dir > $HOME.
325358
// Falls through when a candidate is empty or does not exist on

coderd/chatd/chatd_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1590,7 +1590,7 @@ func TestPersistToolResultWithBinaryData(t *testing.T) {
15901590
}).
15911591
Times(1)
15921592
mockConn.EXPECT().
1593-
ProcessOutput(gomock.Any(), "proc-binary").
1593+
ProcessOutput(gomock.Any(), "proc-binary", gomock.Any()).
15941594
Return(workspacesdk.ProcessOutputResponse{
15951595
Output: string(binaryOutput),
15961596
Running: false,

0 commit comments

Comments
 (0)