1717
1818import java .net .InetAddress ;
1919import java .net .InetSocketAddress ;
20+ import java .net .UnknownHostException ;
2021import java .util .Collections ;
2122import java .util .List ;
23+ import java .util .Map ;
2224import java .util .concurrent .TimeUnit ;
2325import java .util .concurrent .TimeoutException ;
2426
27+ import com .google .common .collect .ImmutableMap ;
28+ import com .google .common .collect .ImmutableSet ;
2529import com .google .common .collect .Lists ;
30+ import org .scassandra .Scassandra ;
31+ import org .scassandra .http .client .PrimingClient ;
32+ import org .scassandra .http .client .PrimingRequest ;
2633import org .slf4j .Logger ;
2734import org .slf4j .LoggerFactory ;
2835import org .testng .annotations .Test ;
2936
30- import static org .assertj .core .api .Assertions .fail ;
37+ import static org .mockito .Mockito .atLeast ;
38+ import static org .mockito .Mockito .atMost ;
3139import static org .mockito .Mockito .spy ;
32- import static org .mockito .Mockito .times ;
3340import static org .mockito .Mockito .verify ;
3441
3542import com .datastax .driver .core .exceptions .NoHostAvailableException ;
3643import com .datastax .driver .core .policies .ConstantReconnectionPolicy ;
37- import com .datastax .driver .core .querybuilder .Insert ;
38- import com .datastax .driver .core .utils .UUIDs ;
3944
45+ import static com .datastax .driver .core .Assertions .*;
4046import static com .datastax .driver .core .FakeHost .Behavior .THROWING_CONNECT_TIMEOUTS ;
41- import static com .datastax .driver .core .querybuilder .QueryBuilder .insertInto ;
47+ import static com .datastax .driver .core .Host .State .DOWN ;
48+ import static com .datastax .driver .core .Host .State .UP ;
49+ import static com .datastax .driver .core .HostDistance .LOCAL ;
4250
4351public class ClusterInitTest {
4452 private static final Logger logger = LoggerFactory .getLogger (ClusterInitTest .class );
@@ -47,44 +55,58 @@ public class ClusterInitTest {
4755 * Test for JAVA-522: when the cluster and session initialize, if some contact points are behaving badly and
4856 * causing timeouts, we want to ensure that the driver does not wait multiple times on the same host.
4957 */
50-
5158 @ Test (groups = "short" )
52- public void should_wait_for_each_contact_point_at_most_once () {
53- CCMBridge ccm = null ;
59+ public void should_handle_failing_or_missing_contact_points () throws UnknownHostException {
5460 Cluster cluster = null ;
55- List <FakeHost > fakeHosts = Lists .newArrayList ();
61+ Scassandra scassandra = null ;
62+ List <FakeHost > failingHosts = Lists .newArrayList ();
5663 try {
57- // Obtaining connect timeouts is not trivial: we create a 6-host cluster but only start one of them,
58- // then simulate the other 5.
59- ccm = CCMBridge .builder ("test" ).withNodes (6 ).notStarted ().build ();
60- ccm .start (1 );
61-
62- for (int i = 0 ; i < 5 ; i ++) {
63- FakeHost fakeHost = new FakeHost (CCMBridge .ipOfNode (i + 2 ), 9042 , THROWING_CONNECT_TIMEOUTS );
64- fakeHosts .add (fakeHost );
65- fakeHost .start ();
64+ // Simulate a cluster of 5 hosts.
65+
66+ // - 1 is an actual Scassandra instance that will accept connections:
67+ scassandra = TestUtils .createScassandraServer ();
68+ scassandra .start ();
69+ String realHostAddress = "localhost" ;
70+ int port = scassandra .getBinaryPort ();
71+
72+ // - the remaining 4 are fake servers that will throw connect timeouts:
73+ for (int i = 2 ; i <= 5 ; i ++) {
74+ FakeHost failingHost = new FakeHost (CCMBridge .ipOfNode (i ), port , THROWING_CONNECT_TIMEOUTS );
75+ failingHosts .add (failingHost );
76+ failingHost .start ();
6677 }
6778
68- // Our real instance has no rows in its system.peers table. That would cause the driver to ignore our fake
69- // hosts when the control connection refreshes the host list.
70- // So we also insert fake rows in system.peers:
71- fakePeerRowsInNode1 ();
79+ // - we also have a "missing" contact point, i.e. there's no server listening at this address,
80+ // and the address is not listed in the live host's system.peers
81+ String missingHostAddress = CCMBridge .ipOfNode (6 );
82+
83+ primePeerRows (scassandra , failingHosts );
7284
7385 logger .info ("Environment is set up, starting test" );
7486 long start = System .nanoTime ();
7587
76- // We want to count how many connections were attempted. For that, we rely on the fact that SocketOptions.getKeepAlive is called in Connection.Factory.newBoostrap()
77- // each time we prepare to open a new connection. This is a bit of a hack, but this is what we have .
88+ // We want to count how many connections were attempted. For that, we rely on the fact that SocketOptions.getKeepAlive
89+ // is called in Connection.Factory.newBoostrap() each time we prepare to open a new connection.
7890 SocketOptions socketOptions = spy (new SocketOptions ());
7991
80- // Set an enormous delay so that reconnection attempts don't pollute our observations
92+ // Set an "infinite" reconnection delay so that reconnection attempts don't pollute our observations
8193 ConstantReconnectionPolicy reconnectionPolicy = new ConstantReconnectionPolicy (3600 * 1000 );
8294
83- cluster = Cluster .builder ().addContactPoints (
84- CCMBridge .ipOfNode (1 ), CCMBridge .ipOfNode (2 ), CCMBridge .ipOfNode (3 ),
85- CCMBridge .ipOfNode (4 ), CCMBridge .ipOfNode (5 ), CCMBridge .ipOfNode (6 ))
95+ // Force 1 connection per pool. Otherwise we can't distinguish a failed pool creation from multiple connection
96+ // attempts, because pools create their connections in parallel (so 1 pool failure equals multiple connection failures).
97+ PoolingOptions poolingOptions = new PoolingOptions ().setConnectionsPerHost (LOCAL , 1 , 1 );
98+
99+ cluster = Cluster .builder ()
100+ .withPort (scassandra .getBinaryPort ())
101+ .addContactPoints (
102+ realHostAddress ,
103+ failingHosts .get (0 ).address , failingHosts .get (1 ).address ,
104+ failingHosts .get (2 ).address , failingHosts .get (3 ).address ,
105+ missingHostAddress
106+ )
86107 .withSocketOptions (socketOptions )
87108 .withReconnectionPolicy (reconnectionPolicy )
109+ .withPoolingOptions (poolingOptions )
88110 .withProtocolVersion (TestUtils .getDesiredProtocolVersion ())
89111 .build ();
90112 cluster .connect ();
@@ -93,72 +115,80 @@ public void should_wait_for_each_contact_point_at_most_once() {
93115 long initTimeMs = TimeUnit .MILLISECONDS .convert (System .nanoTime () - start , TimeUnit .NANOSECONDS );
94116 logger .info ("Cluster and session initialized in {} ms" , initTimeMs );
95117
96- // We have one live host so we expect 1 control connection + core connection count successful connections.
97- // The other 5 hosts are unreachable, we should attempt to connect to each of them only once.
98- int coreConnections = cluster .getConfiguration ()
99- .getPoolingOptions ()
100- .getCoreConnectionsPerHost (HostDistance .LOCAL );
101- verify (socketOptions , times (1 + coreConnections + 5 )).getKeepAlive ();
118+ // Expect :
119+ // - 2 connections for the live host (1 control connection + 1 pooled connection)
120+ // - 1 attempt per failing host (either a control connection attempt or a failed pool creation)
121+ // - 0 or 1 for the missing host. We can't know for sure because contact points are randomized. If it's tried
122+ // before the live host there will be a connection attempt, otherwise it will be removed directly because
123+ // it's not in the live host's system.peers.
124+ verify (socketOptions , atLeast (6 )).getKeepAlive ();
125+ verify (socketOptions , atMost (7 )).getKeepAlive ();
126+
127+ assertThat (cluster ).host (realHostAddress ).hasState (UP );
128+ for (FakeHost failingHost : failingHosts )
129+ assertThat (cluster ).host (failingHost .address ).hasState (DOWN );
130+ assertThat (cluster ).host (missingHostAddress ).isNull ();
131+
102132 } finally {
103133 if (cluster != null )
104134 cluster .close ();
105- for (FakeHost fakeHost : fakeHosts )
135+ for (FakeHost fakeHost : failingHosts )
106136 fakeHost .stop ();
107- if (ccm != null )
108- ccm . remove ();
137+ if (scassandra != null )
138+ scassandra . stop ();
109139 }
110140 }
111141
112142 /**
113- * <p>
114143 * Validates that a Cluster that was never able to successfully establish connection a session can be closed
115144 * properly.
116145 *
117146 * @test_category connection
118147 * @expected_result Cluster closes within 1 second.
119148 */
120- @ Test (groups = "unit" )
149+ @ Test (groups = "unit" )
121150 public void should_be_able_to_close_cluster_that_never_successfully_connected () throws Exception {
122151 Cluster cluster = Cluster .builder ()
123- .addContactPointsWithPorts (Collections .singleton (new InetSocketAddress ("127.0.0.1" , 65534 )))
124- .build ();
152+ .addContactPointsWithPorts (Collections .singleton (new InetSocketAddress ("127.0.0.1" , 65534 )))
153+ .build ();
125154 try {
126155 cluster .connect ();
127156 fail ("Should not have been able to connect." );
128- } catch (NoHostAvailableException e ) {} // Expected.
157+ } catch (NoHostAvailableException e ) {
158+ } // Expected.
129159 CloseFuture closeFuture = cluster .closeAsync ();
130160 try {
131161 closeFuture .get (1 , TimeUnit .SECONDS );
132- } catch (TimeoutException e ) {
162+ } catch (TimeoutException e ) {
133163 fail ("Close Future did not complete quickly." );
134164 }
135165 }
136166
137- private void fakePeerRowsInNode1 () {
138- Cluster cluster = null ;
139- try {
140- cluster = Cluster .builder ().addContactPoint (CCMBridge .ipOfNode (1 )).build ();
141- Session session = cluster .connect ("system" );
142-
143- String releaseVersion = session .execute ("SELECT release_version FROM local" )
144- .one ().getString ("release_version" );
145-
146- for (int i = 2 ; i <= 6 ; i ++) {
147- Insert insertStmt = insertInto ("peers" )
148- .value ("peer" , InetAddress .getByName (CCMBridge .ipOfNode (i )))
149- .value ("data_center" , "datacenter1" )
150- .value ("host_id" , UUIDs .random ())
151- .value ("rack" , "rack1" )
152- .value ("release_version" , releaseVersion )
153- .value ("rpc_address" , InetAddress .getByName (CCMBridge .ipOfNode (i )))
154- .value ("schema_version" , UUIDs .random ());
155- session .execute (insertStmt );
156- }
157- } catch (Exception e ) {
158- fail ("Error while inserting fake peer rows" , e );
159- } finally {
160- if (cluster != null )
161- cluster .close ();
167+ private void primePeerRows (Scassandra scassandra , List <FakeHost > otherHosts ) throws UnknownHostException {
168+ PrimingClient primingClient = PrimingClient .builder ()
169+ .withHost ("localhost" ).withPort (scassandra .getAdminPort ())
170+ .build ();
171+
172+ List <Map <String , ?>> rows = Lists .newArrayListWithCapacity (5 );
173+
174+ int i = 0 ;
175+ for (FakeHost otherHost : otherHosts ) {
176+ InetAddress address = InetAddress .getByName (otherHost .address );
177+ rows .add (ImmutableMap .<String , Object >builder ()
178+ .put ("peer" , address )
179+ .put ("rpc_address" , address )
180+ .put ("data_center" , "datacenter1" )
181+ .put ("rack" , "rack1" )
182+ .put ("release_version" , "2.0.1" )
183+ .put ("tokens" , ImmutableSet .of (Long .toString (Long .MIN_VALUE + i ++)))
184+ .build ());
162185 }
186+
187+ primingClient .prime (
188+ PrimingRequest .queryBuilder ()
189+ .withQuery ("SELECT * FROM system.peers" )
190+ .withColumnTypes (SCassandraCluster .SELECT_PEERS_COLUMN_TYPES )
191+ .withRows (rows )
192+ .build ());
163193 }
164194}
0 commit comments