Skip to content

Commit ce99768

Browse files
committed
JAVA-1568: Handle Reconnection#reconnectNow/stop while the current attempt is still in progress
1 parent 15e8b27 commit ce99768

7 files changed

Lines changed: 284 additions & 49 deletions

File tree

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### 4.0.0-alpha2 (in progress)
66

7+
- [bug] JAVA-1568: Handle Reconnection#reconnectNow/stop while the current attempt is still in
8+
progress
79
- [improvement] JAVA-1585: Add GenericType#where
810
- [improvement] JAVA-1590: Properly skip deployment of integration-tests module
911
- [improvement] JAVA-1576: Expose AsyncResultSet's iterator through a currentPage() method

core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,7 @@ private void onChannelClosed(DriverChannel channel, Node node) {
333333
if (!closeWasCalled) {
334334
LOG.debug("[{}] Lost channel {}", logPrefix, channel);
335335
context.eventBus().fire(ChannelEvent.channelClosed(node));
336-
if (!reconnection.isRunning()) {
337-
reconnection.start();
338-
}
336+
reconnection.start();
339337
}
340338
}
341339

core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -340,9 +340,7 @@ private void onChannelCloseStarted(DriverChannel channel) {
340340
channels.remove(channel);
341341
closingChannels.add(channel);
342342
eventBus.fire(ChannelEvent.channelClosed(node));
343-
if (!reconnection.isRunning()) {
344-
reconnection.start();
345-
}
343+
reconnection.start();
346344
}
347345
}
348346

