Skip to content

Commit c8d0cb3

Browse files
Kevin Gallardoolim7t
authored andcommitted
Added PagingState API for client manual paging. (JAVA-550)
1 parent 6a06156 commit c8d0cb3

9 files changed

Lines changed: 571 additions & 21 deletions

File tree

driver-core/CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ CHANGELOG
2727
(JAVA-562)
2828
- [improvement] Rename threads to indicate that they are for the driver
2929
(JAVA-583)
30+
- [new feature] Expose paging state (JAVA-550)
3031

3132
Merged from 2.0.9_fixes branch:
3233

driver-core/src/main/java/com/datastax/driver/core/ArrayBackedResultSet.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ static ArrayBackedResultSet fromMessage(Responses.Result msg, SessionManager ses
7474
// this explicitly because MultiPage implementation don't support info == null.
7575
assert r.metadata.pagingState == null || info != null;
7676
return r.metadata.pagingState == null
77-
? new SinglePage(columnDefs, tokenFactory, r.data, info)
78-
: new MultiPage(columnDefs, tokenFactory, r.data, info, r.metadata.pagingState, session, statement);
77+
? new SinglePage(columnDefs, tokenFactory, r.data, info)
78+
: new MultiPage(columnDefs, tokenFactory, r.data, info, r.metadata.pagingState, session, statement);
7979

8080
case SET_KEYSPACE:
8181
case SCHEMA_CHANGE:
@@ -228,7 +228,7 @@ private MultiPage(ColumnDefinitions metadata,
228228
// that this will never change, so apply the generic check by peeking at the first row.
229229
super(metadata, tokenFactory, rows.peek());
230230
this.currentPage = rows;
231-
this.infos.offer(info);
231+
this.infos.offer(info.withPagingState(pagingState).withStatement(statement));
232232

233233
this.fetchState = new FetchingState(pagingState, null);
234234
this.session = session;
@@ -322,9 +322,10 @@ public void onSet(Connection connection, Message.Response response, ExecutionInf
322322
case RESULT:
323323
Responses.Result rm = (Responses.Result)response;
324324
info = update(info, rm, MultiPage.this.session);
325-
326325
if (rm.kind == Responses.Result.Kind.ROWS) {
327326
Responses.Result.Rows rows = (Responses.Result.Rows)rm;
327+
if (rows.metadata.pagingState != null)
328+
info = info.withPagingState(rows.metadata.pagingState).withStatement(statement);
328329
MultiPage.this.nextPages.offer(rows.data);
329330
MultiPage.this.fetchState = rows.metadata.pagingState == null ? null : new FetchingState(rows.metadata.pagingState, null);
330331
} else if (rm.kind == Responses.Result.Kind.VOID) {

driver-core/src/main/java/com/datastax/driver/core/ExecutionInfo.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18+
import java.nio.ByteBuffer;
1819
import java.util.List;
1920

2021
/**
@@ -32,28 +33,39 @@
3233
* query.</li>
3334
* </ul>
3435
*/
35-
public class ExecutionInfo
36-
{
36+
public class ExecutionInfo {
3737
private final List<Host> triedHosts;
3838
private final ConsistencyLevel achievedConsistency;
3939
private final QueryTrace trace;
40+
private final ByteBuffer pagingState;
41+
private final Statement statement;
4042

41-
private ExecutionInfo(List<Host> triedHosts, ConsistencyLevel achievedConsistency, QueryTrace trace) {
43+
private ExecutionInfo(List<Host> triedHosts, ConsistencyLevel achievedConsistency, QueryTrace trace, ByteBuffer pagingState, Statement statement) {
4244
this.triedHosts = triedHosts;
4345
this.achievedConsistency = achievedConsistency;
4446
this.trace = trace;
47+
this.pagingState = pagingState;
48+
this.statement = statement;
4549
}
4650

4751
ExecutionInfo(List<Host> triedHosts) {
48-
this(triedHosts, null, null);
52+
this(triedHosts, null, null, null, null);
4953
}
5054

5155
ExecutionInfo withTrace(QueryTrace newTrace) {
52-
return new ExecutionInfo(triedHosts, achievedConsistency, newTrace);
56+
return new ExecutionInfo(triedHosts, achievedConsistency, newTrace, pagingState, statement);
5357
}
5458

5559
ExecutionInfo withAchievedConsistency(ConsistencyLevel newConsistency) {
56-
return new ExecutionInfo(triedHosts, newConsistency, trace);
60+
return new ExecutionInfo(triedHosts, newConsistency, trace, pagingState, statement);
61+
}
62+
63+
ExecutionInfo withPagingState(ByteBuffer pagingState) {
64+
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, statement);
65+
}
66+
67+
ExecutionInfo withStatement(Statement statement) {
68+
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, statement);
5769
}
5870

5971
/**
@@ -119,4 +131,20 @@ public ConsistencyLevel getAchievedConsistencyLevel() {
119131
public QueryTrace getQueryTrace() {
120132
return trace;
121133
}
134+
135+
/**
136+
* The paging state of the query.
137+
*
138+
* This object represents the next page to be fetched if this query is
139+
* multi page. It can be saved and reused later on the same statement.
140+
*
141+
* @return the paging state or null if there is no next page.
142+
*
143+
* @see Statement#setPagingState(PagingState)
144+
*/
145+
public PagingState getPagingState() {
146+
if (this.pagingState == null)
147+
return null;
148+
return new PagingState(this.pagingState, this.statement);
149+
}
122150
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright (C) 2012-2015 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.datastax.driver.core;
18+
19+
import java.nio.ByteBuffer;
20+
import java.security.MessageDigest;
21+
import java.security.NoSuchAlgorithmException;
22+
import java.util.Arrays;
23+
24+
import com.datastax.driver.core.exceptions.InvalidQueryException;
25+
import com.datastax.driver.core.exceptions.PagingStateException;
26+
import com.datastax.driver.core.utils.Bytes;
27+
28+
/**
29+
* The paging state of a query.
30+
*
31+
* This object represents the next page to be fetched if the query is
32+
* multi page. It can be saved and reused later on the same statement.
33+
*
34+
* The PagingState can be serialized and deserialized either as a String
35+
* or as a byte array.
36+
*
37+
* @see Statement#setPagingState(PagingState)
38+
*/
39+
public class PagingState {
40+
41+
private byte[] pagingState;
42+
private byte[] hash;
43+
44+
PagingState(ByteBuffer pagingState, Statement statement) {
45+
this.pagingState = Bytes.getArray(pagingState);
46+
this.hash = hash(statement);
47+
}
48+
49+
private PagingState(byte[] complete) {
50+
// Check the sizes in the beginning of the buffer, otherwise we cannot build the paging state object
51+
ByteBuffer pagingStateBB = ByteBuffer.wrap(complete);
52+
int pagingSize = pagingStateBB.getShort();
53+
int hashSize = pagingStateBB.getShort();
54+
if (pagingSize + hashSize != pagingStateBB.remaining()) {
55+
throw new PagingStateException("Cannot deserialize paging state, invalid format. "
56+
+ "The serialized form was corrupted, or not initially generated from a PagingState object.");
57+
}
58+
this.pagingState = extractPagingState(complete);
59+
this.hash = extractHashDigest(complete);
60+
}
61+
62+
private byte[] hash(Statement statement) {
63+
byte[] digest;
64+
ByteBuffer[] values;
65+
MessageDigest md;
66+
assert !(statement instanceof BatchStatement);
67+
try {
68+
md = MessageDigest.getInstance("MD5");
69+
if (statement instanceof BoundStatement) {
70+
BoundStatement bs = ((BoundStatement)statement);
71+
md.update(bs.preparedStatement().getQueryString().getBytes());
72+
values = bs.values;
73+
} else {
74+
//it is a RegularStatement since Batch statements are not allowed
75+
RegularStatement rs = (RegularStatement)statement;
76+
md.update(rs.getQueryString().getBytes());
77+
values = rs.getValues();
78+
}
79+
if (values != null) {
80+
for (ByteBuffer value : values) {
81+
md.update(value.duplicate());
82+
}
83+
}
84+
md.update(this.pagingState);
85+
digest = md.digest();
86+
87+
} catch (NoSuchAlgorithmException e) {
88+
throw new RuntimeException("MD5 doesn't seem to be available on this JVM", e);
89+
}
90+
return digest;
91+
}
92+
93+
boolean matches(Statement statement) {
94+
byte[] toTest = hash(statement);
95+
return Arrays.equals(toTest, this.hash);
96+
}
97+
98+
private static byte[] extractPagingState(byte[] complete) {
99+
ByteBuffer completeBB = ByteBuffer.wrap(complete);
100+
short size = completeBB.getShort();
101+
byte[] array = new byte[size];
102+
// Go forward of 2 bytes
103+
completeBB.getShort();
104+
completeBB.get(array);
105+
return array;
106+
}
107+
108+
private static byte[] extractHashDigest(byte[] complete) {
109+
ByteBuffer completeBB = ByteBuffer.wrap(complete);
110+
short pagingSize = completeBB.getShort();
111+
short hashSize = completeBB.getShort();
112+
completeBB.position(pagingSize + 4);
113+
byte[] array = new byte[hashSize];
114+
completeBB.get(array);
115+
return array;
116+
}
117+
118+
private ByteBuffer generateCompleteOutput() {
119+
ByteBuffer res = ByteBuffer.allocate(pagingState.length + hash.length + 4);
120+
121+
res.putShort((short)pagingState.length);
122+
res.putShort((short)hash.length);
123+
124+
res.put(pagingState);
125+
res.put(hash);
126+
127+
res.rewind();
128+
129+
return res;
130+
}
131+
132+
ByteBuffer getRawState() {
133+
return ByteBuffer.wrap(this.pagingState);
134+
}
135+
136+
@Override
137+
public String toString() {
138+
return Bytes.toRawHexString(generateCompleteOutput());
139+
}
140+
141+
/**
142+
* Create a PagingState object from a string previously generated with {@link #toString()}.
143+
*
144+
* @param string the string value.
145+
* @return the PagingState object created.
146+
*
147+
* @throws PagingStateException if the string does not have the correct format.
148+
*/
149+
public static PagingState fromString(String string) {
150+
try {
151+
byte[] complete = Bytes.fromRawHexString(string, 0);
152+
return new PagingState(complete);
153+
} catch (Exception e) {
154+
throw new PagingStateException("Cannot deserialize paging state, invalid format. "
155+
+ "The serialized form was corrupted, or not initially generated from a PagingState object.", e);
156+
}
157+
}
158+
159+
/**
160+
* Return a representation of the paging state object as a byte array.
161+
*
162+
* @return the paging state as a byte array.
163+
*/
164+
public byte[] toBytes() {
165+
return generateCompleteOutput().array();
166+
}
167+
168+
/**
169+
* Create a PagingState object from a byte array previously generated with {@link #toBytes()}.
170+
*
171+
* @param pagingState The byte array representation.
172+
* @return the PagingState object created.
173+
*
174+
* @throws PagingStateException if the byte array does not have the correct format.
175+
*/
176+
public static PagingState fromBytes(byte[] pagingState) {
177+
return new PagingState(pagingState);
178+
}
179+
}

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
3231
import com.datastax.driver.core.exceptions.DriverInternalError;
3332
import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
3433
import com.datastax.driver.core.policies.LoadBalancingPolicy;
@@ -327,8 +326,8 @@ public Boolean call() {
327326
CloseFuture removePool(Host host) {
328327
final HostConnectionPool pool = pools.remove(host);
329328
return pool == null
330-
? CloseFuture.immediateFuture()
331-
: pool.closeAsync();
329+
? CloseFuture.immediateFuture()
330+
: pool.closeAsync();
332331
}
333332

334333
/*
@@ -451,6 +450,7 @@ Message.Request makeRequestMessage(Statement statement, ByteBuffer pagingState)
451450
Message.Request makeRequestMessage(Statement statement, ConsistencyLevel cl, ConsistencyLevel scl, ByteBuffer pagingState) {
452451
int protoVersion = cluster.manager.protocolVersion();
453452
int fetchSize = statement.getFetchSize();
453+
ByteBuffer usedPagingState = pagingState;
454454

455455
if (protoVersion == 1) {
456456
assert pagingState == null;
@@ -467,6 +467,10 @@ else if (fetchSize != Integer.MAX_VALUE)
467467
if (fetchSize == Integer.MAX_VALUE)
468468
fetchSize = -1;
469469

470+
if (pagingState == null) {
471+
usedPagingState = statement.getPagingState();
472+
}
473+
470474
if (statement instanceof RegularStatement) {
471475
RegularStatement rs = (RegularStatement)statement;
472476

@@ -483,12 +487,12 @@ else if (fetchSize != Integer.MAX_VALUE)
483487

484488
List<ByteBuffer> values = rawValues == null ? Collections.<ByteBuffer>emptyList() : Arrays.asList(rawValues);
485489
String qString = rs.getQueryString();
486-
Requests.QueryProtocolOptions options = new Requests.QueryProtocolOptions(cl, values, false, fetchSize, pagingState, scl);
490+
Requests.QueryProtocolOptions options = new Requests.QueryProtocolOptions(cl, values, false, fetchSize, usedPagingState, scl);
487491
return new Requests.Query(qString, options);
488492
} else if (statement instanceof BoundStatement) {
489493
BoundStatement bs = (BoundStatement)statement;
490494
boolean skipMetadata = protoVersion != 1 && bs.statement.getPreparedId().resultSetMetadata != null;
491-
Requests.QueryProtocolOptions options = new Requests.QueryProtocolOptions(cl, Arrays.asList(bs.values), skipMetadata, fetchSize, pagingState, scl);
495+
Requests.QueryProtocolOptions options = new Requests.QueryProtocolOptions(cl, Arrays.asList(bs.values), skipMetadata, fetchSize, usedPagingState, scl);
492496
return new Requests.Execute(bs.statement.getPreparedId().id, options);
493497
} else {
494498
assert statement instanceof BatchStatement : statement;

0 commit comments

Comments
 (0)