Skip to content

Commit 14ed3e3

Browse files
authored
feat: bump workspace last_used_at on chat heartbeat (#23205)
- coderd: Wires `options.WorkspaceUsageTracker` into the chatd config.
- chatd: Adds `UsageTracker` and calls `UsageTracker.Add(workspaceID)` on each heartbeat tick.
- chatd: Adds tests to verify `last_used_at` bump behaviour.

> 🤖 This PR was created with the help of Coder Agents, and will be reviewed by my human. 🧑‍💻
1 parent fb61c48 commit 14ed3e3

3 files changed

Lines changed: 250 additions & 2 deletions

File tree

coderd/chatd/chatd.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/coder/coder/v2/coderd/database/pubsub"
3232
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
3333
"github.com/coder/coder/v2/coderd/webpush"
34+
"github.com/coder/coder/v2/coderd/workspacestats"
3435
"github.com/coder/coder/v2/codersdk"
3536
"github.com/coder/coder/v2/codersdk/workspacesdk"
3637
"github.com/coder/quartz"
@@ -46,7 +47,9 @@ const (
4647

4748
homeInstructionLookupTimeout = 5 * time.Second
4849
instructionCacheTTL = 5 * time.Minute
49-
chatHeartbeatInterval = 60 * time.Second
50+
// DefaultChatHeartbeatInterval is the default time between chat
51+
// heartbeat updates while a chat is being processed.
52+
DefaultChatHeartbeatInterval = 30 * time.Second
5053
maxChatSteps = 1200
5154
// maxStreamBufferSize caps the number of events buffered
5255
// per chat during a single LLM step. When exceeded the
@@ -96,10 +99,14 @@ type Server struct {
9699
instructionCacheMu sync.RWMutex
97100
instructionCache map[uuid.UUID]cachedInstruction
98101

102+
usageTracker *workspacestats.UsageTracker
103+
clock quartz.Clock
104+
99105
// Configuration
100106
pendingChatAcquireInterval time.Duration
101107
maxChatsPerAcquire int32
102108
inFlightChatStaleAfter time.Duration
109+
chatHeartbeatInterval time.Duration
103110
}
104111

105112
type cachedInstruction struct {
@@ -1285,12 +1292,15 @@ type Config struct {
12851292
PendingChatAcquireInterval time.Duration
12861293
MaxChatsPerAcquire int32
12871294
InFlightChatStaleAfter time.Duration
1295+
ChatHeartbeatInterval time.Duration
12881296
AgentConn AgentConnFunc
12891297
CreateWorkspace chattool.CreateWorkspaceFn
12901298
StartWorkspace chattool.StartWorkspaceFn
12911299
Pubsub pubsub.Pubsub
12921300
ProviderAPIKeys chatprovider.ProviderAPIKeys
12931301
WebpushDispatcher webpush.Dispatcher
1302+
UsageTracker *workspacestats.UsageTracker
1303+
Clock quartz.Clock
12941304
}
12951305

12961306
// New creates a new chat processor. The processor polls for pending
@@ -1314,6 +1324,16 @@ func New(cfg Config) *Server {
13141324
maxChatsPerAcquire = DefaultMaxChatsPerAcquire
13151325
}
13161326

1327+
chatHeartbeatInterval := cfg.ChatHeartbeatInterval
1328+
if chatHeartbeatInterval == 0 {
1329+
chatHeartbeatInterval = DefaultChatHeartbeatInterval
1330+
}
1331+
1332+
clk := cfg.Clock
1333+
if clk == nil {
1334+
clk = quartz.NewReal()
1335+
}
1336+
13171337
workerID := cfg.ReplicaID
13181338
if workerID == uuid.Nil {
13191339
workerID = uuid.New()
@@ -1336,6 +1356,9 @@ func New(cfg Config) *Server {
13361356
pendingChatAcquireInterval: pendingChatAcquireInterval,
13371357
maxChatsPerAcquire: maxChatsPerAcquire,
13381358
inFlightChatStaleAfter: inFlightChatStaleAfter,
1359+
chatHeartbeatInterval: chatHeartbeatInterval,
1360+
usageTracker: cfg.UsageTracker,
1361+
clock: clk,
13391362
}
13401363

13411364
//nolint:gocritic // The chat processor uses a scoped chatd context.
@@ -2230,6 +2253,35 @@ func (p *Server) tryAutoPromoteQueuedMessage(
22302253
return &msg, remainingQueuedMessages, true, nil
22312254
}
22322255

2256+
// trackWorkspaceUsage bumps the workspace's last_used_at via the
2257+
// usage tracker. If wsID is not yet valid, it re-reads the chat
2258+
// from the DB to pick up late associations (e.g. create_workspace
2259+
// linking a workspace mid-conversation). The caller should store
2260+
// the returned value so that subsequent calls skip the DB lookup
2261+
// once a workspace has been found.
2262+
func (p *Server) trackWorkspaceUsage(
2263+
ctx context.Context,
2264+
chatID uuid.UUID,
2265+
wsID uuid.NullUUID,
2266+
logger slog.Logger,
2267+
) uuid.NullUUID {
2268+
if p.usageTracker == nil {
2269+
return wsID
2270+
}
2271+
if !wsID.Valid {
2272+
latest, err := p.db.GetChatByID(ctx, chatID)
2273+
if err != nil {
2274+
logger.Warn(ctx, "failed to re-read chat for workspace association", slog.Error(err))
2275+
return wsID
2276+
}
2277+
wsID = latest.WorkspaceID
2278+
}
2279+
if wsID.Valid {
2280+
p.usageTracker.Add(wsID.UUID)
2281+
}
2282+
return wsID
2283+
}
2284+
22332285
func (p *Server) processChat(ctx context.Context, chat database.Chat) {
22342286
logger := p.logger.With(slog.F("chat_id", chat.ID))
22352287
logger.Info(ctx, "processing chat request")
@@ -2248,7 +2300,7 @@ func (p *Server) processChat(ctx context.Context, chat database.Chat) {
22482300
// worker is still alive. The goroutine stops when chatCtx is
22492301
// canceled (either by completion or interruption).
22502302
go func() {
2251-
ticker := time.NewTicker(chatHeartbeatInterval)
2303+
ticker := p.clock.NewTicker(p.chatHeartbeatInterval, "chatd", "heartbeat")
22522304
defer ticker.Stop()
22532305
for {
22542306
select {
@@ -2267,6 +2319,7 @@ func (p *Server) processChat(ctx context.Context, chat database.Chat) {
22672319
cancel(chatloop.ErrInterrupted)
22682320
return
22692321
}
2322+
chat.WorkspaceID = p.trackWorkspaceUsage(chatCtx, chat.ID, chat.WorkspaceID, logger)
22702323
}
22712324
}
22722325
}()

coderd/chatd/chatd_test.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/coder/coder/v2/coderd/database/dbtestutil"
3434
dbpubsub "github.com/coder/coder/v2/coderd/database/pubsub"
3535
"github.com/coder/coder/v2/coderd/util/slice"
36+
"github.com/coder/coder/v2/coderd/workspacestats"
3637
"github.com/coder/coder/v2/codersdk"
3738
"github.com/coder/coder/v2/codersdk/workspacesdk"
3839
"github.com/coder/coder/v2/codersdk/workspacesdk/agentconnmock"
@@ -1956,6 +1957,199 @@ func TestStartWorkspaceTool_EndToEnd(t *testing.T) {
19561957
require.True(t, foundToolResultInSecondCall, "expected second streamed model call to include start_workspace tool output")
19571958
}
19581959

1960+
func TestHeartbeatBumpsWorkspaceUsage(t *testing.T) {
1961+
t.Parallel()
1962+
1963+
db, ps := dbtestutil.NewDB(t)
1964+
1965+
ctx := testutil.Context(t, testutil.WaitLong)
1966+
user, model := seedChatDependencies(ctx, t, db)
1967+
setOpenAIProviderBaseURL(ctx, t, db, chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
1968+
if !req.Stream {
1969+
return chattest.OpenAINonStreamingResponse("ok")
1970+
}
1971+
// Block until the request context is canceled so the chat
1972+
// stays in a processing state long enough for heartbeats
1973+
// to fire.
1974+
chunks := make(chan chattest.OpenAIChunk)
1975+
go func() {
1976+
defer close(chunks)
1977+
<-req.Context().Done()
1978+
}()
1979+
return chattest.OpenAIResponse{StreamingChunks: chunks}
1980+
}))
1981+
1982+
// Create a workspace that will be linked to the chat later,
1983+
// simulating the normal flow where a chat is created first
1984+
// and then creates a workspace via create_workspace.
1985+
org := dbgen.Organization(t, db, database.Organization{})
1986+
tmpl := dbgen.Template(t, db, database.Template{
1987+
OrganizationID: org.ID,
1988+
CreatedBy: user.ID,
1989+
})
1990+
ws := dbgen.Workspace(t, db, database.WorkspaceTable{
1991+
OwnerID: user.ID,
1992+
OrganizationID: org.ID,
1993+
TemplateID: tmpl.ID,
1994+
})
1995+
1996+
// Set up a short heartbeat interval and a UsageTracker that
1997+
// flushes frequently so last_used_at gets updated in the DB.
1998+
flushTick := make(chan time.Time)
1999+
flushDone := make(chan int, 1)
2000+
tracker := workspacestats.NewTracker(db,
2001+
workspacestats.TrackerWithTickFlush(flushTick, flushDone),
2002+
workspacestats.TrackerWithLogger(slogtest.Make(t, nil)),
2003+
)
2004+
t.Cleanup(func() { tracker.Close() })
2005+
2006+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
2007+
server := chatd.New(chatd.Config{
2008+
Logger: logger,
2009+
Database: db,
2010+
ReplicaID: uuid.New(),
2011+
Pubsub: ps,
2012+
PendingChatAcquireInterval: 10 * time.Millisecond,
2013+
InFlightChatStaleAfter: testutil.WaitLong,
2014+
ChatHeartbeatInterval: 100 * time.Millisecond,
2015+
UsageTracker: tracker,
2016+
})
2017+
t.Cleanup(func() {
2018+
require.NoError(t, server.Close())
2019+
})
2020+
2021+
// Create a chat WITHOUT a workspace, the normal starting state.
2022+
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
2023+
OwnerID: user.ID,
2024+
Title: "usage-tracking-test",
2025+
ModelConfigID: model.ID,
2026+
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
2027+
})
2028+
require.NoError(t, err)
2029+
2030+
// Wait for the chat to start processing and at least one
2031+
// heartbeat to fire.
2032+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
2033+
fromDB, listErr := db.GetChatByID(ctx, chat.ID)
2034+
if listErr != nil {
2035+
return false
2036+
}
2037+
return fromDB.Status == database.ChatStatusRunning &&
2038+
fromDB.HeartbeatAt.Valid &&
2039+
fromDB.HeartbeatAt.Time.After(fromDB.CreatedAt)
2040+
}, testutil.IntervalFast,
2041+
"chat should be running with at least one heartbeat")
2042+
2043+
// Flush the tracker and verify nothing was tracked yet
2044+
// (no workspace linked).
2045+
testutil.RequireSend(ctx, t, flushTick, time.Now())
2046+
count := testutil.RequireReceive(ctx, t, flushDone)
2047+
require.Equal(t, 0, count,
2048+
"expected no workspaces to be flushed before association")
2049+
2050+
// Link the workspace to the chat in the DB, simulating what
2051+
// the create_workspace tool does mid-conversation.
2052+
_, err = db.UpdateChatWorkspace(ctx, database.UpdateChatWorkspaceParams{
2053+
WorkspaceID: uuid.NullUUID{UUID: ws.ID, Valid: true},
2054+
ID: chat.ID,
2055+
})
2056+
require.NoError(t, err)
2057+
2058+
// The heartbeat re-reads the workspace association from the DB
2059+
// on each tick. Wait for the tracker to pick it up.
2060+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
2061+
select {
2062+
case flushTick <- time.Now():
2063+
case <-ctx.Done():
2064+
return false
2065+
}
2066+
select {
2067+
case c := <-flushDone:
2068+
return c > 0
2069+
case <-ctx.Done():
2070+
return false
2071+
}
2072+
}, testutil.IntervalMedium,
2073+
"expected usage tracker to flush the late-associated workspace")
2074+
2075+
// Verify the workspace's last_used_at was actually updated.
2076+
updatedWs, err := db.GetWorkspaceByID(ctx, ws.ID)
2077+
require.NoError(t, err)
2078+
require.True(t, updatedWs.LastUsedAt.After(ws.LastUsedAt),
2079+
"workspace last_used_at should have been bumped")
2080+
}
2081+
2082+
func TestHeartbeatNoWorkspaceNoBump(t *testing.T) {
2083+
t.Parallel()
2084+
2085+
db, ps := dbtestutil.NewDB(t)
2086+
2087+
ctx := testutil.Context(t, testutil.WaitLong)
2088+
user, model := seedChatDependencies(ctx, t, db)
2089+
setOpenAIProviderBaseURL(ctx, t, db, chattest.NewOpenAI(t, func(req *chattest.OpenAIRequest) chattest.OpenAIResponse {
2090+
if !req.Stream {
2091+
return chattest.OpenAINonStreamingResponse("ok")
2092+
}
2093+
chunks := make(chan chattest.OpenAIChunk)
2094+
go func() {
2095+
defer close(chunks)
2096+
<-req.Context().Done()
2097+
}()
2098+
return chattest.OpenAIResponse{StreamingChunks: chunks}
2099+
}))
2100+
2101+
// Set up UsageTracker with manual tick/flush.
2102+
usageTickCh := make(chan time.Time)
2103+
flushCh := make(chan int, 1)
2104+
tracker := workspacestats.NewTracker(db,
2105+
workspacestats.TrackerWithTickFlush(usageTickCh, flushCh),
2106+
workspacestats.TrackerWithLogger(slogtest.Make(t, nil)),
2107+
)
2108+
t.Cleanup(func() { tracker.Close() })
2109+
2110+
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
2111+
server := chatd.New(chatd.Config{
2112+
Logger: logger,
2113+
Database: db,
2114+
ReplicaID: uuid.New(),
2115+
Pubsub: ps,
2116+
PendingChatAcquireInterval: 10 * time.Millisecond,
2117+
InFlightChatStaleAfter: testutil.WaitLong,
2118+
ChatHeartbeatInterval: 100 * time.Millisecond,
2119+
})
2120+
t.Cleanup(func() {
2121+
require.NoError(t, server.Close())
2122+
})
2123+
2124+
// Create a chat WITHOUT linking a workspace.
2125+
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
2126+
OwnerID: user.ID,
2127+
Title: "no-workspace-test",
2128+
ModelConfigID: model.ID,
2129+
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
2130+
})
2131+
require.NoError(t, err)
2132+
2133+
// Wait for the chat to be acquired and at least one heartbeat
2134+
// to fire.
2135+
testutil.Eventually(ctx, t, func(ctx context.Context) bool {
2136+
fromDB, listErr := db.GetChatByID(ctx, chat.ID)
2137+
if listErr != nil {
2138+
return false
2139+
}
2140+
return fromDB.Status == database.ChatStatusRunning &&
2141+
fromDB.HeartbeatAt.Valid &&
2142+
fromDB.HeartbeatAt.Time.After(fromDB.CreatedAt)
2143+
}, testutil.IntervalFast,
2144+
"chat should be running with at least one heartbeat")
2145+
2146+
// Flush the tracker. Since no workspace was linked, count
2147+
// should be 0.
2148+
testutil.RequireSend(ctx, t, usageTickCh, time.Now())
2149+
count := testutil.RequireReceive(ctx, t, flushCh)
2150+
require.Equal(t, 0, count, "expected no workspaces to be flushed when chat has no workspace")
2151+
}
2152+
19592153
func newTestServer(
19602154
t *testing.T,
19612155
db database.Store,

coderd/coderd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,7 @@ func New(options *Options) *API {
787787
StartWorkspace: api.chatStartWorkspace,
788788
Pubsub: options.Pubsub,
789789
WebpushDispatcher: options.WebPushDispatcher,
790+
UsageTracker: options.WorkspaceUsageTracker,
790791
})
791792
gitSyncLogger := options.Logger.Named("gitsync")
792793
refresher := gitsync.NewRefresher(

0 commit comments

Comments
 (0)