Skip to content

Commit 5009c79

Browse files
author
Alexandre Dutra
committed
JAVA-936: Adapt schema metadata parsing logic to new storage format of CQL types in C* 3.0.
This commit also adds lazy evaluation of UDTs. It also migrates the 'initcond' column type in table 'system_schema.aggregates' from blob to varchar (see See CASSANDRA-10650).
1 parent 57932be commit 5009c79

24 files changed

Lines changed: 1155 additions & 312 deletions

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [improvement] Pass the authenticator name from the server to the auth provider (JAVA-885)
99
- [improvement] Raise an exception when an older version of guava (<16.01) is found (JAVA-961)
1010
- [bug] TypeCodec.parse() implementations should be case insensitive when checking for keyword NULL (JAVA-972)
11+
- [improvement] Adapt schema metadata parsing logic to new storage format of CQL types in C* 3.0 (JAVA-936)
1112

1213
### 3.0.0-alpha4
1314

driver-core/src/main/java/com/datastax/driver/core/AggregateMetadata.java

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String sim
5858
this.stateTypeCodec = stateTypeCodec;
5959
}
6060

61+
// Cassandra < 3.0:
6162
// CREATE TABLE system.schema_aggregates (
6263
// keyspace_name text,
6364
// aggregate_name text,
@@ -70,42 +71,74 @@ private AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String sim
7071
// state_type text,
7172
// PRIMARY KEY (keyspace_name, aggregate_name, signature)
7273
// ) WITH CLUSTERING ORDER BY (aggregate_name ASC, signature ASC)
73-
static AggregateMetadata build(KeyspaceMetadata ksm, Row row, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
74+
//
75+
// Cassandra >= 3.0:
76+
// CREATE TABLE system.schema_aggregates (
77+
// keyspace_name text,
78+
// aggregate_name text,
79+
// argument_types frozen<list<text>>,
80+
// final_func text,
81+
// initcond text,
82+
// return_type text,
83+
// state_func text,
84+
// state_type text,
85+
// PRIMARY KEY (keyspace_name, aggregate_name, argument_types)
86+
// ) WITH CLUSTERING ORDER BY (aggregate_name ASC, argument_types ASC)
87+
static AggregateMetadata build(KeyspaceMetadata ksm, Row row, VersionNumber version, Cluster cluster) {
88+
CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
89+
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
7490
String simpleName = row.getString("aggregate_name");
75-
List<String> signature = row.getList("signature", String.class);
76-
String fullName = Metadata.fullFunctionName(simpleName, signature);
77-
List<DataType> argumentTypes = parseTypes(row.getList("argument_types", String.class), protocolVersion, codecRegistry);
91+
List<DataType> argumentTypes = parseTypes(ksm, row.getList("argument_types", String.class), version, cluster);
92+
String fullName = Metadata.fullFunctionName(simpleName, argumentTypes);
7893
String finalFuncSimpleName = row.getString("final_func");
79-
DataType returnType = CassandraTypeParser.parseOne(row.getString("return_type"), protocolVersion, codecRegistry);
94+
DataType returnType;
95+
if(version.getMajor() >= 3) {
96+
returnType = DataTypeParser.parse(row.getString("return_type"), cluster, ksm.getName(), ksm.userTypes, false);
97+
} else {
98+
returnType = CassandraTypeParser.parseOne(row.getString("return_type"), protocolVersion, codecRegistry);
99+
}
80100
String stateFuncSimpleName = row.getString("state_func");
81101
String stateTypeName = row.getString("state_type");
82-
DataType stateType = CassandraTypeParser.parseOne(stateTypeName, protocolVersion, codecRegistry);
83-
ByteBuffer rawInitCond = row.getBytes("initcond");
84-
Object initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).deserialize(rawInitCond, protocolVersion);
102+
DataType stateType;
103+
Object initCond;
104+
if(version.getMajor() >= 3) {
105+
stateType = DataTypeParser.parse(stateTypeName, cluster, ksm.getName(), ksm.userTypes, false);
106+
String rawInitCond = row.getString("initcond");
107+
initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).parse(rawInitCond);
108+
} else {
109+
stateType = CassandraTypeParser.parseOne(stateTypeName, protocolVersion, codecRegistry);
110+
ByteBuffer rawInitCond = row.getBytes("initcond");
111+
initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).deserialize(rawInitCond, protocolVersion);
112+
}
85113

