1515 */
1616package com .datastax .driver .core ;
1717
18+ import com .google .common .util .concurrent .Uninterruptibles ;
1819import org .apache .log4j .Level ;
1920import org .apache .log4j .Logger ;
21+ import org .slf4j .LoggerFactory ;
2022import org .testng .annotations .AfterMethod ;
2123import org .testng .annotations .BeforeMethod ;
2224import org .testng .annotations .Test ;
2325
26+ import java .util .concurrent .TimeUnit ;
27+ import java .util .regex .Pattern ;
28+
2429import static com .datastax .driver .core .Assertions .assertThat ;
2530import static java .util .concurrent .TimeUnit .SECONDS ;
31+ import static org .assertj .core .api .Assertions .fail ;
32+ import static org .scassandra .http .client .PrimingRequest .queryBuilder ;
33+ import static org .scassandra .http .client .PrimingRequest .then ;
2634
27- @ CCMConfig (createCluster = false )
28- public class HeartbeatTest extends CCMTestsSupport {
35+ public class HeartbeatTest extends ScassandraTestBase {
2936
37+ static org .slf4j .Logger logger = LoggerFactory .getLogger (HeartbeatTest .class );
3038 Logger connectionLogger = Logger .getLogger (Connection .class );
3139 MemoryAppender logs ;
3240 Level originalLevel ;
@@ -56,12 +64,12 @@ public void stopCapturingLogs() {
5664 */
5765 @ Test (groups = "long" )
5866 public void should_send_heartbeat_when_connection_is_inactive () throws InterruptedException {
59- Cluster cluster = register ( Cluster .builder ()
60- .addContactPoints (getContactPoints (). get ( 0 ))
61- .withPort (ccm () .getBinaryPort ())
62- .withPoolingOptions ( new PoolingOptions ( )
63- .setHeartbeatIntervalSeconds (3 ))
64- .build ()) ;
67+ Cluster cluster = Cluster .builder ()
68+ .addContactPoints (hostAddress . getAddress ( ))
69+ .withPort (scassandra .getBinaryPort ())
70+ .withProtocolVersion ( ProtocolVersion . V2 )
71+ . withPoolingOptions ( new PoolingOptions () .setHeartbeatIntervalSeconds (3 ))
72+ .build ();
6573
6674 try {
6775 // Don't create any session, only the control connection will be established
@@ -103,6 +111,134 @@ public void should_send_heartbeat_when_connection_is_inactive() throws Interrupt
103111 }
104112 }
105113
114+ /**
115+ * Verifies that there exists a line in logs that matches pattern.
116+ *
117+ * @param logs Captured log entries.
118+ * @param pattern Pattern to match on individual lines.
119+ * @return
120+ */
121+ private void assertLineMatches (String logs , Pattern pattern ) {
122+ String lines [] = logs .split ("\\ r?\\ n" );
123+ for (String line : lines ) {
124+ if (pattern .matcher (line ).matches ()) {
125+ return ;
126+ }
127+ }
128+ fail ("Expecting: [" + logs + "] to contain " + pattern );
129+ }
130+
131+ /**
132+ * Verifies that no line in logs matches pattern.
133+ *
134+ * @param logs Captured log entries.
135+ * @param pattern Pattern to match on individual lines.
136+ */
137+ private void assertNoLineMatches (String logs , Pattern pattern ) {
138+ String lines [] = logs .split ("\\ r?\\ n" );
139+ for (String line : lines ) {
140+ if (pattern .matcher (line ).matches ()) {
141+ fail ("Expecting: [" + logs + "] not to contain " + pattern );
142+ }
143+ }
144+ }
145+
146+ /**
147+ * Ensures that a heartbeat message is sent after the configured heartbeat interval of idle time when no data is
148+ * received on a connection even though are successful writes on the socket.
149+ *
150+ * @test_category connection:heartbeat
151+ * @expected_result heartbeat is sent after heartbeat interval (3) seconds of idle time.
152+ * @jira_ticket JAVA-1346
153+ * @since 3.0.6, 3.1.3
154+ */
155+ @ Test (groups = "long" )
156+ public void should_send_heartbeat_when_requests_being_written_but_nothing_received () throws Exception {
157+ Cluster cluster = Cluster .builder ()
158+ .addContactPoints (hostAddress .getAddress ())
159+ .withPort (scassandra .getBinaryPort ())
160+ .withProtocolVersion (ProtocolVersion .V2 )
161+ .withPoolingOptions (new PoolingOptions ().setHeartbeatIntervalSeconds (3 ).setConnectionsPerHost (HostDistance .LOCAL , 1 , 1 ))
162+ .build ();
163+
164+ // Prime 'ping' to never return a response this is a way to create outgoing traffic
165+ // without receiving anything inbound.
166+ scassandra .primingClient ()
167+ .prime (queryBuilder ().withQuery ("ping" ).withThen (then ().withFixedDelay (8675309999L )));
168+
169+ // Thread that will submit queries that get no response repeatedly.
170+ Thread submitter = null ;
171+ try {
172+ // Don't create any session, only the control connection will be established
173+ cluster .init ();
174+
175+ // Find the connection in the connection pool.
176+ SessionManager session = (SessionManager ) cluster .connect ();
177+ Host host = TestUtils .findHost (cluster , 1 );
178+ Connection connection = session .pools .get (host ).connections .get (0 );
179+
180+ // Extract connection name from toString implementation.
181+ String connectionName = connection .toString ()
182+ .replaceAll ("\\ -" , "\\ \\ -" ) // Replace - with \- so its properly escaped as a regex.
183+ .replaceAll ("Connection\\ [\\ /" , "" ) // Replace first part of toString (Connection[
184+ .replaceAll ("\\ , inFlight.*" , "" ); // Replace everything after ',inFlight'
185+
186+ // Define patterns that check for whether or not heartbeats are sent / received on a given connection.
187+ Pattern heartbeatSentPattern = Pattern .compile (".*" + connectionName + ".*sending heartbeat" );
188+ Pattern heartbeatReceivedPattern = Pattern .compile (".*" + connectionName + ".*heartbeat query succeeded" );
189+ logger .debug ("Heartbeat pattern is {}" , heartbeatSentPattern );
190+
191+ // Start query submission thread.
192+ submitter = new Thread (new QuerySubmitter (session ));
193+ submitter .start ();
194+
195+ for (int i = 0 ; i < 5 ; i ++) {
196+ session .execute ("bar" );
197+ SECONDS .sleep (1 );
198+ }
199+
200+ // Should be no heartbeats sent on pooled connection since we had successful requests.
201+ String log = logs .getNext ();
202+ assertNoLineMatches (log , heartbeatSentPattern );
203+
204+ int inFlight = connection .inFlight .get ();
205+ assertThat (inFlight ).isGreaterThan (0 );
206+
207+ // Ensure heartbeat is sent after no received data, even though we have inflight requests (JAVA-1346).
208+ SECONDS .sleep (4 );
209+ // Verify more requests were sent over this time period.
210+ assertThat (connection .inFlight .get ()).isGreaterThan (inFlight );
211+ log = logs .getNext ();
212+ // Heartbeat should have been sent and received.
213+ assertLineMatches (log , heartbeatSentPattern );
214+ assertLineMatches (log , heartbeatReceivedPattern );
215+ } finally {
216+ // interrupt thread so it stops submitting queries.
217+ if (submitter != null ) {
218+ submitter .interrupt ();
219+ }
220+ cluster .close ();
221+ }
222+ }
223+
224+ private static class QuerySubmitter implements Runnable {
225+
226+ private final Session session ;
227+
228+ QuerySubmitter (Session session ) {
229+ this .session = session ;
230+ }
231+
232+ @ Override
233+ public void run () {
234+ while (!Thread .currentThread ().isInterrupted ()) {
235+ logger .debug ("Sending ping, for which we expect no response" );
236+ session .executeAsync ("ping" );
237+ Uninterruptibles .sleepUninterruptibly (1 , TimeUnit .SECONDS );
238+ }
239+ }
240+ }
241+
106242 /**
107243 * Ensures that a heartbeat message is not sent if the configured heartbeat interval is 0.
108244 * <p/>
@@ -116,12 +252,13 @@ public void should_send_heartbeat_when_connection_is_inactive() throws Interrupt
116252 */
117253 @ Test (groups = "long" )
118254 public void should_not_send_heartbeat_when_disabled () throws InterruptedException {
119- Cluster cluster = register ( Cluster .builder ()
120- .addContactPoints (getContactPoints (). get ( 0 ))
121- .withPort (ccm () .getBinaryPort ())
255+ Cluster cluster = Cluster .builder ()
256+ .addContactPoints (hostAddress . getAddress ( ))
257+ .withPort (scassandra .getBinaryPort ())
122258 .withPoolingOptions (new PoolingOptions ()
123259 .setHeartbeatIntervalSeconds (0 ))
124- .build ());
260+ .withProtocolVersion (ProtocolVersion .V2 )
261+ .build ();
125262
126263 try {
127264 // Don't create any session, only the control connection will be established
0 commit comments