Skip to content

Commit 215b8de

Browse files
author
Alexandre Dutra
committed
Add k/v payload for 3rd party usage (JAVA-779).
1 parent c1f9386 commit 215b8de

18 files changed

Lines changed: 976 additions & 41 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- [new feature] Protocol V4 enum support (JAVA-783)
66
- [new feature] Use PK columns in protocol v4 PREPARED response (JAVA-776)
77
- [new feature] Distinguish NULL and UNSET values (JAVA-777)
8+
- [new feature] Add k/v payload for 3rd party usage (JAVA-779)
89
- [new feature] Expose server-side warnings on ExecutionInfo (JAVA-780)
910
- [new feature] Expose new read/write failure exceptions (JAVA-749)
1011
- [new feature] Expose function and aggregate metadata (JAVA-747)

driver-core/src/main/java/com/datastax/driver/core/AbstractSession.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
package com.datastax.driver.core;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.Map;
1920
import java.util.concurrent.ExecutionException;
2021

2122
import com.google.common.base.Function;
22-
import com.google.common.util.concurrent.*;
23+
import com.google.common.util.concurrent.Futures;
24+
import com.google.common.util.concurrent.ListenableFuture;
25+
import com.google.common.util.concurrent.Uninterruptibles;
2326

2427
/**
2528
* Abstract implementation of the Session interface.
@@ -92,6 +95,14 @@ public PreparedStatement prepare(RegularStatement statement) {
9295
}
9396
}
9497

98+
/**
99+
* {@inheritDoc}
100+
*/
101+
@Override
102+
public ListenableFuture<PreparedStatement> prepareAsync(String query) {
103+
return prepareAsync(query, null);
104+
}
105+
95106
/**
96107
* {@inheritDoc}
97108
*/
@@ -100,7 +111,7 @@ public ListenableFuture<PreparedStatement> prepareAsync(final RegularStatement s
100111
if (statement.hasValues())
101112
throw new IllegalArgumentException("A statement to prepare should not have values");
102113

103-
ListenableFuture<PreparedStatement> prepared = prepareAsync(statement.toString());
114+
ListenableFuture<PreparedStatement> prepared = prepareAsync(statement.toString(), statement.getOutgoingPayload());
104115
return Futures.transform(prepared, new Function<PreparedStatement, PreparedStatement>() {
105116
@Override
106117
public PreparedStatement apply(PreparedStatement prepared) {
@@ -111,12 +122,22 @@ public PreparedStatement apply(PreparedStatement prepared) {
111122
if (statement.isTracing())
112123
prepared.enableTracing();
113124
prepared.setRetryPolicy(statement.getRetryPolicy());
114-
125+
prepared.setOutgoingPayload(statement.getOutgoingPayload());
115126
return prepared;
116127
}
117128
});
118129
}
119130

131+
/**
132+
* Prepares the provided query string asynchronously,
133+
* sending along the provided custom payload, if any.
134+
*
135+
* @param query the CQL query string to prepare
136+
* @param customPayload the custom payload to send along the query, or {@code null} if no payload is to be sent
137+
* @return a future on the prepared statement corresponding to {@code query}.
138+
*/
139+
protected abstract ListenableFuture<PreparedStatement> prepareAsync(String query, Map<String, ByteBuffer> customPayload);
140+
120141
/**
121142
* {@inheritDoc}
122143
*/

driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public BoundStatement(PreparedStatement statement) {
7474
this.enableTracing();
7575
if (statement.getRetryPolicy() != null)
7676
this.setRetryPolicy(statement.getRetryPolicy());
77+
if (statement.getOutgoingPayload() != null)
78+
this.setOutgoingPayload(statement.getOutgoingPayload());
79+
else
80+
// propagate incoming payload as outgoing payload, if no outgoing payload has been explicitly set
81+
this.setOutgoingPayload(statement.getIncomingPayload());
7782
}
7883

7984
/**

driver-core/src/main/java/com/datastax/driver/core/CBUtil.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.charset.CharacterCodingException;
2323
import java.util.*;
2424

25+
import com.google.common.collect.ImmutableMap;
2526
import io.netty.buffer.ByteBuf;
2627
import io.netty.util.CharsetUtil;
2728

@@ -117,10 +118,52 @@ public static void writeBytes(byte[] bytes, ByteBuf cb) {
117118
cb.writeBytes(bytes);
118119
}
119120

121+
public static void writeBytes(ByteBuffer bytes, ByteBuf cb) {
122+
cb.writeShort(bytes.remaining());
123+
cb.writeBytes(bytes.duplicate());
124+
}
125+
120126
public static int sizeOfBytes(byte[] bytes) {
121127
return 2 + bytes.length;
122128
}
123129

130+
public static int sizeOfBytes(ByteBuffer bytes) {
131+
return 2 + bytes.remaining();
132+
}
133+
134+
public static Map<String, ByteBuffer> readBytesMap(ByteBuf cb) {
135+
int length = cb.readUnsignedShort();
136+
ImmutableMap.Builder<String, ByteBuffer> builder = ImmutableMap.builder();
137+
for (int i = 0; i < length; i++) {
138+
String key = readString(cb);
139+
ByteBuffer value = readValue(cb);
140+
if (value == null)
141+
value = Statement.NULL_PAYLOAD_VALUE;
142+
builder.put(key, value);
143+
}
144+
return builder.build();
145+
}
146+
147+
public static void writeBytesMap(Map<String, ByteBuffer> m, ByteBuf cb) {
148+
cb.writeShort(m.size());
149+
for (Map.Entry<String, ByteBuffer> entry : m.entrySet()) {
150+
writeString(entry.getKey(), cb);
151+
ByteBuffer value = entry.getValue();
152+
if (value == Statement.NULL_PAYLOAD_VALUE)
153+
value = null;
154+
writeValue(value, cb);
155+
}
156+
}
157+
158+
public static int sizeOfBytesMap(Map<String, ByteBuffer> m) {
159+
int size = 2;
160+
for (Map.Entry<String, ByteBuffer> entry : m.entrySet()) {
161+
size += sizeOfString(entry.getKey());
162+
size += sizeOfBytes(entry.getValue());
163+
}
164+
return size;
165+
}
166+
124167
public static ConsistencyLevel readConsistencyLevel(ByteBuf cb) {
125168
return ConsistencyLevel.fromCode(cb.readUnsignedShort());
126169
}

driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,35 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.util.List;
20+
import java.util.Map;
21+
22+
import com.google.common.collect.ImmutableMap;
2023

2124
import com.datastax.driver.core.policies.RetryPolicy;
2225

2326
import static com.datastax.driver.core.ProtocolVersion.V4;
2427

25-
public class DefaultPreparedStatement implements PreparedStatement{
28+
public class DefaultPreparedStatement implements PreparedStatement {
2629

2730
final PreparedId preparedId;
2831

2932
final String query;
3033
final String queryKeyspace;
34+
final Map<String, ByteBuffer> incomingPayload;
3135

3236
volatile ByteBuffer routingKey;
3337

3438
volatile ConsistencyLevel consistency;
3539
volatile ConsistencyLevel serialConsistency;
3640
volatile boolean traceQuery;
3741
volatile RetryPolicy retryPolicy;
42+
volatile ImmutableMap<String, ByteBuffer> outgoingPayload;
3843

39-
private DefaultPreparedStatement(PreparedId id, String query, String queryKeyspace) {
44+
private DefaultPreparedStatement(PreparedId id, String query, String queryKeyspace, Map<String, ByteBuffer> incomingPayload) {
4045
this.preparedId = id;
4146
this.query = query;
4247
this.queryKeyspace = queryKeyspace;
48+
this.incomingPayload = incomingPayload;
4349
}
4450

4551
static DefaultPreparedStatement fromMessage(Responses.Result.Prepared msg, Metadata clusterMetadata, ProtocolVersion protocolVersion, String query, String queryKeyspace) {
@@ -48,15 +54,15 @@ static DefaultPreparedStatement fromMessage(Responses.Result.Prepared msg, Metad
4854
ColumnDefinitions defs = msg.metadata.columns;
4955

5056
if (defs.size() == 0)
51-
return new DefaultPreparedStatement(new PreparedId(msg.statementId, defs, msg.resultMetadata.columns, null, protocolVersion), query, queryKeyspace);
57+
return new DefaultPreparedStatement(new PreparedId(msg.statementId, defs, msg.resultMetadata.columns, null, protocolVersion), query, queryKeyspace, msg.getCustomPayload());
5258

5359
int[] pkIndices = (protocolVersion.compareTo(V4) >= 0)
5460
? msg.metadata.pkIndices
5561
: computePkIndices(clusterMetadata, defs);
5662

5763
PreparedId prepId = new PreparedId(msg.statementId, defs, msg.resultMetadata.columns, pkIndices, protocolVersion);
5864

59-
return new DefaultPreparedStatement(prepId, query, queryKeyspace);
65+
return new DefaultPreparedStatement(prepId, query, queryKeyspace, msg.getCustomPayload());
6066
}
6167

6268
private static int[] computePkIndices(Metadata clusterMetadata, ColumnDefinitions boundColumns) {
@@ -185,4 +191,20 @@ public RetryPolicy getRetryPolicy() {
185191
public PreparedId getPreparedId() {
186192
return preparedId;
187193
}
194+
195+
@Override
196+
public Map<String, ByteBuffer> getIncomingPayload() {
197+
return incomingPayload;
198+
}
199+
200+
@Override
201+
public Map<String, ByteBuffer> getOutgoingPayload() {
202+
return outgoingPayload;
203+
}
204+
205+
@Override
206+
public PreparedStatement setOutgoingPayload(Map<String, ByteBuffer> payload) {
207+
this.outgoingPayload = payload == null ? null : ImmutableMap.copyOf(payload);
208+
return this;
209+
}
188210
}

driver-core/src/main/java/com/datastax/driver/core/ExecutionInfo.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.nio.ByteBuffer;
1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.Map;
2122

2223
import com.datastax.driver.core.utils.Bytes;
2324

@@ -33,8 +34,9 @@ public class ExecutionInfo {
3334
private final Statement statement;
3435
private volatile boolean schemaInAgreement;
3536
private final List<String> warnings;
37+
private final Map<String, ByteBuffer> incomingPayload;
3638

37-
private ExecutionInfo(List<Host> triedHosts, ConsistencyLevel achievedConsistency, QueryTrace trace, ByteBuffer pagingState, ProtocolVersion protocolVersion, Statement statement, boolean schemaAgreement, List<String> warnings) {
39+
private ExecutionInfo(List<Host> triedHosts, ConsistencyLevel achievedConsistency, QueryTrace trace, ByteBuffer pagingState, ProtocolVersion protocolVersion, Statement statement, boolean schemaAgreement, List<String> warnings, Map<String, ByteBuffer> incomingPayload) {
3840
this.triedHosts = triedHosts;
3941
this.achievedConsistency = achievedConsistency;
4042
this.trace = trace;
@@ -43,26 +45,31 @@ private ExecutionInfo(List<Host> triedHosts, ConsistencyLevel achievedConsistenc
4345
this.statement = statement;
4446
this.schemaInAgreement = schemaAgreement;
4547
this.warnings = warnings;
48+
this.incomingPayload = incomingPayload;
4649
}
4750

4851
ExecutionInfo(List<Host> triedHosts) {
49-
this(triedHosts, null, null, null, null, null, true, Collections.<String>emptyList());
52+
this(triedHosts, null, null, null, null, null, true, Collections.<String>emptyList(), null);
5053
}
5154

5255
ExecutionInfo withTraceAndWarnings(QueryTrace newTrace, List<String> warnings) {
53-
return new ExecutionInfo(triedHosts, achievedConsistency, newTrace, pagingState, protocolVersion, statement, schemaInAgreement, warnings);
56+
return new ExecutionInfo(triedHosts, achievedConsistency, newTrace, pagingState, protocolVersion, statement, schemaInAgreement, warnings, incomingPayload);
5457
}
5558

5659
ExecutionInfo withAchievedConsistency(ConsistencyLevel newConsistency) {
57-
return new ExecutionInfo(triedHosts, newConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings);
60+
return new ExecutionInfo(triedHosts, newConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings, incomingPayload);
5861
}
5962

6063
ExecutionInfo withPagingState(ByteBuffer pagingState, ProtocolVersion protocolVersion) {
61-
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings);
64+
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings, incomingPayload);
6265
}
6366

6467
ExecutionInfo withStatement(Statement statement) {
65-
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings);
68+
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings, incomingPayload);
69+
}
70+
71+
ExecutionInfo withIncomingPayload(Map<String, ByteBuffer> incomingPayload) {
72+
return new ExecutionInfo(triedHosts, achievedConsistency, trace, pagingState, protocolVersion, statement, schemaInAgreement, warnings, incomingPayload);
6673
}
6774

6875
/**
@@ -195,11 +202,33 @@ void setSchemaInAgreement(boolean schemaAgreement) {
195202
* Returns the server-side warnings for this query.
196203
* <p>
197204
* This feature is only available with {@link ProtocolVersion#V4} or above; with lower
198-
* versions, there list will always be empty.
205+
* versions, the returned list will always be empty.
199206
*
200207
* @return the warnings, or an empty list if there are none.
208+
* @since 2.2
201209
*/
202210
public List<String> getWarnings() {
203211
return warnings;
204212
}
213+
214+
/**
215+
* Return the incoming payload, that is, the payload that the server
216+
* sent back with its response, if any,
217+
* or {@code null}, if the server did not include any custom payload.
218+
* <p>
219+
* This method returns a read-only view of the original map, but
220+
* its values remain inherently mutable.
221+
* Callers should take care not to modify the returned map in any way.
222+
* <p>
223+
* This feature is only available with {@link ProtocolVersion#V4} or above; with lower
224+
* versions, this method will always return {@code null}.
225+
*
226+
* @return the custom payload that the server sent back with its response, if any,
227+
* or {@code null}, if the server did not include any custom payload.
228+
* @since 2.2
229+
*/
230+
public Map<String, ByteBuffer> getIncomingPayload() {
231+
return incomingPayload;
232+
}
233+
205234
}

0 commit comments

Comments
 (0)