@@ -354,9 +352,7 @@ private void onChannelClosed(DriverChannel channel) {
354352
if (channels.remove(channel)) {
355353
LOG.debug("[{}] Lost channel {}", logPrefix, channel);
356354
eventBus.fire(ChannelEvent.channelClosed(node));
357-
if (!reconnection.isRunning()) {
358-
reconnection.start();
359-
}
355+
reconnection.start();
360356
} else {
361357
LOG.debug("[{}] Channel {} completed graceful shutdown", logPrefix, channel);
362358
closingChannels.remove(channel);
@@ -371,9 +367,7 @@ private void resize(NodeDistance newDistance) {
371367
if (newChannelCount > wantedCount) {
372368
LOG.debug("[{}] Growing ({} => {} channels)", logPrefix, wantedCount, newChannelCount);
373369
wantedCount = newChannelCount;
374-
if (!reconnection.isRunning()) {
375-
reconnection.start();
376-
}
370+
reconnection.start();
377371
} else if (newChannelCount < wantedCount) {
378372
LOG.debug("[{}] Shrinking ({} => {} channels)", logPrefix, wantedCount, newChannelCount);
379373
wantedCount = newChannelCount;
@@ -452,6 +446,8 @@ private CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspaceName) {
452446

453447
private void reconnectNow() {
454448
assert adminExecutor.inEventLoop();
449+
// Don't force because if the reconnection is stopped, it means either we have enough channels
450+
// or the pool is shutting down.
455451
reconnection.reconnectNow(false);
456452
}
457453

@@ -462,7 +458,10 @@ private void close() {
462458
}
463459
isClosing = true;
464460

461+
// If an attempt was in progress right now, it might open new channels but they will be
462+
// handled in onAllConnected
465463
reconnection.stop();
464+
466465
eventBus.unregister(configListenerKey, ConfigChangeEvent.class);
467466

468467
// Close all channels, the pool future completes when all the channels futures have completed

core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.datastax.oss.driver.internal.core.util.concurrent;
1717

1818
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
19-
import com.google.common.base.Preconditions;
2019
import io.netty.util.concurrent.EventExecutor;
2120
import io.netty.util.concurrent.Future;
2221
import io.netty.util.concurrent.ScheduledFuture;
@@ -38,14 +37,22 @@
3837
public class Reconnection {
3938
private static final Logger LOG = LoggerFactory.getLogger(Reconnection.class);
4039

40+
private enum State {
41+
STOPPED,
42+
SCHEDULED, // next attempt scheduled but not started yet
43+
ATTEMPT_IN_PROGRESS, // current attempt started and not completed yet
44+
STOP_AFTER_CURRENT, // stopped, but we're letting an in-progress attempt finish
45+
;
46+
}
47+
4148
private final String logPrefix;
4249
private final EventExecutor executor;
4350
private final ReconnectionPolicy reconnectionPolicy;
4451
private final Callable<CompletionStage<Boolean>> reconnectionTask;
4552
private final Runnable onStart;
4653
private final Runnable onStop;
4754

48-
private boolean isRunning;
55+
private State state = State.STOPPED;
4956
private ReconnectionPolicy.ReconnectionSchedule reconnectionSchedule;
5057
private ScheduledFuture<CompletionStage<Boolean>> nextAttempt;
5158

@@ -76,32 +83,57 @@ public Reconnection(
7683
this(logPrefix, executor, reconnectionPolicy, reconnectionTask, () -> {}, () -> {});
7784
}
7885

86+
/**
87+
* Note that if {@link #stop()} was called but we're still waiting for the last pending attempt to
88+
* complete, this still returns {@code true}.
89+
*/
7990
public boolean isRunning() {
8091
assert executor.inEventLoop();
81-
return isRunning;
92+
return state != State.STOPPED;
8293
}
8394

84-
/** @throws IllegalStateException if the reconnection is already running */
95+
/** This is a no-op if the reconnection is already running. */
8596
public void start() {
8697
assert executor.inEventLoop();
87-
Preconditions.checkState(!isRunning, "Already running");
88-
reconnectionSchedule = reconnectionPolicy.newSchedule();
89-
isRunning = true;
90-
onStart.run();
91-
scheduleNextAttempt();
98+
switch (state) {
99+
case SCHEDULED:
100+
case ATTEMPT_IN_PROGRESS:
101+
// nothing to do
102+
break;
103+
case STOP_AFTER_CURRENT:
104+
// cancel the scheduled stop
105+
state = State.ATTEMPT_IN_PROGRESS;
106+
break;
107+
case STOPPED:
108+
reconnectionSchedule = reconnectionPolicy.newSchedule();
109+
onStart.run();
110+
scheduleNextAttempt();
111+
break;
112+
}
92113
}
93114

94115
/**
95116
* Forces a reconnection now, without waiting for the next scheduled attempt.
96117
*
97-
* @param forceIfStopped if true and the reconnection is not running, it will get started. If
98-
* false and the reconnection is not running, no attempt is scheduled.
118+
* @param forceIfStopped if true and the reconnection is not running, it will get started (meaning
119+
* subsequent reconnections will be scheduled if this attempt fails). If false and the
120+
* reconnection is not running, no attempt is scheduled.
99121
*/
100122
public void reconnectNow(boolean forceIfStopped) {
101123
assert executor.inEventLoop();
102-
if (isRunning || forceIfStopped) {
124+
if (state == State.ATTEMPT_IN_PROGRESS || state == State.STOP_AFTER_CURRENT) {
125+
LOG.debug(
126+
"[{}] reconnectNow and current attempt was still running, letting it complete",
127+
logPrefix);
128+
if (state == State.STOP_AFTER_CURRENT) {
129+
// Make sure that we will schedule other attempts if this one fails.
130+
state = State.ATTEMPT_IN_PROGRESS;
131+
}
132+
} else if (state == State.STOPPED && !forceIfStopped) {
133+
LOG.debug("[{}] reconnectNow(false) while stopped, nothing to do", logPrefix);
134+
} else {
135+
assert state == State.SCHEDULED || (state == State.STOPPED && forceIfStopped);
103136
LOG.debug("[{}] Forcing next attempt now", logPrefix);
104-
isRunning = true;
105137
if (nextAttempt != null) {
106138
nextAttempt.cancel(true);
107139
}
@@ -116,20 +148,33 @@ public void reconnectNow(boolean forceIfStopped) {
116148

117149
public void stop() {
118150
assert executor.inEventLoop();
119-
if (isRunning) {
120-
isRunning = false;
121-
LOG.debug("[{}] Stopping reconnection", logPrefix);
122-
if (nextAttempt != null) {
123-
nextAttempt.cancel(true);
124-
}
125-
onStop.run();
151+
switch (state) {
152+
case STOPPED:
153+
case STOP_AFTER_CURRENT:
154+
break;
155+
case ATTEMPT_IN_PROGRESS:
156+
state = State.STOP_AFTER_CURRENT;
157+
break;
158+
case SCHEDULED:
159+
reallyStop();
160+
break;
161+
}
162+
}
163+
164+
private void reallyStop() {
165+
LOG.debug("[{}] Stopping reconnection", logPrefix);
166+
state = State.STOPPED;
167+
if (nextAttempt != null) {
168+
nextAttempt.cancel(true);
126169
nextAttempt = null;
127-
reconnectionSchedule = null;
128170
}
171+
onStop.run();
172+
reconnectionSchedule = null;
129173
}
130174

131175
private void scheduleNextAttempt() {
132176
assert executor.inEventLoop();
177+
state = State.SCHEDULED;
133178
if (reconnectionSchedule == null) { // happens if reconnectNow() while we were stopped
134179
reconnectionSchedule = reconnectionPolicy.newSchedule();
135180
}
@@ -152,6 +197,7 @@ private void scheduleNextAttempt() {
152197
// the CompletableFuture to find out if that succeeded or not.
153198
private void onNextAttemptStarted(CompletionStage<Boolean> futureOutcome) {
154199
assert executor.inEventLoop();
200+
state = State.ATTEMPT_IN_PROGRESS;
155201
futureOutcome
156202
.whenCompleteAsync(this::onNextAttemptCompleted, executor)
157203
.exceptionally(UncaughtExceptions::log);
@@ -161,12 +207,15 @@ private void onNextAttemptCompleted(Boolean success, Throwable error) {
161207
assert executor.inEventLoop();
162208
if (success) {
163209
LOG.debug("[{}] Reconnection successful", logPrefix);
164-
stop();
210+
reallyStop();
165211
} else {
166212
if (error != null && !(error instanceof CancellationException)) {
167213
LOG.warn("[{}] Uncaught error while starting reconnection attempt", logPrefix, error);
168214
}
169-
if (isRunning) { // can be false if stop() was called
215+
if (state == State.STOP_AFTER_CURRENT) {
216+
reallyStop();
217+
} else {
218+
assert state == State.ATTEMPT_IN_PROGRESS;
170219
scheduleNextAttempt();
171220
}
172221
}

core/src/test/java/com/datastax/oss/driver/TestDataProviders.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.oss.driver;
1717

18+
import com.tngtech.java.junit.dataprovider.DataProvider;
1819
import java.util.Arrays;
1920

2021
public class TestDataProviders {
@@ -77,4 +78,9 @@ public static Object[][] combine(Object[][]... providers) {
7778
}
7879
return result;
7980
}
81+
82+
@DataProvider
83+
public static Object[][] booleans() {
84+
return fromList(true, false);
85+
}
8086
}

core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolReconnectTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
import java.time.Duration;
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
27+
import java.util.concurrent.ExecutionException;
2728
import org.junit.Test;
2829
import org.mockito.InOrder;
2930
import org.mockito.Mockito;
3031

3132
import static com.datastax.oss.driver.Assertions.assertThat;
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.Mockito.never;
3235
import static org.mockito.Mockito.times;
3336

3437
public class ChannelPoolReconnectTest extends ChannelPoolTestBase {
@@ -133,4 +136,59 @@ public void should_reconnect_when_channel_starts_graceful_shutdown() throws Exce
133136

134137
factoryHelper.verifyNoMoreCalls();
135138
}
139+
140+
@Test
141+
public void should_let_current_attempt_complete_when_reconnecting_now()
142+
throws ExecutionException, InterruptedException {
143+
Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1));
144+
145+
Mockito.when(defaultProfile.getInt(CoreDriverOption.CONNECTION_POOL_LOCAL_SIZE)).thenReturn(1);
146+
147+
DriverChannel channel1 = newMockDriverChannel(1);
148+
DriverChannel channel2 = newMockDriverChannel(2);
149+
CompletableFuture<DriverChannel> channel2Future = new CompletableFuture<>();
150+
MockChannelFactoryHelper factoryHelper =
151+
MockChannelFactoryHelper.builder(channelFactory)
152+
// init
153+
.success(ADDRESS, channel1)
154+
// reconnection
155+
.pending(ADDRESS, channel2Future)
156+
.build();
157+
158+
InOrder inOrder = Mockito.inOrder(eventBus);
159+
160+
// Initial connection
161+
CompletionStage<ChannelPool> poolFuture =
162+
ChannelPool.init(NODE, null, NodeDistance.LOCAL, context, "test");
163+
factoryHelper.waitForCalls(ADDRESS, 1);
164+
waitForPendingAdminTasks();
165+
assertThat(poolFuture).isSuccess();
166+
ChannelPool pool = poolFuture.toCompletableFuture().get();
167+
inOrder.verify(eventBus, times(1)).fire(ChannelEvent.channelOpened(NODE));
168+
169+
// Kill channel1, reconnection begins and starts initializing channel2, but the initialization
170+
// is still pending (channel2Future not completed)
171+
((ChannelPromise) channel1.closeStartedFuture()).setSuccess();
172+
waitForPendingAdminTasks();
173+
inOrder.verify(eventBus).fire(ChannelEvent.channelClosed(NODE));
174+
inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(NODE));
175+
Mockito.verify(reconnectionSchedule).nextDelay();
176+
factoryHelper.waitForCalls(ADDRESS, 1);
177+
178+
// Force a reconnection, should not try to create a new channel since we have a pending one
179+
pool.reconnectNow();
180+
waitForPendingAdminTasks();
181+
factoryHelper.verifyNoMoreCalls();
182+
inOrder.verify(eventBus, never()).fire(any());
183+
184+
// Complete the initialization of channel2, reconnection succeeds
185+
channel2Future.complete(channel2);
186+
waitForPendingAdminTasks();
187+
inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(NODE));
188+
Mockito.verify(eventBus).fire(ChannelEvent.reconnectionStopped(NODE));
189+
190+
assertThat(pool.channels).containsOnly(channel2);
191+
192+
factoryHelper.verifyNoMoreCalls();
193+
}
136194
}

0 commit comments

Comments
 (0)