Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions coderd/x/chatd/chatdebug/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ func (s *attemptSink) record(a Attempt) {
s.attempts = append(s.attempts, a)
}

// replaceByNumber swaps a in for the first recorded attempt whose Number
// equals number, or appends a when no recorded attempt carries that
// Number. The lookup and the write both happen under s.mu.
//
// It backs the provisional-then-upgrade flow used for SSE bodies: Read()
// records a completed attempt on EOF, and Close() may later need to
// overwrite that entry with a failed attempt when inner.Close() surfaces
// an error.
func (s *attemptSink) replaceByNumber(number int, a Attempt) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Find the first entry with a matching Number; -1 means "no match".
	idx := -1
	for i := range s.attempts {
		if s.attempts[i].Number == number {
			idx = i
			break
		}
	}

	if idx < 0 {
		// Nothing to overwrite, so fall back to a plain append.
		s.attempts = append(s.attempts, a)
		return
	}
	s.attempts[idx] = a
}

func (s *attemptSink) snapshot() []Attempt {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
156 changes: 115 additions & 41 deletions coderd/x/chatd/chatdebug/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ type recordingBody struct {
truncated bool
sawEOF bool
bytesRead int64
// recordedProvisional is true when recordProvisional() has fired
// for an SSE body's Read-path EOF but Close() has not yet run. A
// subsequent inner.Close() error in Close() upgrades the
// provisional entry in the sink so the close error is not lost.
recordedProvisional bool

recordOnce sync.Once
closeOnce sync.Once
Expand Down Expand Up @@ -225,12 +230,24 @@ func (r *recordingBody) Read(p []byte) (int, error) {
r.accumulateReadLocked(p, n, err)
r.mu.Unlock()

// Record non-EOF errors immediately. io.EOF is recorded eagerly below
// only for SSE bodies; for every other body it is deferred to Close(),
// which runs more sophisticated validation (JSON completeness checks,
// content-length verification, etc.) that an EOF recording here would
// preempt via recordOnce.
if err != nil && !errors.Is(err, io.EOF) {
r.record(err)
return n, err
}

// For server-sent-events bodies, record eagerly on EOF. Streaming
// consumers like fantasy's Anthropic SSE adapter iterate the
// response to EOF and abandon it without calling Close(), so the
// Close-only recording path would never fire and the attempt would
// be lost. The recording is provisional so Close() can still
// upgrade it to failed if inner.Close() surfaces a transport error.
// Non-SSE bodies stay on the Close-only path so that JSON
// integrity, content-length validation, and inner-Close errors
// keep their existing semantics.
if errors.Is(err, io.EOF) && isSSEContentType(r.contentType) {
r.recordProvisional(io.EOF)
}
Comment thread
ThomasK33 marked this conversation as resolved.
return n, err
}
Expand Down Expand Up @@ -259,7 +276,27 @@ func (r *recordingBody) Close() error {
closeErr = r.inner.Close()
})
if closeErr != nil {
r.record(closeErr)
// Hold r.mu across the flag check AND the publish/replace so a
// concurrent recordProvisional cannot slip its recordOnce
// publish between our read of recordedProvisional and our call
// into the sink. Without this serialization, Close() could
// observe recordedProvisional=false, then lose the race and
// see r.record(closeErr) become a no-op once recordOnce has
// already fired from the SSE EOF path.
r.mu.Lock()
if r.recordedProvisional {
// The SSE EOF path already appended a completed attempt.
// inner.Close() surfaced a transport error, so upgrade
// that entry to failed instead of losing the close error.
upgraded := r.buildAttemptLocked(closeErr)
r.sink.replaceByNumber(upgraded.Number, upgraded)
r.recordedProvisional = false
} else {
r.recordOnce.Do(func() {
r.sink.record(r.buildAttemptLocked(closeErr))
Comment thread
ThomasK33 marked this conversation as resolved.
})
}
r.mu.Unlock()
return closeErr
}

Expand Down Expand Up @@ -334,6 +371,12 @@ func isNDJSONContentType(contentType string) bool {
return parseMediaType(contentType) == "application/x-ndjson"
}

// isSSEContentType reports whether the media type extracted from
// contentType by parseMediaType is exactly "text/event-stream", i.e.
// whether the body is a server-sent-events stream.
func isSSEContentType(contentType string) bool {
	const sseMediaType = "text/event-stream"

	mediaType := parseMediaType(contentType)
	return mediaType == sseMediaType
}

// maxDrainBytes caps how many trailing bytes drainToEOF will consume.
// This prevents Close() from blocking indefinitely on a misbehaving
// or extremely large chunked body.
Expand Down Expand Up @@ -384,44 +427,75 @@ func isCompleteUnknownLengthJSONBody(contentType string, body []byte) bool {
return errors.Is(decoder.Decode(&extra), io.EOF)
}

// buildAttemptLocked assembles the final Attempt for this body from the
// buffered response bytes, the base attempt, and err. Both the
// record-once append path and the provisional-upgrade replace path go
// through it, so redaction and status rules stay identical at both
// sites.
//
// Status is attemptStatusFailed (with a sanitized Error string) when err
// is non-nil and is not io.EOF; otherwise it is attemptStatusCompleted.
//
// The caller must hold r.mu for the duration of the call.
func (r *recordingBody) buildAttemptLocked(err error) Attempt {
	finishedAt := time.Now()

	attempt := r.base
	// Detach the recorded body from the live buffer.
	bodyCopy := append([]byte(nil), r.buf.Bytes()...)

	if r.truncated {
		attempt.ResponseBody = []byte("[TRUNCATED]")
	} else if isNDJSONContentType(r.contentType) {
		attempt.ResponseBody = RedactNDJSONSecrets(bodyCopy)
	} else if r.contentType == "" || isJSONLikeContentType(r.contentType) {
		// JSON-like or absent (unknown) content types go through JSON
		// redaction. For unknown types, RedactJSONSecrets fails closed
		// by replacing non-JSON payloads with a diagnostic message.
		attempt.ResponseBody = RedactJSONSecrets(bodyCopy)
	} else {
		// Everything else (SSE, text/plain, HTML, etc.) is kept as-is
		// so debug content is not lost.
		attempt.ResponseBody = bodyCopy
	}

	attempt.StartedAt = r.startedAt.UTC().Format(time.RFC3339Nano)
	attempt.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
	// Measure up to now so the duration includes body read time.
	attempt.DurationMs = finishedAt.Sub(r.startedAt).Milliseconds()

	// A clean end of stream (nil or io.EOF) is a success; any other
	// error marks the attempt as failed.
	failed := err != nil && !errors.Is(err, io.EOF)
	attempt.Status = attemptStatusCompleted
	if failed {
		attempt.Error = sanitizeErrorString(err.Error())
		attempt.Status = attemptStatusFailed
	}
	return attempt
}

// record acquires r.mu before entering recordOnce.Do so it shares a
// single lock-acquisition order with recordProvisional. Without this,
// a concurrent Read (in recordProvisional, holding r.mu) and Close (in
// record, about to take r.mu inside the Do callback) would deadlock:
// the Do winner would block on r.mu while the loser would block on
// recordOnce. Callers must not hold r.mu.
func (r *recordingBody) record(err error) {
Comment thread
ThomasK33 marked this conversation as resolved.
r.mu.Lock()
defer r.mu.Unlock()
r.recordOnce.Do(func() {
finishedAt := time.Now()

r.mu.Lock()
truncated := r.truncated
responseBody := append([]byte(nil), r.buf.Bytes()...)
base := r.base
startedAt := r.startedAt
r.mu.Unlock()
r.sink.record(r.buildAttemptLocked(err))
})
}

contentType := r.contentType
switch {
case truncated:
base.ResponseBody = []byte("[TRUNCATED]")
case isNDJSONContentType(contentType):
base.ResponseBody = RedactNDJSONSecrets(responseBody)
case contentType == "" || isJSONLikeContentType(contentType):
// Redact JSON secrets when the content type is JSON-like
// or absent (unknown). For unknown types, RedactJSONSecrets
// fails closed by replacing non-JSON payloads with a
// diagnostic message.
base.ResponseBody = RedactJSONSecrets(responseBody)
default:
// Non-JSON content types (SSE, text/plain, HTML, etc.)
// are preserved as-is to avoid losing debug content.
base.ResponseBody = responseBody
}
base.StartedAt = startedAt.UTC().Format(time.RFC3339Nano)
base.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
// Recompute duration to include body read time.
base.DurationMs = finishedAt.Sub(startedAt).Milliseconds()
if err != nil && !errors.Is(err, io.EOF) {
base.Error = sanitizeErrorString(err.Error())
base.Status = attemptStatusFailed
} else {
base.Status = attemptStatusCompleted
}
r.sink.record(base)
// recordProvisional records err via recordOnce and marks the entry as
// eligible for a later upgrade from Close(). Safe to call multiple
// times; only the first call appends. The publish and the provisional
// flag are committed atomically under r.mu so a concurrent Close()
// that takes r.mu to inspect the flag cannot observe a half-finished
// state where the attempt is in the sink but recordedProvisional is
// still false.
func (r *recordingBody) recordProvisional(err error) {
Comment thread
ThomasK33 marked this conversation as resolved.
r.mu.Lock()
defer r.mu.Unlock()
r.recordOnce.Do(func() {
r.sink.record(r.buildAttemptLocked(err))
r.recordedProvisional = true
})
}
Loading
Loading