Skip to content

Commit 8c0778c

Browse files
author
Alexandre Dutra
committed
JAVA-876: Support new system tables in C* 3.0.0-alpha1.
1 parent 7cfbf27 commit 8c0778c

14 files changed

Lines changed: 754 additions & 357 deletions

changelog/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
## Changelog
22

3+
### 3.0.0-alpha1
4+
5+
- [new feature] Support new system tables in C* 3.0.0-alpha1 (JAVA-876)
6+
7+
Merged from 2.2 branch:
8+
9+
- [improvement] Rename DateWithoutTime to LocalDate (JAVA-810)
10+
- [bug] DateCodec does not format values correctly (JAVA-816)
11+
- [bug] TimeCodec does not format values correctly (JAVA-817)
12+
- [bug] TypeCodec.getDataTypeFor() does not handle LocalDate instances (JAVA-818)
13+
- [improvement] Make ResultSet#fetchMoreResult return a
14+
ListenableFuture<ResultSet> (JAVA-836)
15+
- [improvement] Disable frozen checks in mapper (JAVA-843)
16+
- [improvement] Allow user to register custom type codecs (JAVA-721)
17+
- [improvement] Support custom type codecs in mapper (JAVA-722)
18+
319
### 2.2.0-rc2
420

521
- [improvement] Rename DateWithoutTime to LocalDate (JAVA-810)

