From 47251b4b2b4973a91577343c978c0942eb70e656 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Fri, 19 Jun 2026 21:27:36 +0000 Subject: [PATCH] fix(coderd/x/nats): default ClusterPort so cluster routes form New left Options.ClusterPort at zero when callers (the cli path) didn't set it. parsePeerAddresses overwrites each peer's parsed port with defaultClusterPort (6222), but only when opts.ClusterPort already equals defaultClusterPort; with the zero value that branch is skipped, so replicas dialed peers on the relay URL's HTTP port (8080) instead of 6222 and no cluster routes ever formed. Default ClusterPort in New so the listener and the peer-port rewrite agree. Adds regression tests. Co-authored-by: Mux --- coderd/x/nats/cluster_internal_test.go | 28 ++++++++++++++++++++++++++ coderd/x/nats/pubsub.go | 11 ++++++++++ 2 files changed, 39 insertions(+) diff --git a/coderd/x/nats/cluster_internal_test.go b/coderd/x/nats/cluster_internal_test.go index 5d70d74f87996..e3dc10c29f0f1 100644 --- a/coderd/x/nats/cluster_internal_test.go +++ b/coderd/x/nats/cluster_internal_test.go @@ -48,6 +48,20 @@ func Test_parsePeerAddresses(t *testing.T) { }, routeStrings(routes)) }) + // Regression: in production the relay URL host carries the coderd HTTP + // port (e.g. 8080), and routes must be rewritten to the NATS cluster + // port. This only works because New defaults ClusterPort to + // defaultClusterPort; if it were left at the zero value the rewrite + // would be skipped and routes would dial the HTTP port. + t.Run("RewritesRelayHTTPPort", func(t *testing.T) { + t.Parallel() + ps := &Pubsub{} + ps.opts.ClusterPort = defaultClusterPort + routes, err := ps.parsePeerAddresses([]string{"http://10.0.0.7:8080"}) + require.NoError(t, err) + require.Equal(t, []string{"nats://10.0.0.7:6222"}, routeStrings(routes)) + }) + t.Run("Empty", func(t *testing.T) { t.Parallel() ps := &Pubsub{} @@ -186,6 +200,20 @@ func (f *testPeerFetcher) PrimaryPeerAddresses() []string { return f.addresses } +// TestPubsub_New_DefaultsClusterPort guards the production wiring: New +// must persist the default cluster port onto opts so the peer route +// rewrite in parsePeerAddresses recognizes prod and forces routes to the +// NATS port. The cli constructs Options without a ClusterPort, so leaving +// it at the zero value made every replica dial peers at the relay URL's +// HTTP port instead of the NATS route port. +func TestPubsub_New_DefaultsClusterPort(t *testing.T) { + t.Parallel() + // defaultTestOptions disables clustering (no fixed-port listener to + // collide with parallel tests) and leaves ClusterPort unset. + ps := newTestPubsub(t, defaultTestOptions()) + require.Equal(t, defaultClusterPort, ps.opts.ClusterPort) +} + func TestPubsub_setPeerAddresses(t *testing.T) { t.Parallel() t.Run("OK", func(t *testing.T) { diff --git a/coderd/x/nats/pubsub.go b/coderd/x/nats/pubsub.go index d6e20d6327fac..681cdd4a941ae 100644 --- a/coderd/x/nats/pubsub.go +++ b/coderd/x/nats/pubsub.go @@ -274,6 +274,17 @@ func (p *Pubsub) buildConnHandlers() connHandlers { // embedded server and the publisher and subscriber connection pools. // Close shuts down all owned resources. func New(ctx context.Context, logger slog.Logger, opts Options) (*Pubsub, error) { + // Persist the default cluster port onto opts so it is the same value the + // listener (buildServerOptions) binds and the value parsePeerAddresses + // compares against. parsePeerAddresses overwrites each peer's parsed port + // with defaultClusterPort, but only when opts.ClusterPort already equals + // defaultClusterPort. Callers like the cli leave ClusterPort at 0, so + // without this that branch is skipped and peers are dialed on the relay + // URL's port (e.g. 8080) instead of the NATS route port (6222). + if opts.ClusterPort == 0 { + opts.ClusterPort = defaultClusterPort + } + sopts, err := buildServerOptions(opts) if err != nil { return nil, err