Skip to content

Commit d4e5ea4

Browse files
committed
JAVA-1523: Add query logger
1 parent b92b80b commit d4e5ea4

22 files changed

Lines changed: 1492 additions & 28 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-alpha4 (in progress)
66

7+
- [new feature] JAVA-1523: Add query logger
78
- [improvement] JAVA-1801: Revisit NodeStateListener and SchemaChangeListener APIs
89
- [improvement] JAVA-1759: Revisit metrics API
910
- [improvement] JAVA-1776: Use concurrency annotations

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ public enum DefaultDriverOption implements DriverOption {
6666
REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND("request.throttler.max-requests-per-second", false),
6767
REQUEST_THROTTLER_MAX_QUEUE_SIZE("request.throttler.max-queue-size", false),
6868
REQUEST_THROTTLER_DRAIN_INTERVAL("request.throttler.drain-interval", false),
69+
REQUEST_TRACKER_CLASS("request.tracker.class", false),
70+
71+
REQUEST_LOGGER_SUCCESS_ENABLED("request.tracker.logs.success.enabled", false),
72+
REQUEST_LOGGER_SLOW_THRESHOLD("request.tracker.logs.slow.threshold", false),
73+
REQUEST_LOGGER_SLOW_ENABLED("request.tracker.logs.slow.enabled", false),
74+
REQUEST_LOGGER_ERROR_ENABLED("request.tracker.logs.error.enabled", false),
75+
REQUEST_LOGGER_MAX_QUERY_LENGTH("request.tracker.logs.max-query-length", false),
76+
REQUEST_LOGGER_VALUES("request.tracker.logs.show-values", false),
77+
REQUEST_LOGGER_MAX_VALUE_LENGTH("request.tracker.logs.max-value-length", false),
78+
REQUEST_LOGGER_MAX_VALUES("request.tracker.logs.max-values", false),
79+
REQUEST_LOGGER_STACK_TRACES("request.tracker.logs.show-stack-traces", false),
6980

7081
CONTROL_CONNECTION_TIMEOUT("connection.control-connection.timeout", true),
7182
CONTROL_CONNECTION_AGREEMENT_INTERVAL(

core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.api.core.context.DriverContext;
2424
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
2525
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
26+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
2627
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
2728
import com.datastax.oss.driver.internal.core.ContactPoints;
2829
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
@@ -59,6 +60,7 @@ public abstract class SessionBuilder<SelfT extends SessionBuilder, SessionT> {
5960
protected List<TypeCodec<?>> typeCodecs = new ArrayList<>();
6061
private NodeStateListener nodeStateListener;
6162
private SchemaChangeListener schemaChangeListener;
63+
protected RequestTracker requestTracker;
6264
protected CqlIdentifier keyspace;
6365

6466
/**
@@ -157,6 +159,17 @@ public SelfT withSchemaChangeListener(SchemaChangeListener schemaChangeListener)
157159
return self;
158160
}
159161

162+
/**
163+
* Register a request tracker to use with the session.
164+
*
165+
* <p>If the tracker is specified programmatically with this method, it overrides the
166+
* configuration (that is, the {@code request.tracker.class} option will be ignored).
167+
*/
168+
public SelfT withRequestTracker(RequestTracker requestTracker) {
169+
this.requestTracker = requestTracker;
170+
return self;
171+
}
172+
160173
/**
161174
* Sets the keyspace to connect the session to.
162175
*
@@ -216,7 +229,8 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
216229

217230
return DefaultSession.init(
218231
(InternalDriverContext)
219-
buildContext(configLoader, typeCodecs, nodeStateListener, schemaChangeListener),
232+
buildContext(
233+
configLoader, typeCodecs, nodeStateListener, schemaChangeListener, requestTracker),
220234
contactPoints,
221235
keyspace);
222236
}
@@ -229,9 +243,10 @@ protected DriverContext buildContext(
229243
DriverConfigLoader configLoader,
230244
List<TypeCodec<?>> typeCodecs,
231245
NodeStateListener nodeStateListener,
232-
SchemaChangeListener schemaChangeListener) {
246+
SchemaChangeListener schemaChangeListener,
247+
RequestTracker requestTracker) {
233248
return new DefaultDriverContext(
234-
configLoader, typeCodecs, nodeStateListener, schemaChangeListener);
249+
configLoader, typeCodecs, nodeStateListener, schemaChangeListener, requestTracker);
235250
}
236251

237252
private static <T> T buildIfNull(T value, Supplier<T> builder) {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.api.core.tracker;
17+
18+
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
19+
import com.datastax.oss.driver.api.core.metadata.Node;
20+
import com.datastax.oss.driver.api.core.session.Request;
21+
import com.datastax.oss.driver.api.core.session.Session;
22+
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
23+
24+
/** Tracks request execution for a session. */
25+
public interface RequestTracker extends AutoCloseable {
26+
27+
/**
28+
* Invoked each time a request succeeds.
29+
*
30+
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
31+
* GenericType) session.execute} call until the result is made available to the client).
32+
* @param configProfile the configuration profile that this request was executed with.
33+
* @param node the node that returned the successful response.
34+
*/
35+
void onSuccess(Request request, long latencyNanos, DriverConfigProfile configProfile, Node node);
36+
37+
/**
38+
* Invoked each time a request fails.
39+
*
40+
* @param latencyNanos the overall execution time (from the {@link Session#execute(Request,
41+
* GenericType) session.execute} call until the error is propagated to the client).
42+
* @param configProfile the configuration profile that this request was executed with.
43+
* @param node the node that returned the error response, or {@code null} if the error occurred
44+
*/
45+
void onError(
46+
Request request,
47+
Throwable error,
48+
long latencyNanos,
49+
DriverConfigProfile configProfile,
50+
Node node);
51+
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
3232
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
3333
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
34+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
3435
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
3536
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
3637
import com.datastax.oss.driver.internal.core.CassandraProtocolVersionRegistry;
@@ -169,6 +170,7 @@ public class DefaultDriverContext implements InternalDriverContext {
169170
new LazyReference<>("requestThrottler", this::buildRequestThrottler, cycleDetector);
170171
private final LazyReference<NodeStateListener> nodeStateListenerRef;
171172
private final LazyReference<SchemaChangeListener> schemaChangeListenerRef;
173+
private final LazyReference<RequestTracker> requestTrackerRef;
172174

173175
private final DriverConfig config;
174176
private final DriverConfigLoader configLoader;
@@ -177,12 +179,14 @@ public class DefaultDriverContext implements InternalDriverContext {
177179
private final String sessionName;
178180
private final NodeStateListener nodeStateListenerFromBuilder;
179181
private final SchemaChangeListener schemaChangeListenerFromBuilder;
182+
private final RequestTracker requestTrackerFromBuilder;
180183

181184
public DefaultDriverContext(
182185
DriverConfigLoader configLoader,
183186
List<TypeCodec<?>> typeCodecs,
184187
NodeStateListener nodeStateListener,
185-
SchemaChangeListener schemaChangeListener) {
188+
SchemaChangeListener schemaChangeListener,
189+
RequestTracker requestTracker) {
186190
this.config = configLoader.getInitialConfig();
187191
this.configLoader = configLoader;
188192
DriverConfigProfile defaultProfile = config.getDefaultProfile();
@@ -204,6 +208,10 @@ public DefaultDriverContext(
204208
"schemaChangeListener",
205209
() -> buildSchemaChangeListener(schemaChangeListenerFromBuilder),
206210
cycleDetector);
211+
this.requestTrackerFromBuilder = requestTracker;
212+
this.requestTrackerRef =
213+
new LazyReference<>(
214+
"requestTracker", () -> buildRequestTracker(requestTrackerFromBuilder), cycleDetector);
207215
}
208216

209217
protected LoadBalancingPolicy buildLoadBalancingPolicy() {
@@ -432,6 +440,19 @@ protected SchemaChangeListener buildSchemaChangeListener(
432440
DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS)));
433441
}
434442

443+
protected RequestTracker buildRequestTracker(RequestTracker requestTrackerFromBuilder) {
444+
return (requestTrackerFromBuilder != null)
445+
? requestTrackerFromBuilder
446+
: Reflection.buildFromConfig(
447+
this, DefaultDriverOption.REQUEST_TRACKER_CLASS, RequestTracker.class)
448+
.orElseThrow(
449+
() ->
450+
new IllegalArgumentException(
451+
String.format(
452+
"Missing request tracker, check your configuration (%s)",
453+
DefaultDriverOption.REQUEST_TRACKER_CLASS)));
454+
}
455+
435456
@Override
436457
public String sessionName() {
437458
return sessionName;
@@ -612,6 +633,11 @@ public SchemaChangeListener schemaChangeListener() {
612633
return schemaChangeListenerRef.get();
613634
}
614635

636+
@Override
637+
public RequestTracker requestTracker() {
638+
return requestTrackerRef.get();
639+
}
640+
615641
@Override
616642
public CodecRegistry codecRegistry() {
617643
return codecRegistry;

core/src/main/java/com/datastax/oss/driver/internal/core/context/InternalDriverContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.datastax.oss.driver.api.core.context.DriverContext;
2020
import com.datastax.oss.driver.api.core.metadata.NodeStateListener;
2121
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener;
22+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
2223
import com.datastax.oss.driver.internal.core.ConsistencyLevelRegistry;
2324
import com.datastax.oss.driver.internal.core.ProtocolVersionRegistry;
2425
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
@@ -97,4 +98,6 @@ public interface InternalDriverContext extends DriverContext {
9798
NodeStateListener nodeStateListener();
9899

99100
SchemaChangeListener schemaChangeListener();
101+
102+
RequestTracker requestTracker();
100103
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerBase.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void onThrottleReady(boolean wasDelayed) {
198198
private ScheduledFuture<?> scheduleTimeout(Duration timeout) {
199199
if (timeout.toNanos() > 0) {
200200
return scheduler.schedule(
201-
() -> setFinalError(new DriverTimeoutException("Query timed out after " + timeout)),
201+
() -> setFinalError(new DriverTimeoutException("Query timed out after " + timeout), null),
202202
timeout.toNanos(),
203203
TimeUnit.NANOSECONDS);
204204
} else {
@@ -234,7 +234,7 @@ private void sendRequest(
234234
// We've reached the end of the query plan without finding any node to write to
235235
if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) {
236236
// We're the last execution so fail the result
237-
setFinalError(AllNodesFailedException.fromErrors(this.errors));
237+
setFinalError(AllNodesFailedException.fromErrors(this.errors), null);
238238
}
239239
} else {
240240
NodeResponseCallback nodeResponseCallback =
@@ -287,15 +287,14 @@ private void setFinalResult(
287287
if (result.complete(resultSet)) {
288288
cancelScheduledTasks();
289289
throttler.signalSuccess(this);
290+
long latencyNanos = System.nanoTime() - startTimeNanos;
291+
context.requestTracker().onSuccess(statement, latencyNanos, configProfile, callback.node);
290292
session
291293
.getMetricUpdater()
292-
.updateTimer(
293-
DefaultSessionMetric.CQL_REQUESTS,
294-
System.nanoTime() - startTimeNanos,
295-
TimeUnit.NANOSECONDS);
294+
.updateTimer(DefaultSessionMetric.CQL_REQUESTS, latencyNanos, TimeUnit.NANOSECONDS);
296295
}
297296
} catch (Throwable error) {
298-
setFinalError(error);
297+
setFinalError(error, callback.node);
299298
}
300299
}
301300

@@ -323,12 +322,14 @@ private ExecutionInfo buildExecutionInfo(
323322
@Override
324323
public void onThrottleFailure(RequestThrottlingException error) {
325324
session.getMetricUpdater().incrementCounter(DefaultSessionMetric.THROTTLING_ERRORS);
326-
setFinalError(error);
325+
setFinalError(error, null);
327326
}
328327

329-
private void setFinalError(Throwable error) {
328+
private void setFinalError(Throwable error, Node node) {
330329
if (result.completeExceptionally(error)) {
331330
cancelScheduledTasks();
331+
long latencyNanos = System.nanoTime() - startTimeNanos;
332+
context.requestTracker().onError(statement, error, latencyNanos, configProfile, node);
332333
if (error instanceof DriverTimeoutException) {
333334
throttler.signalTimeout(this);
334335
session.getMetricUpdater().incrementCounter(DefaultSessionMetric.CQL_CLIENT_TIMEOUTS);
@@ -380,7 +381,7 @@ public void operationComplete(Future<java.lang.Void> future) throws Exception {
380381
Throwable error = future.cause();
381382
if (error instanceof EncoderException
382383
&& error.getCause() instanceof FrameTooLongException) {
383-
setFinalError(error.getCause());
384+
setFinalError(error.getCause(), node);
384385
} else {
385386
LOG.debug(
386387
"[{}] Failed to send request on {}, trying next node (cause: {})",
@@ -479,10 +480,10 @@ public void onResponse(Frame responseFrame) {
479480
LOG.debug("[{}] Got error response, processing", logPrefix);
480481
processErrorResponse((Error) responseMessage);
481482
} else {
482-
setFinalError(new IllegalStateException("Unexpected response " + responseMessage));
483+
setFinalError(new IllegalStateException("Unexpected response " + responseMessage), node);
483484
}
484485
} catch (Throwable t) {
485-
setFinalError(t);
486+
setFinalError(t, node);
486487
}
487488
}
488489

@@ -524,12 +525,12 @@ private void processErrorResponse(Error errorMessage) {
524525
|| prepareError instanceof FunctionFailureException
525526
|| prepareError instanceof ProtocolError) {
526527
LOG.debug("[{}] Unrecoverable error on reprepare, rethrowing", logPrefix);
527-
setFinalError(prepareError);
528+
setFinalError(prepareError, node);
528529
return null;
529530
}
530531
}
531532
} else if (exception instanceof RequestThrottlingException) {
532-
setFinalError(exception);
533+
setFinalError(exception, node);
533534
return null;
534535
}
535536
recordError(node, exception);
@@ -554,7 +555,7 @@ private void processErrorResponse(Error errorMessage) {
554555
|| error instanceof ProtocolError) {
555556
LOG.debug("[{}] Unrecoverable error, rethrowing", logPrefix);
556557
metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS);
557-
setFinalError(error);
558+
setFinalError(error, node);
558559
} else {
559560
RetryDecision decision;
560561
if (error instanceof ReadTimeoutException) {
@@ -634,7 +635,7 @@ private void processRetryDecision(RetryDecision decision, Throwable error) {
634635
sendRequest(null, execution, retryCount + 1, false);
635636
break;
636637
case RETHROW:
637-
setFinalError(error);
638+
setFinalError(error, node);
638639
break;
639640
case IGNORE:
640641
setFinalResult(Void.INSTANCE, null, true, this);

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ private void closePolicies() {
425425
context.addressTranslator(),
426426
context.configLoader(),
427427
context.nodeStateListener(),
428-
context.schemaChangeListener())) {
428+
context.schemaChangeListener(),
429+
context.requestTracker())) {
429430
try {
430431
closeable.close();
431432
} catch (Throwable t) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.tracker;
17+
18+
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
19+
import com.datastax.oss.driver.api.core.context.DriverContext;
20+
import com.datastax.oss.driver.api.core.metadata.Node;
21+
import com.datastax.oss.driver.api.core.session.Request;
22+
import com.datastax.oss.driver.api.core.tracker.RequestTracker;
23+
import net.jcip.annotations.ThreadSafe;
24+
25+
@ThreadSafe
26+
public class NoopRequestTracker implements RequestTracker {
27+
28+
public NoopRequestTracker(@SuppressWarnings("unused") DriverContext context) {
29+
// nothing to do
30+
}
31+
32+
@Override
33+
public void onSuccess(
34+
Request request, long latencyNanos, DriverConfigProfile configProfile, Node node) {
35+
// nothing to do
36+
}
37+
38+
@Override
39+
public void onError(
40+
Request request,
41+
Throwable error,
42+
long latencyNanos,
43+
DriverConfigProfile configProfile,
44+
Node node) {
45+
// nothing to do
46+
}
47+
48+
@Override
49+
public void close() throws Exception {
50+
// nothing to do
51+
}
52+
}

0 commit comments

Comments
 (0)