From 13281d82355778fd0fa7a330db0a99db627171b1 Mon Sep 17 00:00:00 2001 From: Ethan Dickson Date: Thu, 9 Apr 2026 02:18:22 +0000 Subject: [PATCH] feat(coderd/database/pubsub): add batched pubsub with flush-failure fallback and sender reset Adds a chatd-specific BatchingPubsub that routes publishes through a dedicated single-connection sender, coalescing notifications into single transactions on a 50ms timer. Includes flush-failure fallback to the shared delegate, automatic sender reset/recreate, expanded histogram buckets, and focused recovery tests. --- cli/server.go | 22 + coderd/apidoc/docs.go | 9 + coderd/apidoc/swagger.json | 9 + coderd/coderd.go | 12 +- coderd/database/pubsub/batching.go | 950 ++++++++++++++++++ .../database/pubsub/batching_internal_test.go | 539 ++++++++++ coderd/database/pubsub/batching_test.go | 130 +++ coderd/database/pubsub/pubsub.go | 30 +- codersdk/deployment.go | 38 +- docs/reference/api/general.md | 5 +- docs/reference/api/schemas.md | 29 +- scripts/metricsdocgen/generated_metrics | 54 + site/src/api/typesGenerated.ts | 3 + 13 files changed, 1810 insertions(+), 20 deletions(-) create mode 100644 coderd/database/pubsub/batching.go create mode 100644 coderd/database/pubsub/batching_internal_test.go create mode 100644 coderd/database/pubsub/batching_test.go diff --git a/cli/server.go b/cli/server.go index 6acbed6b80386..a436028a65a9e 100644 --- a/cli/server.go +++ b/cli/server.go @@ -768,10 +768,32 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. return xerrors.Errorf("create pubsub: %w", err) } options.Pubsub = ps + options.ChatPubsub = ps if options.DeploymentValues.Prometheus.Enable { options.PrometheusRegistry.MustRegister(ps) } defer options.Pubsub.Close() + if options.DeploymentValues.AI.Chat.PubsubBatchEnabled.Value() { + chatPubsub, err := pubsub.NewBatching( + ctx, + logger.Named("chatd").Named("pubsub_batch"), + ps, + sqlDB, + dbURL, + pubsub.BatchingConfig{ + FlushInterval: options.DeploymentValues.AI.Chat.PubsubFlushInterval.Value(), + QueueSize: int(options.DeploymentValues.AI.Chat.PubsubQueueSize.Value()), + }, + ) + if err != nil { + return xerrors.Errorf("create chat pubsub batcher: %w", err) + } + options.ChatPubsub = chatPubsub + if options.DeploymentValues.Prometheus.Enable { + options.PrometheusRegistry.MustRegister(chatPubsub) + } + defer options.ChatPubsub.Close() + } psWatchdog := pubsub.NewWatchdog(ctx, logger.Named("pswatch"), ps) pubsubWatchdogTimeout = psWatchdog.Timeout() defer psWatchdog.Close() diff --git a/coderd/apidoc/docs.go b/coderd/apidoc/docs.go index 662ce7464b7d1..0231ac4443dc4 100644 --- a/coderd/apidoc/docs.go +++ b/coderd/apidoc/docs.go @@ -14479,6 +14479,15 @@ const docTemplate = `{ "properties": { "acquire_batch_size": { "type": "integer" + }, + "pubsub_batch_enabled": { + "type": "boolean" + }, + "pubsub_flush_interval": { + "type": "integer" + }, + "pubsub_queue_size": { + "type": "integer" } } }, diff --git a/coderd/apidoc/swagger.json b/coderd/apidoc/swagger.json index 91fe3e819bb9d..a638f509506d9 100644 --- a/coderd/apidoc/swagger.json +++ b/coderd/apidoc/swagger.json @@ -13014,6 +13014,15 @@ "properties": { "acquire_batch_size": { "type": "integer" + }, + "pubsub_batch_enabled": { + "type": "boolean" + }, + "pubsub_flush_interval": { + "type": "integer" + }, + "pubsub_queue_size": { + "type": "integer" } } }, diff --git a/coderd/coderd.go b/coderd/coderd.go index e1ae2a9502a2a..8a1c89772e054 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -159,7 +159,10 @@ type Options struct { Logger slog.Logger Database database.Store Pubsub pubsub.Pubsub - RuntimeConfig *runtimeconfig.Manager + // ChatPubsub allows chatd to use a dedicated publish path without changing + // the shared pubsub used by the rest of coderd. + ChatPubsub pubsub.Pubsub + RuntimeConfig *runtimeconfig.Manager // CacheDir is used for caching files served by the API. CacheDir string @@ -777,6 +780,11 @@ func New(options *Options) *API { maxChatsPerAcquire = math.MinInt32 } + chatPubsub := options.ChatPubsub + if chatPubsub == nil { + chatPubsub = options.Pubsub + } + api.chatDaemon = chatd.New(chatd.Config{ Logger: options.Logger.Named("chatd"), Database: options.Database, @@ -789,7 +797,7 @@ func New(options *Options) *API { InstructionLookupTimeout: options.ChatdInstructionLookupTimeout, CreateWorkspace: api.chatCreateWorkspace, StartWorkspace: api.chatStartWorkspace, - Pubsub: options.Pubsub, + Pubsub: chatPubsub, WebpushDispatcher: options.WebPushDispatcher, UsageTracker: options.WorkspaceUsageTracker, }) diff --git a/coderd/database/pubsub/batching.go b/coderd/database/pubsub/batching.go new file mode 100644 index 0000000000000..99d5197f48598 --- /dev/null +++ b/coderd/database/pubsub/batching.go @@ -0,0 +1,950 @@ +package pubsub + +import ( + "bytes" + "context" + "database/sql" + "errors" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/xerrors" + + "cdr.dev/slog/v3" + "github.com/coder/quartz" +) + +const ( + // DefaultBatchingFlushInterval is the default upper bound on how long chatd + // publishes wait before a scheduled flush when nearby publishes do not + // naturally coalesce sooner. + DefaultBatchingFlushInterval = 50 * time.Millisecond + // DefaultBatchingQueueSize is the default number of buffered chatd publish + // requests waiting to be flushed. + DefaultBatchingQueueSize = 8192 + + defaultBatchingPressureWait = 10 * time.Millisecond + defaultBatchingFinalFlushLimit = 15 * time.Second + batchingWarnInterval = 10 * time.Second + + batchFlushScheduled = "scheduled" + batchFlushPressure = "pressure" + batchFlushShutdown = "shutdown" + + batchFlushStageNone = "none" + batchFlushStageBegin = "begin" + batchFlushStageExec = "exec" + batchFlushStageCommit = "commit" + batchFlushStageUnknown = "unknown" + + batchDelegateFallbackReasonQueueFull = "queue_full" + batchDelegateFallbackReasonFlushError = "flush_error" + + batchResultAccepted = "accepted" + batchResultRejected = "rejected" + batchResultSuccess = "success" + batchResultError = "error" + + batchChannelClassStreamNotify = "stream_notify" + batchChannelClassOwnerEvent = "owner_event" + batchChannelClassConfigChange = "config_change" + batchChannelClassOther = "other" +) + +var ( + // ErrBatchingPubsubClosed is returned when a batched pubsub publish is + // attempted after shutdown has started. + ErrBatchingPubsubClosed = xerrors.New("batched pubsub is closed") + // ErrBatchingPubsubQueueFull is retained for compatibility with older + // callers. The current batching path falls back to the shared delegate when + // pressure persists instead of returning this error. + ErrBatchingPubsubQueueFull = xerrors.New("batched pubsub queue is full") +) + +// BatchingConfig controls the chatd-specific PostgreSQL pubsub batching path. +// Flush timing is automatic: the run loop wakes every FlushInterval (or on +// backpressure) and drains everything currently queued into a single +// transaction. There is no fixed batch-size knob — the batch size is simply +// whatever accumulated since the last flush, which naturally adapts to load. +type BatchingConfig struct { + FlushInterval time.Duration + QueueSize int + PressureWait time.Duration + FinalFlushTimeout time.Duration + Clock quartz.Clock +} + +type queuedPublish struct { + event string + channelClass string + message []byte + enqueuedAt time.Time +} + +type batchSender interface { + Flush(ctx context.Context, batch []queuedPublish) error + Close() error +} + +type batchFlushError struct { + stage string + err error +} + +func (e *batchFlushError) Error() string { + return e.err.Error() +} + +func (e *batchFlushError) Unwrap() error { + return e.err +} + +// BatchingPubsub batches chatd publish traffic onto a dedicated PostgreSQL +// sender connection while delegating subscribe behavior to the shared listener +// pubsub instance. +type BatchingPubsub struct { + logger slog.Logger + delegate *PGPubsub + sender batchSender + newSender func(context.Context) (batchSender, error) + clock quartz.Clock + + publishCh chan queuedPublish + flushCh chan struct{} + closeCh chan struct{} + doneCh chan struct{} + + spaceMu sync.Mutex + spaceSignal chan struct{} + + warnTicker *quartz.Ticker + + flushInterval time.Duration + pressureWait time.Duration + finalFlushTimeout time.Duration + + queuedCount atomic.Int64 + queueDepthHighWatermark atomic.Int64 + closed atomic.Bool + closeOnce sync.Once + closeErr error + runErr error + + runCtx context.Context + cancel context.CancelFunc + metrics batchingMetrics +} + +type batchingMetrics struct { + QueueDepth prometheus.Gauge + QueueDepthHighWatermark prometheus.Gauge + QueueCapacity prometheus.Gauge + BatchSize prometheus.Histogram + LogicalPublishesTotal *prometheus.CounterVec + LogicalPublishBytesTotal *prometheus.CounterVec + FlushedNotifications *prometheus.CounterVec + FlushedBytes *prometheus.CounterVec + QueueWait *prometheus.HistogramVec + FlushesTotal *prometheus.CounterVec + FlushDuration *prometheus.HistogramVec + FlushAttemptsTotal *prometheus.CounterVec + FlushFailuresTotal *prometheus.CounterVec + PublishRejectionsTotal *prometheus.CounterVec + DelegateFallbacksTotal *prometheus.CounterVec + SenderResetsTotal prometheus.Counter + SenderResetFailuresTotal prometheus.Counter + FlushInflight prometheus.Gauge +} + +func newBatchingMetrics() batchingMetrics { + return batchingMetrics{ + QueueDepth: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_queue_depth", + Help: "The number of chatd notifications waiting in the batching queue.", + }), + QueueDepthHighWatermark: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_queue_depth_high_watermark", + Help: "The highest chatd batching queue depth observed since process start.", + }), + QueueCapacity: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_queue_capacity", + Help: "The configured capacity of the chatd batching queue.", + }), + BatchSize: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_size", + Help: "The number of logical notifications sent in each chatd batch flush.", + Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, + }), + LogicalPublishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_logical_publishes_total", + Help: "The number of logical chatd publishes seen by the batching wrapper by channel class and result.", + }, []string{"channel_class", "result"}), + LogicalPublishBytesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_logical_publish_bytes_total", + Help: "The number of accepted chatd payload bytes enqueued into the batching wrapper by channel class.", + }, []string{"channel_class"}), + FlushedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flushed_notifications_total", + Help: "The number of logical chatd notifications removed from the batching queue for flush attempts by reason.", + }, []string{"reason"}), + FlushedBytes: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flushed_bytes_total", + Help: "The number of chatd payload bytes removed from the batching queue for flush attempts by reason.", + }, []string{"reason"}), + QueueWait: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_queue_wait_seconds", + Help: "The time accepted chatd publishes spent waiting in the batching queue before a flush attempt started.", + Buckets: []float64{0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1}, + }, []string{"channel_class"}), + FlushesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flushes_total", + Help: "The number of chatd batch flush attempts by reason.", + }, []string{"reason"}), + FlushDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flush_duration_seconds", + Help: "The time spent flushing one chatd batch to PostgreSQL.", + Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30}, + }, []string{"reason"}), + FlushAttemptsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flush_attempts_total", + Help: "The number of chatd sender flush stages by stage and result.", + }, []string{"stage", "result"}), + FlushFailuresTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flush_failures_total", + Help: "The number of failed chatd batch flushes by reason.", + }, []string{"reason"}), + PublishRejectionsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_publish_rejections_total", + Help: "The number of chatd publishes rejected by the batching queue.", + }, []string{"reason"}), + DelegateFallbacksTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_delegate_fallbacks_total", + Help: "The number of chatd publishes that fell back to the shared pubsub pool by channel class, reason, and flush stage.", + }, []string{"channel_class", "reason", "stage"}), + SenderResetsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_sender_resets_total", + Help: "The number of successful batched pubsub sender resets after flush failures.", + }), + SenderResetFailuresTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_sender_reset_failures_total", + Help: "The number of batched pubsub sender reset attempts that failed.", + }), + FlushInflight: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "coder", + Subsystem: "pubsub", + Name: "batch_flush_inflight", + Help: "Whether a chatd batch flush is currently executing against the dedicated sender connection.", + }), + } +} + +func (m batchingMetrics) Describe(descs chan<- *prometheus.Desc) { + m.QueueDepth.Describe(descs) + m.QueueDepthHighWatermark.Describe(descs) + m.QueueCapacity.Describe(descs) + m.BatchSize.Describe(descs) + m.LogicalPublishesTotal.Describe(descs) + m.LogicalPublishBytesTotal.Describe(descs) + m.FlushedNotifications.Describe(descs) + m.FlushedBytes.Describe(descs) + m.QueueWait.Describe(descs) + m.FlushesTotal.Describe(descs) + m.FlushDuration.Describe(descs) + m.FlushAttemptsTotal.Describe(descs) + m.FlushFailuresTotal.Describe(descs) + m.PublishRejectionsTotal.Describe(descs) + m.DelegateFallbacksTotal.Describe(descs) + m.SenderResetsTotal.Describe(descs) + m.SenderResetFailuresTotal.Describe(descs) + m.FlushInflight.Describe(descs) +} + +func (m batchingMetrics) Collect(metrics chan<- prometheus.Metric) { + m.QueueDepth.Collect(metrics) + m.QueueDepthHighWatermark.Collect(metrics) + m.QueueCapacity.Collect(metrics) + m.BatchSize.Collect(metrics) + m.LogicalPublishesTotal.Collect(metrics) + m.LogicalPublishBytesTotal.Collect(metrics) + m.FlushedNotifications.Collect(metrics) + m.FlushedBytes.Collect(metrics) + m.QueueWait.Collect(metrics) + m.FlushesTotal.Collect(metrics) + m.FlushDuration.Collect(metrics) + m.FlushAttemptsTotal.Collect(metrics) + m.FlushFailuresTotal.Collect(metrics) + m.PublishRejectionsTotal.Collect(metrics) + m.DelegateFallbacksTotal.Collect(metrics) + m.SenderResetsTotal.Collect(metrics) + m.SenderResetFailuresTotal.Collect(metrics) + m.FlushInflight.Collect(metrics) +} + +// NewBatching creates a chatd-specific batched pubsub wrapper around the +// shared PostgreSQL listener implementation. +func NewBatching( + ctx context.Context, + logger slog.Logger, + delegate *PGPubsub, + prototype *sql.DB, + connectURL string, + cfg BatchingConfig, +) (*BatchingPubsub, error) { + if delegate == nil { + return nil, xerrors.New("delegate pubsub is nil") + } + if prototype == nil { + return nil, xerrors.New("prototype database is nil") + } + if connectURL == "" { + return nil, xerrors.New("connect URL is empty") + } + + newSender := func(ctx context.Context) (batchSender, error) { + return newPGBatchSender(ctx, logger.Named("sender"), prototype, connectURL) + } + + sender, err := newSender(ctx) + if err != nil { + return nil, err + } + + ps, err := newBatchingPubsub(logger, delegate, sender, cfg) + if err != nil { + _ = sender.Close() + return nil, err + } + ps.newSender = newSender + return ps, nil +} + +func newBatchingPubsub( + logger slog.Logger, + delegate *PGPubsub, + sender batchSender, + cfg BatchingConfig, +) (*BatchingPubsub, error) { + if delegate == nil { + return nil, xerrors.New("delegate pubsub is nil") + } + if sender == nil { + return nil, xerrors.New("batch sender is nil") + } + + flushInterval := cfg.FlushInterval + if flushInterval == 0 { + flushInterval = DefaultBatchingFlushInterval + } + if flushInterval < 0 { + return nil, xerrors.New("flush interval must be positive") + } + + queueSize := cfg.QueueSize + if queueSize == 0 { + queueSize = DefaultBatchingQueueSize + } + if queueSize < 0 { + return nil, xerrors.New("queue size must be positive") + } + + pressureWait := cfg.PressureWait + if pressureWait == 0 { + pressureWait = defaultBatchingPressureWait + } + if pressureWait < 0 { + return nil, xerrors.New("pressure wait must be positive") + } + + finalFlushTimeout := cfg.FinalFlushTimeout + if finalFlushTimeout == 0 { + finalFlushTimeout = defaultBatchingFinalFlushLimit + } + if finalFlushTimeout < 0 { + return nil, xerrors.New("final flush timeout must be positive") + } + + clock := cfg.Clock + if clock == nil { + clock = quartz.NewReal() + } + + runCtx, cancel := context.WithCancel(context.Background()) + ps := &BatchingPubsub{ + logger: logger, + delegate: delegate, + sender: sender, + clock: clock, + publishCh: make(chan queuedPublish, queueSize), + flushCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), + spaceSignal: make(chan struct{}), + warnTicker: clock.NewTicker(batchingWarnInterval, "pubsubBatcher", "warn"), + flushInterval: flushInterval, + pressureWait: pressureWait, + finalFlushTimeout: finalFlushTimeout, + runCtx: runCtx, + cancel: cancel, + metrics: newBatchingMetrics(), + } + ps.metrics.QueueDepth.Set(0) + ps.metrics.QueueDepthHighWatermark.Set(0) + ps.metrics.QueueCapacity.Set(float64(queueSize)) + ps.metrics.FlushInflight.Set(0) + + go ps.run() + return ps, nil +} + +// Describe implements prometheus.Collector. +func (p *BatchingPubsub) Describe(descs chan<- *prometheus.Desc) { + p.metrics.Describe(descs) +} + +// Collect implements prometheus.Collector. +func (p *BatchingPubsub) Collect(metrics chan<- prometheus.Metric) { + p.metrics.Collect(metrics) +} + +// Subscribe delegates to the shared PostgreSQL listener pubsub. +func (p *BatchingPubsub) Subscribe(event string, listener Listener) (func(), error) { + return p.delegate.Subscribe(event, listener) +} + +// SubscribeWithErr delegates to the shared PostgreSQL listener pubsub. +func (p *BatchingPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (func(), error) { + return p.delegate.SubscribeWithErr(event, listener) +} + +// Publish enqueues a logical notification for asynchronous batched delivery. +func (p *BatchingPubsub) Publish(event string, message []byte) error { + channelClass := batchChannelClass(event) + if p.closed.Load() { + p.observeRejectedPublish(channelClass, "closed") + return ErrBatchingPubsubClosed + } + + req := queuedPublish{ + event: event, + channelClass: channelClass, + message: bytes.Clone(message), + } + req.enqueuedAt = p.clock.Now() + if p.tryEnqueue(req) { + p.observeAcceptedPublish(req) + return nil + } + + timer := p.clock.NewTimer(p.pressureWait, "pubsubBatcher", "pressureWait") + defer timer.Stop("pubsubBatcher", "pressureWait") + + for { + if p.closed.Load() { + p.observeRejectedPublish(channelClass, "closed") + return ErrBatchingPubsubClosed + } + p.signalPressureFlush() + spaceSignal := p.currentSpaceSignal() + req.enqueuedAt = p.clock.Now() + if p.tryEnqueue(req) { + p.observeAcceptedPublish(req) + return nil + } + + select { + case <-spaceSignal: + continue + case <-timer.C: + req.enqueuedAt = p.clock.Now() + if p.tryEnqueue(req) { + p.observeAcceptedPublish(req) + return nil + } + // The batching queue is still full after a pressure + // flush and brief wait. Fall back to the shared + // pubsub pool so the notification is still delivered + // rather than dropped. + p.observeDelegateFallback(channelClass, batchDelegateFallbackReasonQueueFull, batchFlushStageNone) + p.logPublishRejection(event) + return p.delegate.Publish(event, message) + case <-p.doneCh: + p.observeRejectedPublish(channelClass, "closed") + return ErrBatchingPubsubClosed + } + } +} + +// Close stops accepting new publishes, performs a bounded best-effort drain, +// and then closes the dedicated sender connection. +func (p *BatchingPubsub) Close() error { + p.closeOnce.Do(func() { + p.closed.Store(true) + p.cancel() + p.notifySpaceAvailable() + close(p.closeCh) + <-p.doneCh + p.closeErr = p.runErr + }) + return p.closeErr +} + +func (p *BatchingPubsub) tryEnqueue(req queuedPublish) bool { + if p.closed.Load() { + return false + } + select { + case p.publishCh <- req: + queuedDepth := p.queuedCount.Add(1) + p.observeQueueDepth(queuedDepth) + return true + default: + return false + } +} + +func (p *BatchingPubsub) observeQueueDepth(depth int64) { + p.metrics.QueueDepth.Set(float64(depth)) + for { + currentMax := p.queueDepthHighWatermark.Load() + if depth <= currentMax { + return + } + if p.queueDepthHighWatermark.CompareAndSwap(currentMax, depth) { + p.metrics.QueueDepthHighWatermark.Set(float64(depth)) + return + } + } +} + +func (p *BatchingPubsub) signalPressureFlush() { + select { + case p.flushCh <- struct{}{}: + default: + } +} + +func (p *BatchingPubsub) currentSpaceSignal() <-chan struct{} { + p.spaceMu.Lock() + defer p.spaceMu.Unlock() + return p.spaceSignal +} + +func (p *BatchingPubsub) notifySpaceAvailable() { + p.spaceMu.Lock() + defer p.spaceMu.Unlock() + close(p.spaceSignal) + p.spaceSignal = make(chan struct{}) +} + +func batchChannelClass(event string) string { + switch { + case strings.HasPrefix(event, "chat:stream:"): + return batchChannelClassStreamNotify + case strings.HasPrefix(event, "chat:owner:"): + return batchChannelClassOwnerEvent + case event == "chat:config_change": + return batchChannelClassConfigChange + default: + return batchChannelClassOther + } +} + +func (p *BatchingPubsub) observeAcceptedPublish(req queuedPublish) { + p.metrics.LogicalPublishesTotal.WithLabelValues(req.channelClass, batchResultAccepted).Inc() + p.metrics.LogicalPublishBytesTotal.WithLabelValues(req.channelClass).Add(float64(len(req.message))) +} + +func (p *BatchingPubsub) observeRejectedPublish(channelClass string, reason string) { + p.metrics.LogicalPublishesTotal.WithLabelValues(channelClass, batchResultRejected).Inc() + p.metrics.PublishRejectionsTotal.WithLabelValues(reason).Inc() +} + +func (p *BatchingPubsub) observeDelegateFallback(channelClass string, reason string, stage string) { + p.metrics.DelegateFallbacksTotal.WithLabelValues(channelClass, reason, stage).Inc() +} + +func (p *BatchingPubsub) observeDelegateFallbackBatch(batch []queuedPublish, reason string, stage string) { + if len(batch) == 0 { + return + } + counts := make(map[string]int) + for _, item := range batch { + counts[item.channelClass]++ + } + for channelClass, count := range counts { + p.metrics.DelegateFallbacksTotal.WithLabelValues(channelClass, reason, stage).Add(float64(count)) + } +} + +func batchFlushStage(err error) string { + if err == nil { + return batchFlushStageCommit + } + var flushErr *batchFlushError + if errors.As(err, &flushErr) { + return flushErr.stage + } + return batchFlushStageUnknown +} + +func (p *BatchingPubsub) observeFlushStageResults(err error) { + stage := batchFlushStage(err) + + switch stage { + case batchFlushStageBegin: + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageBegin, batchResultError).Inc() + return + case batchFlushStageExec: + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageBegin, batchResultSuccess).Inc() + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageExec, batchResultError).Inc() + return + case batchFlushStageCommit: + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageBegin, batchResultSuccess).Inc() + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageExec, batchResultSuccess).Inc() + result := batchResultSuccess + if err != nil { + result = batchResultError + } + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageCommit, result).Inc() + return + default: + p.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageUnknown, batchResultError).Inc() + return + } +} + +func (p *BatchingPubsub) run() { + defer close(p.doneCh) + defer p.warnTicker.Stop("pubsubBatcher", "warn") + + batch := make([]queuedPublish, 0, 64) + timer := p.clock.NewTimer(p.flushInterval, "pubsubBatcher", "scheduledFlush") + defer timer.Stop("pubsubBatcher", "scheduledFlush") + + flush := func(reason string) { + batch = p.drainIntoBatch(batch) + batch, _ = p.flushBatch(p.runCtx, batch, reason) + timer.Reset(p.flushInterval, "pubsubBatcher", reason+"Flush") + } + + for { + select { + case item := <-p.publishCh: + // An item arrived before the timer fired. Append it and + // let the timer or pressure signal trigger the actual + // flush so that nearby publishes coalesce naturally. + batch = append(batch, item) + p.notifySpaceAvailable() + case <-timer.C: + flush(batchFlushScheduled) + case <-p.flushCh: + flush(batchFlushPressure) + case <-p.closeCh: + p.runErr = errors.Join(p.drain(batch), p.sender.Close()) + return + } + } +} + +func (p *BatchingPubsub) drainIntoBatch(batch []queuedPublish) []queuedPublish { + drained := false + for { + select { + case item := <-p.publishCh: + batch = append(batch, item) + drained = true + default: + if drained { + p.notifySpaceAvailable() + } + return batch + } + } +} + +func (p *BatchingPubsub) flushBatch( + ctx context.Context, + batch []queuedPublish, + reason string, +) ([]queuedPublish, error) { + if len(batch) == 0 { + return batch[:0], nil + } + + count := len(batch) + totalBytes := 0 + start := p.clock.Now() + for _, item := range batch { + totalBytes += len(item.message) + queueWait := start.Sub(item.enqueuedAt) + if queueWait < 0 { + queueWait = 0 + } + p.metrics.QueueWait.WithLabelValues(item.channelClass).Observe(queueWait.Seconds()) + } + + p.metrics.FlushesTotal.WithLabelValues(reason).Inc() + p.metrics.BatchSize.Observe(float64(count)) + p.metrics.FlushedNotifications.WithLabelValues(reason).Add(float64(count)) + p.metrics.FlushedBytes.WithLabelValues(reason).Add(float64(totalBytes)) + p.metrics.FlushInflight.Set(1) + senderErr := p.sender.Flush(ctx, batch) + p.metrics.FlushInflight.Set(0) + p.observeFlushStageResults(senderErr) + elapsed := p.clock.Since(start) + p.metrics.FlushDuration.WithLabelValues(reason).Observe(elapsed.Seconds()) + + var err error + if senderErr != nil { + p.metrics.FlushFailuresTotal.WithLabelValues(reason).Inc() + stage := batchFlushStage(senderErr) + delivered, failed, fallbackErr := p.replayBatchViaDelegate(batch, batchDelegateFallbackReasonFlushError, stage) + var resetErr error + if reason != batchFlushShutdown { + resetErr = p.resetSender() + } + p.logFlushFailure(reason, stage, count, totalBytes, delivered, failed, senderErr, fallbackErr, resetErr) + if fallbackErr != nil { + err = errors.Join(senderErr, fallbackErr) + if resetErr != nil { + err = errors.Join(err, resetErr) + } + } + } else if p.delegate != nil { + p.delegate.publishesTotal.WithLabelValues("true").Add(float64(count)) + p.delegate.publishedBytesTotal.Add(float64(totalBytes)) + } + + queuedDepth := p.queuedCount.Add(-int64(count)) + p.observeQueueDepth(queuedDepth) + clear(batch) + return batch[:0], err +} + +func (p *BatchingPubsub) replayBatchViaDelegate(batch []queuedPublish, reason string, stage string) (int, int, error) { + if len(batch) == 0 { + return 0, 0, nil + } + p.observeDelegateFallbackBatch(batch, reason, stage) + if p.delegate == nil { + return 0, len(batch), xerrors.New("delegate pubsub is nil") + } + + var ( + delivered int + failed int + errs []error + ) + for _, item := range batch { + if err := p.delegate.Publish(item.event, item.message); err != nil { + failed++ + errs = append(errs, xerrors.Errorf("delegate publish %q: %w", item.event, err)) + continue + } + delivered++ + } + return delivered, failed, errors.Join(errs...) +} + +func (p *BatchingPubsub) resetSender() error { + if p.newSender == nil { + return nil + } + newSender, err := p.newSender(context.Background()) + if err != nil { + p.metrics.SenderResetFailuresTotal.Inc() + return err + } + oldSender := p.sender + p.sender = newSender + p.metrics.SenderResetsTotal.Inc() + if oldSender == nil { + return nil + } + if err := oldSender.Close(); err != nil { + p.logger.Warn(context.Background(), "failed to close old batched pubsub sender after reset", slog.Error(err)) + } + return nil +} + +func (p *BatchingPubsub) logFlushFailure(reason string, stage string, count int, totalBytes int, delivered int, failed int, senderErr error, fallbackErr error, resetErr error) { + fields := []slog.Field{ + slog.F("reason", reason), + slog.F("stage", stage), + slog.F("count", count), + slog.F("total_bytes", totalBytes), + slog.F("delegate_delivered", delivered), + slog.F("delegate_failed", failed), + slog.Error(senderErr), + } + if fallbackErr != nil { + fields = append(fields, slog.F("delegate_error", fallbackErr.Error())) + } + if resetErr != nil { + fields = append(fields, slog.F("sender_reset_error", resetErr.Error())) + } + p.logger.Error(context.Background(), "batched pubsub flush failed", fields...) +} + +func (p *BatchingPubsub) drain(batch []queuedPublish) error { + ctx, cancel := context.WithTimeout(context.Background(), p.finalFlushTimeout) + defer cancel() + + var errs []error + for { + batch = p.drainIntoBatch(batch) + if len(batch) == 0 { + break + } + var err error + batch, err = p.flushBatch(ctx, batch, batchFlushShutdown) + if err != nil { + errs = append(errs, err) + } + if ctx.Err() != nil { + break + } + } + + dropped := p.dropPendingPublishes() + if dropped > 0 { + errs = append(errs, xerrors.Errorf("dropped %d queued notifications during shutdown", dropped)) + } + if ctx.Err() != nil { + errs = append(errs, xerrors.Errorf("shutdown flush timed out: %w", ctx.Err())) + } + return errors.Join(errs...) +} + +func (p *BatchingPubsub) dropPendingPublishes() int { + count := 0 + for { + select { + case <-p.publishCh: + count++ + default: + if count > 0 { + queuedDepth := p.queuedCount.Add(-int64(count)) + p.observeQueueDepth(queuedDepth) + } + return count + } + } +} + +func (p *BatchingPubsub) logPublishRejection(event string) { + fields := []slog.Field{ + slog.F("event", event), + slog.F("queue_size", cap(p.publishCh)), + slog.F("queued", p.queuedCount.Load()), + } + select { + case <-p.warnTicker.C: + p.logger.Warn(context.Background(), "batched pubsub queue is full", fields...) + default: + p.logger.Debug(context.Background(), "batched pubsub queue is full", fields...) + } +} + +type pgBatchSender struct { + logger slog.Logger + db *sql.DB +} + +func newPGBatchSender( + ctx context.Context, + logger slog.Logger, + prototype *sql.DB, + connectURL string, +) (*pgBatchSender, error) { + connector, err := newConnector(ctx, logger, prototype, connectURL) + if err != nil { + return nil, err + } + + db := sql.OpenDB(connector) + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxIdleTime(0) + db.SetConnMaxLifetime(0) + + pingCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := db.PingContext(pingCtx); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("ping batched pubsub sender database: %w", err) + } + + return &pgBatchSender{logger: logger, db: db}, nil +} + +func (s *pgBatchSender) Flush(ctx context.Context, batch []queuedPublish) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return &batchFlushError{stage: batchFlushStageBegin, err: xerrors.Errorf("begin batched pubsub transaction: %w", err)} + } + committed := false + defer func() { + if !committed { + _ = tx.Rollback() + } + }() + + for _, item := range batch { + // This is safe because we are calling pq.QuoteLiteral. pg_notify does + // not support the first parameter being a prepared statement. + //nolint:gosec + _, err = tx.ExecContext(ctx, `select pg_notify(`+pq.QuoteLiteral(item.event)+`, $1)`, item.message) + if err != nil { + return &batchFlushError{stage: batchFlushStageExec, err: xerrors.Errorf("exec pg_notify: %w", err)} + } + } + + if err := tx.Commit(); err != nil { + return &batchFlushError{stage: batchFlushStageCommit, err: xerrors.Errorf("commit batched pubsub transaction: %w", err)} + } + committed = true + return nil +} + +func (s *pgBatchSender) Close() error { + return s.db.Close() +} diff --git a/coderd/database/pubsub/batching_internal_test.go b/coderd/database/pubsub/batching_internal_test.go new file mode 100644 index 0000000000000..3ca29fe32c18e --- /dev/null +++ b/coderd/database/pubsub/batching_internal_test.go @@ -0,0 +1,539 @@ +package pubsub + +import ( + "bytes" + "context" + "database/sql" + "sync" + "testing" + "time" + + _ "github.com/lib/pq" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + + "cdr.dev/slog/v3" + "cdr.dev/slog/v3/sloggers/slogtest" + "github.com/coder/coder/v2/testutil" + "github.com/coder/quartz" +) + +func TestBatchingPubsubScheduledFlush(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + resetTrap := clock.Trap().TimerReset("pubsubBatcher", "scheduledFlush") + defer resetTrap.Close() + + sender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: 10 * time.Millisecond, + QueueSize: 8, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + require.NoError(t, ps.Publish("chat:stream:a", []byte("one"))) + require.NoError(t, ps.Publish("chat:stream:a", []byte("two"))) + require.Empty(t, sender.Batches()) + + clock.Advance(10 * time.Millisecond).MustWait(ctx) + resetCall, err := resetTrap.Wait(ctx) + require.NoError(t, err) + resetCall.MustRelease(ctx) + + batch := testutil.TryReceive(ctx, t, sender.flushes) + require.Len(t, batch, 2) + require.Equal(t, []byte("one"), batch[0].message) + require.Equal(t, []byte("two"), batch[1].message) + require.Equal(t, float64(2), prom_testutil.ToFloat64(ps.metrics.LogicalPublishesTotal.WithLabelValues(batchChannelClassStreamNotify, batchResultAccepted))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.LogicalPublishesTotal.WithLabelValues(batchChannelClassStreamNotify, batchResultRejected))) + require.Equal(t, float64(6), prom_testutil.ToFloat64(ps.metrics.LogicalPublishBytesTotal.WithLabelValues(batchChannelClassStreamNotify))) + require.Equal(t, float64(2), prom_testutil.ToFloat64(ps.metrics.QueueDepthHighWatermark)) + require.Equal(t, float64(2), prom_testutil.ToFloat64(ps.metrics.FlushedNotifications.WithLabelValues(batchFlushScheduled))) + require.Equal(t, float64(6), prom_testutil.ToFloat64(ps.metrics.FlushedBytes.WithLabelValues(batchFlushScheduled))) + queueWaitCount, queueWaitSum := histogramCountAndSum(t, ps.metrics.QueueWait.WithLabelValues(batchChannelClassStreamNotify)) + require.Equal(t, uint64(2), queueWaitCount) + require.InDelta(t, 0.02, queueWaitSum, 0.000001) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageBegin, batchResultSuccess))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageExec, batchResultSuccess))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageCommit, batchResultSuccess))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushesTotal.WithLabelValues(batchFlushScheduled))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.FlushInflight)) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.QueueDepth)) +} + +func TestBatchingPubsubDefaultConfigUsesDedicatedSenderFirstDefaults(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + sender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{Clock: clock}) + + require.Equal(t, DefaultBatchingFlushInterval, ps.flushInterval) + require.Equal(t, DefaultBatchingQueueSize, cap(ps.publishCh)) + require.Equal(t, defaultBatchingPressureWait, ps.pressureWait) + require.Equal(t, defaultBatchingFinalFlushLimit, ps.finalFlushTimeout) + require.Equal(t, float64(DefaultBatchingQueueSize), prom_testutil.ToFloat64(ps.metrics.QueueCapacity)) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.QueueDepthHighWatermark)) +} + +func TestBatchChannelClass(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + event string + want string + }{ + {name: "stream notify", event: "chat:stream:123", want: batchChannelClassStreamNotify}, + {name: "owner event", event: "chat:owner:123", want: batchChannelClassOwnerEvent}, + {name: "config change", event: "chat:config_change", want: batchChannelClassConfigChange}, + {name: "fallback", event: "workspace:owner:123", want: batchChannelClassOther}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tt.want, batchChannelClass(tt.event)) + }) + } +} + +func TestBatchingPubsubTimerFlushDrainsAll(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + resetTrap := clock.Trap().TimerReset("pubsubBatcher", "scheduledFlush") + defer resetTrap.Close() + + sender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: 10 * time.Millisecond, + QueueSize: 64, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + // Enqueue many messages before the timer fires — all should be + // drained and flushed in a single batch. + for _, msg := range []string{"one", "two", "three", "four", "five"} { + require.NoError(t, ps.Publish("chat:stream:a", []byte(msg))) + } + require.Empty(t, sender.Batches()) + + clock.Advance(10 * time.Millisecond).MustWait(ctx) + resetCall, err := resetTrap.Wait(ctx) + require.NoError(t, err) + resetCall.MustRelease(ctx) + + batch := testutil.TryReceive(ctx, t, sender.flushes) + require.Len(t, batch, 5) + require.Equal(t, []byte("one"), batch[0].message) + require.Equal(t, []byte("five"), batch[4].message) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushesTotal.WithLabelValues(batchFlushScheduled))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.QueueDepth)) +} + +func TestBatchingPubsubQueueFullFallsBackToDelegate(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + resetTrap := clock.Trap().TimerReset("pubsubBatcher", "scheduledFlush") + defer resetTrap.Close() + pressureTrap := clock.Trap().NewTimer("pubsubBatcher", "pressureWait") + defer pressureTrap.Close() + + sender := newFakeBatchSender() + sender.blockCh = make(chan struct{}) + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: 10 * time.Millisecond, + QueueSize: 1, + PressureWait: 10 * time.Millisecond, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + // Fill the queue (capacity 1). + require.NoError(t, ps.Publish("chat:stream:a", []byte("one"))) + + // Fire the timer so the run loop starts flushing "one" — the + // sender blocks on blockCh so the flush stays in-flight. + clock.Advance(10 * time.Millisecond).MustWait(ctx) + <-sender.started + + // The run loop is blocked in flushBatch. Fill the queue again. + require.NoError(t, ps.Publish("chat:stream:a", []byte("two"))) + + // A third publish should fall back to the delegate (which has a + // nil db, so the delegate Publish itself will error — but we + // verify the fallback metric was incremented, not + // ErrBatchingPubsubQueueFull). + errCh := make(chan error, 1) + go func() { + errCh <- ps.Publish("chat:stream:a", []byte("three")) + }() + + pressureCall, err := pressureTrap.Wait(ctx) + require.NoError(t, err) + pressureCall.MustRelease(ctx) + clock.Advance(10 * time.Millisecond).MustWait(ctx) + + err = testutil.TryReceive(ctx, t, errCh) + // The delegate has a nil db so it returns an error, but it must + // NOT be ErrBatchingPubsubQueueFull — that sentinel is gone. + require.Error(t, err) + require.NotErrorIs(t, err, ErrBatchingPubsubQueueFull) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.DelegateFallbacksTotal.WithLabelValues(batchChannelClassStreamNotify, batchDelegateFallbackReasonQueueFull, batchFlushStageNone))) + + close(sender.blockCh) + // Let the run loop finish the blocked flush and process "two". + resetCall, err := resetTrap.Wait(ctx) + require.NoError(t, err) + resetCall.MustRelease(ctx) + require.NoError(t, ps.Close()) +} + +func TestBatchingPubsubCloseDrainsQueue(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + + sender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: time.Hour, + QueueSize: 8, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + require.NoError(t, ps.Publish("chat:stream:a", []byte("one"))) + require.NoError(t, ps.Publish("chat:stream:a", []byte("two"))) + require.NoError(t, ps.Publish("chat:stream:a", []byte("three"))) + + require.NoError(t, ps.Close()) + batches := sender.Batches() + require.Len(t, batches, 1) + require.Len(t, batches[0], 3) + require.Equal(t, []byte("one"), batches[0][0].message) + require.Equal(t, []byte("two"), batches[0][1].message) + require.Equal(t, []byte("three"), batches[0][2].message) + require.Equal(t, float64(3), prom_testutil.ToFloat64(ps.metrics.QueueDepthHighWatermark)) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushesTotal.WithLabelValues(batchFlushShutdown))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.QueueDepth)) + require.Equal(t, 1, sender.CloseCalls()) +} + +func TestBatchingPubsubPreservesOrder(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + + sender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: time.Hour, + QueueSize: 8, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + for _, msg := range []string{"one", "two", "three", "four", "five"} { + require.NoError(t, ps.Publish("chat:stream:a", []byte(msg))) + } + + require.NoError(t, ps.Close()) + batches := sender.Batches() + require.NotEmpty(t, batches) + + messages := make([]string, 0, 5) + for _, batch := range batches { + for _, item := range batch { + messages = append(messages, string(item.message)) + } + } + require.Equal(t, []string{"one", "two", "three", "four", "five"}, messages) +} + +func TestBatchingPubsubFlushFailureMetrics(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitShort) + clock := quartz.NewMock(t) + newTimerTrap := clock.Trap().NewTimer("pubsubBatcher", "scheduledFlush") + defer newTimerTrap.Close() + resetTrap := clock.Trap().TimerReset("pubsubBatcher", "scheduledFlush") + defer resetTrap.Close() + + sender := newFakeBatchSender() + sender.err = context.DeadlineExceeded + sender.errStage = batchFlushStageExec + ps, delegate := newTestBatchingPubsub(t, sender, BatchingConfig{ + Clock: clock, + FlushInterval: 10 * time.Millisecond, + QueueSize: 8, + }) + + call, err := newTimerTrap.Wait(ctx) + require.NoError(t, err) + call.MustRelease(ctx) + + require.NoError(t, ps.Publish("chat:stream:a", []byte("one"))) + + clock.Advance(10 * time.Millisecond).MustWait(ctx) + resetCall, err := resetTrap.Wait(ctx) + require.NoError(t, err) + resetCall.MustRelease(ctx) + + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushFailuresTotal.WithLabelValues(batchFlushScheduled))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageBegin, batchResultSuccess))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageExec, batchResultError))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.FlushAttemptsTotal.WithLabelValues(batchFlushStageCommit, batchResultSuccess))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(delegate.publishesTotal.WithLabelValues("false"))) + require.Zero(t, prom_testutil.ToFloat64(delegate.publishesTotal.WithLabelValues("true"))) + require.Zero(t, prom_testutil.ToFloat64(ps.metrics.QueueDepth)) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.DelegateFallbacksTotal.WithLabelValues(batchChannelClassStreamNotify, batchDelegateFallbackReasonFlushError, batchFlushStageExec))) +} + +func TestBatchingPubsubFlushFailureStageAccounting(t *testing.T) { + t.Parallel() + + stages := []string{batchFlushStageBegin, batchFlushStageExec, batchFlushStageCommit} + for _, stage := range stages { + stage := stage + t.Run(stage, func(t *testing.T) { + t.Parallel() + + sender := newFakeBatchSender() + sender.err = context.DeadlineExceeded + sender.errStage = stage + ps, delegate := newTestBatchingPubsub(t, sender, BatchingConfig{Clock: quartz.NewMock(t)}) + + batch := []queuedPublish{{ + event: "chat:stream:test", + channelClass: batchChannelClass("chat:stream:test"), + message: []byte("fallback-" + stage), + enqueuedAt: time.Now(), + }} + ps.queuedCount.Store(int64(len(batch))) + _, err := ps.flushBatch(context.Background(), batch, batchFlushScheduled) + require.Error(t, err) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.DelegateFallbacksTotal.WithLabelValues(batchChannelClassStreamNotify, batchDelegateFallbackReasonFlushError, stage))) + require.Equal(t, float64(1), prom_testutil.ToFloat64(delegate.publishesTotal.WithLabelValues("false"))) + }) + } +} + +func TestBatchingPubsubFlushFailureResetSender(t *testing.T) { + t.Parallel() + + clock := quartz.NewMock(t) + firstSender := newFakeBatchSender() + firstSender.err = context.DeadlineExceeded + firstSender.errStage = batchFlushStageExec + secondSender := newFakeBatchSender() + ps, _ := newTestBatchingPubsub(t, firstSender, BatchingConfig{Clock: clock}) + ps.newSender = func(context.Context) (batchSender, error) { + return secondSender, nil + } + + firstBatch := []queuedPublish{{ + event: "chat:stream:first", + channelClass: batchChannelClass("chat:stream:first"), + message: []byte("first"), + enqueuedAt: time.Now(), + }} + ps.queuedCount.Store(int64(len(firstBatch))) + _, err := ps.flushBatch(context.Background(), firstBatch, batchFlushScheduled) + require.Error(t, err) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.SenderResetsTotal)) + require.Equal(t, 1, firstSender.CloseCalls()) + + secondBatch := []queuedPublish{{ + event: "chat:stream:second", + channelClass: batchChannelClass("chat:stream:second"), + message: []byte("second"), + enqueuedAt: time.Now(), + }} + ps.queuedCount.Store(int64(len(secondBatch))) + _, err = ps.flushBatch(context.Background(), secondBatch, batchFlushScheduled) + require.NoError(t, err) + batches := secondSender.Batches() + require.Len(t, batches, 1) + require.Len(t, batches[0], 1) + require.Equal(t, []byte("second"), batches[0][0].message) +} + +func TestBatchingPubsubFlushFailureReturnsJoinedErrorWhenReplayFails(t *testing.T) { + t.Parallel() + + sender := newFakeBatchSender() + sender.err = context.DeadlineExceeded + sender.errStage = batchFlushStageExec + ps, _ := newTestBatchingPubsub(t, sender, BatchingConfig{Clock: quartz.NewMock(t)}) + + batch := []queuedPublish{{ + event: "chat:stream:error", + channelClass: batchChannelClass("chat:stream:error"), + message: []byte("error"), + enqueuedAt: time.Now(), + }} + ps.queuedCount.Store(int64(len(batch))) + _, err := ps.flushBatch(context.Background(), batch, batchFlushScheduled) + require.Error(t, err) + require.ErrorContains(t, err, context.DeadlineExceeded.Error()) + require.ErrorContains(t, err, `delegate publish "chat:stream:error"`) + require.Equal(t, float64(1), prom_testutil.ToFloat64(ps.metrics.DelegateFallbacksTotal.WithLabelValues(batchChannelClassStreamNotify, batchDelegateFallbackReasonFlushError, batchFlushStageExec))) +} + +func newTestBatchingPubsub(t *testing.T, sender batchSender, cfg BatchingConfig) (*BatchingPubsub, *PGPubsub) { + t.Helper() + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + // Use a closed *sql.DB so that delegate.Publish returns a real + // error instead of panicking on a nil pointer when the batching + // queue falls back to the shared pool under pressure. + closedDB := newClosedDB(t) + delegate := newWithoutListener(logger.Named("delegate"), closedDB) + ps, err := newBatchingPubsub(logger.Named("batcher"), delegate, sender, cfg) + require.NoError(t, err) + t.Cleanup(func() { + _ = ps.Close() + }) + return ps, delegate +} + +// newClosedDB returns an *sql.DB whose connections have been closed, +// so any ExecContext call returns an error rather than panicking. +func newClosedDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("postgres", "host=localhost dbname=closed_db_stub sslmode=disable connect_timeout=1") + require.NoError(t, err) + require.NoError(t, db.Close()) + return db +} + +type fakeBatchSender struct { + mu sync.Mutex + batches [][]queuedPublish + flushes chan []queuedPublish + started chan struct{} + blockCh chan struct{} + err error + errStage string + closeErr error + closeCall int +} + +func newFakeBatchSender() *fakeBatchSender { + return &fakeBatchSender{ + flushes: make(chan []queuedPublish, 16), + started: make(chan struct{}, 16), + } +} + +func (s *fakeBatchSender) Flush(ctx context.Context, batch []queuedPublish) error { + select { + case s.started <- struct{}{}: + default: + } + if s.blockCh != nil { + select { + case <-s.blockCh: + case <-ctx.Done(): + return ctx.Err() + } + } + clone := make([]queuedPublish, len(batch)) + for i, item := range batch { + clone[i] = queuedPublish{ + event: item.event, + message: bytes.Clone(item.message), + } + } + + s.mu.Lock() + s.batches = append(s.batches, clone) + s.mu.Unlock() + + select { + case s.flushes <- clone: + default: + } + if s.err == nil { + return nil + } + if s.errStage != "" { + return &batchFlushError{stage: s.errStage, err: s.err} + } + return s.err +} + +type metricWriter interface { + Write(*dto.Metric) error +} + +func histogramCountAndSum(t *testing.T, observer any) (uint64, float64) { + t.Helper() + writer, ok := observer.(metricWriter) + require.True(t, ok) + + metric := &dto.Metric{} + require.NoError(t, writer.Write(metric)) + histogram := metric.GetHistogram() + require.NotNil(t, histogram) + return histogram.GetSampleCount(), histogram.GetSampleSum() +} + +func (s *fakeBatchSender) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + s.closeCall++ + return s.closeErr +} + +func (s *fakeBatchSender) Batches() [][]queuedPublish { + s.mu.Lock() + defer s.mu.Unlock() + clone := make([][]queuedPublish, len(s.batches)) + for i, batch := range s.batches { + clone[i] = make([]queuedPublish, len(batch)) + copy(clone[i], batch) + } + return clone +} + +func (s *fakeBatchSender) CloseCalls() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.closeCall +} diff --git a/coderd/database/pubsub/batching_test.go b/coderd/database/pubsub/batching_test.go new file mode 100644 index 0000000000000..11e96b48804ce --- /dev/null +++ b/coderd/database/pubsub/batching_test.go @@ -0,0 +1,130 @@ +package pubsub_test + +import ( + "context" + "database/sql" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "cdr.dev/slog/v3" + "cdr.dev/slog/v3/sloggers/slogtest" + "github.com/coder/coder/v2/coderd/database/dbtestutil" + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/coder/coder/v2/testutil" +) + +func TestBatchingPubsubDedicatedSenderConnection(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + + connectionURL, err := dbtestutil.Open(t) + require.NoError(t, err) + + trackedDriver := dbtestutil.NewDriver() + defer trackedDriver.Close() + tconn, err := trackedDriver.Connector(connectionURL) + require.NoError(t, err) + trackedDB := sql.OpenDB(tconn) + defer trackedDB.Close() + + base, err := pubsub.New(ctx, logger.Named("base"), trackedDB, connectionURL) + require.NoError(t, err) + defer base.Close() + + listenerConn := testutil.TryReceive(ctx, t, trackedDriver.Connections) + batched, err := pubsub.NewBatching(ctx, logger.Named("batched"), base, trackedDB, connectionURL, pubsub.BatchingConfig{ + FlushInterval: 10 * time.Millisecond, + + QueueSize: 8, + }) + require.NoError(t, err) + defer batched.Close() + + senderConn := testutil.TryReceive(ctx, t, trackedDriver.Connections) + require.NotEqual(t, fmt.Sprintf("%p", listenerConn), fmt.Sprintf("%p", senderConn)) + + event := t.Name() + messageCh := make(chan []byte, 1) + cancel, err := batched.Subscribe(event, func(_ context.Context, message []byte) { + messageCh <- message + }) + require.NoError(t, err) + defer cancel() + + require.NoError(t, batched.Publish(event, []byte("hello"))) + require.Equal(t, []byte("hello"), testutil.TryReceive(ctx, t, messageCh)) +} + +func TestBatchingPubsubReconnectsAfterSenderDisconnect(t *testing.T) { + t.Parallel() + + ctx := testutil.Context(t, testutil.WaitLong) + logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug) + + connectionURL, err := dbtestutil.Open(t) + require.NoError(t, err) + + trackedDriver := dbtestutil.NewDriver() + defer trackedDriver.Close() + tconn, err := trackedDriver.Connector(connectionURL) + require.NoError(t, err) + trackedDB := sql.OpenDB(tconn) + defer trackedDB.Close() + + base, err := pubsub.New(ctx, logger.Named("base"), trackedDB, connectionURL) + require.NoError(t, err) + defer base.Close() + + _ = testutil.TryReceive(ctx, t, trackedDriver.Connections) // listener connection + batched, err := pubsub.NewBatching(ctx, logger.Named("batched"), base, trackedDB, connectionURL, pubsub.BatchingConfig{ + FlushInterval: 10 * time.Millisecond, + + QueueSize: 8, + }) + require.NoError(t, err) + defer batched.Close() + + senderConn := testutil.TryReceive(ctx, t, trackedDriver.Connections) + event := t.Name() + messageCh := make(chan []byte, 4) + cancel, err := batched.Subscribe(event, func(_ context.Context, message []byte) { + messageCh <- message + }) + require.NoError(t, err) + defer cancel() + + require.NoError(t, batched.Publish(event, []byte("before-disconnect"))) + require.Equal(t, []byte("before-disconnect"), testutil.TryReceive(ctx, t, messageCh)) + require.NoError(t, senderConn.Close()) + + reconnected := false + delivered := false + testutil.Eventually(ctx, t, func(ctx context.Context) bool { + if !reconnected { + select { + case conn := <-trackedDriver.Connections: + reconnected = conn != nil + default: + } + } + select { + case <-messageCh: + default: + } + if err := batched.Publish(event, []byte("after-disconnect")); err != nil { + return false + } + select { + case msg := <-messageCh: + delivered = string(msg) == "after-disconnect" + case <-time.After(testutil.IntervalFast): + delivered = false + } + return reconnected && delivered + }, testutil.IntervalMedium, "batched sender did not recover after disconnect") +} diff --git a/coderd/database/pubsub/pubsub.go b/coderd/database/pubsub/pubsub.go index d227063ba8c29..924ffab1a32ae 100644 --- a/coderd/database/pubsub/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -487,12 +487,14 @@ func (d logDialer) DialContext(ctx context.Context, network, address string) (ne return conn, nil } -func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { - p.connected.Set(0) - // Creates a new listener using pq. +func newConnector(ctx context.Context, logger slog.Logger, db *sql.DB, connectURL string) (driver.Connector, error) { + if db == nil { + return nil, xerrors.New("database is nil") + } + var ( dialer = logDialer{ - logger: p.logger, + logger: logger, // pq.defaultDialer uses a zero net.Dialer as well. d: net.Dialer{}, } @@ -501,28 +503,38 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { ) // Create a custom connector if the database driver supports it. - connectorCreator, ok := p.db.Driver().(database.ConnectorCreator) + connectorCreator, ok := db.Driver().(database.ConnectorCreator) if ok { connector, err = connectorCreator.Connector(connectURL) if err != nil { - return xerrors.Errorf("create custom connector: %w", err) + return nil, xerrors.Errorf("create custom connector: %w", err) } } else { - // use the default pq connector otherwise + // Use the default pq connector otherwise. connector, err = pq.NewConnector(connectURL) if err != nil { - return xerrors.Errorf("create pq connector: %w", err) + return nil, xerrors.Errorf("create pq connector: %w", err) } } // Set the dialer if the connector supports it. dc, ok := connector.(database.DialerConnector) if !ok { - p.logger.Critical(ctx, "connector does not support setting log dialer, database connection debug logs will be missing") + logger.Critical(ctx, "connector does not support setting log dialer, database connection debug logs will be missing") } else { dc.Dialer(dialer) } + return connector, nil +} + +func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error { + p.connected.Set(0) + connector, err := newConnector(ctx, p.logger, p.db, connectURL) + if err != nil { + return err + } + var ( errCh = make(chan error, 1) sentErrCh = false diff --git a/codersdk/deployment.go b/codersdk/deployment.go index 1c975c19f3e47..445a39f358b8b 100644 --- a/codersdk/deployment.go +++ b/codersdk/deployment.go @@ -3624,6 +3624,39 @@ Write out the current server config as YAML to stdout.`, YAML: "acquireBatchSize", Hidden: true, // Hidden because most operators should not need to modify this. }, + { + Name: "Chat: Pubsub Batch Enabled", + Description: "Whether chatd should route PostgreSQL pubsub publishes through a dedicated sender connection. The default keeps batching enabled so chatd avoids shared-pool starvation without changing global pubsub wiring.", + Flag: "chat-pubsub-batch-enabled", + Env: "CODER_CHAT_PUBSUB_BATCH_ENABLED", + Value: &c.AI.Chat.PubsubBatchEnabled, + Default: "true", + Group: &deploymentGroupChat, + YAML: "pubsubBatchEnabled", + Hidden: true, + }, + { + Name: "Chat: Pubsub Flush Interval", + Description: "The maximum time accepted chatd pubsub publishes wait before the batching loop schedules a flush.", + Flag: "chat-pubsub-flush-interval", + Env: "CODER_CHAT_PUBSUB_FLUSH_INTERVAL", + Value: &c.AI.Chat.PubsubFlushInterval, + Default: "50ms", + Group: &deploymentGroupChat, + YAML: "pubsubFlushInterval", + Hidden: true, + }, + { + Name: "Chat: Pubsub Queue Size", + Description: "How many chatd pubsub publishes can wait in memory for the dedicated sender path when PostgreSQL falls behind.", + Flag: "chat-pubsub-queue-size", + Env: "CODER_CHAT_PUBSUB_QUEUE_SIZE", + Value: &c.AI.Chat.PubsubQueueSize, + Default: "8192", + Group: &deploymentGroupChat, + YAML: "pubsubQueueSize", + Hidden: true, + }, // AI Bridge Options { Name: "AI Bridge Enabled", @@ -4090,7 +4123,10 @@ type AIBridgeProxyConfig struct { } type ChatConfig struct { - AcquireBatchSize serpent.Int64 `json:"acquire_batch_size" typescript:",notnull"` + AcquireBatchSize serpent.Int64 `json:"acquire_batch_size" typescript:",notnull"` + PubsubBatchEnabled serpent.Bool `json:"pubsub_batch_enabled" typescript:",notnull"` + PubsubFlushInterval serpent.Duration `json:"pubsub_flush_interval" typescript:",notnull"` + PubsubQueueSize serpent.Int64 `json:"pubsub_queue_size" typescript:",notnull"` } type AIConfig struct { diff --git a/docs/reference/api/general.md b/docs/reference/api/general.md index 949661f83fdd8..31fe2054a5837 100644 --- a/docs/reference/api/general.md +++ b/docs/reference/api/general.md @@ -209,7 +209,10 @@ curl -X GET http://coder-server:8080/api/v2/deployment/config \ "structured_logging": true }, "chat": { - "acquire_batch_size": 0 + "acquire_batch_size": 0, + "pubsub_batch_enabled": true, + "pubsub_flush_interval": 0, + "pubsub_queue_size": 0 } }, "allow_workspace_renames": true, diff --git a/docs/reference/api/schemas.md b/docs/reference/api/schemas.md index 33dc2a0b25aa7..30d766e962da6 100644 --- a/docs/reference/api/schemas.md +++ b/docs/reference/api/schemas.md @@ -1234,7 +1234,10 @@ "structured_logging": true }, "chat": { - "acquire_batch_size": 0 + "acquire_batch_size": 0, + "pubsub_batch_enabled": true, + "pubsub_flush_interval": 0, + "pubsub_queue_size": 0 } } ``` @@ -2015,15 +2018,21 @@ AuthorizationObject can represent a "set" of objects, such as: all workspaces in ```json { - "acquire_batch_size": 0 + "acquire_batch_size": 0, + "pubsub_batch_enabled": true, + "pubsub_flush_interval": 0, + "pubsub_queue_size": 0 } ``` ### Properties -| Name | Type | Required | Restrictions | Description | -|----------------------|---------|----------|--------------|-------------| -| `acquire_batch_size` | integer | false | | | +| Name | Type | Required | Restrictions | Description | +|-------------------------|---------|----------|--------------|-------------| +| `acquire_batch_size` | integer | false | | | +| `pubsub_batch_enabled` | boolean | false | | | +| `pubsub_flush_interval` | integer | false | | | +| `pubsub_queue_size` | integer | false | | | ## codersdk.ChatRetentionDaysResponse @@ -3233,7 +3242,10 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o "structured_logging": true }, "chat": { - "acquire_batch_size": 0 + "acquire_batch_size": 0, + "pubsub_batch_enabled": true, + "pubsub_flush_interval": 0, + "pubsub_queue_size": 0 } }, "allow_workspace_renames": true, @@ -3811,7 +3823,10 @@ CreateWorkspaceRequest provides options for creating a new workspace. Only one o "structured_logging": true }, "chat": { - "acquire_batch_size": 0 + "acquire_batch_size": 0, + "pubsub_batch_enabled": true, + "pubsub_flush_interval": 0, + "pubsub_queue_size": 0 } }, "allow_workspace_renames": true, diff --git a/scripts/metricsdocgen/generated_metrics b/scripts/metricsdocgen/generated_metrics index fae3de129a6b1..2709b4b362163 100644 --- a/scripts/metricsdocgen/generated_metrics +++ b/scripts/metricsdocgen/generated_metrics @@ -82,6 +82,60 @@ coder_derp_server_unknown_frames_total 0 # HELP coder_derp_server_watchers Current watchers. # TYPE coder_derp_server_watchers gauge coder_derp_server_watchers 0 +# HELP coder_pubsub_batch_delegate_fallbacks_total The number of chatd publishes that fell back to the shared pubsub pool by channel class, reason, and flush stage. +# TYPE coder_pubsub_batch_delegate_fallbacks_total counter +coder_pubsub_batch_delegate_fallbacks_total{channel_class="",reason="",stage=""} 0 +# HELP coder_pubsub_batch_flush_attempts_total The number of chatd sender flush stages by stage and result. +# TYPE coder_pubsub_batch_flush_attempts_total counter +coder_pubsub_batch_flush_attempts_total{stage="",result=""} 0 +# HELP coder_pubsub_batch_flush_duration_seconds The time spent flushing one chatd batch to PostgreSQL. +# TYPE coder_pubsub_batch_flush_duration_seconds histogram +coder_pubsub_batch_flush_duration_seconds{reason=""} 0 +# HELP coder_pubsub_batch_flush_failures_total The number of failed chatd batch flushes by reason. +# TYPE coder_pubsub_batch_flush_failures_total counter +coder_pubsub_batch_flush_failures_total{reason=""} 0 +# HELP coder_pubsub_batch_flush_inflight Whether a chatd batch flush is currently executing against the dedicated sender connection. +# TYPE coder_pubsub_batch_flush_inflight gauge +coder_pubsub_batch_flush_inflight 0 +# HELP coder_pubsub_batch_flushed_bytes_total The number of chatd payload bytes removed from the batching queue for flush attempts by reason. +# TYPE coder_pubsub_batch_flushed_bytes_total counter +coder_pubsub_batch_flushed_bytes_total{reason=""} 0 +# HELP coder_pubsub_batch_flushed_notifications_total The number of logical chatd notifications removed from the batching queue for flush attempts by reason. +# TYPE coder_pubsub_batch_flushed_notifications_total counter +coder_pubsub_batch_flushed_notifications_total{reason=""} 0 +# HELP coder_pubsub_batch_flushes_total The number of chatd batch flush attempts by reason. +# TYPE coder_pubsub_batch_flushes_total counter +coder_pubsub_batch_flushes_total{reason=""} 0 +# HELP coder_pubsub_batch_logical_publish_bytes_total The number of accepted chatd payload bytes enqueued into the batching wrapper by channel class. +# TYPE coder_pubsub_batch_logical_publish_bytes_total counter +coder_pubsub_batch_logical_publish_bytes_total{channel_class=""} 0 +# HELP coder_pubsub_batch_logical_publishes_total The number of logical chatd publishes seen by the batching wrapper by channel class and result. +# TYPE coder_pubsub_batch_logical_publishes_total counter +coder_pubsub_batch_logical_publishes_total{channel_class="",result=""} 0 +# HELP coder_pubsub_batch_publish_rejections_total The number of chatd publishes rejected by the batching queue. +# TYPE coder_pubsub_batch_publish_rejections_total counter +coder_pubsub_batch_publish_rejections_total{reason=""} 0 +# HELP coder_pubsub_batch_queue_capacity The configured capacity of the chatd batching queue. +# TYPE coder_pubsub_batch_queue_capacity gauge +coder_pubsub_batch_queue_capacity 0 +# HELP coder_pubsub_batch_queue_depth The number of chatd notifications waiting in the batching queue. +# TYPE coder_pubsub_batch_queue_depth gauge +coder_pubsub_batch_queue_depth 0 +# HELP coder_pubsub_batch_queue_depth_high_watermark The highest chatd batching queue depth observed since process start. +# TYPE coder_pubsub_batch_queue_depth_high_watermark gauge +coder_pubsub_batch_queue_depth_high_watermark 0 +# HELP coder_pubsub_batch_queue_wait_seconds The time accepted chatd publishes spent waiting in the batching queue before a flush attempt started. +# TYPE coder_pubsub_batch_queue_wait_seconds histogram +coder_pubsub_batch_queue_wait_seconds{channel_class=""} 0 +# HELP coder_pubsub_batch_sender_reset_failures_total The number of batched pubsub sender reset attempts that failed. +# TYPE coder_pubsub_batch_sender_reset_failures_total counter +coder_pubsub_batch_sender_reset_failures_total 0 +# HELP coder_pubsub_batch_sender_resets_total The number of successful batched pubsub sender resets after flush failures. +# TYPE coder_pubsub_batch_sender_resets_total counter +coder_pubsub_batch_sender_resets_total 0 +# HELP coder_pubsub_batch_size The number of logical notifications sent in each chatd batch flush. +# TYPE coder_pubsub_batch_size histogram +coder_pubsub_batch_size 0 # HELP coder_pubsub_connected Whether we are connected (1) or not connected (0) to postgres # TYPE coder_pubsub_connected gauge coder_pubsub_connected 0 diff --git a/site/src/api/typesGenerated.ts b/site/src/api/typesGenerated.ts index e277b6368ca6a..a1768f72f77d2 100644 --- a/site/src/api/typesGenerated.ts +++ b/site/src/api/typesGenerated.ts @@ -1234,6 +1234,9 @@ export const ChatCompactionThresholdKeyPrefix = // From codersdk/deployment.go export interface ChatConfig { readonly acquire_batch_size: number; + readonly pubsub_batch_enabled: boolean; + readonly pubsub_flush_interval: number; + readonly pubsub_queue_size: number; } // From codersdk/chats.go