1919import java .net .InetSocketAddress ;
2020import java .net .UnknownHostException ;
2121import java .util .*;
22- import java .util .concurrent .ExecutionException ;
23- import java .util .concurrent .TimeUnit ;
22+ import java .util .concurrent .*;
2423import java .util .concurrent .atomic .AtomicReference ;
2524
2625import com .google .common .base .Objects ;
@@ -258,11 +257,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
258257 // the node we're connecting to.
259258 refreshNodeListAndTokenMap (connection , cluster , isInitialConnection , true );
260259
261- // Note that refreshing the schema will trigger refreshNodeListAndTokenMap since table == null
262- // We want that because the token map was not properly initialized by the first call above, since it requires the list of keyspaces
263- // to be loaded.
264260 logger .debug ("[Control connection] Refreshing schema" );
265- refreshSchema (connection , null , null , cluster , isInitialConnection );
261+ refreshSchema (connection , null , null , cluster );
262+
263+ // We need to refresh the node list again;
264+ // We want that because the token map was not properly initialized by the first call above,
265+ // since it requires the list of keyspaces to be loaded.
266+ refreshNodeListAndTokenMap (connection , cluster , false , true );
267+
266268 return connection ;
267269 } catch (BusyConnectionException e ) {
268270 connection .closeAsync ().force ();
@@ -283,13 +285,22 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
283285 }
284286
285287 public void refreshSchema (String keyspace , String table ) throws InterruptedException {
286- logger .debug ("[Control connection] Refreshing schema for {}{}" , keyspace == null ? "" : keyspace , table == null ? "" : '.' + table );
288+ if (keyspace == null && table == null ) {
289+ logger .debug ("[Control connection] Refreshing schema" );
290+ } else {
291+ logger .debug ("[Control connection] Refreshing schema for {}{}" , keyspace == null ? "" : keyspace , table == null ? "" : '.' + table );
292+ }
287293 try {
288294 Connection c = connectionRef .get ();
289295 // At startup, when we add the initial nodes, this will be null, which is ok
290296 if (c == null )
291297 return ;
292- refreshSchema (c , keyspace , table , cluster , false );
298+ refreshSchema (c , keyspace , table , cluster );
299+ // If the table is null, we either rebuild all from scratch or have an updated keyspace. In both cases, rebuild the token map
300+ // since some replication on some keyspace may have changed
301+ if (table == null ) {
302+ cluster .submitNodeListRefresh ();
303+ }
293304 } catch (ConnectionException e ) {
294305 logger .debug ("[Control connection] Connection error while refreshing schema ({})" , e .getMessage ());
295306 signalError ();
@@ -304,7 +315,7 @@ public void refreshSchema(String keyspace, String table) throws InterruptedExcep
304315 }
305316 }
306317
307- static void refreshSchema (Connection connection , String keyspace , String table , Cluster .Manager cluster , boolean isInitialConnection ) throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
318+ static void refreshSchema (Connection connection , String keyspace , String table , Cluster .Manager cluster ) throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
308319 // Make sure we're up to date on schema
309320 String whereClause = "" ;
310321 if (keyspace != null ) {
@@ -345,11 +356,6 @@ static void refreshSchema(Connection connection, String keyspace, String table,
345356 // So log, but let things go otherwise.
346357 logger .error ("Error parsing schema from Cassandra system tables: the schema in Cluster#getMetadata() will appear incomplete or stale" , e );
347358 }
348-
349- // If the table is null, we either rebuild all from scratch or have an updated keyspace. In both case, rebuild the token map
350- // since some replication on some keyspace may have changed
351- if (table == null )
352- refreshNodeListAndTokenMap (connection , cluster , false , false );
353359 }
354360
355361 public void refreshNodeListAndTokenMap () {
@@ -358,7 +364,6 @@ public void refreshNodeListAndTokenMap() {
358364 if (c == null )
359365 return ;
360366
361- logger .debug ("[Control connection] Refreshing node list and token map" );
362367 try {
363368 refreshNodeListAndTokenMap (c , cluster , false , true );
364369 } catch (ConnectionException e ) {
@@ -591,14 +596,13 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
591596 cluster .metadata .rebuildTokenMap (partitioner , tokenMap );
592597 }
593598
594- static boolean waitForSchemaAgreement (Connection connection , Cluster .Manager cluster ) throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
595-
599+ boolean waitForSchemaAgreement () throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
596600 long start = System .nanoTime ();
597601 long elapsed = 0 ;
598602 int maxSchemaAgreementWaitSeconds = cluster .configuration .getProtocolOptions ().getMaxSchemaAgreementWaitSeconds ();
599603 while (elapsed < maxSchemaAgreementWaitSeconds * 1000 ) {
600604
601- if (checkSchemaAgreement (connection , cluster ))
605+ if (checkSchemaAgreement ())
602606 return true ;
603607
604608 // let's not flood the node too much
@@ -610,7 +614,11 @@ static boolean waitForSchemaAgreement(Connection connection, Cluster.Manager clu
610614 return false ;
611615 }
612616
613- private static boolean checkSchemaAgreement (Connection connection , Cluster .Manager cluster ) throws ConnectionException , BusyConnectionException , InterruptedException , ExecutionException {
617+ boolean checkSchemaAgreement () throws ConnectionException , BusyConnectionException , InterruptedException , ExecutionException {
618+ Connection connection = connectionRef .get ();
619+ if (connection == null || connection .isClosed ())
620+ return false ;
621+
614622 DefaultResultSetFuture peersFuture = new DefaultResultSetFuture (null , new Requests .Query (SELECT_SCHEMA_PEERS ));
615623 DefaultResultSetFuture localFuture = new DefaultResultSetFuture (null , new Requests .Query (SELECT_SCHEMA_LOCAL ));
616624 connection .write (peersFuture );
@@ -636,16 +644,6 @@ private static boolean checkSchemaAgreement(Connection connection, Cluster.Manag
636644 return versions .size () <= 1 ;
637645 }
638646
639- boolean checkSchemaAgreement () {
640- Connection c = connectionRef .get ();
641- try {
642- return c != null && checkSchemaAgreement (c , cluster );
643- } catch (Exception e ) {
644- logger .warn ("Error while checking schema agreement" , e );
645- return false ;
646- }
647- }
648-
649647 boolean isOpen () {
650648 Connection c = connectionRef .get ();
651649 return c != null && !c .isClosed ();
@@ -679,7 +677,7 @@ public void onAdd(Host host) {
679677 // or it's not part of our computed token map
680678 Metadata .TokenMap tkmap = cluster .metadata .tokenMap ;
681679 if (host .getCassandraVersion () == null || tkmap == null || !tkmap .hosts .contains (host ))
682- refreshNodeListAndTokenMap ();
680+ cluster . submitNodeListRefresh ();
683681 }
684682
685683 @ Override
@@ -693,6 +691,6 @@ public void onRemove(Host host) {
693691 backgroundReconnect (0 );
694692 }
695693
696- refreshNodeListAndTokenMap ();
694+ cluster . submitNodeListRefresh ();
697695 }
698696}
0 commit comments