Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [improvement] JAVA-1233: Update Snappy to 1.1.2.6.
- [bug] JAVA-1161: Preserve full time zone info in ZonedDateTimeCodec and DateTimeCodec.
- [new feature] JAVA-1157: Allow asynchronous paging of Mapper Result.
- [improvement] JAVA-1212: Don't retry non-idempotent statements by default.

Merged from 3.0.x branch:

Expand Down
101 changes: 57 additions & 44 deletions driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
*/
class RequestHandler {
private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
private static final AtomicBoolean WARNED_IDEMPOTENT = new AtomicBoolean();

final String id;

Expand Down Expand Up @@ -353,6 +354,38 @@ private void write(Connection connection, Connection.ResponseCallback responseCa
connection.release();
}

private RetryPolicy.RetryDecision computeRetryDecisionOnRequestError(DriverException exception) {
RetryPolicy.RetryDecision decision;
if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions())) {
decision = retryPolicy().onRequestError(statement, request().consistency(), exception, retriesByPolicy);
} else {
logIdempotenceWarning();
decision = RetryPolicy.RetryDecision.rethrow();
}
if (metricsEnabled()) {
if (exception instanceof OperationTimedOutException) {
metrics().getErrorMetrics().getClientTimeouts().inc();
if (decision.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnClientTimeout().inc();
if (decision.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc();
} else if (exception instanceof ConnectionException) {
metrics().getErrorMetrics().getConnectionErrors().inc();
if (decision.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnConnectionError().inc();
if (decision.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnConnectionError().inc();
} else {
metrics().getErrorMetrics().getOthers().inc();
if (decision.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnOtherErrors().inc();
if (decision.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc();
}
}
return decision;
}

private void processRetryDecision(RetryPolicy.RetryDecision retryDecision, Connection connection, Exception exceptionToReport) {
switch (retryDecision.getType()) {
case RETRY:
Expand Down Expand Up @@ -490,12 +523,17 @@ public void onSet(Connection connection, Message.Response response, long latency
connection.release();
assert err.infos instanceof WriteTimeoutException;
WriteTimeoutException wte = (WriteTimeoutException) err.infos;
retry = retryPolicy.onWriteTimeout(statement,
wte.getConsistencyLevel(),
wte.getWriteType(),
wte.getRequiredAcknowledgements(),
wte.getReceivedAcknowledgements(),
retriesByPolicy);
if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions()))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we get rid of IdempotenceAwareRetryPolicy since we now consider idempotence outside of Retry policy?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, it was marked deprecated.

retry = retryPolicy.onWriteTimeout(statement,
wte.getConsistencyLevel(),
wte.getWriteType(),
wte.getRequiredAcknowledgements(),
wte.getReceivedAcknowledgements(),
retriesByPolicy);
else {
logIdempotenceWarning();
retry = RetryPolicy.RetryDecision.rethrow();
}
if (metricsEnabled()) {
metrics().getErrorMetrics().getWriteTimeouts().inc();
if (retry.getType() == Type.RETRY)
Expand Down Expand Up @@ -525,33 +563,15 @@ public void onSet(Connection connection, Message.Response response, long latency
connection.release();
assert exceptionToReport instanceof OverloadedException;
logger.warn("Host {} is overloaded.", connection.address);
retry = retryPolicy.onRequestError(statement,
request().consistency(),
(OverloadedException) exceptionToReport, retriesByPolicy);
if (metricsEnabled()) {
metrics().getErrorMetrics().getOthers().inc();
if (retry.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnOtherErrors().inc();
if (retry.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc();
}
retry = computeRetryDecisionOnRequestError((OverloadedException) exceptionToReport);
break;
case SERVER_ERROR:
connection.release();
assert exceptionToReport instanceof ServerError;
logger.warn("{} replied with server error ({}), defuncting connection.", connection.address, err.message);
// Defunct connection
connection.defunct(exceptionToReport);
retry = retryPolicy.onRequestError(statement,
request().consistency(),
(ServerError) exceptionToReport, retriesByPolicy);
if (metricsEnabled()) {
metrics().getErrorMetrics().getOthers().inc();
if (retry.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnOtherErrors().inc();
if (retry.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc();
}
retry = computeRetryDecisionOnRequestError((ServerError) exceptionToReport);
break;
case IS_BOOTSTRAPPING:
connection.release();
Expand Down Expand Up @@ -716,15 +736,7 @@ public void onException(Connection connection, Exception exception, long latency
connection.release();

if (exception instanceof ConnectionException) {
RetryPolicy retryPolicy = retryPolicy();
RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), (ConnectionException) exception, retriesByPolicy);
if (metricsEnabled()) {
metrics().getErrorMetrics().getConnectionErrors().inc();
if (decision.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnConnectionError().inc();
if (decision.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnConnectionError().inc();
}
RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError((ConnectionException) exception);
processRetryDecision(decision, connection, exception);
return;
}
Expand Down Expand Up @@ -755,15 +767,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
try {
connection.release();

RetryPolicy retryPolicy = retryPolicy();
RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), timeoutException, retriesByPolicy);
if (metricsEnabled()) {
metrics().getErrorMetrics().getClientTimeouts().inc();
if (decision.getType() == Type.RETRY)
metrics().getErrorMetrics().getRetriesOnClientTimeout().inc();
if (decision.getType() == Type.IGNORE)
metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc();
}
RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError(timeoutException);
processRetryDecision(decision, connection, timeoutException);
} catch (Exception e) {
// This shouldn't happen, but if it does, we want to signal the callback, not let it hang indefinitely
Expand All @@ -789,6 +793,15 @@ private void setFinalResult(Connection connection, Message.Response response) {
}
}

private void logIdempotenceWarning() {
if (WARNED_IDEMPOTENT.compareAndSet(false, true))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about establishing a common logger implementation for messages like this? This way users can easily suppress warnings like this globally for all changes. Some users pay very close attention to warnings and try their best to suppress all the ones that doesn't affect their application health.

logger.warn("Not retrying statement because it is not idempotent (this message will be logged only once). " +
"Note that this version of the driver changes the default retry behavior for non-idempotent " +
"statements: they won't be automatically retried anymore. The driver marks statements " +
"non-idempotent by default, so you should explicitly call setIdempotent(true) if your statements " +
"are safe to retry. See http://goo.gl/4HrSby for more details.");
}

/**
* The state of a SpeculativeExecution.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,14 @@ public Statement setIdempotent(boolean idempotent) {
* Whether this statement is idempotent, i.e. whether it can be applied multiple times
* without changing the result beyond the initial application.
* <p/>
* Idempotence plays a role in {@link com.datastax.driver.core.policies.SpeculativeExecutionPolicy speculative executions}.
* If a statement is <em>not idempotent</em>, the driver will not schedule speculative
* executions for it.
* If a statement is <em>not idempotent</em>, the driver will ensure that it never gets executed more than once,
* which means:
* <ul>
* <li>avoiding {@link RetryPolicy retries} on write timeouts or request errors;</li>
* <li>never scheduling {@link com.datastax.driver.core.policies.SpeculativeExecutionPolicy speculative executions}.
* </li>
* </ul>
* (this behavior is implemented in the driver internals, the corresponding policies will not even be invoked).
* <p/>
* Note that this method can return {@code null}, in which case the driver will default to
* {@link QueryOptions#getDefaultIdempotence()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int

/**
* {@inheritDoc}
* <p/>
* For historical reasons, this implementation triggers a retry on the next host in the query plan
* with the same consistency level, regardless of the statement's idempotence.
* Note that this breaks the general rule
* stated in {@link RetryPolicy#onRequestError(Statement, ConsistencyLevel, DriverException, int)}:
* "a retry should only be attempted if the request is known to be idempotent".
*/
@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
* if the statement is deemed non-idempotent (see {@link #isIdempotent(Statement)}).
* <p/>
* For all other cases, this policy delegates the decision to the child policy.
*
* @deprecated As of version 3.1.0, the driver doesn't retry non-idempotent statements for write timeouts or unexpected
* errors anymore. It is no longer necessary to wrap your retry policies in this policy.
*/
@Deprecated
public class IdempotenceAwareRetryPolicy implements RetryPolicy {

private final RetryPolicy childPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public interface RetryPolicy {
* </ul>
*/
class RetryDecision {

private static final RetryDecision RETHROW_DECISION = new RetryDecision(Type.RETHROW, null, true);
private static final RetryDecision IGNORE_DECISION = new RetryDecision(Type.IGNORE, null, true);

/**
* The types of retry decisions.
*/
Expand Down Expand Up @@ -99,7 +103,7 @@ public boolean isRetryCurrent() {
* @return a {@link RetryDecision.Type#RETHROW} retry decision.
*/
public static RetryDecision rethrow() {
return new RetryDecision(Type.RETHROW, null, true);
return RETHROW_DECISION;
}

/**
Expand Down Expand Up @@ -128,7 +132,7 @@ public static RetryDecision retry(ConsistencyLevel consistency) {
* @return an {@link RetryDecision.Type#IGNORE} retry decision.
*/
public static RetryDecision ignore() {
return new RetryDecision(Type.IGNORE, null, true);
return IGNORE_DECISION;
}

/**
Expand Down Expand Up @@ -194,6 +198,9 @@ public String toString() {

/**
* Defines whether to retry and at which consistency level on a write timeout.
* <p/>
* Note that if a statement is {@link Statement#isIdempotent() not idempotent}, the driver will never retry it on a
* write timeout (this method won't even be called).
*
* @param statement the original query that timed out.
* @param cl the requested consistency level of the write that timed out.
Expand Down Expand Up @@ -247,13 +254,12 @@ public String toString() {
* <li>On a client timeout, while waiting for the server response
* (see {@link SocketOptions#getReadTimeoutMillis()});</li>
* <li>On a connection error (socket closed, etc.);</li>
* <li>When the contacted host replies with an error, such as
* {@code OVERLOADED}, {@code IS_BOOTSTRAPPING}, {@code SERVER_ERROR}, etc.</li>
* <li>When the contacted host replies with an {@code OVERLOADED} error or a {@code SERVER_ERROR}.</li>
* </ol>
* <p/>
* Note that when this method is invoked, <em>the driver cannot guarantee that the mutation has
* been effectively applied server-side</em>; a retry should only be attempted if the request
* is known to be idempotent.
* Note that when such an error occurs, there is no guarantee that the mutation has been applied server-side or not.
* Therefore, if a statement is {@link Statement#isIdempotent() not idempotent}, the driver will never retry it
* (this method won't even be called).
*
* @param statement the original query that failed.
* @param cl the requested consistency level for the operation.
Expand Down
10 changes: 5 additions & 5 deletions manual/idempotence/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ example:
* `update my_table set list_col = [1] + list_col where pk = 1` is not idempotent: if `list_col` was initially empty,
it will contain `[1]` after the first execution, `[1, 1]` after the second, etc.

Idempotence matters for [retries](../retries/) and [speculative query executions](../speculative_execution/). The
corresponding policies inspect the [Statement#isIdempotent()][isIdempotent] flag.
Idempotence matters for [retries](../retries/) and [speculative query executions](../speculative_execution/). The driver
will bypass those features if the [Statement#isIdempotent()][isIdempotent] flag is set to `false`, to ensure that the
statement does not get executed more than once.

In most cases, you must set that flag manually. The driver does not parse query strings, so it can't infer it
automatically (except for statements coming from the query builder, see below).
Expand Down Expand Up @@ -121,9 +122,8 @@ clients' point of view, there were two operations:
But overall the column changed from 1 to 2. There is no ordering of the two operations that can explain that change. We
broke linearizability by doing a transparent retry at step 6.

To avoid this, the driver considers lightweight transactions as non-idempotent, and provides a
[retry policy](../retries/) that doesn't retry non-idempotent statements. If linearizability is important for you, you
should use that policy, and ensure that lightweight transactions are appropriately flagged.
If linearizability is important for you, you should ensure that lightweight transactions are appropriately flagged as
not idempotent.

[isIdempotent]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/Statement.html#isIdempotent--
[setDefaultIdempotence]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/QueryOptions.html#setDefaultIdempotence-boolean-
Expand Down
Loading