Skip to content

Commit 4c2d26d

Browse files
Alexandre Dutraolim7t
authored andcommitted
JAVA-2323: Handle restart of a node with same host_id but a different address
1 parent 7404f49 commit 4c2d26d

12 files changed

Lines changed: 102 additions & 44 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.2.0 (in progress)
66

7+
- [bug] JAVA-2323: Handle restart of a node with same host_id but a different address
78
- [improvement] JAVA-2303: Ignore peer rows matching the control host's RPC address
89
- [improvement] JAVA-2236: Add methods to set the auth provider programmatically
910
- [improvement] JAVA-2369: Change mapper annotations retention to runtime

core/src/main/java/com/datastax/oss/driver/api/core/metadata/Node.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.datastax.oss.driver.api.core.Version;
1919
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
2020
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
21+
import com.datastax.oss.driver.api.core.session.Session;
2122
import edu.umd.cs.findbugs.annotations.NonNull;
2223
import edu.umd.cs.findbugs.annotations.Nullable;
2324
import java.net.InetSocketAddress;
@@ -30,6 +31,11 @@
3031
*
3132
* <p>This object is mutable, all of its properties may be updated at runtime to reflect the latest
3233
* state of the node.
34+
*
35+
* <p>Note that the default implementation returned by the driver uses <b>reference equality</b>. A
36+
* {@link Session} will always return the same instance for a given {@link #getHostId() host id}.
37+
* However, instances coming from different sessions will not be equal, even if they refer to the
38+
* same host id.
3339
*/
3440
public interface Node {
3541

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,10 @@ public class AddNodeRefresh extends NodesRefresh {
3737
public Result compute(
3838
DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) {
3939
Map<UUID, Node> oldNodes = oldMetadata.getNodes();
40-
if (oldNodes.containsKey(newNodeInfo.getHostId())) {
41-
return new Result(oldMetadata);
42-
} else {
40+
Node existing = oldNodes.get(newNodeInfo.getHostId());
41+
if (existing == null) {
4342
DefaultNode newNode = new DefaultNode(newNodeInfo.getEndPoint(), context);
44-
copyInfos(newNodeInfo, newNode, null, context.getSessionName());
43+
copyInfos(newNodeInfo, newNode, null, context);
4544
Map<UUID, Node> newNodes =
4645
ImmutableMap.<UUID, Node>builder()
4746
.putAll(oldNodes)
@@ -50,6 +49,19 @@ public Result compute(
5049
return new Result(
5150
oldMetadata.withNodes(newNodes, tokenMapEnabled, false, null, context),
5251
ImmutableList.of(NodeStateEvent.added(newNode)));
52+
} else {
53+
// If a node is restarted after changing its broadcast RPC address, Cassandra considers that
54+
// an addition, even though the host_id hasn't changed :(
55+
// Update the existing instance and emit an UP event to trigger a pool reconnection.
56+
if (!existing.getEndPoint().equals(newNodeInfo.getEndPoint())) {
57+
copyInfos(newNodeInfo, ((DefaultNode) existing), null, context);
58+
assert newNodeInfo.getBroadcastRpcAddress().isPresent(); // always for peer nodes
59+
return new Result(
60+
oldMetadata,
61+
ImmutableList.of(TopologyEvent.suggestUp(newNodeInfo.getBroadcastRpcAddress().get())));
62+
} else {
63+
return new Result(oldMetadata);
64+
}
5365
}
5466
}
5567
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.oss.driver.api.core.metadata.NodeState;
2323
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2424
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
25+
import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater;
2526
import edu.umd.cs.findbugs.annotations.NonNull;
2627
import edu.umd.cs.findbugs.annotations.Nullable;
2728
import java.net.InetSocketAddress;
@@ -39,8 +40,8 @@
3940
@ThreadSafe
4041
public class DefaultNode implements Node {
4142

42-
private final EndPoint endPoint;
43-
private final NodeMetricUpdater metricUpdater;
43+
private volatile EndPoint endPoint;
44+
private volatile NodeMetricUpdater metricUpdater;
4445

4546
volatile InetSocketAddress broadcastRpcAddress;
4647
volatile InetSocketAddress broadcastAddress;
@@ -80,6 +81,18 @@ public EndPoint getEndPoint() {
8081
return endPoint;
8182
}
8283

84+
public void setEndPoint(@NonNull EndPoint newEndPoint, @NonNull InternalDriverContext context) {
85+
if (!newEndPoint.equals(endPoint)) {
86+
endPoint = newEndPoint;
87+
88+
// The endpoint is also used to build metric names, so make sure they get updated
89+
NodeMetricUpdater previousMetricUpdater = metricUpdater;
90+
if (!(previousMetricUpdater instanceof NoopNodeMetricUpdater)) {
91+
metricUpdater = context.getMetricsFactory().newNodeUpdater(this);
92+
}
93+
}
94+
}
95+
8396
@NonNull
8497
@Override
8598
public Optional<InetSocketAddress> getBroadcastRpcAddress() {
@@ -165,28 +178,9 @@ public NodeMetricUpdater getMetricUpdater() {
165178
return metricUpdater;
166179
}
167180

168-
@Override
169-
public boolean equals(Object other) {
170-
if (other == this) {
171-
return true;
172-
} else if (other instanceof Node) {
173-
Node that = (Node) other;
174-
// hostId is the natural identifier, but unfortunately we don't know it for contact points
175-
// until the driver has opened the first connection.
176-
return this.endPoint.equals(that.getEndPoint());
177-
} else {
178-
return false;
179-
}
180-
}
181-
182-
@Override
183-
public int hashCode() {
184-
return endPoint.hashCode();
185-
}
186-
187181
@Override
188182
public String toString() {
189-
return endPoint.toString();
183+
return String.format("%s(%s,%s)", super.toString(), hostId, endPoint);
190184
}
191185

192186
/** Note: deliberately not exposed by the public interface. */

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public Result compute(
7272
if (tokenFactory == null && nodeInfo.getPartitioner() != null) {
7373
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
7474
}
75-
tokensChanged |= copyInfos(nodeInfo, node, tokenFactory, logPrefix);
75+
tokensChanged |= copyInfos(nodeInfo, node, tokenFactory, context);
7676
}
7777

7878
Set<UUID> removed = Sets.difference(oldNodes.keySet(), seen);

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,15 @@ public Result compute(
7474
if (tokenFactory == null && nodeInfo.getPartitioner() != null) {
7575
tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());
7676
}
77-
tokensChanged |= copyInfos(nodeInfo, node, tokenFactory, logPrefix);
77+
tokensChanged |= copyInfos(nodeInfo, node, tokenFactory, context);
7878
newNodesBuilder.put(node.getHostId(), node);
7979
}
8080

8181
ImmutableMap<UUID, DefaultNode> newNodes = newNodesBuilder.build();
8282
ImmutableList.Builder<Object> eventsBuilder = ImmutableList.builder();
8383

8484
for (DefaultNode newNode : newNodes.values()) {
85-
if (!contactPoints.contains(newNode)) {
85+
if (findIn(contactPoints, newNode.getEndPoint()) == null) {
8686
eventsBuilder.add(NodeStateEvent.added(newNode));
8787
}
8888
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public CompletionStage<Void> refreshNode(Node node) {
163163
maybeInfo -> {
164164
if (maybeInfo.isPresent()) {
165165
boolean tokensChanged =
166-
NodesRefresh.copyInfos(maybeInfo.get(), (DefaultNode) node, null, logPrefix);
166+
NodesRefresh.copyInfos(maybeInfo.get(), (DefaultNode) node, null, context);
167167
if (tokensChanged) {
168168
apply(new TokensChangedRefresh());
169169
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodesRefresh.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.oss.driver.internal.core.metadata;
1717

1818
import com.datastax.oss.driver.api.core.Version;
19+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
1920
import com.datastax.oss.driver.internal.core.metadata.token.TokenFactory;
2021
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
2122
import java.util.Collections;
@@ -34,7 +35,12 @@ abstract class NodesRefresh implements MetadataRefresh {
3435
* mutate the tokens in-place, so there is no way to check this after the fact).
3536
*/
3637
protected static boolean copyInfos(
37-
NodeInfo nodeInfo, DefaultNode node, TokenFactory tokenFactory, String logPrefix) {
38+
NodeInfo nodeInfo,
39+
DefaultNode node,
40+
TokenFactory tokenFactory,
41+
InternalDriverContext context) {
42+
43+
node.setEndPoint(nodeInfo.getEndPoint(), context);
3844
node.broadcastRpcAddress = nodeInfo.getBroadcastRpcAddress().orElse(null);
3945
node.broadcastAddress = nodeInfo.getBroadcastAddress().orElse(null);
4046
node.listenAddress = nodeInfo.getListenAddress().orElse(null);
@@ -48,7 +54,7 @@ protected static boolean copyInfos(
4854
} catch (IllegalArgumentException e) {
4955
LOG.warn(
5056
"[{}] Error converting Cassandra version '{}' for {}",
51-
logPrefix,
57+
context.getSessionName(),
5258
versionString,
5359
node.getEndPoint());
5460
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefreshTest.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535

3636
@RunWith(MockitoJUnitRunner.class)
3737
public class AddNodeRefreshTest {
38-
private static final InetSocketAddress ADDRESS1 = new InetSocketAddress("127.0.0.1", 9042);
39-
private static final InetSocketAddress ADDRESS2 = new InetSocketAddress("127.0.0.2", 9042);
4038

4139
@Mock private InternalDriverContext context;
4240
@Mock protected MetricsFactory metricsFactory;
@@ -84,7 +82,7 @@ public void should_add_new_node() {
8482
}
8583

8684
@Test
87-
public void should_not_add_existing_node() {
85+
public void should_not_add_existing_node_with_same_id_and_endpoint() {
8886
// Given
8987
DefaultMetadata oldMetadata =
9088
new DefaultMetadata(
@@ -108,4 +106,37 @@ public void should_not_add_existing_node() {
108106
assertThat(node1.getRack()).isNull();
109107
assertThat(result.events).isEmpty();
110108
}
109+
110+
@Test
111+
public void should_add_existing_node_with_same_id_but_different_endpoint() {
112+
// Given
113+
DefaultMetadata oldMetadata =
114+
new DefaultMetadata(
115+
ImmutableMap.of(node1.getHostId(), node1), Collections.emptyMap(), null);
116+
DefaultEndPoint newEndPoint = TestNodeFactory.newEndPoint(2);
117+
InetSocketAddress newBroadcastRpcAddress = newEndPoint.resolve();
118+
UUID newSchemaVersion = Uuids.random();
119+
DefaultNodeInfo newNodeInfo =
120+
DefaultNodeInfo.builder()
121+
.withHostId(node1.getHostId())
122+
.withEndPoint(newEndPoint)
123+
.withDatacenter("dc1")
124+
.withRack("rack2")
125+
.withSchemaVersion(newSchemaVersion)
126+
.withBroadcastRpcAddress(newBroadcastRpcAddress)
127+
.build();
128+
AddNodeRefresh refresh = new AddNodeRefresh(newNodeInfo);
129+
130+
// When
131+
MetadataRefresh.Result result = refresh.compute(oldMetadata, false, context);
132+
133+
// Then
134+
Map<UUID, Node> newNodes = result.newMetadata.getNodes();
135+
assertThat(newNodes).hasSize(1).containsEntry(node1.getHostId(), node1);
136+
assertThat(node1.getEndPoint()).isEqualTo(newEndPoint);
137+
assertThat(node1.getDatacenter()).isEqualTo("dc1");
138+
assertThat(node1.getRack()).isEqualTo("rack2");
139+
assertThat(node1.getSchemaVersion()).isEqualTo(newSchemaVersion);
140+
assertThat(result.events).containsExactly(TopologyEvent.suggestUp(newBroadcastRpcAddress));
141+
}
111142
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefreshTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.datastax.oss.driver.Assertions.assertThat;
1919
import static org.mockito.Mockito.when;
2020

21+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2122
import com.datastax.oss.driver.api.core.uuid.Uuids;
2223
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2324
import com.datastax.oss.driver.internal.core.metrics.MetricsFactory;
@@ -39,15 +40,18 @@ public class FullNodeListRefreshTest {
3940

4041
private DefaultNode node1;
4142
private DefaultNode node2;
42-
private DefaultNode node3;
43+
private EndPoint endPoint3;
44+
private UUID hostId3;
4345

4446
@Before
4547
public void setup() {
4648
when(context.getMetricsFactory()).thenReturn(metricsFactory);
4749

4850
node1 = TestNodeFactory.newNode(1, context);
4951
node2 = TestNodeFactory.newNode(2, context);
50-
node3 = TestNodeFactory.newNode(3, context);
52+
53+
endPoint3 = TestNodeFactory.newEndPoint(3);
54+
hostId3 = UUID.randomUUID();
5155
}
5256

5357
@Test
@@ -64,18 +68,15 @@ public void should_add_and_remove_nodes() {
6468
.withEndPoint(node2.getEndPoint())
6569
.withHostId(node2.getHostId())
6670
.build(),
67-
DefaultNodeInfo.builder()
68-
.withEndPoint(node3.getEndPoint())
69-
.withHostId(node3.getHostId())
70-
.build());
71+
DefaultNodeInfo.builder().withEndPoint(endPoint3).withHostId(hostId3).build());
7172
FullNodeListRefresh refresh = new FullNodeListRefresh(newInfos);
7273

7374
// When
7475
MetadataRefresh.Result result = refresh.compute(oldMetadata, false, context);
7576

7677
// Then
77-
assertThat(result.newMetadata.getNodes())
78-
.containsOnlyKeys(node2.getHostId(), node3.getHostId());
78+
assertThat(result.newMetadata.getNodes()).containsOnlyKeys(node2.getHostId(), hostId3);
79+
DefaultNode node3 = (DefaultNode) result.newMetadata.getNodes().get(hostId3);
7980
assertThat(result.events)
8081
.containsOnly(NodeStateEvent.removed(node1), NodeStateEvent.added(node3));
8182
}

0 commit comments

Comments
 (0)