Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions coderd/x/chatd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog/v3"
"github.com/coder/coder/v2/coderd/database"
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
)
Expand Down Expand Up @@ -47,6 +49,8 @@ type runner struct {
rec *runnerRecord
opts chatWorkerOptions

runStarted atomic.Bool

lastSnapshotVersion int64
hasAcceptedState bool
latestState runnerStateUpdate
Expand All @@ -73,6 +77,9 @@ func newRunner(ctx context.Context, mgr *runnerManager, rec *runnerRecord, opts
}

func (r *runner) run() {
if !r.beginRun() {
return
}
if !r.bootstrap() {
return
}
Expand All @@ -89,6 +96,18 @@ func (r *runner) run() {
}
}

// beginRun flips the runStarted flag and reports whether this call was the
// one that flipped it. Any later call is treated as a programming error: it
// is logged at error level together with the runner's chat and runner IDs,
// a cleanup is requested from the manager for this runner's key, and false
// is returned so the caller does not proceed.
func (r *runner) beginRun() bool {
	alreadyStarted := !r.runStarted.CompareAndSwap(false, true)
	if alreadyStarted {
		key := r.rec.key
		r.opts.Logger.Error(r.ctx, "chatworker runner run called more than once",
			slog.F("chat_id", key.ChatID),
			slog.F("runner_id", key.RunnerID),
		)
		r.mgr.requestCleanup(r.ctx, key)
		return false
	}
	return true
}

func (r *runner) bootstrap() bool {
// Pubsub can deliver chat:update messages that were already queued by
// Postgres before this runner subscribed. Hold those hints until the
Expand Down
134 changes: 134 additions & 0 deletions coderd/x/chatd/runner_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package chatd

import (
"context"
"errors"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/testutil"
)

// TestRunner_BeginRunTreatsMultipleCallsAsError checks that the first
// beginRun call succeeds, and that a second call fails, enqueues a cleanup
// request for the runner's key, and logs exactly one matching entry.
func TestRunner_BeginRunTreatsMultipleCallsAsError(t *testing.T) {
	t.Parallel()

	const duplicateRunMsg = "chatworker runner run called more than once"

	logSink := testutil.NewFakeSink(t)
	mgrCtx, stop := context.WithCancel(context.Background())
	defer stop()

	// A buffer of one lets the cleanup request land without a reader.
	manager := &runnerManager{
		ctx:          mgrCtx,
		cleanupReqCh: make(chan runnerKey, 1),
	}
	wantKey := runnerKey{ChatID: uuid.New(), RunnerID: uuid.New()}
	rn := &runner{
		ctx:  context.Background(),
		mgr:  manager,
		rec:  &runnerRecord{key: wantKey},
		opts: chatWorkerOptions{Logger: logSink.Logger()},
	}

	firstCall := rn.beginRun()
	require.True(t, firstCall)
	secondCall := rn.beginRun()
	require.False(t, secondCall)

	select {
	case <-time.After(testutil.WaitLong):
		t.Fatal("timed out waiting for cleanup request")
	case gotKey := <-manager.cleanupReqCh:
		require.Equal(t, wantKey, gotKey)
	}

	logged := entriesWithMessage(logSink, duplicateRunMsg)
	require.Len(t, logged, 1)
}

// TestRunnerManager_RouteStateHintPreservesBufferedHintsWhenFull fills a
// runner's single-slot state channel, routes a newer hint, and verifies that
// the already-buffered hint is still the only one queued afterwards.
func TestRunnerManager_RouteStateHintPreservesBufferedHintsWhenFull(t *testing.T) {
	t.Parallel()

	mgrCtx, stop := context.WithCancel(context.Background())
	defer stop()

	chatID, runnerID := uuid.New(), uuid.New()
	first := runnerStateUpdate{
		ChatID:          chatID,
		SnapshotVersion: 1,
		HistoryVersion:  1,
		Status:          database.ChatStatusRunning,
	}
	latest := runnerStateUpdate{
		ChatID:          chatID,
		SnapshotVersion: 2,
		HistoryVersion:  2,
		Status:          database.ChatStatusRunning,
	}

	// Pre-fill the one-slot channel so the routed hint finds it full.
	stateCh := make(chan runnerStateUpdate, 1)
	stateCh <- first
	rec := &runnerRecord{done: make(chan struct{}), stateCh: stateCh}
	mgr := &runnerManager{
		ctx: mgrCtx,
		runnersByChat: map[uuid.UUID]map[uuid.UUID]*runnerRecord{
			chatID: {runnerID: rec},
		},
	}

	mgr.RouteStateHint(context.Background(), latest)

	// poll performs a non-blocking receive from the runner's state channel.
	poll := func() (runnerStateUpdate, bool) {
		select {
		case hint := <-stateCh:
			return hint, true
		default:
			return runnerStateUpdate{}, false
		}
	}

	queued, ok := poll()
	if !ok {
		t.Fatal("expected buffered state hint to remain queued")
	}
	require.Equal(t, first, queued)

	if extra, ok := poll(); ok {
		t.Fatalf("unexpected state hint queued after full channel: %#v", extra)
	}
}

// TestRunnerManager_RegisterCleanupWaiterDoesNotLockManagerOnShutdown holds
// the manager mutex for the whole test, cancels the manager context, and then
// checks that a registered cleanup waiter still finishes so that mgr.wait()
// returns within the timeout.
func TestRunnerManager_RegisterCleanupWaiterDoesNotLockManagerOnShutdown(t *testing.T) {
	t.Parallel()

	mgrCtx, shutdown := context.WithCancel(context.Background())
	mgr := &runnerManager{
		ctx:           mgrCtx,
		cleanupDoneCh: make(chan runnerKey, 1),
	}
	runnerDone := make(chan struct{})
	record := &runnerRecord{done: runnerDone}
	key := runnerKey{ChatID: uuid.New(), RunnerID: uuid.New()}

	// Keep the lock held until the test returns: the waiter must be able to
	// exit without ever acquiring it.
	mgr.mu.Lock()
	defer mgr.mu.Unlock()

	shutdown()
	mgr.registerCleanupWaiter(key, record)
	close(runnerDone)

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		mgr.wait()
	}()

	select {
	case <-time.After(testutil.WaitLong):
		t.Fatal("cleanup waiter blocked on manager lock during shutdown")
	case <-finished:
	}
}

// TestRunnerManager_ShouldLogLoopError enumerates the combinations of context
// state and error value that shouldLogRunnerManagerLoopError must suppress or
// report.
func TestRunnerManager_ShouldLogLoopError(t *testing.T) {
	t.Parallel()

	boom := xerrors.New("boom")
	canceledCtx, cancel := context.WithCancel(context.Background())
	cancel()

	cases := []struct {
		name        string
		ctxCanceled bool
		err         error
		want        bool
	}{
		{name: "nil error", err: nil, want: false},
		{name: "canceled context", ctxCanceled: true, err: boom, want: false},
		{name: "context.Canceled error", err: context.Canceled, want: false},
		{name: "wrapped context.Canceled", err: xerrors.Errorf("wrapped: %w", context.Canceled), want: false},
		{name: "plain error", err: boom, want: true},
		{name: "joined with deadline exceeded", err: errors.Join(boom, context.DeadlineExceeded), want: true},
	}

	for _, tc := range cases {
		ctx := context.Background()
		if tc.ctxCanceled {
			ctx = canceledCtx
		}
		require.Equal(t, tc.want, shouldLogRunnerManagerLoopError(ctx, tc.err), tc.name)
	}
}
37 changes: 25 additions & 12 deletions coderd/x/chatd/runner_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -178,7 +179,15 @@ func (m *runnerManager) RouteStateHint(ctx context.Context, state runnerStateUpd
case <-m.ctx.Done():
return
default:
// stateCh is full; drop the hint for this runner.
// Each hint carries a snapshot version, and runners discard stale
// snapshots after reading them. If this channel is full, preserve
// the buffered older hints instead of evicting the head. Dropping
// the newest hint can delay processing, but it does not lose
// correctness because pubsub or database sync will provide a later
// snapshot, and runner-owned tasks verify durable state before
// committing. The default buffer of 64 absorbs ordinary bursts. A
// full channel means the runner is already behind, so blocking here
// would spread backpressure to pubsub and worker maintenance loops.
}
}
}
Expand Down Expand Up @@ -289,18 +298,12 @@ func (m *runnerManager) handleCleanupRequest(key runnerKey) {
// registerCleanupWaiter starts a goroutine, tracked by m.wg, that blocks
// until rec.done is closed and then tries to hand key to cleanupDoneCh. If
// the manager context is canceled before the send goes through, the
// goroutine simply exits.
func (m *runnerManager) registerCleanupWaiter(key runnerKey, rec *runnerRecord) {
	notifyWhenDone := func() {
		<-rec.done
		select {
		case <-m.ctx.Done():
			// The manager is being discarded on shutdown, so m.cleaning is
			// deliberately left alone. That way nothing waiting on m.wg
			// depends on this goroutine acquiring m.mu before it can exit.
		case m.cleanupDoneCh <- key:
		}
	}
	m.wg.Go(notifyWhenDone)
}
Expand Down Expand Up @@ -409,7 +412,7 @@ func (m *runnerManager) databaseSyncLoop() {
for {
select {
case <-ticker.C:
if err := m.syncOnce(m.ctx); err != nil {
if err := m.syncOnce(m.ctx); shouldLogRunnerManagerLoopError(m.ctx, err) {
m.opts.Logger.Warn(m.ctx, "chatworker runner sync failed", slogError(err))
}
case <-m.ctx.Done():
Expand Down Expand Up @@ -454,7 +457,7 @@ func (m *runnerManager) heartbeatLoop() {
for {
select {
case <-ticker.C:
if err := m.heartbeatOnce(m.ctx); err != nil {
if err := m.heartbeatOnce(m.ctx); shouldLogRunnerManagerLoopError(m.ctx, err) {
m.opts.Logger.Warn(m.ctx, "chatworker heartbeat failed", slogError(err))
}
case <-m.ctx.Done():
Expand Down Expand Up @@ -500,6 +503,16 @@ func (m *runnerManager) heartbeatCleanupOnce(ctx context.Context) error {
return err
}

// shouldLogRunnerManagerLoopError reports whether err, returned by one
// iteration of a manager loop running under ctx, is worth logging. It returns
// false when err is nil, when ctx already has an error (for example, it was
// canceled), or when err is or wraps context.Canceled. Every other error is
// reported.
func shouldLogRunnerManagerLoopError(ctx context.Context, err error) bool {
	switch {
	case err == nil:
		return false
	case ctx.Err() != nil:
		return false
	case errors.Is(err, context.Canceled):
		return false
	default:
		return true
	}
}

func stateUpdateFromChat(chat database.Chat) runnerStateUpdate {
var workerID *uuid.UUID
if chat.WorkerID.Valid {
Expand Down
Loading
Loading