Skip to content

Commit 310cd33

Browse files
Alexandre Dutraolim7t
authored andcommitted
JAVA-2303: Ignore peer rows matching the control host's RPC address
1 parent c55ff4c commit 310cd33

3 files changed

Lines changed: 156 additions & 23 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.2.0 (in progress)
66

7+
- [improvement] JAVA-2303: Ignore peer rows matching the control host's RPC address
78
- [improvement] JAVA-2236: Add methods to set the auth provider programmatically
89
- [improvement] JAVA-2369: Change mapper annotations retention to runtime
910
- [improvement] JAVA-2365: Redeclare default constants when an enum is abstracted behind an

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,18 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
197197
(controlNodeResult, peersResult) -> {
198198
List<NodeInfo> nodeInfos = new ArrayList<>();
199199
AdminRow localRow = controlNodeResult.iterator().next();
200-
InetSocketAddress localBroadcastRpcAddress = getBroadcastRpcAddress(localRow);
200+
InetSocketAddress localBroadcastRpcAddress =
201+
getBroadcastRpcAddress(localRow, localEndPoint);
201202
nodeInfos.add(nodeInfoBuilder(localRow, localBroadcastRpcAddress, localEndPoint).build());
202203
for (AdminRow peerRow : peersResult) {
203204
if (isPeerValid(peerRow)) {
204-
InetSocketAddress peerBroadcastRpcAddress = getBroadcastRpcAddress(peerRow);
205-
NodeInfo nodeInfo =
206-
nodeInfoBuilder(peerRow, peerBroadcastRpcAddress, localEndPoint).build();
207-
nodeInfos.add(nodeInfo);
205+
InetSocketAddress peerBroadcastRpcAddress =
206+
getBroadcastRpcAddress(peerRow, localEndPoint);
207+
if (peerBroadcastRpcAddress != null) {
208+
NodeInfo nodeInfo =
209+
nodeInfoBuilder(peerRow, peerBroadcastRpcAddress, localEndPoint).build();
210+
nodeInfos.add(nodeInfo);
211+
}
208212
}
209213
}
210214
return nodeInfos;
@@ -263,8 +267,10 @@ private Optional<NodeInfo> firstPeerRowAsNodeInfo(AdminResult result, EndPoint l
263267
if (iterator.hasNext()) {
264268
AdminRow row = iterator.next();
265269
if (isPeerValid(row)) {
266-
InetSocketAddress peerBroadcastRpcAddress = getBroadcastRpcAddress(row);
267-
return Optional.of(nodeInfoBuilder(row, peerBroadcastRpcAddress, localEndPoint).build());
270+
return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint))
271+
.map(
272+
broadcastRpcAddress ->
273+
nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
268274
}
269275
}
270276
return Optional.empty();
@@ -364,7 +370,7 @@ protected DefaultNodeInfo.Builder nodeInfoBuilder(
364370
private Optional<NodeInfo> findInPeers(
365371
AdminResult result, InetSocketAddress broadcastRpcAddressToFind, EndPoint localEndPoint) {
366372
for (AdminRow row : result) {
367-
InetSocketAddress broadcastRpcAddress = getBroadcastRpcAddress(row);
373+
InetSocketAddress broadcastRpcAddress = getBroadcastRpcAddress(row, localEndPoint);
368374
if (broadcastRpcAddress != null
369375
&& broadcastRpcAddress.equals(broadcastRpcAddressToFind)
370376
&& isPeerValid(row)) {
@@ -383,8 +389,10 @@ private Optional<NodeInfo> findInPeers(
383389
for (AdminRow row : result) {
384390
UUID hostId = row.getUuid("host_id");
385391
if (hostId != null && hostId.equals(hostIdToFind) && isPeerValid(row)) {
386-
InetSocketAddress broadcastRpcAddress = getBroadcastRpcAddress(row);
387-
return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
392+
return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint))
393+
.map(
394+
broadcastRpcAddress ->
395+
nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
388396
}
389397
}
390398
LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, hostIdToFind);
@@ -407,11 +415,17 @@ private void savePort(DriverChannel channel) {
407415
* Determines the broadcast RPC address of the node represented by the given row.
408416
*
409417
* @param row The row to inspect; can represent either a local (control) node or a peer node.
418+
* @param localEndPoint the control node endpoint that was used to query the node's system tables.
419+
* This is a parameter because it would be racy to call {@code
420+
* controlConnection.channel().getEndPoint()} from within this method, as the control
421+
* connection may have changed its channel since. So this parameter must be provided by the
422+
* caller.
410423
* @return the broadcast RPC address of the node, if it could be determined; or {@code null}
411424
* otherwise.
412425
*/
413426
@Nullable
414-
protected InetSocketAddress getBroadcastRpcAddress(@NonNull AdminRow row) {
427+
protected InetSocketAddress getBroadcastRpcAddress(
428+
@NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
415429
// in system.peers or system.local
416430
InetAddress broadcastRpcInetAddress = row.getInetAddress("rpc_address");
417431
if (broadcastRpcInetAddress == null) {
@@ -434,7 +448,22 @@ protected InetSocketAddress getBroadcastRpcAddress(@NonNull AdminRow row) {
434448
broadcastRpcPort = port == -1 ? 0 : port;
435449
}
436450
}
437-
return new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
451+
InetSocketAddress broadcastRpcAddress =
452+
new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
453+
if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) {
454+
// JAVA-2303: if the peer is actually the control node, ignore that peer as it is likely
455+
// a misconfiguration problem.
456+
LOG.warn(
457+
"[{}] Control node {} has an entry for itself in {}: this entry will be ignored. "
458+
+ "This is likely due to a misconfiguration; please verify your rpc_address "
459+
+ "configuration in cassandra.yaml on all nodes in your cluster.",
460+
logPrefix,
461+
localEndPoint,
462+
retrievePeerTableName());
463+
return null;
464+
}
465+
466+
return broadcastRpcAddress;
438467
}
439468

440469
/**

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818
import static com.datastax.oss.driver.Assertions.assertThat;
1919
import static com.datastax.oss.driver.Assertions.assertThatStage;
2020
import static org.assertj.core.api.Assertions.fail;
21+
import static org.assertj.core.api.Assertions.filter;
2122
import static org.mockito.ArgumentMatchers.anyString;
23+
import static org.mockito.Mockito.atLeast;
2224
import static org.mockito.Mockito.mock;
2325
import static org.mockito.Mockito.never;
2426
import static org.mockito.Mockito.spy;
2527
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729
import static org.mockito.Mockito.when;
2830

31+
import ch.qos.logback.classic.Level;
32+
import ch.qos.logback.classic.Logger;
33+
import ch.qos.logback.classic.spi.ILoggingEvent;
34+
import ch.qos.logback.core.Appender;
2935
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
3036
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3137
import com.datastax.oss.driver.api.core.config.DriverConfig;
@@ -62,16 +68,19 @@
6268
import java.util.UUID;
6369
import java.util.concurrent.CompletableFuture;
6470
import java.util.concurrent.CompletionStage;
71+
import org.junit.After;
6572
import org.junit.Before;
6673
import org.junit.Test;
6774
import org.junit.runner.RunWith;
75+
import org.mockito.ArgumentCaptor;
76+
import org.mockito.Captor;
6877
import org.mockito.Mock;
6978
import org.mockito.MockitoAnnotations;
79+
import org.slf4j.LoggerFactory;
7080

7181
@RunWith(DataProviderRunner.class)
7282
public class DefaultTopologyMonitorTest {
7383

74-
private static final InetSocketAddress ADDRESS1 = new InetSocketAddress("127.0.0.1", 9042);
7584
private static final InetSocketAddress ADDRESS2 = new InetSocketAddress("127.0.0.2", 9042);
7685

7786
@Mock private InternalDriverContext context;
@@ -81,11 +90,17 @@ public class DefaultTopologyMonitorTest {
8190
@Mock private DriverChannel channel;
8291
@Mock protected MetricsFactory metricsFactory;
8392

93+
@Mock private Appender<ILoggingEvent> appender;
94+
@Captor private ArgumentCaptor<ILoggingEvent> loggingEventCaptor;
95+
8496
private DefaultNode node1;
8597
private DefaultNode node2;
8698

8799
private TestTopologyMonitor topologyMonitor;
88100

101+
private Logger logger;
102+
private Level initialLogLevel;
103+
89104
@Before
90105
public void setup() {
91106
MockitoAnnotations.initMocks(this);
@@ -107,6 +122,17 @@ public void setup() {
107122
when(context.getControlConnection()).thenReturn(controlConnection);
108123

109124
topologyMonitor = new TestTopologyMonitor(context);
125+
126+
logger = (Logger) LoggerFactory.getLogger(DefaultTopologyMonitor.class);
127+
initialLogLevel = logger.getLevel();
128+
logger.setLevel(Level.INFO);
129+
logger.addAppender(appender);
130+
}
131+
132+
@After
133+
public void teardown() {
134+
logger.detachAppender(appender);
135+
logger.setLevel(initialLogLevel);
110136
}
111137

112138
@Test
@@ -239,23 +265,23 @@ public void should_refresh_node_from_peers_if_broadcast_address_is_not_present_V
239265
@Test
240266
public void should_get_new_node_from_peers() {
241267
// Given
242-
AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
243-
AdminRow peer2 = mockPeersRow(2, node2.getHostId());
244-
AdminRow peer1 = mockPeersRow(1, node1.getHostId());
268+
AdminRow peer3 = mockPeersRow(4, UUID.randomUUID());
269+
AdminRow peer2 = mockPeersRow(3, node2.getHostId());
270+
AdminRow peer1 = mockPeersRow(2, node1.getHostId());
245271
topologyMonitor.isSchemaV2 = false;
246272
topologyMonitor.stubQueries(
247273
new StubbedQuery("SELECT * FROM system.peers", mockResult(peer3, peer2, peer1)));
248274

249275
// When
250-
CompletionStage<Optional<NodeInfo>> futureInfo = topologyMonitor.getNewNodeInfo(ADDRESS1);
276+
CompletionStage<Optional<NodeInfo>> futureInfo = topologyMonitor.getNewNodeInfo(ADDRESS2);
251277

252278
// Then
253279
assertThatStage(futureInfo)
254280
.isSuccess(
255281
maybeInfo -> {
256282
assertThat(maybeInfo.isPresent()).isTrue();
257283
NodeInfo info = maybeInfo.get();
258-
assertThat(info.getDatacenter()).isEqualTo("dc1");
284+
assertThat(info.getDatacenter()).isEqualTo("dc2");
259285
});
260286
// The rpc_address in each row should have been tried, only the last row should have been
261287
// converted
@@ -272,23 +298,23 @@ public void should_get_new_node_from_peers() {
272298
@Test
273299
public void should_get_new_node_from_peers_v2() {
274300
// Given
275-
AdminRow peer3 = mockPeersV2Row(3, UUID.randomUUID());
276-
AdminRow peer2 = mockPeersV2Row(2, node2.getHostId());
277-
AdminRow peer1 = mockPeersV2Row(1, node1.getHostId());
301+
AdminRow peer3 = mockPeersV2Row(4, UUID.randomUUID());
302+
AdminRow peer2 = mockPeersV2Row(3, node2.getHostId());
303+
AdminRow peer1 = mockPeersV2Row(2, node1.getHostId());
278304
topologyMonitor.isSchemaV2 = true;
279305
topologyMonitor.stubQueries(
280306
new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer3, peer2, peer1)));
281307

282308
// When
283-
CompletionStage<Optional<NodeInfo>> futureInfo = topologyMonitor.getNewNodeInfo(ADDRESS1);
309+
CompletionStage<Optional<NodeInfo>> futureInfo = topologyMonitor.getNewNodeInfo(ADDRESS2);
284310

285311
// Then
286312
assertThatStage(futureInfo)
287313
.isSuccess(
288314
maybeInfo -> {
289315
assertThat(maybeInfo.isPresent()).isTrue();
290316
NodeInfo info = maybeInfo.get();
291-
assertThat(info.getDatacenter()).isEqualTo("dc1");
317+
assertThat(info.getDatacenter()).isEqualTo("dc2");
292318
});
293319
// The natove in each row should have been tried, only the last row should have been
294320
// converted
@@ -358,6 +384,10 @@ public void should_skip_invalid_peers_row(String columnToCheck) {
358384
// Then
359385
assertThatStage(futureInfo).isSuccess(maybeInfo -> assertThat(maybeInfo).isEmpty());
360386
assertThat(node2.broadcastAddress).isNotNull().isEqualTo(ADDRESS2);
387+
assertLog(
388+
Level.WARN,
389+
"[null] Found invalid row in system.peers for peer: /127.0.0.2. "
390+
+ "This is likely a gossip or snitch issue, this node will be ignored.");
361391
}
362392

363393
@Test
@@ -390,6 +420,10 @@ public void should_skip_invalid_peers_row_v2(String columnToCheck) {
390420
// Then
391421
assertThatStage(futureInfo).isSuccess(maybeInfo -> assertThat(maybeInfo).isEmpty());
392422
assertThat(node2.broadcastAddress).isNotNull().isEqualTo(ADDRESS2);
423+
assertLog(
424+
Level.WARN,
425+
"[null] Found invalid row in system.peers_v2 for peer: /127.0.0.2. "
426+
+ "This is likely a gossip or snitch issue, this node will be ignored.");
393427
}
394428

395429
@DataProvider
@@ -415,6 +449,67 @@ public void should_stop_executing_queries_once_closed() {
415449
.isFailed(error -> assertThat(error).isInstanceOf(IllegalStateException.class));
416450
}
417451

452+
@Test
453+
public void should_warn_when_control_host_found_in_system_peers() {
454+
// Given
455+
AdminRow local = mockLocalRow(1, node1.getHostId());
456+
AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
457+
AdminRow peer2 = mockPeersRow(2, node2.getHostId());
458+
AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid
459+
topologyMonitor.stubQueries(
460+
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
461+
new StubbedQuery("SELECT * FROM system.peers_v2", Collections.emptyMap(), null, true),
462+
new StubbedQuery("SELECT * FROM system.peers", mockResult(peer3, peer2, peer1)));
463+
464+
// When
465+
CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
466+
467+
// Then
468+
assertThatStage(futureInfos)
469+
.isSuccess(
470+
infos ->
471+
assertThat(infos)
472+
.hasSize(3)
473+
.extractingResultOf("getEndPoint")
474+
.containsOnlyOnce(node1.getEndPoint()));
475+
assertLog(
476+
Level.WARN,
477+
"[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: "
478+
+ "this entry will be ignored. This is likely due to a misconfiguration; "
479+
+ "please verify your rpc_address configuration in cassandra.yaml on "
480+
+ "all nodes in your cluster.");
481+
}
482+
483+
@Test
484+
public void should_warn_when_control_host_found_in_system_peers_v2() {
485+
// Given
486+
AdminRow local = mockLocalRow(1, node1.getHostId());
487+
AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
488+
AdminRow peer2 = mockPeersRow(2, node2.getHostId());
489+
AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid
490+
topologyMonitor.stubQueries(
491+
new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
492+
new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer3, peer2, peer1)));
493+
494+
// When
495+
CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
496+
497+
// Then
498+
assertThatStage(futureInfos)
499+
.isSuccess(
500+
infos ->
501+
assertThat(infos)
502+
.hasSize(3)
503+
.extractingResultOf("getEndPoint")
504+
.containsOnlyOnce(node1.getEndPoint()));
505+
assertLog(
506+
Level.WARN,
507+
"[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: "
508+
+ "this entry will be ignored. This is likely due to a misconfiguration; "
509+
+ "please verify your rpc_address configuration in cassandra.yaml on "
510+
+ "all nodes in your cluster.");
511+
}
512+
418513
/** Mocks the query execution logic. */
419514
private static class TestTopologyMonitor extends DefaultTopologyMonitor {
420515

@@ -539,4 +634,12 @@ private AdminResult mockResult(AdminRow... rows) {
539634
when(result.iterator()).thenReturn(Iterators.forArray(rows));
540635
return result;
541636
}
637+
638+
private void assertLog(Level level, String message) {
639+
verify(appender, atLeast(1)).doAppend(loggingEventCaptor.capture());
640+
Iterable<ILoggingEvent> logs =
641+
filter(loggingEventCaptor.getAllValues()).with("level", level).get();
642+
assertThat(logs).hasSize(1);
643+
assertThat(logs.iterator().next().getFormattedMessage()).contains(message);
644+
}
542645
}

0 commit comments

Comments
 (0)