From 0c6fe1f378a052ca5e24b38bec8bf66d1d4c54cd Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Fri, 12 Jun 2026 13:07:57 +0000 Subject: [PATCH 1/2] deferred review threads 26109 plan --- deferred-review-threads-pr26109.md | 152 +++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 deferred-review-threads-pr26109.md diff --git a/deferred-review-threads-pr26109.md b/deferred-review-threads-pr26109.md new file mode 100644 index 0000000000000..02063544bfbee --- /dev/null +++ b/deferred-review-threads-pr26109.md @@ -0,0 +1,152 @@ +# Deferred Review Threads from PR #26109 + +Source: https://github.com/coder/coder/pull/26109 + +This file lists every review comment thread that hugodutka marked as deferred. It serves as a template for a plan to address these issues. Each entry describes what the thread was about, with a link to the original discussion. + +## Threads + +### coderd/database/dbpurge/dbpurge.go + +1. [DONE] **Stale comment removal** (line 61) https://github.com/coder/coder/pull/26109#discussion_r3379072397 mafredri flagged an LLM-generated comment/reasoning that is no longer relevant and should be removed if unused. + +2. [DONE] **Error reuse across long distance** (line 258) https://github.com/coder/coder/pull/26109#discussion_r3379093655 The code reuses an `err` variable far from where it was set, which is unusual and clever in a bad way. A boolean flag would be clearer than reusing the error value. + +3. [SKIPPED] **InTx assertion support in database/Store** (line 340) https://github.com/coder/coder/pull/26109#discussion_r3379104920 Side note/future work: add support to the database Store to assert that code is running inside a transaction (InTx) and error out if not. + +### coderd/x/chatd/chatdebug/service.go + +4. [DONE] **Undocumented silent failure** (line 543) https://github.com/coder/coder/pull/26109#discussion_r3379151284 Behavior needs to either be documented on the method or return an error. Same applies to `TouchRun`. 
+ +5. [DONE] **Rename to maybeResetTicker** (line 551) https://github.com/coder/coder/pull/26109#discussion_r3379164243 Naming suggestion: rename the function to `maybeResetTicker`. + +### coderd/x/chatd/messagepartbuffer/message_part_buffer.go + +6. **Extract getEpisode helper** (line 192) https://github.com/coder/coder/pull/26109#discussion_r3379915812 Extract a helper method (e.g. `getEpisode`) that handles the not-found check, instead of repeating it. + +7. **Package needs much more documentation** (file-level) https://github.com/coder/coder/pull/26109#discussion_r3379988015 The package is hard to review because intent is not documented. Add package and method documentation explaining why the code does what it does. + +8. **Extract episode finalization helper** (line 300) https://github.com/coder/coder/pull/26109#discussion_r3380010948 Repeated logic in several places could be extracted into a method like `episode.markCreated()` or `episode.finalize()`. + +9. **Document channel buffering decisions** (line 262) https://github.com/coder/coder/pull/26109#discussion_r3380029684 Add comments explaining why a channel is buffered, and why the unbuffered channels are safe to be unbuffered. + +### coderd/x/chatd/messagepartbuffer/message_part_buffer_test.go + +10. [DONE] **Add goleak (HIGH PRIORITY)** (file-level) https://github.com/coder/coder/pull/26109#discussion_r3380039964 mafredri suspects goleak would surface straggling goroutines since teardown appears to be "eventual". hugodutka deferred with high priority: add goleak to all packages affected by the refactor, and consider whether code paths should wait for teardown. + +### coderd/x/chatd/auto_archive.go + +11. **Archival loop ticker behavior** (line 42) https://github.com/coder/coder/pull/26109#discussion_r3380197310 A slow `archiveOnce` or congested DB could create a constant archival loop. Suggested restructure: archiveOnce -> createTicker -> select -> archiveOnce + ticker.Reset(interval). + +12. 
**Document the UTC 00:00 cutoff choice** (line 68) https://github.com/coder/coder/pull/26109#discussion_r3380219922 Add docs explaining why UTC midnight minus N days was selected as the archival cutoff. + +13. **Postgres trigger for updated_at** (line 85) https://github.com/coder/coder/pull/26109#discussion_r3380232630 Concern that an archived chat could be wiped by dbpurge almost immediately because `updated_at` reflected last chat activity. hugodutka confirmed every chat state transition (including archival) already bumps `updated_at` via `UpdateChatExecutionState`, so nothing is broken, but a Postgres trigger that auto-bumps `updated_at` would be more robust. Deferred. + +### coderd/x/chatd/chatd.go + +14. **Use atomic value** (line 2429) https://github.com/coder/coder/pull/26109#discussion_r3380251082 An atomic value seems more appropriate than the current approach. + +15. **Make newChatWorker/withDefaults dependencies explicit** (line 3591) https://github.com/coder/coder/pull/26109#discussion_r3380270527 Make the implicit dependencies explicit (e.g. `withDefaults(db, ...)`) so the runtime panic for missing options is unnecessary. + +16. **Workspace context gathering refactor** (line 5042) https://github.com/coder/coder/pull/26109#discussion_r3386806941 A comment warns about easy misuse of `appendRootChatTools` regarding workspace context. Suggested renaming it to something like `appendRootChatToolsWithoutWorkspaceContext` or guarding against the mistake in the implementation. hugodutka: workspace context gathering in general should be refactored. + +### coderd/x/chatd/generation.go + +17. **Runtime checks for required options** (line 336) https://github.com/coder/coder/pull/26109#discussion_r3380311853 Question whether all the runtime checks for required options are needed; make dependencies explicit instead. + +18. 
**Too many juggled variables** (line 370) https://github.com/coder/coder/pull/26109#discussion_r3387161874 Too many variables in scope (`locked`, etc.) remain referenceable after they stop being useful. Prefer a single name like `chat` and override when appropriate. + +19. **Error handling structure prevents misuse** (line 482) https://github.com/coder/coder/pull/26109#discussion_r3387191468 Suggestion to restructure error handling so all error cases are handled in one `if err != nil` block (sql.ErrNoRows -> errTaskExpectedExit, else wrap), preferring structures that prevent misuse. + +20. **Unnamed return signature hard to decipher** (line 817) https://github.com/coder/coder/pull/26109#discussion_r3387251382 A function returns many unnamed values. Use named returns at minimum, or return a struct. + +21. **Unify fence verification query** (line 828) https://github.com/coder/coder/pull/26109#discussion_r3387288234 All call sites of fence verification require the running state and history; this could be unified via something like `tx.GetChatForTask`. + +22. **Machine update failure does not record outcome** (line 1049) https://github.com/coder/coder/pull/26109#discussion_r3387544273 If the machine update fails, no outcome is recorded, possibly leaving untracked work in chatdebug. hugodutka: chatdebug should be removed. + +### coderd/x/chatd/generation_preparer.go + +23. **Magic value should be a documented const** (line 100) https://github.com/coder/coder/pull/26109#discussion_r3387640179 + +24. **Reuse earlier err variable** (line 124) https://github.com/coder/coder/pull/26109#discussion_r3387645148 Nit: could just use the `err` defined earlier. + +25. **Dangerous cleanup pattern** (line 162) https://github.com/coder/coder/pull/26109#discussion_r3387675842 The function is dangerous to edit. Suggested a named `err` return coupled with a `defer func() { if err != nil { cleanup() } }()` so cleanup happens on any error return. + +### coderd/x/chatd/runner.go + +26. 
**Multiple calls should be an error state** (line 77) https://github.com/coder/coder/pull/26109#discussion_r3387720344 Allowing multiple calls feels like it should be an error state instead. + +### coderd/x/chatd/runner_manager.go + +27. **Noise when ctx cancelled** (line 413) https://github.com/coder/coder/pull/26109#discussion_r3380581402 Skip logging this error when the context is cancelled. + +28. **Potential wg.Wait/mu.Lock deadlock (concurrency)** (line 301) https://github.com/coder/coder/pull/26109#discussion_r3380592305 Caution about potential deadlocks between `m.wg.Wait` and `m.mu.Lock`. hugodutka: ensure this is corrected from a concurrency perspective. + +29. **Skip logging context canceled errors** (line 458) https://github.com/coder/coder/pull/26109#discussion_r3387355724 Same as thread 27, applied to another log site. + +30. **Document stateCh buffering semantics** (line 180) https://github.com/coder/coder/pull/26109#discussion_r3387788957 A target whose stateCh is full gets no state update and must process all previous states. Add comments explaining why this is fine: why a gap at the tail is preferred over the head, expectations around the default 64 buffer size, etc. + +### coderd/x/chatd/testhooks.go + +31. **Hard-coded timeout** (line 19) https://github.com/coder/coder/pull/26109#discussion_r3382365135 Accept a `context.Context` parameter instead of a hard-coded timeout. + +### coderd/x/chatd/tasks.go and tasks_test.go + +32. **Extract side effects to an interface** (tasks.go line 124) https://github.com/coder/coder/pull/26109#discussion_r3382554277 Extracting side-effecting dependencies to an interface would make the seam clearer and easier to mock or spy on in tests. + +33. **taskStarter test spy / gomock** (tasks_test.go line 1040) https://github.com/coder/coder/pull/26109#discussion_r3382564035 Related to thread 32: `taskStarter` has many side-effecting dependencies. An interface would allow using gomock for assertions. + +34. 
**Required options as newTaskStarter args** (tasks.go line 141) https://github.com/coder/coder/pull/26109#discussion_r3387867697 Required options should be explicit arguments of `newTaskStarter`. + +35. **State invariant should be enforced by the machine** (tasks.go line 434) https://github.com/coder/coder/pull/26109#discussion_r3387919402 An invariant is verified in each `Update` call; it should instead be enforced by the state machine itself. + +### coderd/x/chatd/worker.go + +36. **Rename ctx to parentCtx** (line 49) https://github.com/coder/coder/pull/26109#discussion_r3387954876 Rename the outer ctx to `parentCtx` and use `ctx` inline to avoid bugs where the wrong context is referenced. + +37. **Magic number** (line 120) https://github.com/coder/coder/pull/26109#discussion_r3387973304 Replace magic number with a documented const. + +### coderd/x/chatd/quickgen.go + +38. **Separate timeout bound** (line 171) https://github.com/coder/coder/pull/26109#discussion_r3387365392 Question whether this operation should still be bounded by a separate timeout. + +## Todos + +- [x] 1. dbpurge.go: remove stale LLM-generated comment (r3379072397) +- [x] 2. dbpurge.go: replace distant err reuse with a flag (r3379093655) +- [ ] 3. database/Store: support asserting InTx (r3379104920) +- [x] 4. chatdebug/service.go: document or return error, incl. TouchRun (r3379151284) +- [x] 5. chatdebug/service.go: rename to maybeResetTicker (r3379164243) +- [ ] 6. messagepartbuffer: extract getEpisode helper (r3379915812) +- [ ] 7. messagepartbuffer: add package and method documentation (r3379988015) +- [ ] 8. messagepartbuffer: extract episode.markCreated/finalize helper (r3380010948) +- [ ] 9. messagepartbuffer: document channel buffering decisions (r3380029684) +- [x] 10. HIGH PRIORITY: add goleak to all packages affected by the refactor; fix straggling goroutines (r3380039964) +- [ ] 11. auto_archive.go: restructure archival loop ticker (r3380197310) +- [ ] 12. 
auto_archive.go: document UTC 00:00 cutoff choice (r3380219922) +- [ ] 13. auto_archive.go/DB: add Postgres trigger to bump updated_at (r3380232630) +- [ ] 14. chatd.go: use an atomic value (r3380251082) +- [ ] 15. chatd.go: explicit deps for newChatWorker/withDefaults, remove panic (r3380270527) +- [ ] 16. chatd.go: refactor workspace context gathering; rename appendRootChatTools (r3386806941) +- [ ] 17. generation.go: remove runtime checks via explicit required deps (r3380311853) +- [ ] 18. generation.go: reduce juggled variables around locked/chat (r3387161874) +- [ ] 19. generation.go: restructure error handling to prevent misuse (r3387191468) +- [ ] 20. generation.go: named returns or struct for multi-value return (r3387251382) +- [ ] 21. generation.go: unify fence verification via tx.GetChatForTask (r3387288234) +- [ ] 22. generation.go/chatdebug: handle machine update failure outcome; consider removing chatdebug (r3387544273) +- [ ] 23. generation_preparer.go: move magic value to documented const (r3387640179) +- [ ] 24. generation_preparer.go: reuse earlier err variable (r3387645148) +- [ ] 25. generation_preparer.go: named err return + deferred cleanup on error (r3387675842) +- [ ] 26. runner.go: treat multiple calls as an error state (r3387720344) +- [ ] 27. runner_manager.go: skip logging on ctx cancellation, line 413 (r3380581402) +- [ ] 28. runner_manager.go: fix wg.Wait/mu.Lock concurrency concern (r3380592305) +- [ ] 29. runner_manager.go: skip logging context canceled errors, line 458 (r3387355724) +- [ ] 30. runner_manager.go: document stateCh buffering semantics (r3387788957) +- [ ] 31. testhooks.go: accept context.Context instead of hard-coded timeout (r3382365135) +- [ ] 32. tasks.go: extract side-effecting deps to interface (r3382554277) +- [ ] 33. tasks_test.go: use interface and gomock for taskStarter spy (r3382564035) +- [ ] 34. tasks.go: required options as newTaskStarter args (r3387867697) +- [ ] 35. 
tasks.go: enforce invariant in state machine, not each Update (r3387919402) +- [ ] 36. worker.go: rename ctx to parentCtx (r3387954876) +- [ ] 37. worker.go: replace magic number with documented const (r3387973304) +- [ ] 38. quickgen.go: consider separate timeout bound (r3387365392) From f238ff2d53bb9b051fd4139d3bccc9e395670583 Mon Sep 17 00:00:00 2001 From: Hugo Dutka Date: Thu, 18 Jun 2026 10:37:39 +0000 Subject: [PATCH 2/2] refactor(coderd/x/chatd): clarify task side effects --- coderd/x/chatd/chatstate/transitions.go | 16 ++ .../chatstate/transitions_matrix_test.go | 13 +- coderd/x/chatd/chatstate/transitions_test.go | 71 ++++++++ coderd/x/chatd/generation.go | 36 ++-- .../x/chatd/generation_retry_internal_test.go | 12 +- .../mock_task_side_effects_internal_test.go | 79 +++++++++ coderd/x/chatd/mockgen_internal_test.go | 3 + coderd/x/chatd/tasks.go | 158 +++++++++++------- coderd/x/chatd/tasks_test.go | 65 +++++-- coderd/x/chatd/worker.go | 9 +- deferred-review-threads-pr26109.md | 10 +- 11 files changed, 361 insertions(+), 111 deletions(-) create mode 100644 coderd/x/chatd/mock_task_side_effects_internal_test.go create mode 100644 coderd/x/chatd/mockgen_internal_test.go diff --git a/coderd/x/chatd/chatstate/transitions.go b/coderd/x/chatd/chatstate/transitions.go index 964610f84d3a1..543c97e9d5b03 100644 --- a/coderd/x/chatd/chatstate/transitions.go +++ b/coderd/x/chatd/chatstate/transitions.go @@ -1344,6 +1344,7 @@ func (tx *Tx) FinishError(input FinishErrorInput) (FinishErrorResult, error) { // CancelRequiresActionInput configures [Tx.CancelRequiresAction]. type CancelRequiresActionInput struct { Reason string + Now time.Time } // CancelRequiresActionResult is returned by [Tx.CancelRequiresAction]. 
@@ -1358,6 +1359,21 @@ func (tx *Tx) CancelRequiresAction(input CancelRequiresActionInput) (CancelRequi if err != nil { return CancelRequiresActionResult{}, err } + if chat.RequiresActionDeadlineAt.Valid { + now := input.Now + if now.IsZero() { + now, err = tx.store.GetDatabaseNow(tx.ctx) + if err != nil { + return CancelRequiresActionResult{}, xerrors.Errorf("get database time: %w", err) + } + } + if now.Before(chat.RequiresActionDeadlineAt.Time) { + return CancelRequiresActionResult{}, newTransitionError( + TransitionCancelRequiresAction, from, + "requires-action deadline has not expired", + ) + } + } reason := input.Reason if reason == "" { reason = "Tool execution timed out" diff --git a/coderd/x/chatd/chatstate/transitions_matrix_test.go b/coderd/x/chatd/chatstate/transitions_matrix_test.go index ef35090f7dad2..0f39b1a6a5609 100644 --- a/coderd/x/chatd/chatstate/transitions_matrix_test.go +++ b/coderd/x/chatd/chatstate/transitions_matrix_test.go @@ -7,6 +7,7 @@ import ( "slices" "sync" "testing" + "time" "github.com/google/uuid" "github.com/sqlc-dev/pqtype" @@ -256,11 +257,19 @@ func applyFinishError(t *testing.T, _ *testFixture, tx *chatstate.Tx, _ seededCh return err } -func applyCancelRequiresAction(t *testing.T, _ *testFixture, tx *chatstate.Tx, _ seededChat, _ chatstate.ExecutionState, result *transitionCaseResult) error { +func applyCancelRequiresAction(t *testing.T, _ *testFixture, tx *chatstate.Tx, seeded seededChat, _ chatstate.ExecutionState, result *transitionCaseResult) error { t.Helper() - var err error + chat, err := tx.Store().GetChatByID(tx.Ctx(), seeded.chatID) + if err != nil { + return err + } + now := time.Time{} + if chat.RequiresActionDeadlineAt.Valid { + now = chat.RequiresActionDeadlineAt.Time.Add(time.Nanosecond) + } result.cancelRequiresAction, err = tx.CancelRequiresAction(chatstate.CancelRequiresActionInput{ Reason: "cancel from test", + Now: now, }) return err } diff --git a/coderd/x/chatd/chatstate/transitions_test.go 
b/coderd/x/chatd/chatstate/transitions_test.go index c4df476d7ba99..a09b89436132b 100644 --- a/coderd/x/chatd/chatstate/transitions_test.go +++ b/coderd/x/chatd/chatstate/transitions_test.go @@ -1,8 +1,10 @@ package chatstate_test import ( + "database/sql" "encoding/json" "testing" + "time" "github.com/google/uuid" "github.com/sqlc-dev/pqtype" @@ -361,6 +363,75 @@ func TestTransitionInputValidation(t *testing.T) { }) } }) + + t.Run("CancelRequiresAction_invalid_deadline", func(t *testing.T) { + t.Parallel() + for _, tc := range cancelRequiresActionDeadlineRejectCases() { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + runCancelRequiresActionDeadlineRejectCase(t, tc) + }) + } + }) +} + +type cancelRequiresActionDeadlineRejectCase struct { + name string + deadline sql.NullTime + now time.Time + want string +} + +func cancelRequiresActionDeadlineRejectCases() []cancelRequiresActionDeadlineRejectCase { + now := time.Now().UTC() + return []cancelRequiresActionDeadlineRejectCase{ + { + name: "not_expired", + deadline: sql.NullTime{Time: now.Add(time.Minute), Valid: true}, + now: now, + want: "requires-action deadline has not expired", + }, + } +} + +func runCancelRequiresActionDeadlineRejectCase(t *testing.T, tc cancelRequiresActionDeadlineRejectCase) { + t.Helper() + f := newTestFixture(t) + ctx := testutil.Context(t, testutil.WaitShort) + seeded := seedState(t, f, chatstate.StateA0) + require.Equal(t, chatstate.StateA0, f.classify(ctx, t, seeded.chatID)) + require.NoError(t, f.DB.InTx(func(store database.Store) error { + chat, err := store.GetChatByID(ctx, seeded.chatID) + if err != nil { + return err + } + _, err = store.UpdateChatExecutionState(ctx, database.UpdateChatExecutionStateParams{ + ID: chat.ID, + Status: chat.Status, + Archived: chat.Archived, + WorkerID: chat.WorkerID, + RunnerID: chat.RunnerID, + LastError: chat.LastError, + RequiresActionDeadlineAt: tc.deadline, + }) + return err + }, nil)) + base := captureBaseline(ctx, t, f, seeded) + m := 
chatstate.NewChatMachine(f.DB, f.Pub, seeded.chatID) + + err := m.Update(ctx, func(tx *chatstate.Tx, store database.Store) error { + _, cancelErr := tx.CancelRequiresAction(chatstate.CancelRequiresActionInput{Now: tc.now}) + return cancelErr + }) + + require.Error(t, err) + require.ErrorIs(t, err, chatstate.ErrTransitionNotAllowed) + var te *chatstate.TransitionError + require.ErrorAs(t, err, &te) + require.Equal(t, chatstate.TransitionCancelRequiresAction, te.Transition) + require.Equal(t, chatstate.StateA0, te.From) + require.Contains(t, te.Reason, tc.want) + assertNoMutationOrPublish(ctx, t, f, seeded.chatID, base) } // TestSendMessageQueueCapRejectsQueueAppend seeds a chat with the diff --git a/coderd/x/chatd/generation.go b/coderd/x/chatd/generation.go index c3afe31faa179..d1247f52ca822 100644 --- a/coderd/x/chatd/generation.go +++ b/coderd/x/chatd/generation.go @@ -348,7 +348,7 @@ func (s *taskStarter) StartGeneration(ctx context.Context, input chatWorkerTaskS if s.server == nil { return xerrors.New("chatworker: server is required") } - machine := chatstate.NewChatMachine(s.opts.Store, s.opts.Pubsub, input.ChatID) + machine := chatstate.NewChatMachine(s.store, s.pubsub, input.ChatID) chainModeDisabled := false for { locked, messages, err := loadGenerationState(ctx, machine, input) @@ -439,7 +439,7 @@ func (s *taskStarter) StartGeneration(ctx context.Context, input chatWorkerTaskS return err } if decision.retry { - s.opts.Logger.Warn(ctx, "chat generation retrying", + s.opts.logger.Warn(ctx, "chat generation retrying", slog.F("chat_id", input.ChatID), slog.F("worker_id", input.WorkerID), slog.F("action", action), @@ -556,7 +556,7 @@ func (*taskStarter) recordGenerationRetry( } func (s *taskStarter) waitGenerationRetry(ctx context.Context, delay time.Duration) error { - timer := s.opts.Clock.NewTimer(delay, "chatworker", "generation-retry") + timer := s.opts.clock.NewTimer(delay, "chatworker", "generation-retry") defer timer.Stop() select { case <-timer.C: @@ 
-609,7 +609,7 @@ func retryGenerationPhase[T any](ctx context.Context, starter *taskStarter, phas lastErr = err if attempt < generationPhaseMaxAttempts-1 { delay := generationPhaseBackoff(attempt) - starter.opts.Logger.Warn(ctx, "chat generation phase retrying", + starter.opts.logger.Warn(ctx, "chat generation phase retrying", slog.F("phase", phase), slog.F("attempt", attempt+1), slog.F("max_attempts", generationPhaseMaxAttempts), @@ -625,7 +625,7 @@ func retryGenerationPhase[T any](ctx context.Context, starter *taskStarter, phas } func (s *taskStarter) waitGenerationPhaseBackoff(ctx context.Context, delay time.Duration) error { - timer := s.opts.Clock.NewTimer(delay, "chatworker", "generation-phase-retry") + timer := s.opts.clock.NewTimer(delay, "chatworker", "generation-phase-retry") defer timer.Stop() select { case <-timer.C: @@ -658,8 +658,8 @@ func (s *taskStarter) generateAssistant( ModelConfig: prepared.ModelConfig, ProviderOptions: prepared.ProviderOptions, PublishMessagePart: publish, - Logger: s.opts.Logger, - Clock: s.opts.Clock, + Logger: s.opts.logger, + Clock: s.opts.clock, Metrics: s.server.metrics, }) if err != nil { @@ -673,7 +673,7 @@ func (s *taskStarter) generateAssistant( modelCallConfig: prepared.ModelConfig, step: stepDataFromPersisted(outcome.Step), toolNameToConfigID: prepared.ToolNameToConfigID, - logger: s.opts.Logger, + logger: s.opts.logger, contentVersion: chatprompt.CurrentContentVersion, }) if err != nil { @@ -715,9 +715,9 @@ func (s *taskStarter) executeLocalTools( ModelProvider: provider, ModelName: modelName, PublishMessagePart: publish, - Logger: s.opts.Logger, + Logger: s.opts.logger, Metrics: s.server.metrics, - Clock: s.opts.Clock, + Clock: s.opts.clock, }) if err != nil { return err @@ -727,7 +727,7 @@ func (s *taskStarter) executeLocalTools( modelCallConfig: prepared.ModelConfig, step: stepDataFromPersisted(outcome.Step), toolNameToConfigID: prepared.ToolNameToConfigID, - logger: s.opts.Logger, + logger: s.opts.logger, 
contentVersion: chatprompt.CurrentContentVersion, }) if err != nil { @@ -812,7 +812,7 @@ func (s *taskStarter) persistWorkspaceContext( if s.server == nil { return errTaskExpectedExit } - messages, err := s.opts.Store.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ + messages, err := s.store.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{ ChatID: input.ChatID, AfterID: 0, }) @@ -882,14 +882,14 @@ func (s *taskStarter) beginGenerationAttempt( HistoryVersion: committed.HistoryVersion, GenerationAttempt: attempt, } - if err := s.opts.MessagePartBuffer.CreateEpisode(key); err != nil && ctx.Err() == nil { + if err := s.messagePartBuffer.CreateEpisode(key); err != nil && ctx.Err() == nil { return 0, messagepartbuffer.Key{}, nil, nil, taskRetryableError{err: xerrors.Errorf("create message part episode: %w", err)} } publish := func(role codersdk.ChatMessageRole, part codersdk.ChatMessagePart) { - _ = s.opts.MessagePartBuffer.AddPart(key, role, part) + _ = s.messagePartBuffer.AddPart(key, role, part) } closeEpisode := func() { - _ = s.opts.MessagePartBuffer.CloseEpisode(key) + _ = s.messagePartBuffer.CloseEpisode(key) } return attempt, key, publish, closeEpisode, nil } @@ -935,7 +935,7 @@ func (s *taskStarter) commitGenerationStep( if err != nil { return normalizeTaskTransitionError(err, "commit generation step") } - s.routeStateHint(ctx, stateUpdateFromChat(committed)) + s.sideEffects.RouteStateHint(ctx, stateUpdateFromChat(committed)) return s.afterGenerationOutcome(ctx, generationOutcome{ Chat: committed, Kind: runnerActionKind(kind), @@ -1040,7 +1040,7 @@ func (s *taskStarter) finishGenerationTurn( }); err != nil { return err } - s.routeStateHint(ctx, stateUpdateFromChat(committed)) + s.sideEffects.RouteStateHint(ctx, stateUpdateFromChat(committed)) return nil } @@ -1056,7 +1056,7 @@ func (s *taskStarter) finishGenerationError( // Log the unsanitized cause before persisting so administrators can // diagnose the failure even when 
the classified user-facing message // omits the underlying reason, and even if the persist below fails. - s.opts.Logger.Warn(ctx, "chat generation failed", + s.opts.logger.Warn(ctx, "chat generation failed", slog.F("chat_id", input.ChatID), slog.F("worker_id", input.WorkerID), slog.F("generation_attempt", input.GenerationAttempt), diff --git a/coderd/x/chatd/generation_retry_internal_test.go b/coderd/x/chatd/generation_retry_internal_test.go index f55b3502959d7..c2073462450f9 100644 --- a/coderd/x/chatd/generation_retry_internal_test.go +++ b/coderd/x/chatd/generation_retry_internal_test.go @@ -59,7 +59,7 @@ func TestRetryGenerationPhase(t *testing.T) { defer timerTrap.Close() sink := testutil.NewFakeSink(t) starter := newGenerationPhaseTestStarter(t, clock) - starter.opts.Logger = sink.Logger() + starter.opts.logger = sink.Logger() ctx := testutil.Context(t, testutil.WaitLong) calls := 0 done := make(chan phaseRetryResult[string], 1) @@ -172,10 +172,10 @@ type phaseRetryResult[T any] struct { func newGenerationPhaseTestStarter(t *testing.T, clock quartz.Clock) *taskStarter { t.Helper() require.NotNil(t, clock) - return &taskStarter{opts: chatWorkerOptions{ - Clock: clock, - Logger: testutil.NewFakeSink(t).Logger(), - TaskRetryInitialBackoff: time.Millisecond, - TaskRetryMaxBackoff: time.Millisecond, + return &taskStarter{opts: taskStarterOptions{ + clock: clock, + logger: testutil.NewFakeSink(t).Logger(), + taskRetryInitialBackoff: time.Millisecond, + taskRetryMaxBackoff: time.Millisecond, }} } diff --git a/coderd/x/chatd/mock_task_side_effects_internal_test.go b/coderd/x/chatd/mock_task_side_effects_internal_test.go new file mode 100644 index 0000000000000..5365e6905d3fd --- /dev/null +++ b/coderd/x/chatd/mock_task_side_effects_internal_test.go @@ -0,0 +1,79 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: tasks.go +// +// Generated by this command: +// +// mockgen -source=tasks.go -destination ./mock_task_side_effects_internal_test.go -package chatd -mock_names taskSideEffects=MockTaskSideEffects taskSideEffects +// + +// Package chatd is a generated GoMock package. +package chatd + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockTaskSideEffects is a mock of taskSideEffects interface. +type MockTaskSideEffects struct { + ctrl *gomock.Controller + recorder *MockTaskSideEffectsMockRecorder + isgomock struct{} +} + +// MockTaskSideEffectsMockRecorder is the mock recorder for MockTaskSideEffects. +type MockTaskSideEffectsMockRecorder struct { + mock *MockTaskSideEffects +} + +// NewMockTaskSideEffects creates a new mock instance. +func NewMockTaskSideEffects(ctrl *gomock.Controller) *MockTaskSideEffects { + mock := &MockTaskSideEffects{ctrl: ctrl} + mock.recorder = &MockTaskSideEffectsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskSideEffects) EXPECT() *MockTaskSideEffectsMockRecorder { + return m.recorder +} + +// AfterInterruptionOutcome mocks base method. +func (m *MockTaskSideEffects) AfterInterruptionOutcome(arg0 context.Context, arg1 interruptionOutcome) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AfterInterruptionOutcome", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AfterInterruptionOutcome indicates an expected call of AfterInterruptionOutcome. +func (mr *MockTaskSideEffectsMockRecorder) AfterInterruptionOutcome(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AfterInterruptionOutcome", reflect.TypeOf((*MockTaskSideEffects)(nil).AfterInterruptionOutcome), arg0, arg1) +} + +// RequestCleanup mocks base method. 
+func (m *MockTaskSideEffects) RequestCleanup(arg0 context.Context, arg1 runnerKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RequestCleanup", arg0, arg1) +} + +// RequestCleanup indicates an expected call of RequestCleanup. +func (mr *MockTaskSideEffectsMockRecorder) RequestCleanup(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCleanup", reflect.TypeOf((*MockTaskSideEffects)(nil).RequestCleanup), arg0, arg1) +} + +// RouteStateHint mocks base method. +func (m *MockTaskSideEffects) RouteStateHint(arg0 context.Context, arg1 runnerStateUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RouteStateHint", arg0, arg1) +} + +// RouteStateHint indicates an expected call of RouteStateHint. +func (mr *MockTaskSideEffectsMockRecorder) RouteStateHint(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RouteStateHint", reflect.TypeOf((*MockTaskSideEffects)(nil).RouteStateHint), arg0, arg1) +} diff --git a/coderd/x/chatd/mockgen_internal_test.go b/coderd/x/chatd/mockgen_internal_test.go new file mode 100644 index 0000000000000..2b35f91f696d3 --- /dev/null +++ b/coderd/x/chatd/mockgen_internal_test.go @@ -0,0 +1,3 @@ +package chatd + +//go:generate go tool mockgen -source=tasks.go -destination ./mock_task_side_effects_internal_test.go -package chatd -mock_names taskSideEffects=MockTaskSideEffects taskSideEffects diff --git a/coderd/x/chatd/tasks.go b/coderd/x/chatd/tasks.go index f370ea852f7e9..342b3fe9671dd 100644 --- a/coderd/x/chatd/tasks.go +++ b/coderd/x/chatd/tasks.go @@ -166,52 +166,87 @@ type interruptionOutcome struct { WatchEventKind codersdk.ChatWatchEventKind } +type taskSideEffects interface { + RouteStateHint(context.Context, runnerStateUpdate) + RequestCleanup(context.Context, runnerKey) + AfterInterruptionOutcome(context.Context, interruptionOutcome) error +} + +type runnerManagerTaskSideEffects struct { + manager *runnerManager + 
server *Server +} + +func (s runnerManagerTaskSideEffects) RouteStateHint(ctx context.Context, state runnerStateUpdate) { + s.manager.RouteStateHint(ctx, state) +} + +func (s runnerManagerTaskSideEffects) RequestCleanup(ctx context.Context, key runnerKey) { + s.manager.requestCleanup(ctx, key) +} + +func (s runnerManagerTaskSideEffects) AfterInterruptionOutcome(ctx context.Context, outcome interruptionOutcome) error { + if s.server == nil { + return nil + } + return s.server.afterInterruptionOutcome(ctx, outcome) +} + +type taskStarterOptions struct { + logger slog.Logger + clock quartz.Clock + taskRetryInitialBackoff time.Duration + taskRetryMaxBackoff time.Duration +} + type taskStarter struct { - server *Server - opts chatWorkerOptions - routeStateHint func(context.Context, runnerStateUpdate) - requestCleanup func(context.Context, runnerKey) - afterInterruptionOutcome func(context.Context, interruptionOutcome) error + server *Server + store database.Store + pubsub chatWorkerPubsub + messagePartBuffer *messagepartbuffer.Buffer + sideEffects taskSideEffects + opts taskStarterOptions } func newTaskStarter( server *Server, - opts chatWorkerOptions, - routeStateHint func(context.Context, runnerStateUpdate), - requestCleanup func(context.Context, runnerKey), + store database.Store, + pubsub chatWorkerPubsub, + messagePartBuffer *messagepartbuffer.Buffer, + sideEffects taskSideEffects, + opts taskStarterOptions, ) (*taskStarter, error) { - if opts.Store == nil { + if store == nil { return nil, xerrors.New("chatworker: task store is required") } - if opts.Pubsub == nil { + if pubsub == nil { return nil, xerrors.New("chatworker: task pubsub is required") } - if opts.MessagePartBuffer == nil { + if messagePartBuffer == nil { return nil, xerrors.New("chatworker: message part buffer is required") } - if opts.Clock == nil { - opts.Clock = quartz.NewReal() - } - if opts.TaskRetryInitialBackoff <= 0 { - opts.TaskRetryInitialBackoff = defaultTaskRetryInitialBackoff + if 
sideEffects == nil {
+		return nil, xerrors.New("chatworker: task side effects are required")
 	}
-	if opts.TaskRetryMaxBackoff <= 0 {
-		opts.TaskRetryMaxBackoff = defaultTaskRetryMaxBackoff
+	if opts.clock == nil {
+		opts.clock = quartz.NewReal()
 	}
-	if opts.TaskRetryMaxBackoff < opts.TaskRetryInitialBackoff {
-		opts.TaskRetryMaxBackoff = opts.TaskRetryInitialBackoff
+	if opts.taskRetryInitialBackoff <= 0 {
+		opts.taskRetryInitialBackoff = defaultTaskRetryInitialBackoff
 	}
-	if routeStateHint == nil {
-		return nil, xerrors.New("chatworker: route state hint callback is required")
+	if opts.taskRetryMaxBackoff <= 0 {
+		opts.taskRetryMaxBackoff = defaultTaskRetryMaxBackoff
 	}
-	if requestCleanup == nil {
-		return nil, xerrors.New("chatworker: cleanup callback is required")
+	if opts.taskRetryMaxBackoff < opts.taskRetryInitialBackoff {
+		opts.taskRetryMaxBackoff = opts.taskRetryInitialBackoff
 	}
 	return &taskStarter{
-		server: server,
-		opts: opts,
-		routeStateHint: routeStateHint,
-		requestCleanup: requestCleanup,
+		server: server,
+		store: store,
+		pubsub: pubsub,
+		messagePartBuffer: messagePartBuffer,
+		sideEffects: sideEffects,
+		opts: opts,
 	}, nil
 }
 
@@ -224,8 +259,17 @@ func (o chatWorkerOptions) retryOptions() retryWrapperOptions {
 	}
 }
 
+func (o chatWorkerOptions) taskStarterOptions() taskStarterOptions {
+	return taskStarterOptions{
+		logger: o.Logger,
+		clock: o.Clock,
+		taskRetryInitialBackoff: o.TaskRetryInitialBackoff,
+		taskRetryMaxBackoff: o.TaskRetryMaxBackoff,
+	}
+}
+
 func (s *taskStarter) StartInterrupt(ctx context.Context, input chatWorkerTaskStartInput) error {
-	machine := chatstate.NewChatMachine(s.opts.Store, s.opts.Pubsub, input.ChatID)
+	machine := chatstate.NewChatMachine(s.store, s.pubsub, input.ChatID)
 	var chat database.Chat
 	err := machine.ReadLock(ctx, func(store database.Store) error {
 		locked, err := store.GetChatByID(ctx, input.ChatID)
@@ -250,13 +294,13 @@ func (s *taskStarter) StartInterrupt(ctx context.Context, input chatWorkerTaskSt
 		HistoryVersion: input.HistoryVersion,
 		GenerationAttempt: chat.GenerationAttempt,
 	}
-	if err := s.opts.MessagePartBuffer.CloseEpisode(key); err != nil {
+	if err := s.messagePartBuffer.CloseEpisode(key); err != nil {
 		if ctx.Err() != nil {
 			return errors.Join(errTaskExpectedExit, xerrors.Errorf("close message part episode: %w", err), ctx.Err())
 		}
 		return taskRetryableError{err: xerrors.Errorf("close message part episode: %w", err)}
 	}
-	parts, err := s.opts.MessagePartBuffer.GetParts(key)
+	parts, err := s.messagePartBuffer.GetParts(key)
 	if errors.Is(err, messagepartbuffer.ErrEpisodeNotFound) {
 		parts = nil
 		err = nil
@@ -271,8 +315,8 @@ func (s *taskStarter) StartInterrupt(ctx context.Context, input chatWorkerTaskSt
 		parts: parts,
 		modelConfigID: chat.LastModelConfigID,
 		contentVersion: chatprompt.CurrentContentVersion,
-		logger: s.opts.Logger,
-		interruptedAt: s.opts.Clock.Now("chatworker", "interrupt"),
+		logger: s.opts.logger,
+		interruptedAt: s.opts.clock.Now("chatworker", "interrupt"),
 	})
 	if err != nil {
 		return xerrors.Errorf("convert buffered parts: %w", err)
@@ -291,7 +335,7 @@ func (s *taskStarter) StartInterrupt(ctx context.Context, input chatWorkerTaskSt
 			return err
 		}
 		messages := partialMessages
-		committedCancels, err := committedPendingLocalToolCancellationMessages(ctx, store, locked, s.opts.Clock.Now("chatworker", "interrupt"))
+		committedCancels, err := committedPendingLocalToolCancellationMessages(ctx, store, locked, s.opts.clock.Now("chatworker", "interrupt"))
 		if err != nil {
 			return err
 		}
@@ -325,21 +369,14 @@ func (s *taskStarter) StartInterrupt(ctx context.Context, input chatWorkerTaskSt
 }
 
 func (s *taskStarter) runAfterInterruptionOutcome(ctx context.Context, outcome interruptionOutcome) error {
-	afterOutcome := s.afterInterruptionOutcome
-	if afterOutcome == nil && s.server != nil {
-		afterOutcome = s.server.afterInterruptionOutcome
-	}
-	if afterOutcome == nil {
-		return nil
-	}
-	if err := afterOutcome(ctx, outcome); err != nil {
+	if err :=
s.sideEffects.AfterInterruptionOutcome(ctx, outcome); err != nil {
 		return taskRetryableError{err: xerrors.Errorf("interruption post-outcome side effects: %w", err)}
 	}
 	return nil
 }
 
 func (s *taskStarter) StartRequiresActionTimeout(ctx context.Context, input chatWorkerTaskStartInput) error {
-	machine := chatstate.NewChatMachine(s.opts.Store, s.opts.Pubsub, input.ChatID)
+	machine := chatstate.NewChatMachine(s.store, s.pubsub, input.ChatID)
 	for {
 		decision, err := decideRequiresActionTimeout(ctx, machine, input)
 		if err != nil {
@@ -404,11 +441,11 @@ func decideRequiresActionTimeout(
 }
 
 func (s *taskStarter) waitUntil(ctx context.Context, deadline time.Time) error {
-	now := s.opts.Clock.Now("chatworker", "requires-action-timeout")
+	now := s.opts.clock.Now("chatworker", "requires-action-timeout")
 	if !now.Before(deadline) {
 		return nil
 	}
-	timer := s.opts.Clock.NewTimer(deadline.Sub(now), "chatworker", "requires-action-timeout")
+	timer := s.opts.clock.NewTimer(deadline.Sub(now), "chatworker", "requires-action-timeout")
 	defer timer.Stop()
 	select {
 	case <-timer.C:
@@ -436,15 +473,6 @@ func (s *taskStarter) cancelRequiresAction(
 		if err := verifyTaskFence(locked, input, database.ChatStatusRequiresAction, taskFenceOptions{requireHistory: true}); err != nil {
 			return err
 		}
-		if locked.RequiresActionDeadlineAt.Valid {
-			now, err := store.GetDatabaseNow(ctx)
-			if err != nil {
-				return xerrors.Errorf("get database time: %w", err)
-			}
-			if now.Before(locked.RequiresActionDeadlineAt.Time) {
-				return errTaskExpectedExit
-			}
-		}
 		if _, err := tx.CancelRequiresAction(chatstate.CancelRequiresActionInput{Reason: reason}); err != nil {
 			return err
 		}
@@ -464,7 +492,7 @@ func (s *taskStarter) cancelRequiresAction(
 }
 
 func (s *taskStarter) StartAbandon(ctx context.Context, input chatWorkerTaskStartInput) error {
-	machine := chatstate.NewChatMachine(s.opts.Store, s.opts.Pubsub, input.ChatID)
+	machine := chatstate.NewChatMachine(s.store, s.pubsub, input.ChatID)
 	mismatch := false
 	err :=
machine.Update(ctx, func(tx *chatstate.Tx, store database.Store) error {
 		locked, err := store.GetChatByID(ctx, input.ChatID)
@@ -489,12 +517,12 @@ func (s *taskStarter) StartAbandon(ctx context.Context, input chatWorkerTaskStar
 	})
 	if err != nil {
 		if errors.Is(err, errTaskExpectedExit) && mismatch {
-			s.requestCleanup(ctx, runnerKey{ChatID: input.ChatID, RunnerID: input.RunnerID})
+			s.sideEffects.RequestCleanup(ctx, runnerKey{ChatID: input.ChatID, RunnerID: input.RunnerID})
 			return nil
 		}
 		return normalizeTaskTransitionError(err, "abandon chat")
 	}
-	s.requestCleanup(ctx, runnerKey{ChatID: input.ChatID, RunnerID: input.RunnerID})
+	s.sideEffects.RequestCleanup(ctx, runnerKey{ChatID: input.ChatID, RunnerID: input.RunnerID})
 	return nil
 }
 
@@ -502,7 +530,7 @@ func (s *taskStarter) committedStateAfterUpdateError(ctx context.Context, commit
 	if committed.ID == uuid.Nil {
 		return database.Chat{}, false
 	}
-	current, err := s.opts.Store.GetChatByID(ctx, committed.ID)
+	current, err := s.store.GetChatByID(ctx, committed.ID)
 	if err != nil {
 		return database.Chat{}, false
 	}
@@ -529,7 +557,7 @@ func (s *taskStarter) publishWatchAndRoute(
 	if err := s.publishWatchWithRetry(watchCtx, chat, kind); err != nil {
 		return err
 	}
-	s.routeStateHint(ctx, stateUpdateFromChat(chat))
+	s.sideEffects.RouteStateHint(ctx, stateUpdateFromChat(chat))
 	return nil
 }
 
@@ -538,14 +566,14 @@ func (s *taskStarter) publishWatchWithRetry(
 	chat database.Chat,
 	kind codersdk.ChatWatchEventKind,
 ) error {
-	delay := s.opts.TaskRetryInitialBackoff
+	delay := s.opts.taskRetryInitialBackoff
 	for {
-		if err := publishChatWatchEvent(s.opts.Pubsub, chat, kind); err == nil {
+		if err := publishChatWatchEvent(s.pubsub, chat, kind); err == nil {
 			return nil
 		} else if ctx.Err() != nil {
 			return errors.Join(errTaskExpectedExit, xerrors.Errorf("publishChatWatchEvent: %w", ctx.Err()))
 		}
-		timer := s.opts.Clock.NewTimer(delay, "chatworker", "watch-publish-retry")
+		timer := s.opts.clock.NewTimer(delay, "chatworker",
"watch-publish-retry")
 		select {
 		case <-timer.C:
 		case <-ctx.Done():
@@ -553,10 +581,10 @@ func (s *taskStarter) publishWatchWithRetry(
 			return errors.Join(errTaskExpectedExit, xerrors.Errorf("watch publish retry context done: %w", ctx.Err()))
 		}
 		timer.Stop()
-		if delay < s.opts.TaskRetryMaxBackoff {
+		if delay < s.opts.taskRetryMaxBackoff {
 			delay *= 2
-			if delay > s.opts.TaskRetryMaxBackoff {
-				delay = s.opts.TaskRetryMaxBackoff
+			if delay > s.opts.taskRetryMaxBackoff {
+				delay = s.opts.taskRetryMaxBackoff
 			}
 		}
 	}
diff --git a/coderd/x/chatd/tasks_test.go b/coderd/x/chatd/tasks_test.go
index 8fcc588c35a6f..325d17ed3bbcd 100644
--- a/coderd/x/chatd/tasks_test.go
+++ b/coderd/x/chatd/tasks_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/sqlc-dev/pqtype"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/mock/gomock"
 	"golang.org/x/xerrors"
 
 	"cdr.dev/slog/v3"
@@ -230,6 +231,37 @@ func sinkFieldValue(t *testing.T, fields slog.Map, name string) string {
 	return ""
 }
 
+func TestTaskStarterSideEffectsMock(t *testing.T) {
+	t.Parallel()
+
+	ctx := testutil.Context(t, testutil.WaitShort)
+	ctrl := gomock.NewController(t)
+	sideEffects := NewMockTaskSideEffects(ctrl)
+	chatID := uuid.New()
+	runnerID := uuid.New()
+	state := runnerStateUpdate{
+		ChatID: chatID,
+		SnapshotVersion: 7,
+		Status: database.ChatStatusRunning,
+	}
+	cleanup := runnerKey{ChatID: chatID, RunnerID: runnerID}
+	outcome := interruptionOutcome{
+		Chat: database.Chat{ID: chatID, Status: database.ChatStatusWaiting},
+		Kind: runnerActionKindFinishInterruption,
+		WatchEventKind: codersdk.ChatWatchEventKindStatusChange,
+	}
+
+	gomock.InOrder(
+		sideEffects.EXPECT().RouteStateHint(gomock.Any(), state),
+		sideEffects.EXPECT().RequestCleanup(gomock.Any(), cleanup),
+		sideEffects.EXPECT().AfterInterruptionOutcome(gomock.Any(), outcome).Return(nil),
+	)
+
+	sideEffects.RouteStateHint(ctx, state)
+	sideEffects.RequestCleanup(ctx, cleanup)
+	require.NoError(t,
sideEffects.AfterInterruptionOutcome(ctx, outcome))
+}
+
 func TestInterruptTask_FinishInterruptionOnly(t *testing.T) {
 	t.Parallel()
 
@@ -240,7 +272,7 @@ func TestInterruptTask_FinishInterruptionOnly(t *testing.T) {
 	acquired := f.acquireChat(t, chat.ID, workerID, runnerID)
 	recorder := newTaskSideEffectRecorder()
 	starter := newTestTaskStarter(t, f, recorder)
-	buffer := starter.opts.MessagePartBuffer
+	buffer := starter.messagePartBuffer
 	key := messagepartbuffer.Key{
 		ChatID: chat.ID,
 		HistoryVersion: acquired.HistoryVersion,
@@ -354,7 +386,7 @@ func TestInterruptTask_BufferedPartsBecomePartialMessages(t *testing.T) {
 	acquired := f.acquireChat(t, chat.ID, workerID, runnerID)
 	recorder := newTaskSideEffectRecorder()
 	starter := newTestTaskStarter(t, f, recorder)
-	buffer := starter.opts.MessagePartBuffer
+	buffer := starter.messagePartBuffer
 	key := messagepartbuffer.Key{ChatID: chat.ID, HistoryVersion: acquired.HistoryVersion, GenerationAttempt: acquired.GenerationAttempt}
 	require.NoError(t, buffer.CreateEpisode(key))
 	callID := "call_" + uuid.NewString()
@@ -1173,19 +1205,19 @@ func newTaskSideEffectRecorder() *taskSideEffectRecorder {
 	return &taskSideEffectRecorder{}
 }
 
-func (r *taskSideEffectRecorder) routeStateHint(_ context.Context, state runnerStateUpdate) {
+func (r *taskSideEffectRecorder) RouteStateHint(_ context.Context, state runnerStateUpdate) {
 	r.mu.Lock()
 	r.hints = append(r.hints, state)
 	r.mu.Unlock()
 }
 
-func (r *taskSideEffectRecorder) requestCleanup(_ context.Context, key runnerKey) {
+func (r *taskSideEffectRecorder) RequestCleanup(_ context.Context, key runnerKey) {
 	r.mu.Lock()
 	r.cleanups = append(r.cleanups, key)
 	r.mu.Unlock()
 }
 
-func (r *taskSideEffectRecorder) afterInterruptionOutcome(_ context.Context, outcome interruptionOutcome) error {
+func (r *taskSideEffectRecorder) AfterInterruptionOutcome(_ context.Context, outcome interruptionOutcome) error {
 	r.mu.Lock()
 	r.interrupts = append(r.interrupts, outcome)
 	r.mu.Unlock()
@@ -1246,16 +1278,19 @@ func
newTestTaskStarter(t *testing.T, f *taskTestFixture, recorder *taskSideEffe
 	t.Helper()
 	buffer := messagepartbuffer.New(messagepartbuffer.Options{})
 	t.Cleanup(buffer.Close)
-	starter, err := newTaskStarter(nil, chatWorkerOptions{
-		Store: f.db,
-		Pubsub: f.pubsub,
-		Logger: slog.Make(),
-		Clock: quartz.NewReal(),
-		MessagePartBuffer: buffer,
-		TaskRetryInitialBackoff: time.Millisecond,
-		TaskRetryMaxBackoff: time.Millisecond,
-	}, recorder.routeStateHint, recorder.requestCleanup)
+	starter, err := newTaskStarter(
+		nil,
+		f.db,
+		f.pubsub,
+		buffer,
+		recorder,
+		taskStarterOptions{
+			logger: slog.Make(),
+			clock: quartz.NewReal(),
+			taskRetryInitialBackoff: time.Millisecond,
+			taskRetryMaxBackoff: time.Millisecond,
+		},
+	)
 	require.NoError(t, err)
-	starter.afterInterruptionOutcome = recorder.afterInterruptionOutcome
 	return starter
 }
diff --git a/coderd/x/chatd/worker.go b/coderd/x/chatd/worker.go
index 7b3e8d5666fc1..20433e5b0cda6 100644
--- a/coderd/x/chatd/worker.go
+++ b/coderd/x/chatd/worker.go
@@ -56,7 +56,14 @@ func (w *chatWorker) Start(ctx context.Context) error {
 	workerCtx, cancel := context.WithCancel(ctx)
 	manager := newRunnerManager(workerCtx, w.server, w.opts)
 	if manager.opts.TaskStarter == nil {
-		starter, err := newTaskStarter(manager.server, manager.opts, manager.RouteStateHint, manager.requestCleanup)
+		starter, err := newTaskStarter(
+			manager.server,
+			manager.opts.Store,
+			manager.opts.Pubsub,
+			manager.opts.MessagePartBuffer,
+			runnerManagerTaskSideEffects{manager: manager, server: manager.server},
+			manager.opts.taskStarterOptions(),
+		)
 		if err != nil {
 			cancel()
 			return err
diff --git a/deferred-review-threads-pr26109.md b/deferred-review-threads-pr26109.md
index 02063544bfbee..749ec54369466 100644
--- a/deferred-review-threads-pr26109.md
+++ b/deferred-review-threads-pr26109.md
@@ -1,5 +1,7 @@
 # Deferred Review Threads from PR #26109
+
+
 
 Source: https://github.com/coder/coder/pull/26109
 
 This file lists every review comment thread that
hugodutka marked as deferred. It serves as a template for a plan to address these issues. Each entry describes what the thread was about, with a link to the original discussion.
@@ -143,10 +145,10 @@ This file lists every review comment thread that hugodutka marked as deferred. I
 - [ ] 29. runner_manager.go: skip logging context canceled errors, line 458 (r3387355724)
 - [ ] 30. runner_manager.go: document stateCh buffering semantics (r3387788957)
 - [ ] 31. testhooks.go: accept context.Context instead of hard-coded timeout (r3382365135)
-- [ ] 32. tasks.go: extract side-effecting deps to interface (r3382554277)
-- [ ] 33. tasks_test.go: use interface and gomock for taskStarter spy (r3382564035)
-- [ ] 34. tasks.go: required options as newTaskStarter args (r3387867697)
-- [ ] 35. tasks.go: enforce invariant in state machine, not each Update (r3387919402)
+- [x] 32. tasks.go: extract side-effecting deps to interface (r3382554277)
+- [x] 33. tasks_test.go: use interface and gomock for taskStarter spy (r3382564035)
+- [x] 34. tasks.go: required options as newTaskStarter args (r3387867697)
+- [x] 35. tasks.go: enforce invariant in state machine, not each Update (r3387919402)
 - [ ] 36. worker.go: rename ctx to parentCtx (r3387954876)
 - [ ] 37. worker.go: replace magic number with documented const (r3387973304)
 - [ ] 38. quickgen.go: consider separate timeout bound (r3387365392)