2525import java .util .concurrent .TimeUnit ;
2626
2727import static org .testng .Assert .assertEquals ;
28+ import static java .util .concurrent .TimeUnit .MINUTES ;
29+
30+ import com .google .common .collect .Lists ;
31+ import org .testng .annotations .Test ;
32+
33+ import static org .assertj .core .api .Assertions .fail ;
34+
35+ import com .datastax .driver .core .exceptions .InvalidQueryException ;
36+ import com .datastax .driver .core .utils .SocketChannelMonitor ;
37+
38+ import static com .datastax .driver .core .Assertions .assertThat ;
2839
2940
3041public class SessionLeakTest {
3142
43+ Cluster cluster ;
44+ List <InetSocketAddress > nodes = Lists .newArrayList (
45+ new InetSocketAddress (CCMBridge .IP_PREFIX + '1' , 9042 ),
46+ new InetSocketAddress (CCMBridge .IP_PREFIX + '2' , 9042 ));
47+ SocketChannelMonitor channelMonitor ;
3248
3349 @ Test (groups = "short" )
3450 public void connectionLeakTest () throws Exception {
3551 // Checking for JAVA-342
3652 CCMBridge ccmBridge ;
3753 ccmBridge = CCMBridge .create ("test" , 1 );
38- Cluster cluster = null ;
39- SocketChannelMonitor channelMonitor = new SocketChannelMonitor ();
54+ channelMonitor = new SocketChannelMonitor ();
4055 channelMonitor .reportAtFixedInterval (1 , TimeUnit .SECONDS );
41-
42- List <InetSocketAddress > nodes = Lists .newArrayList (
43- new InetSocketAddress (CCMBridge .IP_PREFIX + '1' , 9042 ),
44- new InetSocketAddress (CCMBridge .IP_PREFIX + '2' , 9042 ));
4556 try {
46- // create a new cluster object and ensure 0 sessions and connections
4757 cluster = Cluster .builder ()
4858 .addContactPointsWithPorts (Collections .singletonList (
4959 new InetSocketAddress (CCMBridge .IP_PREFIX + '1' , 9042 )))
5060 .withNettyOptions (channelMonitor .nettyOptions ()).build ();
51- cluster .init ();
5261
53- int corePoolSize = cluster .getConfiguration ()
54- .getPoolingOptions ()
55- .getCoreConnectionsPerHost (HostDistance .LOCAL );
62+ cluster .init ();
5663
57- assertEquals (cluster .manager .sessions .size (), 0 );
64+ assertThat (cluster .manager .sessions .size ()). isEqualTo ( 0 );
5865 // Should be 1 control connection after initialization.
59- assertEquals ((int ) cluster .getMetrics ().getOpenConnections ().getValue (), 1 );
60- assertEquals (channelMonitor .openChannels (nodes ).size (), 1 );
66+ assertOpenConnections (1 );
6167
6268 // ensure sessions.size() returns with 1 control connection + core pool size.
69+ int corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
6370 Session session = cluster .connect ();
64- assertEquals ( cluster . manager . sessions . size (), 1 );
65- assertEquals (( int ) cluster .getMetrics (). getOpenConnections (). getValue (), 1 + corePoolSize );
66- assertEquals ( channelMonitor . openChannels ( nodes ). size (), 1 + corePoolSize );
71+
72+ assertThat ( cluster .manager . sessions . size ()). isEqualTo ( 1 );
73+ assertOpenConnections ( 1 + corePoolSize );
6774
6875 // ensure sessions.size() returns to 0 with only 1 active connection (the control connection)
6976 session .close ();
70- assertEquals (cluster .manager .sessions .size (), 0 );
71- assertEquals ((int ) cluster .getMetrics ().getOpenConnections ().getValue (), 1 );
72- assertEquals (channelMonitor .openChannels (nodes ).size (), 1 );
77+ assertThat (cluster .manager .sessions .size ()).isEqualTo (0 );
78+ assertOpenConnections (1 );
7379
7480 // ensure bootstrapping a node does not create additional connections
7581 ccmBridge .bootstrapNode (2 );
76- assertEquals (cluster .manager .sessions .size (), 0 );
77- assertEquals ((int ) cluster .getMetrics ().getOpenConnections ().getValue (), 1 );
78- assertEquals (channelMonitor .openChannels (nodes ).size (), 1 );
82+ assertThat (cluster ).host (2 ).comesUpWithin (2 , MINUTES );
83+
84+ assertThat (cluster .manager .sessions .size ()).isEqualTo (0 );
85+ assertOpenConnections (1 );
7986
8087 // ensure a new session gets registered and core connections are established
8188 // there should be corePoolSize more connections to accommodate for the new host.
8289 Session thisSession = cluster .connect ();
83- assertEquals (cluster .manager .sessions .size (), 1 );
84-
85- assertEquals ((int ) cluster .getMetrics ().getOpenConnections ().getValue (), 1 + (corePoolSize * 2 ));
86- assertEquals (channelMonitor .openChannels (nodes ).size (), 1 + (corePoolSize * 2 ));
90+ assertThat (cluster .manager .sessions .size ()).isEqualTo (1 );
91+ assertOpenConnections (1 + (corePoolSize * 2 ));
8792
8893 // ensure bootstrapping a node does not create additional connections that won't get cleaned up
8994 thisSession .close ();
9095
91- assertEquals (cluster .manager .sessions .size (), 0 );
92- assertEquals ((int ) cluster .getMetrics ().getOpenConnections ().getValue (), 1 );
93- assertEquals (channelMonitor .openChannels (nodes ).size (), 1 );
96+ assertThat (cluster .manager .sessions .size ()).isEqualTo (0 );
97+ assertOpenConnections (1 );
98+ } finally {
99+ if (cluster != null ){
100+ cluster .close ();
101+ }
102+ if (ccmBridge != null ) {
103+ ccmBridge .remove ();
104+ }
105+ // Ensure no channels remain open.
106+ channelMonitor .stop ();
107+ channelMonitor .report ();
108+ assertThat (channelMonitor .openChannels (nodes ).size ()).isEqualTo (0 );
109+ }
110+ }
111+
112+ @ Test (groups = "short" )
113+ public void should_not_leak_session_when_wrong_keyspace () throws Exception {
114+ // Checking for JAVA-806
115+ CCMBridge ccmBridge ;
116+ ccmBridge = CCMBridge .create ("test" , 1 );
117+ channelMonitor = new SocketChannelMonitor ();
118+ channelMonitor .reportAtFixedInterval (1 , TimeUnit .SECONDS );
119+ try {
120+ cluster = Cluster .builder ()
121+ .addContactPointsWithPorts (Collections .singletonList (
122+ new InetSocketAddress (CCMBridge .IP_PREFIX + '1' , 9042 )))
123+ .withNettyOptions (channelMonitor .nettyOptions ()).build ();
124+
125+ cluster .init ();
126+
127+ assertThat (cluster .manager .sessions .size ()).isEqualTo (0 );
128+ // Should be 1 control connection after initialization.
129+ assertOpenConnections (1 );
130+
131+ // ensure sessions.size() returns with 1 control connection + core pool size.
132+ int corePoolSize = TestUtils .numberOfLocalCoreConnections (cluster );
133+ cluster .connect ("wrong_keyspace" );
134+
135+ fail ("Should not have connected to a wrong keyspace" );
136+
137+ } catch (InvalidQueryException e ) {
138+
139+ // ok
140+
94141 } finally {
142+
143+ assertThat (cluster .manager .sessions .size ()).isEqualTo (0 );
144+
95145 if (cluster != null ){
96146 cluster .close ();
97147 }
@@ -101,7 +151,12 @@ public void connectionLeakTest() throws Exception {
101151 // Ensure no channels remain open.
102152 channelMonitor .stop ();
103153 channelMonitor .report ();
104- assertEquals (channelMonitor .openChannels (nodes ).size (), 0 );
154+ assertThat (channelMonitor .openChannels (nodes ).size ()). isEqualTo ( 0 );
105155 }
106156 }
157+
158+ private void assertOpenConnections (int expected ) {
159+ assertThat ((cluster .getMetrics ().getOpenConnections ().getValue ())).isEqualTo (expected );
160+ assertThat (channelMonitor .openChannels (nodes ).size ()).isEqualTo (expected );
161+ }
107162}
0 commit comments