@@ -13,6 +13,7 @@ import (
1313 "time"
1414
1515 "github.com/google/uuid"
16+ lru "github.com/hashicorp/golang-lru/v2"
1617 "golang.org/x/xerrors"
1718
1819 "cdr.dev/slog"
@@ -24,15 +25,22 @@ import (
2425// that uses PostgreSQL pubsub to exchange handshakes.
2526func NewCoordinator (logger slog.Logger , pubsub database.Pubsub ) (agpl.Coordinator , error ) {
2627 ctx , cancelFunc := context .WithCancel (context .Background ())
28+
29+ nameCache , err := lru.New [uuid.UUID , string ](512 )
30+ if err != nil {
31+ panic ("make lru cache: " + err .Error ())
32+ }
33+
2734 coord := & haCoordinator {
2835 id : uuid .New (),
2936 log : logger ,
3037 pubsub : pubsub ,
3138 closeFunc : cancelFunc ,
3239 close : make (chan struct {}),
3340 nodes : map [uuid.UUID ]* agpl.Node {},
34- agentSockets : map [uuid.UUID ]net.Conn {},
35- agentToConnectionSockets : map [uuid.UUID ]map [uuid.UUID ]net.Conn {},
41+ agentSockets : map [uuid.UUID ]* agpl.TrackedConn {},
42+ agentToConnectionSockets : map [uuid.UUID ]map [uuid.UUID ]* agpl.TrackedConn {},
43+ agentNameCache : nameCache ,
3644 }
3745
3846 if err := coord .runPubsub (ctx ); err != nil {
@@ -53,10 +61,14 @@ type haCoordinator struct {
5361 // nodes maps agent and connection IDs their respective node.
5462 nodes map [uuid.UUID ]* agpl.Node
5563 // agentSockets maps agent IDs to their open websocket.
56- agentSockets map [uuid.UUID ]net. Conn
64+ agentSockets map [uuid.UUID ]* agpl. TrackedConn
5765 // agentToConnectionSockets maps agent IDs to connection IDs of conns that
5866 // are subscribed to updates for that agent.
59- agentToConnectionSockets map [uuid.UUID ]map [uuid.UUID ]net.Conn
67+ agentToConnectionSockets map [uuid.UUID ]map [uuid.UUID ]* agpl.TrackedConn
68+
69+ // agentNameCache holds a cache of agent names. If one of them disappears,
70+ // it's helpful to have a name cached for debugging.
71+ agentNameCache * lru.Cache [uuid.UUID , string ]
6072}
6173
6274// Node returns an in-memory node by ID.
@@ -94,12 +106,18 @@ func (c *haCoordinator) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID
94106 c .mutex .Lock ()
95107 connectionSockets , ok := c .agentToConnectionSockets [agent ]
96108 if ! ok {
97- connectionSockets = map [uuid.UUID ]net. Conn {}
109+ connectionSockets = map [uuid.UUID ]* agpl. TrackedConn {}
98110 c .agentToConnectionSockets [agent ] = connectionSockets
99111 }
100112
101- // Insert this connection into a map so the agent can publish node updates.
102- connectionSockets [id ] = conn
113+ now := time .Now ().Unix ()
114+ // Insert this connection into a map so the agent
115+ // can publish node updates.
116+ connectionSockets [id ] = & agpl.TrackedConn {
117+ Conn : conn ,
118+ Start : now ,
119+ LastWrite : now ,
120+ }
103121 c .mutex .Unlock ()
104122
105123 defer func () {
@@ -176,7 +194,9 @@ func (c *haCoordinator) handleNextClientMessage(id, agent uuid.UUID, decoder *js
176194
177195// ServeAgent accepts a WebSocket connection to an agent that listens to
178196// incoming connections and publishes node updates.
179- func (c * haCoordinator ) ServeAgent (conn net.Conn , id uuid.UUID , _ string ) error {
197+ func (c * haCoordinator ) ServeAgent (conn net.Conn , id uuid.UUID , name string ) error {
198+ c .agentNameCache .Add (id , name )
199+
180200 // Tell clients on other instances to send a callmemaybe to us.
181201 err := c .publishAgentHello (id )
182202 if err != nil {
@@ -196,21 +216,41 @@ func (c *haCoordinator) ServeAgent(conn net.Conn, id uuid.UUID, _ string) error
196216 }
197217 }
198218
219+ // This uniquely identifies a connection that belongs to this goroutine.
220+ unique := uuid .New ()
221+ now := time .Now ().Unix ()
222+ overwrites := int64 (0 )
223+
199224 // If an old agent socket is connected, we close it
200225 // to avoid any leaks. This shouldn't ever occur because
201226 // we expect one agent to be running.
202227 c .mutex .Lock ()
203228 oldAgentSocket , ok := c .agentSockets [id ]
204229 if ok {
230+ overwrites = oldAgentSocket .Overwrites + 1
205231 _ = oldAgentSocket .Close ()
206232 }
207- c .agentSockets [id ] = conn
233+ c .agentSockets [id ] = & agpl.TrackedConn {
234+ ID : unique ,
235+ Conn : conn ,
236+
237+ Name : name ,
238+ Start : now ,
239+ LastWrite : now ,
240+ Overwrites : overwrites ,
241+ }
208242 c .mutex .Unlock ()
243+
209244 defer func () {
210245 c .mutex .Lock ()
211246 defer c .mutex .Unlock ()
212- delete (c .agentSockets , id )
213- delete (c .nodes , id )
247+
248+ // Only delete the connection if it's ours. It could have been
249+ // overwritten.
250+ if idConn , ok := c .agentSockets [id ]; ok && idConn .ID == unique {
251+ delete (c .agentSockets , id )
252+ delete (c .nodes , id )
253+ }
214254 }()
215255
216256 decoder := json .NewDecoder (conn )
@@ -576,8 +616,14 @@ func (c *haCoordinator) formatAgentUpdate(id uuid.UUID, node *agpl.Node) ([]byte
576616 return buf .Bytes (), nil
577617}
578618
579- func (* haCoordinator ) ServeHTTPDebug (w http.ResponseWriter , _ * http.Request ) {
619+ func (c * haCoordinator ) ServeHTTPDebug (w http.ResponseWriter , r * http.Request ) {
580620 w .Header ().Set ("Content-Type" , "text/html; charset=utf-8" )
581- fmt .Fprintf (w , "<h1>coordinator</h1>" )
582- fmt .Fprintf (w , "<h2>ha debug coming soon</h2>" )
621+
622+ c .mutex .RLock ()
623+ defer c .mutex .RUnlock ()
624+
625+ fmt .Fprintln (w , "<h1>high-availability wireguard coordinator debug</h1>" )
626+ fmt .Fprintln (w , "<h4 style=\" margin-top:-25px\" >warning: this only provides info from the node that served the request, if there are multiple replicas this data may be incomplete</h4>" )
627+
628+ agpl .CoordinatorHTTPDebug (c .agentSockets , c .agentToConnectionSockets , c .agentNameCache )(w , r )
583629}
0 commit comments