Skip to content

Commit 22fc84d

Browse files
committed
Throw when diagnostic unreliable
1 parent 26acd2c commit 22fc84d

9 files changed

Lines changed: 346 additions & 33 deletions

File tree

core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,11 @@ default TopologyDiagnostic generateTopologyDiagnostic() {
174174
* <p>However Gossip events are not 100% reliable, and it happens frequently that the driver
175175
* cannot know if a certain node is up or down. Such nodes show up in a {@link TopologyDiagnostic}
176176
* in "unknown" state; in a {@link TokenRingDiagnostic} however, <em>these nodes are
177-
* optimistically considered as up</em>. The more nodes in "unknown" state, the lesser the
178-
* accuracy of token ring health statuses returned by this method. This is generally more
179-
* impactful for global consistency levels than for datacenter-local ones.
177+
* pessimistically considered as down</em>. The more nodes in "unknown" state, the lesser the
178+
* accuracy of token ring health statuses returned by this method; if a diagnostic becomes
179+
* unreliable because there are too many nodes in "unknown" state, this method will throw {@link
180+
* IllegalStateException}. This is generally more likely to happen for global consistency levels
181+
* than for datacenter-local ones.
180182
*
181183
* <p>Therefore, <em>diagnostic reports generated by this method should be considered best-effort
182184
* only</em>, and are not meant to replace a proper operational surveillance tool.
@@ -187,6 +189,8 @@ default TopologyDiagnostic generateTopologyDiagnostic() {
187189
* may be null otherwise.
188190
* @see DefaultDriverOption#METADATA_TOKEN_MAP_ENABLED
189191
* @see #getKeyspace(String)
192+
* @throws IllegalArgumentException if the pre-requisites for generation are not met.
193+
* @throws IllegalStateException if the diagnostic is too unreliable to be generated.
190194
*/
191195
default TokenRingDiagnostic generateTokenRingDiagnostic(
192196
@NonNull CqlIdentifier keyspaceName,

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/diagnostic/ring/AbstractTokenRingDiagnosticGenerator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Collections;
2929
import java.util.Objects;
3030
import java.util.Set;
31-
import java.util.stream.Collectors;
3231

3332
public abstract class AbstractTokenRingDiagnosticGenerator implements TokenRingDiagnosticGenerator {
3433

@@ -62,21 +61,24 @@ protected Set<TokenRangeDiagnostic> generateTokenRangeDiagnostics(TokenMap token
6261
Set<TokenRangeDiagnostic> reports = Sets.newHashSetWithExpectedSize(tokenRanges.size());
6362
for (TokenRange range : tokenRanges) {
6463
Set<Node> allReplicas = tokenMap.getReplicas(keyspace.getName(), range);
65-
Set<Node> aliveReplicas =
66-
allReplicas.stream().filter(this::isAlive).collect(Collectors.toSet());
67-
TokenRangeDiagnostic report = generateTokenRangeDiagnostic(range, aliveReplicas);
64+
TokenRangeDiagnostic report = generateTokenRangeDiagnostic(range, allReplicas);
6865
reports.add(report);
6966
}
7067
return reports;
7168
}
7269

73-
protected boolean isAlive(Node node) {
70+
protected boolean isPessimisticallyUp(Node node) {
71+
// Be optimistic and count nodes in unknown state as alive
72+
return node.getState() == NodeState.UP;
73+
}
74+
75+
protected boolean isOptimisticallyUp(Node node) {
7476
// Be optimistic and count nodes in unknown state as alive
7577
return node.getState() == NodeState.UP || node.getState() == NodeState.UNKNOWN;
7678
}
7779

7880
protected abstract TokenRangeDiagnostic generateTokenRangeDiagnostic(
79-
TokenRange range, Set<Node> aliveReplicas);
81+
TokenRange range, Set<Node> allReplicas);
8082

8183
protected abstract TokenRingDiagnostic generateRingDiagnostic(
8284
Set<TokenRangeDiagnostic> tokenRangeDiagnostics);

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/diagnostic/ring/DefaultTokenRingDiagnosticGenerator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,34 @@ public DefaultTokenRingDiagnosticGenerator(
6060

6161
@Override
6262
protected TokenRangeDiagnostic generateTokenRangeDiagnostic(
63-
TokenRange range, Set<Node> aliveReplicas) {
64-
return new SimpleTokenRangeDiagnostic(range, requiredReplicas, aliveReplicas.size());
63+
TokenRange range, Set<Node> allReplicas) {
64+
int pessimisticallyAliveReplicas = getPessimisticallyAliveReplicas(allReplicas);
65+
TokenRangeDiagnostic pessimistic =
66+
new SimpleTokenRangeDiagnostic(range, requiredReplicas, pessimisticallyAliveReplicas);
67+
if (!pessimistic.isAvailable()) {
68+
int optimisticallyAliveReplicas = getOptimisticallyAliveReplicas(allReplicas);
69+
if (optimisticallyAliveReplicas > pessimisticallyAliveReplicas) {
70+
TokenRangeDiagnostic optimistic =
71+
new SimpleTokenRangeDiagnostic(range, requiredReplicas, optimisticallyAliveReplicas);
72+
if (optimistic.isAvailable()) {
73+
throw new UnreliableTokenRangeDiagnosticException(range);
74+
}
75+
}
76+
}
77+
return pessimistic;
6578
}
6679

6780
@Override
6881
protected TokenRingDiagnostic generateRingDiagnostic(
6982
Set<TokenRangeDiagnostic> tokenRangeDiagnostics) {
7083
return new DefaultTokenRingDiagnostic(keyspace, consistencyLevel, null, tokenRangeDiagnostics);
7184
}
85+
86+
private int getOptimisticallyAliveReplicas(Set<Node> allReplicas) {
87+
return (int) allReplicas.stream().filter(this::isOptimisticallyUp).count();
88+
}
89+
90+
private int getPessimisticallyAliveReplicas(Set<Node> allReplicas) {
91+
return (int) allReplicas.stream().filter(this::isPessimisticallyUp).count();
92+
}
7293
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/diagnostic/ring/EachQuorumTokenRingDiagnosticGenerator.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,49 @@ public EachQuorumTokenRingDiagnosticGenerator(
6060

6161
@Override
6262
protected TokenRangeDiagnostic generateTokenRangeDiagnostic(
63-
TokenRange range, Set<Node> aliveReplicas) {
63+
TokenRange range, Set<Node> allReplicas) {
6464
CompositeTokenRangeDiagnostic.Builder diagnostic =
6565
new CompositeTokenRangeDiagnostic.Builder(range);
66-
Map<String, Integer> aliveReplicasByDc =
67-
aliveReplicas.stream()
68-
.collect(Collectors.toMap(Node::getDatacenter, replica -> 1, Integer::sum));
66+
Map<String, Integer> pessimisticallyAliveReplicasByDc =
67+
getPessimisticallyAliveReplicasByDc(allReplicas);
68+
Map<String, Integer> optimisticallyAliveReplicasByDc =
69+
getOptimisticallyAliveReplicasByDc(allReplicas);
6970
for (String datacenter : this.requiredReplicasByDc.keySet()) {
7071
int requiredReplicasInDc = this.requiredReplicasByDc.get(datacenter);
71-
int aliveReplicasInDc = aliveReplicasByDc.getOrDefault(datacenter, 0);
72-
TokenRangeDiagnostic childDiagnostic =
73-
new SimpleTokenRangeDiagnostic(range, requiredReplicasInDc, aliveReplicasInDc);
74-
diagnostic.addChildDiagnostic(datacenter, childDiagnostic);
72+
int pessimisticallyAliveReplicasInDc =
73+
pessimisticallyAliveReplicasByDc.getOrDefault(datacenter, 0);
74+
TokenRangeDiagnostic pessimistic =
75+
new SimpleTokenRangeDiagnostic(
76+
range, requiredReplicasInDc, pessimisticallyAliveReplicasInDc);
77+
if (!pessimistic.isAvailable()) {
78+
int optimisticallyAliveReplicasInDc =
79+
optimisticallyAliveReplicasByDc.getOrDefault(datacenter, 0);
80+
if (optimisticallyAliveReplicasInDc > pessimisticallyAliveReplicasInDc) {
81+
TokenRangeDiagnostic optimistic =
82+
new SimpleTokenRangeDiagnostic(
83+
range, requiredReplicasInDc, optimisticallyAliveReplicasInDc);
84+
if (optimistic.isAvailable()) {
85+
throw new UnreliableTokenRangeDiagnosticException(range);
86+
}
87+
}
88+
}
89+
diagnostic.addChildDiagnostic(datacenter, pessimistic);
7590
}
7691
return diagnostic.build();
7792
}
7893

94+
private Map<String, Integer> getPessimisticallyAliveReplicasByDc(Set<Node> allReplicas) {
95+
return allReplicas.stream()
96+
.filter(this::isPessimisticallyUp)
97+
.collect(Collectors.toMap(Node::getDatacenter, replica -> 1, Integer::sum));
98+
}
99+
100+
private Map<String, Integer> getOptimisticallyAliveReplicasByDc(Set<Node> allReplicas) {
101+
return allReplicas.stream()
102+
.filter(this::isOptimisticallyUp)
103+
.collect(Collectors.toMap(Node::getDatacenter, replica -> 1, Integer::sum));
104+
}
105+
79106
@Override
80107
protected TokenRingDiagnostic generateRingDiagnostic(
81108
Set<TokenRangeDiagnostic> tokenRangeDiagnostics) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/diagnostic/ring/LocalTokenRingDiagnosticGenerator.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,22 @@ public LocalTokenRingDiagnosticGenerator(
5656

5757
@Override
5858
protected TokenRangeDiagnostic generateTokenRangeDiagnostic(
59-
TokenRange range, Set<Node> aliveReplicas) {
60-
int aliveReplicasInDc =
61-
(int) aliveReplicas.stream().map(Node::getDatacenter).filter(datacenter::equals).count();
62-
return new SimpleTokenRangeDiagnostic(range, requiredReplicas, aliveReplicasInDc);
59+
TokenRange range, Set<Node> allReplicas) {
60+
int pessimisticallyAliveReplicasInDc = getPessimisticallyAliveReplicasInDc(allReplicas);
61+
TokenRangeDiagnostic pessimistic =
62+
new SimpleTokenRangeDiagnostic(range, requiredReplicas, pessimisticallyAliveReplicasInDc);
63+
if (!pessimistic.isAvailable()) {
64+
int optimisticallyAliveReplicasInDc = getOptimisticallyAliveReplicasInDc(allReplicas);
65+
if (optimisticallyAliveReplicasInDc > pessimisticallyAliveReplicasInDc) {
66+
TokenRangeDiagnostic optimistic =
67+
new SimpleTokenRangeDiagnostic(
68+
range, requiredReplicas, optimisticallyAliveReplicasInDc);
69+
if (optimistic.isAvailable()) {
70+
throw new UnreliableTokenRangeDiagnosticException(range);
71+
}
72+
}
73+
}
74+
return pessimistic;
6375
}
6476

6577
@Override
@@ -68,4 +80,22 @@ protected TokenRingDiagnostic generateRingDiagnostic(
6880
return new DefaultTokenRingDiagnostic(
6981
keyspace, consistencyLevel, datacenter, tokenRangeDiagnostics);
7082
}
83+
84+
private int getPessimisticallyAliveReplicasInDc(Set<Node> allReplicas) {
85+
return (int)
86+
allReplicas.stream()
87+
.filter(this::isPessimisticallyUp)
88+
.map(Node::getDatacenter)
89+
.filter(datacenter::equals)
90+
.count();
91+
}
92+
93+
private int getOptimisticallyAliveReplicasInDc(Set<Node> allReplicas) {
94+
return (int)
95+
allReplicas.stream()
96+
.filter(this::isOptimisticallyUp)
97+
.map(Node::getDatacenter)
98+
.filter(datacenter::equals)
99+
.count();
100+
}
71101
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.metadata.diagnostic.ring;
17+
18+
import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
19+
import edu.umd.cs.findbugs.annotations.NonNull;
20+
21+
public class UnreliableTokenRangeDiagnosticException extends IllegalStateException {
22+
23+
private final TokenRange range;
24+
25+
public UnreliableTokenRangeDiagnosticException(@NonNull TokenRange range) {
26+
super("Cannot establish reliable diagnostic for range " + range.format());
27+
this.range = range;
28+
}
29+
30+
@NonNull
31+
public TokenRange getTokenRange() {
32+
return range;
33+
}
34+
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/diagnostic/ring/DefaultTokenRingDiagnosticGeneratorTest.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import static com.datastax.oss.driver.api.core.ConsistencyLevel.QUORUM;
1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.catchThrowable;
2021
import static org.mockito.BDDMockito.given;
2122

2223
import com.datastax.oss.driver.api.core.CqlIdentifier;
2324
import com.datastax.oss.driver.api.core.metadata.Metadata;
2425
import com.datastax.oss.driver.api.core.metadata.Node;
2526
import com.datastax.oss.driver.api.core.metadata.NodeState;
2627
import com.datastax.oss.driver.api.core.metadata.TokenMap;
28+
import com.datastax.oss.driver.api.core.metadata.diagnostic.Status;
2729
import com.datastax.oss.driver.api.core.metadata.diagnostic.TokenRingDiagnostic;
2830
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2931
import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
@@ -77,14 +79,15 @@ public void setUp() {
7779
}
7880

7981
@Test
80-
public void should_generate_diagnostic_for_non_local_CL() {
82+
public void should_generate_diagnostic_for_non_local_CL_when_diagnostic_reliable() {
8183
// given
8284
DefaultTokenRingDiagnosticGenerator generator =
8385
new DefaultTokenRingDiagnosticGenerator(metadata, ks, QUORUM, new ReplicationFactor(3));
8486
// when
8587
TokenRingDiagnostic tokenRingDiagnostic = generator.generate();
8688
// then
8789
assertThat(tokenRingDiagnostic).isExactlyInstanceOf(DefaultTokenRingDiagnostic.class);
90+
assertThat(tokenRingDiagnostic.getStatus()).isEqualTo(Status.UNAVAILABLE);
8891
assertThat(tokenRingDiagnostic)
8992
.isEqualTo(
9093
new DefaultTokenRingDiagnostic(
@@ -95,4 +98,46 @@ public void should_generate_diagnostic_for_non_local_CL() {
9598
new SimpleTokenRangeDiagnostic(tr1, 2, 2),
9699
new SimpleTokenRangeDiagnostic(tr2, 2, 1))));
97100
}
101+
102+
@Test
103+
public void should_not_generate_diagnostic_for_non_local_CL_when_diagnostic_unreliable() {
104+
// given
105+
given(node2.getState()).willReturn(NodeState.UNKNOWN); // makes diagnostic unreliable
106+
DefaultTokenRingDiagnosticGenerator generator =
107+
new DefaultTokenRingDiagnosticGenerator(metadata, ks, QUORUM, new ReplicationFactor(3));
108+
// when
109+
Throwable throwable = catchThrowable(generator::generate);
110+
// then
111+
assertThat(throwable)
112+
.isInstanceOf(UnreliableTokenRangeDiagnosticException.class)
113+
.hasMessageContaining("Cannot establish reliable diagnostic for range ]1,2]")
114+
.extracting("tokenRange")
115+
.isEqualTo(tr1);
116+
}
117+
118+
@Test
119+
public void
120+
should_generate_diagnostic_for_non_local_CL_when_node_state_is_unknown_but_diagnostic_reliable() {
121+
// given
122+
given(node2.getState()).willReturn(NodeState.DOWN);
123+
given(node3.getState()).willReturn(NodeState.DOWN);
124+
// does not affect diagnostic's reliability given that other nodes are down
125+
given(node4.getState()).willReturn(NodeState.UNKNOWN);
126+
DefaultTokenRingDiagnosticGenerator generator =
127+
new DefaultTokenRingDiagnosticGenerator(metadata, ks, QUORUM, new ReplicationFactor(3));
128+
// when
129+
TokenRingDiagnostic tokenRingDiagnostic = generator.generate();
130+
// then
131+
assertThat(tokenRingDiagnostic).isExactlyInstanceOf(DefaultTokenRingDiagnostic.class);
132+
assertThat(tokenRingDiagnostic.getStatus()).isEqualTo(Status.UNAVAILABLE);
133+
assertThat(tokenRingDiagnostic)
134+
.isEqualTo(
135+
new DefaultTokenRingDiagnostic(
136+
ks,
137+
QUORUM,
138+
null,
139+
ImmutableSet.of(
140+
new SimpleTokenRangeDiagnostic(tr1, 2, 1),
141+
new SimpleTokenRangeDiagnostic(tr2, 2, 0))));
142+
}
98143
}

0 commit comments

Comments
 (0)