1717
1818import com .datastax .oss .driver .api .core .AsyncAutoCloseable ;
1919import com .datastax .oss .driver .api .core .Cluster ;
20+ import com .datastax .oss .driver .api .core .CqlIdentifier ;
21+ import com .datastax .oss .driver .api .core .DriverException ;
2022import com .datastax .oss .driver .api .core .context .DriverContext ;
23+ import com .datastax .oss .driver .api .core .cql .CqlSession ;
2124import com .datastax .oss .driver .api .core .metadata .Metadata ;
25+ import com .datastax .oss .driver .api .core .session .Session ;
2226import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
2327import com .datastax .oss .driver .internal .core .metadata .MetadataManager ;
2428import com .datastax .oss .driver .internal .core .metadata .NodeStateManager ;
29+ import com .datastax .oss .driver .internal .core .session .DefaultSession ;
2530import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
2631import com .datastax .oss .driver .internal .core .util .concurrent .RunOrSchedule ;
32+ import com .datastax .oss .driver .internal .core .util .concurrent .UncaughtExceptions ;
2733import com .google .common .collect .ImmutableList ;
2834import io .netty .util .concurrent .EventExecutor ;
2935import java .net .InetSocketAddress ;
@@ -72,6 +78,13 @@ public DriverContext getContext() {
7278 return context ;
7379 }
7480
81+ @ Override
82+ public CompletionStage <CqlSession > connectAsync (CqlIdentifier keyspace ) {
83+ CompletableFuture <CqlSession > connectFuture = new CompletableFuture <>();
84+ RunOrSchedule .on (adminExecutor , () -> singleThreaded .connect (keyspace , connectFuture ));
85+ return connectFuture ;
86+ }
87+
7588 @ Override
7689 public CompletionStage <Void > closeFuture () {
7790 return singleThreaded .closeFuture ;
@@ -99,12 +112,15 @@ private class SingleThreaded {
99112 private final CompletableFuture <Void > closeFuture = new CompletableFuture <>();
100113 private boolean closeWasCalled ;
101114 private boolean forceCloseWasCalled ;
102- private List <CompletionStage <Void >> childrenCloseFutures ;
115+ // Note: closed sessions are not removed from the list. If this creates a memory issue, there
116+ // is something really wrong in the client program
117+ private List <Session > sessions ;
103118
104119 private SingleThreaded (InternalDriverContext context , Set <InetSocketAddress > contactPoints ) {
105120 this .context = context ;
106121 this .nodeStateManager = new NodeStateManager (context );
107122 this .initialContactPoints = contactPoints ;
123+ this .sessions = new ArrayList <>();
108124 }
109125
110126 private void init () {
@@ -152,6 +168,30 @@ private void init() {
152168 });
153169 }
154170
171+ private void connect (CqlIdentifier keyspace , CompletableFuture <CqlSession > connectFuture ) {
172+ assert adminExecutor .inEventLoop ();
173+ if (closeWasCalled ) {
174+ connectFuture .completeExceptionally (new DriverException ("Cluster was closed" ));
175+ } else {
176+ DefaultSession .init (context , keyspace )
177+ .whenCompleteAsync (
178+ (session , error ) -> {
179+ if (error != null ) {
180+ connectFuture .completeExceptionally (error );
181+ } else if (closeWasCalled ) {
182+ connectFuture .completeExceptionally (
183+ new DriverException ("Cluster was closed while session was initializing" ));
184+ session .forceCloseAsync ();
185+ } else {
186+ sessions .add (session );
187+ connectFuture .complete (session );
188+ }
189+ },
190+ adminExecutor )
191+ .exceptionally (UncaughtExceptions ::log );
192+ }
193+ }
194+
155195 private void close () {
156196 assert adminExecutor .inEventLoop ();
157197 if (closeWasCalled ) {
@@ -160,12 +200,13 @@ private void close() {
160200 closeWasCalled = true ;
161201
162202 LOG .debug ("Closing {}" , this );
163- childrenCloseFutures = new ArrayList <>();
203+ List < CompletionStage < Void >> childrenCloseStages = new ArrayList <>();
164204 for (AsyncAutoCloseable closeable : internalComponentsToClose ()) {
165205 LOG .debug ("Closing {}" , closeable );
166- childrenCloseFutures .add (closeable .closeAsync ());
206+ childrenCloseStages .add (closeable .closeAsync ());
167207 }
168- CompletableFutures .whenAllDone (childrenCloseFutures , this ::onChildrenClosed , adminExecutor );
208+ CompletableFutures .whenAllDone (
209+ childrenCloseStages , () -> onChildrenClosed (childrenCloseStages ), adminExecutor );
169210 }
170211
171212 private void forceClose () {
@@ -178,26 +219,27 @@ private void forceClose() {
178219 LOG .debug ("Force-closing {} (was {}closed before)" , this , (closeWasCalled ? "" : "not " ));
179220
180221 if (closeWasCalled ) {
181- // childrenCloseFutures is already created, and onChildrenClosed has already been called
222+ // onChildrenClosed has already been called
182223 for (AsyncAutoCloseable closeable : internalComponentsToClose ()) {
183224 LOG .debug ("Force-closing {}" , closeable );
184225 closeable .forceCloseAsync ();
185226 }
186227 } else {
187228 closeWasCalled = true ;
188- childrenCloseFutures = new ArrayList <>();
229+ List < CompletionStage < Void >> childrenCloseStages = new ArrayList <>();
189230 for (AsyncAutoCloseable closeable : internalComponentsToClose ()) {
190231 LOG .debug ("Force-closing {}" , closeable );
191- childrenCloseFutures .add (closeable .forceCloseAsync ());
232+ childrenCloseStages .add (closeable .forceCloseAsync ());
192233 }
193- CompletableFutures .whenAllDone (childrenCloseFutures , this ::onChildrenClosed , adminExecutor );
234+ CompletableFutures .whenAllDone (
235+ childrenCloseStages , () -> onChildrenClosed (childrenCloseStages ), adminExecutor );
194236 }
195237 }
196238
197- private void onChildrenClosed () {
239+ private void onChildrenClosed (List < CompletionStage < Void >> childrenCloseStages ) {
198240 assert adminExecutor .inEventLoop ();
199- for (CompletionStage <Void > future : childrenCloseFutures ) {
200- warnIfFailed (future );
241+ for (CompletionStage <Void > stage : childrenCloseStages ) {
242+ warnIfFailed (stage );
201243 }
202244 context
203245 .nettyOptions ()
@@ -221,11 +263,14 @@ private void warnIfFailed(CompletionStage<Void> stage) {
221263 }
222264
223265 private List <AsyncAutoCloseable > internalComponentsToClose () {
224- return ImmutableList .of (
225- nodeStateManager ,
226- metadataManager ,
227- context .topologyMonitor (),
228- context .controlConnection ());
266+ return ImmutableList .<AsyncAutoCloseable >builder ()
267+ .addAll (sessions )
268+ .add (
269+ nodeStateManager ,
270+ metadataManager ,
271+ context .topologyMonitor (),
272+ context .controlConnection ())
273+ .build ();
229274 }
230275 }
231276}
0 commit comments