3232import com .datastax .driver .core .exceptions .DriverInternalError ;
3333import com .datastax .driver .core .exceptions .NoHostAvailableException ;
3434
35- import static com .datastax .driver .core .SchemaElement .* ;
35+ import static com .datastax .driver .core .SchemaElement .KEYSPACE ;
3636
3737class ControlConnection {
3838
3939 private static final Logger logger = LoggerFactory .getLogger (ControlConnection .class );
4040
41- private static final TypeCodec .ListCodec <String > LIST_OF_TEXT_CODEC = new TypeCodec .ListCodec <String >(TypeCodec .VarcharCodec .instance );
42-
4341 private static final InetAddress bindAllAddress ;
4442 static
4543 {
@@ -50,13 +48,6 @@ class ControlConnection {
5048 }
5149 }
5250
53- private static final String SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces" ;
54- private static final String SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies" ;
55- private static final String SELECT_COLUMNS = "SELECT * FROM system.schema_columns" ;
56- private static final String SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes" ;
57- private static final String SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions" ;
58- private static final String SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates" ;
59-
6051 private static final String SELECT_PEERS = "SELECT * FROM system.peers" ;
6152 private static final String SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'" ;
6253
@@ -270,7 +261,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
270261 // We want that because the token map was not properly initialized by the first call above, since it requires the list of keyspaces
271262 // to be loaded.
272263 logger .debug ("[Control connection] Refreshing schema" );
273- refreshSchema (connection , null , null , null , null , cluster , isInitialConnection );
264+ refreshSchema (connection , null , null , null , null , cluster );
274265 return connection ;
275266 } catch (BusyConnectionException e ) {
276267 connection .closeAsync ().force ();
@@ -299,7 +290,7 @@ public void refreshSchema(SchemaElement targetType, String targetKeyspace, Strin
299290 // At startup, when we add the initial nodes, this will be null, which is ok
300291 if (c == null )
301292 return ;
302- refreshSchema (c , targetType , targetKeyspace , targetName , signature , cluster , false );
293+ refreshSchema (c , targetType , targetKeyspace , targetName , signature , cluster );
303294 } catch (ConnectionException e ) {
304295 logger .debug ("[Control connection] Connection error while refreshing schema ({})" , e .getMessage ());
305296 signalError ();
@@ -314,7 +305,7 @@ public void refreshSchema(SchemaElement targetType, String targetKeyspace, Strin
314305 }
315306 }
316307
317- static void refreshSchema (Connection connection , SchemaElement targetType , String targetKeyspace , String targetName , List <String > targetSignature , Cluster .Manager cluster , boolean isInitialConnection ) throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
308+ static void refreshSchema (Connection connection , SchemaElement targetType , String targetKeyspace , String targetName , List <String > targetSignature , Cluster .Manager cluster ) throws ConnectionException , BusyConnectionException , ExecutionException , InterruptedException {
318309 Host host = cluster .metadata .getHost (connection .address );
319310 // Neither host, nor it's version should be null. But instead of dying if there is a race or something, we can kind of try to infer
320311 // a Cassandra version from the protocol version (this is not full proof, we can have the protocol 1 against C* 2.0+, but it's worth
@@ -328,83 +319,17 @@ static void refreshSchema(Connection connection, SchemaElement targetType, Strin
328319 cassandraVersion = host .getCassandraVersion ();
329320 }
330321
331- // Make sure we're up to date on schema
332- String whereClause = "" ;
333- if (targetType != null ) {
334- whereClause = " WHERE keyspace_name = '" + targetKeyspace + '\'' ;
335- if (targetType == TABLE )
336- whereClause += " AND columnfamily_name = '" + targetName + '\'' ;
337- else if (targetType == TYPE )
338- whereClause += " AND type_name = '" + targetName + '\'' ;
339- else if (targetType == FUNCTION )
340- whereClause += " AND function_name = '" + targetName + "' AND signature = " + LIST_OF_TEXT_CODEC .format (targetSignature );
341- else if (targetType == AGGREGATE )
342- whereClause += " AND aggregate_name = '" + targetName + "' AND signature = " + LIST_OF_TEXT_CODEC .format (targetSignature );
343- }
344-
345- boolean isSchemaOrKeyspace = (targetType == null || targetType == KEYSPACE );
346- DefaultResultSetFuture ksFuture = isSchemaOrKeyspace
347- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_KEYSPACES + whereClause ))
348- : null ;
349- DefaultResultSetFuture udtFuture = (isSchemaOrKeyspace && supportsUdts (cassandraVersion ) || targetType == TYPE )
350- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_USERTYPES + whereClause ))
351- : null ;
352- DefaultResultSetFuture cfFuture = (isSchemaOrKeyspace || targetType == TABLE )
353- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_COLUMN_FAMILIES + whereClause ))
354- : null ;
355- DefaultResultSetFuture colsFuture = (isSchemaOrKeyspace || targetType == TABLE )
356- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_COLUMNS + whereClause ))
357- : null ;
358- DefaultResultSetFuture functionsFuture = (isSchemaOrKeyspace && supportsUdfs (cassandraVersion ) || targetType == FUNCTION )
359- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_FUNCTIONS + whereClause ))
360- : null ;
361- DefaultResultSetFuture aggregatesFuture = (isSchemaOrKeyspace && supportsUdfs (cassandraVersion ) || targetType == AGGREGATE )
362- ? new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (SELECT_AGGREGATES + whereClause ))
363- : null ;
364-
365- if (ksFuture != null )
366- connection .write (ksFuture );
367- if (udtFuture != null )
368- connection .write (udtFuture );
369- if (cfFuture != null )
370- connection .write (cfFuture );
371- if (colsFuture != null )
372- connection .write (colsFuture );
373- if (functionsFuture != null )
374- connection .write (functionsFuture );
375- if (aggregatesFuture != null )
376- connection .write (aggregatesFuture );
377-
378- try {
379- cluster .metadata .rebuildSchema (targetType , targetKeyspace , targetName ,
380- ksFuture == null ? null : ksFuture .get (),
381- udtFuture == null ? null : udtFuture .get (),
382- cfFuture == null ? null : cfFuture .get (),
383- colsFuture == null ? null : colsFuture .get (),
384- functionsFuture == null ? null : functionsFuture .get (),
385- aggregatesFuture == null ? null : aggregatesFuture .get (),
386- cassandraVersion );
387- } catch (RuntimeException e ) {
388- // Failure to parse the schema is definitively wrong so log a full-on error, but this won't generally prevent queries to
389- // work and this can happen when new Cassandra versions modify stuff in the schema and the driver hasn't yet be modified.
390- // So log, but let things go otherwise.
391- logger .error ("Error parsing schema from Cassandra system tables: the schema in Cluster#getMetadata() will appear incomplete or stale" , e );
392- }
322+ SchemaParser .forVersion (cassandraVersion )
323+ .refresh (cluster .metadata ,
324+ targetType , targetKeyspace , targetName , targetSignature ,
325+ connection , cassandraVersion );
393326
394327 // If we rebuild all from scratch or have an updated keyspace, rebuild the token map since some replication on some keyspace
395328 // may have changed
396- if (isSchemaOrKeyspace )
329+ if (( targetType == null || targetType == KEYSPACE ) )
397330 refreshNodeListAndTokenMap (connection , cluster , false , false );
398331 }
399332
400- private static boolean supportsUdts (VersionNumber cassandraVersion ) {
401- return cassandraVersion .getMajor () > 2 || (cassandraVersion .getMajor () == 2 && cassandraVersion .getMinor () >= 1 );
402- }
403-
404- private static boolean supportsUdfs (VersionNumber cassandraVersion ) {
405- return cassandraVersion .getMajor () > 2 || (cassandraVersion .getMajor () == 2 && cassandraVersion .getMinor () >= 2 );
406- }
407-
408333 public void refreshNodeListAndTokenMap () {
409334 Connection c = connectionRef .get ();
410335 // At startup, when we add the initial nodes, this will be null, which is ok
0 commit comments