diff --git a/changelog/README.md b/changelog/README.md index d877baf4d6a..238982c87eb 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -16,6 +16,7 @@ - [improvement] JAVA-630: Don't process DOWN events for nodes that have active connections. - [improvement] JAVA-851: Improve UUIDs javadoc with regard to user-provided timestamps. - [improvement] JAVA-979: Update javadoc for RegularStatement toString() and getQueryString() to indicate that consistency level and other parameters are not maintained in the query string. +- [improvement] JAVA-1038: Fetch node info by rpc_address if its broadcast_address is not in system.peers. Merged from 2.0 branch: diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java index 23df03059d3..5ecefb447ff 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java @@ -448,7 +448,14 @@ private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, B ? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)) : new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS + " WHERE peer='" + host.listenAddress.getHostAddress() + '\'')); c.write(future); - return future.get().one(); + Row row = future.get().one(); + if (row != null) { + return row; + } else { + logger.debug("Could not find peer with broadcast address {}, " + + "falling back to a full system.peers scan to fetch info for {} " + + "(this can happen if the broadcast address changed)", host.listenAddress, host); + } } // We have to fetch the whole peers table and find the host we're looking for diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java index 848edb8bc45..6ef86de86db 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java @@ -22,13 +22,17 @@ import com.datastax.driver.core.utils.CassandraVersion; import com.google.common.base.Function; import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import org.scassandra.http.client.PrimingRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -37,9 +41,12 @@ import static com.datastax.driver.core.Assertions.assertThat; import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD; +import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS; +import static com.datastax.driver.core.ScassandraCluster.datacenter; import static com.datastax.driver.core.TestUtils.nonDebouncingQueryOptions; import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions; import static com.google.common.collect.Lists.newArrayList; +import static org.scassandra.http.client.PrimingRequest.then; @CreateCCM(PER_METHOD) @CCMConfig(dirtiesContext = true, createCluster = false) @@ -215,6 +222,96 @@ public Integer apply(InetAddress input) { } + /** + * Ensures that when a node changes its broadcast address (for example, after + * a shutdown and startup on EC2 and its public IP has changed), + * the driver will be able to detect that change and recognize the host + * in the system.peers table in spite of that change. + * + * @jira_ticket JAVA-1038 + * @expected_result The driver should be able to detect that a host has changed its broadcast address + * and update its metadata accordingly. + * @test_category control_connection + * @since 2.1.10 + */ + @SuppressWarnings("unchecked") + @Test(groups = "short") + @CCMConfig(createCcm = false) + public void should_fetch_whole_peers_table_if_broadcast_address_changed() throws UnknownHostException { + ScassandraCluster scassandras = ScassandraCluster.builder().withNodes(2).build(); + scassandras.init(); + + InetSocketAddress node2RpcAddress = scassandras.address(2); + + Cluster cluster = Cluster.builder() + .addContactPoints(scassandras.address(1).getAddress()) + .withPort(scassandras.getBinaryPort()) + .withNettyOptions(nonQuietClusterCloseOptions) + .build(); + + try { + + cluster.init(); + + Host host2 = cluster.getMetadata().getHost(node2RpcAddress); + assertThat(host2).isNotNull(); + + InetAddress node2OldBroadcastAddress = host2.listenAddress; + InetAddress node2NewBroadcastAddress = InetAddress.getByName("1.2.3.4"); + + // host 2 has the old broadcast_address (which is identical to its rpc_broadcast_address) + assertThat(host2.getAddress()) + .isEqualTo(node2OldBroadcastAddress); + + // simulate a change in host 2 public IP + Map rows = ImmutableMap.builder() + .put("peer", node2NewBroadcastAddress) // new broadcast address for host 2 + .put("rpc_address", host2.getAddress()) // rpc_broadcast_address remains unchanged + .put("data_center", datacenter(1)) + .put("rack", "rack1") + .put("release_version", "2.1.8") + .put("tokens", ImmutableSet.of(Long.toString(scassandras.getTokensForDC(1).get(1)))) + .build(); + + scassandras.node(1).primingClient().clearAllPrimes(); + + // the driver will attempt to locate host2 in system.peers by its old broadcast address, and that will fail + scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder() + .withQuery("SELECT * FROM system.peers WHERE peer='" + node2OldBroadcastAddress + "'") + .withThen(then() + .withColumnTypes(SELECT_PEERS) + .build()) + .build()); + + // the driver will then attempt to fetch the whole system.peers + scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder() + .withQuery("SELECT * FROM system.peers") + .withThen(then() + .withColumnTypes(SELECT_PEERS) + .withRows(rows) + .build()) + .build()); + + assertThat(cluster.manager.controlConnection.refreshNodeInfo(host2)).isTrue(); + + host2 = cluster.getMetadata().getHost(node2RpcAddress); + + // host2 should now have a new broadcast address + assertThat(host2).isNotNull(); + assertThat(host2.listenAddress) + .isEqualTo(node2NewBroadcastAddress); + + // host 2 should keep its old rpc broadcast address + assertThat(host2.getSocketAddress()) + .isEqualTo(node2RpcAddress); + + } finally { + cluster.close(); + scassandras.stop(); + } + + } + static class QueryPlanCountingPolicy extends DelegatingLoadBalancingPolicy { final AtomicInteger counter = new AtomicInteger(); diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java index 188dc8092c4..0ce8d72d911 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java +++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java @@ -259,7 +259,7 @@ public void start(Cluster cluster, int dc, int node) { start(cluster, ipSuffix(dc, node)); } - private List getTokensForDC(int dc) { + public List getTokensForDC(int dc) { // Offset DCs by dc * 100 to ensure unique tokens. int offset = (dc - 1) * 100; int dcNodeCount = nodes(dc).size(); @@ -367,7 +367,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu : defaultValue; } - static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = { + public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = { column("peer", INET), column("rpc_address", INET), column("data_center", TEXT), @@ -376,7 +376,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu column("tokens", set(TEXT)) }; - static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = { + public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = { column("key", TEXT), column("bootstrapped", TEXT), column("broadcast_address", INET),