Skip to content

Commit db16d19

Browse files
author
Alexandre Dutra
committed
JAVA-936: Adapt schema metadata parsing logic to new storage format of CQL types in C* 3.0.
1 parent 3a65d47 commit db16d19

14 files changed

Lines changed: 684 additions & 279 deletions

driver-core/src/main/java/com/datastax/driver/core/AggregateMetadata.java

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String sim
5858
this.stateTypeCodec = stateTypeCodec;
5959
}
6060

61+
// Cassandra < 3.0:
6162
// CREATE TABLE system.schema_aggregates (
6263
// keyspace_name text,
6364
// aggregate_name text,
@@ -70,42 +71,76 @@ private AggregateMetadata(KeyspaceMetadata keyspace, String fullName, String sim
7071
// state_type text,
7172
// PRIMARY KEY (keyspace_name, aggregate_name, signature)
7273
// ) WITH CLUSTERING ORDER BY (aggregate_name ASC, signature ASC)
73-
static AggregateMetadata build(KeyspaceMetadata ksm, Row row, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
74+
//
75+
// Cassandra >= 3.0:
76+
// CREATE TABLE system.schema_aggregates (
77+
// keyspace_name text,
78+
// aggregate_name text,
79+
// argument_types frozen<list<text>>,
80+
// final_func text,
81+
// initcond text,
82+
// return_type text,
83+
// state_func text,
84+
// state_type text,
85+
// PRIMARY KEY (keyspace_name, aggregate_name, argument_types)
86+
// ) WITH CLUSTERING ORDER BY (aggregate_name ASC, argument_types ASC)
87+
static AggregateMetadata build(KeyspaceMetadata ksm, Row row, VersionNumber version, Cluster cluster) {
88+
CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
89+
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
7490
String simpleName = row.getString("aggregate_name");
75-
List<String> signature = row.getList("signature", String.class);
76-
String fullName = Metadata.fullFunctionName(simpleName, signature);
77-
List<DataType> argumentTypes = parseTypes(row.getList("argument_types", String.class), protocolVersion, codecRegistry);
91+
List<DataType> argumentTypes = parseTypes(row.getList("argument_types", String.class), version, cluster, ksm);
92+
String fullName = Metadata.fullFunctionName(simpleName, argumentTypes);
7893
String finalFuncSimpleName = row.getString("final_func");
79-
DataType returnType = CassandraTypeParser.parseOne(row.getString("return_type"), protocolVersion, codecRegistry);
94+
DataType returnType;
95+
if(version.getMajor() >= 3) {
96+
returnType = DataTypeParser.parse(row.getString("return_type"), cluster.getMetadata(), ksm.getName(), false);
97+
} else {
98+
returnType = CassandraTypeParser.parseOne(row.getString("return_type"), protocolVersion, codecRegistry);
99+
}
80100
String stateFuncSimpleName = row.getString("state_func");
81101
String stateTypeName = row.getString("state_type");
82-
DataType stateType = CassandraTypeParser.parseOne(stateTypeName, protocolVersion, codecRegistry);
83-
ByteBuffer rawInitCond = row.getBytes("initcond");
84-
Object initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).deserialize(rawInitCond, protocolVersion);
102+
DataType stateType;
103+
Object initCond;
104+
if(version.getMajor() >= 3) {
105+
stateType = DataTypeParser.parse(stateTypeName, cluster.getMetadata(), ksm.getName(), false);
106+
String rawInitCond = row.getString("initcond");
107+
initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).parse(rawInitCond);
108+
} else {
109+
stateType = CassandraTypeParser.parseOne(stateTypeName, protocolVersion, codecRegistry);
110+
ByteBuffer rawInitCond = row.getBytes("initcond");
111+
initCond = rawInitCond == null ? null : codecRegistry.codecFor(stateType).deserialize(rawInitCond, protocolVersion);
112+
}
85113

86114
String finalFuncFullName = finalFuncSimpleName == null ? null : String.format("%s(%s)", finalFuncSimpleName, stateType);
87-
String stateFuncFullName = makeStateFuncFullName(stateFuncSimpleName, stateType.toString(), signature);
115+
String stateFuncFullName = makeStateFuncFullName(stateFuncSimpleName, stateType, argumentTypes);
88116

89117
AggregateMetadata aggregate = new AggregateMetadata(ksm, fullName, simpleName, argumentTypes,
90118
finalFuncSimpleName, finalFuncFullName, initCond, returnType, stateFuncSimpleName,
91119
stateFuncFullName, stateType, codecRegistry.codecFor(stateType));
92-
ksm.add(aggregate);
93120
return aggregate;
94121
}
95122

