From 356353778d30d501918ad85a7736d62f6383b780 Mon Sep 17 00:00:00 2001 From: konstantinov Date: Tue, 9 Jul 2019 00:35:22 +0300 Subject: [PATCH 1/3] fields "id" in RequestHandler and RequestHandler.SpeculativeExecution are evaluated for each request and allocate extra objects in heap but actually needed only for tracing. The values can be calculated lazy. --- changelog/README.md | 2 +- .../datastax/driver/core/RequestHandler.java | 46 ++++++++++++------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/changelog/README.md b/changelog/README.md index f02b925b416..7d8c5614dc9 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -1,7 +1,7 @@ ## Changelog ### 3.7.2 (In progress) - +- [improvement] JAVA-2326: Avoid String allocations required only for trace in RequestHandler - [bug] JAVA-2249: Stop stripping trailing zeros in ByteOrderedTokens. - [bug] JAVA-1492: Don't immediately reuse busy connections for another request. - [bug] JAVA-2198: Handle UDTs with names that clash with collection types. diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 5094c4f1bc7..89e671a93c5 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -67,7 +67,7 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; - final String id; + private volatile String id; private final SessionManager manager; private final Callback callback; @@ -90,8 +90,7 @@ class RequestHandler { private final AtomicInteger executionIndex = new AtomicInteger(); public RequestHandler(SessionManager manager, Callback callback, Statement statement) { - this.id = Long.toString(System.identityHashCode(this)); - if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); + if (logger.isTraceEnabled()) logger.trace("[{}] {}", getId(), statement); this.manager = manager; this.callback = callback; this.scheduler = manager.cluster.manager.connectionFactory.timer; @@ -129,6 +128,14 @@ void cancel() { cancelPendingExecutions(null); } + String getId() { + // atomicity is not required to set the value + if (id == null) { + id = Long.toString(System.identityHashCode(this)); + } + return id; + } + private void startNewExecution() { if (isDone.get()) return; @@ -143,7 +150,7 @@ private void startNewExecution() { private void scheduleExecution(long delayMillis) { if (isDone.get() || delayMillis < 0) return; if (logger.isTraceEnabled()) - logger.trace("[{}] Schedule next speculative execution in {} ms", id, delayMillis); + logger.trace("[{}] Schedule next speculative execution in {} ms", getId(), delayMillis); if (delayMillis == 0) { // kick off request immediately scheduleExecutionImmediately(); @@ -189,11 +196,11 @@ private void setFinalResult( SpeculativeExecution execution, Connection connection, Message.Response response) { if (!isDone.compareAndSet(false, true)) { if (logger.isTraceEnabled()) - logger.trace("[{}] Got beaten to setting the result", execution.id); + logger.trace("[{}] Got beaten to setting the result", execution.getId()); return; } - if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.id); + if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.getId()); cancelPendingExecutions(execution); @@ -257,11 +264,11 @@ private void setFinalException( SpeculativeExecution execution, Connection connection, Exception exception) { if (!isDone.compareAndSet(false, true)) { if (logger.isTraceEnabled()) - logger.trace("[{}] Got beaten to setting final exception", execution.id); + logger.trace("[{}] Got beaten to setting final exception", execution.getId()); return; } - if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.id); + if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.getId()); cancelPendingExecutions(execution); @@ -324,7 +331,7 @@ void onSet( * informs the RequestHandler, which will decide what to do */ class SpeculativeExecution implements Connection.ResponseCallback { - final String id; + private volatile String id; private final Message.Request request; private final int position; private volatile Host current; @@ -344,11 +351,18 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; SpeculativeExecution(Message.Request request, int position) { - this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); - if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Starting", getId()); + } + + String getId() { + // atomicity is not required to set the value + if (id == null) { + id = RequestHandler.this.getId() + "-" + position; + } + return id; } void findNextHostAndQuery() { @@ -387,7 +401,7 @@ private boolean query(final Host host) { HostConnectionPool pool = manager.pools.get(host); if (pool == null || pool.isClosed()) return false; - if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", getId(), host); if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -530,7 +544,7 @@ private void processRetryDecision( if (logger.isDebugEnabled()) logger.debug( "[{}] Doing retry {} for query {} at consistency {}", - id, + getId(), retriesByPolicy, statement, retryDecision.getRetryConsistencyLevel()); @@ -559,7 +573,7 @@ private void retry(final boolean retryCurrent, ConsistencyLevel newConsistencyLe } private void logError(InetSocketAddress address, Throwable exception) { - logger.debug("[{}] Error querying {} : {}", id, address, exception.toString()); + logger.debug("[{}] Error querying {} : {}", getId(), address, exception.toString()); if (errors == null) { synchronized (RequestHandler.this) { if (errors == null) { @@ -580,7 +594,7 @@ void cancel() { return; } else if (previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_IN_PROGRESS)) { - if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", getId()); // The connectionHandler should be non-null, but we might miss the update if we're racing // with write(). // If it's still null, this will be handled by re-checking queryStateRef at the end of @@ -598,7 +612,7 @@ void cancel() { return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { - if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", getId()); Host queriedHost = current; if (queriedHost != null && statement != Statement.DEFAULT) { manager.cluster.manager.reportQuery( From dbc0f320ecbc014db3813bdb82515e02e13a8d4e Mon Sep 17 00:00:00 2001 From: konstantinov Date: Tue, 6 Aug 2019 11:09:21 +0300 Subject: [PATCH 2/3] reduce memory allocations in Flusher.run, for Enum.values() invocations, for Integer boxing within trace log parameters and for requests time tracking logic --- .../com/datastax/driver/core/Connection.java | 16 +++++++++------- .../java/com/datastax/driver/core/Frame.java | 3 ++- .../com/datastax/driver/core/RequestHandler.java | 9 ++++----- .../java/com/datastax/driver/core/Responses.java | 3 ++- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 2aa22a9ad5f..4151add1e9b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -731,7 +731,8 @@ ResponseHandler write( throw new ConnectionException(address, "Connection has been closed"); } - logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request); + if (logger.isTraceEnabled()) + logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request); writer.incrementAndGet(); if (DISABLE_COALESCING) { @@ -787,8 +788,9 @@ public void run() { } }); } else { - logger.trace( - "{}, stream {}, request sent successfully", Connection.this, request.getStreamId()); + if (logger.isTraceEnabled()) + logger.trace( + "{}, stream {}, request sent successfully", Connection.this, request.getStreamId()); } } }; @@ -1103,11 +1105,11 @@ public void run() { } } - // Always flush what we have (don't artificially delay to try to coalesce more messages) - for (Channel channel : channels) channel.flush(); - channels.clear(); - if (doneWork) { + // Always flush what we have (don't artificially delay to try to coalesce more messages) + for (Channel channel : channels) channel.flush(); + channels.clear(); + runsWithNoWork = 0; } else { // either reschedule or cancel diff --git a/driver-core/src/main/java/com/datastax/driver/core/Frame.java b/driver-core/src/main/java/com/datastax/driver/core/Frame.java index 36d6ee37e7b..46d6d76881e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Frame.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Frame.java @@ -179,9 +179,10 @@ enum Flag { WARNING, USE_BETA; + private static final Flag[] values = Flag.values(); + static EnumSet deserialize(int flags) { EnumSet set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); for (int n = 0; n < 8; n++) { if ((flags & (1 << n)) != 0) set.add(values[n]); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 89e671a93c5..30c5b53f28b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -15,7 +15,6 @@ */ package com.datastax.driver.core; -import com.codahale.metrics.Timer; import com.datastax.driver.core.exceptions.BootstrappingException; import com.datastax.driver.core.exceptions.BusyConnectionException; import com.datastax.driver.core.exceptions.BusyPoolException; @@ -83,7 +82,6 @@ class RequestHandler { private volatile List triedHosts; private volatile ConcurrentMap errors; - private final Timer.Context timerContext; private final long startTime; private final AtomicBoolean isDone = new AtomicBoolean(); @@ -113,7 +111,6 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state && statement.isIdempotentWithDefault(manager.configuration().getQueryOptions()); this.statement = statement; - this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); } @@ -205,7 +202,8 @@ private void setFinalResult( cancelPendingExecutions(execution); try { - if (timerContext != null) timerContext.stop(); + if (metricsEnabled()) + metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); ExecutionInfo info; int speculativeExecutions = executionIndex.get() - 1; @@ -273,7 +271,8 @@ private void setFinalException( cancelPendingExecutions(execution); try { - if (timerContext != null) timerContext.stop(); + if (metricsEnabled()) + metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } finally { callback.onException(connection, exception, System.nanoTime() - startTime, /*unused*/ 0); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Responses.java b/driver-core/src/main/java/com/datastax/driver/core/Responses.java index 067a151d84e..ec40a1ecfce 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Responses.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Responses.java @@ -393,9 +393,10 @@ private enum Flag { NO_METADATA, METADATA_CHANGED; + private static final Flag[] values = Flag.values(); + static EnumSet deserialize(int flags) { EnumSet set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); for (int n = 0; n < values.length; n++) { if ((flags & (1 << n)) != 0) set.add(values[n]); } From 4549138af7d92ef8a72a94e66339398d71de8bab Mon Sep 17 00:00:00 2001 From: konstantinov Date: Wed, 7 Aug 2019 11:27:31 +0300 Subject: [PATCH 3/3] reduce memory allocations in Flusher.run, RequestHandler and flags decoding logic --- changelog/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/README.md b/changelog/README.md index 3b2b9ee7851..de527fd07fb 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -2,7 +2,7 @@ ### 3.7.3 (In progress) -- [improvement] JAVA-2326: Avoid String allocations required only for trace in RequestHandler +- [improvement] JAVA-2326: Reduce memory allocations in Flusher.run, RequestHandler and flags decoding logic ### 3.7.2