diff --git a/coderd/x/chatd/auto_archive_internal_test.go b/coderd/x/chatd/auto_archive_internal_test.go index 8c2e68b924400..951ba66099ca8 100644 --- a/coderd/x/chatd/auto_archive_internal_test.go +++ b/coderd/x/chatd/auto_archive_internal_test.go @@ -293,9 +293,7 @@ func (f *workerTestFixture) newArchiveWorkerWithOptions(t *testing.T, opts chatW if opts.NotificationsEnqueuer == nil { opts.NotificationsEnqueuer = notificationstest.NewFakeEnqueuer() } - worker, err := newChatWorker(nil, opts) - require.NoError(t, err) - return worker + return newChatWorker(nil, opts.WorkerID, opts.Store, opts.Pubsub, opts.MessagePartBuffer, opts) } func mockAuditorPtr(auditor *audit.MockAuditor) *atomic.Pointer[audit.Auditor] { diff --git a/coderd/x/chatd/chatd.go b/coderd/x/chatd/chatd.go index 0e18f600b60cf..18405c36e9a28 100644 --- a/coderd/x/chatd/chatd.go +++ b/coderd/x/chatd/chatd.go @@ -2275,8 +2275,7 @@ type manualTitleGenerationError struct { // read; the title_change pubsub event it publishes remains the source of // truth for clients. type generatedChatTitle struct { - mu sync.RWMutex - title string + title atomic.Value // string } func (t *generatedChatTitle) Store(title string) { @@ -2284,9 +2283,7 @@ func (t *generatedChatTitle) Store(title string) { return } - t.mu.Lock() - t.title = title - t.mu.Unlock() + t.title.Store(title) } func (t *generatedChatTitle) Load() (string, bool) { @@ -2294,12 +2291,11 @@ func (t *generatedChatTitle) Load() (string, bool) { return "", false } - t.mu.RLock() - defer t.mu.RUnlock() - if t.title == "" { + title, ok := t.title.Load().(string) + if !ok || title == "" { return "", false } - return t.title, true + return title, true } func (e *manualTitleGenerationError) Error() string { @@ -3421,20 +3417,9 @@ func New(ps pubsub.Pubsub, cfg Config) *Server { p.metrics = chatloop.NopMetrics() } p.messagePartBuffer = messagepartbuffer.New(messagepartbuffer.Options{Clock: clk}) - localStreamPartsDialer := NewLocalStreamPartsDialer(LocalStreamPartsDialerConfig{ - Buffer: p.messagePartBuffer, - Logger: cfg.Logger, - }) - p.streamPartsDialer = streamPartsDialerForServer(workerID, localStreamPartsDialer, cfg.StreamPartsDialer) - p.streamSyncPoller = newStreamSyncPoller(ctx, cfg.Database, clk, cfg.Logger.Named("chatstream")) - p.streamSyncPoller.Start() - chatWorker, err := newChatWorker(p, chatWorkerOptions{ - WorkerID: workerID, - Store: cfg.Database, - Pubsub: ps, + workerOpts := chatWorkerOptions{ Logger: cfg.Logger.Named("chatworker"), Clock: clk, - MessagePartBuffer: p.messagePartBuffer, AcquisitionInterval: pendingChatAcquireInterval, AcquisitionBatchSize: maxChatsPerAcquire, HeartbeatInterval: chatHeartbeatInterval, @@ -3442,11 +3427,22 @@ func New(ps pubsub.Pubsub, cfg Config) *Server { NotificationsEnqueuer: notificationsEnqueuer, Auditor: cfg.Auditor, AutoArchiveRecords: chatAutoArchiveRecords, - }) - if err != nil { - panic("chatd: create chat worker: " + err.Error()) } - p.chatWorker = chatWorker + localStreamPartsDialer := NewLocalStreamPartsDialer(LocalStreamPartsDialerConfig{ + Buffer: p.messagePartBuffer, + Logger: cfg.Logger, + }) + p.streamPartsDialer = streamPartsDialerForServer(workerID, localStreamPartsDialer, cfg.StreamPartsDialer) + p.streamSyncPoller = newStreamSyncPoller(ctx, cfg.Database, clk, cfg.Logger.Named("chatstream")) + p.streamSyncPoller.Start() + p.chatWorker = newChatWorker( + p, + workerID, + cfg.Database, + ps, + p.messagePartBuffer, + workerOpts, + ) //nolint:gocritic // The chat processor uses a scoped chatd context. ctx = dbauthz.AsChatd(ctx) @@ -4134,7 +4130,7 @@ func (p *Server) loadPersonalSkillBody( return parsed, nil } -func (p *Server) appendRootChatTools( +func (p *Server) appendRootChatToolsWithoutWorkspaceContextPersistence( ctx context.Context, tools []fantasy.AgentTool, opts rootChatToolsOptions, @@ -4145,14 +4141,11 @@ func (p *Server) appendRootChatTools( // build logs before the tool completes. p.publishChatPubsubEvent(updatedChat, codersdk.ChatWatchEventKindStatusChange, nil) - // Note: we intentionally do not insert AGENTS.md / workspace - // context here. Local tool callbacks must not mutate chat - // history while a local-tool generation task is in flight, - // because that advances history_version before the tool - // result is committed and exits the local-tool commit as - // stale. Workspace context is persisted by the - // persist_workspace_context generation action in a later - // pass. + // Do not persist workspace context from this callback. Local + // tool callbacks run while a generation task is fenced by + // history_version; mutating chat history here would make that + // commit stale. The generation state machine runs + // persist_workspace_context after this tool result commits. // Prime the workspace MCP tools cache while the create_workspace // or start_workspace tool is still running. The AgentID guard diff --git a/coderd/x/chatd/generation_preparer.go b/coderd/x/chatd/generation_preparer.go index a0aec403ba279..d18cf06d455bf 100644 --- a/coderd/x/chatd/generation_preparer.go +++ b/coderd/x/chatd/generation_preparer.go @@ -352,7 +352,7 @@ func (server *Server) prepareGeneration( tools = append(tools, chattool.NewAskUserQuestionTool()) } if isRootChat { - tools = server.appendRootChatTools(ctx, tools, rootChatToolsOptions{ + tools = server.appendRootChatToolsWithoutWorkspaceContextPersistence(ctx, tools, rootChatToolsOptions{ chat: chat, modelConfigID: modelConfig.ID, workspaceCtx: &workspaceCtx, diff --git a/coderd/x/chatd/helpers_test.go b/coderd/x/chatd/helpers_test.go index 352392f26c48c..84f7d5a5464e5 100644 --- a/coderd/x/chatd/helpers_test.go +++ b/coderd/x/chatd/helpers_test.go @@ -269,10 +269,31 @@ func testOptions(t *testing.T, f *workerTestFixture, starter chatWorkerTaskStart } } -func startWorker(t *testing.T, opts chatWorkerOptions) *chatWorker { +func testWorkerDeps(f *workerTestFixture, opts chatWorkerOptions) chatWorkerDependencies { + workerID := opts.WorkerID + if workerID == uuid.Nil { + workerID = uuid.New() + } + store := opts.Store + if store == nil { + store = f.db + } + pubsub := opts.Pubsub + if pubsub == nil { + pubsub = f.pubsub + } + return chatWorkerDependencies{ + WorkerID: workerID, + Store: store, + Pubsub: pubsub, + MessagePartBuffer: opts.MessagePartBuffer, + } +} + +func startWorker(t *testing.T, f *workerTestFixture, opts chatWorkerOptions) *chatWorker { t.Helper() - worker, err := newChatWorker(nil, opts) - require.NoError(t, err) + deps := testWorkerDeps(f, opts) + worker := newChatWorker(nil, deps.WorkerID, deps.Store, deps.Pubsub, deps.MessagePartBuffer, opts) require.NoError(t, worker.Start(context.Background())) t.Cleanup(func() { require.NoError(t, worker.Close()) }) return worker diff --git a/coderd/x/chatd/options.go b/coderd/x/chatd/options.go index ff3dbdd3d9a30..3e3e8929eb310 100644 --- a/coderd/x/chatd/options.go +++ b/coderd/x/chatd/options.go @@ -8,7 +8,6 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/xerrors" "cdr.dev/slog/v3" "github.com/coder/coder/v2/coderd/audit" @@ -94,19 +93,18 @@ type chatWorkerOptions struct { TaskRetryMaxBackoff time.Duration } -func (o chatWorkerOptions) withDefaults() (chatWorkerOptions, error) { - if o.Store == nil { - return chatWorkerOptions{}, xerrors.New("chatworker: store is required") - } - if o.Pubsub == nil { - return chatWorkerOptions{}, xerrors.New("chatworker: pubsub is required") - } - if o.TaskStarter == nil && o.MessagePartBuffer == nil { - return chatWorkerOptions{}, xerrors.New("chatworker: task starter or message part buffer is required") - } - if o.WorkerID == uuid.Nil { - return chatWorkerOptions{}, xerrors.New("chatworker: worker ID is required") - } +type chatWorkerDependencies struct { + WorkerID uuid.UUID + Store database.Store + Pubsub chatWorkerPubsub + MessagePartBuffer *messagepartbuffer.Buffer +} + +func (o chatWorkerOptions) withDefaults(deps chatWorkerDependencies) chatWorkerOptions { + o.WorkerID = deps.WorkerID + o.Store = deps.Store + o.Pubsub = deps.Pubsub + o.MessagePartBuffer = deps.MessagePartBuffer if o.Clock == nil { o.Clock = quartz.NewReal() } @@ -155,5 +153,5 @@ func (o chatWorkerOptions) withDefaults() (chatWorkerOptions, error) { if o.TaskRetryMaxBackoff < o.TaskRetryInitialBackoff { o.TaskRetryMaxBackoff = o.TaskRetryInitialBackoff } - return o, nil + return o } diff --git a/coderd/x/chatd/runner_test.go b/coderd/x/chatd/runner_test.go index ef95069518632..9a2b829c0c953 100644 --- a/coderd/x/chatd/runner_test.go +++ b/coderd/x/chatd/runner_test.go @@ -18,7 +18,7 @@ func TestRunner_IgnoresDuplicateStateNotifications(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) starter.waitCall(t, taskKindGeneration, chat.ID) latest, err := f.db.GetChatByID(testutil.Context(t, testutil.WaitShort), chat.ID) require.NoError(t, err) @@ -33,7 +33,7 @@ func TestRunner_CancelsActiveTaskWhenHistoryChanges(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) updated := commitAssistantStep(t, f, chat.ID, "first step") @@ -49,7 +49,7 @@ func TestRunner_CancelsActiveTaskWhenStatusChanges(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) updated := interruptChat(t, f, chat.ID) @@ -64,7 +64,7 @@ func TestRunner_CleansUpOnOwnershipTakeover(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) acquireChat(t, f, chat.ID, uuid.New(), uuid.New()) @@ -78,7 +78,7 @@ func TestRunner_SerializesReplacementTasksForSameHistoryAndStatus(t *testing.T) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(true) defer starter.releaseAll() - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) forceExecutionStateAndPublish(t, f, chat.ID, database.ChatStatusInterrupting, false) @@ -97,7 +97,7 @@ func TestRunner_AllowsReplacementForDifferentHistoryOrStatus(t *testing.T) { chat := f.createRunningChat(t) starter := newBlockingTaskStarter(true) defer starter.releaseAll() - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) updated := commitAssistantStep(t, f, chat.ID, "different history") @@ -117,7 +117,7 @@ func TestRunner_TaskTimeoutRetries(t *testing.T) { opts.Clock = clock opts.TaskRetryInitialBackoff = time.Minute opts.TaskRetryMaxBackoff = time.Minute - startWorker(t, opts) + startWorker(t, f, opts) timeoutTrap.MustWait(testutil.Context(t, testutil.WaitLong)).MustRelease(testutil.Context(t, testutil.WaitLong)) timeoutTrap.Close() @@ -143,7 +143,7 @@ func TestWorker_RoutesDatabaseSyncStateToActiveRunner(t *testing.T) { opts := testOptions(t, f, starter) opts.Clock = clock opts.RunnerSyncInterval = time.Minute - startWorker(t, opts) + startWorker(t, f, opts) first := starter.waitCall(t, taskKindGeneration, chat.ID) forceExecutionState(t, f, chat.ID, database.ChatStatusInterrupting, false) @@ -157,7 +157,7 @@ func TestWorker_CleanupStopsRoutingAndCancelsTasks(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) first := starter.waitCall(t, taskKindGeneration, chat.ID) latest := acquireChat(t, f, chat.ID, uuid.New(), uuid.New()) diff --git a/coderd/x/chatd/tasks_test.go b/coderd/x/chatd/tasks_test.go index 8fcc588c35a6f..96d66fb2859f5 100644 --- a/coderd/x/chatd/tasks_test.go +++ b/coderd/x/chatd/tasks_test.go @@ -1124,25 +1124,27 @@ func startRealTaskWorker(t *testing.T, f *taskTestFixture) *chatWorker { t.Helper() buffer := messagepartbuffer.New(messagepartbuffer.Options{}) t.Cleanup(buffer.Close) - worker, err := newChatWorker(nil, chatWorkerOptions{ - WorkerID: uuid.New(), - Store: f.db, - Pubsub: f.pubsub, - Logger: slog.Make(), - MessagePartBuffer: buffer, - AcquisitionInterval: time.Hour, - AcquisitionBatchSize: 10, - RunnerSyncInterval: time.Hour, - HeartbeatInterval: time.Hour, - HeartbeatCleanupInterval: time.Hour, - HeartbeatStaleSeconds: 30, - StateChannelSize: 16, - RunnerManagerChannelSize: 16, - AcquisitionWakeChannelSize: 1, - TaskRetryInitialBackoff: time.Millisecond, - TaskRetryMaxBackoff: time.Millisecond, - }) - require.NoError(t, err) + worker := newChatWorker( + nil, + uuid.New(), + f.db, + f.pubsub, + buffer, + chatWorkerOptions{ + Logger: slog.Make(), + AcquisitionInterval: time.Hour, + AcquisitionBatchSize: 10, + RunnerSyncInterval: time.Hour, + HeartbeatInterval: time.Hour, + HeartbeatCleanupInterval: time.Hour, + HeartbeatStaleSeconds: 30, + StateChannelSize: 16, + RunnerManagerChannelSize: 16, + AcquisitionWakeChannelSize: 1, + TaskRetryInitialBackoff: time.Millisecond, + TaskRetryMaxBackoff: time.Millisecond, + }, + ) require.NoError(t, worker.Start(context.Background())) t.Cleanup(func() { require.NoError(t, worker.Close()) }) return worker diff --git a/coderd/x/chatd/worker.go b/coderd/x/chatd/worker.go index 7b3e8d5666fc1..7c7c75e029c57 100644 --- a/coderd/x/chatd/worker.go +++ b/coderd/x/chatd/worker.go @@ -13,6 +13,7 @@ import ( "github.com/coder/coder/v2/coderd/database" coderdpubsub "github.com/coder/coder/v2/coderd/pubsub" "github.com/coder/coder/v2/coderd/x/chatd/chatstate" + "github.com/coder/coder/v2/coderd/x/chatd/messagepartbuffer" ) // chatWorker owns chat acquisition and runner lifecycle for one process. @@ -32,12 +33,23 @@ type chatWorker struct { // newChatWorker constructs a chat worker. The worker is idle until Start is // called. -func newChatWorker(server *Server, opts chatWorkerOptions) (*chatWorker, error) { - withDefaults, err := opts.withDefaults() - if err != nil { - return nil, err +func newChatWorker( + server *Server, + workerID uuid.UUID, + store database.Store, + pubsub chatWorkerPubsub, + messagePartBuffer *messagepartbuffer.Buffer, + opts chatWorkerOptions, +) *chatWorker { + return &chatWorker{ + server: server, + opts: opts.withDefaults(chatWorkerDependencies{ + WorkerID: workerID, + Store: store, + Pubsub: pubsub, + MessagePartBuffer: messagePartBuffer, + }), } - return &chatWorker{server: server, opts: withDefaults}, nil } // chatWorkerID returns this worker's configured worker ID. diff --git a/coderd/x/chatd/worker_internal_test.go b/coderd/x/chatd/worker_internal_test.go index f01bb0d69cd71..4d4c646f48987 100644 --- a/coderd/x/chatd/worker_internal_test.go +++ b/coderd/x/chatd/worker_internal_test.go @@ -15,30 +15,13 @@ import ( "github.com/coder/quartz" ) -func TestWorker_NewRequiresTaskStarterOrMessagePartBuffer(t *testing.T) { - t.Parallel() - f := newWorkerTestFixture(t) - _, err := newChatWorker(nil, chatWorkerOptions{WorkerID: uuid.New(), Store: f.db, Pubsub: f.pubsub}) - require.ErrorContains(t, err, "task starter or message part buffer is required") -} - -func TestWorker_NewRequiresWorkerID(t *testing.T) { - t.Parallel() - f := newWorkerTestFixture(t) - opts := testOptions(t, f, newRecordingTaskStarter()) - opts.WorkerID = uuid.Nil - _, err := newChatWorker(nil, opts) - require.ErrorContains(t, err, "worker ID is required") -} - func TestWorker_UsesConfiguredWorkerID(t *testing.T) { t.Parallel() f := newWorkerTestFixture(t) starter := newRecordingTaskStarter() opts := testOptions(t, f, starter) workerID := opts.WorkerID - worker, err := newChatWorker(nil, opts) - require.NoError(t, err) + worker := newChatWorker(nil, workerID, opts.Store, opts.Pubsub, opts.MessagePartBuffer, opts) require.Equal(t, workerID, worker.chatWorkerID()) require.NoError(t, worker.Start(context.Background())) require.Equal(t, workerID, worker.chatWorkerID()) @@ -50,7 +33,7 @@ func TestWorker_AcquiresRunnableChatFromOwnershipHint(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newRecordingTaskStarter() - worker := startWorker(t, testOptions(t, f, starter)) + worker := startWorker(t, f, testOptions(t, f, starter)) call := starter.waitCall(t, taskKindGeneration, chat.ID) require.Equal(t, worker.chatWorkerID(), call.input.WorkerID) @@ -73,7 +56,7 @@ func TestWorker_AcquiresRequiresActionChatFromOwnershipHint(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRequiresActionChat(t) starter := newRecordingTaskStarter() - startWorker(t, testOptions(t, f, starter)) + startWorker(t, f, testOptions(t, f, starter)) call := starter.waitCall(t, taskKindRequiresActionTimeout, chat.ID) require.Equal(t, database.ChatStatusRequiresAction, call.input.Status) @@ -88,7 +71,7 @@ func TestWorker_SkipsFreshlyOwnedChat(t *testing.T) { otherRunner := uuid.New() acquireChat(t, f, chat.ID, otherWorker, otherRunner) starter := newRecordingTaskStarter() - worker := startWorker(t, testOptions(t, f, starter)) + worker := startWorker(t, f, testOptions(t, f, starter)) worker.Wake() starter.assertNoCall(t) @@ -107,7 +90,7 @@ func TestWorker_ReacquiresStaleOwnedChat(t *testing.T) { acquireChat(t, f, chat.ID, deadWorker, deadRunner) makeHeartbeatStale(t, f, chat.ID, deadRunner) starter := newBlockingTaskStarter(false) - worker := startWorker(t, testOptions(t, f, starter)) + worker := startWorker(t, f, testOptions(t, f, starter)) call := starter.waitCall(t, taskKindGeneration, chat.ID) require.Equal(t, worker.chatWorkerID(), call.input.WorkerID) @@ -133,8 +116,8 @@ func TestWorker_TwoWorkersRaceSingleOwner(t *testing.T) { chat := f.createRunningChat(t) firstStarter := newRecordingTaskStarter() secondStarter := newRecordingTaskStarter() - first := startWorker(t, testOptions(t, f, firstStarter)) - second := startWorker(t, testOptions(t, f, secondStarter)) + first := startWorker(t, f, testOptions(t, f, firstStarter)) + second := startWorker(t, f, testOptions(t, f, secondStarter)) call := waitAnyTaskCall(t, firstStarter, secondStarter, taskKindGeneration, chat.ID) require.Contains(t, []uuid.UUID{first.chatWorkerID(), second.chatWorkerID()}, call.input.WorkerID) @@ -158,7 +141,7 @@ func TestWorker_DrainsMultipleRunnableChatsOnWake(t *testing.T) { starter := newRecordingTaskStarter() opts := testOptions(t, f, starter) opts.AcquisitionBatchSize = 1 - startWorker(t, opts) + startWorker(t, f, opts) want := map[uuid.UUID]bool{first.ID: true, second.ID: true, third.ID: true} for range 3 { @@ -178,7 +161,7 @@ func TestWorker_DoesNotAcquireIdleOrArchivedChats(t *testing.T) { archived := f.createRunningChat(t) forceExecutionStateAndPublish(t, f, archived.ID, database.ChatStatusRunning, true) starter := newRecordingTaskStarter() - worker := startWorker(t, testOptions(t, f, starter)) + worker := startWorker(t, f, testOptions(t, f, starter)) worker.Wake() starter.assertNoCall(t) @@ -195,7 +178,7 @@ func TestWorker_HeartbeatLoopRefreshesActiveRunnerHeartbeat(t *testing.T) { opts := testOptions(t, f, starter) opts.Clock = clock opts.HeartbeatInterval = time.Minute - startWorker(t, opts) + startWorker(t, f, opts) heartbeatTrap.MustWait(testutil.Context(t, testutil.WaitLong)).MustRelease(testutil.Context(t, testutil.WaitLong)) call := starter.waitCall(t, taskKindGeneration, chat.ID) oldHeartbeat := makeHeartbeatStale(t, f, chat.ID, call.input.RunnerID) @@ -228,7 +211,7 @@ func TestWorker_HeartbeatCleanupDeletesStaleRows(t *testing.T) { opts := testOptions(t, f, starter) opts.Clock = clock opts.HeartbeatCleanupInterval = time.Minute - startWorker(t, opts) + startWorker(t, f, opts) cleanupTrap.MustWait(testutil.Context(t, testutil.WaitLong)).MustRelease(testutil.Context(t, testutil.WaitLong)) clock.Advance(time.Minute).MustWait(testutil.Context(t, testutil.WaitLong)) @@ -250,7 +233,7 @@ func TestWorker_CloseDeletesOwnedHeartbeatsAndPublishesOwnershipHints(t *testing pubsub := newRecordingPubsub(f.pubsub) opts := testOptions(t, f, starter) opts.Pubsub = pubsub - worker := startWorker(t, opts) + worker := startWorker(t, f, opts) callsByChat := make(map[uuid.UUID]taskCall) for range 2 { call := starter.waitCall(t, taskKindGeneration, uuid.Nil) @@ -283,7 +266,7 @@ func TestWorker_CloseIsIdempotentAndDoesNotBlock(t *testing.T) { f := newWorkerTestFixture(t) chat := f.createRunningChat(t) starter := newBlockingTaskStarter(false) - worker := startWorker(t, testOptions(t, f, starter)) + worker := startWorker(t, f, testOptions(t, f, starter)) call := starter.waitCall(t, taskKindGeneration, chat.ID) closed := make(chan error, 1)