96-
private static String makeStateFuncFullName(String stateFuncSimpleName, String stateTypeName, List<String> typeNames) {
97-
List<String> args = Lists.newArrayList(stateTypeName);
98-
args.addAll(typeNames);
123+
private static String makeStateFuncFullName(String stateFuncSimpleName, DataType stateType, List<DataType> argumentTypes) {
124+
List<DataType> args = Lists.newArrayList(stateType);
125+
args.addAll(argumentTypes);
99126
return Metadata.fullFunctionName(stateFuncSimpleName, args);
100127
}
101128

102-
private static List<DataType> parseTypes(List<String> names, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
103-
if (names.isEmpty())
129+
private static List<DataType> parseTypes(List<String> types, VersionNumber version, Cluster cluster, KeyspaceMetadata ksm) {
130+
if (types.isEmpty())
104131
return Collections.emptyList();
105132

133+
Metadata metadata = cluster.getMetadata();
134+
CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
135+
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
106136
ImmutableList.Builder<DataType> builder = ImmutableList.builder();
107-
for (String name : names) {
108-
DataType type = CassandraTypeParser.parseOne(name, protocolVersion, codecRegistry);
137+
for (String name : types) {
138+
DataType type;
139+
if (version.getMajor() >= 3) {
140+
type = DataTypeParser.parse(name, metadata, ksm.getName(), false);
141+
} else {
142+
type = CassandraTypeParser.parseOne(name, protocolVersion, codecRegistry);
143+
}
109144
builder.add(type);
110145
}
111146
return builder.build();

driver-core/src/main/java/com/datastax/driver/core/ColumnMetadata.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ static Kind fromStringV3(String s) {
178178
this.isReversed = isReversed;
179179
}
180180

181-
static Raw fromRow(Row row, VersionNumber version, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
181+
static Raw fromRow(Row row, VersionNumber version, Cluster cluster, String currentKeyspace) {
182182
String name = row.getString(COLUMN_NAME);
183183

184184
Kind kind;
@@ -198,26 +198,30 @@ static Raw fromRow(Row row, VersionNumber version, ProtocolVersion protocolVersi
198198
position = row.isNull(COMPONENT_INDEX) ? 0 : row.getInt(COMPONENT_INDEX);
199199
}
200200

201-
String dataTypeStr;
201+
DataType dataType;
202202
boolean reversed;
203203
if(version.getMajor() >= 3) {
204-
dataTypeStr = row.getString(TYPE);
204+
String dataTypeStr = row.getString(TYPE);
205+
dataType = DataTypeParser.parse(dataTypeStr, cluster.getMetadata(), currentKeyspace, false);
205206
String clusteringOrderStr = row.getString(CLUSTERING_ORDER);
206207
reversed = clusteringOrderStr.equals(DESC);
207208
} else {
208-
dataTypeStr = row.getString(VALIDATOR);
209+
String dataTypeStr = row.getString(VALIDATOR);
210+
ProtocolVersion protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
211+
CodecRegistry codecRegistry = cluster.getConfiguration().getCodecRegistry();
212+
dataType = CassandraTypeParser.parseOne(dataTypeStr, protocolVersion, codecRegistry);
209213
reversed = CassandraTypeParser.isReversed(dataTypeStr);
210214
}
211-
DataType dataType = CassandraTypeParser.parseOne(dataTypeStr, protocolVersion, codecRegistry);
212215

213216
Raw c = new Raw(name, kind, position, dataType, reversed);
214217

215218
// secondary indexes (C* < 3.0.0)
216219
// from C* 3.0 onwards 2i are defined in a separate table
217-
for (String str : Arrays.asList(INDEX_TYPE, INDEX_NAME, INDEX_OPTIONS))
218-
if (row.getColumnDefinitions().contains(str) && !row.isNull(str))
219-
c.indexColumns.put(str, row.getString(str));
220-
220+
if(version.getMajor() < 3) {
221+
for (String str : Arrays.asList(INDEX_TYPE, INDEX_NAME, INDEX_OPTIONS))
222+
if (row.getColumnDefinitions().contains(str) && !row.isNull(str))
223+
c.indexColumns.put(str, row.getString(str));
224+
}
221225
return c;
222226
}
223227
}

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ static void refreshSchema(Connection connection, SchemaElement targetType, Strin
327327
}
328328

329329
SchemaParser.forVersion(cassandraVersion)
330-
.refresh(cluster.metadata,
330+
.refresh(cluster.getCluster(),
331331
targetType, targetKeyspace, targetName, targetSignature,
332332
connection, cassandraVersion);
333333
}

0 commit comments

Comments
 (0)