driver-core/src/main/java/com/datastax/driver/core/AggregateMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class AggregateMetadata {
4040
private final DataType stateType;
4141
private final TypeCodec<Object> stateTypeCodec;
4242

43-
public AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String simpleName, List<DataType> argumentTypes,
43+
private AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String simpleName, List<DataType> argumentTypes,
4444
String finalFuncSimpleName, String finalFuncFullName, Object initCond, DataType returnType,
4545
String stateFuncSimpleName, String stateFuncFullName, DataType stateType, TypeCodec<Object> stateTypeCodec) {
4646
this.keyspace = keyspace;

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,7 +2049,7 @@ public void run() {
20492049
if (!schemaInAgreement)
20502050
logger.warn("No schema agreement from live replicas after {} s. The schema may not be up to date on some nodes.", configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds());
20512051
if (refreshSchema)
2052-
ControlConnection.refreshSchema(connection, targetType, targetKeyspace, targetName, targetSignature, Manager.this, false);
2052+
ControlConnection.refreshSchema(connection, targetType, targetKeyspace, targetName, targetSignature, Manager.this);
20532053
} catch (Exception e) {
20542054
if (refreshSchema) {
20552055
logger.error("Error during schema refresh ({}). The schema from Cluster.getMetadata() might appear stale. Asynchronously submitting job to fix.", e.getMessage());
@@ -2183,7 +2183,7 @@ public void runMayThrow() throws InterruptedException, ExecutionException {
21832183
if (scc.targetType == KEYSPACE) {
21842184
manager.metadata.removeKeyspace(scc.targetKeyspace);
21852185
} else {
2186-
KeyspaceMetadata keyspace = manager.metadata.getKeyspaceInternal(scc.targetKeyspace);
2186+
KeyspaceMetadata keyspace = manager.metadata.keyspaces.get(scc.targetKeyspace);
21872187
if (keyspace == null) {
21882188
logger.warn("Received a DROPPED notification for {} {}.{}, but this keyspace is unknown in our metadata",
21892189
scc.targetType, scc.targetKeyspace, scc.targetName);

driver-core/src/main/java/com/datastax/driver/core/ColumnMetadata.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,41 @@ public String toString() {
300300
// exposed publicly at all.
301301
static class Raw {
302302

303-
public enum Kind { PARTITION_KEY, CLUSTERING_KEY, REGULAR, COMPACT_VALUE, STATIC }
303+
public enum Kind {
304+
305+
PARTITION_KEY ("PARTITION_KEY" , "PARTITION_KEY"),
306+
CLUSTERING_COLUMN ("CLUSTERING_KEY", "CLUSTERING" ),
307+
REGULAR ("REGULAR" , "REGULAR" ),
308+
COMPACT_VALUE ("COMPACT_VALUE" , "" ), // v2 only
309+
STATIC ("STATIC" , "STATIC" );
310+
311+
final String v2;
312+
final String v3;
313+
314+
Kind(String v2, String v3) {
315+
this.v2 = v2;
316+
this.v3 = v3;
317+
}
318+
319+
static Kind fromStringV2(String s) {
320+
for (Kind kind : Kind.values()) {
321+
if(kind.v2.equalsIgnoreCase(s))
322+
return kind;
323+
}
324+
throw new IllegalArgumentException(s);
325+
}
326+
327+
static Kind fromStringV3(String s) {
328+
for (Kind kind : Kind.values()) {
329+
if(kind.v3.equalsIgnoreCase(s))
330+
return kind;
331+
}
332+
throw new IllegalArgumentException(s);
333+
}
334+
}
304335

305336
public final String name;
306-
public final Kind kind;
337+
public Kind kind;
307338
public final int componentIndex;
308339
public final DataType dataType;
309340
public final boolean isReversed;
@@ -319,11 +350,15 @@ public enum Kind { PARTITION_KEY, CLUSTERING_KEY, REGULAR, COMPACT_VALUE, STATIC
319350
}
320351

321352
static Raw fromRow(Row row, VersionNumber version, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
322-
323353
String name = row.getString(COLUMN_NAME);
324-
Kind kind = version.getMajor() < 2 || row.isNull(KIND)
325-
? Kind.REGULAR
326-
: Enum.valueOf(Kind.class, row.getString(KIND).toUpperCase());
354+
Kind kind;
355+
if(version.getMajor() < 2 || row.isNull(KIND)) {
356+
kind = Kind.REGULAR;
357+
} else if (version.getMajor() < 3) {
358+
kind = Kind.fromStringV2(row.getString(KIND));
359+
} else {
360+
kind = Kind.fromStringV3(row.getString(KIND));
361+
}
327362
int componentIndex = row.isNull(COMPONENT_INDEX) ? 0 : row.getInt(COMPONENT_INDEX);
328363
String validatorStr = row.getString(VALIDATOR);
329364
boolean reversed = CassandraTypeParser.isReversed(validatorStr);

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 9 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,12 @@
3232
import com.datastax.driver.core.exceptions.DriverInternalError;
3333
import com.datastax.driver.core.exceptions.NoHostAvailableException;
3434

35-
import static com.datastax.driver.core.SchemaElement.*;
35+
import static com.datastax.driver.core.SchemaElement.KEYSPACE;
3636

3737
class ControlConnection {
3838

3939
private static final Logger logger = LoggerFactory.getLogger(ControlConnection.class);
4040

41-
private static final TypeCodec.ListCodec<String> LIST_OF_TEXT_CODEC = new TypeCodec.ListCodec<String>(TypeCodec.VarcharCodec.instance);
42-
4341
private static final InetAddress bindAllAddress;
4442
static
4543
{
@@ -50,13 +48,6 @@ class ControlConnection {
5048
}
5149
}
5250

53-
private static final String SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces";
54-
private static final String SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies";
55-
private static final String SELECT_COLUMNS = "SELECT * FROM system.schema_columns";
56-
private static final String SELECT_USERTYPES = "SELECT * FROM system.schema_usertypes";
57-
private static final String SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions";
58-
private static final String SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates";
59-
6051
private static final String SELECT_PEERS = "SELECT * FROM system.peers";
6152
private static final String SELECT_LOCAL = "SELECT * FROM system.local WHERE key='local'";
6253

@@ -270,7 +261,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
270261
// We want that because the token map was not properly initialized by the first call above, since it requires the list of keyspaces
271262
// to be loaded.
272263
logger.debug("[Control connection] Refreshing schema");
273-
refreshSchema(connection, null, null, null, null, cluster, isInitialConnection);
264+
refreshSchema(connection, null, null, null, null, cluster);
274265
return connection;
275266
} catch (BusyConnectionException e) {
276267
connection.closeAsync().force();
@@ -299,7 +290,7 @@ public void refreshSchema(SchemaElement targetType, String targetKeyspace, Strin
299290
// At startup, when we add the initial nodes, this will be null, which is ok
300291
if (c == null)
301292
return;
302-
refreshSchema(c, targetType, targetKeyspace, targetName, signature, cluster, false);
293+
refreshSchema(c, targetType, targetKeyspace, targetName, signature, cluster);
303294
} catch (ConnectionException e) {
304295
logger.debug("[Control connection] Connection error while refreshing schema ({})", e.getMessage());
305296
signalError();
@@ -314,7 +305,7 @@ public void refreshSchema(SchemaElement targetType, String targetKeyspace, Strin
314305
}
315306
}
316307

317-
static void refreshSchema(Connection connection, SchemaElement targetType, String targetKeyspace, String targetName, List<String> targetSignature, Cluster.Manager cluster, boolean isInitialConnection) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
308+
static void refreshSchema(Connection connection, SchemaElement targetType, String targetKeyspace, String targetName, List<String> targetSignature, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
318309
Host host = cluster.metadata.getHost(connection.address);
319310
// Neither host, nor it's version should be null. But instead of dying if there is a race or something, we can kind of try to infer
320311
// a Cassandra version from the protocol version (this is not full proof, we can have the protocol 1 against C* 2.0+, but it's worth
@@ -328,83 +319,17 @@ static void refreshSchema(Connection connection, SchemaElement targetType, Strin
328319
cassandraVersion = host.getCassandraVersion();
329320
}
330321

331-
// Make sure we're up to date on schema
332-
String whereClause = "";
333-
if (targetType != null) {
334-
whereClause = " WHERE keyspace_name = '" + targetKeyspace + '\'';
335-
if (targetType == TABLE)
336-
whereClause += " AND columnfamily_name = '" + targetName + '\'';
337-
else if (targetType == TYPE)
338-
whereClause += " AND type_name = '" + targetName + '\'';
339-
else if (targetType == FUNCTION)
340-
whereClause += " AND function_name = '" + targetName + "' AND signature = " + LIST_OF_TEXT_CODEC.format(targetSignature);
341-
else if (targetType == AGGREGATE)
342-
whereClause += " AND aggregate_name = '" + targetName + "' AND signature = " + LIST_OF_TEXT_CODEC.format(targetSignature);
343-
}
344-
345-
boolean isSchemaOrKeyspace = (targetType == null || targetType == KEYSPACE);
346-
DefaultResultSetFuture ksFuture = isSchemaOrKeyspace
347-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_KEYSPACES + whereClause))
348-
: null;
349-
DefaultResultSetFuture udtFuture = (isSchemaOrKeyspace && supportsUdts(cassandraVersion) || targetType == TYPE)
350-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_USERTYPES + whereClause))
351-
: null;
352-
DefaultResultSetFuture cfFuture = (isSchemaOrKeyspace || targetType == TABLE)
353-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_COLUMN_FAMILIES + whereClause))
354-
: null;
355-
DefaultResultSetFuture colsFuture = (isSchemaOrKeyspace || targetType == TABLE)
356-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_COLUMNS + whereClause))
357-
: null;
358-
DefaultResultSetFuture functionsFuture = (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == FUNCTION)
359-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_FUNCTIONS + whereClause))
360-
: null;
361-
DefaultResultSetFuture aggregatesFuture = (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == AGGREGATE)
362-
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_AGGREGATES + whereClause))
363-
: null;
364-
365-
if (ksFuture != null)
366-
connection.write(ksFuture);
367-
if (udtFuture != null)
368-
connection.write(udtFuture);
369-
if (cfFuture != null)
370-
connection.write(cfFuture);
371-
if (colsFuture != null)
372-
connection.write(colsFuture);
373-
if (functionsFuture != null)
374-
connection.write(functionsFuture);
375-
if (aggregatesFuture != null)
376-
connection.write(aggregatesFuture);
377-
378-
try {
379-
cluster.metadata.rebuildSchema(targetType, targetKeyspace, targetName,
380-
ksFuture == null ? null : ksFuture.get(),
381-
udtFuture == null ? null : udtFuture.get(),
382-
cfFuture == null ? null : cfFuture.get(),
383-
colsFuture == null ? null : colsFuture.get(),
384-
functionsFuture == null ? null : functionsFuture.get(),
385-
aggregatesFuture == null ? null : aggregatesFuture.get(),
386-
cassandraVersion);
387-
} catch (RuntimeException e) {
388-
// Failure to parse the schema is definitively wrong so log a full-on error, but this won't generally prevent queries to
389-
// work and this can happen when new Cassandra versions modify stuff in the schema and the driver hasn't yet be modified.
390-
// So log, but let things go otherwise.
391-
logger.error("Error parsing schema from Cassandra system tables: the schema in Cluster#getMetadata() will appear incomplete or stale", e);
392-
}
322+
SchemaParser.forVersion(cassandraVersion)
323+
.refresh(cluster.metadata,
324+
targetType, targetKeyspace, targetName, targetSignature,
325+
connection, cassandraVersion);
393326

