Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions aibridge/intercept/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ func (s *EventStream) Shutdown(shutdownCtx context.Context) error {

// IsStreaming checks if the stream has been initiated, or
// when events are buffered which - when processed - will initiate the stream.
//
// Note: there is a known race between the channel pop in Start and the
// subsequent InitiateStream call where this can briefly return false for
// a stream that's about to begin. Callers that use this to choose between
// JSON and SSE response formats can produce a malformed response under
// that race. Accepted until the MCP Gateway migration results in AI
// Gateway behaving like a reverse proxy, removing the inner agentic loop
// code. See https://github.com/coder/aibridge/issues/223 and
// https://github.com/coder/internal/issues/1524.
func (s *EventStream) IsStreaming() bool {
return s.initiated.Load() || len(s.eventsCh) > 0
}
Expand Down
4 changes: 4 additions & 0 deletions aibridge/intercept/messages/streaming_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,10 @@ func TestStreamingInterception_AgenticLoopFailover(t *testing.T) {
// keys 429 during the agentic continuation.
// Then: 3 requests, error injected as SSE event, both
// keys temporary.
//
// Known flake: race in eventstream.IsStreaming() can
// produce a malformed response on the all-keys-exhausted
// path. See https://github.com/coder/internal/issues/1524.
name: "agentic_all_keys_fail",
responses: []upstreamResponse{
{statusCode: http.StatusOK, headers: sseHeaders, body: toolUseStreamBody},
Expand Down
Loading