@@ -23,6 +23,7 @@ import (
2323 "fmt"
2424 "log/slog"
2525 "net"
26+ "sync"
2627 "time"
2728
2829 "github.com/wso2/api-platform/gateway/gateway-controller/pkg/apikeyxds"
@@ -45,6 +46,7 @@ type Server struct {
4546 lazyResourceSnapshotMgr * lazyresourcexds.LazyResourceSnapshotManager
4647 port int
4748 tlsConfig * TLSConfig
49+ onFirstConnect chan struct {}
4850 logger * slog.Logger
4951}
5052
@@ -69,6 +71,13 @@ func WithTLS(certFile, keyFile string) ServerOption {
6971 }
7072}
7173
74+ // WithOnFirstConnect sets a channel that will be closed when the first xDS client connects
75+ func WithOnFirstConnect (ch chan struct {}) ServerOption {
76+ return func (s * Server ) {
77+ s .onFirstConnect = ch
78+ }
79+ }
80+
7281// NewServer creates a new policy xDS server
7382func NewServer (snapshotManager * SnapshotManager , apiKeySnapshotMgr * apikeyxds.APIKeySnapshotManager , lazyResourceSnapshotMgr * lazyresourcexds.LazyResourceSnapshotManager , port int , logger * slog.Logger , opts ... ServerOption ) * Server {
7483 s := & Server {
@@ -118,7 +127,11 @@ func NewServer(snapshotManager *SnapshotManager, apiKeySnapshotMgr *apikeyxds.AP
118127 lazyResourceCache := lazyResourceSnapshotMgr .GetCache ()
119128 combinedCache := NewCombinedCache (policyCache , apiKeyCache , lazyResourceCache , logger )
120129
121- callbacks := & serverCallbacks {logger : logger }
130+ callbacks := & serverCallbacks {
131+ logger : logger ,
132+ activeStreams : make (map [int64 ]bool ),
133+ onFirstConnect : s .onFirstConnect ,
134+ }
122135 xdsServer := server .NewServer (context .Background (), combinedCache , callbacks )
123136
124137 // Register ADS (Aggregated Discovery Service) for policy distribution
@@ -160,7 +173,11 @@ func (s *Server) Stop() {
160173
161174// serverCallbacks implements xDS server callbacks for logging and debugging
162175type serverCallbacks struct {
163- logger * slog.Logger
176+ logger * slog.Logger
177+ activeStreams map [int64 ]bool
178+ activeStreamsMu sync.Mutex
179+ onFirstConnect chan struct {}
180+ firstConnectOnce sync.Once
164181}
165182
166183// OnStreamOpen is called when a new stream is opened
@@ -176,6 +193,10 @@ func (cb *serverCallbacks) OnStreamClosed(streamID int64, node *core.Node) {
176193 cb .logger .Info ("Policy xDS stream closed" ,
177194 slog .Int64 ("stream_id" , streamID ),
178195 slog .String ("node_id" , node .GetId ()))
196+
197+ cb .activeStreamsMu .Lock ()
198+ defer cb .activeStreamsMu .Unlock ()
199+ delete (cb .activeStreams , streamID )
179200}
180201
181202// OnStreamRequest is called when a discovery request is received
@@ -185,6 +206,17 @@ func (cb *serverCallbacks) OnStreamRequest(streamID int64, req *discoverygrpc.Di
185206 slog .String ("type_url" , req .GetTypeUrl ()),
186207 slog .String ("version" , req .GetVersionInfo ()),
187208 slog .Any ("resource_names" , req .GetResourceNames ()))
209+
210+ cb .activeStreamsMu .Lock ()
211+ defer cb .activeStreamsMu .Unlock ()
212+
213+ if _ , exists := cb .activeStreams [streamID ]; ! exists {
214+ cb .activeStreams [streamID ] = true
215+ if cb .onFirstConnect != nil {
216+ cb .firstConnectOnce .Do (func () { close (cb .onFirstConnect ) })
217+ }
218+ }
219+
188220 return nil
189221}
190222
0 commit comments