Skip to content

Commit 31f9c20

Browse files
author
Alexandre Dutra
committed
JAVA-1346: Reset heartbeat only on client reads (not writes).
Contributions by Andrew Tolbert (@tolbertam): Refactor HeartbeatTest to use Scassandra instead of a C* cluster.
1 parent e089952 commit 31f9c20

3 files changed

Lines changed: 153 additions & 15 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- [bug] JAVA-1330: Add un/register for SchemaChangeListener in DelegatingCluster
66
- [bug] JAVA-1351: Include Custom Payload in Request.copy.
7+
- [bug] JAVA-1346: Reset heartbeat only on client reads (not writes).
78

89

910
### 3.0.5

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import java.util.concurrent.atomic.AtomicReference;
5151

5252
import static com.datastax.driver.core.Message.Response.Type.ERROR;
53-
import static io.netty.handler.timeout.IdleState.ALL_IDLE;
53+
import static io.netty.handler.timeout.IdleState.READER_IDLE;
5454

5555
// For LoggingHandler
5656
//import org.jboss.netty.handler.logging.LoggingHandler;
@@ -1077,7 +1077,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Message.Response response
10771077

10781078
@Override
10791079
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1080-
if (!isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == ALL_IDLE) {
1080+
if (!isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
10811081
logger.debug("{} was inactive for {} seconds, sending heartbeat", Connection.this, factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds());
10821082
write(HEARTBEAT_CALLBACK);
10831083
}
@@ -1406,7 +1406,7 @@ private static class Initializer extends ChannelInitializer<SocketChannel> {
14061406
this.sslOptions = sslOptions;
14071407
this.nettyOptions = nettyOptions;
14081408
this.codecRegistry = codecRegistry;
1409-
this.idleStateHandler = new IdleStateHandler(0, 0, heartBeatIntervalSeconds);
1409+
this.idleStateHandler = new IdleStateHandler(heartBeatIntervalSeconds, 0, 0);
14101410
}
14111411

14121412
@Override

driver-core/src/test/java/com/datastax/driver/core/HeartbeatTest.java

Lines changed: 149 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,26 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18+
import com.google.common.util.concurrent.Uninterruptibles;
1819
import org.apache.log4j.Level;
1920
import org.apache.log4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022
import org.testng.annotations.AfterMethod;
2123
import org.testng.annotations.BeforeMethod;
2224
import org.testng.annotations.Test;
2325

26+
import java.util.concurrent.TimeUnit;
27+
import java.util.regex.Pattern;
28+
2429
import static com.datastax.driver.core.Assertions.assertThat;
2530
import static java.util.concurrent.TimeUnit.SECONDS;
31+
import static org.assertj.core.api.Assertions.fail;
32+
import static org.scassandra.http.client.PrimingRequest.queryBuilder;
33+
import static org.scassandra.http.client.PrimingRequest.then;
2634

27-
@CCMConfig(createCluster = false)
28-
public class HeartbeatTest extends CCMTestsSupport {
35+
public class HeartbeatTest extends ScassandraTestBase {
2936

37+
static org.slf4j.Logger logger = LoggerFactory.getLogger(HeartbeatTest.class);
3038
Logger connectionLogger = Logger.getLogger(Connection.class);
3139
MemoryAppender logs;
3240
Level originalLevel;
@@ -56,12 +64,12 @@ public void stopCapturingLogs() {
5664
*/
5765
@Test(groups = "long")
5866
public void should_send_heartbeat_when_connection_is_inactive() throws InterruptedException {
59-
Cluster cluster = register(Cluster.builder()
60-
.addContactPoints(getContactPoints().get(0))
61-
.withPort(ccm().getBinaryPort())
62-
.withPoolingOptions(new PoolingOptions()
63-
.setHeartbeatIntervalSeconds(3))
64-
.build());
67+
Cluster cluster = Cluster.builder()
68+
.addContactPoints(hostAddress.getAddress())
69+
.withPort(scassandra.getBinaryPort())
70+
.withProtocolVersion(ProtocolVersion.V2)
71+
.withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(3))
72+
.build();
6573

6674
try {
6775
// Don't create any session, only the control connection will be established
@@ -103,6 +111,134 @@ public void should_send_heartbeat_when_connection_is_inactive() throws Interrupt
103111
}
104112
}
105113

114+
/**
115+
* Verifies that there exists a line in logs that matches pattern.
116+
*
117+
* @param logs Captured log entries.
118+
* @param pattern Pattern to match on individual lines.
119+
* @return
120+
*/
121+
private void assertLineMatches(String logs, Pattern pattern) {
122+
String lines[] = logs.split("\\r?\\n");
123+
for (String line : lines) {
124+
if (pattern.matcher(line).matches()) {
125+
return;
126+
}
127+
}
128+
fail("Expecting: [" + logs + "] to contain " + pattern);
129+
}
130+
131+
/**
132+
* Verifies that no line in logs matches pattern.
133+
*
134+
* @param logs Captured log entries.
135+
* @param pattern Pattern to match on individual lines.
136+
*/
137+
private void assertNoLineMatches(String logs, Pattern pattern) {
138+
String lines[] = logs.split("\\r?\\n");
139+
for (String line : lines) {
140+
if (pattern.matcher(line).matches()) {
141+
fail("Expecting: [" + logs + "] not to contain " + pattern);
142+
}
143+
}
144+
}
145+
146+
/**
147+
* Ensures that a heartbeat message is sent after the configured heartbeat interval of idle time when no data is
148+
* received on a connection even though are successful writes on the socket.
149+
*
150+
* @test_category connection:heartbeat
151+
* @expected_result heartbeat is sent after heartbeat interval (3) seconds of idle time.
152+
* @jira_ticket JAVA-1346
153+
* @since 3.0.6, 3.1.3
154+
*/
155+
@Test(groups = "long")
156+
public void should_send_heartbeat_when_requests_being_written_but_nothing_received() throws Exception {
157+
Cluster cluster = Cluster.builder()
158+
.addContactPoints(hostAddress.getAddress())
159+
.withPort(scassandra.getBinaryPort())
160+
.withProtocolVersion(ProtocolVersion.V2)
161+
.withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(3).setConnectionsPerHost(HostDistance.LOCAL, 1, 1))
162+
.build();
163+
164+
// Prime 'ping' to never return a response this is a way to create outgoing traffic
165+
// without receiving anything inbound.
166+
scassandra.primingClient()
167+
.prime(queryBuilder().withQuery("ping").withThen(then().withFixedDelay(8675309999L)));
168+
169+
// Thread that will submit queries that get no response repeatedly.
170+
Thread submitter = null;
171+
try {
172+
// Don't create any session, only the control connection will be established
173+
cluster.init();
174+
175+
// Find the connection in the connection pool.
176+
SessionManager session = (SessionManager) cluster.connect();
177+
Host host = TestUtils.findHost(cluster, 1);
178+
Connection connection = session.pools.get(host).connections.get(0);
179+
180+
// Extract connection name from toString implementation.
181+
String connectionName = connection.toString()
182+
.replaceAll("\\-", "\\\\-") // Replace - with \- so its properly escaped as a regex.
183+
.replaceAll("Connection\\[\\/", "") // Replace first part of toString (Connection[
184+
.replaceAll("\\, inFlight.*", ""); // Replace everything after ',inFlight'
185+
186+
// Define patterns that check for whether or not heartbeats are sent / received on a given connection.
187+
Pattern heartbeatSentPattern = Pattern.compile(".*" + connectionName + ".*sending heartbeat");
188+
Pattern heartbeatReceivedPattern = Pattern.compile(".*" + connectionName + ".*heartbeat query succeeded");
189+
logger.debug("Heartbeat pattern is {}", heartbeatSentPattern);
190+
191+
// Start query submission thread.
192+
submitter = new Thread(new QuerySubmitter(session));
193+
submitter.start();
194+
195+
for (int i = 0; i < 5; i++) {
196+
session.execute("bar");
197+
SECONDS.sleep(1);
198+
}
199+
200+
// Should be no heartbeats sent on pooled connection since we had successful requests.
201+
String log = logs.getNext();
202+
assertNoLineMatches(log, heartbeatSentPattern);
203+
204+
int inFlight = connection.inFlight.get();
205+
assertThat(inFlight).isGreaterThan(0);
206+
207+
// Ensure heartbeat is sent after no received data, even though we have inflight requests (JAVA-1346).
208+
SECONDS.sleep(4);
209+
// Verify more requests were sent over this time period.
210+
assertThat(connection.inFlight.get()).isGreaterThan(inFlight);
211+
log = logs.getNext();
212+
// Heartbeat should have been sent and received.
213+
assertLineMatches(log, heartbeatSentPattern);
214+
assertLineMatches(log, heartbeatReceivedPattern);
215+
} finally {
216+
// interrupt thread so it stops submitting queries.
217+
if (submitter != null) {
218+
submitter.interrupt();
219+
}
220+
cluster.close();
221+
}
222+
}
223+
224+
private static class QuerySubmitter implements Runnable {
225+
226+
private final Session session;
227+
228+
QuerySubmitter(Session session) {
229+
this.session = session;
230+
}
231+
232+
@Override
233+
public void run() {
234+
while (!Thread.currentThread().isInterrupted()) {
235+
logger.debug("Sending ping, for which we expect no response");
236+
session.executeAsync("ping");
237+
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
238+
}
239+
}
240+
}
241+
106242
/**
107243
* Ensures that a heartbeat message is not sent if the configured heartbeat interval is 0.
108244
* <p/>
@@ -116,12 +252,13 @@ public void should_send_heartbeat_when_connection_is_inactive() throws Interrupt
116252
*/
117253
@Test(groups = "long")
118254
public void should_not_send_heartbeat_when_disabled() throws InterruptedException {
119-
Cluster cluster = register(Cluster.builder()
120-
.addContactPoints(getContactPoints().get(0))
121-
.withPort(ccm().getBinaryPort())
255+
Cluster cluster = Cluster.builder()
256+
.addContactPoints(hostAddress.getAddress())
257+
.withPort(scassandra.getBinaryPort())
122258
.withPoolingOptions(new PoolingOptions()
123259
.setHeartbeatIntervalSeconds(0))
124-
.build());
260+
.withProtocolVersion(ProtocolVersion.V2)
261+
.build();
125262

126263
try {
127264
// Don't create any session, only the control connection will be established

0 commit comments

Comments
 (0)