Skip to content

Commit 067f1d9

Browse files
GregBestlandAlexandre Dutra
authored andcommitted
JAVA-2456: Detect CaaS and change default consistency level (apache#11)
1 parent c790361 commit 067f1d9

14 files changed

Lines changed: 512 additions & 28 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core.channel;
1717

18+
import com.datastax.oss.driver.api.core.ConsistencyLevel;
1819
import com.datastax.oss.driver.api.core.ProtocolVersion;
1920
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
2021
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
22+
import com.datastax.oss.driver.api.core.config.DriverConfig;
2123
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2224
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2325
import com.datastax.oss.driver.api.core.metadata.Node;
2426
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
2527
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
28+
import com.datastax.oss.driver.internal.core.config.typesafe.TypesafeDriverConfig;
2629
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2730
import com.datastax.oss.driver.internal.core.context.NettyOptions;
2831
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
@@ -33,13 +36,15 @@
3336
import com.datastax.oss.driver.internal.core.protocol.FrameEncoder;
3437
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
3538
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
39+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3640
import io.netty.bootstrap.Bootstrap;
3741
import io.netty.channel.Channel;
3842
import io.netty.channel.ChannelFuture;
3943
import io.netty.channel.ChannelInitializer;
4044
import io.netty.channel.ChannelOption;
4145
import io.netty.channel.ChannelPipeline;
4246
import java.util.List;
47+
import java.util.Map;
4348
import java.util.Optional;
4449
import java.util.concurrent.CompletableFuture;
4550
import java.util.concurrent.CompletionStage;
@@ -54,6 +59,15 @@ public class ChannelFactory {
5459

5560
private static final Logger LOG = LoggerFactory.getLogger(ChannelFactory.class);
5661

62+
/** A value for {@link #productType} that indicates that we are connected to Datastax Cloud. */
63+
private static final String DATASTAX_CLOUD_PRODUCT_TYPE = "DATASTAX_APOLLO";
64+
65+
/**
66+
* A value for {@link #productType} that indicates that the server does not report any product
67+
* type.
68+
*/
69+
private static final String UNKNOWN_PRODUCT_TYPE = "UNKNOWN";
70+
5771
private final String logPrefix;
5872
protected final InternalDriverContext context;
5973

@@ -62,6 +76,14 @@ public class ChannelFactory {
6276

6377
@VisibleForTesting volatile String clusterName;
6478

79+
/**
80+
* The value of the {@code PRODUCT_TYPE} option reported by the first channel we opened, in
81+
* response to a {@code SUPPORTED} request.
82+
*
83+
* <p>If the server does not return that option, the value will be {@link #UNKNOWN_PRODUCT_TYPE}.
84+
*/
85+
@VisibleForTesting volatile String productType;
86+
6587
public ChannelFactory(InternalDriverContext context) {
6688
this.logPrefix = context.getSessionName();
6789
this.context = context;
@@ -166,6 +188,24 @@ private void connect(
166188
if (ChannelFactory.this.clusterName == null) {
167189
ChannelFactory.this.clusterName = driverChannel.getClusterName();
168190
}
191+
Map<String, List<String>> supportedOptions = driverChannel.getOptions();
192+
if (ChannelFactory.this.productType == null && supportedOptions != null) {
193+
List<String> productTypes = supportedOptions.get("PRODUCT_TYPE");
194+
String productType =
195+
productTypes != null && !productTypes.isEmpty()
196+
? productTypes.get(0)
197+
: UNKNOWN_PRODUCT_TYPE;
198+
ChannelFactory.this.productType = productType;
199+
DriverConfig driverConfig = context.getConfig();
200+
if (driverConfig instanceof TypesafeDriverConfig
201+
&& productType.equals(DATASTAX_CLOUD_PRODUCT_TYPE)) {
202+
((TypesafeDriverConfig) driverConfig)
203+
.overrideDefaults(
204+
ImmutableMap.of(
205+
DefaultDriverOption.REQUEST_CONSISTENCY,
206+
ConsistencyLevel.LOCAL_QUORUM.name()));
207+
}
208+
}
169209
resultFuture.complete(driverChannel);
170210
} else {
171211
Throwable error = connectFuture.cause();
@@ -237,7 +277,13 @@ protected void initChannel(Channel channel) {
237277
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(defaultConfig);
238278
ProtocolInitHandler initHandler =
239279
new ProtocolInitHandler(
240-
context, protocolVersion, clusterName, endPoint, options, heartbeatHandler);
280+
context,
281+
protocolVersion,
282+
clusterName,
283+
endPoint,
284+
options,
285+
heartbeatHandler,
286+
productType == null);
241287

242288
ChannelPipeline pipeline = channel.pipeline();
243289
context

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.netty.util.concurrent.Promise;
3030
import java.net.SocketAddress;
3131
import java.nio.ByteBuffer;
32+
import java.util.List;
3233
import java.util.Map;
3334
import java.util.concurrent.atomic.AtomicBoolean;
3435
import net.jcip.annotations.ThreadSafe;
@@ -39,7 +40,10 @@
3940
*/
4041
@ThreadSafe
4142
public class DriverChannel {
43+
4244
static final AttributeKey<String> CLUSTER_NAME_KEY = AttributeKey.newInstance("cluster_name");
45+
static final AttributeKey<Map<String, List<String>>> OPTIONS_KEY =
46+
AttributeKey.newInstance("options");
4347

4448
@SuppressWarnings("RedundantStringConstructorCall")
4549
static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE");
@@ -120,6 +124,10 @@ public String getClusterName() {
120124
return channel.attr(CLUSTER_NAME_KEY).get();
121125
}
122126

127+
public Map<String, List<String>> getOptions() {
128+
return channel.attr(OPTIONS_KEY).get();
129+
}
130+
123131
/**
124132
* @return the number of available stream ids on the channel. This is used to weigh channels in
125133
* pools that have a size bigger than 1, in the load balancing policy, and for monitoring

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
3232
import com.datastax.oss.protocol.internal.Message;
3333
import com.datastax.oss.protocol.internal.ProtocolConstants;
34+
import com.datastax.oss.protocol.internal.ProtocolConstants.ErrorCode;
3435
import com.datastax.oss.protocol.internal.request.AuthResponse;
36+
import com.datastax.oss.protocol.internal.request.Options;
3537
import com.datastax.oss.protocol.internal.request.Query;
3638
import com.datastax.oss.protocol.internal.request.Register;
3739
import com.datastax.oss.protocol.internal.request.Startup;
@@ -40,6 +42,7 @@
4042
import com.datastax.oss.protocol.internal.response.Authenticate;
4143
import com.datastax.oss.protocol.internal.response.Error;
4244
import com.datastax.oss.protocol.internal.response.Ready;
45+
import com.datastax.oss.protocol.internal.response.Supported;
4346
import com.datastax.oss.protocol.internal.response.result.Rows;
4447
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
4548
import io.netty.channel.ChannelHandlerContext;
@@ -69,14 +72,21 @@ class ProtocolInitHandler extends ConnectInitHandler {
6972
private final HeartbeatHandler heartbeatHandler;
7073
private String logPrefix;
7174
private ChannelHandlerContext ctx;
75+
private boolean querySupportedOptions;
7276

77+
/**
78+
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
79+
* protocol options the channel supports. If this is true, the options will be stored as a
80+
* channel attribute, and exposed via {@link DriverChannel#getOptions()}.
81+
*/
7382
ProtocolInitHandler(
7483
InternalDriverContext context,
7584
ProtocolVersion protocolVersion,
7685
String expectedClusterName,
7786
EndPoint endPoint,
7887
DriverChannelOptions options,
79-
HeartbeatHandler heartbeatHandler) {
88+
HeartbeatHandler heartbeatHandler,
89+
boolean querySupportedOptions) {
8090

8191
this.context = context;
8292
this.endPoint = endPoint;
@@ -89,6 +99,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
8999
this.expectedClusterName = expectedClusterName;
90100
this.options = options;
91101
this.heartbeatHandler = heartbeatHandler;
102+
this.querySupportedOptions = querySupportedOptions;
92103
this.logPrefix = options.ownerLogPrefix + "|connecting...";
93104
}
94105

@@ -117,6 +128,7 @@ protected boolean setConnectSuccess() {
117128
}
118129

119130
private enum Step {
131+
OPTIONS,
120132
STARTUP,
121133
GET_CLUSTER_NAME,
122134
SET_KEYSPACE,
@@ -133,7 +145,7 @@ private class InitRequest extends ChannelHandlerRequest {
133145

134146
InitRequest(ChannelHandlerContext ctx) {
135147
super(ctx, timeoutMillis);
136-
this.step = Step.STARTUP;
148+
this.step = querySupportedOptions ? Step.OPTIONS : Step.STARTUP;
137149
}
138150

139151
@Override
@@ -144,6 +156,8 @@ String describe() {
144156
@Override
145157
Message getRequest() {
146158
switch (step) {
159+
case OPTIONS:
160+
return Options.INSTANCE;
147161
case STARTUP:
148162
return new Startup(context.getStartupOptions());
149163
case GET_CLUSTER_NAME:
@@ -167,7 +181,11 @@ void onResponse(Message response) {
167181
step,
168182
ProtocolUtils.opcodeString(response.opcode));
169183
try {
170-
if (step == Step.STARTUP && response instanceof Ready) {
184+
if (step == Step.OPTIONS && response instanceof Supported) {
185+
channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options);
186+
step = Step.STARTUP;
187+
send();
188+
} else if (step == Step.STARTUP && response instanceof Ready) {
171189
context.getAuthProvider().ifPresent(provider -> provider.onMissingChallenge(endPoint));
172190
step = Step.GET_CLUSTER_NAME;
173191
send();
@@ -265,12 +283,14 @@ void onResponse(Message response) {
265283
} else if (response instanceof Error) {
266284
Error error = (Error) response;
267285
// Testing for a specific string is a tad fragile but Cassandra doesn't give us a more
268-
// precise error
269-
// code.
286+
// precise error code.
270287
// C* 2.1 reports a server error instead of protocol error, see CASSANDRA-9451.
271-
if (step == Step.STARTUP
272-
&& (error.code == ProtocolConstants.ErrorCode.PROTOCOL_ERROR
273-
|| error.code == ProtocolConstants.ErrorCode.SERVER_ERROR)
288+
boolean firstRequest =
289+
(step == Step.OPTIONS && querySupportedOptions) || step == Step.STARTUP;
290+
boolean serverOrProtocolError =
291+
error.code == ErrorCode.PROTOCOL_ERROR || error.code == ErrorCode.SERVER_ERROR;
292+
if (firstRequest
293+
&& serverOrProtocolError
274294
&& error.message.contains("Invalid or unsupported protocol version")) {
275295
fail(
276296
UnsupportedProtocolVersionException.forSingleAttempt(

core/src/main/java/com/datastax/oss/driver/internal/core/config/typesafe/TypesafeDriverConfig.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,20 @@
1919

2020
import com.datastax.oss.driver.api.core.config.DriverConfig;
2121
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
22+
import com.datastax.oss.driver.api.core.config.DriverOption;
2223
import com.datastax.oss.driver.internal.core.util.Loggers;
2324
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
2425
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
2526
import com.typesafe.config.Config;
2627
import com.typesafe.config.ConfigObject;
28+
import com.typesafe.config.ConfigOrigin;
29+
import com.typesafe.config.ConfigOriginFactory;
2730
import com.typesafe.config.ConfigValue;
31+
import com.typesafe.config.ConfigValueFactory;
2832
import edu.umd.cs.findbugs.annotations.NonNull;
33+
import java.net.URL;
2934
import java.util.Map;
35+
import java.util.concurrent.ConcurrentHashMap;
3036
import net.jcip.annotations.ThreadSafe;
3137
import org.slf4j.Logger;
3238
import org.slf4j.LoggerFactory;
@@ -35,14 +41,17 @@
3541
public class TypesafeDriverConfig implements DriverConfig {
3642

3743
private static final Logger LOG = LoggerFactory.getLogger(TypesafeDriverConfig.class);
44+
private static final ConfigOrigin DEFAULT_OVERRIDES_ORIGIN =
45+
ConfigOriginFactory.newSimple("default was overridden programmatically");
3846

3947
private final ImmutableMap<String, TypesafeDriverExecutionProfile.Base> profiles;
4048
// Only used to detect if reload saw any change
4149
private volatile Config lastLoadedConfig;
4250

51+
private final Map<DriverOption, Object> defaultOverrides = new ConcurrentHashMap<>();
52+
4353
public TypesafeDriverConfig(Config config) {
4454
this.lastLoadedConfig = config;
45-
4655
Map<String, Config> profileConfigs = extractProfiles(config);
4756

4857
ImmutableMap.Builder<String, TypesafeDriverExecutionProfile.Base> builder =
@@ -57,6 +66,7 @@ public TypesafeDriverConfig(Config config) {
5766

5867
/** @return whether the configuration changed */
5968
public boolean reload(Config config) {
69+
config = applyDefaultOverrides(config);
6070
if (config.equals(lastLoadedConfig)) {
6171
return false;
6272
} else {
@@ -141,4 +151,50 @@ public DriverExecutionProfile getProfile(@NonNull String profileName) {
141151
public Map<String, ? extends DriverExecutionProfile> getProfiles() {
142152
return profiles;
143153
}
154+
155+
/**
156+
* Replace the given options, <em>only if the original values came from {@code
157+
* reference.conf}</em>: if the option was set explicitly in {@code application.conf}, then the
158+
* override is ignored.
159+
*
160+
* <p>The overrides are also taken into account in profiles, and survive reloads. If this method
161+
* is invoked multiple times, the last value for each option will be used. Note that it is
162+
* currently not possible to use {@code null} as a value.
163+
*/
164+
public void overrideDefaults(@NonNull Map<DriverOption, Object> overrides) {
165+
defaultOverrides.putAll(overrides);
166+
reload(lastLoadedConfig);
167+
}
168+
169+
private Config applyDefaultOverrides(Config source) {
170+
Config result = source;
171+
for (Map.Entry<DriverOption, Object> entry : defaultOverrides.entrySet()) {
172+
String path = entry.getKey().getPath();
173+
Object value = entry.getValue();
174+
if (isDefault(source, path)) {
175+
LOG.debug("Replacing default value for {} by {}", path, value);
176+
result =
177+
result.withValue(
178+
path, ConfigValueFactory.fromAnyRef(value).withOrigin(DEFAULT_OVERRIDES_ORIGIN));
179+
} else {
180+
LOG.debug(
181+
"Ignoring default override for {} because the user has overridden the value", path);
182+
}
183+
}
184+
return result;
185+
}
186+
187+
// Whether the value in the given path comes from the reference.conf in the driver JAR.
188+
private static boolean isDefault(Config config, String path) {
189+
if (!config.hasPath(path)) {
190+
return false;
191+
}
192+
ConfigOrigin origin = config.getValue(path).origin();
193+
if (origin.equals(DEFAULT_OVERRIDES_ORIGIN)) {
194+
// Same default was overridden twice, should use the last value
195+
return true;
196+
}
197+
URL url = origin.url();
198+
return url != null && url.toString().endsWith("reference.conf");
199+
}
144200
}

core/src/main/resources/reference.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ datastax-java-driver {
194194
# This setting must be a valid URL.
195195
# If the protocol is not specified, it is implicitly assumed to be the `file://` protocol,
196196
# in which case the value is expected to be a valid path on the local filesystem.
197-
# For example, `/a/path/to/bundle` will be interpreted as `file:///a/path/to/bunde`.
197+
# For example, `/a/path/to/bundle` will be interpreted as `file:/a/path/to/bunde`.
198198
# If the protocol is provided explicitly, then the value will be used as is.
199199
#
200200
# Required: no

core/src/test/java/com/datastax/oss/driver/internal/core/TestResponses.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
import com.datastax.oss.driver.shaded.guava.common.base.Charsets;
1919
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
20+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
2021
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
2122
import com.datastax.oss.protocol.internal.ProtocolConstants;
23+
import com.datastax.oss.protocol.internal.response.Supported;
2224
import com.datastax.oss.protocol.internal.response.result.ColumnSpec;
2325
import com.datastax.oss.protocol.internal.response.result.DefaultRows;
2426
import com.datastax.oss.protocol.internal.response.result.RawType;
2527
import com.datastax.oss.protocol.internal.response.result.Rows;
2628
import com.datastax.oss.protocol.internal.response.result.RowsMetadata;
2729
import java.nio.ByteBuffer;
2830
import java.util.List;
31+
import java.util.Map;
2932
import java.util.Queue;
3033

3134
public class TestResponses {
@@ -43,4 +46,9 @@ public static Rows clusterNameResponse(String actualClusterName) {
4346
data.add(Lists.newArrayList(ByteBuffer.wrap(actualClusterName.getBytes(Charsets.UTF_8))));
4447
return new DefaultRows(metadata, data);
4548
}
49+
50+
public static Supported supportedResponse(String key, String value) {
51+
Map<String, List<String>> options = ImmutableMap.of(key, ImmutableList.of(value));
52+
return new Supported(options);
53+
}
4654
}

core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryClusterNameTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public void should_set_cluster_name_from_first_connection() {
4141
factory.connect(
4242
SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE);
4343

44+
writeInboundFrame(
45+
readOutboundFrame(), TestResponses.supportedResponse("mock_key", "mock_value"));
4446
writeInboundFrame(readOutboundFrame(), new Ready());
4547
writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("mockClusterName"));
4648

@@ -61,6 +63,8 @@ public void should_check_cluster_name_for_next_connections() throws Throwable {
6163
factory.connect(
6264
SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE);
6365
// open a first connection that will define the cluster name
66+
writeInboundFrame(
67+
readOutboundFrame(), TestResponses.supportedResponse("mock_key", "mock_value"));
6468
writeInboundFrame(readOutboundFrame(), new Ready());
6569
writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("mockClusterName"));
6670
assertThatStage(channelFuture).isSuccess();

0 commit comments

Comments
 (0)