Skip to content

Commit 6ec72c4

Browse files
committed
JAVA-2900: Allow the request to retry to be customized by the retry policy
1 parent ad3db9c commit 6ec72c4

22 files changed

Lines changed: 876 additions & 469 deletions

File tree

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandler.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import com.datastax.dse.driver.api.core.metrics.DseSessionMetric;
2121
import com.datastax.dse.driver.internal.core.cql.DseConversions;
2222
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
23+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2324
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
2425
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
2526
import com.datastax.oss.driver.api.core.cql.Row;
2627
import com.datastax.oss.driver.api.core.cql.Statement;
2728
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
2829
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
2930
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
31+
import com.datastax.oss.driver.internal.core.cql.Conversions;
3032
import com.datastax.oss.driver.internal.core.cql.DefaultRow;
3133
import com.datastax.oss.driver.internal.core.session.DefaultSession;
3234
import com.datastax.oss.driver.internal.core.util.CountingIterator;
@@ -45,13 +47,7 @@
4547
*/
4648
@ThreadSafe
4749
public class ContinuousCqlRequestHandler
48-
extends ContinuousRequestHandlerBase<Statement, ContinuousAsyncResultSet> {
49-
50-
private final Message message;
51-
private final Duration firstPageTimeout;
52-
private final Duration otherPagesTimeout;
53-
private final int maxEnqueuedPages;
54-
private final int maxPages;
50+
extends ContinuousRequestHandlerBase<Statement<?>, ContinuousAsyncResultSet> {
5551

5652
ContinuousCqlRequestHandler(
5753
@NonNull Statement<?> statement,
@@ -68,14 +64,6 @@ public class ContinuousCqlRequestHandler
6864
DefaultSessionMetric.CQL_CLIENT_TIMEOUTS,
6965
DseSessionMetric.CONTINUOUS_CQL_REQUESTS,
7066
DefaultNodeMetric.CQL_MESSAGES);
71-
message = DseConversions.toContinuousPagingMessage(statement, executionProfile, context);
72-
firstPageTimeout =
73-
executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE);
74-
otherPagesTimeout =
75-
executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES);
76-
maxEnqueuedPages =
77-
executionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES);
78-
maxPages = executionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES);
7967
// NOTE that ordering of the following statement matters.
8068
// We should register this request after all fields have been initialized.
8169
throttler.register(this);
@@ -89,40 +77,54 @@ protected Duration getGlobalTimeout() {
8977

9078
@NonNull
9179
@Override
92-
protected Duration getPageTimeout(int pageNumber) {
93-
return pageNumber == 1 ? firstPageTimeout : otherPagesTimeout;
80+
protected Duration getPageTimeout(@NonNull Statement<?> statement, int pageNumber) {
81+
DriverExecutionProfile executionProfile =
82+
Conversions.resolveExecutionProfile(statement, context);
83+
if (pageNumber == 1) {
84+
return executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE);
85+
} else {
86+
return executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES);
87+
}
9488
}
9589

9690
@NonNull
9791
@Override
98-
protected Duration getReviseRequestTimeout() {
99-
return otherPagesTimeout;
92+
protected Duration getReviseRequestTimeout(@NonNull Statement<?> statement) {
93+
DriverExecutionProfile executionProfile =
94+
Conversions.resolveExecutionProfile(statement, context);
95+
return executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES);
10096
}
10197

10298
@Override
103-
protected int getMaxEnqueuedPages() {
104-
return maxEnqueuedPages;
99+
protected int getMaxEnqueuedPages(@NonNull Statement<?> statement) {
100+
DriverExecutionProfile executionProfile =
101+
Conversions.resolveExecutionProfile(statement, context);
102+
return executionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES);
105103
}
106104

107105
@Override
108-
protected int getMaxPages() {
109-
return maxPages;
106+
protected int getMaxPages(@NonNull Statement<?> statement) {
107+
DriverExecutionProfile executionProfile =
108+
Conversions.resolveExecutionProfile(statement, context);
109+
return executionProfile.getInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES);
110110
}
111111

112112
@NonNull
113113
@Override
114-
protected Message getMessage() {
115-
return message;
114+
protected Message getMessage(@NonNull Statement<?> statement) {
115+
DriverExecutionProfile executionProfile =
116+
Conversions.resolveExecutionProfile(statement, context);
117+
return DseConversions.toContinuousPagingMessage(statement, executionProfile, context);
116118
}
117119

118120
@Override
119-
protected boolean isTracingEnabled() {
121+
protected boolean isTracingEnabled(@NonNull Statement<?> statement) {
120122
return false;
121123
}
122124

123125
@NonNull
124126
@Override
125-
protected Map<String, ByteBuffer> createPayload() {
127+
protected Map<String, ByteBuffer> createPayload(@NonNull Statement<?> statement) {
126128
return statement.getCustomPayload();
127129
}
128130

@@ -135,6 +137,7 @@ protected ContinuousAsyncResultSet createEmptyResultSet(@NonNull ExecutionInfo e
135137
@NonNull
136138
@Override
137139
protected DefaultContinuousAsyncResultSet createResultSet(
140+
@NonNull Statement<?> statement,
138141
@NonNull Rows rows,
139142
@NonNull ExecutionInfo executionInfo,
140143
@NonNull ColumnDefinitions columnDefinitions) {

0 commit comments

Comments
 (0)