2020import com .datastax .dse .driver .api .core .metrics .DseSessionMetric ;
2121import com .datastax .dse .driver .internal .core .cql .DseConversions ;
2222import com .datastax .dse .protocol .internal .response .result .DseRowsMetadata ;
23+ import com .datastax .oss .driver .api .core .config .DriverExecutionProfile ;
2324import com .datastax .oss .driver .api .core .cql .ColumnDefinitions ;
2425import com .datastax .oss .driver .api .core .cql .ExecutionInfo ;
2526import com .datastax .oss .driver .api .core .cql .Row ;
2627import com .datastax .oss .driver .api .core .cql .Statement ;
2728import com .datastax .oss .driver .api .core .metrics .DefaultNodeMetric ;
2829import com .datastax .oss .driver .api .core .metrics .DefaultSessionMetric ;
2930import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
31+ import com .datastax .oss .driver .internal .core .cql .Conversions ;
3032import com .datastax .oss .driver .internal .core .cql .DefaultRow ;
3133import com .datastax .oss .driver .internal .core .session .DefaultSession ;
3234import com .datastax .oss .driver .internal .core .util .CountingIterator ;
4547 */
4648@ ThreadSafe
4749public class ContinuousCqlRequestHandler
48- extends ContinuousRequestHandlerBase <Statement , ContinuousAsyncResultSet > {
49-
50- private final Message message ;
51- private final Duration firstPageTimeout ;
52- private final Duration otherPagesTimeout ;
53- private final int maxEnqueuedPages ;
54- private final int maxPages ;
50+ extends ContinuousRequestHandlerBase <Statement <?>, ContinuousAsyncResultSet > {
5551
5652 ContinuousCqlRequestHandler (
5753 @ NonNull Statement <?> statement ,
@@ -68,14 +64,6 @@ public class ContinuousCqlRequestHandler
6864 DefaultSessionMetric .CQL_CLIENT_TIMEOUTS ,
6965 DseSessionMetric .CONTINUOUS_CQL_REQUESTS ,
7066 DefaultNodeMetric .CQL_MESSAGES );
71- message = DseConversions .toContinuousPagingMessage (statement , executionProfile , context );
72- firstPageTimeout =
73- executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE );
74- otherPagesTimeout =
75- executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES );
76- maxEnqueuedPages =
77- executionProfile .getInt (DseDriverOption .CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES );
78- maxPages = executionProfile .getInt (DseDriverOption .CONTINUOUS_PAGING_MAX_PAGES );
7967 // NOTE that ordering of the following statement matters.
8068 // We should register this request after all fields have been initialized.
8169 throttler .register (this );
@@ -89,40 +77,54 @@ protected Duration getGlobalTimeout() {
8977
9078 @ NonNull
9179 @ Override
92- protected Duration getPageTimeout (int pageNumber ) {
93- return pageNumber == 1 ? firstPageTimeout : otherPagesTimeout ;
80+ protected Duration getPageTimeout (@ NonNull Statement <?> statement , int pageNumber ) {
81+ DriverExecutionProfile executionProfile =
82+ Conversions .resolveExecutionProfile (statement , context );
83+ if (pageNumber == 1 ) {
84+ return executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE );
85+ } else {
86+ return executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES );
87+ }
9488 }
9589
9690 @ NonNull
9791 @ Override
98- protected Duration getReviseRequestTimeout () {
99- return otherPagesTimeout ;
92+ protected Duration getReviseRequestTimeout (@ NonNull Statement <?> statement ) {
93+ DriverExecutionProfile executionProfile =
94+ Conversions .resolveExecutionProfile (statement , context );
95+ return executionProfile .getDuration (DseDriverOption .CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES );
10096 }
10197
10298 @ Override
103- protected int getMaxEnqueuedPages () {
104- return maxEnqueuedPages ;
99+ protected int getMaxEnqueuedPages (@ NonNull Statement <?> statement ) {
100+ DriverExecutionProfile executionProfile =
101+ Conversions .resolveExecutionProfile (statement , context );
102+ return executionProfile .getInt (DseDriverOption .CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES );
105103 }
106104
107105 @ Override
108- protected int getMaxPages () {
109- return maxPages ;
106+ protected int getMaxPages (@ NonNull Statement <?> statement ) {
107+ DriverExecutionProfile executionProfile =
108+ Conversions .resolveExecutionProfile (statement , context );
109+ return executionProfile .getInt (DseDriverOption .CONTINUOUS_PAGING_MAX_PAGES );
110110 }
111111
112112 @ NonNull
113113 @ Override
114- protected Message getMessage () {
115- return message ;
114+ protected Message getMessage (@ NonNull Statement <?> statement ) {
115+ DriverExecutionProfile executionProfile =
116+ Conversions .resolveExecutionProfile (statement , context );
117+ return DseConversions .toContinuousPagingMessage (statement , executionProfile , context );
116118 }
117119
118120 @ Override
119- protected boolean isTracingEnabled () {
121+ protected boolean isTracingEnabled (@ NonNull Statement <?> statement ) {
120122 return false ;
121123 }
122124
123125 @ NonNull
124126 @ Override
125- protected Map <String , ByteBuffer > createPayload () {
127+ protected Map <String , ByteBuffer > createPayload (@ NonNull Statement <?> statement ) {
126128 return statement .getCustomPayload ();
127129 }
128130
@@ -135,6 +137,7 @@ protected ContinuousAsyncResultSet createEmptyResultSet(@NonNull ExecutionInfo e
135137 @ NonNull
136138 @ Override
137139 protected DefaultContinuousAsyncResultSet createResultSet (
140+ @ NonNull Statement <?> statement ,
138141 @ NonNull Rows rows ,
139142 @ NonNull ExecutionInfo executionInfo ,
140143 @ NonNull ColumnDefinitions columnDefinitions ) {
0 commit comments