394327
// If we rebuild all from scratch or have an updated keyspace, rebuild the token map since some replication on some keyspace
395328
// may have changed
396-
if (isSchemaOrKeyspace)
329+
if ((targetType == null || targetType == KEYSPACE))
397330
refreshNodeListAndTokenMap(connection, cluster, false, false);
398331
}
399332

400-
private static boolean supportsUdts(VersionNumber cassandraVersion) {
401-
return cassandraVersion.getMajor() > 2 || (cassandraVersion.getMajor() == 2 && cassandraVersion.getMinor() >= 1);
402-
}
403-
404-
private static boolean supportsUdfs(VersionNumber cassandraVersion) {
405-
return cassandraVersion.getMajor() > 2 || (cassandraVersion.getMajor() == 2 && cassandraVersion.getMinor() >= 2);
406-
}
407-
408333
public void refreshNodeListAndTokenMap() {
409334
Connection c = connectionRef.get();
410335
// At startup, when we add the initial nodes, this will be null, which is ok

driver-core/src/main/java/com/datastax/driver/core/DefaultResultSetFuture.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import com.datastax.driver.core.exceptions.*;
2828

2929
import static com.datastax.driver.core.SchemaElement.KEYSPACE;
30-
import static com.datastax.driver.core.SchemaElement.TABLE;
31-
import static com.datastax.driver.core.SchemaElement.TYPE;
3230

3331
/**
3432
* Internal implementation of ResultSetFuture.
@@ -83,7 +81,7 @@ public void onSet(Connection connection, Message.Response response, ExecutionInf
8381
if (scc.targetType == KEYSPACE) {
8482
session.cluster.manager.metadata.removeKeyspace(scc.targetKeyspace);
8583
} else {
86-
KeyspaceMetadata keyspace = session.cluster.manager.metadata.getKeyspaceInternal(scc.targetKeyspace);
84+
KeyspaceMetadata keyspace = session.cluster.manager.metadata.keyspaces.get(scc.targetKeyspace);
8785
if (keyspace == null) {
8886
logger.warn("Received a DROPPED notification for {} {}.{}, but this keyspace is unknown in our metadata",
8987
scc.targetType, scc.targetKeyspace, scc.targetName);

driver-core/src/main/java/com/datastax/driver/core/KeyspaceMetadata.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18-
import java.util.*;
18+
import java.util.Collection;
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.Map;
1922
import java.util.concurrent.ConcurrentHashMap;
2023

2124
import com.google.common.annotations.VisibleForTesting;
@@ -26,10 +29,11 @@
2629
*/
2730
public class KeyspaceMetadata {
2831

29-
public static final String KS_NAME = "keyspace_name";
32+
public static final String KS_NAME = "keyspace_name";
3033
private static final String DURABLE_WRITES = "durable_writes";
3134
private static final String STRATEGY_CLASS = "strategy_class";
3235
private static final String STRATEGY_OPTIONS = "strategy_options";
36+
private static final String REPLICATION = "replication";
3337

3438
private final String name;
3539
private final boolean durableWrites;
@@ -50,23 +54,20 @@ public class KeyspaceMetadata {
5054
this.strategy = ReplicationStrategy.create(replication);
5155
}
5256

53-
static KeyspaceMetadata build(Row row, List<Row> udtRows, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
54-
57+
static KeyspaceMetadata buildV2(Row row) {
5558
String name = row.getString(KS_NAME);
5659
boolean durableWrites = row.getBool(DURABLE_WRITES);
57-
58-
Map<String, String> replicationOptions = new HashMap<String, String>();
60+
Map<String, String> replicationOptions;
61+
replicationOptions = new HashMap<String, String>();
5962
replicationOptions.put("class", row.getString(STRATEGY_CLASS));
6063
replicationOptions.putAll(SimpleJSONParser.parseStringMap(row.getString(STRATEGY_OPTIONS)));
64+
return new KeyspaceMetadata(name, durableWrites, replicationOptions);
65+
}
6166

62-
KeyspaceMetadata ksm = new KeyspaceMetadata(name, durableWrites, replicationOptions);
63-
64-
if (udtRows == null)
65-
return ksm;
66-
67-
ksm.addUserTypes(udtRows, protocolVersion, codecRegistry);
68-
69-
return ksm;
67+
static KeyspaceMetadata buildV3(Row row) {
68+
String name = row.getString(KS_NAME);
69+
boolean durableWrites = row.getBool(DURABLE_WRITES);
70+
return new KeyspaceMetadata(name, durableWrites, row.getMap(REPLICATION, String.class, String.class));
7071
}
7172

7273
/**
@@ -143,13 +144,6 @@ public Collection<UserType> getUserTypes() {
143144
return Collections.unmodifiableCollection(userTypes.values());
144145
}
145146

146-
void addUserTypes(List<Row> udtRows, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
147-
for (Row r : udtRows) {
148-
UserType def = UserType.build(r, protocolVersion, codecRegistry);
149-
userTypes.put(def.getTypeName(), def);
150-
}
151-
}
152-
153147
void removeUserType(String userType) {
154148
userTypes.remove(userType);
155149
}
@@ -305,6 +299,10 @@ void add(AggregateMetadata aggregate) {
305299
aggregates.put(aggregate.getFullName(), aggregate);
306300
}
307301

302+
void add(UserType userType) {
303+
userTypes.put(userType.getTypeName(), userType);
304+
}
305+
308306
ReplicationStrategy replicationStrategy() {
309307
return strategy;
310308
}

0 commit comments

Comments
 (0)