1818import static com .datastax .oss .driver .Assertions .assertThat ;
1919import static com .datastax .oss .driver .Assertions .assertThatStage ;
2020import static org .assertj .core .api .Assertions .fail ;
21+ import static org .assertj .core .api .Assertions .filter ;
2122import static org .mockito .ArgumentMatchers .anyString ;
23+ import static org .mockito .Mockito .atLeast ;
2224import static org .mockito .Mockito .mock ;
2325import static org .mockito .Mockito .never ;
2426import static org .mockito .Mockito .spy ;
2527import static org .mockito .Mockito .times ;
2628import static org .mockito .Mockito .verify ;
2729import static org .mockito .Mockito .when ;
2830
31+ import ch .qos .logback .classic .Level ;
32+ import ch .qos .logback .classic .Logger ;
33+ import ch .qos .logback .classic .spi .ILoggingEvent ;
34+ import ch .qos .logback .core .Appender ;
2935import com .datastax .oss .driver .api .core .addresstranslation .AddressTranslator ;
3036import com .datastax .oss .driver .api .core .config .DefaultDriverOption ;
3137import com .datastax .oss .driver .api .core .config .DriverConfig ;
6268import java .util .UUID ;
6369import java .util .concurrent .CompletableFuture ;
6470import java .util .concurrent .CompletionStage ;
71+ import org .junit .After ;
6572import org .junit .Before ;
6673import org .junit .Test ;
6774import org .junit .runner .RunWith ;
75+ import org .mockito .ArgumentCaptor ;
76+ import org .mockito .Captor ;
6877import org .mockito .Mock ;
6978import org .mockito .MockitoAnnotations ;
79+ import org .slf4j .LoggerFactory ;
7080
7181@ RunWith (DataProviderRunner .class )
7282public class DefaultTopologyMonitorTest {
7383
74- private static final InetSocketAddress ADDRESS1 = new InetSocketAddress ("127.0.0.1" , 9042 );
7584 private static final InetSocketAddress ADDRESS2 = new InetSocketAddress ("127.0.0.2" , 9042 );
7685
7786 @ Mock private InternalDriverContext context ;
@@ -81,11 +90,17 @@ public class DefaultTopologyMonitorTest {
8190 @ Mock private DriverChannel channel ;
8291 @ Mock protected MetricsFactory metricsFactory ;
8392
93+ @ Mock private Appender <ILoggingEvent > appender ;
94+ @ Captor private ArgumentCaptor <ILoggingEvent > loggingEventCaptor ;
95+
8496 private DefaultNode node1 ;
8597 private DefaultNode node2 ;
8698
8799 private TestTopologyMonitor topologyMonitor ;
88100
101+ private Logger logger ;
102+ private Level initialLogLevel ;
103+
89104 @ Before
90105 public void setup () {
91106 MockitoAnnotations .initMocks (this );
@@ -107,6 +122,17 @@ public void setup() {
107122 when (context .getControlConnection ()).thenReturn (controlConnection );
108123
109124 topologyMonitor = new TestTopologyMonitor (context );
125+
126+ logger = (Logger ) LoggerFactory .getLogger (DefaultTopologyMonitor .class );
127+ initialLogLevel = logger .getLevel ();
128+ logger .setLevel (Level .INFO );
129+ logger .addAppender (appender );
130+ }
131+
132+ @ After
133+ public void teardown () {
134+ logger .detachAppender (appender );
135+ logger .setLevel (initialLogLevel );
110136 }
111137
112138 @ Test
@@ -239,23 +265,23 @@ public void should_refresh_node_from_peers_if_broadcast_address_is_not_present_V
239265 @ Test
240266 public void should_get_new_node_from_peers () {
241267 // Given
242- AdminRow peer3 = mockPeersRow (3 , UUID .randomUUID ());
243- AdminRow peer2 = mockPeersRow (2 , node2 .getHostId ());
244- AdminRow peer1 = mockPeersRow (1 , node1 .getHostId ());
268+ AdminRow peer3 = mockPeersRow (4 , UUID .randomUUID ());
269+ AdminRow peer2 = mockPeersRow (3 , node2 .getHostId ());
270+ AdminRow peer1 = mockPeersRow (2 , node1 .getHostId ());
245271 topologyMonitor .isSchemaV2 = false ;
246272 topologyMonitor .stubQueries (
247273 new StubbedQuery ("SELECT * FROM system.peers" , mockResult (peer3 , peer2 , peer1 )));
248274
249275 // When
250- CompletionStage <Optional <NodeInfo >> futureInfo = topologyMonitor .getNewNodeInfo (ADDRESS1 );
276+ CompletionStage <Optional <NodeInfo >> futureInfo = topologyMonitor .getNewNodeInfo (ADDRESS2 );
251277
252278 // Then
253279 assertThatStage (futureInfo )
254280 .isSuccess (
255281 maybeInfo -> {
256282 assertThat (maybeInfo .isPresent ()).isTrue ();
257283 NodeInfo info = maybeInfo .get ();
258- assertThat (info .getDatacenter ()).isEqualTo ("dc1 " );
284+ assertThat (info .getDatacenter ()).isEqualTo ("dc2 " );
259285 });
260286 // The rpc_address in each row should have been tried, only the last row should have been
261287 // converted
@@ -272,23 +298,23 @@ public void should_get_new_node_from_peers() {
272298 @ Test
273299 public void should_get_new_node_from_peers_v2 () {
274300 // Given
275- AdminRow peer3 = mockPeersV2Row (3 , UUID .randomUUID ());
276- AdminRow peer2 = mockPeersV2Row (2 , node2 .getHostId ());
277- AdminRow peer1 = mockPeersV2Row (1 , node1 .getHostId ());
301+ AdminRow peer3 = mockPeersV2Row (4 , UUID .randomUUID ());
302+ AdminRow peer2 = mockPeersV2Row (3 , node2 .getHostId ());
303+ AdminRow peer1 = mockPeersV2Row (2 , node1 .getHostId ());
278304 topologyMonitor .isSchemaV2 = true ;
279305 topologyMonitor .stubQueries (
280306 new StubbedQuery ("SELECT * FROM system.peers_v2" , mockResult (peer3 , peer2 , peer1 )));
281307
282308 // When
283- CompletionStage <Optional <NodeInfo >> futureInfo = topologyMonitor .getNewNodeInfo (ADDRESS1 );
309+ CompletionStage <Optional <NodeInfo >> futureInfo = topologyMonitor .getNewNodeInfo (ADDRESS2 );
284310
285311 // Then
286312 assertThatStage (futureInfo )
287313 .isSuccess (
288314 maybeInfo -> {
289315 assertThat (maybeInfo .isPresent ()).isTrue ();
290316 NodeInfo info = maybeInfo .get ();
291- assertThat (info .getDatacenter ()).isEqualTo ("dc1 " );
317+ assertThat (info .getDatacenter ()).isEqualTo ("dc2 " );
292318 });
293319 // The natove in each row should have been tried, only the last row should have been
294320 // converted
@@ -358,6 +384,10 @@ public void should_skip_invalid_peers_row(String columnToCheck) {
358384 // Then
359385 assertThatStage (futureInfo ).isSuccess (maybeInfo -> assertThat (maybeInfo ).isEmpty ());
360386 assertThat (node2 .broadcastAddress ).isNotNull ().isEqualTo (ADDRESS2 );
387+ assertLog (
388+ Level .WARN ,
389+ "[null] Found invalid row in system.peers for peer: /127.0.0.2. "
390+ + "This is likely a gossip or snitch issue, this node will be ignored." );
361391 }
362392
363393 @ Test
@@ -390,6 +420,10 @@ public void should_skip_invalid_peers_row_v2(String columnToCheck) {
390420 // Then
391421 assertThatStage (futureInfo ).isSuccess (maybeInfo -> assertThat (maybeInfo ).isEmpty ());
392422 assertThat (node2 .broadcastAddress ).isNotNull ().isEqualTo (ADDRESS2 );
423+ assertLog (
424+ Level .WARN ,
425+ "[null] Found invalid row in system.peers_v2 for peer: /127.0.0.2. "
426+ + "This is likely a gossip or snitch issue, this node will be ignored." );
393427 }
394428
395429 @ DataProvider
@@ -415,6 +449,67 @@ public void should_stop_executing_queries_once_closed() {
415449 .isFailed (error -> assertThat (error ).isInstanceOf (IllegalStateException .class ));
416450 }
417451
452+ @ Test
453+ public void should_warn_when_control_host_found_in_system_peers () {
454+ // Given
455+ AdminRow local = mockLocalRow (1 , node1 .getHostId ());
456+ AdminRow peer3 = mockPeersRow (3 , UUID .randomUUID ());
457+ AdminRow peer2 = mockPeersRow (2 , node2 .getHostId ());
458+ AdminRow peer1 = mockPeersRow (1 , node2 .getHostId ()); // invalid
459+ topologyMonitor .stubQueries (
460+ new StubbedQuery ("SELECT * FROM system.local" , mockResult (local )),
461+ new StubbedQuery ("SELECT * FROM system.peers_v2" , Collections .emptyMap (), null , true ),
462+ new StubbedQuery ("SELECT * FROM system.peers" , mockResult (peer3 , peer2 , peer1 )));
463+
464+ // When
465+ CompletionStage <Iterable <NodeInfo >> futureInfos = topologyMonitor .refreshNodeList ();
466+
467+ // Then
468+ assertThatStage (futureInfos )
469+ .isSuccess (
470+ infos ->
471+ assertThat (infos )
472+ .hasSize (3 )
473+ .extractingResultOf ("getEndPoint" )
474+ .containsOnlyOnce (node1 .getEndPoint ()));
475+ assertLog (
476+ Level .WARN ,
477+ "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: "
478+ + "this entry will be ignored. This is likely due to a misconfiguration; "
479+ + "please verify your rpc_address configuration in cassandra.yaml on "
480+ + "all nodes in your cluster." );
481+ }
482+
483+ @ Test
484+ public void should_warn_when_control_host_found_in_system_peers_v2 () {
485+ // Given
486+ AdminRow local = mockLocalRow (1 , node1 .getHostId ());
487+ AdminRow peer3 = mockPeersRow (3 , UUID .randomUUID ());
488+ AdminRow peer2 = mockPeersRow (2 , node2 .getHostId ());
489+ AdminRow peer1 = mockPeersRow (1 , node2 .getHostId ()); // invalid
490+ topologyMonitor .stubQueries (
491+ new StubbedQuery ("SELECT * FROM system.local" , mockResult (local )),
492+ new StubbedQuery ("SELECT * FROM system.peers_v2" , mockResult (peer3 , peer2 , peer1 )));
493+
494+ // When
495+ CompletionStage <Iterable <NodeInfo >> futureInfos = topologyMonitor .refreshNodeList ();
496+
497+ // Then
498+ assertThatStage (futureInfos )
499+ .isSuccess (
500+ infos ->
501+ assertThat (infos )
502+ .hasSize (3 )
503+ .extractingResultOf ("getEndPoint" )
504+ .containsOnlyOnce (node1 .getEndPoint ()));
505+ assertLog (
506+ Level .WARN ,
507+ "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: "
508+ + "this entry will be ignored. This is likely due to a misconfiguration; "
509+ + "please verify your rpc_address configuration in cassandra.yaml on "
510+ + "all nodes in your cluster." );
511+ }
512+
418513 /** Mocks the query execution logic. */
419514 private static class TestTopologyMonitor extends DefaultTopologyMonitor {
420515
@@ -539,4 +634,12 @@ private AdminResult mockResult(AdminRow... rows) {
539634 when (result .iterator ()).thenReturn (Iterators .forArray (rows ));
540635 return result ;
541636 }
637+
638+ private void assertLog (Level level , String message ) {
639+ verify (appender , atLeast (1 )).doAppend (loggingEventCaptor .capture ());
640+ Iterable <ILoggingEvent > logs =
641+ filter (loggingEventCaptor .getAllValues ()).with ("level" , level ).get ();
642+ assertThat (logs ).hasSize (1 );
643+ assertThat (logs .iterator ().next ().getFormattedMessage ()).contains (message );
644+ }
542645}
0 commit comments