Skip to content

Commit e48ca9e

Browse files
committed
JAVA-2730: Add support for Cassandra 4.0 table options
1 parent 18abff3 commit e48ca9e

14 files changed

Lines changed: 392 additions & 37 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
- [bug] JAVA-2627: Avoid logging error message including stack trace in request handler.
1111
- [new feature] JAVA-2706: Add now_in_seconds to protocol v5 query messages.
12+
- [improvement] JAVA-2730: Add support for Cassandra® 4.0 table options
1213

1314

1415
## 3.8.0

driver-core/src/main/java/com/datastax/driver/core/AbstractTableMetadata.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,22 @@ protected StringBuilder appendOptions(StringBuilder sb, boolean formatted) {
258258
sb.append("WITH ");
259259
if (options.isCompactStorage()) and(sb.append("COMPACT STORAGE"), formatted);
260260
if (!clusteringOrder.isEmpty()) and(appendClusteringOrder(sb), formatted);
261-
sb.append("read_repair_chance = ").append(options.getReadRepairChance());
262-
and(sb, formatted)
263-
.append("dclocal_read_repair_chance = ")
264-
.append(options.getLocalReadRepairChance());
261+
if (cassandraVersion.getMajor() < 4)
262+
sb.append("read_repair_chance = ").append(options.getReadRepairChance());
263+
else sb.append("read_repair = '").append(options.getReadRepair()).append('\'');
264+
if (cassandraVersion.getMajor() < 4)
265+
and(sb, formatted)
266+
.append("dclocal_read_repair_chance = ")
267+
.append(options.getLocalReadRepairChance());
265268
if (cassandraVersion.getMajor() < 2
266269
|| (cassandraVersion.getMajor() == 2 && cassandraVersion.getMinor() == 0))
267270
and(sb, formatted).append("replicate_on_write = ").append(options.getReplicateOnWrite());
268271
and(sb, formatted).append("gc_grace_seconds = ").append(options.getGcGraceInSeconds());
272+
if (cassandraVersion.getMajor() > 3)
273+
and(sb, formatted)
274+
.append("additional_write_policy = '")
275+
.append(options.getAdditionalWritePolicy())
276+
.append('\'');
269277
and(sb, formatted)
270278
.append("bloom_filter_fp_chance = ")
271279
.append(options.getBloomFilterFalsePositiveChance());

driver-core/src/main/java/com/datastax/driver/core/TableOptionsMetadata.java

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
public class TableOptionsMetadata {
2424

2525
private static final String COMMENT = "comment";
26-
private static final String READ_REPAIR = "read_repair_chance";
27-
private static final String DCLOCAL_READ_REPAIR = "dclocal_read_repair_chance";
28-
private static final String LOCAL_READ_REPAIR = "local_read_repair_chance";
26+
private static final String READ_REPAIR_CHANCE = "read_repair_chance";
27+
private static final String DCLOCAL_READ_REPAIR_CHANCE = "dclocal_read_repair_chance";
28+
private static final String READ_REPAIR = "read_repair";
29+
private static final String LOCAL_READ_REPAIR_CHANCE = "local_read_repair_chance";
2930
private static final String REPLICATE_ON_WRITE = "replicate_on_write";
3031
private static final String GC_GRACE = "gc_grace_seconds";
3132
private static final String BF_FP_CHANCE = "bloom_filter_fp_chance";
@@ -45,6 +46,7 @@ public class TableOptionsMetadata {
4546
private static final String CRC_CHECK_CHANCE = "crc_check_chance";
4647
private static final String EXTENSIONS = "extensions";
4748
private static final String CDC = "cdc";
49+
private static final String ADDITIONAL_WRITE_POLICY = "additional_write_policy";
4850

4951
private static final boolean DEFAULT_REPLICATE_ON_WRITE = true;
5052
private static final double DEFAULT_BF_FP_CHANCE = 0.01;
@@ -57,12 +59,15 @@ public class TableOptionsMetadata {
5759
private static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;
5860
private static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
5961
private static final boolean DEFAULT_CDC = false;
62+
private static final String DEFAULT_READ_REPAIR = "BLOCKING";
63+
private static final String DEFAULT_ADDITIONAL_WRITE_POLICY = "99p";
6064

6165
private final boolean isCompactStorage;
6266

6367
private final String comment;
64-
private final double readRepair;
65-
private final double localReadRepair;
68+
private final double readRepairChance;
69+
private final double localReadRepairChance;
70+
private final String readRepair;
6671
private final boolean replicateOnWrite;
6772
private final int gcGrace;
6873
private final double bfFpChance;
@@ -79,6 +84,7 @@ public class TableOptionsMetadata {
7984
private final Double crcCheckChance;
8085
private final Map<String, ByteBuffer> extensions;
8186
private final boolean cdc;
87+
private final String additionalWritePolicy;
8288

8389
TableOptionsMetadata(Row row, boolean isCompactStorage, VersionNumber version) {
8490

@@ -92,10 +98,13 @@ public class TableOptionsMetadata {
9298

9399
this.isCompactStorage = isCompactStorage;
94100
this.comment = isNullOrAbsent(row, COMMENT) ? "" : row.getString(COMMENT);
95-
this.readRepair = row.getDouble(READ_REPAIR);
101+
this.readRepairChance = row.getDouble(READ_REPAIR_CHANCE);
96102

97-
if (is300OrHigher) this.localReadRepair = row.getDouble(DCLOCAL_READ_REPAIR);
98-
else this.localReadRepair = row.getDouble(LOCAL_READ_REPAIR);
103+
if (is400OrHigher) this.readRepair = row.getString(READ_REPAIR);
104+
else this.readRepair = DEFAULT_READ_REPAIR;
105+
106+
if (is300OrHigher) this.localReadRepairChance = row.getDouble(DCLOCAL_READ_REPAIR_CHANCE);
107+
else this.localReadRepairChance = row.getDouble(LOCAL_READ_REPAIR_CHANCE);
99108

100109
this.replicateOnWrite =
101110
is210OrHigher || isNullOrAbsent(row, REPLICATE_ON_WRITE)
@@ -176,6 +185,9 @@ public class TableOptionsMetadata {
176185

177186
if (is380OrHigher) this.cdc = isNullOrAbsent(row, CDC) ? DEFAULT_CDC : row.getBool(CDC);
178187
else this.cdc = DEFAULT_CDC;
188+
189+
if (is400OrHigher) this.additionalWritePolicy = row.getString(ADDITIONAL_WRITE_POLICY);
190+
else this.additionalWritePolicy = DEFAULT_ADDITIONAL_WRITE_POLICY;
179191
}
180192

181193
private static boolean isNullOrAbsent(Row row, String name) {
@@ -206,6 +218,19 @@ public String getComment() {
206218
* @return the read repair chance set for table (in [0.0, 1.0]).
207219
*/
208220
public double getReadRepairChance() {
221+
return readRepairChance;
222+
}
223+
224+
/**
225+
* Returns the read_repair option for this table. <b>NOTE:</b> this is a Cassandra® 4.0 and newer
226+
* option (described here: <a
227+
* href="http://cassandra.apache.org/doc/latest/operating/read_repair.html">
228+
* http://cassandra.apache.org/doc/latest/operating/read_repair.html</a>). Possible values are
229+
* {@code BLOCKING} or {@code NONE}, with the default being {@code BLOCKING}.
230+
*
231+
* @return the read repair option (either {@code BLOCKING} or {@code NONE}).
232+
*/
233+
public String getReadRepair() {
209234
return readRepair;
210235
}
211236

@@ -215,7 +240,7 @@ public double getReadRepairChance() {
215240
* @return the local read repair chance set for table (in [0.0, 1.0]).
216241
*/
217242
public double getLocalReadRepairChance() {
218-
return localReadRepair;
243+
return localReadRepairChance;
219244
}
220245

221246
/**
@@ -396,6 +421,17 @@ public boolean isCDC() {
396421
return cdc;
397422
}
398423

424+
/**
425+
* The threshold at which a cheap quorum write will be upgraded to include transient replicas.
426+
*
427+
* <p>This option is only available in Cassandra® 4.0 and above. Default value is {@code 99p}.
428+
*
429+
* @return The additional write policy for this table (ex. '99p').
430+
*/
431+
public String getAdditionalWritePolicy() {
432+
return additionalWritePolicy;
433+
}
434+
399435
@Override
400436
public boolean equals(Object other) {
401437
if (other == this) return true;
@@ -404,8 +440,9 @@ public boolean equals(Object other) {
404440
TableOptionsMetadata that = (TableOptionsMetadata) other;
405441
return this.isCompactStorage == that.isCompactStorage
406442
&& MoreObjects.equal(this.comment, that.comment)
407-
&& this.readRepair == that.readRepair
408-
&& this.localReadRepair == that.localReadRepair
443+
&& this.readRepairChance == that.readRepairChance
444+
&& this.localReadRepairChance == that.localReadRepairChance
445+
&& MoreObjects.equal(this.readRepair, that.readRepair)
409446
&& this.replicateOnWrite == that.replicateOnWrite
410447
&& this.gcGrace == that.gcGrace
411448
&& this.bfFpChance == that.bfFpChance
@@ -421,6 +458,7 @@ public boolean equals(Object other) {
421458
&& MoreObjects.equal(this.compaction, that.compaction)
422459
&& MoreObjects.equal(this.compression, that.compression)
423460
&& MoreObjects.equal(this.crcCheckChance, that.crcCheckChance)
461+
&& MoreObjects.equal(this.additionalWritePolicy, that.additionalWritePolicy)
424462
&& MoreObjects.equal(this.extensions, that.extensions);
425463
}
426464

@@ -429,8 +467,9 @@ public int hashCode() {
429467
return MoreObjects.hashCode(
430468
isCompactStorage,
431469
comment,
470+
readRepairChance,
471+
localReadRepairChance,
432472
readRepair,
433-
localReadRepair,
434473
replicateOnWrite,
435474
gcGrace,
436475
bfFpChance,
@@ -446,6 +485,7 @@ public int hashCode() {
446485
compression,
447486
crcCheckChance,
448487
extensions,
449-
cdc);
488+
cdc,
489+
additionalWritePolicy);
450490
}
451491
}

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,14 @@ public CCMBridge build() {
10471047
cassandraConfiguration.remove("rpc_port");
10481048
cassandraConfiguration.remove("thrift_prepared_statements_cache_size_mb");
10491049
}
1050+
if (isMaterializedViewsDisabledByDefault(cassandraVersion)) {
1051+
// enable materialized views
1052+
cassandraConfiguration.put("enable_materialized_views", true);
1053+
}
1054+
if (isSasiConfigEnablementRequired(cassandraVersion)) {
1055+
// enable SASI indexing in config (disabled by default in C* 4.0)
1056+
cassandraConfiguration.put("enable_sasi_indexes", true);
1057+
}
10501058
final CCMBridge ccm =
10511059
new CCMBridge(
10521060
clusterName,
@@ -1089,7 +1097,16 @@ public void run() {
10891097
}
10901098

10911099
private static boolean isThriftSupported(VersionNumber cassandraVersion) {
1092-
return cassandraVersion.compareTo(VersionNumber.parse("4.0")) < 0;
1100+
// Thrift is removed from some pre-release 4.x versions, make the comparison work for those
1101+
return cassandraVersion.nextStable().compareTo(VersionNumber.parse("4.0")) < 0;
1102+
}
1103+
1104+
private static boolean isMaterializedViewsDisabledByDefault(VersionNumber cassandraVersion) {
1105+
return cassandraVersion.nextStable().compareTo(VersionNumber.parse("4.0")) >= 0;
1106+
}
1107+
1108+
private static boolean isSasiConfigEnablementRequired(VersionNumber cassandraVersion) {
1109+
return cassandraVersion.nextStable().compareTo(VersionNumber.parse("4.0")) >= 0;
10931110
}
10941111

10951112
public int weight() {

driver-core/src/test/java/com/datastax/driver/core/ExportAsStringTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void should_create_schema_and_ensure_exported_cql_is_as_expected() {
146146
// matter, alphabetical does.
147147
session.execute(
148148
"CREATE MATERIALIZED VIEW cyclist_by_r_age "
149-
+ "AS SELECT age, birthday, name, country "
149+
+ "AS SELECT cid, age, birthday, name, country "
150150
+ "FROM cyclist_mv "
151151
+ "WHERE age IS NOT NULL AND cid IS NOT NULL "
152152
+ "PRIMARY KEY (age, cid) "
@@ -163,7 +163,7 @@ public void should_create_schema_and_ensure_exported_cql_is_as_expected() {
163163
// A materialized view for cyclist_mv, select columns
164164
session.execute(
165165
"CREATE MATERIALIZED VIEW cyclist_by_age "
166-
+ "AS SELECT age, birthday, name, country "
166+
+ "AS SELECT cid, age, birthday, name, country "
167167
+ "FROM cyclist_mv "
168168
+ "WHERE age IS NOT NULL AND cid IS NOT NULL "
169169
+ "PRIMARY KEY (age, cid) WITH comment = 'simple view'");

driver-core/src/test/java/com/datastax/driver/core/MissingRpcAddressTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ private void deleteNode2RpcAddressFromNode1() throws Exception {
7272
"DELETE rpc_address FROM system.peers WHERE peer = '%s'",
7373
ccm().addressOfNode(2).getHostName());
7474
session.execute(deleteStmt);
75+
// For Cassandra 4.0, we also need to remove the info from peers_v2
76+
if (ccm().getCassandraVersion().nextStable().compareTo(VersionNumber.parse("4.0")) >= 0) {
77+
deleteStmt =
78+
String.format(
79+
"DELETE native_address, native_port FROM system.peers_v2 WHERE peer = '%s' and peer_port = %d",
80+
ccm().addressOfNode(2).getHostName(), ccm().getStoragePort());
81+
session.execute(deleteStmt);
82+
}
7583
session.close();
7684
cluster.close();
7785
}

driver-core/src/test/java/com/datastax/driver/core/SchemaChangesTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public void should_retain_view_on_table_update(String keyspace) throws Interrupt
257257
// Create view and ensure event is received and metadata is updated
258258
session1.execute(
259259
String.format(
260-
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT c FROM %s.table1 WHERE c IS NOT NULL PRIMARY KEY (pk, c)",
260+
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT pk, c FROM %s.table1 WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c)",
261261
keyspace, keyspace));
262262
for (SchemaChangeListener listener : listeners) {
263263
ArgumentCaptor<MaterializedViewMetadata> viewAdded =
@@ -506,7 +506,7 @@ public void should_notify_of_view_creation(String keyspace) {
506506
session1.execute(String.format("CREATE TABLE %s.table1 (pk int PRIMARY KEY, c int)", keyspace));
507507
session1.execute(
508508
String.format(
509-
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT c FROM %s.table1 WHERE c IS NOT NULL PRIMARY KEY (pk, c)",
509+
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT pk, c FROM %s.table1 WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c)",
510510
keyspace, keyspace));
511511
for (SchemaChangeListener listener : listeners) {
512512
ArgumentCaptor<MaterializedViewMetadata> removed =
@@ -525,7 +525,7 @@ public void should_notify_of_view_update(String keyspace) {
525525
session1.execute(String.format("CREATE TABLE %s.table1 (pk int PRIMARY KEY, c int)", keyspace));
526526
session1.execute(
527527
String.format(
528-
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT c FROM %s.table1 WHERE c IS NOT NULL PRIMARY KEY (pk, c) WITH compaction = { 'class' : 'SizeTieredCompactionStrategy' }",
528+
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT pk, c FROM %s.table1 WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c) WITH compaction = { 'class' : 'SizeTieredCompactionStrategy' }",
529529
keyspace, keyspace));
530530
for (SchemaChangeListener listener : listeners) {
531531
ArgumentCaptor<MaterializedViewMetadata> removed =
@@ -576,7 +576,7 @@ public void should_notify_of_view_drop(String keyspace) {
576576
session1.execute(String.format("CREATE TABLE %s.table1 (pk int PRIMARY KEY, c int)", keyspace));
577577
session1.execute(
578578
String.format(
579-
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT c FROM %s.table1 WHERE c IS NOT NULL PRIMARY KEY (pk, c)",
579+
"CREATE MATERIALIZED VIEW %s.mv1 AS SELECT pk, c FROM %s.table1 WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c)",
580580
keyspace, keyspace));
581581
session1.execute(String.format("DROP MATERIALIZED VIEW %s.mv1", keyspace));
582582
for (SchemaChangeListener listener : listeners) {

driver-core/src/test/java/com/datastax/driver/core/TableMetadataTest.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,32 @@ public void should_parse_table_options() {
300300
// given
301301
String cql;
302302

303-
// Cassandra 3.0 +
304-
if (version.getMajor() > 2) {
303+
// Cassandra 4.0 +
304+
if (version.getMajor() > 3) {
305+
cql =
306+
String.format(
307+
"CREATE TABLE %s.with_options (\n"
308+
+ " k text,\n"
309+
+ " c1 int,\n"
310+
+ " c2 int,\n"
311+
+ " i int,\n"
312+
+ " PRIMARY KEY (k, c1, c2)\n"
313+
+ ") WITH CLUSTERING ORDER BY (c1 DESC, c2 ASC)\n"
314+
+ " AND additional_write_policy = '99p'\n"
315+
+ " AND read_repair = 'BLOCKING'\n"
316+
+ " AND speculative_retry = '99.9p'\n"
317+
+ " AND gc_grace_seconds = 42\n"
318+
+ " AND bloom_filter_fp_chance = 0.01\n"
319+
+ " AND caching = { 'keys' : 'ALL', 'rows_per_partition' : 10 }\n"
320+
+ " AND comment = 'My awesome table'\n"
321+
+ " AND compaction = { 'class' : 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'sstable_size_in_mb' : 15 }\n"
322+
+ " AND compression = { 'sstable_compression' : 'org.apache.cassandra.io.compress.SnappyCompressor', 'chunk_length_kb' : 128 }\n"
323+
+ " AND crc_check_chance = 0.5\n" // available from C* 3.0
324+
+ " AND memtable_flush_period_in_ms = 1000;",
325+
keyspace);
326+
327+
// Cassandra 3.0 +
328+
} else if (version.getMajor() > 2) {
305329
cql =
306330
String.format(
307331
"CREATE TABLE %s.with_options (\n"
@@ -401,8 +425,58 @@ public void should_parse_table_options() {
401425
.hasType(cint());
402426
assertThat(table);
403427

404-
// Cassandra 3.8 +
405-
if (version.getMajor() > 3 || (version.getMajor() == 3 && version.getMinor() >= 8)) {
428+
// Cassandra 4.0 +
429+
if (version.getMajor() > 3) {
430+
431+
assertThat(table.getOptions().getGcGraceInSeconds()).isEqualTo(42);
432+
assertThat(table.getOptions().getBloomFilterFalsePositiveChance()).isEqualTo(0.01);
433+
assertThat(table.getOptions().getComment()).isEqualTo("My awesome table");
434+
assertThat(table.getOptions().getCaching()).contains(entry("keys", "ALL"));
435+
assertThat(table.getOptions().getCaching()).contains(entry("rows_per_partition", "10"));
436+
assertThat(table.getOptions().getCompaction())
437+
.contains(entry("class", "org.apache.cassandra.db.compaction.LeveledCompactionStrategy"));
438+
assertThat(table.getOptions().getCompaction()).contains(entry("sstable_size_in_mb", "15"));
439+
assertThat(table.getOptions().getCompression())
440+
.contains(
441+
entry(
442+
"class",
443+
"org.apache.cassandra.io.compress.SnappyCompressor")); // sstable_compression
444+
// becomes class
445+
assertThat(table.getOptions().getCompression())
446+
.contains(entry("chunk_length_in_kb", "128")); // note the "in" prefix
447+
assertThat(table.getOptions().getDefaultTimeToLive()).isEqualTo(0);
448+
assertThat(table.getOptions().getSpeculativeRetry()).isEqualTo("99.9p");
449+
assertThat(table.getOptions().getIndexInterval()).isNull();
450+
assertThat(table.getOptions().getMinIndexInterval()).isEqualTo(128);
451+
assertThat(table.getOptions().getMaxIndexInterval()).isEqualTo(2048);
452+
assertThat(table.getOptions().getReplicateOnWrite()).isTrue(); // default
453+
assertThat(table.getOptions().getCrcCheckChance()).isEqualTo(0.5);
454+
assertThat(table.getOptions().getExtensions()).isEmpty(); // default
455+
assertThat(table.getOptions().getMemtableFlushPeriodInMs()).isEqualTo(1000);
456+
assertThat(table.asCQLQuery())
457+
.contains("additional_write_policy = '99p'")
458+
.contains("read_repair = 'BLOCKING'")
459+
.contains("gc_grace_seconds = 42")
460+
.contains("bloom_filter_fp_chance = 0.01")
461+
.contains("comment = 'My awesome table'")
462+
.contains("'keys' : 'ALL'")
463+
.contains("'rows_per_partition' : 10")
464+
.contains("'class' : 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'")
465+
.contains("'sstable_size_in_mb' : 15")
466+
.contains(
467+
"'class' : 'org.apache.cassandra.io.compress.SnappyCompressor'") // sstable_compression becomes class
468+
.contains("'chunk_length_in_kb' : 128") // note the "in" prefix
469+
.contains("default_time_to_live = 0")
470+
.contains("speculative_retry = '99.9p'")
471+
.contains("min_index_interval = 128")
472+
.contains("max_index_interval = 2048")
473+
.contains("crc_check_chance = 0.5")
474+
.contains("cdc = false")
475+
.contains("memtable_flush_period_in_ms = 1000")
476+
.doesNotContain(" index_interval")
477+
.doesNotContain("replicate_on_write");
478+
// Cassandra 3.8 +
479+
} else if (version.getMajor() == 3 && version.getMinor() >= 8) {
406480

407481
assertThat(table.getOptions().getReadRepairChance()).isEqualTo(0.5);
408482
assertThat(table.getOptions().getLocalReadRepairChance()).isEqualTo(0.6);

0 commit comments

Comments
 (0)