86114
String finalFuncFullName = finalFuncSimpleName == null ? null : String.format("%s(%s)", finalFuncSimpleName, stateType);
87-
String stateFuncFullName = makeStateFuncFullName(stateFuncSimpleName, stateType.toString(), signature);
115+
String stateFuncFullName = makeStateFuncFullName(stateFuncSimpleName, stateType, argumentTypes);
88116

89-
AggregateMetadata aggregate = new AggregateMetadata(ksm, fullName, simpleName, argumentTypes,
117+
return new AggregateMetadata(ksm, fullName, simpleName, argumentTypes,
90118
finalFuncSimpleName, finalFuncFullName, initCond, returnType, stateFuncSimpleName,
91119
stateFuncFullName, stateType, codecRegistry.codecFor(stateType));
92-
ksm.add(aggregate);
93-
return aggregate;
94120
}
95121

96-
private static String makeStateFuncFullName(String stateFuncSimpleName, String stateTypeName, List<String> typeNames) {
97-
List<String> args = Lists.newArrayList(stateTypeName);
98-
args.addAll(typeNames);
122+
private static String makeStateFuncFullName(String stateFuncSimpleName, DataType stateType, List<DataType> argumentTypes) {
123+
List<DataType> args = Lists.newArrayList(stateType);
124+
args.addAll(argumentTypes);
99125
return Metadata.fullFunctionName(stateFuncSimpleName, args);
100126
}
101127

102-
private static List<DataType> parseTypes(List<String> names, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
103-
if (names.isEmpty())
128+
private static List<DataType> parseTypes(KeyspaceMetadata ksm, List<String> types, VersionNumber version, Cluster cluster) {
129+
if (types.isEmpty())
104130
return Collections.emptyList();
105131

132+
CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
133+
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
106134
ImmutableList.Builder<DataType> builder = ImmutableList.builder();
107-
for (String name : names) {
108-
DataType type = CassandraTypeParser.parseOne(name, protocolVersion, codecRegistry);
135+
for (String name : types) {
136+
DataType type;
137+
if (version.getMajor() >= 3) {
138+
type = DataTypeParser.parse(name, cluster, ksm.getName(), ksm.userTypes, false);
139+
} else {
140+
type = CassandraTypeParser.parseOne(name, protocolVersion, codecRegistry);
141+
}
109142
builder.add(type);
110143
}
111144
return builder.build();

driver-core/src/main/java/com/datastax/driver/core/CodecRegistry.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,8 @@ private <T> TypeCodec<T> lookupCodec(DataType cqlType, TypeToken<T> javaType) {
499499
throw (CodecNotFoundException) e.getCause();
500500
}
501501
throw new CodecNotFoundException(e.getCause(), cqlType, javaType);
502+
} catch (RuntimeException e) {
503+
throw new CodecNotFoundException(e.getCause(), cqlType, javaType);
502504
} catch (ExecutionException e) {
503505
throw new CodecNotFoundException(e.getCause(), cqlType, javaType);
504506
}

driver-core/src/main/java/com/datastax/driver/core/CodecUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ static void setCodecRegistry(DataType cqlType, CodecRegistry codecRegistry) {
8080
if(cqlType instanceof UserType) {
8181
UserType userType = (UserType)cqlType;
8282
userType.setCodecRegistry(codecRegistry);
83-
for (UserType.Field field : userType.byIdx) {
83+
for (UserType.Field field : userType.getFields()) {
8484
setCodecRegistry(field.getType(), codecRegistry);
8585
}
8686
}

driver-core/src/main/java/com/datastax/driver/core/ColumnMetadata.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ private ColumnMetadata(TableOrView parent, String name, DataType type, boolean i
5656
this.isStatic = isStatic;
5757
}
5858

59-
static ColumnMetadata fromRaw(TableOrView tm, Raw raw) {
60-
return new ColumnMetadata(tm, raw.name, raw.dataType, raw.kind == Raw.Kind.STATIC);
59+
static ColumnMetadata fromRaw(TableOrView tm, Raw raw, DataType dataType) {
60+
return new ColumnMetadata(tm, raw.name, dataType, raw.kind == Raw.Kind.STATIC);
6161
}
6262

6363
static ColumnMetadata forAlias(TableMetadata tm, String name, DataType type) {
@@ -165,20 +165,20 @@ static Kind fromStringV3(String s) {
165165
public final String name;
166166
public Kind kind;
167167
public final int position;
168-
public final DataType dataType;
168+
public final String dataType;
169169
public final boolean isReversed;
170170

171171
public final Map<String, String> indexColumns = new HashMap<String, String>();
172172

173-
Raw(String name, Kind kind, int position, DataType dataType, boolean isReversed) {
173+
Raw(String name, Kind kind, int position, String dataType, boolean isReversed) {
174174
this.name = name;
175175
this.kind = kind;
176176
this.position = position;
177177
this.dataType = dataType;
178178
this.isReversed = isReversed;
179179
}
180180

181-
static Raw fromRow(Row row, VersionNumber version, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
181+
static Raw fromRow(Row row, VersionNumber version) {
182182
String name = row.getString(COLUMN_NAME);
183183

184184
Kind kind;
@@ -198,26 +198,26 @@ static Raw fromRow(Row row, VersionNumber version, ProtocolVersion protocolVersi
198198
position = row.isNull(COMPONENT_INDEX) ? 0 : row.getInt(COMPONENT_INDEX);
199199
}
200200

201-
String dataTypeStr;
201+
String dataType;
202202
boolean reversed;
203203
if(version.getMajor() >= 3) {
204-
dataTypeStr = row.getString(TYPE);
204+
dataType = row.getString(TYPE);
205205
String clusteringOrderStr = row.getString(CLUSTERING_ORDER);
206206
reversed = clusteringOrderStr.equals(DESC);
207207
} else {
208-
dataTypeStr = row.getString(VALIDATOR);
209-
reversed = CassandraTypeParser.isReversed(dataTypeStr);
208+
dataType = row.getString(VALIDATOR);
209+
reversed = CassandraTypeParser.isReversed(dataType);
210210
}
211-
DataType dataType = CassandraTypeParser.parseOne(dataTypeStr, protocolVersion, codecRegistry);
212211

213212
Raw c = new Raw(name, kind, position, dataType, reversed);
214213

215214
// secondary indexes (C* < 3.0.0)
216215
// from C* 3.0 onwards 2i are defined in a separate table
217-
for (String str : Arrays.asList(INDEX_TYPE, INDEX_NAME, INDEX_OPTIONS))
218-
if (row.getColumnDefinitions().contains(str) && !row.isNull(str))
219-
c.indexColumns.put(str, row.getString(str));
220-
216+
if(version.getMajor() < 3) {
217+
for (String str : Arrays.asList(INDEX_TYPE, INDEX_NAME, INDEX_OPTIONS))
218+
if (row.getColumnDefinitions().contains(str) && !row.isNull(str))
219+
c.indexColumns.put(str, row.getString(str));
220+
}
221221
return c;
222222
}
223223
}

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ static void refreshSchema(Connection connection, SchemaElement targetType, Strin
327327
}
328328

329329
SchemaParser.forVersion(cassandraVersion)
330-
.refresh(cluster.metadata,
330+
.refresh(cluster.getCluster(),
331331
targetType, targetKeyspace, targetName, targetSignature,
332332
connection, cassandraVersion);
333333
}

0 commit comments

Comments
 (0)