Skip to content

Commit e8508b2

Browse files
authored
fix: recover chatd from poisoned chain anchor on retry (coder#25097)
When OpenAI's Responses API returns `Previous response with id ... not found` for a chained turn, classify it as a `ChainBroken` retry, clear `previous_response_id`, exit chain mode, reload full history, and let `chatretry` retry. Self-heals chats whose anchor was poisoned before coder#25074 stopped truncated streams from being persisted as a successful turn with a stored response id. The new state is exposed via the existing `coderd_chatd_stream_retries_total` counter as a `chain_broken="true"|"false"` label. Aggregating queries (`sum`, `rate` over `provider`/`model`/`kind`) keep working without changes; raw-series matchers without aggregation will now see two series per `(provider, model, kind)` where they previously saw one. The metric is internal-only so the blast radius should be small, but if you have dashboards that index by exact label matchers without aggregation they will need an extra `sum` or an explicit `chain_broken` selector. > 🤖 This PR was created with the help of Coder Agents, and was reviewed by a human 🧑‍💻
1 parent c2dfaa4 commit e8508b2

8 files changed

Lines changed: 814 additions & 43 deletions

File tree

coderd/x/chatd/chaterror/classify.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ type ClassifiedError struct {
2222
// RetryAfter is a normalized minimum retry delay derived from
2323
// provider response metadata when available.
2424
RetryAfter time.Duration
25+
26+
// ChainBroken is true when the provider reported that the
27+
// previous_response_id (or analogous chain anchor) is no longer
28+
// retrievable. The chatloop retry path uses this signal to exit
29+
// chain mode and replay full history before the next attempt.
30+
// This is an internal signal; it is not surfaced as a separate
31+
// codersdk.ChatErrorKind so the user-visible kind set stays
32+
// stable.
33+
ChainBroken bool
2534
}
2635

2736
const responsesAPIDiagnosticMessage = "The chat continuation failed due to an " +
@@ -165,6 +174,20 @@ func Classify(err error) ClassifiedError {
165174
return classified
166175
}
167176

177+
// Chain-broken detection runs before the generic rule table so a
178+
// 404 carrying a chain anchor failure is not classified as a
179+
// generic non-retryable error. The chatloop retry callback uses
180+
// the ChainBroken flag to exit chain mode and replay full
181+
// history.
182+
if classified, ok := chainBrokenClassification(
183+
lower,
184+
provider,
185+
statusCode,
186+
structured,
187+
); ok {
188+
return classified
189+
}
190+
168191
deadline := errors.Is(err, context.DeadlineExceeded) || strings.Contains(lower, "context deadline exceeded")
169192
overloadedMatch := statusCode == 529 || containsAny(lower, overloadedPatterns...)
170193
authStrong := statusCode == 401 || containsAny(lower, authStrongPatterns...)
@@ -276,6 +299,35 @@ func streamIncompleteMessage(provider string) string {
276299
return providerSubject(provider) + " stream closed unexpectedly before the response completed."
277300
}
278301

302+
// chainBrokenClassification recognizes the OpenAI error
303+
// "Previous response with id ... not found" returned when a
304+
// chained turn references a previous_response_id the provider no
305+
// longer recognizes.
306+
func chainBrokenClassification(
307+
lowerMessage string,
308+
provider string,
309+
statusCode int,
310+
structured providerErrorDetails,
311+
) (ClassifiedError, bool) {
312+
if !(strings.Contains(lowerMessage, "previous response with id") &&
313+
strings.Contains(lowerMessage, "not found")) {
314+
return ClassifiedError{}, false
315+
}
316+
// This class of error has so far only been observed with OpenAI.
317+
if provider == "" {
318+
provider = "openai"
319+
}
320+
return normalizeClassification(ClassifiedError{
321+
Detail: structured.detail,
322+
Kind: codersdk.ChatErrorKindGeneric,
323+
Provider: provider,
324+
Retryable: true,
325+
StatusCode: statusCode,
326+
RetryAfter: structured.retryAfter,
327+
ChainBroken: true,
328+
}), true
329+
}
330+
279331
func responsesAPIDiagnostic(lowerMessage, detail string) (string, bool) {
280332
lowerDetail := strings.ToLower(detail)
281333
for _, match := range responsesAPIDiagnosticMatches {

coderd/x/chatd/chaterror/classify_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,123 @@ func TestClassify_TruncatesProviderDetail(t *testing.T) {
746746
require.True(t, strings.HasSuffix(classified.Detail, "…"))
747747
}
748748

749+
func TestClassify_ChainBroken(t *testing.T) {
750+
t.Parallel()
751+
752+
tests := []struct {
753+
name string
754+
err error
755+
wantChainBroken bool
756+
wantRetryable bool
757+
wantProvider string
758+
wantStatusCode int
759+
}{
760+
{
761+
name: "OpenAIPreviousResponseNotFoundBareString",
762+
err: xerrors.New(
763+
"Previous response with id 'resp_abc' not found.",
764+
),
765+
wantChainBroken: true,
766+
wantRetryable: true,
767+
wantProvider: "openai",
768+
wantStatusCode: 0,
769+
},
770+
{
771+
name: "OpenAIPreviousResponseNotFoundProviderError",
772+
err: testProviderError(
773+
"Previous response with id 'resp_096c70c5bb8d52bc0069fa11e0630c81a3ba210cddfa75bae9' not found.",
774+
404,
775+
nil,
776+
),
777+
wantChainBroken: true,
778+
wantRetryable: true,
779+
wantProvider: "openai",
780+
wantStatusCode: 404,
781+
},
782+
{
783+
name: "OpenAIPreviousResponseCaseInsensitive",
784+
err: testProviderError(
785+
"PREVIOUS RESPONSE WITH ID 'resp_abc' NOT FOUND.",
786+
404,
787+
nil,
788+
),
789+
wantChainBroken: true,
790+
wantRetryable: true,
791+
wantProvider: "openai",
792+
wantStatusCode: 404,
793+
},
794+
{
795+
name: "PreviousResponseWithoutNotFoundIsNotChainBroken",
796+
err: testProviderError(
797+
"Previous response with id 'resp_abc' is invalid.",
798+
400,
799+
nil,
800+
),
801+
wantChainBroken: false,
802+
},
803+
{
804+
name: "UnrelatedNotFoundIsNotChainBroken",
805+
err: testProviderError(
806+
"resource not found",
807+
404,
808+
nil,
809+
),
810+
wantChainBroken: false,
811+
},
812+
{
813+
name: "UnrelatedInvalidRequestIsNotChainBroken",
814+
err: testProviderError(
815+
"",
816+
400,
817+
nil,
818+
testProviderResponseDump(`{"error":{"type":"invalid_request_error","message":"Image exceeds 5 MB maximum."}}`),
819+
),
820+
wantChainBroken: false,
821+
},
822+
}
823+
824+
for _, tt := range tests {
825+
t.Run(tt.name, func(t *testing.T) {
826+
t.Parallel()
827+
828+
classified := chaterror.Classify(tt.err)
829+
require.Equal(t, tt.wantChainBroken, classified.ChainBroken,
830+
"chain broken flag mismatch")
831+
if !tt.wantChainBroken {
832+
return
833+
}
834+
require.Equal(t, tt.wantRetryable, classified.Retryable,
835+
"chain-broken errors must be retryable so the loop"+
836+
" can self-heal")
837+
require.Equal(t, tt.wantProvider, classified.Provider)
838+
require.Equal(t, tt.wantStatusCode, classified.StatusCode)
839+
require.Equal(t, codersdk.ChatErrorKindGeneric, classified.Kind,
840+
"chain-broken keeps the user-visible kind unchanged"+
841+
" so we don't add a new codersdk surface")
842+
})
843+
}
844+
}
845+
846+
func TestClassify_ChainBrokenSurvivesWithClassification(t *testing.T) {
847+
t.Parallel()
848+
849+
original := chaterror.Classify(testProviderError(
850+
"Previous response with id 'resp_abc' not found.",
851+
404,
852+
nil,
853+
))
854+
require.True(t, original.ChainBroken)
855+
856+
wrapped := chaterror.WithClassification(
857+
xerrors.New("transport blew up"),
858+
original,
859+
)
860+
round := chaterror.Classify(wrapped)
861+
require.True(t, round.ChainBroken,
862+
"WithClassification round-trips ChainBroken so the retry path"+
863+
" can detect it after re-classification")
864+
}
865+
749866
func testProviderError(
750867
message string,
751868
statusCode int,

coderd/x/chatd/chatloop/chatloop.go

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,14 @@ type RunOptions struct {
161161
Compaction *CompactionOptions
162162
ReloadMessages func(context.Context) ([]fantasy.Message, error)
163163
DisableChainMode func()
164-
// PrepareMessages is called before each LLM step with the
165-
// current message history. If it returns non-nil, the returned
166-
// slice replaces messages for this and all subsequent steps.
164+
// PrepareMessages is called at least once before each LLM step
165+
// with the current message history. If it returns non-nil, the
166+
// returned slice replaces messages for this and all subsequent
167+
// steps.
167168
// Used to inject system context that becomes available mid-loop
168169
// (e.g. AGENTS.md after create_workspace).
170+
// NOTE: It may be called more than once per step in case of a
171+
// retry, so callbacks should avoid duplicating messages.
169172
PrepareMessages func([]fantasy.Message) []fantasy.Message
170173

171174
// OnRetry is called before each retry attempt when the LLM
@@ -349,7 +352,6 @@ func Run(ctx context.Context, opts RunOptions) error {
349352
}
350353

351354
tools := buildToolDefinitions(opts.Tools, opts.ActiveTools, opts.ProviderTools)
352-
applyAnthropicCaching := shouldApplyAnthropicPromptCaching(opts.Model)
353355

354356
messages := opts.Messages
355357
var lastUsage fantasy.Usage
@@ -390,30 +392,10 @@ func Run(ctx context.Context, opts RunOptions) error {
390392
modelName := opts.Model.Model()
391393
opts.Metrics.StepsTotal.WithLabelValues(provider, modelName).Inc()
392394
stepStart := time.Now()
393-
// Copy messages so that provider-specific caching
394-
// mutations don't leak back to the caller's slice.
395-
// copy copies Message structs by value, so field
396-
// reassignments in addAnthropicPromptCaching only
397-
// affect the prepared slice.
398-
if opts.PrepareMessages != nil {
399-
if updated := opts.PrepareMessages(messages); updated != nil {
400-
messages = updated
401-
}
402-
}
403-
prepared := make([]fantasy.Message, len(messages))
404-
copy(prepared, messages)
405-
prepared, sanitizeStats := chatsanitize.SanitizeAnthropicProviderToolHistory(provider, prepared)
406-
chatsanitize.LogAnthropicProviderToolSanitization(
407-
ctx, opts.Logger, "pre_request", provider, modelName, sanitizeStats,
408-
slog.F("step_index", step),
409-
slog.F("total_steps", totalSteps),
410-
)
411-
prepared = chatsanitize.ApplyAnthropicProviderToolGuard(
412-
ctx, opts.Logger, provider, modelName, prepared,
395+
var prepared []fantasy.Message
396+
messages, prepared = prepareMessagesForRequest(
397+
ctx, opts, messages, provider, modelName, step, totalSteps,
413398
)
414-
if applyAnthropicCaching {
415-
addAnthropicPromptCaching(prepared)
416-
}
417399
opts.Metrics.MessageCount.WithLabelValues(provider, modelName).Observe(float64(len(prepared)))
418400
opts.Metrics.PromptSizeBytes.WithLabelValues(provider, modelName).Observe(float64(EstimatePromptSize(prepared)))
419401

@@ -469,6 +451,33 @@ func Run(ctx context.Context, opts RunOptions) error {
469451
// classified payload handed to OnRetry.
470452
classified = classified.WithProvider(provider)
471453
opts.Metrics.RecordStreamRetry(provider, modelName, classified)
454+
if classified.ChainBroken {
455+
if chatopenai.HasPreviousResponseID(opts.ProviderOptions) {
456+
opts.ProviderOptions = chatopenai.ClearPreviousResponseID(opts.ProviderOptions)
457+
}
458+
if chatopenai.HasPreviousResponseID(call.ProviderOptions) {
459+
call.ProviderOptions = chatopenai.ClearPreviousResponseID(call.ProviderOptions)
460+
}
461+
if opts.DisableChainMode != nil {
462+
opts.DisableChainMode()
463+
}
464+
if opts.ReloadMessages != nil {
465+
reloaded, err := opts.ReloadMessages(ctx)
466+
if err != nil {
467+
opts.Logger.Warn(ctx,
468+
"chain-broken recovery: reload messages failed",
469+
slog.Error(err),
470+
)
471+
} else {
472+
// Reloaded history replaces the prompt prepared before
473+
// the failed attempt, so run the same preparation
474+
// pipeline used by normal provider requests.
475+
messages, call.Prompt = prepareMessagesForRequest(
476+
ctx, opts, reloaded, provider, modelName, step, totalSteps,
477+
)
478+
}
479+
}
480+
}
472481
if opts.OnRetry != nil {
473482
opts.OnRetry(attempt, retryErr, classified, delay)
474483
}
@@ -656,6 +665,43 @@ func Run(ctx context.Context, opts RunOptions) error {
656665
return nil
657666
}
658667

668+
// prepareMessagesForRequest applies the prompt preparation pipeline used
669+
// immediately before sending messages to a provider. It returns the
670+
// possibly updated canonical messages and an independent provider-ready
671+
// prompt.
672+
func prepareMessagesForRequest(
673+
ctx context.Context,
674+
opts RunOptions,
675+
messages []fantasy.Message,
676+
provider string,
677+
modelName string,
678+
step int,
679+
totalSteps int,
680+
) (canonical []fantasy.Message, prompt []fantasy.Message) {
681+
canonical = messages
682+
if opts.PrepareMessages != nil {
683+
if updated := opts.PrepareMessages(canonical); updated != nil {
684+
canonical = updated
685+
}
686+
}
687+
// Copy messages so provider-specific caching mutations don't leak
688+
// back to the canonical message slice.
689+
prompt = slices.Clone(canonical)
690+
prompt, sanitizeStats := chatsanitize.SanitizeAnthropicProviderToolHistory(provider, prompt)
691+
chatsanitize.LogAnthropicProviderToolSanitization(
692+
ctx, opts.Logger, "pre_request", provider, modelName, sanitizeStats,
693+
slog.F("step_index", step),
694+
slog.F("total_steps", totalSteps),
695+
)
696+
prompt = chatsanitize.ApplyAnthropicProviderToolGuard(
697+
ctx, opts.Logger, provider, modelName, prompt,
698+
)
699+
if shouldApplyAnthropicPromptCaching(opts.Model) {
700+
addAnthropicPromptCaching(prompt)
701+
}
702+
return canonical, prompt
703+
}
704+
659705
// guardedAttempt owns an attempt-scoped context and startup guard
660706
// around a provider stream. release is idempotent and frees the
661707
// attempt-scoped timer/context. finish canonicalizes startup timeout

0 commit comments

Comments
 (0)