Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [improvement] JAVA-630: Don't process DOWN events for nodes that have active connections.
- [improvement] JAVA-851: Improve UUIDs javadoc with regard to user-provided timestamps.
- [improvement] JAVA-979: Update javadoc for RegularStatement toString() and getQueryString() to indicate that consistency level and other parameters are not maintained in the query string.
- [improvement] JAVA-1038: Fetch node info by rpc_address if its broadcast_address is not in system.peers.

Merged from 2.0 branch:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,14 @@ private Row fetchNodeInfo(Host host, Connection c) throws ConnectionException, B
? new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL))
: new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS + " WHERE peer='" + host.listenAddress.getHostAddress() + '\''));
c.write(future);
return future.get().one();
Row row = future.get().one();
if (row != null) {
return row;
} else {
logger.debug("Could not find peer with broadcast address {}, " +
"falling back to a full system.peers scan to fetch info for {} " +
"(this can happen if the broadcast address changed)", host.listenAddress, host);
}
}

// We have to fetch the whole peers table and find the host we're looking for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import com.datastax.driver.core.utils.CassandraVersion;
import com.google.common.base.Function;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.scassandra.http.client.PrimingRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -37,9 +41,12 @@

import static com.datastax.driver.core.Assertions.assertThat;
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS;
import static com.datastax.driver.core.ScassandraCluster.datacenter;
import static com.datastax.driver.core.TestUtils.nonDebouncingQueryOptions;
import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions;
import static com.google.common.collect.Lists.newArrayList;
import static org.scassandra.http.client.PrimingRequest.then;

@CreateCCM(PER_METHOD)
@CCMConfig(dirtiesContext = true, createCluster = false)
Expand Down Expand Up @@ -215,6 +222,96 @@ public Integer apply(InetAddress input) {

}

/**
* Ensures that when a node changes its broadcast address (for example, after
* a shutdown and startup on EC2 and its public IP has changed),
* the driver will be able to detect that change and recognize the host
* in the system.peers table in spite of that change.
*
* @jira_ticket JAVA-1038
* @expected_result The driver should be able to detect that a host has changed its broadcast address
* and update its metadata accordingly.
* @test_category control_connection
* @since 2.1.10
*/
@SuppressWarnings("unchecked")
@Test(groups = "short")
@CCMConfig(createCcm = false)
public void should_fetch_whole_peers_table_if_broadcast_address_changed() throws UnknownHostException {
ScassandraCluster scassandras = ScassandraCluster.builder().withNodes(2).build();
scassandras.init();

InetSocketAddress node2RpcAddress = scassandras.address(2);

Cluster cluster = Cluster.builder()
.addContactPoints(scassandras.address(1).getAddress())
.withPort(scassandras.getBinaryPort())
.withNettyOptions(nonQuietClusterCloseOptions)
.build();

try {

cluster.init();

Host host2 = cluster.getMetadata().getHost(node2RpcAddress);
assertThat(host2).isNotNull();

InetAddress node2OldBroadcastAddress = host2.listenAddress;
InetAddress node2NewBroadcastAddress = InetAddress.getByName("1.2.3.4");

// host 2 has the old broadcast_address (which is identical to its rpc_broadcast_address)
assertThat(host2.getAddress())
.isEqualTo(node2OldBroadcastAddress);

// simulate a change in host 2 public IP
Map<String, ?> rows = ImmutableMap.<String, Object>builder()
.put("peer", node2NewBroadcastAddress) // new broadcast address for host 2
.put("rpc_address", host2.getAddress()) // rpc_broadcast_address remains unchanged
.put("data_center", datacenter(1))
.put("rack", "rack1")
.put("release_version", "2.1.8")
.put("tokens", ImmutableSet.of(Long.toString(scassandras.getTokensForDC(1).get(1))))
.build();

scassandras.node(1).primingClient().clearAllPrimes();

// the driver will attempt to locate host2 in system.peers by its old broadcast address, and that will fail
scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder()
.withQuery("SELECT * FROM system.peers WHERE peer='" + node2OldBroadcastAddress + "'")
.withThen(then()
.withColumnTypes(SELECT_PEERS)
.build())
.build());

// the driver will then attempt to fetch the whole system.peers
scassandras.node(1).primingClient().prime(PrimingRequest.queryBuilder()
.withQuery("SELECT * FROM system.peers")
.withThen(then()
.withColumnTypes(SELECT_PEERS)
.withRows(rows)
.build())
.build());

assertThat(cluster.manager.controlConnection.refreshNodeInfo(host2)).isTrue();

host2 = cluster.getMetadata().getHost(node2RpcAddress);

// host2 should now have a new broadcast address
assertThat(host2).isNotNull();
assertThat(host2.listenAddress)
.isEqualTo(node2NewBroadcastAddress);

// host 2 should keep its old rpc broadcast address
assertThat(host2.getSocketAddress())
.isEqualTo(node2RpcAddress);

} finally {
cluster.close();
scassandras.stop();
}

}

static class QueryPlanCountingPolicy extends DelegatingLoadBalancingPolicy {

final AtomicInteger counter = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void start(Cluster cluster, int dc, int node) {
start(cluster, ipSuffix(dc, node));
}

private List<Long> getTokensForDC(int dc) {
public List<Long> getTokensForDC(int dc) {
// Offset DCs by dc * 100 to ensure unique tokens.
int offset = (dc - 1) * 100;
int dcNodeCount = nodes(dc).size();
Expand Down Expand Up @@ -367,7 +367,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
: defaultValue;
}

static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = {
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_PEERS = {
column("peer", INET),
column("rpc_address", INET),
column("data_center", TEXT),
Expand All @@ -376,7 +376,7 @@ private Object getPeerInfo(int dc, int node, String property, Object defaultValu
column("tokens", set(TEXT))
};

static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = {
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL = {
column("key", TEXT),
column("bootstrapped", TEXT),
column("broadcast_address", INET),
Expand Down