Skip to content

Commit 8866686

Browse files
committed
Extract ProtocolVersionRegistry interface, add method to find cluster's optimal version
1 parent 5e59744 commit 8866686

7 files changed

Lines changed: 355 additions & 81 deletions

File tree

core/src/main/java/com/datastax/oss/driver/api/core/UnsupportedProtocolVersionException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static UnsupportedProtocolVersionException forNegotiation(
5252
address, message, ImmutableList.copyOf(attemptedVersions));
5353
}
5454

55-
private UnsupportedProtocolVersionException(
55+
public UnsupportedProtocolVersionException(
5656
SocketAddress address, String message, List<ProtocolVersion> attemptedVersions) {
5757
super(message, null, true);
5858
this.address = address;
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core;
17+
18+
import com.datastax.oss.driver.api.core.CassandraVersion;
19+
import com.datastax.oss.driver.api.core.CoreProtocolVersion;
20+
import com.datastax.oss.driver.api.core.ProtocolVersion;
21+
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
22+
import com.datastax.oss.driver.api.core.metadata.Node;
23+
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
24+
import com.google.common.base.Preconditions;
25+
import com.google.common.collect.ImmutableList;
26+
import java.util.Collection;
27+
import java.util.Map;
28+
import java.util.NavigableMap;
29+
import java.util.Optional;
30+
import java.util.SortedSet;
31+
import java.util.TreeMap;
32+
import java.util.TreeSet;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
/**
37+
* Built-in implementation of the protocol version registry, that supports the protocol versions of
38+
* Apache Cassandra.
39+
*
40+
* <p>
41+
*
42+
* <p>This can be overridden with a custom implementation by subclassing {@link
43+
* DefaultDriverContext}.
44+
*
45+
* @see CoreProtocolVersion
46+
*/
47+
public class CassandraProtocolVersionRegistry implements ProtocolVersionRegistry {
48+
49+
private static final Logger LOG = LoggerFactory.getLogger(CassandraProtocolVersionRegistry.class);
50+
51+
private static final CassandraVersion CASSANDRA_210 = CassandraVersion.parse("2.1.0");
52+
private static final CassandraVersion CASSANDRA_220 = CassandraVersion.parse("2.2.0");
53+
54+
private final String logPrefix;
55+
private final NavigableMap<Integer, ProtocolVersion> versionsByCode;
56+
57+
public CassandraProtocolVersionRegistry(String logPrefix) {
58+
this(logPrefix, CoreProtocolVersion.values());
59+
}
60+
61+
protected CassandraProtocolVersionRegistry(String logPrefix, ProtocolVersion[]... versionRanges) {
62+
this.logPrefix = logPrefix;
63+
this.versionsByCode = byCode(versionRanges);
64+
}
65+
66+
@Override
67+
public ProtocolVersion fromCode(int code) {
68+
ProtocolVersion protocolVersion = versionsByCode.get(code);
69+
if (protocolVersion == null) {
70+
throw new IllegalArgumentException("Unknown protocol version code: " + code);
71+
}
72+
return protocolVersion;
73+
}
74+
75+
@Override
76+
public ProtocolVersion fromName(String name) {
77+
for (ProtocolVersion version : versionsByCode.values()) {
78+
if (version.name().equals(name)) {
79+
return version;
80+
}
81+
}
82+
throw new IllegalArgumentException("Unknown protocol version name: " + name);
83+
}
84+
85+
@Override
86+
public ProtocolVersion highestNonBeta() {
87+
ProtocolVersion highest = versionsByCode.lastEntry().getValue();
88+
if (!highest.isBeta()) {
89+
return highest;
90+
} else {
91+
return downgrade(highest)
92+
.orElseThrow(() -> new AssertionError("There should be at least one non-beta version"));
93+
}
94+
}
95+
96+
@Override
97+
public Optional<ProtocolVersion> downgrade(ProtocolVersion version) {
98+
Map.Entry<Integer, ProtocolVersion> previousEntry =
99+
versionsByCode.lowerEntry(version.getCode());
100+
if (previousEntry == null) {
101+
return Optional.empty();
102+
} else {
103+
ProtocolVersion previousVersion = previousEntry.getValue();
104+
// Beta versions are skipped during negotiation
105+
return (previousVersion.isBeta()) ? downgrade(previousVersion) : Optional.of(previousVersion);
106+
}
107+
}
108+
109+
@Override
110+
public ProtocolVersion highestCommon(Collection<Node> nodes) {
111+
if (nodes == null || nodes.isEmpty()) {
112+
throw new IllegalArgumentException("Expected at least one node");
113+
}
114+
115+
SortedSet<CoreProtocolVersion> candidates = new TreeSet<>();
116+
117+
for (CoreProtocolVersion version : CoreProtocolVersion.values()) {
118+
// Beta versions always need to be forced, and we only call this method if the version
119+
// wasn't forced
120+
if (!version.isBeta()) {
121+
candidates.add(version);
122+
}
123+
}
124+
125+
// The C*<=>protocol mapping is hardcoded in the code below, I don't see a need to be more
126+
// sophisticated right now.
127+
for (Node node : nodes) {
128+
CassandraVersion cassandraVersion = node.getCassandraVersion();
129+
if (cassandraVersion == null) {
130+
LOG.warn(
131+
"[{}] Node {} reports null Cassandra version, "
132+
+ "ignoring it from optimal protocol version computation",
133+
logPrefix,
134+
node.getConnectAddress());
135+
continue;
136+
}
137+
cassandraVersion = cassandraVersion.nextStable();
138+
if (cassandraVersion.compareTo(CASSANDRA_210) < 0) {
139+
throw new UnsupportedProtocolVersionException(
140+
node.getConnectAddress(),
141+
String.format(
142+
"Node %s reports Cassandra version %s, "
143+
+ "but the driver only supports 2.1.0 and above",
144+
node.getConnectAddress(), cassandraVersion),
145+
ImmutableList.of(CoreProtocolVersion.V3, CoreProtocolVersion.V4));
146+
}
147+
148+
LOG.debug(
149+
"[{}] Node {} reports Cassandra version {}",
150+
logPrefix,
151+
node.getConnectAddress(),
152+
cassandraVersion);
153+
if (cassandraVersion.compareTo(CASSANDRA_220) < 0
154+
&& candidates.remove(CoreProtocolVersion.V4)) {
155+
LOG.debug("[{}] Excluding protocol V4", logPrefix);
156+
}
157+
}
158+
159+
if (candidates.isEmpty()) {
160+
// Note: with the current algorithm, this never happens
161+
throw new UnsupportedProtocolVersionException(
162+
null,
163+
String.format(
164+
"Could not determine a common protocol version, "
165+
+ "enable DEBUG logs for '%s' for more details",
166+
LOG.getName()),
167+
ImmutableList.of(CoreProtocolVersion.V3, CoreProtocolVersion.V4));
168+
} else {
169+
return candidates.last();
170+
}
171+
}
172+
173+
private NavigableMap<Integer, ProtocolVersion> byCode(ProtocolVersion[][] versionRanges) {
174+
NavigableMap<Integer, ProtocolVersion> map = new TreeMap<>();
175+
for (ProtocolVersion[] versionRange : versionRanges) {
176+
for (ProtocolVersion version : versionRange) {
177+
ProtocolVersion previous = map.put(version.getCode(), version);
178+
Preconditions.checkArgument(
179+
previous == null,
180+
"Duplicate version code: %s in %s and %s",
181+
version.getCode(),
182+
previous,
183+
version);
184+
}
185+
}
186+
return map;
187+
}
188+
}

core/src/main/java/com/datastax/oss/driver/internal/core/ProtocolVersionRegistry.java

Lines changed: 39 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -15,85 +15,55 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core;
1717

18-
import com.datastax.oss.driver.api.core.CoreProtocolVersion;
1918
import com.datastax.oss.driver.api.core.ProtocolVersion;
20-
import com.google.common.base.Preconditions;
21-
import java.util.Map;
22-
import java.util.NavigableMap;
19+
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
20+
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
21+
import com.datastax.oss.driver.api.core.metadata.Node;
22+
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
23+
import java.util.Collection;
2324
import java.util.Optional;
24-
import java.util.TreeMap;
2525

26-
/** Manages all the native protocol versions supported by the driver. */
27-
public class ProtocolVersionRegistry {
28-
private final NavigableMap<Integer, ProtocolVersion> versionsByCode;
26+
/** Defines which native protocol versions are supported by a driver instance. */
27+
public interface ProtocolVersionRegistry {
2928

30-
public ProtocolVersionRegistry(ProtocolVersion[]... versionRanges) {
31-
this.versionsByCode = byCode(versionRanges);
32-
}
33-
34-
/** Default implementation, initialized with the core OSS versions. */
35-
public ProtocolVersionRegistry() {
36-
this(CoreProtocolVersion.values());
37-
}
38-
39-
public ProtocolVersion fromCode(int code) {
40-
ProtocolVersion protocolVersion = versionsByCode.get(code);
41-
if (protocolVersion == null) {
42-
throw new IllegalArgumentException("Unknown protocol version code: " + code);
43-
}
44-
return protocolVersion;
45-
}
29+
/**
30+
* Look up a version by its {@link ProtocolVersion#getCode()} code}.
31+
*
32+
* @throws IllegalArgumentException if there is no known version with this code.
33+
*/
34+
ProtocolVersion fromCode(int code);
4635

47-
public ProtocolVersion fromName(String name) {
48-
for (ProtocolVersion version : versionsByCode.values()) {
49-
if (version.name().equals(name)) {
50-
return version;
51-
}
52-
}
53-
throw new IllegalArgumentException("Unknown protocol version name: " + name);
54-
}
36+
/**
37+
* Look up a version by its {@link ProtocolVersion#name() name}. This is used when a version was
38+
* forced in the configuration.
39+
*
40+
* @throws IllegalArgumentException if there is no known version with this name.
41+
* @see CoreDriverOption#PROTOCOL_VERSION
42+
*/
43+
ProtocolVersion fromName(String name);
5544

56-
public ProtocolVersion highestNonBeta() {
57-
ProtocolVersion highest = versionsByCode.lastEntry().getValue();
58-
if (!highest.isBeta()) {
59-
return highest;
60-
} else {
61-
return downgrade(highest)
62-
.orElseThrow(() -> new AssertionError("There should be at least one non-beta version"));
63-
}
64-
}
45+
/**
46+
* The highest, non-beta version supported by the driver. This is used as the starting point for
47+
* the negotiation process for the initial connection (if the version wasn't forced).
48+
*/
49+
ProtocolVersion highestNonBeta();
6550

6651
/**
6752
* Downgrade to a lower version if the current version is not supported by the server. This is
68-
* used during the protocol negotiation process.
53+
* used during the negotiation process for the initial connection (if the version wasn't forced).
6954
*
70-
* @return an empty optional if there is no version to downgrade to.
55+
* @return empty if there is no version to downgrade to.
7156
*/
72-
public Optional<ProtocolVersion> downgrade(ProtocolVersion version) {
73-
Map.Entry<Integer, ProtocolVersion> previousEntry =
74-
versionsByCode.lowerEntry(version.getCode());
75-
if (previousEntry == null) {
76-
return Optional.empty();
77-
} else {
78-
ProtocolVersion previousVersion = previousEntry.getValue();
79-
// Beta versions are skipped during negotiation
80-
return (previousVersion.isBeta()) ? downgrade(previousVersion) : Optional.of(previousVersion);
81-
}
82-
}
57+
Optional<ProtocolVersion> downgrade(ProtocolVersion version);
8358

84-
private NavigableMap<Integer, ProtocolVersion> byCode(ProtocolVersion[][] versionRanges) {
85-
NavigableMap<Integer, ProtocolVersion> map = new TreeMap<>();
86-
for (ProtocolVersion[] versionRange : versionRanges) {
87-
for (ProtocolVersion version : versionRange) {
88-
ProtocolVersion previous = map.put(version.getCode(), version);
89-
Preconditions.checkArgument(
90-
previous == null,
91-
"Duplicate version code: %s in %s and %s",
92-
version.getCode(),
93-
previous,
94-
version);
95-
}
96-
}
97-
return map;
98-
}
59+
/**
60+
* Computes the highest common version supported by the given nodes. This is called after the
61+
* initial {@link TopologyMonitor#refreshNodeList()} node refresh} (provided that the version was
62+
* not forced), to ensure that we proceed with a version that will work with all the nodes.
63+
*
64+
* @throws UnsupportedProtocolVersionException if no such version exists (the nodes support
65+
* non-intersecting ranges), or if there was an error during the computation. This will cause
66+
* the driver initialization to fail.
67+
*/
68+
ProtocolVersion highestCommon(Collection<Node> nodes);
9969
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
3232
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
3333
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
34+
import com.datastax.oss.driver.internal.core.CassandraProtocolVersionRegistry;
3435
import com.datastax.oss.driver.internal.core.ProtocolVersionRegistry;
3536
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
3637
import com.datastax.oss.driver.internal.core.channel.WriteCoalescer;
@@ -219,7 +220,7 @@ protected FrameCodec<ByteBuf> buildFrameCodec() {
219220
}
220221

221222
protected ProtocolVersionRegistry buildProtocolVersionRegistry() {
222-
return new ProtocolVersionRegistry();
223+
return new CassandraProtocolVersionRegistry(clusterName());
223224
}
224225

225226
protected NettyOptions buildNettyOptions() {

0 commit comments

Comments
 (0)