diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java index 478ff520a37..dba66a5ab9c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java @@ -46,6 +46,27 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) { return this; } + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + public AggregateMetric recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatencies.accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + return this; + } + public int getErrorCount() { return errorCount; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 8a69dbc6e56..e632555cc21 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,16 +1,26 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,11 +31,8 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); - private final MessagePassingQueue batchPool; private final MessagePassingQueue inbox; private final LRUCache aggregates; - private final ConcurrentMap pending; - private final Set commonKeys; private final MetricWriter writer; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be @@ -41,20 +48,14 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, HealthMetrics healthMetrics) { this( writer, - batchPool, inbox, - pending, - commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, @@ -64,30 +65,37 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, long sleepMillis, HealthMetrics healthMetrics) { this.writer = writer; - this.batchPool = batchPool; this.inbox = inbox; - this.commonKeys = commonKeys; this.aggregates = new LRUCache<>( - new CommonKeyCleaner(commonKeys, healthMetrics), - maxAggregates * 4 / 3, - 0.75f, - maxAggregates); - this.pending = pending; + new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; } + private static final class AggregateExpiry + implements LRUCache.ExpiryListener { + private final HealthMetrics healthMetrics; + + AggregateExpiry(HealthMetrics healthMetrics) { + this.healthMetrics = healthMetrics; + } + + @Override + public void accept(Map.Entry expired) { + if (expired.getValue().getHitCount() > 0) { + healthMetrics.onStatsAggregateDropped(); + } + } + } + public void clearAggregates() { this.aggregates.clear(); } @@ -129,20 +137,54 @@ public void accept(InboxItem item) { } else { signal.ignore(); } - } else if (item instanceof Batch && !stopped) { - Batch batch = (Batch) item; - MetricKey key = batch.getKey(); - // important that it is still *this* batch pending, must not remove otherwise - pending.remove(key, batch); + } else if (item instanceof SpanSnapshot && !stopped) { + SpanSnapshot snapshot = (SpanSnapshot) item; + MetricKey key = buildMetricKey(snapshot); AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - batch.contributeTo(aggregate); + aggregate.recordOneDuration(snapshot.tagAndDuration); dirty = true; - // return the batch for reuse - batchPool.offer(batch); } } } + private static MetricKey buildMetricKey(SpanSnapshot s) { + return new MetricKey( + s.resourceName, + SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), + s.operationName, + s.serviceNameSource, + s.spanType, + s.httpStatusCode, + s.synthetic, + s.traceRoot, + SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), + materializePeerTags(s.peerTagPairs), + s.httpMethod, + s.httpEndpoint, + s.grpcStatusCode); + } + + private static List materializePeerTags(String[] pairs) { + if (pairs == null || pairs.length == 0) { + return Collections.emptyList(); + } + if (pairs.length == 2) { + // single-entry fast path (matches the original singletonList shape for INTERNAL spans) + return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + } + List tags = new ArrayList<>(pairs.length / 2); + for (int i = 0; i < pairs.length; i += 2) { + tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } + private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { @@ -177,7 +219,6 @@ private void expungeStaleAggregates() { AggregateMetric metric = pair.getValue(); if (metric.getHitCount() == 0) { it.remove(); - commonKeys.remove(pair.getKey()); } } } @@ -185,24 +226,4 @@ private void expungeStaleAggregates() { private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } - - private static final class CommonKeyCleaner - implements LRUCache.ExpiryListener { - - private final Set commonKeys; - private final HealthMetrics healthMetrics; - - private CommonKeyCleaner(Set commonKeys, HealthMetrics healthMetrics) { - this.commonKeys = commonKeys; - this.healthMetrics = healthMetrics; - } - - @Override - public void accept(Map.Entry expired) { - commonKeys.remove(expired.getKey()); - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java deleted file mode 100644 index 5f103805e98..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java +++ /dev/null @@ -1,90 +0,0 @@ -package datadog.trace.common.metrics; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongArray; - -/** - * This is a thread-safe container for partial conflating and accumulating partial aggregates on the - * same key. - * - *

Updates to an already consumed batch are rejected. - * - *

A batch can currently take at most 64 values. Attempts to add the 65th update will be - * rejected. - */ -public final class Batch implements InboxItem { - - private static final int MAX_BATCH_SIZE = 64; - private static final AtomicIntegerFieldUpdater COUNT = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "count"); - private static final AtomicIntegerFieldUpdater COMMITTED = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "committed"); - - /** - * This counter has two states: - * - *

    - *
  1. negative: the batch has been used, must not add values - *
  2. otherwise: the number of values added to the batch - *
- */ - private volatile int count = 0; - - /** incremented when a duration has been added. */ - private volatile int committed = 0; - - private MetricKey key; - private final AtomicLongArray durations; - - Batch(MetricKey key) { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - this.key = key; - } - - Batch() { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - } - - private Batch(AtomicLongArray durations) { - this.durations = durations; - } - - public MetricKey getKey() { - return key; - } - - public Batch reset(MetricKey key) { - this.key = key; - COUNT.lazySet(this, 0); - return this; - } - - public boolean isUsed() { - return count < 0; - } - - public boolean add(long tag, long durationNanos) { - // technically this would be wrong if there were 2^31 unsuccessful - // attempts to add a value, but this an acceptable risk - int position = COUNT.getAndIncrement(this); - if (position >= 0 && position < durations.length()) { - durations.set(position, tag | durationNanos); - COMMITTED.getAndIncrement(this); - return true; - } - return false; - } - - public void contributeTo(AggregateMetric aggregate) { - int count = Math.min(COUNT.getAndSet(this, Integer.MIN_VALUE), MAX_BATCH_SIZE); - if (count >= 0) { - // wait for the duration to have been set. - // note this mechanism only supports a single reader - while (committed != count) { - Thread.yield(); - } - COMMITTED.lazySet(this, 0); - aggregate.recordDurations(count, durations); - } - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index fee2f9a7748..9ea77140113 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -3,7 +3,6 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; import static datadog.trace.api.DDTags.BASE_SERVICE; -import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; @@ -33,13 +32,11 @@ import datadog.trace.core.SpanKindFilter; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -54,18 +51,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final DDCache SERVICE_NAMES = - DDCaches.newFixedSizeCache(32); + static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - private static final DDCache SPAN_KINDS = - DDCaches.newFixedSizeCache(16); - private static final DDCache< + static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); + static final DDCache< String, Pair, Function>> PEER_TAGS_CACHE = DDCaches.newFixedSizeCache( 64); // it can be unbounded since those values are returned by the agent and should be // under control. 64 entries is enough in this case to contain all the peer tags. - private static final Function< + static final Function< String, Pair, Function>> PEER_TAGS_CACHE_ADDER = key -> @@ -89,9 +84,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve SpanKindFilter.builder().includeInternal().build(); private final Set ignoredResources; - private final MessagePassingQueue batchPool; - private final ConcurrentHashMap pending; - private final ConcurrentHashMap keys; private final Thread thread; private final MessagePassingQueue inbox; private final Sink sink; @@ -185,23 +177,12 @@ public ConflatingMetricsAggregator( this.ignoredResources = ignoredResources; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); - this.batchPool = Queues.spmcArrayQueue(maxAggregates); - this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); - this.keys = new ConcurrentHashMap<>(); this.features = features; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( - metricWriter, - batchPool, - inbox, - pending, - keys.keySet(), - maxAggregates, - reportingInterval, - timeUnit, - healthMetric); + metricWriter, inbox, maxAggregates, reportingInterval, timeUnit, healthMetric); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -328,99 +309,73 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString // tag values don't trigger a ClassCastException on the String assignment. final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); - MetricKey newKey = - new MetricKey( + + boolean error = span.getError() > 0; + long tagAndDuration = + span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); + + SpanSnapshot snapshot = + new SpanSnapshot( span.getResourceName(), - SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE), + span.getServiceName(), span.getOperationName(), span.getServiceNameSource(), spanType, span.getHttpStatusCode(), isSynthetic(span), span.getParentId() == 0, - SPAN_KINDS.computeIfAbsent( - spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span), + spanKind, + extractPeerTagPairs(span), httpMethod, httpEndpoint, - grpcStatusCode); - MetricKey key = keys.putIfAbsent(newKey, newKey); - if (null == key) { - key = newKey; + grpcStatusCode, + tagAndDuration); + if (!inbox.offer(snapshot)) { + healthMetrics.onStatsInboxFull(); } - long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); - long durationNanos = span.getDurationNano(); - Batch batch = pending.get(key); - if (null != batch) { - // there is a pending batch, try to win the race to add to it - // returning false means that either the batch can't take any - // more data, or it has already been consumed - if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption, - // so skip publishing to the queue (we also know - // the key isn't rare enough to override the sampler) - return false; - } - // recycle the older key - key = batch.getKey(); - } - batch = newBatch(key); - batch.add(tag, durationNanos); - // overwrite the last one if present, it was already full - // or had been consumed by the time we tried to add to it - pending.put(key, batch); - // must offer to the queue after adding to pending - inbox.offer(batch); // force keep keys if there are errors - return span.getError() > 0; + return error; } - private List getPeerTags(CoreSpan span) { + private String[] extractPeerTagPairs(CoreSpan span) { if (span.isKind(PEER_AGGREGATION_KINDS)) { final Set eligiblePeerTags = features.peerTags(); - List peerTags = null; + String[] pairs = null; + int count = 0; for (String peerTag : eligiblePeerTags) { Object value = span.unsafeGetTag(peerTag); if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - if (peerTags == null) { - peerTags = new ArrayList<>(eligiblePeerTags.size()); + if (pairs == null) { + // pairs are flattened [name, value, ...]; size for worst case + pairs = new String[eligiblePeerTags.size() * 2]; } - peerTags.add( - cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + pairs[count++] = peerTag; + pairs[count++] = value.toString(); } } - return peerTags == null ? Collections.emptyList() : peerTags; + if (pairs == null) { + return null; + } + if (count < pairs.length) { + String[] trimmed = new String[count]; + System.arraycopy(pairs, 0, trimmed, 0, count); + return trimmed; + } + return pairs; } else if (span.isKind(INTERNAL_KIND)) { // in this case only the base service should be aggregated if present final Object baseService = span.unsafeGetTag(BASE_SERVICE); if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + return new String[] {BASE_SERVICE, baseService.toString()}; } } - return Collections.emptyList(); + return null; } private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } - private Batch newBatch(MetricKey key) { - Batch batch = batchPool.poll(); - if (null == batch) { - return new Batch(key); - } - return batch.reset(key); - } - public void stop() { if (null != cancellation) { cancellation.cancel(); @@ -463,8 +418,6 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.pending.clear(); - this.batchPool.clear(); this.inbox.clear(); this.aggregator.clearAggregates(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java new file mode 100644 index 00000000000..2816fad0411 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -0,0 +1,65 @@ +package datadog.trace.common.metrics; + +/** + * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw + * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * + *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the + * aggregator thread; the producer just shuffles references. + */ +final class SpanSnapshot implements InboxItem { + + final CharSequence resourceName; + final String serviceName; + final CharSequence operationName; + final CharSequence serviceNameSource; + final CharSequence spanType; + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final String spanKind; + + /** + * Flattened name/value pairs of peer-tag matches: {@code [name0, value0, name1, value1, ...]}. + * {@code null} when there are no matches (the common case). + */ + final String[] peerTagPairs; + + final String httpMethod; + final String httpEndpoint; + final String grpcStatusCode; + + /** Duration in nanoseconds, OR-ed with {@code ERROR_TAG} / {@code TOP_LEVEL_TAG} as needed. */ + final long tagAndDuration; + + SpanSnapshot( + CharSequence resourceName, + String serviceName, + CharSequence operationName, + CharSequence serviceNameSource, + CharSequence spanType, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + String spanKind, + String[] peerTagPairs, + String httpMethod, + String httpEndpoint, + String grpcStatusCode, + long tagAndDuration) { + this.resourceName = resourceName; + this.serviceName = serviceName; + this.operationName = operationName; + this.serviceNameSource = serviceNameSource; + this.spanType = spanType; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.spanKind = spanKind; + this.peerTagPairs = peerTagPairs; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + this.tagAndDuration = tagAndDuration; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 257d887029b..d1c7fe126b4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -93,6 +93,11 @@ public void onClientStatDowngraded() {} public void onStatsAggregateDropped() {} + /** + * Reports a single span whose stats snapshot was dropped because the aggregator inbox was full. + */ + public void onStatsInboxFull() {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 2df54241e56..76051645fcb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -98,6 +98,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder clientStatsDowngrades = new LongAdder(); private final LongAdder statsAggregateDropped = new LongAdder(); + private final LongAdder statsInboxFull = new LongAdder(); private final StatsDClient statsd; private final long interval; @@ -357,6 +358,11 @@ public void onStatsAggregateDropped() { statsAggregateDropped.increment(); } + @Override + public void onStatsInboxFull() { + statsInboxFull.increment(); + } + @Override public void close() { if (null != cancellation) { @@ -374,6 +380,7 @@ private static class Flush implements AgentTaskScheduler.Task= 99 okLatencies.getMaxValue() <= 5 } - - def "consistent under concurrent attempts to read and write"() { - given: - AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null) - BlockingDeque queue = new LinkedBlockingDeque<>(1000) - ExecutorService reader = Executors.newSingleThreadExecutor() - int writerCount = 10 - ExecutorService writers = Executors.newFixedThreadPool(writerCount) - CountDownLatch readerLatch = new CountDownLatch(1) - CountDownLatch writerLatch = new CountDownLatch(writerCount) - CountDownLatch queueEmptyLatch = new CountDownLatch(1) - - AtomicInteger written = new AtomicInteger(0) - - when: - for (int i = 0; i < writerCount; ++i) { - writers.submit({ - readerLatch.await() - for (int j = 0; j < 10_000; ++j) { - Batch batch = queue.peekLast() - if (batch?.add(0L, 1)) { - written.incrementAndGet() - } else { - queue.offer(new Batch().reset(key)) - } - } - writerLatch.countDown() - }) - } - def future = reader.submit({ - readerLatch.countDown() - while (!Thread.currentThread().isInterrupted()) { - Batch batch = queue.poll(100, TimeUnit.MILLISECONDS) - if (null == batch && writerLatch.count == 0) { - queueEmptyLatch.countDown() - } else if (null != batch) { - batch.contributeTo(aggregate) - } - } - }) - assert writerLatch.await(10, TimeUnit.SECONDS) - // Wait here until we know that the queue is empty - assert queueEmptyLatch.await(10, TimeUnit.SECONDS) - future.cancel(true) - - then: - aggregate.getHitCount() == written.get() - } }