-
Notifications
You must be signed in to change notification settings - Fork 885
JAVA-1212: Don't retry non-idempotent statements by default. #696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| */ | ||
| class RequestHandler { | ||
| private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class); | ||
| private static final AtomicBoolean WARNED_IDEMPOTENT = new AtomicBoolean(); | ||
|
|
||
| final String id; | ||
|
|
||
|
|
@@ -353,6 +354,38 @@ private void write(Connection connection, Connection.ResponseCallback responseCa | |
| connection.release(); | ||
| } | ||
|
|
||
| private RetryPolicy.RetryDecision computeRetryDecisionOnRequestError(DriverException exception) { | ||
| RetryPolicy.RetryDecision decision; | ||
| if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions())) { | ||
| decision = retryPolicy().onRequestError(statement, request().consistency(), exception, retriesByPolicy); | ||
| } else { | ||
| logIdempotenceWarning(); | ||
| decision = RetryPolicy.RetryDecision.rethrow(); | ||
| } | ||
| if (metricsEnabled()) { | ||
| if (exception instanceof OperationTimedOutException) { | ||
| metrics().getErrorMetrics().getClientTimeouts().inc(); | ||
| if (decision.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnClientTimeout().inc(); | ||
| if (decision.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc(); | ||
| } else if (exception instanceof ConnectionException) { | ||
| metrics().getErrorMetrics().getConnectionErrors().inc(); | ||
| if (decision.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnConnectionError().inc(); | ||
| if (decision.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnConnectionError().inc(); | ||
| } else { | ||
| metrics().getErrorMetrics().getOthers().inc(); | ||
| if (decision.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); | ||
| if (decision.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); | ||
| } | ||
| } | ||
| return decision; | ||
| } | ||
|
|
||
| private void processRetryDecision(RetryPolicy.RetryDecision retryDecision, Connection connection, Exception exceptionToReport) { | ||
| switch (retryDecision.getType()) { | ||
| case RETRY: | ||
|
|
@@ -490,12 +523,17 @@ public void onSet(Connection connection, Message.Response response, long latency | |
| connection.release(); | ||
| assert err.infos instanceof WriteTimeoutException; | ||
| WriteTimeoutException wte = (WriteTimeoutException) err.infos; | ||
| retry = retryPolicy.onWriteTimeout(statement, | ||
| wte.getConsistencyLevel(), | ||
| wte.getWriteType(), | ||
| wte.getRequiredAcknowledgements(), | ||
| wte.getReceivedAcknowledgements(), | ||
| retriesByPolicy); | ||
| if (statement.isIdempotentWithDefault(manager.cluster.getConfiguration().getQueryOptions())) | ||
| retry = retryPolicy.onWriteTimeout(statement, | ||
| wte.getConsistencyLevel(), | ||
| wte.getWriteType(), | ||
| wte.getRequiredAcknowledgements(), | ||
| wte.getReceivedAcknowledgements(), | ||
| retriesByPolicy); | ||
| else { | ||
| logIdempotenceWarning(); | ||
| retry = RetryPolicy.RetryDecision.rethrow(); | ||
| } | ||
| if (metricsEnabled()) { | ||
| metrics().getErrorMetrics().getWriteTimeouts().inc(); | ||
| if (retry.getType() == Type.RETRY) | ||
|
|
@@ -525,33 +563,15 @@ public void onSet(Connection connection, Message.Response response, long latency | |
| connection.release(); | ||
| assert exceptionToReport instanceof OverloadedException; | ||
| logger.warn("Host {} is overloaded.", connection.address); | ||
| retry = retryPolicy.onRequestError(statement, | ||
| request().consistency(), | ||
| (OverloadedException) exceptionToReport, retriesByPolicy); | ||
| if (metricsEnabled()) { | ||
| metrics().getErrorMetrics().getOthers().inc(); | ||
| if (retry.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); | ||
| if (retry.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); | ||
| } | ||
| retry = computeRetryDecisionOnRequestError((OverloadedException) exceptionToReport); | ||
| break; | ||
| case SERVER_ERROR: | ||
| connection.release(); | ||
| assert exceptionToReport instanceof ServerError; | ||
| logger.warn("{} replied with server error ({}), defuncting connection.", connection.address, err.message); | ||
| // Defunct connection | ||
| connection.defunct(exceptionToReport); | ||
| retry = retryPolicy.onRequestError(statement, | ||
| request().consistency(), | ||
| (ServerError) exceptionToReport, retriesByPolicy); | ||
| if (metricsEnabled()) { | ||
| metrics().getErrorMetrics().getOthers().inc(); | ||
| if (retry.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnOtherErrors().inc(); | ||
| if (retry.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnOtherErrors().inc(); | ||
| } | ||
| retry = computeRetryDecisionOnRequestError((ServerError) exceptionToReport); | ||
| break; | ||
| case IS_BOOTSTRAPPING: | ||
| connection.release(); | ||
|
|
@@ -716,15 +736,7 @@ public void onException(Connection connection, Exception exception, long latency | |
| connection.release(); | ||
|
|
||
| if (exception instanceof ConnectionException) { | ||
| RetryPolicy retryPolicy = retryPolicy(); | ||
| RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), (ConnectionException) exception, retriesByPolicy); | ||
| if (metricsEnabled()) { | ||
| metrics().getErrorMetrics().getConnectionErrors().inc(); | ||
| if (decision.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnConnectionError().inc(); | ||
| if (decision.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnConnectionError().inc(); | ||
| } | ||
| RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError((ConnectionException) exception); | ||
| processRetryDecision(decision, connection, exception); | ||
| return; | ||
| } | ||
|
|
@@ -755,15 +767,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) { | |
| try { | ||
| connection.release(); | ||
|
|
||
| RetryPolicy retryPolicy = retryPolicy(); | ||
| RetryPolicy.RetryDecision decision = retryPolicy.onRequestError(statement, request().consistency(), timeoutException, retriesByPolicy); | ||
| if (metricsEnabled()) { | ||
| metrics().getErrorMetrics().getClientTimeouts().inc(); | ||
| if (decision.getType() == Type.RETRY) | ||
| metrics().getErrorMetrics().getRetriesOnClientTimeout().inc(); | ||
| if (decision.getType() == Type.IGNORE) | ||
| metrics().getErrorMetrics().getIgnoresOnClientTimeout().inc(); | ||
| } | ||
| RetryPolicy.RetryDecision decision = computeRetryDecisionOnRequestError(timeoutException); | ||
| processRetryDecision(decision, connection, timeoutException); | ||
| } catch (Exception e) { | ||
| // This shouldn't happen, but if it does, we want to signal the callback, not let it hang indefinitely | ||
|
|
@@ -789,6 +793,15 @@ private void setFinalResult(Connection connection, Message.Response response) { | |
| } | ||
| } | ||
|
|
||
| private void logIdempotenceWarning() { | ||
| if (WARNED_IDEMPOTENT.compareAndSet(false, true)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about establishing a common logger implementation for messages like this? This way users can easily suppress warnings like this globally for all changes. Some users pay very close attention to warnings and try their best to suppress all the ones that doesn't affect their application health. |
||
| logger.warn("Not retrying statement because it is not idempotent (this message will be logged only once). " + | ||
| "Note that this version of the driver changes the default retry behavior for non-idempotent " + | ||
| "statements: they won't be automatically retried anymore. The driver marks statements " + | ||
| "non-idempotent by default, so you should explicitly call setIdempotent(true) if your statements " + | ||
| "are safe to retry. See http://goo.gl/4HrSby for more details."); | ||
| } | ||
|
|
||
| /** | ||
| * The state of a SpeculativeExecution. | ||
| * <p/> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we get rid of
IdempotenceAwareRetryPolicysince we now consider idempotence outside of Retry policy?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, it was marked deprecated.