diff --git a/coderd/x/chatd/runner.go b/coderd/x/chatd/runner.go index 38edc7249d84e..abb94ec71325c 100644 --- a/coderd/x/chatd/runner.go +++ b/coderd/x/chatd/runner.go @@ -4,10 +4,12 @@ import ( "context" "errors" "sync" + "sync/atomic" "github.com/google/uuid" "golang.org/x/xerrors" + "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/database" coderdpubsub "github.com/coder/coder/v2/coderd/pubsub" ) @@ -47,6 +49,8 @@ type runner struct { rec *runnerRecord opts chatWorkerOptions + runStarted atomic.Bool + lastSnapshotVersion int64 hasAcceptedState bool latestState runnerStateUpdate @@ -73,6 +77,9 @@ func newRunner(ctx context.Context, mgr *runnerManager, rec *runnerRecord, opts } func (r *runner) run() { + if !r.beginRun() { + return + } if !r.bootstrap() { return } @@ -89,6 +96,18 @@ func (r *runner) run() { } } +func (r *runner) beginRun() bool { + if r.runStarted.CompareAndSwap(false, true) { + return true + } + r.opts.Logger.Error(r.ctx, "chatworker runner run called more than once", + slog.F("chat_id", r.rec.key.ChatID), + slog.F("runner_id", r.rec.key.RunnerID), + ) + r.mgr.requestCleanup(r.ctx, r.rec.key) + return false +} + func (r *runner) bootstrap() bool { // Pubsub can deliver chat:update messages that were already queued by // Postgres before this runner subscribed. Hold those hints until the diff --git a/coderd/x/chatd/runner_internal_test.go b/coderd/x/chatd/runner_internal_test.go new file mode 100644 index 0000000000000..7572ab2f37758 --- /dev/null +++ b/coderd/x/chatd/runner_internal_test.go @@ -0,0 +1,134 @@ +package chatd + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" + + "github.com/coder/coder/v2/coderd/database" + "github.com/coder/coder/v2/testutil" +) + +func TestRunner_BeginRunTreatsMultipleCallsAsError(t *testing.T) { + t.Parallel() + + sink := testutil.NewFakeSink(t) + managerCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + mgr := &runnerManager{ + ctx: managerCtx, + cleanupReqCh: make(chan runnerKey, 1), + } + key := runnerKey{ChatID: uuid.New(), RunnerID: uuid.New()} + r := &runner{ + ctx: context.Background(), + mgr: mgr, + rec: &runnerRecord{key: key}, + opts: chatWorkerOptions{ + Logger: sink.Logger(), + }, + } + + require.True(t, r.beginRun()) + require.False(t, r.beginRun()) + + select { + case got := <-mgr.cleanupReqCh: + require.Equal(t, key, got) + case <-time.After(testutil.WaitLong): + t.Fatal("timed out waiting for cleanup request") + } + require.Len(t, entriesWithMessage(sink, "chatworker runner run called more than once"), 1) +} + +func TestRunnerManager_RouteStateHintPreservesBufferedHintsWhenFull(t *testing.T) { + t.Parallel() + + managerCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + chatID := uuid.New() + runnerID := uuid.New() + first := runnerStateUpdate{ + ChatID: chatID, + SnapshotVersion: 1, + HistoryVersion: 1, + Status: database.ChatStatusRunning, + } + latest := runnerStateUpdate{ + ChatID: chatID, + SnapshotVersion: 2, + HistoryVersion: 2, + Status: database.ChatStatusRunning, + } + done := make(chan struct{}) + stateCh := make(chan runnerStateUpdate, 1) + stateCh <- first + mgr := &runnerManager{ + ctx: managerCtx, + runnersByChat: map[uuid.UUID]map[uuid.UUID]*runnerRecord{ + chatID: { + runnerID: {done: done, stateCh: stateCh}, + }, + }, + } + + mgr.RouteStateHint(context.Background(), latest) + + select { + case got := <-stateCh: + require.Equal(t, first, got) + default: + t.Fatal("expected buffered state hint to remain queued") + } + select { + case got := <-stateCh: + t.Fatalf("unexpected state hint queued after full channel: %#v", got) + default: + } +} + +func TestRunnerManager_RegisterCleanupWaiterDoesNotLockManagerOnShutdown(t *testing.T) { + t.Parallel() + + managerCtx, cancel := context.WithCancel(context.Background()) + mgr := &runnerManager{ctx: managerCtx, cleanupDoneCh: make(chan runnerKey, 1)} + done := make(chan struct{}) + rec := &runnerRecord{done: done} + + mgr.mu.Lock() + defer mgr.mu.Unlock() + cancel() + mgr.registerCleanupWaiter(runnerKey{ChatID: uuid.New(), RunnerID: uuid.New()}, rec) + close(done) + + waited := make(chan struct{}) + go func() { + mgr.wait() + close(waited) + }() + select { + case <-waited: + case <-time.After(testutil.WaitLong): + t.Fatal("cleanup waiter blocked on manager lock during shutdown") + } +} + +func TestRunnerManager_ShouldLogLoopError(t *testing.T) { + t.Parallel() + + errBoom := xerrors.New("boom") + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + require.False(t, shouldLogRunnerManagerLoopError(context.Background(), nil)) + require.False(t, shouldLogRunnerManagerLoopError(ctx, errBoom)) + require.False(t, shouldLogRunnerManagerLoopError(context.Background(), context.Canceled)) + require.False(t, shouldLogRunnerManagerLoopError(context.Background(), xerrors.Errorf("wrapped: %w", context.Canceled))) + require.True(t, shouldLogRunnerManagerLoopError(context.Background(), errBoom)) + require.True(t, shouldLogRunnerManagerLoopError(context.Background(), errors.Join(errBoom, context.DeadlineExceeded))) +} diff --git a/coderd/x/chatd/runner_manager.go b/coderd/x/chatd/runner_manager.go index dc9737c8d6d43..f30429533b839 100644 --- a/coderd/x/chatd/runner_manager.go +++ b/coderd/x/chatd/runner_manager.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "sync" "time" @@ -178,7 +179,15 @@ func (m *runnerManager) RouteStateHint(ctx context.Context, state runnerStateUpd case <-m.ctx.Done(): return default: - // stateCh is full; drop the hint for this runner. + // Each hint carries a snapshot version, and runners discard stale + // snapshots after reading them. If this channel is full, preserve + // the buffered older hints instead of evicting the head. Dropping + // the newest hint can delay processing, but it does not lose + // correctness because pubsub or database sync will provide a later + // snapshot, and runner-owned tasks verify durable state before + // committing. The default buffer of 64 absorbs ordinary bursts. A + // full channel means the runner is already behind, so blocking here + // would spread backpressure to pubsub and worker maintenance loops. } } } @@ -289,18 +298,12 @@ func (m *runnerManager) handleCleanupRequest(key runnerKey) { func (m *runnerManager) registerCleanupWaiter(key runnerKey, rec *runnerRecord) { m.wg.Go(func() { <-rec.done - if m.ctx.Err() != nil { - m.mu.Lock() - delete(m.cleaning, key) - m.mu.Unlock() - return - } select { case m.cleanupDoneCh <- key: case <-m.ctx.Done(): - m.mu.Lock() - delete(m.cleaning, key) - m.mu.Unlock() + // During shutdown the manager is being discarded. Avoid touching + // m.cleaning here so callers waiting on m.wg do not depend on this + // goroutine acquiring m.mu before it can exit. } }) } @@ -409,7 +412,7 @@ func (m *runnerManager) databaseSyncLoop() { for { select { case <-ticker.C: - if err := m.syncOnce(m.ctx); err != nil { + if err := m.syncOnce(m.ctx); shouldLogRunnerManagerLoopError(m.ctx, err) { m.opts.Logger.Warn(m.ctx, "chatworker runner sync failed", slogError(err)) } case <-m.ctx.Done(): @@ -454,7 +457,7 @@ func (m *runnerManager) heartbeatLoop() { for { select { case <-ticker.C: - if err := m.heartbeatOnce(m.ctx); err != nil { + if err := m.heartbeatOnce(m.ctx); shouldLogRunnerManagerLoopError(m.ctx, err) { m.opts.Logger.Warn(m.ctx, "chatworker heartbeat failed", slogError(err)) } case <-m.ctx.Done(): @@ -500,6 +503,16 @@ func (m *runnerManager) heartbeatCleanupOnce(ctx context.Context) error { return err } +func shouldLogRunnerManagerLoopError(ctx context.Context, err error) bool { + if err == nil { + return false + } + if ctx.Err() != nil { + return false + } + return !errors.Is(err, context.Canceled) +} + func stateUpdateFromChat(chat database.Chat) runnerStateUpdate { var workerID *uuid.UUID if chat.WorkerID.Valid { diff --git a/deferred-review-threads-pr26109.md b/deferred-review-threads-pr26109.md new file mode 100644 index 0000000000000..828993b0d9813 --- /dev/null +++ b/deferred-review-threads-pr26109.md @@ -0,0 +1,154 @@ +# Deferred Review Threads from PR #26109 + + + +Source: https://github.com/coder/coder/pull/26109 + +This file lists every review comment thread that hugodutka marked as deferred. It serves as a template for a plan to address these issues. Each entry describes what the thread was about, with a link to the original discussion. + +## Threads + +### coderd/database/dbpurge/dbpurge.go + +1. [DONE] **Stale comment removal** (line 61) https://github.com/coder/coder/pull/26109#discussion_r3379072397 mafredri flagged an LLM-generated comment/reasoning that is no longer relevant and should be removed if unused. + +2. [DONE] **Error reuse across long distance** (line 258) https://github.com/coder/coder/pull/26109#discussion_r3379093655 The code reuses an `err` variable far from where it was set, which is unusual and clever in a bad way. A boolean flag would be clearer than reusing the error value. + +3. [SKIPPED] **InTx assertion support in database/Store** (line 340) https://github.com/coder/coder/pull/26109#discussion_r3379104920 Side note/future work: add support to the database Store to assert that code is running inside a transaction (InTx) and error out if not. + +### coderd/x/chatd/chatdebug/service.go + +4. [DONE] **Undocumented silent failure** (line 543) https://github.com/coder/coder/pull/26109#discussion_r3379151284 Behavior needs to either be documented on the method or return an error. Same applies to `TouchRun`. + +5. [DONE] **Rename to maybeResetTicker** (line 551) https://github.com/coder/coder/pull/26109#discussion_r3379164243 Naming suggestion: rename the function to `maybeResetTicker`. + +### coderd/x/chatd/messagepartbuffer/message_part_buffer.go + +6. **Extract getEpisode helper** (line 192) https://github.com/coder/coder/pull/26109#discussion_r3379915812 Extract a helper method (e.g. `getEpisode`) that handles the not-found check, instead of repeating it. + +7. **Package needs much more documentation** (file-level) https://github.com/coder/coder/pull/26109#discussion_r3379988015 The package is hard to review because intent is not documented. Add package and method documentation explaining why the code does what it does. + +8. **Extract episode finalization helper** (line 300) https://github.com/coder/coder/pull/26109#discussion_r3380010948 Repeated logic in several places could be extracted into a method like `episode.markCreated()` or `episode.finalize()`. + +9. **Document channel buffering decisions** (line 262) https://github.com/coder/coder/pull/26109#discussion_r3380029684 Add comments explaining why a channel is buffered, and why the unbuffered channels are safe to be unbuffered. + +### coderd/x/chatd/messagepartbuffer/message_part_buffer_test.go + +10. [DONE] **Add goleak (HIGH PRIORITY)** (file-level) https://github.com/coder/coder/pull/26109#discussion_r3380039964 mafredri suspects goleak would surface straggling goroutines since teardown appears to be "eventual". hugodutka deferred with high priority: add goleak to all packages affected by the refactor, and consider whether code paths should wait for teardown. + +### coderd/x/chatd/auto_archive.go + +11. **Archival loop ticker behavior** (line 42) https://github.com/coder/coder/pull/26109#discussion_r3380197310 A slow `archiveOnce` or congested DB could create a constant archival loop. Suggested restructure: archiveOnce -> createTicker -> select -> archiveOnce + ticker.Reset(interval). + +12. **Document the UTC 00:00 cutoff choice** (line 68) https://github.com/coder/coder/pull/26109#discussion_r3380219922 Add docs explaining why UTC midnight minus N days was selected as the archival cutoff. + +13. **Postgres trigger for updated_at** (line 85) https://github.com/coder/coder/pull/26109#discussion_r3380232630 Concern that an archived chat could be wiped by dbpurge almost immediately because `updated_at` reflected last chat activity. hugodutka confirmed every chat state transition (including archival) already bumps `updated_at` via `UpdateChatExecutionState`, so nothing is broken, but a Postgres trigger that auto-bumps `updated_at` would be more robust. Deferred. + +### coderd/x/chatd/chatd.go + +14. **Use atomic value** (line 2429) https://github.com/coder/coder/pull/26109#discussion_r3380251082 An atomic value seems more appropriate than the current approach. + +15. **Make newChatWorker/withDefaults dependencies explicit** (line 3591) https://github.com/coder/coder/pull/26109#discussion_r3380270527 Make the implicit dependencies explicit (e.g. `withDefaults(db, ...)`) so the runtime panic for missing options is unnecessary. + +16. **Workspace context gathering refactor** (line 5042) https://github.com/coder/coder/pull/26109#discussion_r3386806941 A comment warns about easy misuse of `appendRootChatTools` regarding workspace context. Suggested renaming it to something like `appendRootChatToolsWithoutWorkspaceContext` or guarding against the mistake in the implementation. hugodutka: workspace context gathering in general should be refactored. + +### coderd/x/chatd/generation.go + +17. **Runtime checks for required options** (line 336) https://github.com/coder/coder/pull/26109#discussion_r3380311853 Question whether all the runtime checks for required options are needed; make dependencies explicit instead. + +18. **Too many juggled variables** (line 370) https://github.com/coder/coder/pull/26109#discussion_r3387161874 Too many variables in scope (`locked`, etc.) remain referenceable after they stop being useful. Prefer a single name like `chat` and override when appropriate. + +19. **Error handling structure prevents misuse** (line 482) https://github.com/coder/coder/pull/26109#discussion_r3387191468 Suggestion to restructure error handling so all error cases are handled in one `if err != nil` block (sql.ErrNoRows -> errTaskExpectedExit, else wrap), preferring structures that prevent misuse. + +20. **Unnamed return signature hard to decipher** (line 817) https://github.com/coder/coder/pull/26109#discussion_r3387251382 A function returns many unnamed values. Use named returns at minimum, or return a struct. + +21. **Unify fence verification query** (line 828) https://github.com/coder/coder/pull/26109#discussion_r3387288234 All call sites of fence verification require the running state and history; this could be unified via something like `tx.GetChatForTask`. + +22. **Machine update failure does not record outcome** (line 1049) https://github.com/coder/coder/pull/26109#discussion_r3387544273 If the machine update fails, no outcome is recorded, possibly leaving untracked work in chatdebug. hugodutka: chatdebug should be removed. + +### coderd/x/chatd/generation_preparer.go + +23. **Magic value should be a documented const** (line 100) https://github.com/coder/coder/pull/26109#discussion_r3387640179 + +24. **Reuse earlier err variable** (line 124) https://github.com/coder/coder/pull/26109#discussion_r3387645148 Nit: could just use the `err` defined earlier. + +25. **Dangerous cleanup pattern** (line 162) https://github.com/coder/coder/pull/26109#discussion_r3387675842 The function is dangerous to edit. Suggested a named `err` return coupled with a `defer func() { if err != nil { cleanup() } }()` so cleanup happens on any error return. + +### coderd/x/chatd/runner.go + +26. [DONE] **Multiple calls should be an error state** (line 77) https://github.com/coder/coder/pull/26109#discussion_r3387720344 Allowing multiple calls feels like it should be an error state instead. + +### coderd/x/chatd/runner_manager.go + +27. [DONE] **Noise when ctx cancelled** (line 413) https://github.com/coder/coder/pull/26109#discussion_r3380581402 Skip logging this error when the context is cancelled. + +28. [DONE] **Potential wg.Wait/mu.Lock deadlock (concurrency)** (line 301) https://github.com/coder/coder/pull/26109#discussion_r3380592305 Caution about potential deadlocks between `m.wg.Wait` and `m.mu.Lock`. hugodutka: ensure this is corrected from a concurrency perspective. + +29. [DONE] **Skip logging context canceled errors** (line 458) https://github.com/coder/coder/pull/26109#discussion_r3387355724 Same as thread 27, applied to another log site. + +30. [DONE] **Document stateCh buffering semantics** (line 180) https://github.com/coder/coder/pull/26109#discussion_r3387788957 A target whose stateCh is full gets no state update and must process all previous states. Add comments explaining why this is fine: why a gap at the tail is preferred over the head, expectations around the default 64 buffer size, etc. + +### coderd/x/chatd/testhooks.go + +31. **Hard-coded timeout** (line 19) https://github.com/coder/coder/pull/26109#discussion_r3382365135 Accept a `context.Context` parameter instead of a hard-coded timeout. + +### coderd/x/chatd/tasks.go and tasks_test.go + +32. **Extract side effects to an interface** (tasks.go line 124) https://github.com/coder/coder/pull/26109#discussion_r3382554277 Extracting side-effecting dependencies to an interface would make the seam clearer and easier to mock or spy on in tests. + +33. **taskStarter test spy / gomock** (tasks_test.go line 1040) https://github.com/coder/coder/pull/26109#discussion_r3382564035 Related to thread 32: `taskStarter` has many side-effecting dependencies. An interface would allow using gomock for assertions. + +34. **Required options as newTaskStarter args** (tasks.go line 141) https://github.com/coder/coder/pull/26109#discussion_r3387867697 Required options should be explicit arguments of `newTaskStarter`. + +35. **State invariant should be enforced by the machine** (tasks.go line 434) https://github.com/coder/coder/pull/26109#discussion_r3387919402 An invariant is verified in each `Update` call; it should instead be enforced by the state machine itself. + +### coderd/x/chatd/worker.go + +36. **Rename ctx to parentCtx** (line 49) https://github.com/coder/coder/pull/26109#discussion_r3387954876 Rename the outer ctx to `parentCtx` and use `ctx` inline to avoid bugs where the wrong context is referenced. + +37. **Magic number** (line 120) https://github.com/coder/coder/pull/26109#discussion_r3387973304 Replace magic number with a documented const. + +### coderd/x/chatd/quickgen.go + +38. **Separate timeout bound** (line 171) https://github.com/coder/coder/pull/26109#discussion_r3387365392 Question whether this operation should still be bounded by a separate timeout. + +## Todos + +- [x] 1. dbpurge.go: remove stale LLM-generated comment (r3379072397) +- [x] 2. dbpurge.go: replace distant err reuse with a flag (r3379093655) +- [ ] 3. database/Store: support asserting InTx (r3379104920) +- [x] 4. chatdebug/service.go: document or return error, incl. TouchRun (r3379151284) +- [x] 5. chatdebug/service.go: rename to maybeResetTicker (r3379164243) +- [ ] 6. messagepartbuffer: extract getEpisode helper (r3379915812) +- [ ] 7. messagepartbuffer: add package and method documentation (r3379988015) +- [ ] 8. messagepartbuffer: extract episode.markCreated/finalize helper (r3380010948) +- [ ] 9. messagepartbuffer: document channel buffering decisions (r3380029684) +- [x] 10. HIGH PRIORITY: add goleak to all packages affected by the refactor; fix straggling goroutines (r3380039964) +- [ ] 11. auto_archive.go: restructure archival loop ticker (r3380197310) +- [ ] 12. auto_archive.go: document UTC 00:00 cutoff choice (r3380219922) +- [ ] 13. auto_archive.go/DB: add Postgres trigger to bump updated_at (r3380232630) +- [ ] 14. chatd.go: use an atomic value (r3380251082) +- [ ] 15. chatd.go: explicit deps for newChatWorker/withDefaults, remove panic (r3380270527) +- [ ] 16. chatd.go: refactor workspace context gathering; rename appendRootChatTools (r3386806941) +- [ ] 17. generation.go: remove runtime checks via explicit required deps (r3380311853) +- [ ] 18. generation.go: reduce juggled variables around locked/chat (r3387161874) +- [ ] 19. generation.go: restructure error handling to prevent misuse (r3387191468) +- [ ] 20. generation.go: named returns or struct for multi-value return (r3387251382) +- [ ] 21. generation.go: unify fence verification via tx.GetChatForTask (r3387288234) +- [ ] 22. generation.go/chatdebug: handle machine update failure outcome; consider removing chatdebug (r3387544273) +- [ ] 23. generation_preparer.go: move magic value to documented const (r3387640179) +- [ ] 24. generation_preparer.go: reuse earlier err variable (r3387645148) +- [ ] 25. generation_preparer.go: named err return + deferred cleanup on error (r3387675842) +- [x] 26. runner.go: treat multiple calls as an error state (r3387720344) +- [x] 27. runner_manager.go: skip logging on ctx cancellation, line 413 (r3380581402) +- [x] 28. runner_manager.go: fix wg.Wait/mu.Lock concurrency concern (r3380592305) +- [x] 29. runner_manager.go: skip logging context canceled errors, line 458 (r3387355724) +- [x] 30. runner_manager.go: document stateCh buffering semantics (r3387788957) +- [ ] 31. testhooks.go: accept context.Context instead of hard-coded timeout (r3382365135) +- [ ] 32. tasks.go: extract side-effecting deps to interface (r3382554277) +- [ ] 33. tasks_test.go: use interface and gomock for taskStarter spy (r3382564035) +- [ ] 34. tasks.go: required options as newTaskStarter args (r3387867697) +- [ ] 35. tasks.go: enforce invariant in state machine, not each Update (r3387919402) +- [ ] 36. worker.go: rename ctx to parentCtx (r3387954876) +- [ ] 37. worker.go: replace magic number with documented const (r3387973304) +- [ ] 38. quickgen.go: consider separate timeout bound (r3387365392)