1616package com .datastax .oss .driver .internal .core .util .concurrent ;
1717
1818import com .datastax .oss .driver .api .core .connection .ReconnectionPolicy ;
19- import com .google .common .base .Preconditions ;
2019import io .netty .util .concurrent .EventExecutor ;
2120import io .netty .util .concurrent .Future ;
2221import io .netty .util .concurrent .ScheduledFuture ;
3837public class Reconnection {
3938 private static final Logger LOG = LoggerFactory .getLogger (Reconnection .class );
4039
40+ private enum State {
41+ STOPPED ,
42+ SCHEDULED , // next attempt scheduled but not started yet
43+ ATTEMPT_IN_PROGRESS , // current attempt started and not completed yet
44+ STOP_AFTER_CURRENT , // stopped, but we're letting an in-progress attempt finish
45+ ;
46+ }
47+
4148 private final String logPrefix ;
4249 private final EventExecutor executor ;
4350 private final ReconnectionPolicy reconnectionPolicy ;
4451 private final Callable <CompletionStage <Boolean >> reconnectionTask ;
4552 private final Runnable onStart ;
4653 private final Runnable onStop ;
4754
48- private boolean isRunning ;
55+ private State state = State . STOPPED ;
4956 private ReconnectionPolicy .ReconnectionSchedule reconnectionSchedule ;
5057 private ScheduledFuture <CompletionStage <Boolean >> nextAttempt ;
5158
@@ -76,32 +83,57 @@ public Reconnection(
7683 this (logPrefix , executor , reconnectionPolicy , reconnectionTask , () -> {}, () -> {});
7784 }
7885
86+ /**
87+ * Note that if {@link #stop()} was called but we're still waiting for the last pending attempt to
88+ * complete, this still returns {@code true}.
89+ */
7990 public boolean isRunning () {
8091 assert executor .inEventLoop ();
81- return isRunning ;
92+ return state != State . STOPPED ;
8293 }
8394
84- /** @throws IllegalStateException if the reconnection is already running */
95+ /** This is a no-op if the reconnection is already running. */
8596 public void start () {
8697 assert executor .inEventLoop ();
87- Preconditions .checkState (!isRunning , "Already running" );
88- reconnectionSchedule = reconnectionPolicy .newSchedule ();
89- isRunning = true ;
90- onStart .run ();
91- scheduleNextAttempt ();
98+ switch (state ) {
99+ case SCHEDULED :
100+ case ATTEMPT_IN_PROGRESS :
101+ // nothing to do
102+ break ;
103+ case STOP_AFTER_CURRENT :
104+ // cancel the scheduled stop
105+ state = State .ATTEMPT_IN_PROGRESS ;
106+ break ;
107+ case STOPPED :
108+ reconnectionSchedule = reconnectionPolicy .newSchedule ();
109+ onStart .run ();
110+ scheduleNextAttempt ();
111+ break ;
112+ }
92113 }
93114
94115 /**
95116 * Forces a reconnection now, without waiting for the next scheduled attempt.
96117 *
97- * @param forceIfStopped if true and the reconnection is not running, it will get started. If
98- * false and the reconnection is not running, no attempt is scheduled.
118+ * @param forceIfStopped if true and the reconnection is not running, it will get started (meaning
119+ * subsequent reconnections will be scheduled if this attempt fails). If false and the
120+ * reconnection is not running, no attempt is scheduled.
99121 */
100122 public void reconnectNow (boolean forceIfStopped ) {
101123 assert executor .inEventLoop ();
102- if (isRunning || forceIfStopped ) {
124+ if (state == State .ATTEMPT_IN_PROGRESS || state == State .STOP_AFTER_CURRENT ) {
125+ LOG .debug (
126+ "[{}] reconnectNow and current attempt was still running, letting it complete" ,
127+ logPrefix );
128+ if (state == State .STOP_AFTER_CURRENT ) {
129+ // Make sure that we will schedule other attempts if this one fails.
130+ state = State .ATTEMPT_IN_PROGRESS ;
131+ }
132+ } else if (state == State .STOPPED && !forceIfStopped ) {
133+ LOG .debug ("[{}] reconnectNow(false) while stopped, nothing to do" , logPrefix );
134+ } else {
135+ assert state == State .SCHEDULED || (state == State .STOPPED && forceIfStopped );
103136 LOG .debug ("[{}] Forcing next attempt now" , logPrefix );
104- isRunning = true ;
105137 if (nextAttempt != null ) {
106138 nextAttempt .cancel (true );
107139 }
@@ -116,20 +148,33 @@ public void reconnectNow(boolean forceIfStopped) {
116148
117149 public void stop () {
118150 assert executor .inEventLoop ();
119- if (isRunning ) {
120- isRunning = false ;
121- LOG .debug ("[{}] Stopping reconnection" , logPrefix );
122- if (nextAttempt != null ) {
123- nextAttempt .cancel (true );
124- }
125- onStop .run ();
151+ switch (state ) {
152+ case STOPPED :
153+ case STOP_AFTER_CURRENT :
154+ break ;
155+ case ATTEMPT_IN_PROGRESS :
156+ state = State .STOP_AFTER_CURRENT ;
157+ break ;
158+ case SCHEDULED :
159+ reallyStop ();
160+ break ;
161+ }
162+ }
163+
164+ private void reallyStop () {
165+ LOG .debug ("[{}] Stopping reconnection" , logPrefix );
166+ state = State .STOPPED ;
167+ if (nextAttempt != null ) {
168+ nextAttempt .cancel (true );
126169 nextAttempt = null ;
127- reconnectionSchedule = null ;
128170 }
171+ onStop .run ();
172+ reconnectionSchedule = null ;
129173 }
130174
131175 private void scheduleNextAttempt () {
132176 assert executor .inEventLoop ();
177+ state = State .SCHEDULED ;
133178 if (reconnectionSchedule == null ) { // happens if reconnectNow() while we were stopped
134179 reconnectionSchedule = reconnectionPolicy .newSchedule ();
135180 }
@@ -152,6 +197,7 @@ private void scheduleNextAttempt() {
152197 // the CompletableFuture to find out if that succeeded or not.
153198 private void onNextAttemptStarted (CompletionStage <Boolean > futureOutcome ) {
154199 assert executor .inEventLoop ();
200+ state = State .ATTEMPT_IN_PROGRESS ;
155201 futureOutcome
156202 .whenCompleteAsync (this ::onNextAttemptCompleted , executor )
157203 .exceptionally (UncaughtExceptions ::log );
@@ -161,12 +207,15 @@ private void onNextAttemptCompleted(Boolean success, Throwable error) {
161207 assert executor .inEventLoop ();
162208 if (success ) {
163209 LOG .debug ("[{}] Reconnection successful" , logPrefix );
164- stop ();
210+ reallyStop ();
165211 } else {
166212 if (error != null && !(error instanceof CancellationException )) {
167213 LOG .warn ("[{}] Uncaught error while starting reconnection attempt" , logPrefix , error );
168214 }
169- if (isRunning ) { // can be false if stop() was called
215+ if (state == State .STOP_AFTER_CURRENT ) {
216+ reallyStop ();
217+ } else {
218+ assert state == State .ATTEMPT_IN_PROGRESS ;
170219 scheduleNextAttempt ();
171220 }
172221 }
0 commit comments