diff --git a/changelog/README.md b/changelog/README.md index 834ad3e6b56..de527fd07fb 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -1,5 +1,10 @@ ## Changelog +### 3.7.3 (In progress) + +- [improvement] JAVA-2326: Reduce memory allocations in Flusher.run, RequestHandler and flags decoding logic + + ### 3.7.2 - [bug] JAVA-2249: Stop stripping trailing zeros in ByteOrderedTokens. diff --git a/driver-core/src/main/java/com/datastax/driver/core/Connection.java b/driver-core/src/main/java/com/datastax/driver/core/Connection.java index 2aa22a9ad5f..4151add1e9b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Connection.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Connection.java @@ -731,7 +731,8 @@ ResponseHandler write( throw new ConnectionException(address, "Connection has been closed"); } - logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request); + if (logger.isTraceEnabled()) + logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request); writer.incrementAndGet(); if (DISABLE_COALESCING) { @@ -787,8 +788,9 @@ public void run() { } }); } else { - logger.trace( - "{}, stream {}, request sent successfully", Connection.this, request.getStreamId()); + if (logger.isTraceEnabled()) + logger.trace( + "{}, stream {}, request sent successfully", Connection.this, request.getStreamId()); } } }; @@ -1103,11 +1105,11 @@ public void run() { } } - // Always flush what we have (don't artificially delay to try to coalesce more messages) - for (Channel channel : channels) channel.flush(); - channels.clear(); - if (doneWork) { + // Always flush what we have (don't artificially delay to try to coalesce more messages) + for (Channel channel : channels) channel.flush(); + channels.clear(); + runsWithNoWork = 0; } else { // either reschedule or cancel diff --git a/driver-core/src/main/java/com/datastax/driver/core/Frame.java b/driver-core/src/main/java/com/datastax/driver/core/Frame.java index 36d6ee37e7b..46d6d76881e 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Frame.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Frame.java @@ -179,9 +179,10 @@ enum Flag { WARNING, USE_BETA; + private static final Flag[] values = Flag.values(); + static EnumSet deserialize(int flags) { EnumSet set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); for (int n = 0; n < 8; n++) { if ((flags & (1 << n)) != 0) set.add(values[n]); } diff --git a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java index 5094c4f1bc7..30c5b53f28b 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java +++ b/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java @@ -15,7 +15,6 @@ */ package com.datastax.driver.core; -import com.codahale.metrics.Timer; import com.datastax.driver.core.exceptions.BootstrappingException; import com.datastax.driver.core.exceptions.BusyConnectionException; import com.datastax.driver.core.exceptions.BusyPoolException; @@ -67,7 +66,7 @@ class RequestHandler { private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build(); static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS"; - final String id; + private volatile String id; private final SessionManager manager; private final Callback callback; @@ -83,15 +82,13 @@ class RequestHandler { private volatile List triedHosts; private volatile ConcurrentMap errors; - private final Timer.Context timerContext; private final long startTime; private final AtomicBoolean isDone = new AtomicBoolean(); private final AtomicInteger executionIndex = new AtomicInteger(); public RequestHandler(SessionManager manager, Callback callback, Statement statement) { - this.id = Long.toString(System.identityHashCode(this)); - if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement); + if (logger.isTraceEnabled()) logger.trace("[{}] {}", getId(), statement); this.manager = manager; this.callback = callback; this.scheduler = manager.cluster.manager.connectionFactory.timer; @@ -114,7 +111,6 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state && statement.isIdempotentWithDefault(manager.configuration().getQueryOptions()); this.statement = statement; - this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null; this.startTime = System.nanoTime(); } @@ -129,6 +125,14 @@ void cancel() { cancelPendingExecutions(null); } + String getId() { + // atomicity is not required to set the value + if (id == null) { + id = Long.toString(System.identityHashCode(this)); + } + return id; + } + private void startNewExecution() { if (isDone.get()) return; @@ -143,7 +147,7 @@ private void startNewExecution() { private void scheduleExecution(long delayMillis) { if (isDone.get() || delayMillis < 0) return; if (logger.isTraceEnabled()) - logger.trace("[{}] Schedule next speculative execution in {} ms", id, delayMillis); + logger.trace("[{}] Schedule next speculative execution in {} ms", getId(), delayMillis); if (delayMillis == 0) { // kick off request immediately scheduleExecutionImmediately(); @@ -189,16 +193,17 @@ private void setFinalResult( SpeculativeExecution execution, Connection connection, Message.Response response) { if (!isDone.compareAndSet(false, true)) { if (logger.isTraceEnabled()) - logger.trace("[{}] Got beaten to setting the result", execution.id); + logger.trace("[{}] Got beaten to setting the result", execution.getId()); return; } - if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.id); + if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.getId()); cancelPendingExecutions(execution); try { - if (timerContext != null) timerContext.stop(); + if (metricsEnabled()) + metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); ExecutionInfo info; int speculativeExecutions = executionIndex.get() - 1; @@ -257,16 +262,17 @@ private void setFinalException( SpeculativeExecution execution, Connection connection, Exception exception) { if (!isDone.compareAndSet(false, true)) { if (logger.isTraceEnabled()) - logger.trace("[{}] Got beaten to setting final exception", execution.id); + logger.trace("[{}] Got beaten to setting final exception", execution.getId()); return; } - if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.id); + if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.getId()); cancelPendingExecutions(execution); try { - if (timerContext != null) timerContext.stop(); + if (metricsEnabled()) + metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } finally { callback.onException(connection, exception, System.nanoTime() - startTime, /*unused*/ 0); } @@ -324,7 +330,7 @@ void onSet( * informs the RequestHandler, which will decide what to do */ class SpeculativeExecution implements Connection.ResponseCallback { - final String id; + private volatile String id; private final Message.Request request; private final int position; private volatile Host current; @@ -344,11 +350,18 @@ class SpeculativeExecution implements Connection.ResponseCallback { private volatile Connection.ResponseHandler connectionHandler; SpeculativeExecution(Message.Request request, int position) { - this.id = RequestHandler.this.id + "-" + position; this.request = request; this.position = position; this.queryStateRef = new AtomicReference(QueryState.INITIAL); - if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Starting", getId()); + } + + String getId() { + // atomicity is not required to set the value + if (id == null) { + id = RequestHandler.this.getId() + "-" + position; + } + return id; } void findNextHostAndQuery() { @@ -387,7 +400,7 @@ private boolean query(final Host host) { HostConnectionPool pool = manager.pools.get(host); if (pool == null || pool.isClosed()) return false; - if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host); + if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", getId(), host); if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true)) scheduleExecution(speculativeExecutionPlan.nextExecution(host)); @@ -530,7 +543,7 @@ private void processRetryDecision( if (logger.isDebugEnabled()) logger.debug( "[{}] Doing retry {} for query {} at consistency {}", - id, + getId(), retriesByPolicy, statement, retryDecision.getRetryConsistencyLevel()); @@ -559,7 +572,7 @@ private void retry(final boolean retryCurrent, ConsistencyLevel newConsistencyLe } private void logError(InetSocketAddress address, Throwable exception) { - logger.debug("[{}] Error querying {} : {}", id, address, exception.toString()); + logger.debug("[{}] Error querying {} : {}", getId(), address, exception.toString()); if (errors == null) { synchronized (RequestHandler.this) { if (errors == null) { @@ -580,7 +593,7 @@ void cancel() { return; } else if (previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_IN_PROGRESS)) { - if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", getId()); // The connectionHandler should be non-null, but we might miss the update if we're racing // with write(). // If it's still null, this will be handled by re-checking queryStateRef at the end of @@ -598,7 +611,7 @@ void cancel() { return; } else if (!previous.inProgress && queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) { - if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", id); + if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", getId()); Host queriedHost = current; if (queriedHost != null && statement != Statement.DEFAULT) { manager.cluster.manager.reportQuery( diff --git a/driver-core/src/main/java/com/datastax/driver/core/Responses.java b/driver-core/src/main/java/com/datastax/driver/core/Responses.java index 067a151d84e..ec40a1ecfce 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Responses.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Responses.java @@ -393,9 +393,10 @@ private enum Flag { NO_METADATA, METADATA_CHANGED; + private static final Flag[] values = Flag.values(); + static EnumSet deserialize(int flags) { EnumSet set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); for (int n = 0; n < values.length; n++) { if ((flags & (1 << n)) != 0) set.add(values[n]); }