@@ -384,16 +384,30 @@ private void init(CqlIdentifier keyspace) {
384384 .getTopologyMonitor ()
385385 .init ()
386386 .thenCompose (v -> metadataManager .refreshNodes ())
387- .thenAccept (v -> afterInitialNodeListRefresh (keyspace ))
388- .exceptionally (
389- error -> {
390- initFuture .completeExceptionally (error );
391- RunOrSchedule .on (adminExecutor , this ::close );
392- return null ;
387+ .thenCompose (v -> checkProtocolVersion ())
388+ .thenCompose (v -> initialSchemaRefresh ())
389+ .thenCompose (v -> initializePools (keyspace ))
390+ .whenComplete (
391+ (v , error ) -> {
392+ if (error == null ) {
393+ LOG .debug ("[{}] Initialization complete, ready" , logPrefix );
394+ notifyListeners ();
395+ initFuture .complete (DefaultSession .this );
396+ } else {
397+ LOG .debug ("[{}] Initialization failed, force closing" , logPrefix , error );
398+ forceCloseAsync ()
399+ .whenComplete (
400+ (v1 , error1 ) -> {
401+ if (error1 != null ) {
402+ error .addSuppressed (error1 );
403+ }
404+ initFuture .completeExceptionally (error );
405+ });
406+ }
393407 });
394408 }
395409
396- private void afterInitialNodeListRefresh ( CqlIdentifier keyspace ) {
410+ private CompletionStage < Void > checkProtocolVersion ( ) {
397411 try {
398412 boolean protocolWasForced =
399413 context .getConfig ().getDefaultProfile ().isDefined (DefaultDriverOption .PROTOCOL_VERSION );
@@ -426,48 +440,39 @@ private void afterInitialNodeListRefresh(CqlIdentifier keyspace) {
426440 bestVersion );
427441 }
428442 }
429- metadataManager
443+ return CompletableFuture .completedFuture (null );
444+ } catch (Throwable throwable ) {
445+ return CompletableFutures .failedFuture (throwable );
446+ }
447+ }
448+
449+ private CompletionStage <RefreshSchemaResult > initialSchemaRefresh () {
450+ try {
451+ return metadataManager
430452 .refreshSchema (null , false , true )
431- .whenComplete (
432- (metadata , error ) -> {
433- if (error != null ) {
434- Loggers .warnWithException (
435- LOG ,
436- "[{}] Unexpected error while refreshing schema during initialization, "
437- + "keeping previous version" ,
438- logPrefix ,
439- error );
440- }
441- afterInitialSchemaRefresh (keyspace );
453+ .exceptionally (
454+ error -> {
455+ Loggers .warnWithException (
456+ LOG ,
457+ "[{}] Unexpected error while refreshing schema during initialization, "
458+ + "proceeding without schema metadata" ,
459+ logPrefix ,
460+ error );
461+ return null ;
442462 });
443463 } catch (Throwable throwable ) {
444- initFuture . completeExceptionally (throwable );
464+ return CompletableFutures . failedFuture (throwable );
445465 }
446466 }
447467
448- private void afterInitialSchemaRefresh (CqlIdentifier keyspace ) {
468+ private CompletionStage < Void > initializePools (CqlIdentifier keyspace ) {
449469 try {
450470 nodeStateManager .markInitialized ();
451471 context .getLoadBalancingPolicyWrapper ().init ();
452472 context .getConfigLoader ().onDriverInit (context );
453- LOG .debug ("[{}] Initialization complete, ready" , logPrefix );
454- poolManager
455- .init (keyspace )
456- .whenComplete (
457- (v , error ) -> {
458- if (error != null ) {
459- initFuture .completeExceptionally (error );
460- } else {
461- notifyListeners ();
462- initFuture .complete (DefaultSession .this );
463- }
464- });
473+ return poolManager .init (keyspace );
465474 } catch (Throwable throwable ) {
466- forceCloseAsync ()
467- .whenComplete (
468- (v , error ) -> {
469- initFuture .completeExceptionally (throwable );
470- });
475+ return CompletableFutures .failedFuture (throwable );
471476 }
472477 }
473478
0 commit comments