From 60e8c0b104236b809e19659108b0dcc81912b00e Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 14:18:17 -0400 Subject: [PATCH 1/3] Add AggregateTable + AggregateEntry backed by Hashtable Standalone classes for swapping the consumer-side LRUCache with a multi-key Hashtable in the next commit. No call sites use them yet. - AggregateEntry extends Hashtable.Entry, holds the canonical MetricKey, the mutable AggregateMetric, and copies of the 13 raw SpanSnapshot fields for matches(). The 64-bit lookup hash is computed via chained LongHashingUtils.addToHash calls (no varargs, no boxing of short/boolean). - AggregateTable wraps a Hashtable.Entry[] from Hashtable.Support.create. findOrInsert(SpanSnapshot) walks the bucket comparing raw fields, falling back to MetricKeys.fromSnapshot on a true miss. On cap overrun, it scans for an entry with hitCount==0 and unlinks it; if none, it returns null and the caller drops the data point. - MetricKeys.fromSnapshot extracts the canonicalization logic (DDCache lookups + UTF8 encoding) from Aggregator.buildMetricKey, so the helper can be called from AggregateTable on miss. This also commits Hashtable and LongHashingUtils (added earlier, previously uncommitted) and lifts Hashtable.Entry / Hashtable.Support visibility so client code outside datadog.trace.util can build higher-arity tables -- the case the javadoc describes but the original visibility didn't actually support. Specifically: Entry is now public abstract with a protected ctor; keyHash, next(), and setNext() are public; Support's create / clear / bucketIndex / bucketIterator / mutatingBucketIterator methods are public. Tests: AggregateTableTest covers hit, miss, distinct-by-spanKind, peer-tag identity (including null vs non-null), cap overrun with stale victim, cap overrun with no victim (returns null), expungeStaleAggregates, forEach, clear, and that the canonical MetricKey is built at insert. 
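For illustration, the higher-arity shape this unlocks -- every name below is hypothetical, but the Entry / Support / LongHashingUtils calls are exactly the ones made public in this commit:

    final class TripleEntry extends Hashtable.Entry {
      final String k1, k2; final long k3;
      long count; // value field, mutated in place
      TripleEntry(String k1, String k2, long k3) {
        super(hashOf(k1, k2, k3));
        this.k1 = k1; this.k2 = k2; this.k3 = k3;
      }
      static long hashOf(String k1, String k2, long k3) {
        long h = LongHashingUtils.addToHash(0L, k1);
        h = LongHashingUtils.addToHash(h, k2);
        return LongHashingUtils.addToHash(h, k3);
      }
      boolean matches(String k1, String k2, long k3) {
        return this.k3 == k3 && Objects.equals(this.k1, k1) && Objects.equals(this.k2, k2);
      }
    }

    // lookup, same shape as AggregateTable.findOrInsert below:
    long keyHash = TripleEntry.hashOf(a, b, c);
    for (Hashtable.Entry e = buckets[Hashtable.Support.bucketIndex(buckets, keyHash)];
         e != null; e = e.next()) {
      if (e.keyHash == keyHash && ((TripleEntry) e).matches(a, b, c)) { /* hit */ }
    }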
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 98 ++++ .../trace/common/metrics/AggregateTable.java | 134 +++++ .../trace/common/metrics/MetricKeys.java | 65 ++ .../common/metrics/AggregateTableTest.java | 234 ++++++++ .../java/datadog/trace/util/Hashtable.java | 553 ++++++++++++++++++ .../datadog/trace/util/LongHashingUtils.java | 158 +++++ 6 files changed, 1242 insertions(+) create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java create mode 100644 internal-api/src/main/java/datadog/trace/util/Hashtable.java create mode 100644 internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java new file mode 100644 index 00000000000..10e256620f5 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -0,0 +1,98 @@ +package datadog.trace.common.metrics; + +import datadog.trace.util.Hashtable; +import datadog.trace.util.LongHashingUtils; +import java.util.Arrays; +import java.util.Objects; + +/** + * Hashtable entry pairing the raw {@link SpanSnapshot} key fields with their canonical {@link + * MetricKey} (built once on miss) and the mutable {@link AggregateMetric}. + * + *

Lookups compare the snapshot's raw fields against the entry's stored copies, so the consumer + * never has to build a {@link MetricKey} just to do a HashMap lookup. The {@code MetricKey} field + * is retained because the serializer ({@link MetricWriter#add}) needs it at report time. + */ +final class AggregateEntry extends Hashtable.Entry { + final MetricKey key; + final AggregateMetric aggregate; + + // Raw snapshot fields, used by matches(SpanSnapshot). Stored as captured at insert time; + // the canonical MetricKey above holds the UTF8BytesString-encoded forms. + private final CharSequence resourceName; + private final String serviceName; + private final CharSequence operationName; + private final CharSequence serviceNameSource; + private final CharSequence spanType; + private final short httpStatusCode; + private final boolean synthetic; + private final boolean traceRoot; + private final String spanKind; + private final String[] peerTagPairs; + private final String httpMethod; + private final String httpEndpoint; + private final String grpcStatusCode; + + AggregateEntry(MetricKey key, SpanSnapshot s, AggregateMetric aggregate) { + super(hashOf(s)); + this.key = key; + this.aggregate = aggregate; + this.resourceName = s.resourceName; + this.serviceName = s.serviceName; + this.operationName = s.operationName; + this.serviceNameSource = s.serviceNameSource; + this.spanType = s.spanType; + this.httpStatusCode = s.httpStatusCode; + this.synthetic = s.synthetic; + this.traceRoot = s.traceRoot; + this.spanKind = s.spanKind; + this.peerTagPairs = s.peerTagPairs; + this.httpMethod = s.httpMethod; + this.httpEndpoint = s.httpEndpoint; + this.grpcStatusCode = s.grpcStatusCode; + } + + boolean matches(SpanSnapshot s) { + return httpStatusCode == s.httpStatusCode + && synthetic == s.synthetic + && traceRoot == s.traceRoot + && Objects.equals(resourceName, s.resourceName) + && Objects.equals(serviceName, s.serviceName) + && Objects.equals(operationName, s.operationName) + && Objects.equals(serviceNameSource, s.serviceNameSource) + && Objects.equals(spanType, s.spanType) + && Objects.equals(spanKind, s.spanKind) + && Arrays.equals(peerTagPairs, s.peerTagPairs) + && Objects.equals(httpMethod, s.httpMethod) + && Objects.equals(httpEndpoint, s.httpEndpoint) + && Objects.equals(grpcStatusCode, s.grpcStatusCode); + } + + /** + * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no + * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's + * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes + * to the same bucket the snapshot itself looks up. 
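+ * Note the fold is order-sensitive ({@code addToHash} computes {@code 31 * h + x}), so the field + * order here is part of the key: reordering fields changes every hash, and this method is the + * single source of truth for both insert-time and lookup-time hashing.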
+ */ + static long hashOf(SpanSnapshot s) { + long h = 0; + h = LongHashingUtils.addToHash(h, s.resourceName); + h = LongHashingUtils.addToHash(h, s.serviceName); + h = LongHashingUtils.addToHash(h, s.operationName); + h = LongHashingUtils.addToHash(h, s.serviceNameSource); + h = LongHashingUtils.addToHash(h, s.spanType); + h = LongHashingUtils.addToHash(h, s.httpStatusCode); + h = LongHashingUtils.addToHash(h, s.synthetic); + h = LongHashingUtils.addToHash(h, s.traceRoot); + h = LongHashingUtils.addToHash(h, s.spanKind); + if (s.peerTagPairs != null) { + for (String p : s.peerTagPairs) { + h = LongHashingUtils.addToHash(h, p); + } + } + h = LongHashingUtils.addToHash(h, s.httpMethod); + h = LongHashingUtils.addToHash(h, s.httpEndpoint); + h = LongHashingUtils.addToHash(h, s.grpcStatusCode); + return h; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java new file mode 100644 index 00000000000..98260a2e2b3 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -0,0 +1,134 @@ +package datadog.trace.common.metrics; + +import datadog.trace.util.Hashtable; +import java.util.function.BiConsumer; + +/** + * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}. + * + *

Replaces the prior {@code LRUCache}. The win is on the + * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise + * {@code matches}, with no {@link MetricKey} allocation and no UTF8 cache lookups. The canonical + * {@link MetricKey} (with UTF8-encoded forms) is only built once per unique key, at insert time, + * and lives on the {@link AggregateEntry}. + * + *

Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be + * routed through the inbox rather than called from arbitrary threads. + */ +final class AggregateTable { + + private final Hashtable.Entry[] buckets; + private final int maxAggregates; + private int size; + + AggregateTable(int maxAggregates) { + this.buckets = Hashtable.Support.create(maxAggregates * 4 / 3); + this.maxAggregates = maxAggregates; + } + + int size() { + return size; + } + + boolean isEmpty() { + return size == 0; + } + + /** + * Returns the {@link AggregateMetric} to update for {@code snapshot}, lazily creating an entry on + * miss. Returns {@code null} when the table is at capacity and no stale entry can be evicted -- + * the caller should drop the data point in that case. + */ + AggregateMetric findOrInsert(SpanSnapshot snapshot) { + long keyHash = AggregateEntry.hashOf(snapshot); + int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); + for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) { + if (e.keyHash == keyHash) { + AggregateEntry candidate = (AggregateEntry) e; + if (candidate.matches(snapshot)) { + return candidate.aggregate; + } + } + } + if (size >= maxAggregates && !evictOneStale()) { + return null; + } + AggregateEntry entry = + new AggregateEntry(MetricKeys.fromSnapshot(snapshot), snapshot, new AggregateMetric()); + entry.setNext(buckets[bucketIndex]); + buckets[bucketIndex] = entry; + size++; + return entry.aggregate; + } + + /** Unlink the first entry whose {@code AggregateMetric.getHitCount() == 0}. */ + private boolean evictOneStale() { + for (int i = 0; i < buckets.length; i++) { + Hashtable.Entry head = buckets[i]; + if (head == null) { + continue; + } + if (((AggregateEntry) head).aggregate.getHitCount() == 0) { + buckets[i] = head.next(); + size--; + return true; + } + Hashtable.Entry prev = head; + Hashtable.Entry cur = head.next(); + while (cur != null) { + if (((AggregateEntry) cur).aggregate.getHitCount() == 0) { + prev.setNext(cur.next()); + size--; + return true; + } + prev = cur; + cur = cur.next(); + } + } + return false; + } + + void forEach(BiConsumer consumer) { + for (int i = 0; i < buckets.length; i++) { + for (Hashtable.Entry e = buckets[i]; e != null; e = e.next()) { + AggregateEntry entry = (AggregateEntry) e; + consumer.accept(entry.key, entry.aggregate); + } + } + } + + /** Removes entries whose {@code AggregateMetric.getHitCount() == 0}. 
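+ * Run from the report path: an entry whose count was cleared at the previous flush and never hit + * since is unlinked here.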
*/ + void expungeStaleAggregates() { + for (int i = 0; i < buckets.length; i++) { + // unlink leading stale entries + Hashtable.Entry head = buckets[i]; + while (head != null && ((AggregateEntry) head).aggregate.getHitCount() == 0) { + head = head.next(); + size--; + } + buckets[i] = head; + if (head == null) { + continue; + } + // unlink stale entries in the chain + Hashtable.Entry prev = head; + Hashtable.Entry cur = head.next(); + while (cur != null) { + if (((AggregateEntry) cur).aggregate.getHitCount() == 0) { + Hashtable.Entry skipped = cur.next(); + prev.setNext(skipped); + size--; + cur = skipped; + } else { + prev = cur; + cur = cur.next(); + } + } + } + } + + void clear() { + Hashtable.Support.clear(buckets); + size = 0; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java new file mode 100644 index 00000000000..2e03c3730d3 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java @@ -0,0 +1,65 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; + +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * Canonicalization helpers for {@link MetricKey}: applies the static {@link + * ConflatingMetricsAggregator#SERVICE_NAMES} / {@link ConflatingMetricsAggregator#SPAN_KINDS} / + * {@link ConflatingMetricsAggregator#PEER_TAGS_CACHE} caches to a {@link SpanSnapshot}. + * + *

Called only on a true miss in {@link AggregateTable}, so the CHM lookups inside the DDCaches + * happen once per unique key rather than once per snapshot. + */ +final class MetricKeys { + private MetricKeys() {} + + static MetricKey fromSnapshot(SpanSnapshot s) { + return new MetricKey( + s.resourceName, + SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), + s.operationName, + s.serviceNameSource, + s.spanType, + s.httpStatusCode, + s.synthetic, + s.traceRoot, + SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), + materializePeerTags(s.peerTagPairs), + s.httpMethod, + s.httpEndpoint, + s.grpcStatusCode); + } + + private static List materializePeerTags(String[] pairs) { + if (pairs == null || pairs.length == 0) { + return Collections.emptyList(); + } + if (pairs.length == 2) { + // single-entry fast path (matches the original singletonList shape for INTERNAL spans) + return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + } + List tags = new ArrayList<>(pairs.length / 2); + for (int i = 0; i < pairs.length; i += 2) { + tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java new file mode 100644 index 00000000000..6c4839e4e4f --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -0,0 +1,234 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateTableTest { + + @BeforeAll + static void initAgentMeter() { + // AggregateMetric.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. + // Mirror what AggregateMetricTest does. 
+ MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void insertOnMissReturnsNewAggregate() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s = snapshot("svc", "op", "client"); + + AggregateMetric agg = table.findOrInsert(s); + + assertNotNull(agg); + assertEquals(1, table.size()); + assertEquals(0, agg.getHitCount()); + } + + @Test + void hitReturnsSameAggregateInstance() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s1 = snapshot("svc", "op", "client"); + SpanSnapshot s2 = snapshot("svc", "op", "client"); + + AggregateMetric first = table.findOrInsert(s1); + AggregateMetric second = table.findOrInsert(s2); + + assertSame(first, second); + assertEquals(1, table.size()); + } + + @Test + void differentKindFieldsAreDistinct() { + AggregateTable table = new AggregateTable(8); + + AggregateMetric clientAgg = table.findOrInsert(snapshot("svc", "op", "client")); + AggregateMetric serverAgg = table.findOrInsert(snapshot("svc", "op", "server")); + + assertNotSame(clientAgg, serverAgg); + assertEquals(2, table.size()); + } + + @Test + void peerTagPairsParticipateInIdentity() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot withTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-a").build(); + SpanSnapshot otherTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-b").build(); + SpanSnapshot noTags = builder("svc", "op", "client").build(); + + AggregateMetric a = table.findOrInsert(withTags); + AggregateMetric b = table.findOrInsert(otherTags); + AggregateMetric c = table.findOrInsert(noTags); + + assertNotSame(a, b); + assertNotSame(a, c); + assertNotSame(b, c); + assertEquals(3, table.size()); + } + + @Test + void capOverrunEvictsStaleEntry() { + AggregateTable table = new AggregateTable(2); + + AggregateMetric stale = table.findOrInsert(snapshot("svc-a", "op", "client")); + // do not record on stale -> hitCount stays at 0 + + AggregateMetric live = table.findOrInsert(snapshot("svc-b", "op", "client")); + live.recordOneDuration(10L | TOP_LEVEL_TAG); // hitCount=1, not evictable + + // table is full (size=2). Inserting a third should evict the stale one and succeed. 
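+ // (evictOneStale scans the buckets for the first hitCount==0 entry and unlinks it)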
+ AggregateMetric newcomer = table.findOrInsert(snapshot("svc-c", "op", "client")); + assertNotNull(newcomer); + assertEquals(2, table.size()); + + // re-inserting the stale snapshot should miss now (it was evicted) and produce a fresh entry + AggregateMetric staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client")); + assertNotSame(stale, staleAgain); + } + + @Test + void capOverrunWithNoStaleReturnsNull() { + AggregateTable table = new AggregateTable(2); + + AggregateMetric a = table.findOrInsert(snapshot("svc-a", "op", "client")); + AggregateMetric b = table.findOrInsert(snapshot("svc-b", "op", "client")); + a.recordOneDuration(10L); + b.recordOneDuration(20L); + + AggregateMetric c = table.findOrInsert(snapshot("svc-c", "op", "client")); + assertNull(c); + assertEquals(2, table.size()); + } + + @Test + void expungeStaleAggregatesRemovesZeroHitsOnly() { + AggregateTable table = new AggregateTable(16); + + AggregateMetric live = table.findOrInsert(snapshot("svc-live", "op", "client")); + live.recordOneDuration(10L); + AggregateMetric stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client")); + AggregateMetric stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client")); + assertEquals(3, table.size()); + assertEquals(0, stale1.getHitCount()); + assertEquals(0, stale2.getHitCount()); + + table.expungeStaleAggregates(); + + assertEquals(1, table.size()); + // the live entry must still be reachable + assertSame(live, table.findOrInsert(snapshot("svc-live", "op", "client"))); + } + + @Test + void forEachVisitsEveryEntry() { + AggregateTable table = new AggregateTable(8); + table.findOrInsert(snapshot("a", "op", "client")).recordOneDuration(1L); + table.findOrInsert(snapshot("b", "op", "client")).recordOneDuration(2L); + table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG); + + Map visited = new HashMap<>(); + table.forEach((key, agg) -> visited.put(key.getService().toString(), agg.getDuration())); + + assertEquals(3, visited.size()); + assertEquals(1L, visited.get("a")); + assertEquals(2L, visited.get("b")); + assertEquals(3L, visited.get("c")); + } + + @Test + void clearEmptiesTheTable() { + AggregateTable table = new AggregateTable(8); + table.findOrInsert(snapshot("a", "op", "client")); + table.findOrInsert(snapshot("b", "op", "client")); + assertEquals(2, table.size()); + + table.clear(); + + assertTrue(table.isEmpty()); + assertEquals(0, table.size()); + // and re-insertion works after clear + assertNotNull(table.findOrInsert(snapshot("a", "op", "client"))); + } + + @Test + void canonicalMetricKeyIsBuiltOnInsert() { + AggregateTable table = new AggregateTable(4); + List seen = new ArrayList<>(); + table.findOrInsert(snapshot("svc", "op", "client")); + table.forEach((key, agg) -> seen.add(key)); + + assertEquals(1, seen.size()); + MetricKey k = seen.get(0); + assertEquals("svc", k.getService().toString()); + assertEquals("op", k.getOperationName().toString()); + assertEquals("client", k.getSpanKind().toString()); + } + + // ---------- helpers ---------- + + private static SpanSnapshot snapshot(String service, String operation, String spanKind) { + return builder(service, operation, spanKind).build(); + } + + private static SnapshotBuilder builder(String service, String operation, String spanKind) { + return new SnapshotBuilder(service, operation, spanKind); + } + + private static final class SnapshotBuilder { + private final String service; + private final String operation; + private final String spanKind; + private String[] 
peerTagPairs; + private long tagAndDuration = 0L; + + SnapshotBuilder(String service, String operation, String spanKind) { + this.service = service; + this.operation = operation; + this.spanKind = spanKind; + } + + SnapshotBuilder peerTags(String... namesAndValues) { + this.peerTagPairs = namesAndValues; + return this; + } + + SpanSnapshot build() { + return new SpanSnapshot( + "resource", + service, + operation, + null, + "web", + (short) 200, + false, + true, + spanKind, + peerTagPairs, + null, + null, + null, + tagAndDuration); + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/util/Hashtable.java b/internal-api/src/main/java/datadog/trace/util/Hashtable.java new file mode 100644 index 00000000000..d7f49dcae00 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/Hashtable.java @@ -0,0 +1,553 @@ +package datadog.trace.util; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * Lightweight, simple hashtable framework that can be useful when {@code HashMap} would + * be unnecessarily heavy. + * + *

    Use cases include... + *
  • primitive keys + *
  • primitive values + *
  • multi-part keys + *
+ * + * Convenience classes are provided for lower key dimensions. + * + * For higher key dimensions, client code must implement its own class, + * but can still use the support class to ease the implementation complexity. + */ +public abstract class Hashtable { + /** + * Internal base class for entries. Stores the precomputed 64-bit keyHash and + * the chain-next pointer used to link colliding entries within a single bucket. + * + *

Subclasses add the actual key field(s) and a {@code matches(...)} method + * tailored to their key arity. See {@link D1.Entry} and {@link D2.Entry}; for + * higher arities, client code can subclass this directly and use {@link Support} + * to drive the table mechanics. + */ + public static abstract class Entry { + public final long keyHash; + Entry next = null; + + protected Entry(long keyHash) { + this.keyHash = keyHash; + } + + public final void setNext(TEntry next) { + this.next = next; + } + + @SuppressWarnings("unchecked") + public final TEntry next() { + return (TEntry)this.next; + } + } + + /** + * Single-key open hash table with chaining. + * + *

The user supplies an {@link D1.Entry} subclass that carries the key and + * whatever value fields they want to mutate in place, then instantiates this + * class over that entry type. The main advantage over {@code HashMap} + * is that mutating an existing entry's value fields requires no allocation: + * call {@link #get} once and write directly to the returned entry's fields. + * For counter-style workloads this can be several times faster than + * {@code HashMap} and produces effectively zero GC pressure. + * + *
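+ * A counter-style sketch (hypothetical {@code CountEntry}; generics abbreviated; {@code get} and + * {@code insert} are the D1 API below):
+ *
+ *   final class CountEntry extends D1.Entry<String> {
+ *     long count;
+ *     CountEntry(String key) { super(key); }
+ *   }
+ *
+ *   CountEntry e = table.get(word);
+ *   if (e == null) { e = new CountEntry(word); table.insert(e); }
+ *   e.count++; // mutate in place -- no allocation on the hit path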

Capacity is fixed at construction. The table does not resize, so the + * caller is responsible for choosing a capacity appropriate to the working + * set. Actual bucket-array length is rounded up to the next power of two. + * + *

Null keys are permitted; they collapse to a single bucket via the + * sentinel hash {@link Long#MIN_VALUE} defined in {@link D1.Entry#hash}. + * + *

Not thread-safe. Concurrent access (including mixing reads with + * writes) requires external synchronization. + * + * @param the key type + * @param the user's {@link D1.Entry D1.Entry<K>} subclass + */ + public static final class D1> { + /** + * Abstract base for {@link D1} entries. Subclass to add value fields you + * wish to mutate in place after retrieving the entry via {@link D1#get}. + * + *

The key is captured at construction and stored alongside its + * precomputed 64-bit hash. {@link #matches(Object)} uses + * {@link Objects#equals} by default; override if a different equality + * semantics is needed (e.g. reference equality for interned keys). + * + * @param the key type + */ + public static abstract class Entry extends Hashtable.Entry { + final K key; + + protected Entry(K key) { + super(hash(key)); + this.key = key; + } + + public boolean matches(Object key) { + return Objects.equals(this.key, key); + } + + public static long hash(Object key) { + return (key == null ) ? Long.MIN_VALUE : key.hashCode(); + } + } + + private final Hashtable.Entry[] buckets; + private int size; + + public D1(int capacity) { + this.buckets = Support.create(capacity); + this.size = 0; + } + + public int size() { + return this.size; + } + + @SuppressWarnings("unchecked") + public TEntry get(K key) { + long keyHash = D1.Entry.hash(key); + Hashtable.Entry[] thisBuckets = this.buckets; + for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) { + if (e.keyHash == keyHash) { + TEntry te = (TEntry) e; + if (te.matches(key)) return te; + } + } + return null; + } + + public TEntry remove(K key) { + long keyHash = D1.Entry.hash(key); + + for (MutatingBucketIterator iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) { + TEntry curEntry = iter.next(); + + if (curEntry.matches(key)) { + iter.remove(); + this.size -= 1; + return curEntry; + } + } + + return null; + } + + public void insert(TEntry newEntry) { + Hashtable.Entry[] thisBuckets = this.buckets; + int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash); + + Hashtable.Entry curHead = thisBuckets[bucketIndex]; + newEntry.setNext(curHead); + thisBuckets[bucketIndex] = newEntry; + + this.size += 1; + } + + public TEntry insertOrReplace(TEntry newEntry) { + Hashtable.Entry[] thisBuckets = this.buckets; + + for (MutatingBucketIterator iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) { + TEntry curEntry = iter.next(); + + if (curEntry.matches(newEntry.key)) { + iter.replace(newEntry); + return curEntry; + } + } + + int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash); + + Hashtable.Entry curHead = thisBuckets[bucketIndex]; + newEntry.setNext(curHead); + thisBuckets[bucketIndex] = newEntry; + this.size += 1; + return null; + } + + public void clear() { + Support.clear(this.buckets); + this.size = 0; + } + + @SuppressWarnings("unchecked") + public void forEach(Consumer consumer) { + Hashtable.Entry[] thisBuckets = this.buckets; + for (int i = 0; i < thisBuckets.length; i++) { + for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) { + consumer.accept((TEntry) e); + } + } + } + } + + /** + * Two-key (composite-key) hash table with chaining. + * + *

The user supplies a {@link D2.Entry} subclass carrying both key parts + * and any value fields. Compared to {@code HashMap} this avoids the + * per-lookup {@code Pair} (or record) allocation: both key parts are passed + * directly through {@link #get}, {@link #remove}, {@link #insert}, and + * {@link #insertOrReplace}. Combined with in-place value mutation, this + * makes {@code D2} substantially less GC-intensive than the equivalent + * {@code HashMap} for counter-style workloads. + * + *

Capacity is fixed at construction; the table does not resize. Actual + * bucket-array length is rounded up to the next power of two. + * + *

Key parts are combined into a 64-bit hash via {@link LongHashingUtils}; + * see {@link D2.Entry#hash(Object, Object)}. + * + *
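+ * A sketch of the two-part lookup (hypothetical {@code PairCountEntry}; generics abbreviated):
+ *
+ *   PairCountEntry e = table.get(service, operation); // no Pair/record allocation
+ *   if (e == null) { e = new PairCountEntry(service, operation); table.insert(e); }
+ *   e.count++;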

Not thread-safe. + * + * @param first key type + * @param second key type + * @param the user's {@link D2.Entry D2.Entry<K1, K2>} subclass + */ + public static final class D2> { + /** + * Abstract base for {@link D2} entries. Subclass to add value fields you + * wish to mutate in place. + * + *

Both key parts are captured at construction and stored alongside their + * combined 64-bit hash. {@link #matches(Object, Object)} uses + * {@link Objects#equals} pairwise on the two parts. + * + * @param first key type + * @param second key type + */ + public static abstract class Entry extends Hashtable.Entry { + final K1 key1; + final K2 key2; + + protected Entry(K1 key1, K2 key2) { + super(hash(key1, key2)); + this.key1 = key1; + this.key2 = key2; + } + + public boolean matches(K1 key1, K2 key2) { + return Objects.equals(this.key1, key1) && Objects.equals(this.key2, key2); + } + + public static long hash(Object key1, Object key2) { + return LongHashingUtils.hash(key1, key2); + } + } + + private final Hashtable.Entry[] buckets; + private int size; + + public D2(int capacity) { + this.buckets = Support.create(capacity); + this.size = 0; + } + + public int size() { + return this.size; + } + + @SuppressWarnings("unchecked") + public TEntry get(K1 key1, K2 key2) { + long keyHash = D2.Entry.hash(key1, key2); + Hashtable.Entry[] thisBuckets = this.buckets; + for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) { + if (e.keyHash == keyHash) { + TEntry te = (TEntry) e; + if (te.matches(key1, key2)) return te; + } + } + return null; + } + + public TEntry remove(K1 key1, K2 key2) { + long keyHash = D2.Entry.hash(key1, key2); + + for (MutatingBucketIterator iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) { + TEntry curEntry = iter.next(); + + if (curEntry.matches(key1, key2)) { + iter.remove(); + this.size -= 1; + return curEntry; + } + } + + return null; + } + + public void insert(TEntry newEntry) { + Hashtable.Entry[] thisBuckets = this.buckets; + int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash); + + Hashtable.Entry curHead = thisBuckets[bucketIndex]; + newEntry.setNext(curHead); + thisBuckets[bucketIndex] = newEntry; + + this.size += 1; + } + + public TEntry insertOrReplace(TEntry newEntry) { + Hashtable.Entry[] thisBuckets = this.buckets; + + for (MutatingBucketIterator iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) { + TEntry curEntry = iter.next(); + + if (curEntry.matches(newEntry.key1, newEntry.key2)) { + iter.replace(newEntry); + return curEntry; + } + } + + int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash); + + Hashtable.Entry curHead = thisBuckets[bucketIndex]; + newEntry.setNext(curHead); + thisBuckets[bucketIndex] = newEntry; + this.size += 1; + return null; + } + + public void clear() { + Support.clear(this.buckets); + this.size = 0; + } + + @SuppressWarnings("unchecked") + public void forEach(Consumer consumer) { + Hashtable.Entry[] thisBuckets = this.buckets; + for (int i = 0; i < thisBuckets.length; i++) { + for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) { + consumer.accept((TEntry) e); + } + } + } + } + + /** + * Internal building blocks for hash-table operations. + * + *

Used by {@link D1} and {@link D2}, and available to client code in other + * packages that wants to assemble its own higher-arity table (3+ key parts) without + * re-implementing the bucket-array mechanics. The typical recipe: + * + *

    + *
  • Subclass {@link Hashtable.Entry} directly, adding the key fields and + * a {@code matches(...)} method of your chosen arity. + *
  • Allocate a backing array with {@link #create(int)}. + *
  • Use {@link #bucketIndex(Object[], long)} for the bucket lookup, + * {@link #bucketIterator(Hashtable.Entry[], long)} for read-only chain + * walks, and {@link #mutatingBucketIterator(Hashtable.Entry[], long)} + * when you also need {@code remove} / {@code replace}. + *
  • Clear with {@link #clear(Hashtable.Entry[])}. + *
+ * + *

All bucket arrays produced by {@link #create(int)} have a power-of-two + * length, so {@link #bucketIndex(Object[], long)} can use a bit mask. + * + *

The class and its table-mechanics methods are public so that client code + * outside this package can assemble its own higher-arity tables; the nested + * iterator types are public for the same reason. + */ + public static final class Support { + public static final Hashtable.Entry[] create(int capacity) { + return new Entry[sizeFor(capacity)]; + } + + static final int sizeFor(int requestedCapacity) { + int pow; + for ( pow = 1; pow < requestedCapacity; pow *= 2 ); + return pow; + } + + public static final void clear(Hashtable.Entry[] buckets) { + Arrays.fill(buckets, null); + } + + public static final BucketIterator bucketIterator(Hashtable.Entry[] buckets, long keyHash) { + return new BucketIterator(buckets, keyHash); + } + + public static final MutatingBucketIterator mutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) { + return new MutatingBucketIterator(buckets, keyHash); + } + + public static final int bucketIndex(Object[] buckets, long keyHash) { + return (int)(keyHash & buckets.length - 1); + } + } + + /** + * Read-only iterator over entries in a single bucket whose {@code keyHash} + * matches a specific search hash. Cheaper than {@link MutatingBucketIterator} + * because it does not track the previous-node pointers required for + * splicing -- use it when you only need to walk the chain. + * + *

For {@code remove} or {@code replace} operations, use + * {@link MutatingBucketIterator} instead. + */ + public static final class BucketIterator implements Iterator { + private final long keyHash; + private Hashtable.Entry nextEntry; + + BucketIterator(Hashtable.Entry[] buckets, long keyHash) { + this.keyHash = keyHash; + Hashtable.Entry cur = buckets[Support.bucketIndex(buckets, keyHash)]; + while (cur != null && cur.keyHash != keyHash) cur = cur.next; + this.nextEntry = cur; + } + + @Override + public boolean hasNext() { + return this.nextEntry != null; + } + + @Override + @SuppressWarnings("unchecked") + public TEntry next() { + Hashtable.Entry cur = this.nextEntry; + if (cur == null) throw new NoSuchElementException("no next!"); + + Hashtable.Entry advance = cur.next; + while (advance != null && advance.keyHash != keyHash) advance = advance.next; + this.nextEntry = advance; + + return (TEntry) cur; + } + } + + /** + * Mutating iterator over entries in a single bucket whose {@code keyHash} + * matches a specific search hash. Supports {@link #remove()} and + * {@link #replace(Entry)} to splice the chain in place. + * + *

Carries previous-node pointers for the current entry and the next-match + * entry so that {@code remove} and {@code replace} can fix up the chain in + * O(1) without re-walking from the bucket head. After {@code remove} or + * {@code replace}, iteration may continue with another {@link #next()}. + */ + public static final class MutatingBucketIterator implements Iterator { + private final long keyHash; + + private final Hashtable.Entry[] buckets; + + /** + * The entry prior to the last entry returned by next + * Used for mutating operations + */ + private Hashtable.Entry curPrevEntry; + + /** + * The entry that was last returned by next + */ + private Hashtable.Entry curEntry; + + /** + * The entry prior to the next entry + */ + private Hashtable.Entry nextPrevEntry; + + /** + * The next entry to be returned by next + */ + private Hashtable.Entry nextEntry; + + MutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) { + this.buckets = buckets; + this.keyHash = keyHash; + + int bucketIndex = Support.bucketIndex(buckets, keyHash); + Hashtable.Entry headEntry = this.buckets[bucketIndex]; + if ( headEntry == null ) { + this.nextEntry = null; + this.nextPrevEntry = null; + + this.curEntry = null; + this.curPrevEntry = null; + } else { + Hashtable.Entry prev, cur; + for ( prev = null, cur = headEntry; cur != null; prev = cur, cur = cur.next() ) { + if ( cur.keyHash == keyHash ) break; + } + this.nextPrevEntry = prev; + this.nextEntry = cur; + + this.curEntry = null; + this.curPrevEntry = null; + } + } + + @Override + public boolean hasNext() { + return (this.nextEntry != null); + } + + @Override + @SuppressWarnings("unchecked") + public TEntry next() { + Hashtable.Entry curEntry = this.nextEntry; + if ( curEntry == null ) throw new NoSuchElementException("no next!"); + + this.curEntry = curEntry; + this.curPrevEntry = this.nextPrevEntry; + + Hashtable.Entry prev, cur; + for ( prev = this.nextEntry, cur = this.nextEntry.next(); cur != null; prev = cur, cur = prev.next() ) { + if ( cur.keyHash == keyHash ) break; + } + this.nextPrevEntry = prev; + this.nextEntry = cur; + + return (TEntry) curEntry; + } + + @Override + public void remove() { + Hashtable.Entry oldCurEntry = this.curEntry; + if ( oldCurEntry == null ) throw new IllegalStateException(); + + this.setPrevNext(oldCurEntry.next()); + + // If the next match was directly after oldCurEntry, its predecessor is now + // curPrevEntry (oldCurEntry was just unlinked from the chain). + if ( this.nextPrevEntry == oldCurEntry ) { + this.nextPrevEntry = this.curPrevEntry; + } + this.curEntry = null; + } + + public void replace(TEntry replacementEntry) { + Hashtable.Entry oldCurEntry = this.curEntry; + if ( oldCurEntry == null ) throw new IllegalStateException(); + + replacementEntry.setNext(oldCurEntry.next()); + this.setPrevNext(replacementEntry); + + // If the next match was directly after oldCurEntry, its predecessor is now + // the replacement entry (which took oldCurEntry's chain slot). 
+ if ( this.nextPrevEntry == oldCurEntry ) { + this.nextPrevEntry = replacementEntry; + } + this.curEntry = replacementEntry; + } + + void setPrevNext(Hashtable.Entry nextEntry) { + if ( this.curPrevEntry == null ) { + Hashtable.Entry[] buckets = this.buckets; + buckets[Support.bucketIndex(buckets, this.keyHash)] = nextEntry; + } else { + this.curPrevEntry.setNext(nextEntry); + } + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java new file mode 100644 index 00000000000..bc53bc4ecb6 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java @@ -0,0 +1,158 @@ +package datadog.trace.util; + +/** + * This class is intended to be a drop-in replacement for the hashing portions of java.util.Objects. + * This class provides more convenience methods for hashing primitives and includes overrides for + * hash that take many argument lengths to avoid var-args allocation. + */ +public final class LongHashingUtils { + private LongHashingUtils() {} + + public static final long hashCodeX(Object obj) { + return obj == null ? Long.MIN_VALUE : obj.hashCode(); + } + + public static final long hash(boolean value) { + return Boolean.hashCode(value); + } + + public static final long hash(char value) { + return Character.hashCode(value); + } + + public static final long hash(byte value) { + return Byte.hashCode(value); + } + + public static final long hash(short value) { + return Short.hashCode(value); + } + + public static final long hash(int value) { + return Integer.hashCode(value); + } + + public static final long hash(long value) { + return value; + } + + public static final long hash(float value) { + return Float.hashCode(value); + } + + public static final long hash(double value) { + return Double.doubleToRawLongBits(value); + } + + public static final long hash(Object obj0, Object obj1) { + return hash(intHash(obj0), intHash(obj1)); + } + + public static final long hash(int hash0, int hash1) { + return 31L * hash0 + hash1; + } + + private static final int intHash(Object obj) { + return obj == null ? 0 : obj.hashCode(); + } + + public static final long hash(Object obj0, Object obj1, Object obj2) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2)); + } + + public static final long hash(long hash0, long hash1, long hash2) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. + return 31L * 31L * hash0 + 31L * hash1 + hash2; + } + + public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3)); + } + + public static final long hash(int hash0, int hash1, int hash2, int hash3) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. + return 31L * 31L * 31L * hash0 + 31L * 31L * hash1 + 31L * hash2 + hash3; + } + + public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3, Object obj4) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3), intHash(obj4)); + } + + public static final long hash(int hash0, int hash1, int hash2, int hash3, int hash4) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. 
return 31L * 31L * 31L * 31L * hash0 + 31L * 31L * 31L * hash1 + 31L * 31L * hash2 + 31L * hash3 + hash4; + } + + @Deprecated + public static final long hash(int[] hashes) { + long result = 0; + for (int hash : hashes) { + result = addToHash(result, hash); + } + return result; + } + + public static final long addToHash(long hash, int value) { + return 31L * hash + value; + } + + public static final long addToHash(long hash, Object obj) { + return addToHash(hash, intHash(obj)); + } + + public static final long addToHash(long hash, boolean value) { + return addToHash(hash, Boolean.hashCode(value)); + } + + public static final long addToHash(long hash, char value) { + return addToHash(hash, Character.hashCode(value)); + } + + public static final long addToHash(long hash, byte value) { + return addToHash(hash, Byte.hashCode(value)); + } + + public static final long addToHash(long hash, short value) { + return addToHash(hash, Short.hashCode(value)); + } + + public static final long addToHash(long hash, long value) { + return addToHash(hash, Long.hashCode(value)); + } + + public static final long addToHash(long hash, float value) { + return addToHash(hash, Float.hashCode(value)); + } + + public static final long addToHash(long hash, double value) { + return addToHash(hash, Double.hashCode(value)); + } + + public static final long hash(Iterable objs) { + long result = 0; + for (Object obj : objs) { + result = addToHash(result, obj); + } + return result; + } + + /** + * Calling this var-arg version can result in large amounts of allocation (see HashingBenchmark). + * Rather than calling this method, add another override of hash that handles a larger number of + * arguments or use calls to addToHash. + */ + @Deprecated + public static final long hash(Object[] objs) { + long result = 0; + for (Object obj : objs) { + result = addToHash(result, obj); + } + return result; + } +} From 3ccb4e5a6d7594c600c7bac9bda040805fe96c8b Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 14:24:09 -0400 Subject: [PATCH 2/3] Swap LRUCache for AggregateTable in Aggregator + route disable() clear Replace LRUCache with the AggregateTable added in the prior commit. The hot path in Drainer.accept becomes: AggregateMetric aggregate = aggregates.findOrInsert(snapshot); if (aggregate != null) { aggregate.recordOneDuration(snapshot.tagAndDuration); dirty = true; } else { healthMetrics.onStatsAggregateDropped(); } On the steady-state hit path the lookup is a 64-bit hash compute + bucket walk + matches(snapshot) -- no MetricKey allocation, no SERVICE_NAMES / SPAN_KINDS / PEER_TAGS_CACHE lookups. The canonical MetricKey is now built once per unique key at insert time, in MetricKeys.fromSnapshot. Behavioral change in the cap-overrun path ----------------------------------------- The old LRUCache evicted least-recently-used: at cap, a new insert would push out the oldest entry regardless of whether it was live or stale. AggregateTable instead scans for a hitCount==0 entry to recycle, and drops the new key if none exists. Practical impact: in the common case where the table holds a stable set of recurring keys, an unrelated burst of new keys is dropped (and reported via onStatsAggregateDropped) rather than evicting the established keys. The existing test that asserted "service0 evicted in favor of service10" is updated to assert the new semantics.
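In sketch form (snapshots abbreviated to their service names; behavior per AggregateTable.findOrInsert and the updated test):

    // cap = 2, both resident entries live (hitCount > 0)
    table.findOrInsert(service0).recordOneDuration(d); // insert + record
    table.findOrInsert(service1).recordOneDuration(d); // insert + record
    table.findOrInsert(service2);  // no hitCount==0 victim -> null, point dropped

Under the old LRUCache, that third insert would have evicted service0 even though it was live.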
The other cap-related test ("should not report dropped aggregate when evicted entry was already flushed") still passes unchanged: after report() clears all entries to hitCount=0, the next wave of inserts recycles them. Threading fix ------------- ConflatingMetricsAggregator.disable() used to call aggregator.clearAggregates() and inbox.clear() directly from the Sink's IO event thread, racing with the aggregator thread mid-write. The race was tolerable for LinkedHashMap; it is not for AggregateTable (chain corruption can NPE or loop). disable() now offers a ClearSignal to the inbox so the aggregator thread itself performs the table clear and the inbox.clear(). Adds one SignalItem subclass + one branch in Drainer.accept; preserves the single-writer invariant for AggregateTable end-to-end. Removed: LRUCache import, AggregateExpiry inner class, the static buildMetricKey / materializePeerTags / encodePeerTag helpers (now in MetricKeys). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/Aggregator.java | 120 ++++-------------- .../metrics/ConflatingMetricsAggregator.java | 7 +- .../trace/common/metrics/InboxItem.java | 11 ++ .../ConflatingMetricAggregatorTest.groovy | 11 +- 4 files changed, 49 insertions(+), 100 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index e632555cc21..d0262f328f6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,26 +1,12 @@ package datadog.trace.common.metrics; -import static datadog.trace.api.Functions.UTF8_ENCODE; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import datadog.trace.api.Pair; -import datadog.trace.api.cache.DDCache; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.common.metrics.SignalItem.ClearSignal; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.monitor.HealthMetrics; -import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +18,9 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); private final MessagePassingQueue inbox; - private final LRUCache aggregates; + private final AggregateTable aggregates; private final MetricWriter writer; + private final HealthMetrics healthMetrics; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be // buffered by OkHttpSink) @@ -73,27 +60,10 @@ final class Aggregator implements Runnable { HealthMetrics healthMetrics) { this.writer = writer; this.inbox = inbox; - this.aggregates = - new LRUCache<>( - 
new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates); + this.aggregates = new AggregateTable(maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; - } - - private static final class AggregateExpiry - implements LRUCache.ExpiryListener { - private final HealthMetrics healthMetrics; - - AggregateExpiry(HealthMetrics healthMetrics) { - this.healthMetrics = healthMetrics; - } - - @Override - public void accept(Map.Entry expired) { - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } + this.healthMetrics = healthMetrics; } public void clearAggregates() { @@ -126,7 +96,13 @@ private final class Drainer implements MessagePassingQueue.Consumer { @Override public void accept(InboxItem item) { - if (item instanceof SignalItem) { + if (item == ClearSignal.CLEAR) { + if (!stopped) { + aggregates.clear(); + inbox.clear(); + } + ((SignalItem) item).complete(); + } else if (item instanceof SignalItem) { SignalItem signal = (SignalItem) item; if (!stopped) { report(wallClockTime(), signal); @@ -139,64 +115,31 @@ public void accept(InboxItem item) { } } else if (item instanceof SpanSnapshot && !stopped) { SpanSnapshot snapshot = (SpanSnapshot) item; - MetricKey key = buildMetricKey(snapshot); - AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - aggregate.recordOneDuration(snapshot.tagAndDuration); - dirty = true; + AggregateMetric aggregate = aggregates.findOrInsert(snapshot); + if (aggregate != null) { + aggregate.recordOneDuration(snapshot.tagAndDuration); + dirty = true; + } else { + // table at cap with no stale entry available to evict + healthMetrics.onStatsAggregateDropped(); + } } } } - private static MetricKey buildMetricKey(SpanSnapshot s) { - return new MetricKey( - s.resourceName, - SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), - s.operationName, - s.serviceNameSource, - s.spanType, - s.httpStatusCode, - s.synthetic, - s.traceRoot, - SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), - materializePeerTags(s.peerTagPairs), - s.httpMethod, - s.httpEndpoint, - s.grpcStatusCode); - } - - private static List materializePeerTags(String[] pairs) { - if (pairs == null || pairs.length == 0) { - return Collections.emptyList(); - } - if (pairs.length == 2) { - // single-entry fast path (matches the original singletonList shape for INTERNAL spans) - return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); - } - List tags = new ArrayList<>(pairs.length / 2); - for (int i = 0; i < pairs.length; i += 2) { - tags.add(encodePeerTag(pairs[i], pairs[i + 1])); - } - return tags; - } - - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); - } - private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { try { - expungeStaleAggregates(); + aggregates.expungeStaleAggregates(); if (!aggregates.isEmpty()) { skipped = false; writer.startBucket(aggregates.size(), when, reportingIntervalNanos); - for (Map.Entry aggregate : aggregates.entrySet()) { - writer.add(aggregate.getKey(), aggregate.getValue()); - aggregate.getValue().clear(); - } + aggregates.forEach( + (key, agg) -> { + writer.add(key, agg); + agg.clear(); + }); // note that this may do IO and block 
writer.finishBucket(); } @@ -212,17 +155,6 @@ private void report(long when, SignalItem signal) { } } - private void expungeStaleAggregates() { - Iterator> it = aggregates.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = it.next(); - AggregateMetric metric = pair.getValue(); - if (metric.getHitCount() == 0) { - it.remove(); - } - } - } - private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 9ea77140113..79dcf991c10 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -8,6 +8,7 @@ import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; @@ -418,8 +419,10 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.inbox.clear(); - this.aggregator.clearAggregates(); + // Route the clear through the inbox so the aggregator thread is the only writer. + // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread + // would race with Drainer.accept on the aggregator thread. + inbox.offer(CLEAR); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java index 7d66cad6a15..a0625be095b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java @@ -28,4 +28,15 @@ private StopSignal() {} static final class ReportSignal extends SignalItem { static final ReportSignal REPORT = new ReportSignal(); } + + /** + * Posted from arbitrary threads (e.g. the Sink event thread during agent downgrade) so the + * aggregator thread is the one that actually performs the table reset. Keeps {@link + * AggregateTable} and {@code inbox.clear()} single-writer. 
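+ * Drainer.accept answers the signal by clearing the table and the inbox on the aggregator thread, + * then calls {@code complete()} on the signal (see the branch added in Drainer.accept).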
+ */ + static final class ClearSignal extends SignalItem { + static final ClearSignal CLEAR = new ClearSignal(); + + private ClearSignal() {} + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 962ad2ce892..dedd0bae75b 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -877,7 +877,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.close() } - def "test least recently written to aggregate flushed when size limit exceeded"() { + def "new aggregates beyond size limit are dropped when no stale entries can be evicted"() { + // The table only evicts entries with hitCount == 0 to make room. When all entries are live + // (all have been recorded against), an over-cap insert drops the new key rather than evicting + // an established one. This protects the data we've already collected from a burst of new keys. setup: int maxAggregates = 10 MetricWriter writer = Mock(MetricWriter) @@ -901,10 +904,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.report() def latchTriggered = latch.await(2, SECONDS) - then: "the first aggregate should be dropped but the rest reported" + then: "the established service0..service9 are reported; service10 is dropped" latchTriggered 1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval)) - for (int i = 1; i < 11; ++i) { + for (int i = 0; i < 10; ++i) { 1 * writer.add(new MetricKey( "resource", "service" + i, @@ -925,7 +928,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { } 0 * writer.add(new MetricKey( "resource", - "service0", + "service10", "operation", null, "type", From 050a9987319217eb3d6ff4c85c116ddb0641e882 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 15:07:16 -0400 Subject: [PATCH 3/3] Eliminate MetricKey: inline its fields onto AggregateEntry MetricKey existed for two reasons -- the prior LRUCache key role (now handled by AggregateTable's Hashtable.Entry mechanics) and as the labels argument to MetricWriter.add. The first is gone; the second is the only thing keeping MetricKey alive. Fold its UTF8-encoded label fields onto AggregateEntry, change MetricWriter.add to take AggregateEntry directly, and delete MetricKey + MetricKeys. What AggregateEntry now holds ----------------------------- - 10 UTF8BytesString label fields (resource, service, operationName, serviceSource, type, spanKind, httpMethod, httpEndpoint, grpcStatusCode, and a List peerTags for serialization). - 3 primitives (httpStatusCode, synthetic, traceRoot). - AggregateMetric (the value being accumulated). - The raw String[] peerTagPairs is retained alongside the encoded peerTags -- matches() compares it positionally against the snapshot's pairs; the encoded form is only consumed by the writer. matches(SpanSnapshot) compares the entry's UTF8 forms to the snapshot's raw String / CharSequence fields via content-equality (UTF8BytesString.toString() returns the underlying String in O(1)). This closes a latent bug in the prior raw-vs-raw matches(): if one snapshot delivered a tag value as String and a later snapshot delivered the same content as UTF8BytesString, the old Objects.equals would return false and the table would split into two entries. 
Content-equality matching collapses them into one.

Consolidated caches
-------------------
The static UTF8 caches that used to live partly on MetricKey
(RESOURCE_CACHE, OPERATION_CACHE, SERVICE_SOURCE_CACHE, TYPE_CACHE,
KIND_CACHE, HTTP_METHOD_CACHE, HTTP_ENDPOINT_CACHE, GRPC_STATUS_CODE_CACHE,
SERVICE_CACHE) and partly on ConflatingMetricsAggregator (SERVICE_NAMES,
SPAN_KINDS, PEER_TAGS_CACHE) are all now on AggregateEntry. The split was
duplicating work -- SERVICE_NAMES and SERVICE_CACHE both cached the
service-name-to-UTF8BytesString mapping. One cache per field now.

API change: MetricWriter.add
----------------------------
Was: add(MetricKey key, AggregateMetric aggregate)
Now: add(AggregateEntry entry)

The aggregate lives on the entry. Single-arg. SerializingMetricWriter reads
the same UTF8 fields off AggregateEntry that it previously read off
MetricKey; the wire format is byte-identical.

Test impact
-----------
AggregateEntry.of(...) takes the same 13 positional args new MetricKey(...)
took, so test diffs are mostly mechanical:

  new MetricKey(args) -> AggregateEntry.of(args)
  writer.add(key, _)  -> writer.add(entry)

ValidatingSink in SerializingMetricWriterTest now iterates
List<AggregateEntry> directly. ConflatingMetricAggregatorTest's Spock
matchers (~36 sites) rely on AggregateEntry.equals comparing the 13 label
fields (not the aggregate), so the mock matches by labels regardless of the
aggregate state at call time; post-invocation closures verify aggregate
state.

Benchmarks (2 forks x 5 iter x 15s)
-----------------------------------
The change is consumer-thread only; producer publish() is unchanged.

  SimpleSpan bench: 3.123 +- 0.025 us/op (prior: 3.119 +- 0.018)
  DDSpan bench:     2.412 +- 0.022 us/op (prior: 2.463 +- 0.041)

Both within noise -- the win is structural (one fewer class, one fewer
allocation per miss, one fewer cache layer) rather than a measured speedup.
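Why content-equality matters (illustrative)
-------------------------------------------
A minimal standalone sketch of the entry-split described above, not code
from this patch: StringBuilder stands in for UTF8BytesString (any
CharSequence whose equals() is not content-compatible with String
reproduces the bug the same way).

    import java.util.Objects;

    class ContentEqualityDemo {
      public static void main(String[] args) {
        String raw = "peer.service:remote-service";
        // Stand-in for UTF8BytesString: same content, different wrapper type.
        CharSequence wrapped = new StringBuilder(raw);

        // Old raw-vs-raw matches(): String.equals(non-String CharSequence)
        // is false, so the same logical key occupied two table entries.
        System.out.println(Objects.equals(raw, wrapped)); // false

        // New matches(): compare content, not wrapper identity -> one entry.
        System.out.println(raw.contentEquals(wrapped));   // true
      }
    }

The real comparison avoids even this toString() cost in the common case
because UTF8BytesString.toString() returns the underlying String in O(1).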
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 360 +++++++++++++++--- .../trace/common/metrics/AggregateTable.java | 16 +- .../trace/common/metrics/Aggregator.java | 6 +- .../metrics/ConflatingMetricsAggregator.java | 21 - .../trace/common/metrics/MetricKey.java | 178 --------- .../trace/common/metrics/MetricKeys.java | 65 ---- .../trace/common/metrics/MetricWriter.java | 6 +- .../metrics/SerializingMetricWriter.java | 37 +- .../trace/common/metrics/SpanSnapshot.java | 3 +- .../ConflatingMetricAggregatorTest.groovy | 264 ++++++------- .../SerializingMetricWriterTest.groovy | 333 ++++++---------- .../common/metrics/AggregateTableTest.java | 16 +- .../groovy/MetricsIntegrationTest.groovy | 17 +- 13 files changed, 609 insertions(+), 713 deletions(-) delete mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java delete mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 10e256620f5..e2fda9fde47 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -1,71 +1,176 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; + +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; /** - * Hashtable entry pairing the raw {@link SpanSnapshot} key fields with their canonical {@link - * MetricKey} (built once on miss) and the mutable {@link AggregateMetric}. + * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data + * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}. + * + *

{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's + * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code + * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry + * instead of splitting. * - *

Lookups compare the snapshot's raw fields against the entry's stored copies, so the consumer - * never has to build a {@link MetricKey} just to do a HashMap lookup. The {@code MetricKey} field - * is retained because the serializer ({@link MetricWriter#add}) needs it at report time. + *

The static UTF8 caches that used to live on {@code MetricKey} and {@code + * ConflatingMetricsAggregator} are consolidated here. */ final class AggregateEntry extends Hashtable.Entry { - final MetricKey key; - final AggregateMetric aggregate; - // Raw snapshot fields, used by matches(SpanSnapshot). Stored as captured at insert time; - // the canonical MetricKey above holds the UTF8BytesString-encoded forms. - private final CharSequence resourceName; - private final String serviceName; - private final CharSequence operationName; - private final CharSequence serviceNameSource; - private final CharSequence spanType; + // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split. + private static final DDCache RESOURCE_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache SERVICE_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache OPERATION_CACHE = + DDCaches.newFixedSizeCache(64); + private static final DDCache SERVICE_SOURCE_CACHE = + DDCaches.newFixedSizeCache(16); + private static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); + private static final DDCache SPAN_KIND_CACHE = + DDCaches.newFixedSizeCache(16); + private static final DDCache HTTP_METHOD_CACHE = + DDCaches.newFixedSizeCache(8); + private static final DDCache HTTP_ENDPOINT_CACHE = + DDCaches.newFixedSizeCache(32); + private static final DDCache GRPC_STATUS_CODE_CACHE = + DDCaches.newFixedSizeCache(32); + + /** + * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner + * cache produces the "name:value" encoded form the serializer writes. + */ + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); + + private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); + + private final UTF8BytesString resource; + private final UTF8BytesString service; + private final UTF8BytesString operationName; + private final UTF8BytesString serviceSource; // nullable + private final UTF8BytesString type; + private final UTF8BytesString spanKind; + private final UTF8BytesString httpMethod; // nullable + private final UTF8BytesString httpEndpoint; // nullable + private final UTF8BytesString grpcStatusCode; // nullable private final short httpStatusCode; private final boolean synthetic; private final boolean traceRoot; - private final String spanKind; - private final String[] peerTagPairs; - private final String httpMethod; - private final String httpEndpoint; - private final String grpcStatusCode; - - AggregateEntry(MetricKey key, SpanSnapshot s, AggregateMetric aggregate) { - super(hashOf(s)); - this.key = key; - this.aggregate = aggregate; - this.resourceName = s.resourceName; - this.serviceName = s.serviceName; - this.operationName = s.operationName; - this.serviceNameSource = s.serviceNameSource; - this.spanType = s.spanType; + + // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs, + // and pre-encoded List ("name:value") for the serializer. + private final String[] peerTagPairsRaw; + private final List peerTags; + + final AggregateMetric aggregate; + + /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. 
*/ + private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) { + super(keyHash); + this.resource = canonicalize(RESOURCE_CACHE, s.resourceName); + this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE); + this.operationName = canonicalize(OPERATION_CACHE, s.operationName); + this.serviceSource = + s.serviceNameSource == null + ? null + : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource); + this.type = canonicalize(TYPE_CACHE, s.spanType); + this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create); + this.httpMethod = + s.httpMethod == null + ? null + : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create); + this.httpEndpoint = + s.httpEndpoint == null + ? null + : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create); + this.grpcStatusCode = + s.grpcStatusCode == null + ? null + : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create); this.httpStatusCode = s.httpStatusCode; this.synthetic = s.synthetic; this.traceRoot = s.traceRoot; - this.spanKind = s.spanKind; - this.peerTagPairs = s.peerTagPairs; - this.httpMethod = s.httpMethod; - this.httpEndpoint = s.httpEndpoint; - this.grpcStatusCode = s.grpcStatusCode; + this.peerTagPairsRaw = s.peerTagPairs; + this.peerTags = materializePeerTags(s.peerTagPairs); + this.aggregate = aggregate; + } + + /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */ + static AggregateEntry of( + CharSequence resource, + CharSequence service, + CharSequence operationName, + CharSequence serviceSource, + CharSequence type, + int httpStatusCode, + boolean synthetic, + boolean traceRoot, + CharSequence spanKind, + List peerTags, + CharSequence httpMethod, + CharSequence httpEndpoint, + CharSequence grpcStatusCode) { + String[] rawPairs = peerTagsToRawPairs(peerTags); + SpanSnapshot synthetic_snapshot = + new SpanSnapshot( + resource, + service == null ? null : service.toString(), + operationName, + serviceSource, + type, + (short) httpStatusCode, + synthetic, + traceRoot, + spanKind == null ? null : spanKind.toString(), + rawPairs, + httpMethod == null ? null : httpMethod.toString(), + httpEndpoint == null ? null : httpEndpoint.toString(), + grpcStatusCode == null ? null : grpcStatusCode.toString(), + 0L); + return new AggregateEntry( + synthetic_snapshot, hashOf(synthetic_snapshot), new AggregateMetric()); + } + + /** Construct from a snapshot at consumer-thread miss time. 
*/ + static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) { + return new AggregateEntry(s, hashOf(s), aggregate); } boolean matches(SpanSnapshot s) { return httpStatusCode == s.httpStatusCode && synthetic == s.synthetic && traceRoot == s.traceRoot - && Objects.equals(resourceName, s.resourceName) - && Objects.equals(serviceName, s.serviceName) - && Objects.equals(operationName, s.operationName) - && Objects.equals(serviceNameSource, s.serviceNameSource) - && Objects.equals(spanType, s.spanType) - && Objects.equals(spanKind, s.spanKind) - && Arrays.equals(peerTagPairs, s.peerTagPairs) - && Objects.equals(httpMethod, s.httpMethod) - && Objects.equals(httpEndpoint, s.httpEndpoint) - && Objects.equals(grpcStatusCode, s.grpcStatusCode); + && contentEquals(resource, s.resourceName) + && stringContentEquals(service, s.serviceName) + && contentEquals(operationName, s.operationName) + && contentEquals(serviceSource, s.serviceNameSource) + && contentEquals(type, s.spanType) + && stringContentEquals(spanKind, s.spanKind) + && Arrays.equals(peerTagPairsRaw, s.peerTagPairs) + && stringContentEquals(httpMethod, s.httpMethod) + && stringContentEquals(httpEndpoint, s.httpEndpoint) + && stringContentEquals(grpcStatusCode, s.grpcStatusCode); } /** @@ -73,6 +178,9 @@ boolean matches(SpanSnapshot s) { * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes * to the same bucket the snapshot itself looks up. + * + *

Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link + * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash. */ static long hashOf(SpanSnapshot s) { long h = 0; @@ -95,4 +203,166 @@ static long hashOf(SpanSnapshot s) { h = LongHashingUtils.addToHash(h, s.grpcStatusCode); return h; } + + // Accessors for SerializingMetricWriter. + UTF8BytesString getResource() { + return resource; + } + + UTF8BytesString getService() { + return service; + } + + UTF8BytesString getOperationName() { + return operationName; + } + + UTF8BytesString getServiceSource() { + return serviceSource; + } + + UTF8BytesString getType() { + return type; + } + + UTF8BytesString getSpanKind() { + return spanKind; + } + + UTF8BytesString getHttpMethod() { + return httpMethod; + } + + UTF8BytesString getHttpEndpoint() { + return httpEndpoint; + } + + UTF8BytesString getGrpcStatusCode() { + return grpcStatusCode; + } + + int getHttpStatusCode() { + return httpStatusCode; + } + + boolean isSynthetics() { + return synthetic; + } + + boolean isTraceRoot() { + return traceRoot; + } + + List getPeerTags() { + return peerTags; + } + + /** + * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the + * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)} + * and never calls {@code equals}. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof AggregateEntry)) return false; + AggregateEntry that = (AggregateEntry) o; + return httpStatusCode == that.httpStatusCode + && synthetic == that.synthetic + && traceRoot == that.traceRoot + && java.util.Objects.equals(resource, that.resource) + && java.util.Objects.equals(service, that.service) + && java.util.Objects.equals(operationName, that.operationName) + && java.util.Objects.equals(serviceSource, that.serviceSource) + && java.util.Objects.equals(type, that.type) + && java.util.Objects.equals(spanKind, that.spanKind) + && peerTags.equals(that.peerTags) + && java.util.Objects.equals(httpMethod, that.httpMethod) + && java.util.Objects.equals(httpEndpoint, that.httpEndpoint) + && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode); + } + + @Override + public int hashCode() { + return (int) keyHash; + } + + // ----- helpers ----- + + private static UTF8BytesString canonicalize( + DDCache cache, CharSequence charSeq) { + if (charSeq == null) { + return EMPTY; + } + if (charSeq instanceof UTF8BytesString) { + return (UTF8BytesString) charSeq; + } + return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); + } + + /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */ + private static boolean contentEquals(UTF8BytesString a, CharSequence b) { + if (a == null) { + return b == null; + } + if (b == null) { + return false; + } + // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation. 
+ String aStr = a.toString(); + if (b instanceof String) { + return aStr.equals(b); + } + if (b instanceof UTF8BytesString) { + return aStr.equals(b.toString()); + } + return aStr.contentEquals(b); + } + + private static boolean stringContentEquals(UTF8BytesString a, String b) { + if (a == null) { + return b == null; + } + return b != null && a.toString().equals(b); + } + + private static List materializePeerTags(String[] pairs) { + if (pairs == null || pairs.length == 0) { + return Collections.emptyList(); + } + if (pairs.length == 2) { + return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + } + List tags = new ArrayList<>(pairs.length / 2); + for (int i = 0; i < pairs.length; i += 2) { + tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } + + /** + * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw + * {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of}, not by + * the hot path. + */ + private static String[] peerTagsToRawPairs(List peerTags) { + if (peerTags == null || peerTags.isEmpty()) { + return null; + } + String[] pairs = new String[peerTags.size() * 2]; + int i = 0; + for (UTF8BytesString peerTag : peerTags) { + String s = peerTag.toString(); + int colon = s.indexOf(':'); + pairs[i++] = colon < 0 ? s : s.substring(0, colon); + pairs[i++] = colon < 0 ? "" : s.substring(colon + 1); + } + return pairs; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java index 98260a2e2b3..08300eab296 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -1,16 +1,16 @@ package datadog.trace.common.metrics; import datadog.trace.util.Hashtable; -import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}. * *

Replaces the prior {@code LRUCache}. The win is on the * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise - * {@code matches}, with no {@link MetricKey} allocation and no UTF8 cache lookups. The canonical - * {@link MetricKey} (with UTF8-encoded forms) is only built once per unique key, at insert time, - * and lives on the {@link AggregateEntry}. + * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache + * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link + * AggregateEntry} itself and are built once per unique key at insert time. * *

Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be * routed through the inbox rather than called from arbitrary threads. @@ -53,8 +53,7 @@ AggregateMetric findOrInsert(SpanSnapshot snapshot) { if (size >= maxAggregates && !evictOneStale()) { return null; } - AggregateEntry entry = - new AggregateEntry(MetricKeys.fromSnapshot(snapshot), snapshot, new AggregateMetric()); + AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric()); entry.setNext(buckets[bucketIndex]); buckets[bucketIndex] = entry; size++; @@ -88,11 +87,10 @@ private boolean evictOneStale() { return false; } - void forEach(BiConsumer consumer) { + void forEach(Consumer consumer) { for (int i = 0; i < buckets.length; i++) { for (Hashtable.Entry e = buckets[i]; e != null; e = e.next()) { - AggregateEntry entry = (AggregateEntry) e; - consumer.accept(entry.key, entry.aggregate); + consumer.accept((AggregateEntry) e); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index d0262f328f6..b4fc59d5a1d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -136,9 +136,9 @@ private void report(long when, SignalItem signal) { skipped = false; writer.startBucket(aggregates.size(), when, reportingIntervalNanos); aggregates.forEach( - (key, agg) -> { - writer.add(key, agg); - agg.clear(); + entry -> { + writer.add(entry); + entry.aggregate.clear(); }); // note that this may do IO and block writer.finishBucket(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 79dcf991c10..c675fcb23c4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -20,12 +20,8 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; -import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.ReportSignal; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; @@ -40,7 +36,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,22 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - - static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); - static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = - DDCaches.newFixedSizeCache( - 64); // it can be unbounded since those values are returned by the 
agent and should be - // under control. 64 entries is enough in this case to contain all the peer tags. - static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java deleted file mode 100644 index 9e2e2098d1f..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ /dev/null @@ -1,178 +0,0 @@ -package datadog.trace.common.metrics; - -import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; - -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import datadog.trace.util.HashingUtils; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -/** The aggregation key for tracked metrics. */ -public final class MetricKey { - static final DDCache RESOURCE_CACHE = DDCaches.newFixedSizeCache(32); - static final DDCache SERVICE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache SERVICE_SOURCE_CACHE = - DDCaches.newFixedSizeCache(16); - static final DDCache OPERATION_CACHE = DDCaches.newFixedSizeCache(64); - static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache KIND_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_METHOD_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_ENDPOINT_CACHE = - DDCaches.newFixedSizeCache(32); - static final DDCache GRPC_STATUS_CODE_CACHE = - DDCaches.newFixedSizeCache(32); - - private final UTF8BytesString resource; - private final UTF8BytesString service; - private final UTF8BytesString serviceSource; - private final UTF8BytesString operationName; - private final UTF8BytesString type; - private final int httpStatusCode; - private final boolean synthetics; - private final int hash; - private final boolean isTraceRoot; - private final UTF8BytesString spanKind; - private final List peerTags; - private final UTF8BytesString httpMethod; - private final UTF8BytesString httpEndpoint; - private final UTF8BytesString grpcStatusCode; - - public MetricKey( - CharSequence resource, - CharSequence service, - CharSequence operationName, - CharSequence serviceSource, - CharSequence type, - int httpStatusCode, - boolean synthetics, - boolean isTraceRoot, - CharSequence spanKind, - List peerTags, - CharSequence httpMethod, - CharSequence httpEndpoint, - CharSequence grpcStatusCode) { - this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource); - this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service); - this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource); - this.operationName = null == operationName ? EMPTY : utf8(OPERATION_CACHE, operationName); - this.type = null == type ? EMPTY : utf8(TYPE_CACHE, type); - this.httpStatusCode = httpStatusCode; - this.synthetics = synthetics; - this.isTraceRoot = isTraceRoot; - this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind); - this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; - this.httpMethod = httpMethod == null ? 
null : utf8(HTTP_METHOD_CACHE, httpMethod); - this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint); - this.grpcStatusCode = - grpcStatusCode == null ? null : utf8(GRPC_STATUS_CODE_CACHE, grpcStatusCode); - - int tmpHash = 0; - tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot); - tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind); - tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags); - tmpHash = HashingUtils.addToHash(tmpHash, this.resource); - tmpHash = HashingUtils.addToHash(tmpHash, this.service); - tmpHash = HashingUtils.addToHash(tmpHash, this.operationName); - tmpHash = HashingUtils.addToHash(tmpHash, this.type); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpStatusCode); - tmpHash = HashingUtils.addToHash(tmpHash, this.synthetics); - tmpHash = HashingUtils.addToHash(tmpHash, this.serviceSource); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod); - tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode); - this.hash = tmpHash; - } - - static UTF8BytesString utf8(DDCache cache, CharSequence charSeq) { - if (charSeq instanceof UTF8BytesString) { - return (UTF8BytesString) charSeq; - } else { - return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); - } - } - - public UTF8BytesString getResource() { - return resource; - } - - public UTF8BytesString getService() { - return service; - } - - public UTF8BytesString getServiceSource() { - return serviceSource; - } - - public UTF8BytesString getOperationName() { - return operationName; - } - - public UTF8BytesString getType() { - return type; - } - - public int getHttpStatusCode() { - return httpStatusCode; - } - - public boolean isSynthetics() { - return synthetics; - } - - public boolean isTraceRoot() { - return isTraceRoot; - } - - public UTF8BytesString getSpanKind() { - return spanKind; - } - - public List getPeerTags() { - return peerTags; - } - - public UTF8BytesString getHttpMethod() { - return httpMethod; - } - - public UTF8BytesString getHttpEndpoint() { - return httpEndpoint; - } - - public UTF8BytesString getGrpcStatusCode() { - return grpcStatusCode; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if ((o instanceof MetricKey)) { - MetricKey metricKey = (MetricKey) o; - return hash == metricKey.hash - && synthetics == metricKey.synthetics - && httpStatusCode == metricKey.httpStatusCode - && resource.equals(metricKey.resource) - && service.equals(metricKey.service) - && operationName.equals(metricKey.operationName) - && type.equals(metricKey.type) - && isTraceRoot == metricKey.isTraceRoot - && spanKind.equals(metricKey.spanKind) - && peerTags.equals(metricKey.peerTags) - && Objects.equals(serviceSource, metricKey.serviceSource) - && Objects.equals(httpMethod, metricKey.httpMethod) - && Objects.equals(httpEndpoint, metricKey.httpEndpoint) - && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode); - } - return false; - } - - @Override - public int hashCode() { - return hash; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java deleted file mode 100644 index 2e03c3730d3..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKeys.java +++ /dev/null @@ -1,65 +0,0 @@ -package datadog.trace.common.metrics; - -import static datadog.trace.api.Functions.UTF8_ENCODE; -import static 
datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; -import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; - -import datadog.trace.api.Pair; -import datadog.trace.api.cache.DDCache; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.Function; - -/** - * Canonicalization helpers for {@link MetricKey}: applies the static {@link - * ConflatingMetricsAggregator#SERVICE_NAMES} / {@link ConflatingMetricsAggregator#SPAN_KINDS} / - * {@link ConflatingMetricsAggregator#PEER_TAGS_CACHE} caches to a {@link SpanSnapshot}. - * - *

Called only on a true miss in {@link AggregateTable}, so the CHM lookups inside the DDCaches - * happen once per unique key rather than once per snapshot. - */ -final class MetricKeys { - private MetricKeys() {} - - static MetricKey fromSnapshot(SpanSnapshot s) { - return new MetricKey( - s.resourceName, - SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), - s.operationName, - s.serviceNameSource, - s.spanType, - s.httpStatusCode, - s.synthetic, - s.traceRoot, - SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), - materializePeerTags(s.peerTagPairs), - s.httpMethod, - s.httpEndpoint, - s.grpcStatusCode); - } - - private static List materializePeerTags(String[] pairs) { - if (pairs == null || pairs.length == 0) { - return Collections.emptyList(); - } - if (pairs.length == 2) { - // single-entry fast path (matches the original singletonList shape for INTERNAL spans) - return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); - } - List tags = new ArrayList<>(pairs.length / 2); - for (int i = 0; i < pairs.length; i += 2) { - tags.add(encodePeerTag(pairs[i], pairs[i + 1])); - } - return tags; - } - - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java index fa26ed2e5db..c31825f6af8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java @@ -3,7 +3,11 @@ public interface MetricWriter { void startBucket(int metricCount, long start, long duration); - void add(MetricKey key, AggregateMetric aggregate); + /** + * Serialize one aggregate. The {@link AggregateEntry} carries both the label fields (resource, + * service, span.kind, peer tags, etc.) and the {@link AggregateMetric} counters being reported. + */ + void add(AggregateEntry entry); void finishBucket(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 0f84964e9db..ba6ae6c2699 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -142,12 +142,13 @@ public void startBucket(int metricCount, long start, long duration) { } @Override - public void add(MetricKey key, AggregateMetric aggregate) { + public void add(AggregateEntry entry) { + final AggregateMetric aggregate = entry.aggregate; // Calculate dynamic map size based on optional fields - final boolean hasHttpMethod = key.getHttpMethod() != null; - final boolean hasHttpEndpoint = key.getHttpEndpoint() != null; - final boolean hasServiceSource = key.getServiceSource() != null; - final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; + final boolean hasHttpMethod = entry.getHttpMethod() != null; + final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null; + final boolean hasServiceSource = entry.getServiceSource() != null; + final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null; final int mapSize = 15 + (hasServiceSource ? 
1 : 0) @@ -158,31 +159,31 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.startMap(mapSize); writer.writeUTF8(NAME); - writer.writeUTF8(key.getOperationName()); + writer.writeUTF8(entry.getOperationName()); writer.writeUTF8(SERVICE); - writer.writeUTF8(key.getService()); + writer.writeUTF8(entry.getService()); writer.writeUTF8(RESOURCE); - writer.writeUTF8(key.getResource()); + writer.writeUTF8(entry.getResource()); writer.writeUTF8(TYPE); - writer.writeUTF8(key.getType()); + writer.writeUTF8(entry.getType()); writer.writeUTF8(HTTP_STATUS_CODE); - writer.writeInt(key.getHttpStatusCode()); + writer.writeInt(entry.getHttpStatusCode()); writer.writeUTF8(SYNTHETICS); - writer.writeBoolean(key.isSynthetics()); + writer.writeBoolean(entry.isSynthetics()); writer.writeUTF8(IS_TRACE_ROOT); - writer.writeInt(key.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE); + writer.writeInt(entry.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE); writer.writeUTF8(SPAN_KIND); - writer.writeUTF8(key.getSpanKind()); + writer.writeUTF8(entry.getSpanKind()); writer.writeUTF8(PEER_TAGS); - final List peerTags = key.getPeerTags(); + final List peerTags = entry.getPeerTags(); writer.startArray(peerTags.size()); for (UTF8BytesString peerTag : peerTags) { @@ -191,24 +192,24 @@ public void add(MetricKey key, AggregateMetric aggregate) { if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); - writer.writeUTF8(key.getServiceSource()); + writer.writeUTF8(entry.getServiceSource()); } // Only include HTTPMethod if present if (hasHttpMethod) { writer.writeUTF8(HTTP_METHOD); - writer.writeUTF8(key.getHttpMethod()); + writer.writeUTF8(entry.getHttpMethod()); } // Only include HTTPEndpoint if present if (hasHttpEndpoint) { writer.writeUTF8(HTTP_ENDPOINT); - writer.writeUTF8(key.getHttpEndpoint()); + writer.writeUTF8(entry.getHttpEndpoint()); } // Only include GRPCStatusCode if present (rpc-type spans) if (hasGrpcStatusCode) { writer.writeUTF8(GRPC_STATUS_CODE); - writer.writeUTF8(key.getGrpcStatusCode()); + writer.writeUTF8(entry.getGrpcStatusCode()); } writer.writeUTF8(HITS); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index 2816fad0411..b7f81712945 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -2,7 +2,8 @@ /** * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw - * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * inputs the aggregator needs to build an {@link AggregateEntry} and update its {@link + * AggregateMetric}. * *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the * aggregator thread; the producer just shuffles references. diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index dedd0bae75b..4dd0155443a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -119,7 +119,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( null, "service", "operation", @@ -133,8 +133,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -165,7 +165,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -179,8 +179,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -217,7 +217,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered == statsComputed (statsComputed ? 1 : 0) * writer.startBucket(1, _, _) (statsComputed ? 1 : 0) * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -231,9 +231,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { httpMethod, httpEndpoint, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } (statsComputed ? 
1 : 0) * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -279,7 +279,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(2, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -293,11 +293,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -311,9 +311,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -344,7 +344,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -358,9 +358,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -396,7 +396,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -410,9 +410,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == topLevelCount && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -455,7 +455,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.finishBucket() >> { latch.countDown() } 1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval)) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -469,10 +469,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + } + 1 * writer.add(AggregateEntry.of( "resource2", "service2", "operation2", @@ -486,9 +486,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 
count && value.getDuration() == count * duration * 2 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration * 2 + } cleanup: aggregator.close() @@ -526,7 +526,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should aggregate into single metric" latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -540,9 +540,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + } 1 * writer.finishBucket() >> { latch.countDown() } when: "publish spans with different endpoints" @@ -567,7 +567,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metrics for each endpoint/method combination" latchTriggered2 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -581,10 +581,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -598,10 +598,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -615,9 +615,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + } 1 * writer.finishBucket() >> { latch2.countDown() } cleanup: @@ -665,7 +665,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create 4 separate metrics" latchTriggered 1 * writer.startBucket(4, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -679,10 +679,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -696,10 +696,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + 
} + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -713,10 +713,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -730,9 +730,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 4 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 4 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -769,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metric keys for spans with and without HTTP tags" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -783,10 +783,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -800,9 +800,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -837,7 +837,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create the different metric keys for spans with and without sources" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -851,10 +851,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 2 && value.getDuration() == 2 * duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 2 && e.aggregate.getDuration() == 2 * duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -868,9 +868,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -908,7 +908,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 10; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -922,11 +922,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, 
AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration } } - 0 * writer.add(new MetricKey( + 0 * writer.add(AggregateEntry.of( "resource", "service10", "operation", @@ -940,7 +940,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1055,7 +1055,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1069,9 +1069,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1090,7 +1090,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval)) for (int i = 1; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1104,11 +1104,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } - 0 * writer.add(new MetricKey( + 0 * writer.add(AggregateEntry.of( "resource", "service0", "operation", @@ -1122,7 +1122,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1157,7 +1157,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1171,9 +1171,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1183,7 +1183,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "aggregate not updated in cycle is not reported" 0 * writer.finishBucket() 0 * writer.startBucket(_, _, _) - 0 * writer.add(_, _) + 0 * writer.add(_) cleanup: aggregator.close() @@ -1216,7 +1216,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(1)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1230,9 +1230,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == 
duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1383,7 +1383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1397,9 +1397,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1438,7 +1438,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1452,9 +1452,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 3 && aggregateMetric.getTopLevelCount() == 3 && aggregateMetric.getDuration() == 450 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 3 && e.aggregate.getTopLevelCount() == 3 && e.aggregate.getDuration() == 450 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1493,7 +1493,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(3, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1507,11 +1507,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1525,11 +1525,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/orders", null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 200 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 200 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1543,9 +1543,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 150 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 150 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1581,7 +1581,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "grpc.service/Method", "service", "grpc.server", @@ -1595,8 +1595,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "0" - ), _) - 1 * 
writer.add(new MetricKey( + )) + 1 * writer.add(AggregateEntry.of( "grpc.service/Method", "service", "grpc.server", @@ -1610,8 +1610,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "5" - ), _) - 1 * writer.add(new MetricKey( + )) + 1 * writer.add(AggregateEntry.of( "GET /api", "service", "http.request", @@ -1625,7 +1625,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 3ff81de9851..08f0f7cbb92 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -7,7 +7,6 @@ import static java.util.concurrent.TimeUnit.SECONDS import datadog.metrics.api.Histograms import datadog.metrics.impl.DDSketchHistograms import datadog.trace.api.Config -import datadog.trace.api.Pair import datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.git.CommitInfo @@ -26,6 +25,30 @@ class SerializingMetricWriterTest extends DDSpecification { Histograms.register(DDSketchHistograms.FACTORY) } + /** Build an {@link AggregateEntry} with a pre-recorded duration count. */ + private static AggregateEntry entry( + CharSequence resource, + CharSequence service, + CharSequence operationName, + CharSequence serviceSource, + CharSequence type, + int httpStatusCode, + boolean synthetic, + boolean traceRoot, + CharSequence spanKind, + List peerTags, + CharSequence httpMethod, + CharSequence httpEndpoint, + CharSequence grpcStatusCode, + int hitCount) { + AggregateEntry e = AggregateEntry.of( + resource, service, operationName, serviceSource, type, + httpStatusCode, synthetic, traceRoot, spanKind, peerTags, + httpMethod, httpEndpoint, grpcStatusCode) + e.aggregate.recordDurations(hitCount, new AtomicLongArray(1L)) + return e + } + def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () { setup: if (!withProcessTags) { @@ -40,8 +63,8 @@ class SerializingMetricWriterTest extends DDSpecification { when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() @@ -55,88 +78,40 @@ class SerializingMetricWriterTest extends DDSpecification { where: content << [ [ - Pair.of( - new MetricKey( - "resource1", - "service1", - "operation1", - null, - "type", - 0, - false, - false, - "client", + entry( + "resource1", "service1", "operation1", null, "type", 0, + false, false, "client", [ UTF8BytesString.create("country:canada"), UTF8BytesString.create("georegion:amer"), UTF8BytesString.create("peer.service:remote-service") ], - null, - null, - null - ), - new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) - ), - Pair.of( - new MetricKey( - "resource2", - "service2", - "operation2", - null, - "type2", - 200, - true, - false, - "producer", + null, null, null, + 10), + entry( + "resource2", "service2", "operation2", null, "type2", 200, + true, false, "producer", [ UTF8BytesString.create("country:canada"), UTF8BytesString.create("georegion:amer"), UTF8BytesString.create("peer.service:remote-service") ], - null, - 
@@ -40,8 +63,8 @@ class SerializingMetricWriterTest extends DDSpecification {

     when:
     writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
+    for (AggregateEntry e : content) {
+      writer.add(e)
     }
     writer.finishBucket()

@@ -55,88 +78,40 @@ class SerializingMetricWriterTest extends DDSpecification {
     where:
     content << [
       [
-        Pair.of(
-          new MetricKey(
-            "resource1",
-            "service1",
-            "operation1",
-            null,
-            "type",
-            0,
-            false,
-            false,
-            "client",
+        entry(
+          "resource1", "service1", "operation1", null, "type", 0,
+          false, false, "client",
           [
             UTF8BytesString.create("country:canada"),
             UTF8BytesString.create("georegion:amer"),
             UTF8BytesString.create("peer.service:remote-service")
           ],
-            null,
-            null,
-            null
-          ),
-          new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
-        ),
-        Pair.of(
-          new MetricKey(
-            "resource2",
-            "service2",
-            "operation2",
-            null,
-            "type2",
-            200,
-            true,
-            false,
-            "producer",
+          null, null, null,
+          10),
+        entry(
+          "resource2", "service2", "operation2", null, "type2", 200,
+          true, false, "producer",
           [
             UTF8BytesString.create("country:canada"),
             UTF8BytesString.create("georegion:amer"),
             UTF8BytesString.create("peer.service:remote-service")
           ],
-            null,
-            null,
-            null
-          ),
-          new AggregateMetric().recordDurations(9, new AtomicLongArray(1L))
-        ),
-        Pair.of(
-          new MetricKey(
-            "GET /api/users/:id",
-            "web-service",
-            "http.request",
-            null,
-            "web",
-            200,
-            false,
-            true,
-            "server",
+          null, null, null,
+          9),
+        entry(
+          "GET /api/users/:id", "web-service", "http.request", null, "web", 200,
+          false, true, "server",
           [],
-            "GET",
-            "/api/users/:id",
-            null
-          ),
-          new AggregateMetric().recordDurations(5, new AtomicLongArray(1L))
-        )
+          "GET", "/api/users/:id", null,
+          5)
       ],
       (0..10000).collect({ i ->
-        Pair.of(
-          new MetricKey(
-            "resource" + i,
-            "service" + i,
-            "operation" + i,
-            null,
-            "type",
-            0,
-            false,
-            false,
-            "producer",
+        entry(
+          "resource" + i, "service" + i, "operation" + i, null, "type", 0,
+          false, false, "producer",
           [UTF8BytesString.create("messaging.destination:dest" + i)],
-            null,
-            null,
-            null
-          ),
-          new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
-        )
+          null, null, null,
+          10)
       })
     ]
     withProcessTags << [true, false]
@@ -148,22 +123,18 @@ class SerializingMetricWriterTest extends DDSpecification {
     long duration = SECONDS.toNanos(10)
     WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-    // Create keys with different combinations of HTTP fields
-    def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
-    def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null)
+    def entryNoSource = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
+    def entryWithSource = entry("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, 1)

-    def content = [
-      Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-    ]
+    def content = [entryNoSource, entryWithSource]

     ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
     SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

     when:
     writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
+    for (AggregateEntry e : content) {
+      writer.add(e)
     }
     writer.finishBucket()
     then:
     sink.validatedInput()
@@ -177,34 +148,25 @@ class SerializingMetricWriterTest extends DDSpecification {
     long duration = SECONDS.toNanos(10)
     WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

-    // Create keys with different combinations of HTTP fields
-    def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
-    def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null)
-    def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null)
-    def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null)
-
-    def content = [
-      Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
-    ]
+    def entryWithBoth = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
+    def entryWithMethodOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null, 1)
+    def entryWithEndpointOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null, 1)
+    def entryWithNeither = entry("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, 1)
+
+    def content = [entryWithBoth, entryWithMethodOnly, entryWithEndpointOnly, entryWithNeither]

     ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
     SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)

     when:
     writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
+    for (AggregateEntry e : content) {
+      writer.add(e)
     }
     writer.finishBucket()

     then:
     sink.validatedInput()
-    // Test passes if validation in ValidatingSink succeeds
-    // ValidatingSink verifies that map size matches actual number of fields
-    // and that HTTPMethod/HTTPEndpoint are only present when non-empty
   }

   def "add git sha commit info when sha commit is #shaCommit"() {
@@ -216,40 +178,65 @@ class SerializingMetricWriterTest extends DDSpecification {
     long duration = SECONDS.toNanos(10)
     WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")

-    // Create keys with different combinations of HTTP fields
-    def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
+    def e = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)

-    def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),]
+    def content = [e]

     ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
     SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128, gitInfoProvider)

     when:
     writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
+    for (AggregateEntry entryItem : content) {
+      writer.add(entryItem)
     }
     writer.finishBucket()

     then:
     sink.validatedInput()

     where:
     shaCommit << [null, "123456"]
   }

+  def "GRPCStatusCode field is present in payload for rpc-type spans"() {
+    setup:
+    long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
+    long duration = SECONDS.toNanos(10)
+    WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
+
+    def entryWithGrpc = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK", 1)
+    def entryWithGrpcError = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND", 1)
+    def entryWithoutGrpc = entry("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null, 1)
+
+    def content = [entryWithGrpc, entryWithGrpcError, entryWithoutGrpc]
+
+    ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
+    SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
+
+    when:
+    writer.startBucket(content.size(), startTime, duration)
+    for (AggregateEntry e : content) {
+      writer.add(e)
+    }
+    writer.finishBucket()
+
+    then:
+    sink.validatedInput()
+  }
+
   static class ValidatingSink implements Sink {
     private final WellKnownTags wellKnownTags
     private final long startTimeNanos
     private final long duration
     private boolean validated = false
-    private List<Pair<MetricKey, AggregateMetric>> content
+    private List<AggregateEntry> content

     ValidatingSink(WellKnownTags wellKnownTags, long startTimeNanos, long duration,
-      List<Pair<MetricKey, AggregateMetric>> content) {
+      List<AggregateEntry> content) {
       this.wellKnownTags = wellKnownTags
       this.startTimeNanos = startTimeNanos
       this.duration = duration
@@ -298,70 +283,69 @@ class SerializingMetricWriterTest extends DDSpecification {
       assert unpacker.unpackString() == "Stats"
       int statCount = unpacker.unpackArrayHeader()
       assert statCount == content.size()
-      for (Pair pair : content) {
-        MetricKey key = pair.getLeft()
-        AggregateMetric value = pair.getRight()
+      for (AggregateEntry entry : content) {
+        AggregateMetric value = entry.aggregate
         int metricMapSize = unpacker.unpackMapHeader()
         // Calculate expected map size based on optional fields
-        boolean hasHttpMethod = key.getHttpMethod() != null
-        boolean hasHttpEndpoint = key.getHttpEndpoint() != null
-        boolean hasServiceSource = key.getServiceSource() != null
-        boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null
+        boolean hasHttpMethod = entry.getHttpMethod() != null
+        boolean hasHttpEndpoint = entry.getHttpEndpoint() != null
+        boolean hasServiceSource = entry.getServiceSource() != null
+        boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null
         int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0)
         assert metricMapSize == expectedMapSize
         int elementCount = 0
         assert unpacker.unpackString() == "Name"
-        assert unpacker.unpackString() == key.getOperationName() as String
+        assert unpacker.unpackString() == entry.getOperationName() as String
         ++elementCount
         assert unpacker.unpackString() == "Service"
-        assert unpacker.unpackString() == key.getService() as String
+        assert unpacker.unpackString() == entry.getService() as String
         ++elementCount
         assert unpacker.unpackString() == "Resource"
-        assert unpacker.unpackString() == key.getResource() as String
+        assert unpacker.unpackString() == entry.getResource() as String
         ++elementCount
         assert unpacker.unpackString() == "Type"
-        assert unpacker.unpackString() == key.getType() as String
+        assert unpacker.unpackString() == entry.getType() as String
         ++elementCount
         assert unpacker.unpackString() == "HTTPStatusCode"
-        assert unpacker.unpackInt() == key.getHttpStatusCode()
+        assert unpacker.unpackInt() == entry.getHttpStatusCode()
         ++elementCount
         assert unpacker.unpackString() == "Synthetics"
-        assert unpacker.unpackBoolean() == key.isSynthetics()
+        assert unpacker.unpackBoolean() == entry.isSynthetics()
         ++elementCount
         assert unpacker.unpackString() == "IsTraceRoot"
-        assert unpacker.unpackInt() == (key.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue)
+        assert unpacker.unpackInt() == (entry.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue)
         ++elementCount
         assert unpacker.unpackString() == "SpanKind"
-        assert unpacker.unpackString() == key.getSpanKind() as String
+        assert unpacker.unpackString() == entry.getSpanKind() as String
         ++elementCount
         assert unpacker.unpackString() == "PeerTags"
         int peerTagsLength = unpacker.unpackArrayHeader()
-        assert peerTagsLength == key.getPeerTags().size()
+        assert peerTagsLength == entry.getPeerTags().size()
         for (int i = 0; i < peerTagsLength; i++) {
           def unpackedPeerTag = unpacker.unpackString()
-          assert unpackedPeerTag == key.getPeerTags()[i].toString()
+          assert unpackedPeerTag == entry.getPeerTags()[i].toString()
         }
         ++elementCount
         // Service source is only present when the service name has been overridden by the tracer
         if (hasServiceSource) {
           assert unpacker.unpackString() == "srv_src"
-          assert unpacker.unpackString() == key.getServiceSource().toString()
+          assert unpacker.unpackString() == entry.getServiceSource().toString()
           ++elementCount
         }
         // HTTPMethod and HTTPEndpoint are optional - only present if non-null
         if (hasHttpMethod) {
           assert unpacker.unpackString() == "HTTPMethod"
-          assert unpacker.unpackString() == key.getHttpMethod() as String
+          assert unpacker.unpackString() == entry.getHttpMethod() as String
           ++elementCount
         }
         if (hasHttpEndpoint) {
           assert unpacker.unpackString() == "HTTPEndpoint"
-          assert unpacker.unpackString() == key.getHttpEndpoint() as String
+          assert unpacker.unpackString() == entry.getHttpEndpoint() as String
           ++elementCount
         }
         if (hasGrpcStatusCode) {
           assert unpacker.unpackString() == "GRPCStatusCode"
-          assert unpacker.unpackString() == key.getGrpcStatusCode() as String
+          assert unpacker.unpackString() == entry.getGrpcStatusCode() as String
           ++elementCount
         }
         assert unpacker.unpackString() == "Hits"
@@ -397,99 +381,4 @@
       return validated
     }
   }
-
-  def "ServiceSource optional in the payload"() {
-    setup:
-    long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
-    long duration = SECONDS.toNanos(10)
-    WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
-    // Create keys with different combinations of HTTP fields
-    def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
-    def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null)
-
-    def content = [
-      Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-    ]
-
-    ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
-    SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
-    when:
-    writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
-    }
-    writer.finishBucket()
-
-    then:
-    sink.validatedInput()
-  }
-
-  def "GRPCStatusCode field is present in payload for rpc-type spans"() {
-    setup:
-    long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
-    long duration = SECONDS.toNanos(10)
-    WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
-    def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK")
-    def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND")
-    def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null)
-
-    def content = [
-      Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithGrpcError, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithoutGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
-    ]
-
-    ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
-    SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
-    when:
-    writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
-    }
-    writer.finishBucket()
-
-    then:
-    sink.validatedInput()
-  }
-
-  def "HTTPMethod and HTTPEndpoint fields are optional in payload"() {
-    setup:
-    long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
-    long duration = SECONDS.toNanos(10)
-    WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
-    // Create keys with different combinations of HTTP fields
-    def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
-    def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null)
-    def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null)
-    def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null)
-
-    def content = [
-      Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
-      Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
-    ]
-
-    ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
-    SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
-    when:
-    writer.startBucket(content.size(), startTime, duration)
-    for (Pair pair : content) {
-      writer.add(pair.getLeft(), pair.getRight())
-    }
-    writer.finishBucket()
-
-    then:
-    sink.validatedInput()
-    // Test passes if validation in ValidatingSink succeeds
-    // ValidatingSink verifies that map size matches actual number of fields
-    // and that HTTPMethod/HTTPEndpoint are only present when non-empty
-  }
 }
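The map-size bookkeeping that ValidatingSink enforces is worth spelling out: every serialized metric map carries 15 fixed fields, and each optional field -- srv_src, HTTPMethod, HTTPEndpoint, GRPCStatusCode -- adds exactly one map entry when its value is non-null. A worked Java example for the entryWithBoth case above (method and endpoint set, no service source or gRPC status):

    boolean hasServiceSource = false;  // no "srv_src"
    boolean hasHttpMethod = true;      // "HTTPMethod" present
    boolean hasHttpEndpoint = true;    // "HTTPEndpoint" present
    boolean hasGrpcStatusCode = false; // no "GRPCStatusCode"
    int expectedMapSize =
        15
            + (hasServiceSource ? 1 : 0)
            + (hasHttpMethod ? 1 : 0)
            + (hasHttpEndpoint ? 1 : 0)
            + (hasGrpcStatusCode ? 1 : 0); // == 17

For entryWithNeither the same arithmetic yields 15. The three trailing test methods deleted above were verbatim duplicates of tests that now live earlier in the file, so their removal loses no coverage.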
false, false, "server", [], null, null, "OK") - def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND") - def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null) - - def content = [ - Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithGrpcError, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithoutGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - } - - def "HTTPMethod and HTTPEndpoint fields are optional in payload"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) - - def content = [ - Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - // Test passes if validation in ValidatingSink succeeds - // ValidatingSink verifies that map size matches actual number of fields - // and that HTTPMethod/HTTPEndpoint are only present when non-empty - } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 6c4839e4e4f..44f2b36cb6b 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -148,7 +148,7 @@ void forEachVisitsEveryEntry() { table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG); Map visited = new HashMap<>(); - 
diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
index 2972ffa2c18..81a476c67c8 100644
--- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
+++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
@@ -8,9 +8,8 @@ import datadog.metrics.impl.DDSketchHistograms
 import datadog.trace.api.Config
 import datadog.trace.api.WellKnownTags
 import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
-import datadog.trace.common.metrics.AggregateMetric
+import datadog.trace.common.metrics.AggregateEntry
 import datadog.trace.common.metrics.EventListener
-import datadog.trace.common.metrics.MetricKey
 import datadog.trace.common.metrics.OkHttpSink
 import datadog.trace.common.metrics.SerializingMetricWriter
 import java.util.concurrent.CopyOnWriteArrayList
@@ -39,14 +38,12 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest {
       sink
     )
     writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10))
-    writer.add(
-      new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null),
-      new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
-    )
-    writer.add(
-      new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null),
-      new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
-    )
+    def entry1 = AggregateEntry.of("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
+    entry1.aggregate.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
+    writer.add(entry1)
+    def entry2 = AggregateEntry.of("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
+    entry2.aggregate.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
+    writer.add(entry2)
     writer.finishBucket()

     then: