Skip to content

Commit fb58e24

Browse files
committed
JAVA-764: LWT / CAS Consistency bug on Retry Policy.
(cherry picked from 2.1)
1 parent a5e8ca8 commit fb58e24

12 files changed

Lines changed: 233 additions & 143 deletions

File tree

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Merged from 2.1 branch:
1616
- [bug] JAVA-1089: Set LWT made from BuiltStatements to non-idempotent.
1717
- [improvement] JAVA-923: Position idempotent flag on object mapper queries.
1818
- [bug] JAVA-1070: The Mapper should not prepare queries synchronously.
19+
- [new feature] JAVA-982: Introduce new method ConsistencyLevel.isSerial().
20+
- [bug] JAVA-764: Retry with the normal consistency level (not the serial one) when a write times out on the Paxos phase.
1921

2022

2123
### 3.0.0

driver-core/src/main/java/com/datastax/driver/core/ConsistencyLevel.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,30 @@ static ConsistencyLevel fromCode(int code) {
5858
}
5959

6060
/**
61-
* Whether or not the the consistency level applies to the local data-center only.
61+
* Whether or not this consistency level applies to the local data-center only.
6262
*
6363
* @return whether this consistency level is {@code LOCAL_ONE} or {@code LOCAL_QUORUM}.
6464
*/
6565
public boolean isDCLocal() {
6666
return this == LOCAL_ONE || this == LOCAL_QUORUM;
6767
}
68+
69+
/**
70+
* Whether or not this consistency level is serial, that is,
71+
* applies only to the "paxos" phase of a
72+
* <a href="https://docs.datastax.com/en/cassandra/2.1/cassandra/dml/dml_ltwt_transaction_c.html">Lightweight transaction</a>.
73+
* <p/>
74+
* Serial consistency levels are only meaningful when executing conditional updates ({@code INSERT}, {@code UPDATE}
75+
* or {@code DELETE} statements with an {@code IF} condition).
76+
* <p/>
77+
* Two consistency levels belong to this category: {@link #SERIAL} and {@link #LOCAL_SERIAL}.
78+
*
79+
* @return whether this consistency level is {@link #SERIAL} or {@link #LOCAL_SERIAL}.
80+
* @see Statement#setSerialConsistencyLevel(ConsistencyLevel)
81+
* @see <a href="https://docs.datastax.com/en/cassandra/2.1/cassandra/dml/dml_ltwt_transaction_c.html">Lightweight transactions</a>
82+
*/
83+
public boolean isSerial() {
84+
return this == SERIAL || this == LOCAL_SERIAL;
85+
}
86+
6887
}

driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public ConsistencyLevel getConsistencyLevel() {
161161

162162
@Override
163163
public PreparedStatement setSerialConsistencyLevel(ConsistencyLevel serialConsistency) {
164-
if (serialConsistency != ConsistencyLevel.SERIAL && serialConsistency != ConsistencyLevel.LOCAL_SERIAL)
164+
if (!serialConsistency.isSerial())
165165
throw new IllegalArgumentException();
166166
this.serialConsistency = serialConsistency;
167167
return this;

driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ private void processRetryDecision(RetryPolicy.RetryDecision retryDecision, Conne
383383

384384
private void retry(final boolean retryCurrent, ConsistencyLevel newConsistencyLevel) {
385385
final Host h = current;
386-
this.retryConsistencyLevel = newConsistencyLevel;
386+
if (newConsistencyLevel != null)
387+
this.retryConsistencyLevel = newConsistencyLevel;
387388

388389
// We should not retry on the current thread as this will be an IO thread.
389390
manager.executor().execute(new Runnable() {

driver-core/src/main/java/com/datastax/driver/core/Statement.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ public ConsistencyLevel getConsistencyLevel() {
9898
/**
9999
* Sets the serial consistency level for the query.
100100
* <p/>
101-
* The serial consistency level is only used by conditional updates (so INSERT, UPDATE
102-
* and DELETE with an IF condition). For those, the serial consistency level defines
101+
* The serial consistency level is only used by conditional updates ({@code INSERT}, {@code UPDATE}
102+
* or {@code DELETE} statements with an {@code IF} condition).
103+
* For those, the serial consistency level defines
103104
* the consistency level of the serial phase (or "paxos" phase) while the
104105
* normal consistency level defines the consistency for the "learn" phase, i.e. what
105106
* type of reads will be guaranteed to see the update right away. For instance, if
@@ -122,19 +123,19 @@ public ConsistencyLevel getConsistencyLevel() {
122123
* {@code ConsistencyLevel.SERIAL} or {@code ConsistencyLevel.LOCAL_SERIAL}.
123124
*/
124125
public Statement setSerialConsistencyLevel(ConsistencyLevel serialConsistency) {
125-
if (serialConsistency != ConsistencyLevel.SERIAL && serialConsistency != ConsistencyLevel.LOCAL_SERIAL)
126-
throw new IllegalArgumentException();
126+
if (!serialConsistency.isSerial())
127+
throw new IllegalArgumentException("Supplied consistency level is not serial: " + serialConsistency);
127128
this.serialConsistency = serialConsistency;
128129
return this;
129130
}
130131

131132
/**
132133
* The serial consistency level for this query.
133134
* <p/>
134-
* See {@link #setSerialConsistencyLevel} for more detail on the serial consistency level.
135+
* See {@link #setSerialConsistencyLevel(ConsistencyLevel)} for more detail on the serial consistency level.
135136
*
136-
* @return the consistency level for this query, or {@code null} if no serial
137-
* consistency level has been specified (through {@code setSerialConsistencyLevel}).
137+
* @return the serial consistency level for this query, or {@code null} if no serial
138+
* consistency level has been specified (through {@link #setSerialConsistencyLevel(ConsistencyLevel)}).
138139
* In the latter case, the default serial consistency level will be used.
139140
*/
140141
public ConsistencyLevel getSerialConsistencyLevel() {

driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ private DefaultRetryPolicy() {
4444
}
4545

4646
/**
47-
* Defines whether to retry and at which consistency level on a read timeout.
47+
* {@inheritDoc}
4848
* <p/>
49-
* This method triggers a maximum of one retry, and only if enough
49+
* This implementation triggers a maximum of one retry, and only if enough
5050
* replicas had responded to the read request but data was not retrieved
5151
* amongst those. Indeed, that case usually means that enough replica
5252
* are alive to satisfy the consistency but the coordinator picked a
@@ -55,15 +55,6 @@ private DefaultRetryPolicy() {
5555
* timeout the dead replica will likely have been detected as dead and
5656
* the retry has a high chance of success.
5757
*
58-
* @param statement the original query that timed out.
59-
* @param cl the original consistency level of the read that timed out.
60-
* @param requiredResponses the number of responses that were required to
61-
* achieve the requested consistency level.
62-
* @param receivedResponses the number of responses that had been received
63-
* by the time the timeout exception was raised.
64-
* @param dataRetrieved whether actual data (by opposition to data checksum)
65-
* was present in the received responses.
66-
* @param nbRetry the number of retries already performed for this operation.
6758
* @return {@code RetryDecision.retry(cl)} if no retry attempt has yet been tried and
6859
* {@code receivedResponses >= requiredResponses && !dataRetrieved}, {@code RetryDecision.rethrow()} otherwise.
6960
*/
@@ -76,9 +67,9 @@ public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int
7667
}
7768

7869
/**
79-
* Defines whether to retry and at which consistency level on a write timeout.
70+
* {@inheritDoc}
8071
* <p/>
81-
* This method triggers a maximum of one retry, and only in the case of
72+
* This implementation triggers a maximum of one retry, and only in the case of
8273
* a {@code WriteType.BATCH_LOG} write. The reasoning for the retry in
8374
* that case is that write to the distributed batch log is tried by the
8475
* coordinator of the write against a small subset of all the nodes alive
@@ -88,14 +79,6 @@ public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int
8879
* nodes will likely have been detected as dead and the retry has thus a
8980
* high chance of success.
9081
*
91-
* @param statement the original query that timed out.
92-
* @param cl the original consistency level of the write that timed out.
93-
* @param writeType the type of the write that timed out.
94-
* @param requiredAcks the number of acknowledgments that were required to
95-
* achieve the requested consistency level.
96-
* @param receivedAcks the number of acknowledgments that had been received
97-
* by the time the timeout exception was raised.
98-
* @param nbRetry the number of retry already performed for this operation.
9982
* @return {@code RetryDecision.retry(cl)} if no retry attempt has yet been tried and
10083
* {@code writeType == WriteType.BATCH_LOG}, {@code RetryDecision.rethrow()} otherwise.
10184
*/
@@ -105,37 +88,27 @@ public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, Wr
10588
return RetryDecision.rethrow();
10689

10790
// If the batch log write failed, retry the operation as this might just be we were unlucky at picking candidates
91+
// JAVA-764: testing the write type automatically filters out serial consistency levels, as these always have WriteType.CAS.
10892
return writeType == WriteType.BATCH_LOG ? RetryDecision.retry(cl) : RetryDecision.rethrow();
10993
}
11094

11195
/**
112-
* Defines whether to retry and at which consistency level on an
113-
* unavailable exception.
96+
* {@inheritDoc}
11497
* <p/>
115-
* This method triggers a retry iff no retry has been executed before
116-
* (nbRetry == 0), with
117-
* {@link RetryPolicy.RetryDecision#tryNextHost(ConsistencyLevel) RetryDecision.tryNextHost(cl)},
118-
* otherwise it throws an exception. The retry will be processed on the next host
119-
* in the query plan according to the current Load Balancing Policy.
120-
* Where retrying on the same host in the event of an Unavailable exception
121-
* has almost no chance of success, if the first replica tried happens to
122-
* be "network" isolated from all the other nodes but can still answer to
123-
* the client, it makes sense to retry the query on another node.
124-
*
125-
* @param statement the original query for which the consistency level cannot
126-
* be achieved.
127-
* @param cl the original consistency level for the operation.
128-
* @param requiredReplica the number of replica that should have been
129-
* (known) alive for the operation to be attempted.
130-
* @param aliveReplica the number of replica that were know to be alive by
131-
* the coordinator of the operation.
132-
* @param nbRetry the number of retry already performed for this operation.
133-
* @return {@code RetryDecision.rethrow()}.
98+
* This implementation does the following:
99+
* <ul>
100+
* <li>if this is the first retry ({@code nbRetry == 0}), it triggers a retry on the next host in the query plan
101+
* with the same consistency level ({@link RetryPolicy.RetryDecision#tryNextHost(ConsistencyLevel) RetryDecision#tryNextHost(null)}).
102+
* The rationale is that the first coordinator might have been network-isolated from all other nodes (thinking
103+
* they're down), but still able to communicate with the client; in that case, retrying on the same host has almost
104+
* no chance of success, but moving to the next host might solve the issue.</li>
105+
* <li>otherwise, the exception is rethrown.</li>
106+
* </ul>
134107
*/
135108
@Override
136109
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
137110
return (nbRetry == 0)
138-
? RetryDecision.tryNextHost(cl)
111+
? RetryDecision.tryNextHost(null)
139112
: RetryDecision.rethrow();
140113
}
141114

driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -87,25 +87,14 @@ else if (knownOk == 1)
8787
}
8888

8989
/**
90-
* Defines whether to retry and at which consistency level on a read timeout.
90+
* {@inheritDoc}
9191
* <p/>
92-
* This method triggers a maximum of one retry. If less replica
92+
* This implementation triggers a maximum of one retry. If less replica
9393
* responded than required by the consistency level (but at least one
9494
* replica did respond), the operation is retried at a lower
9595
* consistency level. If enough replica responded but data was not
9696
* retrieved, the operation is retried with the initial consistency
9797
* level. Otherwise, an exception is thrown.
98-
*
99-
* @param statement the original query that timed out.
100-
* @param cl the original consistency level of the read that timed out.
101-
* @param requiredResponses the number of responses that were required to
102-
* achieve the requested consistency level.
103-
* @param receivedResponses the number of responses that had been received
104-
* by the time the timeout exception was raised.
105-
* @param dataRetrieved whether actual data (by opposition to data checksum)
106-
* was present in the received responses.
107-
* @param nbRetry the number of retry already performed for this operation.
108-
* @return a RetryDecision as defined above.
10998
*/
11099
@Override
111100
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
@@ -116,7 +105,7 @@ public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int
116105
// normal consistency levels on the committing phase. So the main use case for CAS reads is probably for
117106
// when you've timed out on a CAS write and want to make sure what happened. Downgrading in that case
118107
// would be always wrong so we just special case to rethrow.
119-
if (cl == ConsistencyLevel.SERIAL || cl == ConsistencyLevel.LOCAL_SERIAL)
108+
if (cl.isSerial())
120109
return RetryDecision.rethrow();
121110

122111
if (receivedResponses < requiredResponses) {
@@ -128,9 +117,9 @@ public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int
128117
}
129118

130119
/**
131-
* Defines whether to retry and at which consistency level on a write timeout.
120+
* {@inheritDoc}
132121
* <p/>
133-
* This method triggers a maximum of one retry. If {@code writeType ==
122+
* This implementation triggers a maximum of one retry. If {@code writeType ==
134123
* WriteType.BATCH_LOG}, the write is retried with the initial
135124
* consistency level. If {@code writeType == WriteType.UNLOGGED_BATCH}
136125
* and at least one replica acknowledged, the write is retried with a
@@ -139,16 +128,6 @@ public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int
139128
* all, even if {@code receivedAcks > 0}). For other write types ({@code WriteType.SIMPLE}
140129
* and {@code WriteType.BATCH}), if we know the write has been persisted on at
141130
* least one replica, we ignore the exception. Otherwise, an exception is thrown.
142-
*
143-
* @param statement the original query that timed out.
144-
* @param cl the original consistency level of the write that timed out.
145-
* @param writeType the type of the write that timed out.
146-
* @param requiredAcks the number of acknowledgments that were required to
147-
* achieve the requested consistency level.
148-
* @param receivedAcks the number of acknowledgments that had been received
149-
* by the time the timeout exception was raised.
150-
* @param nbRetry the number of retry already performed for this operation.
151-
* @return a RetryDecision as defined above.
152131
*/
153132
@Override
154133
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
@@ -172,28 +151,22 @@ public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, Wr
172151
}
173152

174153
/**
175-
* Defines whether to retry and at which consistency level on an
176-
* unavailable exception.
154+
* {@inheritDoc}
177155
* <p/>
178-
* This method triggers a maximum of one retry. If at least one replica
156+
* This implementation triggers a maximum of one retry. If at least one replica
179157
* is known to be alive, the operation is retried at a lower consistency
180158
* level.
181-
*
182-
* @param statement the original query for which the consistency level cannot
183-
* be achieved.
184-
* @param cl the original consistency level for the operation.
185-
* @param requiredReplica the number of replica that should have been
186-
* (known) alive for the operation to be attempted.
187-
* @param aliveReplica the number of replica that were know to be alive by
188-
* the coordinator of the operation.
189-
* @param nbRetry the number of retry already performed for this operation.
190-
* @return a RetryDecision as defined above.
191159
*/
192160
@Override
193161
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
194162
if (nbRetry != 0)
195163
return RetryDecision.rethrow();
196164

165+
// JAVA-764: if the requested consistency level is serial, it means that the operation failed at the paxos phase of a LWT.
166+
// Retry on the next host, on the assumption that the initial coordinator could be network-isolated.
167+
if (cl.isSerial())
168+
return RetryDecision.tryNextHost(null);
169+
197170
// Tries the biggest CL that is expected to work
198171
return maxLikelyToWorkCL(aliveReplica);
199172
}

0 commit comments

Comments
 (0)