Skip to content

Commit 60fa57b

Browse files
tomekl007adutra
authored andcommitted
JAVA-2566: Introduce specific metrics for Graph queries (apache#314)
1 parent cc8b9ed commit 60fa57b

16 files changed

Lines changed: 344 additions & 33 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### NGDG (in progress)
66

7+
- [improvement] JAVA-2566: Introduce specific metrics for Graph queries
78
- [improvement] JAVA-2556: Make ExecutionInfo compatible with any Request type
89
- [improvement] JAVA-2571: Revisit usages of DseGraph.g
910
- [improvement] JAVA-2558: Revisit GraphRequestHandler

core/src/main/java/com/datastax/dse/driver/api/core/config/DseDriverOption.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,46 @@ public enum DseDriverOption implements DriverOption {
207207
* <p>Value type: int
208208
*/
209209
GRAPH_CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES("advanced.graph.paging-options.max-enqueued-pages"),
210+
/**
211+
* The largest latency that we expect to record for graph requests.
212+
*
213+
* <p>Value-type: {@link java.time.Duration Duration}
214+
*/
215+
METRICS_SESSION_GRAPH_REQUESTS_HIGHEST("advanced.metrics.session.graph-requests.highest-latency"),
216+
/**
217+
* The number of significant decimal digits to which internal structures will maintain for graph
218+
* requests.
219+
*
220+
* <p>Value-type: int
221+
*/
222+
METRICS_SESSION_GRAPH_REQUESTS_DIGITS(
223+
"advanced.metrics.session.graph-requests.significant-digits"),
224+
/**
225+
* The interval at which percentile data is refreshed for graph requests.
226+
*
227+
* <p>Value-type: {@link java.time.Duration Duration}
228+
*/
229+
METRICS_SESSION_GRAPH_REQUESTS_INTERVAL(
230+
"advanced.metrics.session.graph-requests.refresh-interval"),
231+
/**
232+
* The largest latency that we expect to record for graph requests.
233+
*
234+
* <p>Value-type: {@link java.time.Duration Duration}
235+
*/
236+
METRICS_NODE_GRAPH_MESSAGES_HIGHEST("advanced.metrics.node.graph-messages.highest-latency"),
237+
/**
238+
* The number of significant decimal digits to which internal structures will maintain for graph
239+
* requests.
240+
*
241+
* <p>Value-type: int
242+
*/
243+
METRICS_NODE_GRAPH_MESSAGES_DIGITS("advanced.metrics.node.graph-messages.significant-digits"),
244+
/**
245+
* The interval at which percentile data is refreshed for graph requests.
246+
*
247+
* <p>Value-type: {@link java.time.Duration Duration}
248+
*/
249+
METRICS_NODE_GRAPH_MESSAGES_INTERVAL("advanced.metrics.node.graph-messages.refresh-interval"),
210250
;
211251

212252
private final String path;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.dse.driver.api.core.metrics;
17+
18+
import com.datastax.oss.driver.api.core.metrics.NodeMetric;
19+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
20+
import edu.umd.cs.findbugs.annotations.NonNull;
21+
import java.util.Map;
22+
23+
/** See {@code dse-reference.conf} for a description of each metric. */
24+
public enum DseNodeMetrics implements NodeMetric {
25+
GRAPH_MESSAGES("graph-messages");
26+
27+
private static final Map<String, DseNodeMetrics> BY_PATH = sortByPath();
28+
29+
private final String path;
30+
31+
DseNodeMetrics(String path) {
32+
this.path = path;
33+
}
34+
35+
@Override
36+
@NonNull
37+
public String getPath() {
38+
return path;
39+
}
40+
41+
@NonNull
42+
public static DseNodeMetrics fromPath(@NonNull String path) {
43+
DseNodeMetrics metric = BY_PATH.get(path);
44+
if (metric == null) {
45+
throw new IllegalArgumentException("Unknown node metric path " + path);
46+
}
47+
return metric;
48+
}
49+
50+
private static Map<String, DseNodeMetrics> sortByPath() {
51+
ImmutableMap.Builder<String, DseNodeMetrics> result = ImmutableMap.builder();
52+
for (DseNodeMetrics value : values()) {
53+
result.put(value.getPath(), value);
54+
}
55+
return result.build();
56+
}
57+
}

core/src/main/java/com/datastax/dse/driver/api/core/metrics/DseSessionMetric.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
/** See {@code dse-reference.conf} for a description of each metric. */
2424
public enum DseSessionMetric implements SessionMetric {
2525
CONTINUOUS_CQL_REQUESTS("continuous-cql-requests"),
26+
GRAPH_REQUESTS("graph-requests"),
27+
GRAPH_CLIENT_TIMEOUTS("graph-client-timeouts"),
2628
;
2729

2830
private static final Map<String, DseSessionMetric> BY_PATH = sortByPath();

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.dse.driver.internal.core.cql.continuous;
1717

18+
import com.datastax.dse.driver.DseSessionMetric;
1819
import com.datastax.dse.driver.api.core.config.DseDriverOption;
1920
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
2021
import com.datastax.dse.driver.internal.core.cql.DseConversions;
@@ -23,6 +24,8 @@
2324
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
2425
import com.datastax.oss.driver.api.core.cql.Row;
2526
import com.datastax.oss.driver.api.core.cql.Statement;
27+
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
28+
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
2629
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2730
import com.datastax.oss.driver.internal.core.cql.DefaultRow;
2831
import com.datastax.oss.driver.internal.core.session.DefaultSession;
@@ -55,7 +58,15 @@ public class ContinuousCqlRequestHandler
5558
@NonNull DefaultSession session,
5659
@NonNull InternalDriverContext context,
5760
@NonNull String sessionLogPrefix) {
58-
super(statement, session, context, sessionLogPrefix, false);
61+
super(
62+
statement,
63+
session,
64+
context,
65+
sessionLogPrefix,
66+
false,
67+
DefaultSessionMetric.CQL_CLIENT_TIMEOUTS,
68+
DseSessionMetric.CONTINUOUS_CQL_REQUESTS,
69+
DefaultNodeMetric.CQL_MESSAGES);
5970
message = DseConversions.toContinuousPagingMessage(statement, executionProfile, context);
6071
firstPageTimeout =
6172
executionProfile.getDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE);

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import com.datastax.oss.driver.api.core.metadata.Node;
2828
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
2929
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
30+
import com.datastax.oss.driver.api.core.metrics.NodeMetric;
31+
import com.datastax.oss.driver.api.core.metrics.SessionMetric;
3032
import com.datastax.oss.driver.api.core.retry.RetryDecision;
3133
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
3234
import com.datastax.oss.driver.api.core.servererrors.BootstrappingException;
@@ -161,6 +163,9 @@ public abstract class ContinuousRequestHandlerBase<StatementT extends Request, R
161163
*/
162164
private final ReentrantLock lock = new ReentrantLock();
163165

166+
private final SessionMetric clientTimeoutsMetric;
167+
private final SessionMetric continuousRequestsMetric;
168+
private final NodeMetric messagesMetric;
164169
/**
165170
* The page queue, storing pages that we have received and have not been consumed by the client
166171
* yet. It can also store errors, when the operation completed exceptionally.
@@ -199,14 +204,20 @@ public ContinuousRequestHandlerBase(
199204
@NonNull DefaultSession session,
200205
@NonNull InternalDriverContext context,
201206
@NonNull String sessionLogPrefix,
202-
boolean specExecEnabled) {
207+
boolean specExecEnabled,
208+
SessionMetric clientTimeoutsMetric,
209+
SessionMetric continuousRequestsMetric,
210+
NodeMetric messagesMetric) {
203211
ProtocolVersion protocolVersion = context.getProtocolVersion();
204212
if (!context
205213
.getProtocolVersionRegistry()
206214
.supports(protocolVersion, DseProtocolFeature.CONTINUOUS_PAGING)) {
207215
throw new IllegalStateException(
208216
"Cannot execute continuous paging requests with protocol version " + protocolVersion);
209217
}
218+
this.clientTimeoutsMetric = clientTimeoutsMetric;
219+
this.continuousRequestsMetric = continuousRequestsMetric;
220+
this.messagesMetric = messagesMetric;
210221
this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
211222
LOG.trace("[{}] Creating new continuous handler for request {}", logPrefix, statement);
212223
this.statement = statement;
@@ -400,6 +411,12 @@ private class NodeResponseCallback
400411

401412
private ColumnDefinitions columnDefinitions;
402413

414+
// SpeculativeExecution node metrics should be executed only for the first page (first
415+
// invocation)
416+
private final AtomicBoolean stopNodeMessageTimerReported = new AtomicBoolean(false);
417+
private final AtomicBoolean nodeErrorReported = new AtomicBoolean(false);
418+
private final AtomicBoolean nodeSuccessReported = new AtomicBoolean(false);
419+
403420
NodeResponseCallback(
404421
Node node,
405422
DriverChannel channel,
@@ -963,9 +980,10 @@ private void logServerWarnings(List<String> warnings) {
963980

964981
private void stopNodeMessageTimer() {
965982
NodeMetricUpdater nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater();
966-
if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.CQL_MESSAGES, executionProfile.getName())) {
983+
if (nodeMetricUpdater.isEnabled(messagesMetric, executionProfile.getName())
984+
&& stopNodeMessageTimerReported.compareAndSet(false, true)) {
967985
nodeMetricUpdater.updateTimer(
968-
DefaultNodeMetric.CQL_MESSAGES,
986+
messagesMetric,
969987
executionProfile.getName(),
970988
System.nanoTime() - nodeStartTimeNanos,
971989
TimeUnit.NANOSECONDS);
@@ -1005,15 +1023,17 @@ private void updateNodeErrorMetrics(
10051023

10061024
private void trackNodeSuccess() {
10071025
RequestTracker requestTracker = context.getRequestTracker();
1008-
if (!(requestTracker instanceof NoopRequestTracker)) {
1026+
if (!(requestTracker instanceof NoopRequestTracker)
1027+
&& nodeSuccessReported.compareAndSet(false, true)) {
10091028
long latencyNanos = System.nanoTime() - nodeStartTimeNanos;
10101029
requestTracker.onNodeSuccess(statement, latencyNanos, executionProfile, node, logPrefix);
10111030
}
10121031
}
10131032

10141033
private void trackNodeError(@NonNull Throwable error) {
10151034
RequestTracker requestTracker = context.getRequestTracker();
1016-
if (!(requestTracker instanceof NoopRequestTracker)) {
1035+
if (!(requestTracker instanceof NoopRequestTracker)
1036+
&& nodeErrorReported.compareAndSet(false, true)) {
10171037
long latencyNanos = System.nanoTime() - nodeStartTimeNanos;
10181038
requestTracker.onNodeError(
10191039
statement, error, latencyNanos, executionProfile, node, logPrefix);
@@ -1460,8 +1480,7 @@ private void setCompleted(@NonNull NodeResponseCallback callback) {
14601480
RequestTracker requestTracker = context.getRequestTracker();
14611481
boolean requestTrackerEnabled = !(requestTracker instanceof NoopRequestTracker);
14621482
boolean metricEnabled =
1463-
sessionMetricUpdater.isEnabled(
1464-
DseSessionMetric.CONTINUOUS_CQL_REQUESTS, executionProfile.getName());
1483+
sessionMetricUpdater.isEnabled(continuousRequestsMetric, executionProfile.getName());
14651484
if (requestTrackerEnabled || metricEnabled) {
14661485
long now = System.nanoTime();
14671486
long totalLatencyNanos = now - startTimeNanos;
@@ -1471,7 +1490,7 @@ private void setCompleted(@NonNull NodeResponseCallback callback) {
14711490
}
14721491
if (metricEnabled) {
14731492
sessionMetricUpdater.updateTimer(
1474-
DseSessionMetric.CONTINUOUS_CQL_REQUESTS,
1493+
continuousRequestsMetric,
14751494
executionProfile.getName(),
14761495
totalLatencyNanos,
14771496
TimeUnit.NANOSECONDS);
@@ -1520,10 +1539,8 @@ private void setFailed(@Nullable NodeResponseCallback callback, @NonNull Throwab
15201539
}
15211540
if (error instanceof DriverTimeoutException) {
15221541
throttler.signalTimeout(this);
1523-
if (sessionMetricUpdater.isEnabled(
1524-
DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, executionProfile.getName())) {
1525-
sessionMetricUpdater.incrementCounter(
1526-
DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, executionProfile.getName());
1542+
if (sessionMetricUpdater.isEnabled(clientTimeoutsMetric, executionProfile.getName())) {
1543+
sessionMetricUpdater.incrementCounter(clientTimeoutsMetric, executionProfile.getName());
15271544
}
15281545
} else if (!(error instanceof RequestThrottlingException)) {
15291546
throttler.signalError(this, error);

core/src/main/java/com/datastax/dse/driver/internal/core/graph/ContinuousGraphRequestHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
*/
77
package com.datastax.dse.driver.internal.core.graph;
88

9+
import com.datastax.dse.driver.DseNodeMetrics;
10+
import com.datastax.dse.driver.DseSessionMetric;
911
import com.datastax.dse.driver.api.core.config.DseDriverOption;
1012
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
1113
import com.datastax.dse.driver.api.core.graph.GraphNode;
@@ -52,7 +54,15 @@ public class ContinuousGraphRequestHandler
5254
@NonNull String sessionLogPrefix,
5355
@NonNull GraphBinaryModule graphBinaryModule,
5456
@NonNull GraphSupportChecker graphSupportChecker) {
55-
super(statement, session, context, sessionLogPrefix, true);
57+
super(
58+
statement,
59+
session,
60+
context,
61+
sessionLogPrefix,
62+
true,
63+
DseSessionMetric.GRAPH_CLIENT_TIMEOUTS,
64+
DseSessionMetric.GRAPH_REQUESTS,
65+
DseNodeMetrics.GRAPH_MESSAGES);
5666
this.graphBinaryModule = graphBinaryModule;
5767
subProtocol = graphSupportChecker.inferGraphProtocol(statement, executionProfile, context);
5868
message =

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
2020
import com.datastax.dse.driver.api.core.graph.GraphNode;
2121
import com.datastax.dse.driver.api.core.graph.GraphStatement;
22+
import com.datastax.dse.driver.api.core.metrics.DseNodeMetrics;
2223
import com.datastax.dse.driver.api.core.metrics.DseSessionMetric;
2324
import com.datastax.dse.driver.internal.core.graph.binary.GraphBinaryModule;
2425
import com.datastax.oss.driver.api.core.AllNodesFailedException;
@@ -365,13 +366,13 @@ private void setFinalResult(
365366
statement, totalLatencyNanos, executionProfile, callback.node, logPrefix);
366367
}
367368
if (sessionMetricUpdater.isEnabled(
368-
DseSessionMetric.CONTINUOUS_CQL_REQUESTS, executionProfile.getName())) {
369+
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
369370
if (completionTimeNanos == NANOTIME_NOT_MEASURED_YET) {
370371
completionTimeNanos = System.nanoTime();
371372
totalLatencyNanos = completionTimeNanos - startTimeNanos;
372373
}
373374
sessionMetricUpdater.updateTimer(
374-
DseSessionMetric.CONTINUOUS_CQL_REQUESTS,
375+
DseSessionMetric.GRAPH_REQUESTS,
375376
executionProfile.getName(),
376377
totalLatencyNanos,
377378
TimeUnit.NANOSECONDS);
@@ -462,7 +463,7 @@ private void setFinalError(Throwable error, Node node, int execution) {
462463
if (error instanceof DriverTimeoutException) {
463464
throttler.signalTimeout(this);
464465
sessionMetricUpdater.incrementCounter(
465-
DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, executionProfile.getName());
466+
DseSessionMetric.GRAPH_CLIENT_TIMEOUTS, executionProfile.getName());
466467
} else if (!(error instanceof RequestThrottlingException)) {
467468
throttler.signalError(this, error);
468469
}
@@ -593,11 +594,11 @@ private void scheduleSpeculativeExecution(int index, long delay) {
593594
public void onResponse(Frame responseFrame) {
594595
long nodeResponseTimeNanos = NANOTIME_NOT_MEASURED_YET;
595596
NodeMetricUpdater nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater();
596-
if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.CQL_MESSAGES, executionProfile.getName())) {
597+
if (nodeMetricUpdater.isEnabled(DseNodeMetrics.GRAPH_MESSAGES, executionProfile.getName())) {
597598
nodeResponseTimeNanos = System.nanoTime();
598599
long nodeLatency = System.nanoTime() - nodeStartTimeNanos;
599600
nodeMetricUpdater.updateTimer(
600-
DefaultNodeMetric.CQL_MESSAGES,
601+
DseNodeMetrics.GRAPH_MESSAGES,
601602
executionProfile.getName(),
602603
nodeLatency,
603604
TimeUnit.NANOSECONDS);

core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricsFactory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.oss.driver.internal.core.metrics;
1717

1818
import com.codahale.metrics.MetricRegistry;
19+
import com.datastax.dse.driver.api.core.metrics.DseNodeMetrics;
1920
import com.datastax.dse.driver.api.core.metrics.DseSessionMetric;
2021
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2122
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
@@ -28,7 +29,6 @@
2829
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2930
import edu.umd.cs.findbugs.annotations.Nullable;
3031
import java.util.Collections;
31-
import java.util.EnumSet;
3232
import java.util.HashSet;
3333
import java.util.List;
3434
import java.util.Optional;
@@ -107,12 +107,16 @@ protected Set<SessionMetric> parseSessionMetricPaths(List<String> paths) {
107107
}
108108

109109
protected Set<NodeMetric> parseNodeMetricPaths(List<String> paths) {
110-
EnumSet<DefaultNodeMetric> result = EnumSet.noneOf(DefaultNodeMetric.class);
110+
Set<NodeMetric> result = new HashSet<>();
111111
for (String path : paths) {
112112
try {
113113
result.add(DefaultNodeMetric.fromPath(path));
114114
} catch (IllegalArgumentException e) {
115-
LOG.warn("[{}] Unknown node metric {}, skipping", logPrefix, path);
115+
try {
116+
result.add(DseNodeMetrics.fromPath(path));
117+
} catch (IllegalArgumentException e1) {
118+
LOG.warn("[{}] Unknown node metric {}, skipping", logPrefix, path);
119+
}
116120
}
117121
}
118122
return Collections.unmodifiableSet(result);

0 commit comments

Comments
 (0)