Skip to content

Commit eb037bb

Browse files
committed
JAVA-1593: Reconnect control connection if current node is removed, forced down or ignored
1 parent 1024fd9 commit eb037bb

5 files changed

Lines changed: 291 additions & 21 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-alpha2 (in progress)
66

7+
- [bug] JAVA-1593: Reconnect control connection if current node is removed, forced down or ignored
78
- [bug] JAVA-1595: Don't use system.local.rpc_address when refreshing node list
89
- [bug] JAVA-1568: Handle Reconnection#reconnectNow/stop while the current attempt is still in
910
progress

core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
import com.datastax.oss.driver.api.core.AllNodesFailedException;
1919
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
20+
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
2021
import com.datastax.oss.driver.api.core.metadata.Node;
22+
import com.datastax.oss.driver.api.core.metadata.NodeState;
2123
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
2224
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
2325
import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions;
2426
import com.datastax.oss.driver.internal.core.channel.EventCallback;
2527
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
28+
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
29+
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
2630
import com.datastax.oss.driver.internal.core.metadata.SchemaElementKind;
2731
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
2832
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
@@ -41,6 +45,7 @@
4145
import java.util.LinkedHashMap;
4246
import java.util.Map;
4347
import java.util.Queue;
48+
import java.util.WeakHashMap;
4449
import java.util.concurrent.CompletableFuture;
4550
import java.util.concurrent.CompletionStage;
4651
import java.util.function.Consumer;
@@ -66,7 +71,7 @@ public class ControlConnection implements EventCallback, AsyncAutoCloseable {
6671
private final EventExecutor adminExecutor;
6772
private final SingleThreaded singleThreaded;
6873

69-
// The single channel used by this connection. This field is accessed currently, but only
74+
// The single channel used by this connection. This field is accessed concurrently, but only
7075
// mutated on adminExecutor (by SingleThreaded methods)
7176
private volatile DriverChannel channel;
7277

@@ -195,11 +200,21 @@ private class SingleThreaded {
195200
private boolean closeWasCalled;
196201
private final Reconnection reconnection;
197202
private DriverChannelOptions channelOptions;
203+
// The last events received for each node
204+
private final Map<Node, DistanceEvent> lastDistanceEvents = new WeakHashMap<>();
205+
private final Map<Node, NodeStateEvent> lastStateEvents = new WeakHashMap<>();
198206

199207
private SingleThreaded(InternalDriverContext context) {
200208
this.context = context;
201209
this.reconnection =
202210
new Reconnection(logPrefix, adminExecutor, context.reconnectionPolicy(), this::reconnect);
211+
212+
context
213+
.eventBus()
214+
.register(DistanceEvent.class, RunOrSchedule.on(adminExecutor, this::onDistanceEvent));
215+
context
216+
.eventBus()
217+
.register(NodeStateEvent.class, RunOrSchedule.on(adminExecutor, this::onStateEvent));
203218
}
204219

205220
private void init(boolean listenToClusterEvents) {
@@ -253,6 +268,8 @@ private void connect(
253268
.whenCompleteAsync(
254269
(channel, error) -> {
255270
try {
271+
DistanceEvent lastDistanceEvent = lastDistanceEvents.get(node);
272+
NodeStateEvent lastStateEvent = lastStateEvents.get(node);
256273
if (error != null) {
257274
if (closeWasCalled) {
258275
onSuccess.run(); // abort, we don't really care about the result
@@ -274,6 +291,25 @@ private void connect(
274291
channel);
275292
channel.forceClose();
276293
onSuccess.run();
294+
} else if (lastDistanceEvent != null
295+
&& lastDistanceEvent.distance == NodeDistance.IGNORED) {
296+
LOG.debug(
297+
"[{}] New channel opened ({}) but node became ignored, "
298+
+ "closing and trying next node",
299+
logPrefix,
300+
channel);
301+
channel.forceClose();
302+
connect(nodes, errors, onSuccess, onFailure);
303+
} else if (lastStateEvent != null
304+
&& (lastStateEvent.newState == null /*(removed)*/
305+
|| lastStateEvent.newState == NodeState.FORCED_DOWN)) {
306+
LOG.debug(
307+
"[{}] New channel opened ({}) but node was removed or forced down, "
308+
+ "closing and trying next node",
309+
logPrefix,
310+
channel);
311+
channel.forceClose();
312+
connect(nodes, errors, onSuccess, onFailure);
277313
} else {
278314
LOG.debug("[{}] Connection established to {}", logPrefix, node);
279315
// Make sure previous channel gets closed (it may still be open if reconnection was forced)
@@ -344,6 +380,36 @@ private void reconnectNow() {
344380
}
345381
}
346382

383+
private void onDistanceEvent(DistanceEvent event) {
384+
assert adminExecutor.inEventLoop();
385+
this.lastDistanceEvents.put(event.node, event);
386+
if (event.distance == NodeDistance.IGNORED
387+
&& channel != null
388+
&& !channel.closeFuture().isDone()
389+
&& event.node.getConnectAddress().equals(channel.address())) {
390+
LOG.debug(
391+
"[{}] Control node {} became IGNORED, reconnecting to a different node",
392+
logPrefix,
393+
event.node);
394+
reconnectNow();
395+
}
396+
}
397+
398+
private void onStateEvent(NodeStateEvent event) {
399+
assert adminExecutor.inEventLoop();
400+
this.lastStateEvents.put(event.node, event);
401+
if ((event.newState == null /*(removed)*/ || event.newState == NodeState.FORCED_DOWN)
402+
&& channel != null
403+
&& !channel.closeFuture().isDone()
404+
&& event.node.getConnectAddress().equals(channel.address())) {
405+
LOG.debug(
406+
"[{}] Control node {} was removed or forced down, reconnecting to a different node",
407+
logPrefix,
408+
event.node);
409+
reconnectNow();
410+
}
411+
}
412+
347413
private void forceClose() {
348414
assert adminExecutor.inEventLoop();
349415
if (closeWasCalled) {

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.Objects;
47+
import java.util.WeakHashMap;
4748
import java.util.concurrent.CompletableFuture;
4849
import java.util.concurrent.CompletionStage;
4950
import java.util.concurrent.ConcurrentHashMap;
@@ -210,8 +211,8 @@ private class SingleThreaded {
210211
// The pools that we have opened but have not finished initializing yet
211212
private final Map<Node, CompletionStage<ChannelPool>> pending = new HashMap<>();
212213
// If we receive events while a pool is initializing, the last one is stored here
213-
private final Map<Node, DistanceEvent> pendingDistanceEvents = new HashMap<>();
214-
private final Map<Node, NodeStateEvent> pendingStateEvents = new HashMap<>();
214+
private final Map<Node, DistanceEvent> pendingDistanceEvents = new WeakHashMap<>();
215+
private final Map<Node, NodeStateEvent> pendingStateEvents = new WeakHashMap<>();
215216

216217
private SingleThreaded(InternalDriverContext context) {
217218
this.context = context;

0 commit comments

Comments
 (0)