Skip to content

Commit 61d59c2

Browse files
committed
[fixup] add throttling metrics
1 parent c70a85d commit 61d59c2

19 files changed

Lines changed: 170 additions & 28 deletions

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ public enum DefaultDriverOption implements DriverOption {
126126
METRICS_SESSION_CQL_REQUESTS_HIGHEST("metrics.session.cql-requests.highest-latency", false),
127127
METRICS_SESSION_CQL_REQUESTS_DIGITS("metrics.session.cql-requests.significant-digits", false),
128128
METRICS_SESSION_CQL_REQUESTS_INTERVAL("metrics.session.cql-requests.refresh-interval", false),
129+
METRICS_SESSION_THROTTLING_HIGHEST("metrics.session.throttling.delay.highest-latency", false),
130+
METRICS_SESSION_THROTTLING_DIGITS("metrics.session.throttling.delay.significant-digits", false),
131+
METRICS_SESSION_THROTTLING_INTERVAL("metrics.session.throttling.delay.refresh-interval", false),
129132
METRICS_NODE_CQL_MESSAGES_HIGHEST("metrics.node.cql-messages.highest-latency", false),
130133
METRICS_NODE_CQL_MESSAGES_DIGITS("metrics.node.cql-messages.significant-digits", false),
131134
METRICS_NODE_CQL_MESSAGES_INTERVAL("metrics.node.cql-messages.refresh-interval", false),

core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public enum DefaultSessionMetric implements SessionMetric {
2323
CONNECTED_NODES("connected-nodes"),
2424
CQL_REQUESTS("cql-requests"),
2525
CQL_CLIENT_TIMEOUTS("cql-client-timeouts"),
26+
THROTTLING_DELAY("throttling.delay"),
27+
THROTTLING_QUEUE_SIZE("throttling.queue-size"),
28+
THROTTLING_ERRORS("throttling.errors"),
2629
;
2730

2831
private static final Map<String, DefaultSessionMetric> BY_PATH = sortByPath();

core/src/main/java/com/datastax/oss/driver/internal/core/adminrequest/ThrottledAdminRequestHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,37 @@
1717

1818
import com.datastax.oss.driver.api.core.DriverTimeoutException;
1919
import com.datastax.oss.driver.api.core.RequestThrottlingException;
20+
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
2021
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
22+
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
2123
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
2224
import com.datastax.oss.driver.internal.core.session.throttling.Throttled;
2325
import com.datastax.oss.protocol.internal.Message;
2426
import java.nio.ByteBuffer;
2527
import java.time.Duration;
2628
import java.util.Map;
2729
import java.util.concurrent.CompletionStage;
30+
import java.util.concurrent.TimeUnit;
2831

2932
public class ThrottledAdminRequestHandler extends AdminRequestHandler implements Throttled {
3033

34+
private final long startTimeNanos;
3135
private final RequestThrottler throttler;
36+
private final SessionMetricUpdater metricUpdater;
3237

3338
public ThrottledAdminRequestHandler(
3439
DriverChannel channel,
3540
Message message,
3641
Map<String, ByteBuffer> customPayload,
3742
Duration timeout,
3843
RequestThrottler throttler,
44+
SessionMetricUpdater metricUpdater,
3945
String logPrefix,
4046
String debugString) {
4147
super(channel, message, customPayload, timeout, logPrefix, debugString);
48+
this.startTimeNanos = System.nanoTime();
4249
this.throttler = throttler;
50+
this.metricUpdater = metricUpdater;
4351
}
4452

4553
@Override
@@ -50,12 +58,19 @@ public CompletionStage<AdminResult> start() {
5058
}
5159

5260
@Override
53-
public void onThrottleReady() {
61+
public void onThrottleReady(boolean wasDelayed) {
62+
if (wasDelayed) {
63+
metricUpdater.updateTimer(
64+
DefaultSessionMetric.THROTTLING_DELAY,
65+
System.nanoTime() - startTimeNanos,
66+
TimeUnit.NANOSECONDS);
67+
}
5468
super.start();
5569
}
5670

5771
@Override
5872
public void onThrottleFailure(RequestThrottlingException error) {
73+
metricUpdater.incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS);
5974
setFinalError(error);
6075
}
6176

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandlerBase.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
2727
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2828
import com.datastax.oss.driver.api.core.metadata.Node;
29+
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
2930
import com.datastax.oss.driver.api.core.retry.RetryDecision;
3031
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
3132
import com.datastax.oss.driver.api.core.servererrors.BootstrappingException;
@@ -75,6 +76,7 @@ public abstract class CqlPrepareHandlerBase implements Throttled {
7576

7677
private static final Logger LOG = LoggerFactory.getLogger(CqlPrepareHandlerBase.class);
7778

79+
private final long startTimeNanos;
7880
private final String logPrefix;
7981
private final PrepareRequest request;
8082
private final ConcurrentMap<ByteBuffer, DefaultPreparedStatement> preparedStatementsCache;
@@ -102,6 +104,7 @@ protected CqlPrepareHandlerBase(
102104
InternalDriverContext context,
103105
String sessionLogPrefix) {
104106

107+
this.startTimeNanos = System.nanoTime();
105108
this.logPrefix = sessionLogPrefix + "|" + this.hashCode();
106109
LOG.debug("[{}] Creating new handler for prepare request {}", logPrefix, request);
107110

@@ -157,7 +160,15 @@ protected CqlPrepareHandlerBase(
157160
}
158161

159162
@Override
160-
public void onThrottleReady() {
163+
public void onThrottleReady(boolean wasDelayed) {
164+
if (wasDelayed) {
165+
session
166+
.getMetricUpdater()
167+
.updateTimer(
168+
DefaultSessionMetric.THROTTLING_DELAY,
169+
System.nanoTime() - startTimeNanos,
170+
TimeUnit.NANOSECONDS);
171+
}
161172
sendRequest(null, 0);
162173
}
163174

@@ -304,6 +315,7 @@ private CompletionStage<Void> prepareOnOtherNode(Node node) {
304315
request.getCustomPayload(),
305316
timeout,
306317
throttler,
318+
session.getMetricUpdater(),
307319
logPrefix,
308320
message.toString());
309321
return handler
@@ -323,6 +335,7 @@ private CompletionStage<Void> prepareOnOtherNode(Node node) {
323335

324336
@Override
325337
public void onThrottleFailure(RequestThrottlingException error) {
338+
session.getMetricUpdater().incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS);
326339
setFinalError(error);
327340
}
328341

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,15 @@ protected CqlRequestHandlerBase(
184184
}
185185

186186
@Override
187-
public void onThrottleReady() {
187+
public void onThrottleReady(boolean wasDelayed) {
188+
if (wasDelayed) {
189+
session
190+
.getMetricUpdater()
191+
.updateTimer(
192+
DefaultSessionMetric.THROTTLING_DELAY,
193+
System.nanoTime() - startTimeNanos,
194+
TimeUnit.NANOSECONDS);
195+
}
188196
sendRequest(null, 0, 0, true);
189197
}
190198

@@ -316,6 +324,7 @@ private ExecutionInfo buildExecutionInfo(
316324

317325
@Override
318326
public void onThrottleFailure(RequestThrottlingException error) {
327+
session.getMetricUpdater().incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS);
319328
setFinalError(error);
320329
}
321330

@@ -498,6 +507,7 @@ private void processErrorResponse(Error errorMessage) {
498507
repreparePayload.customPayload,
499508
timeout,
500509
throttler,
510+
session.getMetricUpdater(),
501511
logPrefix,
502512
"Reprepare " + reprepareMessage.toString());
503513
reprepareHandler

core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DefaultMetricUpdaterFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,23 @@ public class DefaultMetricUpdaterFactory implements MetricUpdaterFactory {
3636

3737
private final String logPrefix;
3838
private final InternalDriverContext context;
39-
private final Set<SessionMetric> enabledSessionMetrics;
4039
private final Set<NodeMetric> enabledNodeMetrics;
40+
private final SessionMetricUpdater sessionMetricUpdater;
4141

4242
public DefaultMetricUpdaterFactory(InternalDriverContext context) {
4343
this.logPrefix = context.sessionName();
4444
this.context = context;
4545
DriverConfigProfile config = context.config().getDefaultProfile();
46-
this.enabledSessionMetrics =
46+
Set<SessionMetric> enabledSessionMetrics =
4747
parseSessionMetricPaths(config.getStringList(DefaultDriverOption.METRICS_SESSION_ENABLED));
48+
this.sessionMetricUpdater = new DefaultSessionMetricUpdater(enabledSessionMetrics, context);
4849
this.enabledNodeMetrics =
4950
parseNodeMetricPaths(config.getStringList(DefaultDriverOption.METRICS_NODE_ENABLED));
5051
}
5152

5253
@Override
53-
public SessionMetricUpdater newSessionUpdater() {
54-
return new DefaultSessionMetricUpdater(enabledSessionMetrics, context);
54+
public SessionMetricUpdater getSessionUpdater() {
55+
return sessionMetricUpdater;
5556
}
5657

5758
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DefaultSessionMetricUpdater.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,18 @@
2121
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
2222
import com.datastax.oss.driver.api.core.metrics.SessionMetric;
2323
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
24+
import com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler;
25+
import com.datastax.oss.driver.internal.core.session.throttling.RateLimitingRequestThrottler;
26+
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
2427
import java.util.Set;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2530

2631
public class DefaultSessionMetricUpdater extends MetricUpdaterBase<SessionMetric>
2732
implements SessionMetricUpdater {
2833

34+
private static final Logger LOG = LoggerFactory.getLogger(DefaultSessionMetricUpdater.class);
35+
2936
private final String metricNamePrefix;
3037

3138
public DefaultSessionMetricUpdater(
@@ -47,17 +54,44 @@ public DefaultSessionMetricUpdater(
4754
return count;
4855
});
4956
}
57+
if (enabledMetrics.contains(DefaultSessionMetric.THROTTLING_QUEUE_SIZE)) {
58+
metricRegistry.register(
59+
buildFullName(DefaultSessionMetric.THROTTLING_QUEUE_SIZE),
60+
buildQueueGauge(context.requestThrottler(), context.sessionName()));
61+
}
5062
initializeHdrTimer(
5163
DefaultSessionMetric.CQL_REQUESTS,
5264
context.config().getDefaultProfile(),
5365
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_HIGHEST,
5466
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_DIGITS,
5567
DefaultDriverOption.METRICS_SESSION_CQL_REQUESTS_INTERVAL);
5668
initializeDefaultCounter(DefaultSessionMetric.CQL_CLIENT_TIMEOUTS);
69+
initializeHdrTimer(
70+
DefaultSessionMetric.THROTTLING_DELAY,
71+
context.config().getDefaultProfile(),
72+
DefaultDriverOption.METRICS_SESSION_THROTTLING_HIGHEST,
73+
DefaultDriverOption.METRICS_SESSION_THROTTLING_DIGITS,
74+
DefaultDriverOption.METRICS_SESSION_THROTTLING_INTERVAL);
75+
initializeDefaultCounter(DefaultSessionMetric.THROTTLING_ERRORS);
5776
}
5877

5978
@Override
6079
protected String buildFullName(SessionMetric metric) {
6180
return metricNamePrefix + metric.getPath();
6281
}
82+
83+
private Gauge<Integer> buildQueueGauge(RequestThrottler requestThrottler, String logPrefix) {
84+
if (requestThrottler instanceof ConcurrencyLimitingRequestThrottler) {
85+
return ((ConcurrencyLimitingRequestThrottler) requestThrottler)::getQueueSize;
86+
} else if (requestThrottler instanceof RateLimitingRequestThrottler) {
87+
return ((RateLimitingRequestThrottler) requestThrottler)::getQueueSize;
88+
} else {
89+
LOG.warn(
90+
"[{}] Metric {} does not support {}, it will always return 0",
91+
logPrefix,
92+
DefaultSessionMetric.THROTTLING_QUEUE_SIZE.getPath(),
93+
requestThrottler.getClass().getName());
94+
return () -> 0;
95+
}
96+
}
6397
}

core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdaterFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
public interface MetricUpdaterFactory {
2121

22-
SessionMetricUpdater newSessionUpdater();
22+
/** @return the unique instance for this session (this must return the same object every time). */
23+
SessionMetricUpdater getSessionUpdater();
2324

2425
NodeMetricUpdater newNodeUpdater(Node node);
2526
}

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private DefaultSession(
104104
this.processorRegistry = context.requestProcessorRegistry();
105105
this.poolManager = context.poolManager();
106106
this.logPrefix = context.sessionName();
107-
this.metricUpdater = context.metricUpdaterFactory().newSessionUpdater();
107+
this.metricUpdater = context.metricUpdaterFactory().getSessionUpdater();
108108
}
109109

110110
private CompletionStage<CqlSession> init(CqlIdentifier keyspace) {

core/src/main/java/com/datastax/oss/driver/internal/core/session/ReprepareOnUp.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
2424
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2525
import com.datastax.oss.driver.internal.core.cql.CqlRequestHandlerBase;
26+
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
2627
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
2728
import com.datastax.oss.driver.internal.core.session.throttling.RequestThrottler;
2829
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
@@ -69,6 +70,7 @@ class ReprepareOnUp {
6970
private final int maxParallelism;
7071
private final Duration timeout;
7172
private final RequestThrottler throttler;
73+
private final SessionMetricUpdater metricUpdater;
7274

7375
// After the constructor, everything happens on the channel's event loop, so these fields do not
7476
// need any synchronization.
@@ -97,6 +99,8 @@ class ReprepareOnUp {
9799
config.getDefaultProfile().getInt(DefaultDriverOption.REPREPARE_MAX_STATEMENTS);
98100
this.maxParallelism =
99101
config.getDefaultProfile().getInt(DefaultDriverOption.REPREPARE_MAX_PARALLELISM);
102+
103+
this.metricUpdater = context.metricUpdaterFactory().getSessionUpdater();
100104
}
101105

102106
void start() {
@@ -233,7 +237,14 @@ protected CompletionStage<AdminResult> queryAsync(
233237
Message message, Map<String, ByteBuffer> customPayload, String debugString) {
234238
ThrottledAdminRequestHandler reprepareHandler =
235239
new ThrottledAdminRequestHandler(
236-
channel, message, customPayload, timeout, throttler, logPrefix, debugString);
240+
channel,
241+
message,
242+
customPayload,
243+
timeout,
244+
throttler,
245+
metricUpdater,
246+
logPrefix,
247+
debugString);
237248
return reprepareHandler.start();
238249
}
239250
}

0 commit comments

Comments
 (0)