Skip to content

Commit 2708bd7

Browse files
committed
Merge pull request apache#596 from datastax/java1038
JAVA-1038: Fetch node info by rpc_address if it has a broadcast_address which is not in system.peers.
2 parents f4662d7 + a612215 commit 2708bd7

4 files changed

Lines changed: 109 additions & 4 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- [improvement] JAVA-630: Don't process DOWN events for nodes that have active connections.
1717
- [improvement] JAVA-851: Improve UUIDs javadoc with regard to user-provided timestamps.
1818
- [improvement] JAVA-979: Update javadoc for RegularStatement toString() and getQueryString() to indicate that consistency level and other parameters are not maintained in the query string.
19+
- [improvement] JAVA-1038: Fetch node info by rpc_address if its broadcast_address is not in system.peers.
1920

2021
Merged from 2.0 branch:
2122

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,14 @@ private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, B
448448
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL))
449449
: new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS + " WHERE peer='" + host.listenAddress.getHostAddress() + '\''));
450450
c.write(future);
451-
return future.get().one();
451+
Row row = future.get().one();
452+
if (row != null) {
453+
return row;
454+
} else {
455+
logger.debug("Could not find peer with broadcast address {}, " +
456+
"falling back to a full system.peers scan to fetch info for {} " +
457+
"(this can happen if the broadcast address changed)", host.listenAddress, host);
458+
}
452459
}
453460

454461
// We have to fetch the whole peers table and find the host we're looking for

driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
import com.datastax.driver.core.utils.CassandraVersion;
2323
import com.google.common.base.Function;
2424
import com.google.common.collect.HashMultiset;
25+
import com.google.common.collect.ImmutableMap;
26+
import com.google.common.collect.ImmutableSet;
2527
import com.google.common.collect.Maps;
28+
import org.scassandra.http.client.PrimingRequest;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831
import org.testng.annotations.Test;
2932

3033
import java.net.InetAddress;
3134
import java.net.InetSocketAddress;
35+
import java.net.UnknownHostException;
3236
import java.util.Collection;
3337
import java.util.Iterator;
3438
import java.util.Map;
@@ -37,9 +41,12 @@
3741

3842
import static com.datastax.driver.core.Assertions.assertThat;
3943
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
44+
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS;
45+
import static com.datastax.driver.core.ScassandraCluster.datacenter;
4046
import static com.datastax.driver.core.TestUtils.nonDebouncingQueryOptions;
4147
import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions;
4248
import static com.google.common.collect.Lists.newArrayList;
49+
import static org.scassandra.http.client.PrimingRequest.then;
4350

4451
@CreateCCM(PER_METHOD)
4552
@CCMConfig(dirtiesContext = true, createCluster = false)
@@ -215,6 +222,96 @@ public Integer apply(InetAddress input) {
215222

216223
}
217224

225+
/**
226+
* Ensures that when a node changes its broadcast address (for example, after
227+
* a shutdown and startup on EC2 and its public IP has changed),
228+
* the driver will be able to detect that change and recognize the host
229+
* in the system.peers table in spite of that change.
230+
*
231+
* @jira_ticket JAVA-1038
232+
* @expected_result The driver should be able to detect that a host has changed its broadcast address
233+
* and update its metadata accordingly.
234+
* @test_category control_connection
235+
* @since 2.1.10
236+
*/
237+
@SuppressWarnings("unchecked")
238+
@Test(groups = "short")
239+
@CCMConfig(createCcm = false)
240+
public void should_fetch_whole_peers_table_if_broadcast_address_changed() throws UnknownHostException {
241+
ScassandraCluster scassandras = ScassandraCluster.builder().withNodes(2).build();
242+
scassandras.init();
243+
244+
InetSocketAddress node2RpcAddress = scassandras.address(2);
245+
246+
Cluster cluster = Cluster.builder()
247+
.addContactPoints(scassandras.address(1).getAddress())
248+
.withPort(scassandras.getBinaryPort())
249+
.withNettyOptions(nonQuietClusterCloseOptions)
250+
.build();
251+
252+
try {
253+
254+
cluster.init();
255+
256+
Host host2 = cluster.getMetadata().getHost(node2RpcAddress);
257+
assertThat(host2).isNotNull();
258+
259+
InetAddress node2OldBroadcastAddress = host2.listenAddress;
260+
InetAddress node2NewBroadcastAddress = InetAddress.getByName("1.2.3.4");
261+
262+
// host 2 has the old broadcast_address (which is identical to its rpc_broadcast_address)
263+
assertThat(host2.getAddress())
264+
.isEqualTo(node2OldBroadcastAddress);
265+
266+
// simulate a change in host 2 public IP
267+
Map<String, ?> rows = ImmutableMap.<String, Object>builder()
268+
.put("peer", node2NewBroadcastAddress) // new broadcast address for host 2
269+
.put("rpc_address", host2.getAddress()) // rpc_broadcast_address remains unchanged
270+
.put("data_center", datacenter(1))
271+
.put("rack", "rack1")
272+
.put("release_version", "2.1.8")
273+
.put("tokens", ImmutableSet.of(Long.toString(scassandras.getTokensForDC(1).get(1))))
274+
.build();
275+
276+
scassandras.node(1).primingClient().clearAllPrimes();
277+
278+
// the driver will attempt to locate host2 in system.peers by its old broadcast address, and that will fail
279+
scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder()
280+
.withQuery("SELECT * FROM system.peers WHERE peer='" + node2OldBroadcastAddress + "'")
281+
.withThen(then()
282+
.withColumnTypes(SELECT_PEERS)
283+
.build())
284+
.build());
285+
286+
// the driver will then attempt to fetch the whole system.peers
287+
scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder()
288+
.withQuery("SELECT * FROM system.peers")
289+
.withThen(then()
290+
.withColumnTypes(SELECT_PEERS)
291+
.withRows(rows)
292+
.build())
293+
.build());
294+
295+
assertThat(cluster.manager.controlConnection.refreshNodeInfo(host2)).isTrue();
296+
297+
host2 = cluster.getMetadata().getHost(node2RpcAddress);
298+
299+
// host2 should now have a new broadcast address
300+
assertThat(host2).isNotNull();
301+
assertThat(host2.listenAddress)
302+
.isEqualTo(node2NewBroadcastAddress);
303+
304+
// host 2 should keep its old rpc broadcast address
305+
assertThat(host2.getSocketAddress())
306+
.isEqualTo(node2RpcAddress);
307+
308+
} finally {
309+
cluster.close();
310+
scassandras.stop();
311+
}
312+
313+
}
314+
218315
static class QueryPlanCountingPolicy extends DelegatingLoadBalancingPolicy {
219316

220317
final AtomicInteger counter = new AtomicInteger();

driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public void start(Cluster cluster, int dc, int node) {
259259
start(cluster, ipSuffix(dc, node));
260260
}
261261

262-
private List<Long> getTokensForDC(int dc) {
262+
public List<Long> getTokensForDC(int dc) {
263263
// Offset DCs by dc * 100 to ensure unique tokens.
264264
int offset = (dc - 1) * 100;
265265
int dcNodeCount = nodes(dc).size();
@@ -367,7 +367,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
367367
: defaultValue;
368368
}
369369

370-
static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = {
370+
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = {
371371
column("peer", INET),
372372
column("rpc_address", INET),
373373
column("data_center", TEXT),
@@ -376,7 +376,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
376376
column("tokens", set(TEXT))
377377
};
378378

379-
static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = {
379+
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = {
380380
column("key", TEXT),
381381
column("bootstrapped", TEXT),
382382
column("broadcast_address", INET),

0 commit comments

Comments
 (0)