Skip to content

Commit 13281d8

Browse files
committed
feat(coderd/database/pubsub): add batched pubsub with flush-failure fallback and sender reset
Adds a chatd-specific BatchingPubsub that routes publishes through a dedicated single-connection sender, coalescing notifications into single transactions on a 50ms timer. Includes flush-failure fallback to the shared delegate, automatic sender reset/recreate, expanded histogram buckets, and focused recovery tests.
1 parent d954460 commit 13281d8

13 files changed

Lines changed: 1810 additions & 20 deletions

File tree

cli/server.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,10 +768,32 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
768768
return xerrors.Errorf("create pubsub: %w", err)
769769
}
770770
options.Pubsub = ps
771+
options.ChatPubsub = ps
771772
if options.DeploymentValues.Prometheus.Enable {
772773
options.PrometheusRegistry.MustRegister(ps)
773774
}
774775
defer options.Pubsub.Close()
776+
if options.DeploymentValues.AI.Chat.PubsubBatchEnabled.Value() {
777+
chatPubsub, err := pubsub.NewBatching(
778+
ctx,
779+
logger.Named("chatd").Named("pubsub_batch"),
780+
ps,
781+
sqlDB,
782+
dbURL,
783+
pubsub.BatchingConfig{
784+
FlushInterval: options.DeploymentValues.AI.Chat.PubsubFlushInterval.Value(),
785+
QueueSize: int(options.DeploymentValues.AI.Chat.PubsubQueueSize.Value()),
786+
},
787+
)
788+
if err != nil {
789+
return xerrors.Errorf("create chat pubsub batcher: %w", err)
790+
}
791+
options.ChatPubsub = chatPubsub
792+
if options.DeploymentValues.Prometheus.Enable {
793+
options.PrometheusRegistry.MustRegister(chatPubsub)
794+
}
795+
defer options.ChatPubsub.Close()
796+
}
775797
psWatchdog := pubsub.NewWatchdog(ctx, logger.Named("pswatch"), ps)
776798
pubsubWatchdogTimeout = psWatchdog.Timeout()
777799
defer psWatchdog.Close()

coderd/apidoc/docs.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/apidoc/swagger.json

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coderd/coderd.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ type Options struct {
159159
Logger slog.Logger
160160
Database database.Store
161161
Pubsub pubsub.Pubsub
162-
RuntimeConfig *runtimeconfig.Manager
162+
// ChatPubsub allows chatd to use a dedicated publish path without changing
163+
// the shared pubsub used by the rest of coderd.
164+
ChatPubsub pubsub.Pubsub
165+
RuntimeConfig *runtimeconfig.Manager
163166

164167
// CacheDir is used for caching files served by the API.
165168
CacheDir string
@@ -777,6 +780,11 @@ func New(options *Options) *API {
777780
maxChatsPerAcquire = math.MinInt32
778781
}
779782

783+
chatPubsub := options.ChatPubsub
784+
if chatPubsub == nil {
785+
chatPubsub = options.Pubsub
786+
}
787+
780788
api.chatDaemon = chatd.New(chatd.Config{
781789
Logger: options.Logger.Named("chatd"),
782790
Database: options.Database,
@@ -789,7 +797,7 @@ func New(options *Options) *API {
789797
InstructionLookupTimeout: options.ChatdInstructionLookupTimeout,
790798
CreateWorkspace: api.chatCreateWorkspace,
791799
StartWorkspace: api.chatStartWorkspace,
792-
Pubsub: options.Pubsub,
800+
Pubsub: chatPubsub,
793801
WebpushDispatcher: options.WebPushDispatcher,
794802
UsageTracker: options.WorkspaceUsageTracker,
795803
})

0 commit comments

Comments
 (0)