Skip to content

Commit 8762111

Browse files
committed
Add integration state for node state changes
1 parent 43605b5 commit 8762111

6 files changed

Lines changed: 515 additions & 19 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/context/EventBus.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public <T> boolean unregister(Object key, Class<T> eventClass) {
8282
* processing asynchronously if needed.
8383
*/
8484
public void fire(Object event) {
85+
LOG.trace("[{}] Firing an instance of {}: {}", logPrefix, event.getClass(), event);
8586
// if the exact match thing gets too cumbersome, we can reconsider, but I'd like to avoid
8687
// scanning all the keys with instanceof checks.
8788
Class<?> eventClass = event.getClass();

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/NodeStateManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
1919
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
2020
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
21+
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
2122
import com.datastax.oss.driver.api.core.metadata.NodeState;
2223
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
2324
import com.datastax.oss.driver.internal.core.context.EventBus;
@@ -148,8 +149,8 @@ private void onDebouncedTopologyEvent(TopologyEvent event) {
148149
metadataManager.addNode(event.address);
149150
} else if (node.state == NodeState.FORCED_DOWN) {
150151
LOG.debug("[{}] Not setting {} UP because it is FORCED_DOWN", logPrefix, node);
151-
} else {
152-
setState(node, NodeState.UP, "an UP topology event was received");
152+
} else if (node.distance == NodeDistance.IGNORED) {
153+
setState(node, NodeState.UP, "it is IGNORED and an UP topology event was received");
153154
}
154155
break;
155156
case SUGGEST_DOWN:
@@ -165,8 +166,8 @@ private void onDebouncedTopologyEvent(TopologyEvent event) {
165166
node);
166167
} else if (node.state == NodeState.FORCED_DOWN) {
167168
LOG.debug("[{}] Not setting {} DOWN because it is FORCED_DOWN", logPrefix, node);
168-
} else {
169-
setState(node, NodeState.DOWN, "a DOWN topology event was received");
169+
} else if (node.distance == NodeDistance.IGNORED) {
170+
setState(node, NodeState.DOWN, "it is IGNORED and a DOWN topology event was received");
170171
}
171172
break;
172173
case FORCE_UP:

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
3030
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
3131
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
32+
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
3233
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
3334
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
3435
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
@@ -208,6 +209,7 @@ private class SingleThreaded {
208209
private final Object stateListenerKey;
209210
private final ReplayingEventFilter<NodeStateEvent> stateEventFilter =
210211
new ReplayingEventFilter<>(this::processStateEvent);
212+
private final Object topologyListenerKey;
211213
// The pools that we have opened but have not finished initializing yet
212214
private final Map<Node, CompletionStage<ChannelPool>> pending = new HashMap<>();
213215
// If we receive events while a pool is initializing, the last one is stored here
@@ -226,6 +228,11 @@ private SingleThreaded(InternalDriverContext context) {
226228
context
227229
.eventBus()
228230
.register(NodeStateEvent.class, RunOrSchedule.on(adminExecutor, this::onStateEvent));
231+
this.topologyListenerKey =
232+
context
233+
.eventBus()
234+
.register(
235+
TopologyEvent.class, RunOrSchedule.on(adminExecutor, this::onTopologyEvent));
229236
}
230237

231238
private void init() {
@@ -338,39 +345,57 @@ private void processDistanceEvent(DistanceEvent event) {
338345

339346
private void processStateEvent(NodeStateEvent event) {
340347
assert adminExecutor.inEventLoop();
341-
// no need to check closeWasCalled, because we stop listening for events one closed
348+
// no need to check closeWasCalled, because we stop listening for events once closed
342349
DefaultNode node = event.node;
350+
NodeState oldState = event.oldState;
343351
NodeState newState = event.newState;
344352
if (pending.containsKey(node)) {
345353
pendingStateEvents.put(node, event);
346354
} else if (newState == NodeState.FORCED_DOWN) {
347355
ChannelPool pool = pools.remove(node);
348356
if (pool != null) {
349-
LOG.debug("[{}] {} became FORCED_DOWN, destroying pool", logPrefix, node);
357+
LOG.debug("[{}] {} was FORCED_DOWN, destroying pool", logPrefix, node);
350358
pool.closeAsync()
351359
.exceptionally(
352360
error -> {
353361
LOG.warn("[{}] Error closing pool", logPrefix, error);
354362
return null;
355363
});
356364
}
357-
} else if (newState == NodeState.UP && node.getDistance() != NodeDistance.IGNORED) {
358-
ChannelPool pool = pools.get(node);
359-
if (pool == null) {
360-
LOG.debug("[{}] {} came back UP and no pool found, initializing it", logPrefix, node);
361-
CompletionStage<ChannelPool> poolFuture =
362-
channelPoolFactory.init(node, keyspace, node.getDistance(), context, logPrefix);
363-
pending.put(node, poolFuture);
364-
poolFuture
365-
.thenAcceptAsync(this::onPoolInitialized, adminExecutor)
366-
.exceptionally(UncaughtExceptions::log);
367-
} else {
368-
LOG.debug("[{}] {} came back UP, triggering pool reconnection", logPrefix, node);
369-
pool.reconnectNow();
365+
} else if (oldState == NodeState.FORCED_DOWN
366+
&& newState == NodeState.UP
367+
&& node.getDistance() != NodeDistance.IGNORED) {
368+
LOG.debug("[{}] {} was forced back UP, initializing pool", logPrefix, node);
369+
createOrReconnectPool(node);
370+
}
371+
}
372+
373+
private void onTopologyEvent(TopologyEvent event) {
374+
assert adminExecutor.inEventLoop();
375+
if (event.type == TopologyEvent.Type.SUGGEST_UP) {
376+
Node node = context.metadataManager().getMetadata().getNodes().get(event.address);
377+
if (node.getDistance() != NodeDistance.IGNORED) {
378+
LOG.debug(
379+
"[{}] Received a SUGGEST_UP event for {}, reconnecting pool now", logPrefix, node);
380+
createOrReconnectPool(node);
370381
}
371382
}
372383
}
373384

385+
private void createOrReconnectPool(Node node) {
386+
ChannelPool pool = pools.get(node);
387+
if (pool == null) {
388+
CompletionStage<ChannelPool> poolFuture =
389+
channelPoolFactory.init(node, keyspace, node.getDistance(), context, logPrefix);
390+
pending.put(node, poolFuture);
391+
poolFuture
392+
.thenAcceptAsync(this::onPoolInitialized, adminExecutor)
393+
.exceptionally(UncaughtExceptions::log);
394+
} else {
395+
pool.reconnectNow();
396+
}
397+
}
398+
374399
private void onPoolInitialized(ChannelPool pool) {
375400
assert adminExecutor.inEventLoop();
376401
Node node = pool.getNode();
@@ -458,6 +483,7 @@ private void close() {
458483
// Stop listening for events
459484
context.eventBus().unregister(distanceListenerKey, DistanceEvent.class);
460485
context.eventBus().unregister(stateListenerKey, NodeStateEvent.class);
486+
context.eventBus().unregister(topologyListenerKey, TopologyEvent.class);
461487

462488
List<CompletionStage<Void>> closePoolStages = new ArrayList<>(pools.size());
463489
for (ChannelPool pool : pools.values()) {

0 commit comments

Comments
 (0)