Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions coderd/x/nats/cluster_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ func Test_parsePeerAddresses(t *testing.T) {
}, routeStrings(routes))
})

// Regression: in production the relay URL host carries the coderd HTTP
// port (e.g. 8080), and routes must be rewritten to the NATS cluster
// port. This only works because New defaults ClusterPort to
// defaultClusterPort; if it were left at the zero value the rewrite
// would be skipped and routes would dial the HTTP port.
t.Run("RewritesRelayHTTPPort", func(t *testing.T) {
t.Parallel()
ps := &Pubsub{}
ps.opts.ClusterPort = defaultClusterPort
routes, err := ps.parsePeerAddresses([]string{"http://10.0.0.7:8080"})
require.NoError(t, err)
require.Equal(t, []string{"nats://10.0.0.7:6222"}, routeStrings(routes))
})

t.Run("Empty", func(t *testing.T) {
t.Parallel()
ps := &Pubsub{}
Expand Down Expand Up @@ -186,6 +200,20 @@ func (f *testPeerFetcher) PrimaryPeerAddresses() []string {
return f.addresses
}

// TestPubsub_New_DefaultsClusterPort guards the production wiring: New
// must persist the default cluster port onto opts so the peer route
// rewrite in parsePeerAddresses recognizes prod and forces routes to the
// NATS port. The cli constructs Options without a ClusterPort, so leaving
// it at the zero value made every replica dial peers at the relay URL's
// HTTP port instead of the NATS route port.
func TestPubsub_New_DefaultsClusterPort(t *testing.T) {
t.Parallel()
// defaultTestOptions disables clustering (no fixed-port listener to
// collide with parallel tests) and leaves ClusterPort unset.
ps := newTestPubsub(t, defaultTestOptions())
require.Equal(t, defaultClusterPort, ps.opts.ClusterPort)
}

func TestPubsub_setPeerAddresses(t *testing.T) {
t.Parallel()
t.Run("OK", func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions coderd/x/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,17 @@ func (p *Pubsub) buildConnHandlers() connHandlers {
// embedded server and the publisher and subscriber connection pools.
// Close shuts down all owned resources.
func New(ctx context.Context, logger slog.Logger, opts Options) (*Pubsub, error) {
// Persist the default cluster port onto opts so it is the same value the
// listener (buildServerOptions) binds and the value parsePeerAddresses
// compares against. parsePeerAddresses overwrites each peer's parsed port
// with defaultClusterPort, but only when opts.ClusterPort already equals
// defaultClusterPort. Callers like the cli leave ClusterPort at 0, so
// without this that branch is skipped and peers are dialed on the relay
// URL's port (e.g. 8080) instead of the NATS route port (6222).
if opts.ClusterPort == 0 {
opts.ClusterPort = defaultClusterPort
}

sopts, err := buildServerOptions(opts)
if err != nil {
return nil, err
Expand Down
Loading