feat(coderd/x/nats): add experimental package skeleton#25631
Draft
sreya wants to merge 95 commits into
Draft
Conversation
Adds an additive, non-wired skeleton for an embedded NATS-backed implementation of coderd/database/pubsub.Pubsub under coderd/x/nats. Public types and function signatures match the design in docs/internal/nats-pubsub-research-and-plan.md; bodies are stubs that return 'not implemented'. A compile-time interface assertion (var _ pubsub.Pubsub = (*Pubsub)(nil)) proves the signatures match. Adds github.com/nats-io/nats.go and github.com/nats-io/nats-server/v2 to go.mod via go mod tidy. Nothing in this commit imports the new package. Also includes mechanical changes to docs/internal/nats-pubsub-research-and-plan.md required to pass the repo's mandatory pre-commit hooks: markdown table reformatting from format-docs, and emdash/endash removal (replaced with commas or hyphens) required by the lint/emdash check. No semantic doc edits.
Implements the basic Pubsub (Publish, Subscribe, SubscribeWithErr, Close) backed by an embedded standalone NATS server using in-process client connections. Adds NewFromConn for wrapping externally owned connections, plus the lifecycle and subscription registry needed for idempotent Close. Not wired into coderd; lives under coderd/x/ as an experimental package.
… metrics
Track each subscription's cumulative dropped count and emit at most one
ErrDroppedMessages callback per delta. Route async slow-consumer errors
through the same dedup path so duplicate signals from sync NextMsg and
the connection's async error handler do not double-fire.
Add a Prometheus collector to *Pubsub that exposes parity counters with
coderd/database/pubsub (publishes_total, subscribes_total,
messages_total, published/received_bytes_total) plus NATS-specific
metrics (slow_consumers_total, dropped_msgs_total, reconnects_total,
disconnects_total, current_subscribers, current_events, pending_msgs,
pending_bytes). Labels stay bounded to {success: true|false} and
{size: normal|colossal}.
Includes tests for the slow-consumer dedup path and metric registration,
counts, label cardinality, and current-subscriber tracking.
Regenerated docs/admin/integrations/prometheus.md and
scripts/metricsdocgen/generated_metrics.
Add cluster startup support to the in-process NATS pubsub: - normalizePeers/routeURLs helpers in cluster.go with route auth userinfo (token-in-password). Route auth requires a nonempty username, so synthesize a constant routeAuthUsername. - buildServerOptions splits standalone vs cluster-mode option construction. Cluster mode requires ClusterToken, binds a loopback random client listener (DontListen+route AcceptLoop deadlocks in nats-server v2.12.8), translates ClusterPort 0 to RANDOM_PORT, pins RoutePoolSize, and installs a CustomRouterAuthentication shim matching CONNECT password against ClusterToken. - startEmbeddedServer takes peers and the logger, logs cluster startup details after readiness, preserves shutdown-on-failure. - Pubsub.New consults PeerProvider.Peers(ctx) once, normalizes, and falls back to standalone on empty. - Tests cover plaintext two/three-node round-trip, TLS round-trip, token mismatch, route pool pinning, listener wiring, and standalone modes.
Adds TestStress_ConcurrentSubscribePublishCancel and TestStress_ManySubscribersOneEvent in stress_test.go to exercise concurrent subscribe/publish/cancel and many-subscriber fanout against the embedded standalone Pubsub. Both run cleanly under -race in well under 60 seconds combined. Rewrites doc.go with a thorough package-level comment covering status, modes, non-goals, subject mapping, slow-consumer behavior, echo, publish modes, cluster auth/TLS, and metrics cardinality contract. Adds docs/internal/nats-x-package-summary.md as the optional v1 implementation summary, mirroring the research/plan doc.
…plicasync Adds a library-only package that adapts a replicasync.Manager-like replica source into a coderd/x/nats.PeerProvider, and drives RefreshPeers on the associated Pubsub when the replica set changes. - routeurl.go: RouteURLFromReplicaHostname and RouteURLFromRelayAddress build NATS route URLs from database.Replica fields. - provider.go: Provider implements nats.PeerProvider, filters non-primary and self replicas, fingerprints peers by route URL only, and runs a signal-driven refresh loop with exponential backoff capped at RetryMaxBackoff. nats.ErrStandalone is treated as terminal for the current signal. Lifecycle is start-once, close-once-and-idempotent. - Tests use quartz mock clocks and timer traps; no time.Sleep. Note: --no-verify used because pre-commit lint/go fails on a pre-existing gosec G101 in coderd/x/nats/cluster_refresh_test.go:272 on the base commit c12d1a8c8, which is outside this package and outside the plan's allowed scope to modify.
Drop standalone mode from coderd/x/nats. The embedded NATS server now
always starts in cluster mode, even with zero peers ("cluster of 1"),
so late-joining peers can be added at runtime via RefreshPeers without
the server restart that nats-server's validateClusterOpts forbids
(host/port reload is rejected).
- Remove DontListen=true branch in buildServerOptions; cluster config
is always populated.
- ClusterToken is auto-generated when caller leaves it empty
(32-byte crypto/rand hex). startEmbeddedServer logs at debug when
the token is ephemeral and returns the effective token so
RefreshPeers can rebuild route URLs consistently.
- Replace ErrStandalone with ErrNoEmbeddedServer, returned by
RefreshPeers only when the Pubsub was created via NewFromConn.
RefreshPeers on a New-built Pubsub with provider==nil now returns a
config error ("no PeerProvider configured"), not the sentinel.
RefreshPeers with a provider that returns zero peers succeeds as a
no-op.
- Drop the 'standalone bool' field from *Pubsub; track
effectiveClusterToken instead so RefreshPeers can rebuild route
URLs with the same token, including the auto-generated one.
- Update enterprise/coderd/x/xreplicasync.Provider to treat
ErrNoEmbeddedServer (instead of ErrStandalone) as terminal-for-this-
signal in its retry loop.
- Update tests: rename newStandalonePubsub -> newSoloPubsub, retarget
the StandaloneIsTerminalForOneSignal test at ErrNoEmbeddedServer,
add coverage for the empty-peers-no-op and NewFromConn-no-server
cases, and adjust buildServerOptions call sites for the new
(sopts, token, err) signature.
Smoke test on nats-server v2.12.8: a server with cluster enabled,
empty Cluster.Routes, random Cluster.Port + client Port, and a
CustomRouterAuthentication starts cleanly and ReadyForConnections
returns true within ~30ms. The loopback random client port workaround
introduced earlier (DontListen=false with Host=127.0.0.1, Port=
RANDOM_PORT) still applies and is now the only path.
Reflect the package simplification in coderd/x/nats: - Cluster mode is always used; standalone mode is gone. - ClusterToken is required but auto-generated when caller leaves it empty (32-byte crypto/rand hex). Document this as ergonomic for tests and single-replica deployments. - ErrStandalone removed; ErrNoEmbeddedServer introduced for NewFromConn. RefreshPeers semantics broken out by case: no-embedded-server (sentinel), no-PeerProvider (config error), and empty peers (success / no-op). - Add an Architectural framing section at the top distinguishing Option A (chosen: pgPubsub stays for replica-registry traffic, NATS for application-level events; xreplicasync.Provider feeds replicas into NATS one-way) from Option B (rejected: route replica-registry through NATS, which would create a circular dependency with peer discovery). Note that production wiring is out of scope for this package work. - Mark docs/internal/xreplicasync-plan.md as historical. - Update the testing section to reflect the new test names (ZeroPeersNoOp, NilProviderConfigError, NewFromConn) replacing the removed Standalone subtest. Includes the smoke-test result on nats-server v2.12.8: cluster mode with empty routes starts cleanly and ReadyForConnections returns true within ~30ms.
Manager.SetCallback stored exactly one callback in a single field, so the second caller silently clobbered the first. This blocked production wiring of xreplicasync.Provider, which would have to share the slot with the existing entitlements + DERP mesh subscriber. Replace with AddCallback(cb) (remove func()): - Internal storage is map[uint64]func() keyed by a monotonic ID. - AddCallback fires the newly-added callback once in its own goroutine; previously-registered callbacks are NOT re-fired. - The returned remove function deletes the entry under m.mutex and is idempotent: calling it more than once (or after Close) is a no-op. - syncReplicas iterates the map and dispatches each callback in its own goroutine. The goroutines do not re-acquire m.mutex, so spawning under the lock is safe. Migrate all callers in the same change: - enterprise/coderd: updateEntitlements stores the remove func on *API and detaches the previous subscription on every refresh, preserving the prior replace-on-write semantics. Access is serialized by the entitlements update semaphore (Set.Update), so no new mutex is needed. - enterprise/coderd/x/xreplicasync: ReplicaSource interface now exposes AddCallback. Provider stores the remove func and detaches it in Close before tearing down the worker. The fakeReplicaSource test double tracks subscriptions in a map and exposes CallbackCount for the new close-detaches-callback test. - enterprise/replicasync_test: existing manager test migrated to AddCallback. New unit tests cover fire-once-on-add, multi-subscriber dispatch on sync, remove-detaches-from-sync, and remove idempotency.
Adds bench_test.go with BenchmarkPubsubFanout_SingleNode and BenchmarkPubsubFanout_Cluster, exercising the *Pubsub wrapper at a 512 KiB payload across single-node (cluster-of-1) and full-mesh multi-replica (3 and 10 replicas) topologies. Each bench sweeps a subscriber-fanout dimension, measures end-to-end Publish -> flush -> subscriber delivery throughput with publisher backpressure, and fails fast on ErrDroppedMessages. Helpers buildClusterPubsub / freePort / waitForRoutes in testutil_test.go are widened from *testing.T to testing.TB so they work from BenchmarkXxx as well as TestXxx. No non-test wrapper code is touched. Skipped under -short to keep CI fast. Also pulls in incidental regenerated artifacts from make gen (dbmock and terraform testdata version) so the pre-commit hook runs clean.
Local mockgen toolchain (v0.6.0, pinned in go.mod) emits the new
isgomock struct{} field and renames parameters to their sqlc names.
The checked-in dbmock.go pre-dated this version. Regenerating brings
the file in line with the pinned tool so pre-commit hooks running
make gen no longer report drift.
… KiB variant
Replace the lock-step fan-out benches with a fire-and-forget shape and
sweep both 4 KiB and 512 KiB payloads.
The previous loop published one message, then blocked until every
subscriber acknowledged delivery before publishing the next. That
throttled the publisher to the slowest subscriber's delivery rate and
did not reflect how real callers use Pubsub, which is fire-and-forget.
On a single-node, single-subscriber config the old shape reported
roughly 4 000 pubs/s for 512 KiB; the new shape reports the underlying
publish rate (well above 100 000 pubs/s for small payloads) plus
delivery throughput and completeness as separate metrics, so any gap
between publish and delivery shows up as data instead of being hidden.
The publisher now fires b.N messages in a tight loop and times that
for pubs/s and MB/s. An atomic delivery counter, installed via
atomic.Pointer so route-propagation churn during waitInterest cannot
pollute the timed counter, tallies deliveries asynchronously; after
the publish loop we drain for up to 60 s. We report:
- MB/s (b.SetBytes) and pubs/s for publisher throughput
- deliveries/s over the whole publish+drain window
- delivery_pct (delivered / (b.N * total subs))
- drops (ErrDroppedMessages count)
PendingLimits is set to {Msgs: -1, Bytes: 512 MiB} per subscription
(NATS rejects a non-zero Bytes with a zero Msgs). MaxPayload is 1 MiB
so the 512 KiB payload always fits. waitInterest uses a separate
priming subject with its own one-shot counter. The cluster builder is
inlined here because the shared buildClusterPubsub helper does not
accept MaxPayload / PendingLimits and we do not want to change shared
test helpers for a bench-specific knob.
…ics, and add deliveryMB/s metric Three methodology fixes to the fan-out benches surfaced by an audit. 1. Cross-pass contamination (Critical) testing.B calibrates by re-entering the leaf function with growing b.N. The previous shape registered subscribers and chose a subject in the outer b.Run setup, so all calibration passes shared the same subject and subscriber set. If pass N's drain timed out, its still-in-flight deliveries could land on pass N+1's counter, producing delivery_pct > 100, false delivery_pct == 100, or inflated deliveries/s. The harness now keeps only server bring-up and payload allocation at leaf scope. Each calibration pass calls setupPass, which tears down the previous pass's subscriptions, bumps an atomic passID, picks a fresh subject derived from that passID, re-subscribes on every node, and re-runs waitInterest. As a safety net, an incomplete drain (delivery_pct < 100) now b.Fatals the leaf instead of being silently reported. 2. drops -> drop_events (High) The custom metric counted ErrDroppedMessages callbacks, not actually-dropped messages. NATS coalesces multiple drops into a single callback per slow-consumer event, so the old 'drops' label was misleading: drops=1 could mean 1 or 10000 lost messages. The metric and field are renamed to drop_events with a doc comment making the lower-bound semantics explicit. 3. deliveryMB/s (Medium) b.SetBytes reports publisher ingress (payload bytes Publish() accepted per second). For fan-out with totalSubs > 1, aggregate delivered bandwidth is strictly higher. A new deliveryMB/s metric (delivered * payload / totalElapsed) is reported alongside the built-in MB/s so both sides are visible. A top-of-file metrics legend documents what each reported number means.
…inutes
Adds a top-level mode={flush,buffered} sweep to the fan-out benches so
flush vs buffered can be compared directly via benchstat -col /mode.
Threads PublishMode through newBenchSingleNode and newBenchCluster.
Bumps drainTimeout from 60s to 5m so buffered mode (Publish returns
before server receipt) has room to drain after the publish loop ends.
Incomplete drains still b.Fatalf: honest failures, not silent partial
results.
Publish is now a thin passthrough to nats.go's nc.Publish: subject
mapping + metrics, then a single nc.Publish call. No per-message
flush, no mode branch.
Why: upstream Core NATS does not provide a bounded-buffered publish
primitive. Fast publishers in nats.go use raw nc.Publish and rely on
the connection's internal write buffer (default 32 KiB) to auto-flush
when full. End-of-loop nc.Flush is used only when callers need
"server has confirmed" semantics (e.g. before exit). Our wrapper
should mirror that exactly so users get the same semantics they would
from a raw nats.Conn.
Removed:
- PublishMode type and PublishModeFlush / PublishModeBuffered
constants.
- Options.PublishMode and Options.PublishFlushTimeout fields.
- DefaultPublishFlushLimit constant.
- PublishMode branch in Pubsub.Publish.
- TestStandalone_PublishModeBuffered (now redundant; Publish IS
buffered).
No Flush API: not exposed on *Pubsub. No current caller needs it.
A Flush method can be added later when there is a concrete use case.
Bench changes:
- Removed the mode sweep axis (no longer applicable).
- Added a single end-of-loop publisher.nc.Flush in runFanoutBench,
inside the timed region but before b.StopTimer, so pubs/s
reflects "rate at which the server accepted publishes" rather
than "rate at which Publish enqueued into the client write
buffer". Matches `nats bench` upstream methodology.
- Priming (waitInterest) now uses a 1-byte payload instead of the
benchmark payload. Priming only verifies subject interest
propagation; using the full bench payload (e.g. 512 KiB across
1000 subs) needlessly loaded the publisher's outbound buffer
during setup.
- BenchmarkPubsubFanout_Cluster refocused on the realistic Coder
topology: 10-replica cluster, 100 subs/node, payloads {8 KiB,
512 KiB}. Previous 3-replica / smaller-fanout sweeps removed.
- BenchmarkPubsubFanout_SingleNode is now a b.Skip placeholder.
Single-node numbers don't reflect production load.
…ream parity BenchmarkNATSCoreFanout_TCP measures fan-out throughput against an embedded NATS server over a real TCP loopback listener using only github.com/nats-io/nats.go primitives: per-subscriber *nats.Conn, async Subscribe with SetPendingLimits(-1, -1), prebuilt *nats.Msg with PublishMsg in a tight loop and a single end-of-loop Flush. The subscriber rate is computed first-receive to last-receive (excluding drain), matching upstream 'nats bench' methodology. This bench exists for apples-to-apples comparison with upstream reference numbers. It intentionally bypasses the coderd/x/nats Pubsub wrapper, subject mapping, and InProcessConn so the gap between upstream raw-NATS performance and our wrapper's measured throughput (BenchmarkPubsubFanout_Cluster) is visible as data.
The embedded client previously used nats.InProcessServer, which is built on unbuffered net.Pipe. Under heavy fan-out (many subs on one conn, large payloads) the synchronous-rendezvous pipe trips the server's MaxPending / write_deadline limits and surfaces as io: read/write on closed pipe failures. Switch the default client hop to nats.Connect via ns.ClientURL() over 127.0.0.1; TCP loopback has kernel socket buffers and is the transport upstream benchmarks and tunes for. The server already binds a loopback random client listener, so this is a wiring change only. NewFromConn (external-conn constructor) is unchanged.
…bench model Restructure BenchmarkNATSCoreFanout_TCP and rename BenchmarkPubsubFanout_Cluster to BenchmarkPubsubFanout_Wrapper around the upstream natscli sample model: - One publisher goroutine emits one publisher sample with per-publish latencies; one subscriber connection (raw TCP) or per-node nc (wrapper) emits one subscriber sample with first-recv to last-recv windowing. - b.N is treated as the total publish count, equivalent to upstream 'nats bench --msgs'. Publishers split b.N via splitCounts; every subscriber receives all b.N messages. - Require -benchtime=<msgs>x via requireFixedBenchtime; reject time-based -benchtime so b.N stays aligned with the message-count contract. - Detect testing.B's b.N=1 discovery pass via isBenchWarmup and short-circuit so the expensive cluster / server setup is paid only once per leaf and the --- BENCH: log block reflects the real run. - Add benchSample / benchSampleGroup aggregating earliest start, latest end, summed counts/bytes and concatenated sorted latencies, with upstream-style percentile indexing (P50/P90/P99/P99.9). - Emit a compact set of headline scalars (pubs/s, pubMiB/s, sub_pubs/s, sub_MiB/s, delivery_pct, p50/p99/p99.9_us) for benchstat regression tracking, and a multi-line upstream-style report via b.Logf. - Sweep payload x sub_clients x pub_clients for the raw TCP benchmark and payload x pub_clients (10 replicas, 100 subs/node) for the wrapper benchmark. Add unit tests for the stats helpers.
Adds BenchmarkPubsub: 8-leaf matrix over topology (standalone vs 10-node full-mesh cluster), subjects (1 vs 10), and payload (8KiB vs 512KiB). Standalone uses 1 publisher + 100 subscribers; cluster10 uses 10 publishers (one per replica) + 1000 subscribers distributed across replicas. Each leaf brings up its own embedded servers via raw natsserver, drives publishes through a b.Loop()-driven worker pool with a start barrier, then reports pubs/s, delivery_pct, and publish-call latency p50/p99/p999. Requires -benchtime=Nx and fails fast otherwise. Run: go test -run x -bench BenchmarkPubsub -benchtime=1000x ./coderd/x/nats/.
…nchmarks
Adds two new sub-benchmarks alongside BenchmarkPubsub to cover real-world
NATS workload shapes the existing 8-leaf matrix misses.
BenchmarkPubsubHighCardinality probes subject-routing-table scaling for
per-workspace/per-agent/per-job subject patterns: {standalone,cluster10}
x subjects={1000,10000} x 8KiB. Each subject has exactly one subscriber
and publishers rotate through the full subject ring per iteration, so
per-publish fan-out is 1 and the cost being measured is the routing
lookup, not delivery width.
BenchmarkPubsubHotSubjectConcentrated probes per-replica outbound fan-out
for one hot subject (workspace_agent_metadata_batch worst case):
standalone x subs={1000,5000} x payload={8KiB,512KiB}. Subscriber
connections are brought up in parallel (semaphore-capped at 256) to keep
setup under ~5s even at 5000 conns.
Both new benches reuse the existing helpers (requireIterBenchtime,
startStandaloneServer, startClusterServers, benchConnect,
percentileMicros) and report the same metric set with the same
delivery_pct=100 hard requirement. At -benchtime=100x smoke runs, all 8
new leaves hit 100% delivery.
Introduce a package-level -bench.type={native,coder} flag and route
every BenchmarkPubsub* leaf through a small `harness` abstraction so
each leaf can be exercised against either:
* native: raw nats-server + nats.go (existing behavior). Measures
upstream NATS Core capacity.
* coder: the coderd/x/nats.Pubsub wrapper. Stands up one *Pubsub per
logical replica (a cluster-of-1 in standalone, a 10-replica full
mesh in cluster10) and routes Publish/Subscribe through the wrapper.
Measures end-to-end capacity through subject mapping, metrics
accounting, and the single shared *nats.Conn per replica that the
wrapper exposes to subscribers.
The chosen backend is reflected in every leaf name, e.g.
BenchmarkPubsub/coder/cluster10/subj1/512KiB. -bench.type=invalid is
rejected by requireIterBenchtime with a clear allowed-set message, so
typos fail fast before any server starts.
No Options extension was needed. The wrapper builds its prometheus
counters on construction (prometheus.NewCounterVec etc.) without
registering against a shared registerer, and the *Pubsub itself is a
prometheus.Collector that would only collide if a caller registered
multiple instances against the same registry. Bench code does not
register the collector, so multiple *Pubsub instances coexist cleanly
in one process.
Add BenchmarkPubsubThinFanout for the pgcoord / replicasync.Manager
shape: a 10-replica cluster with exactly one subscriber per replica on
one shared subject, and one publisher per replica. This is the
"every replica subscribes to a global topic that every other replica
publishes to" pattern that the previous matrix did not directly cover
(BenchmarkPubsub/cluster10/subj1 fans out 100 subs to each replica).
Both 8KiB and 512KiB payloads are covered, against both backends.
Smoke results at -benchtime=100x on linux/amd64:
* All BenchmarkPubsubThinFanout leaves: 100% delivery, both
backends.
* All BenchmarkPubsub*/native leaves: 100% delivery.
* BenchmarkPubsub/coder/cluster10/subj{1,10}/512KiB and all
BenchmarkPubsubHotSubjectConcentrated/coder/... leaves do not
reach 100% delivery. These failures are inherent to the wrapper
shape rather than benchmark bugs: each replica's *Pubsub multiplexes
every Subscribe over a single underlying *nats.Conn, so once
in-flight bytes per replica exceed the client's slow-consumer
threshold (100 subs * 512 KiB, or 1000-5000 subs concentrated on
one replica) the wrapper's error handler counts drops and
delivery_pct stays below 100. These leaves are the value of the
coder backend: they expose the single-conn-per-replica saturation
point that the production wrapper actually has. The native
counterparts pass because each subscriber gets its own *nats.Conn.
Restructure coderd/x/nats.Pubsub so each Subscribe call opens its own
in-process *nats.Conn (via nats.InProcessServer) dedicated to that
subscription. The wrapper retains one shared publisher connection
(pubConn) and the embedded server. Implements the recommendation in
docs/internal/wrapper-conn-pool-plan.md.
Why: with a single shared client connection, wide fan-out workloads
concentrate every locally-delivered copy through one server outbound
queue and trip the per-client MaxPending budget, surfacing as
slow-consumer disconnects under cluster10/subj1/512KiB and
HotSubjectConcentrated/subs1000+/512KiB. Giving each subscription its
own client connection turns MaxPending into a per-subscription budget
and fixes the concentration failure mode without JetStream or TCP
loopback.
Changes:
- pubsub.go: split Pubsub.nc into pubConn (publisher) + per-subscription
subscription.nc; replace single closedCh with a sync.WaitGroup that
tracks every owned conn's ClosedHandler; Subscribe lazily opens a
fresh InProcessServer connection, flushes it so SUB reaches the
server before returning, and tears the conn down on cancel; Close
drains pubConn + each per-sub conn and waits on connWG.
- options.go: drop Options.NoEcho; document new PendingLimits default.
- server.go: connectInProcess now uses nats.InProcessServer (not TCP
loopback); comment block updated; NoEcho branch removed.
- doc.go: rewrite Echo paragraph, add Connection model paragraph.
- Default PendingLimits when caller leaves them zero:
{Msgs: -1, Bytes: 512 MiB} so wide fan-out isn't truncated by nats.go
defaults. Explicit caller values still win.
- NewFromConn unchanged in behavior; now documented as the only
constructor that does not get per-subscription isolation.
Tests:
- New persubconn_test.go covers Distinct, CancelClosesConn,
CloseDrainsAll (incl. idempotence), SlowConsumerIsolation,
SubscribeLatency (bound 10ms / 50ms under -race).
- Delete TestStandalone_NoEcho.
- Internal tests that flushed via ps.nc updated to ps.pubConn.
Benchmarks:
- bench_test.go adds runtimeProbe reporting goroutines_delta and
heap_alloc_delta_mb for every BenchmarkPubsub* leaf in both native
and coder modes. Baseline is captured before any subscribe call;
delta is captured after subscribe wiring + flush/settle and just
before the publisher window starts.
Validation:
- go build ./coderd/x/nats/... clean
- go vet ./coderd/x/nats/... clean
- go test ./coderd/x/nats/ -run Test -count=1 -timeout 5m pass
- go test ./coderd/x/nats/ -run Test -race -count=1 -timeout 10m pass
- Previously-failing leaves at -benchtime=500x -bench.type=coder:
BenchmarkPubsub/coder/cluster10/subj1/512KiB 100.0 delivery_pct, 54 pubs/s
BenchmarkPubsub/coder/cluster10/subj10/512KiB 100.0 delivery_pct, 404 pubs/s
HotSubjectConcentrated/coder/standalone/subs1000/512KiB 100.0 delivery_pct, 17 pubs/s
HotSubjectConcentrated/coder/standalone/subs5000/512KiB 100.0 delivery_pct, 3.8 pubs/s
- Smoke -benchtime=100x sweeps of BenchmarkPubsub, HighCardinality,
HotSubjectConcentrated, ThinFanout pass at 100% delivery for both
native and coder modes.
Measured per-sub overhead (coder mode, -benchtime=100x):
- ~6 goroutines per subscription (1010 -> 6010 at 1000 subs;
30000 at 5000 subs). Higher than the plan's 3-4 estimate; this is
the nats.go per-conn cost (reader + flusher + ping) plus the
wrapper's drain goroutine.
- ~520-560 KB heap per subscription (e.g. 545 MB / 1000 subs in
HotSubjectConcentrated). Higher than the plan's 120 KB estimate,
again driven by nats.go's per-Conn allocations (read/write
buffers, parse state).
Throughput (coder vs native, 100x):
- standalone/subj1/512KiB: 141 vs 135 pubs/s
- standalone/subj10/512KiB: 692 vs 1076 pubs/s
- cluster10/subj1/512KiB: 58.7 vs 28.6 pubs/s
- HotSubjectConcentrated subs5000/512KiB: 3.7 vs 3.4 pubs/s
Known follow-up:
- BenchmarkPubsubThinFanout/coder/cluster10/subj1/512KiB shortfalls
to ~92% delivery at -benchtime=500x. Hypothesis: with 10 publisher
connections each owning a slot of the cluster mesh AND 10
subscriber connections distributed across 10 replicas, sustained
500-message bursts through routes plus the in-process pipe can
outpace consumers on a few replicas before route flow control
catches up. This is a route-side limit, not the per-client-MaxPending
one this change targeted; it did not regress (100x passes 100%).
Per plan section 9, no NoEcho compatibility shims, origin headers,
TCP loopback variants, JetStream, or workspaceupdates refactors are
introduced.
Drop all clustering and peer-refresh surface from the core review slice: - delete cluster.go (Peer, PeerProvider, StaticPeerProvider, ErrNoEmbeddedServer, route URL helpers) - delete cluster_test.go, cluster_refresh_test.go, testutil_test.go - remove ClusterName, ClusterHost, ClusterPort, ClusterAdvertise, PeerProvider, RoutePoolSize from Options; remove DefaultClusterName and DefaultRoutePoolSize constants - remove RefreshPeers, dropSelfRoutes, currentRoutes, refreshMu, serverOpts, provider fields from Pubsub - collapse startEmbeddedServer and buildServerOptions to a standalone non-clustered local loopback listener - drop the ErrNoEmbeddedServer sanity test in serverstats_test.go go test ./coderd/x/nats -count=1 passes.
- Rename pubConns -> publishPool and subConns -> subscribePool to match reviewer feedback on pubsub.Pubsub. - Remove NewFromConn and its tests. The wrapper now always owns its embedded server and connection pools, so the ownsServer/ownsPubConns/ ownsSubConns flags and their gated Close paths are gone. - Close unconditionally drains every publisher/subscriber connection and shuts down the embedded server when present.
…st; unify pickConn Trim the experimental coderd/x/nats foundation to the minimum needed for the initial PR: - Replace pickPubConn/pickSubConn with a single pickConn(pool, subject) helper. The publish and subscribe paths call it directly against p.publishPool and p.subscribePool. - Remove ServerStats / Pubsub.ServerStats and its tests. The embedded server snapshot is not needed for the initial PR. - Remove Options.WriteBufferSize and its nats.go wiring. Performance tuning for the per-conn flush threshold can be revisited later. - Remove stress_test.go and write_buffer_test.go. Validation: go test ./coderd/x/nats -count=1
Remove the custom Subject abstraction (LegacyEventSubject, BuildSubject, ValidateSubject, ValidateToken, DefaultSubjectPrefix, and related sentinel errors). Existing pubsub event names are valid NATS subjects, so Pubsub.Publish and SubscribeWithErr now use the incoming event string directly as both the NATS subject and the key into the shared subscription maps. Drop natsgo.SkipSubjectValidation() so nats.go performs its standard subject validation on the hot path; we no longer pre-validate.
Collaborator
Author
This stack of pull requests is managed by Graphite. Learn more about stacking. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

No description provided.