From 1287f16be2efc9f6641efda1570da8c9ea91518c Mon Sep 17 00:00:00 2001 From: olim7t Date: Mon, 20 Jun 2016 14:51:33 -0700 Subject: [PATCH 1/2] JAVA-1212: Don't retry non-idempotent statements by default This was opt-in behavior in 3.0.0, make it the default because it's the right thing to do. The counterpart is that the isIdempotent flag needs to be positioned properly on statements, the doc has been updated in that regard. This was made as an "internal" change (as opposed to a retry policy implementation), so that it would work with any retry policy. Therefore `IdempotenceAwareRetryPolicy` becomes redundant and was deprecated. --- changelog/README.md | 1 + .../datastax/driver/core/RequestHandler.java | 101 ++++++++++-------- .../com/datastax/driver/core/Statement.java | 11 +- .../core/policies/DefaultRetryPolicy.java | 6 -- .../policies/IdempotenceAwareRetryPolicy.java | 4 + .../driver/core/policies/RetryPolicy.java | 12 ++- manual/idempotence/README.md | 10 +- manual/retries/README.md | 43 +++----- upgrade_guide/README.md | 32 ++++++ 9 files changed, 130 insertions(+), 90 deletions(-) diff --git a/changelog/README.md b/changelog/README.md index 15850195a65..0ba13570262 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -8,6 +8,7 @@ - [improvement] JAVA-1233: Update Snappy to 1.1.2.6. - [bug] JAVA-1161: Preserve full time zone info in ZonedDateTimeCodec and DateTimeCodec. - [new feature] JAVA-1157: Allow asynchronous paging of Mapper Result. +- [improvement] JAVA-1212: Don't retry non-idempotent statements by default. Merged from 3.0.x branch: diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 745737342d2..23f475e48d5 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -42,6 +42,7 @@ */ class RequestHandler { private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class); + private static final AtomicBoolean WARNED_IDEMPOTENT = new AtomicBoolean(); final String id; @@ -353,6 +354,38 @@ private void write(Connection connection, Connection.ResponseCallback responseCa connection.release(); } + private RetryPolicy.RetryDecision computeRetryDecisionOnRequestError(DriverException exception) { + RetryPolicy.RetryDecision decision; + if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions())) { + decision = retryPolicy().onRequestError(statement, request().consistency(), exception, retriesByPolicy); + } else { + logIdempotenceWarning(); + decision = RetryPolicy.RetryDecision.rethrow(); + } + if (metricsEnabled()) { + if (exception instanceof OperationTimedOutException) { + metrics().getErrorMetrics().getClientTimeouts().inc(); + if (decision.getType() == Type.RETRY) + metrics().getErrorMetrics().getRetriesOnClientTimeout().inc(); + if (decision.getType() == Type.IGNORE) + metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc(); + } else if (exception instanceof ConnectionException) { + metrics().getErrorMetrics().getConnectionErrors().inc(); + if (decision.getType() == Type.RETRY) + metrics().getErrorMetrics().getRetriesOnConnectionError().inc(); + if (decision.getType() == Type.IGNORE) + metrics().getErrorMetrics().getIgnoresOnConnectionError().inc(); + } else { + metrics().getErrorMetrics().getOthers().inc(); + if (decision.getType() == Type.RETRY) + metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); + if (decision.getType() == Type.IGNORE) + metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); + } + } + return decision; + } + private void processRetryDecision(RetryPolicy.RetryDecision retryDecision, Connection connection, Exception exceptionToReport) { switch (retryDecision.getType()) { case RETRY: @@ -490,12 +523,17 @@ public void onSet(Connection connection, Message.Response response, long latency connection.release(); assert err.infos instanceof WriteTimeoutException; WriteTimeoutException wte = (WriteTimeoutException) err.infos; - retry = retryPolicy.onWriteTimeout(statement, - wte.getConsistencyLevel(), - wte.getWriteType(), - wte.getRequiredAcknowledgements(), - wte.getReceivedAcknowledgements(), - retriesByPolicy); + if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions())) + retry = retryPolicy.onWriteTimeout(statement, + wte.getConsistencyLevel(), + wte.getWriteType(), + wte.getRequiredAcknowledgements(), + wte.getReceivedAcknowledgements(), + retriesByPolicy); + else { + logIdempotenceWarning(); + retry = RetryPolicy.RetryDecision.rethrow(); + } if (metricsEnabled()) { metrics().getErrorMetrics().getWriteTimeouts().inc(); if (retry.getType() == Type.RETRY) @@ -525,16 +563,7 @@ public void onSet(Connection connection, Message.Response response, long latency connection.release(); assert exceptionToReport instanceof OverloadedException; logger.warn("Host {} is overloaded.", connection.address); - retry = retryPolicy.onRequestError(statement, - request().consistency(), - (OverloadedException) exceptionToReport, retriesByPolicy); - if (metricsEnabled()) { - metrics().getErrorMetrics().getOthers().inc(); - if (retry.getType() == Type.RETRY) - metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); - if (retry.getType() == Type.IGNORE) - metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); - } + retry = computeRetryDecisionOnRequestError((OverloadedException) exceptionToReport); break; case SERVER_ERROR: connection.release(); @@ -542,16 +571,7 @@ public void onSet(Connection connection, Message.Response response, long latency logger.warn("{} replied with server error ({}), defuncting connection.", connection.address, err.message); // Defunct connection connection.defunct(exceptionToReport); - retry = retryPolicy.onRequestError(statement, - request().consistency(), - (ServerError) exceptionToReport, retriesByPolicy); - if (metricsEnabled()) { - metrics().getErrorMetrics().getOthers().inc(); - if (retry.getType() == Type.RETRY) - metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); - if (retry.getType() == Type.IGNORE) - metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); - } + retry = computeRetryDecisionOnRequestError((ServerError) exceptionToReport); break; case IS_BOOTSTRAPPING: connection.release(); @@ -716,15 +736,7 @@ public void onException(Connection connection, Exception exception, long latency connection.release(); if (exception instanceof ConnectionException) { - RetryPolicy retryPolicy = retryPolicy(); - RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), (ConnectionException) exception, retriesByPolicy); - if (metricsEnabled()) { - metrics().getErrorMetrics().getConnectionErrors().inc(); - if (decision.getType() == Type.RETRY) - metrics().getErrorMetrics().getRetriesOnConnectionError().inc(); - if (decision.getType() == Type.IGNORE) - metrics().getErrorMetrics().getIgnoresOnConnectionError().inc(); - } + RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError((ConnectionException) exception); processRetryDecision(decision, connection, exception); return; } @@ -755,15 +767,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { try { connection.release(); - RetryPolicy retryPolicy = retryPolicy(); - RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), timeoutException, retriesByPolicy); - if (metricsEnabled()) { - metrics().getErrorMetrics().getClientTimeouts().inc(); - if (decision.getType() == Type.RETRY) - metrics().getErrorMetrics().getRetriesOnClientTimeout().inc(); - if (decision.getType() == Type.IGNORE) - metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc(); - } + RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError(timeoutException); processRetryDecision(decision, connection, timeoutException); } catch (Exception e) { // This shouldn't happen, but if it does, we want to signal the callback, not let it hang indefinitely @@ -789,6 +793,15 @@ private void setFinalResult(Connection connection, Message.Response response) { } } + private void logIdempotenceWarning() { + if (WARNED_IDEMPOTENT.compareAndSet(false, true)) + logger.warn("Not retrying statement because it is not idempotent (this message will be logged only once). " + + "Note that this version of the driver changes the default retry behavior for non-idempotent " + + "statements: they won't be automatically retried anymore. The driver marks statements " + + "non-idempotent by default, so you should explicitly call setIdempotent(true) if your statements " + + "are safe to retry. See http://goo.gl/4HrSby for more details."); + } + /** * The state of a SpeculativeExecution. *

diff --git a/driver-core/src/main/java/com/datastax/driver/core/Statement.java b/driver-core/src/main/java/com/datastax/driver/core/Statement.java index 99eb14657cc..405f89dfa6f 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Statement.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Statement.java @@ -490,9 +490,14 @@ public Statement setIdempotent(boolean idempotent) { * Whether this statement is idempotent, i.e. whether it can be applied multiple times * without changing the result beyond the initial application. *

- * Idempotence plays a role in {@link com.datastax.driver.core.policies.SpeculativeExecutionPolicy speculative executions}. - * If a statement is not idempotent, the driver will not schedule speculative - * executions for it. + * If a statement is not idempotent, the driver will ensure that it never gets executed more than once, + * which means: + *

+ * (this behavior is implemented in the driver internals, the corresponding policies will not even be invoked). *

* Note that this method can return {@code null}, in which case the driver will default to * {@link QueryOptions#getDefaultIdempotence()}. diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java index 049c5212e90..8fd28dce56b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/DefaultRetryPolicy.java @@ -114,12 +114,6 @@ public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int /** * {@inheritDoc} - *

- * For historical reasons, this implementation triggers a retry on the next host in the query plan - * with the same consistency level, regardless of the statement's idempotence. - * Note that this breaks the general rule - * stated in {@link RetryPolicy#onRequestError(Statement, ConsistencyLevel, DriverException, int)}: - * "a retry should only be attempted if the request is known to be idempotent". */ @Override public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry) { diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/IdempotenceAwareRetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/IdempotenceAwareRetryPolicy.java index b7de495e236..9db16b833a6 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/IdempotenceAwareRetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/IdempotenceAwareRetryPolicy.java @@ -25,7 +25,11 @@ * if the statement is deemed non-idempotent (see {@link #isIdempotent(Statement)}). *

* For all other cases, this policy delegates the decision to the child policy. + * + * @deprecated As of version 3.1.0, the driver doesn't retry non-idempotent statements for write timeouts or unexpected + * errors anymore. It is no longer necessary to wrap your retry policies in this policy. */ +@Deprecated public class IdempotenceAwareRetryPolicy implements RetryPolicy { private final RetryPolicy childPolicy; diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java index 7512d704aab..01d91a3cd91 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java @@ -194,6 +194,9 @@ public String toString() { /** * Defines whether to retry and at which consistency level on a write timeout. + *

+ * Note that if a statement is {@link Statement#isIdempotent() not idempotent}, the driver will never retry it on a + * write timeout (this method won't even be called). * * @param statement the original query that timed out. * @param cl the requested consistency level of the write that timed out. @@ -247,13 +250,12 @@ public String toString() { *

  • On a client timeout, while waiting for the server response * (see {@link SocketOptions#getReadTimeoutMillis()});
  • *
  • On a connection error (socket closed, etc.);
  • - *
  • When the contacted host replies with an error, such as - * {@code OVERLOADED}, {@code IS_BOOTSTRAPPING}, {@code SERVER_ERROR}, etc.
  • + *
  • When the contacted host replies with an {@code OVERLOADED} error or a {@code SERVER_ERROR}.
  • * *

    - * Note that when this method is invoked, the driver cannot guarantee that the mutation has - * been effectively applied server-side; a retry should only be attempted if the request - * is known to be idempotent. + * Note that when such an error occurs, there is no guarantee that the mutation has been applied server-side or not. + * Therefore, if a statement is {@link Statement#isIdempotent() not idempotent}, the driver will never retry it + * (this method won't even be called). * * @param statement the original query that failed. * @param cl the requested consistency level for the operation. diff --git a/manual/idempotence/README.md b/manual/idempotence/README.md index 8d75b1fc866..4a1118e5a0a 100644 --- a/manual/idempotence/README.md +++ b/manual/idempotence/README.md @@ -8,8 +8,9 @@ example: * `update my_table set list_col = [1] + list_col where pk = 1` is not idempotent: if `list_col` was initially empty, it will contain `[1]` after the first execution, `[1, 1]` after the second, etc. -Idempotence matters for [retries](../retries/) and [speculative query executions](../speculative_execution/). The -corresponding policies inspect the [Statement#isIdempotent()][isIdempotent] flag. +Idempotence matters for [retries](../retries/) and [speculative query executions](../speculative_execution/). The driver +will bypass those features if the [Statement#isIdempotent()][isIdempotent] flag is set to `false`, to ensure that the +statement does not get executed more than once. In most cases, you must set that flag manually. The driver does not parse query strings, so it can't infer it automatically (except for statements coming from the query builder, see below). @@ -121,9 +122,8 @@ clients' point of view, there were two operations: But overall the column changed from 1 to 2. There is no ordering of the two operations that can explain that change. We broke linearizability by doing a transparent retry at step 6. -To avoid this, the driver considers lightweight transactions as non-idempotent, and provides a -[retry policy](../retries/) that doesn't retry non-idempotent statements. If linearizability is important for you, you -should use that policy, and ensure that lightweight transactions are appropriately flagged. +If linearizability is important for you, you should ensure that lightweight transactions are appropriately flagged as +not idempotent. [isIdempotent]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/Statement.html#isIdempotent-- [setDefaultIdempotence]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/QueryOptions.html#setDefaultIdempotence-boolean- diff --git a/manual/retries/README.md b/manual/retries/README.md index bd03ea683d5..ed33ffd6713 100644 --- a/manual/retries/README.md +++ b/manual/retries/README.md @@ -79,7 +79,6 @@ the client. This is similar to `onReadTimeout`, but for write operations. The reason reads and writes are handled separately is because a read is obviously a non mutating operation, whereas a write is likely to be. If a write times out at the coordinator level, there is no way to know whether the mutation was applied or not on the non-answering replica. -Policy implementations are usually more conservative in `onWriteTimeout` in case the write was not idempotent. If the policy rethrows the error, the user code will get a [WriteTimeoutException]. @@ -98,9 +97,6 @@ happened. The possible exceptions are: high; the coordinator temporarily refuses writes for these replicas (see [hinted handoffs] in the Cassandra documentation). -In all these cases, there is no absolute certainty that the request was not applied by a host, so implementations must -consider the risk of retrying non-idempotent statements. - ### Hard-coded rules There are a few cases where retrying is always the right thing to do. These are not covered by `RetryPolicy`, but @@ -130,30 +126,24 @@ These include: ### Retries and idempotence -Retry policies should avoid retrying [idempotent queries] when it's not clear whether the query was applied or not. - -It's always safe to retry in `onReadTimeout`, since by definition we know that the query is a read, which doesn't mutate -any data. Similarly, `onUnavailable` is safe: the coordinator is telling us that it didn't find enough replicas, so we -know that it didn't try to apply the query. - -`onWriteTimeout` is not safe. The default retry policy is very conservative (it only retries batch log writes) so it -will never cause any issue. Custom implementations should check the statements' idempotent flag. - -`onRequestError` isn't safe either. - -**For historical reasons, the built-in retry policy implementations do not check the idempotent flag in -`onRequestError`**. This is based on the fact that: +If a query is [not idempotent][idempotence], the driver will not retry it if that could produce inconsistent results: -* previous versions of the driver (which did `onRequestError` internally) didn't check the flag either; -* the majority of queries in an application should be idempotent; -* statements start out as non-idempotent by default. +* retrying in `onReadTimeout` is always safe, since by definition this error indicates that the query was a read, which + didn't mutate any data; +* similarly, `onUnavailable` is safe: the coordinator is telling us that it didn't find enough replicas, so we know that + it didn't try to apply the query. +* `onWriteTimeout` is **not safe**: some replicas failed to reply to the coordinator in time, but they might still have + applied the mutation; +* `onRequestError` is **not safe** either: the query might have been applied before the error occurred. In particular, + an `OperationTimedOutException` could be caused by a network issue that prevented a successful response to come back + to the client. -If `onRequestError` enforced idempotence strictly, this would cause a lot of queries that were legitimately retried with -previous driver versions to not be retried anymore. This would be a big behavioral change, so we decided to prefer -consistency with previous versions. +Therefore, the driver does not retry after a write timeout or request error if the statement is not idempotent. This is +handled internally, the retry policy methods are not even invoked in those cases. -If you prefer strict handling of the idempotent flag, you can wrap your retry policy into an -[IdempotenceAwareRetryPolicy]. Make sure to position the flag properly on all statements. +Note that this behavior was introduced in version 3.1.0 of the driver. In previous versions, it was up to retry policy +implementations to handle idempotence (the new behavior is equivalent to what you achieved with +`IdempotenceAwareRetryPolicy` before). [RetryDecision]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/RetryPolicy.RetryDecision.html @@ -183,11 +173,10 @@ If you prefer strict handling of the idempotent flag, you can wrap your retry po [SyntaxError]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/exceptions/SyntaxError.html [AlreadyExistsException]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/exceptions/AlreadyExistsException.html [TruncateException]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/exceptions/TruncateException.html -[IdempotenceAwareRetryPolicy]: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/policies/IdempotenceAwareRetryPolicy.html [query plan]: ../load_balancing/#query-plan [connection pool]: ../pooling/ [prepared]: ../statements/prepared/#preparing-on-multiple-nodes [driver read timeout]: ../socket_options/#driver-read-timeout [hinted handoffs]: https://docs.datastax.com/en/cassandra/2.1/cassandra/dml/dml_about_hh_c.html?scroll=concept_ds_ifg_jqx_zj__performance -[idempotent queries]: ../idempotence/ \ No newline at end of file +[idempotence]: ../idempotence/ \ No newline at end of file diff --git a/upgrade_guide/README.md b/upgrade_guide/README.md index 34bd2f4a14a..c730eb109bc 100644 --- a/upgrade_guide/README.md +++ b/upgrade_guide/README.md @@ -3,6 +3,38 @@ The purpose of this guide is to detail changes made by successive versions of the Java driver. +### 3.1.0 + +This version introduces an important change in the default retry behavior: statements that are not idempotent are not +always retried automatically anymore. + +Prior to 2.1.10, idempotence was not considered for retries. This exposed applications to the risk of applying a +non-idempotent statement twice (counter increment, list append...), or to more subtle bugs with lightweight transactions +(see [JAVA-819](https://datastax-oss.atlassian.net/browse/JAVA-819)). + +In 2.1.10 / 3.0.x, we introduced `IdempotenceAwareRetryPolicy`, which considers the `Statement#isIdempotent()` in the +retry decision process. However, for consistency with previous versions, this policy was not enabled by default (in +particular because statements are non-idempotent by default, and we didn't want applications to suddenly stop retrying +queries that were retried before). + +In 3.1.0, the default is now to **not retry** after a write timeout or request error if the statement is not idempotent. +This is handled internally, the retry policy methods are not even invoked in those cases (and therefore +`IdempotenceAwareRetryPolicy` has been deprecated). See the manual section about [retries](../manual/retries/) for more +information. + +In practice, here's what upgrading to 3.1.0 means for you: + +* if you were already handling idempotence in your application, there won't be any change, but you can stop wrapping + your retry policy with `IdempotenceAwareRetryPolicy`; +* otherwise, you might want to review how your code positions the `setIdempotent` flag on statements. In most cases the + driver can't compute in automatically (because it doesn't parse query strings), so it takes a conservative approach + and sets it to `false` by default. If you know the query is idempotent, you should set it to `true` manually. See the + [query idempotence](../manual/idempotence/) section of the manual. + +The driver logs a warning the first time it ignores a non-idempotent request; this warning will be removed in version +3.2.0. + + ### 3.0.0 This version brings parity with Cassandra 2.2 and 3.0. From 9c546ab992c95d61575bfb0721462a5451c59fd7 Mon Sep 17 00:00:00 2001 From: olim7t Date: Mon, 20 Jun 2016 17:03:08 -0700 Subject: [PATCH 2/2] Extract constants for invariable retry decisions These methods always return the same result. Extract constants to avoid building a new object each time (even though the JVM probably optimized it eventually). --- .../com/datastax/driver/core/policies/RetryPolicy.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java index 01d91a3cd91..a894aba5344 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/RetryPolicy.java @@ -41,6 +41,10 @@ public interface RetryPolicy { * */ class RetryDecision { + + private static final RetryDecision RETHROW_DECISION = new RetryDecision(Type.RETHROW, null, true); + private static final RetryDecision IGNORE_DECISION = new RetryDecision(Type.IGNORE, null, true); + /** * The types of retry decisions. */ @@ -99,7 +103,7 @@ public boolean isRetryCurrent() { * @return a {@link RetryDecision.Type#RETHROW} retry decision. */ public static RetryDecision rethrow() { - return new RetryDecision(Type.RETHROW, null, true); + return RETHROW_DECISION; } /** @@ -128,7 +132,7 @@ public static RetryDecision retry(ConsistencyLevel consistency) { * @return an {@link RetryDecision.Type#IGNORE} retry decision. */ public static RetryDecision ignore() { - return new RetryDecision(Type.IGNORE, null, true); + return IGNORE_DECISION; } /**