diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
new file mode 100644
index 00000000000..e2fda9fde47
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
@@ -0,0 +1,368 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.api.Functions.UTF8_ENCODE;
+import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;
+
+import datadog.trace.api.Pair;
+import datadog.trace.api.cache.DDCache;
+import datadog.trace.api.cache.DDCaches;
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import datadog.trace.util.Hashtable;
+import datadog.trace.util.LongHashingUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data
+ * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}.
+ *
+ *
+ * <p>{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's
+ * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code
+ * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry
+ * instead of splitting.
+ *
+ *
+ * <p>The static UTF8 caches that used to live on {@code MetricKey} and {@code
+ * ConflatingMetricsAggregator} are consolidated here.
+ */
+final class AggregateEntry extends Hashtable.Entry {
+
+ // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split.
+ private static final DDCache<String, UTF8BytesString> RESOURCE_CACHE =
+ DDCaches.newFixedSizeCache(32);
+ private static final DDCache<String, UTF8BytesString> SERVICE_CACHE =
+ DDCaches.newFixedSizeCache(32);
+ private static final DDCache<String, UTF8BytesString> OPERATION_CACHE =
+ DDCaches.newFixedSizeCache(64);
+ private static final DDCache<String, UTF8BytesString> SERVICE_SOURCE_CACHE =
+ DDCaches.newFixedSizeCache(16);
+ private static final DDCache<String, UTF8BytesString> TYPE_CACHE = DDCaches.newFixedSizeCache(8);
+ private static final DDCache<String, UTF8BytesString> SPAN_KIND_CACHE =
+ DDCaches.newFixedSizeCache(16);
+ private static final DDCache<String, UTF8BytesString> HTTP_METHOD_CACHE =
+ DDCaches.newFixedSizeCache(8);
+ private static final DDCache<String, UTF8BytesString> HTTP_ENDPOINT_CACHE =
+ DDCaches.newFixedSizeCache(32);
+ private static final DDCache<String, UTF8BytesString> GRPC_STATUS_CODE_CACHE =
+ DDCaches.newFixedSizeCache(32);
+
+ /**
+ * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner
+ * cache produces the "name:value" encoded form the serializer writes.
+ */
+ private static final DDCache<
+ String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+ PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
+
+ private static final Function<
+ String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+ PEER_TAGS_CACHE_ADDER =
+ key ->
+ Pair.of(
+ DDCaches.newFixedSizeCache(512),
+ value -> UTF8BytesString.create(key + ":" + value));
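+
+ // Illustrative flow (example values, not a fixed tag set): the first
+ // encodePeerTag("peer.hostname", "db-1") installs an inner cache for "peer.hostname" and caches
+ // the encoded form "peer.hostname:db-1"; repeat lookups for the same pair are two cache hits
+ // with no new allocation.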
+
+ private final UTF8BytesString resource;
+ private final UTF8BytesString service;
+ private final UTF8BytesString operationName;
+ private final UTF8BytesString serviceSource; // nullable
+ private final UTF8BytesString type;
+ private final UTF8BytesString spanKind;
+ private final UTF8BytesString httpMethod; // nullable
+ private final UTF8BytesString httpEndpoint; // nullable
+ private final UTF8BytesString grpcStatusCode; // nullable
+ private final short httpStatusCode;
+ private final boolean synthetic;
+ private final boolean traceRoot;
+
+ // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs,
+ // and pre-encoded List<UTF8BytesString> ("name:value") for the serializer.
+ private final String[] peerTagPairsRaw;
+ private final List<UTF8BytesString> peerTags;
+
+ final AggregateMetric aggregate;
+
+ /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */
+ private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) {
+ super(keyHash);
+ this.resource = canonicalize(RESOURCE_CACHE, s.resourceName);
+ this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE);
+ this.operationName = canonicalize(OPERATION_CACHE, s.operationName);
+ this.serviceSource =
+ s.serviceNameSource == null
+ ? null
+ : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource);
+ this.type = canonicalize(TYPE_CACHE, s.spanType);
+ this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create);
+ this.httpMethod =
+ s.httpMethod == null
+ ? null
+ : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create);
+ this.httpEndpoint =
+ s.httpEndpoint == null
+ ? null
+ : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create);
+ this.grpcStatusCode =
+ s.grpcStatusCode == null
+ ? null
+ : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create);
+ this.httpStatusCode = s.httpStatusCode;
+ this.synthetic = s.synthetic;
+ this.traceRoot = s.traceRoot;
+ this.peerTagPairsRaw = s.peerTagPairs;
+ this.peerTags = materializePeerTags(s.peerTagPairs);
+ this.aggregate = aggregate;
+ }
+
+ /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */
+ static AggregateEntry of(
+ CharSequence resource,
+ CharSequence service,
+ CharSequence operationName,
+ CharSequence serviceSource,
+ CharSequence type,
+ int httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ CharSequence spanKind,
+ List<UTF8BytesString> peerTags,
+ CharSequence httpMethod,
+ CharSequence httpEndpoint,
+ CharSequence grpcStatusCode) {
+ String[] rawPairs = peerTagsToRawPairs(peerTags);
+ SpanSnapshot syntheticSnapshot =
+ new SpanSnapshot(
+ resource,
+ service == null ? null : service.toString(),
+ operationName,
+ serviceSource,
+ type,
+ (short) httpStatusCode,
+ synthetic,
+ traceRoot,
+ spanKind == null ? null : spanKind.toString(),
+ rawPairs,
+ httpMethod == null ? null : httpMethod.toString(),
+ httpEndpoint == null ? null : httpEndpoint.toString(),
+ grpcStatusCode == null ? null : grpcStatusCode.toString(),
+ 0L);
+ return new AggregateEntry(
+ syntheticSnapshot, hashOf(syntheticSnapshot), new AggregateMetric());
+ }
+
+ /** Construct from a snapshot at consumer-thread miss time. */
+ static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) {
+ return new AggregateEntry(s, hashOf(s), aggregate);
+ }
+
+ boolean matches(SpanSnapshot s) {
+ return httpStatusCode == s.httpStatusCode
+ && synthetic == s.synthetic
+ && traceRoot == s.traceRoot
+ && contentEquals(resource, s.resourceName)
+ && stringContentEquals(service, s.serviceName)
+ && contentEquals(operationName, s.operationName)
+ && contentEquals(serviceSource, s.serviceNameSource)
+ && contentEquals(type, s.spanType)
+ && stringContentEquals(spanKind, s.spanKind)
+ && Arrays.equals(peerTagPairsRaw, s.peerTagPairs)
+ && stringContentEquals(httpMethod, s.httpMethod)
+ && stringContentEquals(httpEndpoint, s.httpEndpoint)
+ && stringContentEquals(grpcStatusCode, s.grpcStatusCode);
+ }
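+
+ // Illustrative: an entry stored with resource UTF8BytesString.create("GET /users") matches a
+ // snapshot whose resourceName is the plain String "GET /users" -- contentEquals compares
+ // characters, so both spellings of one logical key share a single aggregate.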
+
+ /**
+ * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no
+ * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's
+ * {@code super(hashOf(s))} call uses the same function, so an entry built from a snapshot hashes
+ * to the same bucket the snapshot itself looks up.
+ *
+ * <p>Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link
+ * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash.
+ */
+ static long hashOf(SpanSnapshot s) {
+ long h = 0;
+ h = LongHashingUtils.addToHash(h, s.resourceName);
+ h = LongHashingUtils.addToHash(h, s.serviceName);
+ h = LongHashingUtils.addToHash(h, s.operationName);
+ h = LongHashingUtils.addToHash(h, s.serviceNameSource);
+ h = LongHashingUtils.addToHash(h, s.spanType);
+ h = LongHashingUtils.addToHash(h, s.httpStatusCode);
+ h = LongHashingUtils.addToHash(h, s.synthetic);
+ h = LongHashingUtils.addToHash(h, s.traceRoot);
+ h = LongHashingUtils.addToHash(h, s.spanKind);
+ if (s.peerTagPairs != null) {
+ for (String p : s.peerTagPairs) {
+ h = LongHashingUtils.addToHash(h, p);
+ }
+ }
+ h = LongHashingUtils.addToHash(h, s.httpMethod);
+ h = LongHashingUtils.addToHash(h, s.httpEndpoint);
+ h = LongHashingUtils.addToHash(h, s.grpcStatusCode);
+ return h;
+ }
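+
+ // Illustrative: UTF8BytesString.hashCode() delegates to the underlying String's hash, so
+ // hashOf over a snapshot's raw Strings equals the hash its entry was inserted under -- a
+ // lookup never misses merely because of the CharSequence type.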
+
+ // Accessors for SerializingMetricWriter.
+ UTF8BytesString getResource() {
+ return resource;
+ }
+
+ UTF8BytesString getService() {
+ return service;
+ }
+
+ UTF8BytesString getOperationName() {
+ return operationName;
+ }
+
+ UTF8BytesString getServiceSource() {
+ return serviceSource;
+ }
+
+ UTF8BytesString getType() {
+ return type;
+ }
+
+ UTF8BytesString getSpanKind() {
+ return spanKind;
+ }
+
+ UTF8BytesString getHttpMethod() {
+ return httpMethod;
+ }
+
+ UTF8BytesString getHttpEndpoint() {
+ return httpEndpoint;
+ }
+
+ UTF8BytesString getGrpcStatusCode() {
+ return grpcStatusCode;
+ }
+
+ int getHttpStatusCode() {
+ return httpStatusCode;
+ }
+
+ boolean isSynthetics() {
+ return synthetic;
+ }
+
+ boolean isTraceRoot() {
+ return traceRoot;
+ }
+
+ List<UTF8BytesString> getPeerTags() {
+ return peerTags;
+ }
+
+ /**
+ * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the
+ * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)}
+ * and never calls {@code equals}.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof AggregateEntry)) return false;
+ AggregateEntry that = (AggregateEntry) o;
+ return httpStatusCode == that.httpStatusCode
+ && synthetic == that.synthetic
+ && traceRoot == that.traceRoot
+ && java.util.Objects.equals(resource, that.resource)
+ && java.util.Objects.equals(service, that.service)
+ && java.util.Objects.equals(operationName, that.operationName)
+ && java.util.Objects.equals(serviceSource, that.serviceSource)
+ && java.util.Objects.equals(type, that.type)
+ && java.util.Objects.equals(spanKind, that.spanKind)
+ && peerTags.equals(that.peerTags)
+ && java.util.Objects.equals(httpMethod, that.httpMethod)
+ && java.util.Objects.equals(httpEndpoint, that.httpEndpoint)
+ && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) keyHash;
+ }
+
+ // ----- helpers -----
+
+ private static UTF8BytesString canonicalize(
+ DDCache<String, UTF8BytesString> cache, CharSequence charSeq) {
+ if (charSeq == null) {
+ return EMPTY;
+ }
+ if (charSeq instanceof UTF8BytesString) {
+ return (UTF8BytesString) charSeq;
+ }
+ return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create);
+ }
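+
+ // Illustrative: canonicalize(RESOURCE_CACHE, "GET /users") interns one UTF8BytesString per
+ // distinct String, while an input that is already a UTF8BytesString is returned as-is with no
+ // cache traffic.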
+
+ /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */
+ private static boolean contentEquals(UTF8BytesString a, CharSequence b) {
+ if (a == null) {
+ return b == null;
+ }
+ if (b == null) {
+ // canonicalize maps null label fields (resource, operation, type) to EMPTY, so a null
+ // snapshot field must match the stored EMPTY form or identical keys would never coalesce
+ return a.length() == 0;
+ }
+ // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation.
+ String aStr = a.toString();
+ if (b instanceof String) {
+ return aStr.equals(b);
+ }
+ if (b instanceof UTF8BytesString) {
+ return aStr.equals(b.toString());
+ }
+ return aStr.contentEquals(b);
+ }
+
+ private static boolean stringContentEquals(UTF8BytesString a, String b) {
+ if (a == null) {
+ return b == null;
+ }
+ return b != null && a.toString().equals(b);
+ }
+
+ private static List<UTF8BytesString> materializePeerTags(String[] pairs) {
+ if (pairs == null || pairs.length == 0) {
+ return Collections.emptyList();
+ }
+ if (pairs.length == 2) {
+ return Collections.singletonList(encodePeerTag(pairs[0], pairs[1]));
+ }
+ List<UTF8BytesString> tags = new ArrayList<>(pairs.length / 2);
+ for (int i = 0; i < pairs.length; i += 2) {
+ tags.add(encodePeerTag(pairs[i], pairs[i + 1]));
+ }
+ return tags;
+ }
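+
+ // Illustrative: ["peer.hostname", "db-1", "peer.service", "billing"] materializes as the
+ // pre-encoded list ["peer.hostname:db-1", "peer.service:billing"] that the serializer writes.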
+
+ private static UTF8BytesString encodePeerTag(String name, String value) {
+ final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
+ cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
+ return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
+ }
+
+ /**
+ * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw
+ * {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of}, not by
+ * the hot path.
+ */
+ private static String[] peerTagsToRawPairs(List<UTF8BytesString> peerTags) {
+ if (peerTags == null || peerTags.isEmpty()) {
+ return null;
+ }
+ String[] pairs = new String[peerTags.size() * 2];
+ int i = 0;
+ for (UTF8BytesString peerTag : peerTags) {
+ String s = peerTag.toString();
+ int colon = s.indexOf(':');
+ pairs[i++] = colon < 0 ? s : s.substring(0, colon);
+ pairs[i++] = colon < 0 ? "" : s.substring(colon + 1);
+ }
+ return pairs;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
new file mode 100644
index 00000000000..08300eab296
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
@@ -0,0 +1,132 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.util.Hashtable;
+import java.util.function.Consumer;
+
+/**
+ * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}.
+ *
+ * <p>Replaces the prior {@code LRUCache<MetricKey, AggregateMetric>}. The win is on the
+ * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise
+ * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache
+ * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link
+ * AggregateEntry} itself and are built once per unique key at insert time.
+ *
+ * <p>Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be
+ * routed through the inbox rather than called from arbitrary threads.
+ */
+final class AggregateTable {
+
+ private final Hashtable.Entry[] buckets;
+ private final int maxAggregates;
+ private int size;
+
+ AggregateTable(int maxAggregates) {
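+ // ~1.33x bucket headroom over the cap, mirroring the old LRUCache sizing
+ // (maxAggregates * 4 / 3 capacity at a 0.75 load factor), keeps chains short at the cap.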
+ this.buckets = Hashtable.Support.create(maxAggregates * 4 / 3);
+ this.maxAggregates = maxAggregates;
+ }
+
+ int size() {
+ return size;
+ }
+
+ boolean isEmpty() {
+ return size == 0;
+ }
+
+ /**
+ * Returns the {@link AggregateMetric} to update for {@code snapshot}, lazily creating an entry on
+ * miss. Returns {@code null} when the table is at capacity and no stale entry can be evicted --
+ * the caller should drop the data point in that case.
+ */
+ AggregateMetric findOrInsert(SpanSnapshot snapshot) {
+ long keyHash = AggregateEntry.hashOf(snapshot);
+ int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
+ for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
+ if (e.keyHash == keyHash) {
+ AggregateEntry candidate = (AggregateEntry) e;
+ if (candidate.matches(snapshot)) {
+ return candidate.aggregate;
+ }
+ }
+ }
+ if (size >= maxAggregates && !evictOneStale()) {
+ return null;
+ }
+ AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric());
+ entry.setNext(buckets[bucketIndex]);
+ buckets[bucketIndex] = entry;
+ size++;
+ return entry.aggregate;
+ }
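+
+ // Lookup sketch: hashOf(snapshot) selects a bucket; the walk compares 64-bit hashes first and
+ // runs field-wise matches() only on a hash hit. A miss allocates exactly one AggregateEntry
+ // (where UTF8 encoding happens), and a full table admits a new key only if a stale entry
+ // (hitCount == 0) can be unlinked first.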
+
+ /** Unlink the first entry whose {@code AggregateMetric.getHitCount() == 0}. */
+ private boolean evictOneStale() {
+ for (int i = 0; i < buckets.length; i++) {
+ Hashtable.Entry head = buckets[i];
+ if (head == null) {
+ continue;
+ }
+ if (((AggregateEntry) head).aggregate.getHitCount() == 0) {
+ buckets[i] = head.next();
+ size--;
+ return true;
+ }
+ Hashtable.Entry prev = head;
+ Hashtable.Entry cur = head.next();
+ while (cur != null) {
+ if (((AggregateEntry) cur).aggregate.getHitCount() == 0) {
+ prev.setNext(cur.next());
+ size--;
+ return true;
+ }
+ prev = cur;
+ cur = cur.next();
+ }
+ }
+ return false;
+ }
+
+ void forEach(Consumer<AggregateEntry> consumer) {
+ for (int i = 0; i < buckets.length; i++) {
+ for (Hashtable.Entry e = buckets[i]; e != null; e = e.next()) {
+ consumer.accept((AggregateEntry) e);
+ }
+ }
+ }
+
+ /** Removes entries whose {@code AggregateMetric.getHitCount() == 0}. */
+ void expungeStaleAggregates() {
+ for (int i = 0; i < buckets.length; i++) {
+ // unlink leading stale entries
+ Hashtable.Entry head = buckets[i];
+ while (head != null && ((AggregateEntry) head).aggregate.getHitCount() == 0) {
+ head = head.next();
+ size--;
+ }
+ buckets[i] = head;
+ if (head == null) {
+ continue;
+ }
+ // unlink stale entries in the chain
+ Hashtable.Entry prev = head;
+ Hashtable.Entry cur = head.next();
+ while (cur != null) {
+ if (((AggregateEntry) cur).aggregate.getHitCount() == 0) {
+ Hashtable.Entry skipped = cur.next();
+ prev.setNext(skipped);
+ size--;
+ cur = skipped;
+ } else {
+ prev = cur;
+ cur = cur.next();
+ }
+ }
+ }
+ }
+
+ void clear() {
+ Hashtable.Support.clear(buckets);
+ size = 0;
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
index e632555cc21..b4fc59d5a1d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
@@ -1,26 +1,12 @@
package datadog.trace.common.metrics;
-import static datadog.trace.api.Functions.UTF8_ENCODE;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import datadog.trace.api.Pair;
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import datadog.trace.common.metrics.SignalItem.ClearSignal;
import datadog.trace.common.metrics.SignalItem.StopSignal;
import datadog.trace.core.monitor.HealthMetrics;
-import datadog.trace.core.util.LRUCache;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +18,9 @@ final class Aggregator implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
private final MessagePassingQueue<InboxItem> inbox;
- private final LRUCache<MetricKey, AggregateMetric> aggregates;
+ private final AggregateTable aggregates;
private final MetricWriter writer;
+ private final HealthMetrics healthMetrics;
// the reporting interval controls how much history will be buffered
// when the agent is unresponsive (only 10 pending requests will be
// buffered by OkHttpSink)
@@ -73,27 +60,10 @@ final class Aggregator implements Runnable {
HealthMetrics healthMetrics) {
this.writer = writer;
this.inbox = inbox;
- this.aggregates =
- new LRUCache<>(
- new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates);
+ this.aggregates = new AggregateTable(maxAggregates);
this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval);
this.sleepMillis = sleepMillis;
- }
-
- private static final class AggregateExpiry
- implements LRUCache.ExpiryListener<MetricKey, AggregateMetric> {
- private final HealthMetrics healthMetrics;
-
- AggregateExpiry(HealthMetrics healthMetrics) {
- this.healthMetrics = healthMetrics;
- }
-
- @Override
- public void accept(Map.Entry<MetricKey, AggregateMetric> expired) {
- if (expired.getValue().getHitCount() > 0) {
- healthMetrics.onStatsAggregateDropped();
- }
- }
+ this.healthMetrics = healthMetrics;
}
public void clearAggregates() {
@@ -126,7 +96,13 @@ private final class Drainer implements MessagePassingQueue.Consumer<InboxItem> {
@Override
public void accept(InboxItem item) {
- if (item instanceof SignalItem) {
+ if (item == ClearSignal.CLEAR) {
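+ // identity comparison is safe: CLEAR is the only ClearSignal instance (private constructor)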
+ if (!stopped) {
+ aggregates.clear();
+ inbox.clear();
+ }
+ ((SignalItem) item).complete();
+ } else if (item instanceof SignalItem) {
SignalItem signal = (SignalItem) item;
if (!stopped) {
report(wallClockTime(), signal);
@@ -139,64 +115,31 @@ public void accept(InboxItem item) {
}
} else if (item instanceof SpanSnapshot && !stopped) {
SpanSnapshot snapshot = (SpanSnapshot) item;
- MetricKey key = buildMetricKey(snapshot);
- AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric());
- aggregate.recordOneDuration(snapshot.tagAndDuration);
- dirty = true;
+ AggregateMetric aggregate = aggregates.findOrInsert(snapshot);
+ if (aggregate != null) {
+ aggregate.recordOneDuration(snapshot.tagAndDuration);
+ dirty = true;
+ } else {
+ // table at cap with no stale entry available to evict
+ healthMetrics.onStatsAggregateDropped();
+ }
}
}
}
- private static MetricKey buildMetricKey(SpanSnapshot s) {
- return new MetricKey(
- s.resourceName,
- SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE),
- s.operationName,
- s.serviceNameSource,
- s.spanType,
- s.httpStatusCode,
- s.synthetic,
- s.traceRoot,
- SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create),
- materializePeerTags(s.peerTagPairs),
- s.httpMethod,
- s.httpEndpoint,
- s.grpcStatusCode);
- }
-
- private static List<UTF8BytesString> materializePeerTags(String[] pairs) {
- if (pairs == null || pairs.length == 0) {
- return Collections.emptyList();
- }
- if (pairs.length == 2) {
- // single-entry fast path (matches the original singletonList shape for INTERNAL spans)
- return Collections.singletonList(encodePeerTag(pairs[0], pairs[1]));
- }
- List<UTF8BytesString> tags = new ArrayList<>(pairs.length / 2);
- for (int i = 0; i < pairs.length; i += 2) {
- tags.add(encodePeerTag(pairs[i], pairs[i + 1]));
- }
- return tags;
- }
-
- private static UTF8BytesString encodePeerTag(String name, String value) {
- final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
- cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
- return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
- }
-
private void report(long when, SignalItem signal) {
boolean skipped = true;
if (dirty) {
try {
- expungeStaleAggregates();
+ aggregates.expungeStaleAggregates();
if (!aggregates.isEmpty()) {
skipped = false;
writer.startBucket(aggregates.size(), when, reportingIntervalNanos);
- for (Map.Entry<MetricKey, AggregateMetric> aggregate : aggregates.entrySet()) {
- writer.add(aggregate.getKey(), aggregate.getValue());
- aggregate.getValue().clear();
- }
+ aggregates.forEach(
+ entry -> {
+ writer.add(entry);
+ entry.aggregate.clear();
+ });
// note that this may do IO and block
writer.finishBucket();
}
@@ -212,17 +155,6 @@ private void report(long when, SignalItem signal) {
}
}
- private void expungeStaleAggregates() {
- Iterator<Map.Entry<MetricKey, AggregateMetric>> it = aggregates.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<MetricKey, AggregateMetric> pair = it.next();
- AggregateMetric metric = pair.getValue();
- if (metric.getHitCount() == 0) {
- it.remove();
- }
- }
- }
-
private long wallClockTime() {
return MILLISECONDS.toNanos(System.currentTimeMillis());
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
index 9ea77140113..c675fcb23c4 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
@@ -8,6 +8,7 @@
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
+import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR;
import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
@@ -19,12 +20,8 @@
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
-import datadog.trace.api.Pair;
import datadog.trace.api.WellKnownTags;
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.api.cache.DDCaches;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
-import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.metrics.SignalItem.ReportSignal;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.core.CoreSpan;
@@ -39,7 +36,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,22 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {
private static final Map<String, String> DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
- static final DDCache<String, UTF8BytesString> SERVICE_NAMES = DDCaches.newFixedSizeCache(32);
-
- static final DDCache<String, UTF8BytesString> SPAN_KINDS = DDCaches.newFixedSizeCache(16);
- static final DDCache<
- String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
- PEER_TAGS_CACHE =
- DDCaches.newFixedSizeCache(
- 64); // it can be unbounded since those values are returned by the agent and should be
- // under control. 64 entries is enough in this case to contain all the peer tags.
- static final Function<
- String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
- PEER_TAGS_CACHE_ADDER =
- key ->
- Pair.of(
- DDCaches.newFixedSizeCache(512),
- value -> UTF8BytesString.create(key + ":" + value));
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
@@ -418,8 +398,10 @@ private void disable() {
features.discover();
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
- this.inbox.clear();
- this.aggregator.clearAggregates();
+ // Route the clear through the inbox so the aggregator thread is the only writer.
+ // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread
+ // would race with Drainer.accept on the aggregator thread.
+ inbox.offer(CLEAR);
}
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java
index 7d66cad6a15..a0625be095b 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java
@@ -28,4 +28,15 @@ private StopSignal() {}
static final class ReportSignal extends SignalItem {
static final ReportSignal REPORT = new ReportSignal();
}
+
+ /**
+ * Posted from arbitrary threads (e.g. the Sink event thread during agent downgrade) so the
+ * aggregator thread is the one that actually performs the table reset. Keeps {@link
+ * AggregateTable} and {@code inbox.clear()} single-writer.
+ */
+ static final class ClearSignal extends SignalItem {
+ static final ClearSignal CLEAR = new ClearSignal();
+
+ private ClearSignal() {}
+ }
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java
deleted file mode 100644
index 9e2e2098d1f..00000000000
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package datadog.trace.common.metrics;
-
-import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;
-
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.api.cache.DDCaches;
-import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
-import datadog.trace.util.HashingUtils;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-/** The aggregation key for tracked metrics. */
-public final class MetricKey {
- static final DDCache<String, UTF8BytesString> RESOURCE_CACHE = DDCaches.newFixedSizeCache(32);
- static final DDCache<String, UTF8BytesString> SERVICE_CACHE = DDCaches.newFixedSizeCache(8);
- static final DDCache<String, UTF8BytesString> SERVICE_SOURCE_CACHE =
- DDCaches.newFixedSizeCache(16);
- static final DDCache<String, UTF8BytesString> OPERATION_CACHE = DDCaches.newFixedSizeCache(64);
- static final DDCache<String, UTF8BytesString> TYPE_CACHE = DDCaches.newFixedSizeCache(8);
- static final DDCache<String, UTF8BytesString> KIND_CACHE = DDCaches.newFixedSizeCache(8);
- static final DDCache<String, UTF8BytesString> HTTP_METHOD_CACHE = DDCaches.newFixedSizeCache(8);
- static final DDCache<String, UTF8BytesString> HTTP_ENDPOINT_CACHE =
- DDCaches.newFixedSizeCache(32);
- static final DDCache<String, UTF8BytesString> GRPC_STATUS_CODE_CACHE =
- DDCaches.newFixedSizeCache(32);
-
- private final UTF8BytesString resource;
- private final UTF8BytesString service;
- private final UTF8BytesString serviceSource;
- private final UTF8BytesString operationName;
- private final UTF8BytesString type;
- private final int httpStatusCode;
- private final boolean synthetics;
- private final int hash;
- private final boolean isTraceRoot;
- private final UTF8BytesString spanKind;
- private final List<UTF8BytesString> peerTags;
- private final UTF8BytesString httpMethod;
- private final UTF8BytesString httpEndpoint;
- private final UTF8BytesString grpcStatusCode;
-
- public MetricKey(
- CharSequence resource,
- CharSequence service,
- CharSequence operationName,
- CharSequence serviceSource,
- CharSequence type,
- int httpStatusCode,
- boolean synthetics,
- boolean isTraceRoot,
- CharSequence spanKind,
- List<UTF8BytesString> peerTags,
- CharSequence httpMethod,
- CharSequence httpEndpoint,
- CharSequence grpcStatusCode) {
- this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource);
- this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service);
- this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource);
- this.operationName = null == operationName ? EMPTY : utf8(OPERATION_CACHE, operationName);
- this.type = null == type ? EMPTY : utf8(TYPE_CACHE, type);
- this.httpStatusCode = httpStatusCode;
- this.synthetics = synthetics;
- this.isTraceRoot = isTraceRoot;
- this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind);
- this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;
- this.httpMethod = httpMethod == null ? null : utf8(HTTP_METHOD_CACHE, httpMethod);
- this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint);
- this.grpcStatusCode =
- grpcStatusCode == null ? null : utf8(GRPC_STATUS_CODE_CACHE, grpcStatusCode);
-
- int tmpHash = 0;
- tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot);
- tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind);
- tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags);
- tmpHash = HashingUtils.addToHash(tmpHash, this.resource);
- tmpHash = HashingUtils.addToHash(tmpHash, this.service);
- tmpHash = HashingUtils.addToHash(tmpHash, this.operationName);
- tmpHash = HashingUtils.addToHash(tmpHash, this.type);
- tmpHash = HashingUtils.addToHash(tmpHash, this.httpStatusCode);
- tmpHash = HashingUtils.addToHash(tmpHash, this.synthetics);
- tmpHash = HashingUtils.addToHash(tmpHash, this.serviceSource);
- tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint);
- tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod);
- tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode);
- this.hash = tmpHash;
- }
-
- static UTF8BytesString utf8(DDCache<String, UTF8BytesString> cache, CharSequence charSeq) {
- if (charSeq instanceof UTF8BytesString) {
- return (UTF8BytesString) charSeq;
- } else {
- return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create);
- }
- }
-
- public UTF8BytesString getResource() {
- return resource;
- }
-
- public UTF8BytesString getService() {
- return service;
- }
-
- public UTF8BytesString getServiceSource() {
- return serviceSource;
- }
-
- public UTF8BytesString getOperationName() {
- return operationName;
- }
-
- public UTF8BytesString getType() {
- return type;
- }
-
- public int getHttpStatusCode() {
- return httpStatusCode;
- }
-
- public boolean isSynthetics() {
- return synthetics;
- }
-
- public boolean isTraceRoot() {
- return isTraceRoot;
- }
-
- public UTF8BytesString getSpanKind() {
- return spanKind;
- }
-
- public List<UTF8BytesString> getPeerTags() {
- return peerTags;
- }
-
- public UTF8BytesString getHttpMethod() {
- return httpMethod;
- }
-
- public UTF8BytesString getHttpEndpoint() {
- return httpEndpoint;
- }
-
- public UTF8BytesString getGrpcStatusCode() {
- return grpcStatusCode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if ((o instanceof MetricKey)) {
- MetricKey metricKey = (MetricKey) o;
- return hash == metricKey.hash
- && synthetics == metricKey.synthetics
- && httpStatusCode == metricKey.httpStatusCode
- && resource.equals(metricKey.resource)
- && service.equals(metricKey.service)
- && operationName.equals(metricKey.operationName)
- && type.equals(metricKey.type)
- && isTraceRoot == metricKey.isTraceRoot
- && spanKind.equals(metricKey.spanKind)
- && peerTags.equals(metricKey.peerTags)
- && Objects.equals(serviceSource, metricKey.serviceSource)
- && Objects.equals(httpMethod, metricKey.httpMethod)
- && Objects.equals(httpEndpoint, metricKey.httpEndpoint)
- && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
-}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java
index fa26ed2e5db..c31825f6af8 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java
@@ -3,7 +3,11 @@
public interface MetricWriter {
void startBucket(int metricCount, long start, long duration);
- void add(MetricKey key, AggregateMetric aggregate);
+ /**
+ * Serialize one aggregate. The {@link AggregateEntry} carries both the label fields (resource,
+ * service, span.kind, peer tags, etc.) and the {@link AggregateMetric} counters being reported.
+ */
+ void add(AggregateEntry entry);
void finishBucket();
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
index 0f84964e9db..ba6ae6c2699 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
@@ -142,12 +142,13 @@ public void startBucket(int metricCount, long start, long duration) {
}
@Override
- public void add(MetricKey key, AggregateMetric aggregate) {
+ public void add(AggregateEntry entry) {
+ final AggregateMetric aggregate = entry.aggregate;
// Calculate dynamic map size based on optional fields
- final boolean hasHttpMethod = key.getHttpMethod() != null;
- final boolean hasHttpEndpoint = key.getHttpEndpoint() != null;
- final boolean hasServiceSource = key.getServiceSource() != null;
- final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null;
+ final boolean hasHttpMethod = entry.getHttpMethod() != null;
+ final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null;
+ final boolean hasServiceSource = entry.getServiceSource() != null;
+ final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null;
final int mapSize =
15
+ (hasServiceSource ? 1 : 0)
@@ -158,31 +159,31 @@ public void add(MetricKey key, AggregateMetric aggregate) {
writer.startMap(mapSize);
writer.writeUTF8(NAME);
- writer.writeUTF8(key.getOperationName());
+ writer.writeUTF8(entry.getOperationName());
writer.writeUTF8(SERVICE);
- writer.writeUTF8(key.getService());
+ writer.writeUTF8(entry.getService());
writer.writeUTF8(RESOURCE);
- writer.writeUTF8(key.getResource());
+ writer.writeUTF8(entry.getResource());
writer.writeUTF8(TYPE);
- writer.writeUTF8(key.getType());
+ writer.writeUTF8(entry.getType());
writer.writeUTF8(HTTP_STATUS_CODE);
- writer.writeInt(key.getHttpStatusCode());
+ writer.writeInt(entry.getHttpStatusCode());
writer.writeUTF8(SYNTHETICS);
- writer.writeBoolean(key.isSynthetics());
+ writer.writeBoolean(entry.isSynthetics());
writer.writeUTF8(IS_TRACE_ROOT);
- writer.writeInt(key.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE);
+ writer.writeInt(entry.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE);
writer.writeUTF8(SPAN_KIND);
- writer.writeUTF8(key.getSpanKind());
+ writer.writeUTF8(entry.getSpanKind());
writer.writeUTF8(PEER_TAGS);
- final List<UTF8BytesString> peerTags = key.getPeerTags();
+ final List<UTF8BytesString> peerTags = entry.getPeerTags();
writer.startArray(peerTags.size());
for (UTF8BytesString peerTag : peerTags) {
@@ -191,24 +192,24 @@ public void add(MetricKey key, AggregateMetric aggregate) {
if (hasServiceSource) {
writer.writeUTF8(SERVICE_SOURCE);
- writer.writeUTF8(key.getServiceSource());
+ writer.writeUTF8(entry.getServiceSource());
}
// Only include HTTPMethod if present
if (hasHttpMethod) {
writer.writeUTF8(HTTP_METHOD);
- writer.writeUTF8(key.getHttpMethod());
+ writer.writeUTF8(entry.getHttpMethod());
}
// Only include HTTPEndpoint if present
if (hasHttpEndpoint) {
writer.writeUTF8(HTTP_ENDPOINT);
- writer.writeUTF8(key.getHttpEndpoint());
+ writer.writeUTF8(entry.getHttpEndpoint());
}
// Only include GRPCStatusCode if present (rpc-type spans)
if (hasGrpcStatusCode) {
writer.writeUTF8(GRPC_STATUS_CODE);
- writer.writeUTF8(key.getGrpcStatusCode());
+ writer.writeUTF8(entry.getGrpcStatusCode());
}
writer.writeUTF8(HITS);
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
index 2816fad0411..b7f81712945 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
@@ -2,7 +2,8 @@
/**
* Immutable per-span value posted from the producer to the aggregator thread. Carries the raw
- * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}.
+ * inputs the aggregator needs to build an {@link AggregateEntry} and update its {@link
+ * AggregateMetric}.
*
* <p>All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the
* aggregator thread; the producer just shuffles references.
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
index 962ad2ce892..4dd0155443a 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
@@ -119,7 +119,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then:
latchTriggered
1 * writer.startBucket(1, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
null,
"service",
"operation",
@@ -133,8 +133,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _) >> { MetricKey key, AggregateMetric value ->
- value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100
}
1 * writer.finishBucket() >> { latch.countDown() }
@@ -165,7 +165,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then:
latchTriggered
1 * writer.startBucket(1, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -179,8 +179,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _) >> { MetricKey key, AggregateMetric value ->
- value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100
}
1 * writer.finishBucket() >> { latch.countDown() }
@@ -217,7 +217,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered == statsComputed
(statsComputed ? 1 : 0) * writer.startBucket(1, _, _)
(statsComputed ? 1 : 0) * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -231,9 +231,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
httpMethod,
httpEndpoint,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100
+ }
(statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -279,7 +279,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(2, _, _)
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -293,11 +293,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100
+ }
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -311,9 +311,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -344,7 +344,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(1, _, _)
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -358,9 +358,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -396,7 +396,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then:
latchTriggered
1 * writer.startBucket(1, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -410,9 +410,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == topLevelCount && e.aggregate.getDuration() == 100
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -455,7 +455,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.finishBucket() >> { latch.countDown() }
1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval))
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -469,10 +469,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == count && value.getDuration() == count * duration
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource2",
"service2",
"operation2",
@@ -486,9 +486,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == count && value.getDuration() == count * duration * 2
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration * 2
+ }
cleanup:
aggregator.close()
@@ -526,7 +526,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "should aggregate into single metric"
latchTriggered
1 * writer.startBucket(1, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -540,9 +540,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == count && value.getDuration() == count * duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration
+ }
1 * writer.finishBucket() >> { latch.countDown() }
when: "publish spans with different endpoints"
@@ -567,7 +567,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "should create separate metrics for each endpoint/method combination"
latchTriggered2
1 * writer.startBucket(3, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -581,10 +581,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -598,10 +598,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/orders/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 2
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -615,9 +615,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"POST",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 3
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3
+ }
1 * writer.finishBucket() >> { latch2.countDown() }
cleanup:
@@ -665,7 +665,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "should create 4 separate metrics"
latchTriggered
1 * writer.startBucket(4, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -679,10 +679,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -696,10 +696,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"POST",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 2
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -713,10 +713,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 3
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -730,9 +730,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/orders/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 4
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 4
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -769,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "should create separate metric keys for spans with and without HTTP tags"
latchTriggered
1 * writer.startBucket(2, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -783,10 +783,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -800,9 +800,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration * 2
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -837,7 +837,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "should create the different metric keys for spans with and without sources"
latchTriggered
1 * writer.startBucket(2, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -851,10 +851,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 2 && value.getDuration() == 2 * duration
- })
- 1 * writer.add(new MetricKey(
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 2 && e.aggregate.getDuration() == 2 * duration
+ }
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service",
"operation",
@@ -868,16 +868,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
aggregator.close()
}
- def "test least recently written to aggregate flushed when size limit exceeded"() {
+ def "new aggregates beyond size limit are dropped when no stale entries can be evicted"() {
+ // The table only evicts entries with hitCount == 0 to make room. When all entries are live
+ // (all have been recorded against), an over-cap insert drops the new key rather than evicting
+ // an established one. This protects the data we've already collected from a burst of new keys.
setup:
int maxAggregates = 10
MetricWriter writer = Mock(MetricWriter)
@@ -901,11 +904,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.report()
def latchTriggered = latch.await(2, SECONDS)
- then: "the first aggregate should be dropped but the rest reported"
+ then: "the established service0..service9 are reported; service10 is dropped"
latchTriggered
1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval))
- for (int i = 1; i < 11; ++i) {
- 1 * writer.add(new MetricKey(
+ for (int i = 0; i < 10; ++i) {
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service" + i,
"operation",
@@ -919,13 +922,13 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _) >> { MetricKey key, AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
}
}
- 0 * writer.add(new MetricKey(
+ 0 * writer.add(AggregateEntry.of(
"resource",
- "service0",
+ "service10",
"operation",
null,
"type",
@@ -937,7 +940,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _)
+ ))
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -1052,7 +1055,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval))
for (int i = 0; i < 5; ++i) {
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service" + i,
"operation",
@@ -1066,9 +1069,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
}
1 * writer.finishBucket() >> { latch.countDown() }
@@ -1087,7 +1090,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval))
for (int i = 1; i < 5; ++i) {
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service" + i,
"operation",
@@ -1101,11 +1104,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
}
- 0 * writer.add(new MetricKey(
+ 0 * writer.add(AggregateEntry.of(
"resource",
"service0",
"operation",
@@ -1119,7 +1122,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _)
+ ))
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -1154,7 +1157,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval))
for (int i = 0; i < 5; ++i) {
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service" + i,
"operation",
@@ -1168,9 +1171,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
}
1 * writer.finishBucket() >> { latch.countDown() }
@@ -1180,7 +1183,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then: "aggregate not updated in cycle is not reported"
0 * writer.finishBucket()
0 * writer.startBucket(_, _, _)
- 0 * writer.add(_, _)
+ 0 * writer.add(_)
cleanup:
aggregator.close()
@@ -1213,7 +1216,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(5, _, SECONDS.toNanos(1))
for (int i = 0; i < 5; ++i) {
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"resource",
"service" + i,
"operation",
@@ -1227,9 +1230,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric value ->
- value.getHitCount() == 1 && value.getDuration() == duration
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration
+ }
}
1 * writer.finishBucket() >> { latch.countDown() }
@@ -1380,7 +1383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(1, _, _)
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -1394,9 +1397,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -1435,7 +1438,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(1, _, _)
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -1449,9 +1452,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 3 && aggregateMetric.getTopLevelCount() == 3 && aggregateMetric.getDuration() == 450
- })
+ )) >> { AggregateEntry e ->
+ assert e.aggregate.getHitCount() == 3 && e.aggregate.getTopLevelCount() == 3 && e.aggregate.getDuration() == 450
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -1490,7 +1493,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
latchTriggered
1 * writer.startBucket(3, _, _)
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -1504,11 +1507,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"GET",
"/api/users/:id",
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100
- })
+ )) >> { AggregateEntry e ->
+ assert e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100
+ }
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -1522,11 +1525,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"POST",
"/api/orders",
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 200
- })
+ )) >> { AggregateEntry e ->
+ assert e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 200
+ }
1 * writer.add(
- new MetricKey(
+ AggregateEntry.of(
"resource",
"service",
"operation",
@@ -1540,9 +1543,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), { AggregateMetric aggregateMetric ->
- aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 150
- })
+ )) >> { AggregateEntry e ->
+ assert e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 150
+ }
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
@@ -1578,7 +1581,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
then:
latchTriggered
1 * writer.startBucket(3, _, _)
- 1 * writer.add(new MetricKey(
+ 1 * writer.add(AggregateEntry.of(
"grpc.service/Method",
"service",
"grpc.server",
@@ -1592,8 +1595,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
"0"
- ), _)
- 1 * writer.add(new MetricKey(
+ ))
+ 1 * writer.add(AggregateEntry.of(
"grpc.service/Method",
"service",
"grpc.server",
@@ -1607,8 +1610,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
"5"
- ), _)
- 1 * writer.add(new MetricKey(
+ ))
+ 1 * writer.add(AggregateEntry.of(
"GET /api",
"service",
"http.request",
@@ -1622,7 +1625,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
null,
null,
null
- ), _)
+ ))
1 * writer.finishBucket() >> { latch.countDown() }
cleanup:
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
index 3ff81de9851..08f0f7cbb92 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
@@ -7,7 +7,6 @@ import static java.util.concurrent.TimeUnit.SECONDS
import datadog.metrics.api.Histograms
import datadog.metrics.impl.DDSketchHistograms
import datadog.trace.api.Config
-import datadog.trace.api.Pair
import datadog.trace.api.ProcessTags
import datadog.trace.api.WellKnownTags
import datadog.trace.api.git.CommitInfo
@@ -26,6 +25,30 @@ class SerializingMetricWriterTest extends DDSpecification {
Histograms.register(DDSketchHistograms.FACTORY)
}
+ /** Build an {@link AggregateEntry} with a pre-recorded duration count. */
+ private static AggregateEntry entry(
+ CharSequence resource,
+ CharSequence service,
+ CharSequence operationName,
+ CharSequence serviceSource,
+ CharSequence type,
+ int httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ CharSequence spanKind,
+ List peerTags,
+ CharSequence httpMethod,
+ CharSequence httpEndpoint,
+ CharSequence grpcStatusCode,
+ int hitCount) {
+ AggregateEntry e = AggregateEntry.of(
+ resource, service, operationName, serviceSource, type,
+ httpStatusCode, synthetic, traceRoot, spanKind, peerTags,
+ httpMethod, httpEndpoint, grpcStatusCode)
+ e.aggregate.recordDurations(hitCount, new AtomicLongArray(1L))
+ return e
+ }
+
def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () {
setup:
if (!withProcessTags) {
@@ -40,8 +63,8 @@ class SerializingMetricWriterTest extends DDSpecification {
when:
writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry e : content) {
+ writer.add(e)
}
writer.finishBucket()
@@ -55,88 +78,40 @@ class SerializingMetricWriterTest extends DDSpecification {
where:
content << [
[
- Pair.of(
- new MetricKey(
- "resource1",
- "service1",
- "operation1",
- null,
- "type",
- 0,
- false,
- false,
- "client",
+ entry(
+ "resource1", "service1", "operation1", null, "type", 0,
+ false, false, "client",
[
UTF8BytesString.create("country:canada"),
UTF8BytesString.create("georegion:amer"),
UTF8BytesString.create("peer.service:remote-service")
],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
- ),
- Pair.of(
- new MetricKey(
- "resource2",
- "service2",
- "operation2",
- null,
- "type2",
- 200,
- true,
- false,
- "producer",
+ null, null, null,
+ 10),
+ entry(
+ "resource2", "service2", "operation2", null, "type2", 200,
+ true, false, "producer",
[
UTF8BytesString.create("country:canada"),
UTF8BytesString.create("georegion:amer"),
UTF8BytesString.create("peer.service:remote-service")
],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(9, new AtomicLongArray(1L))
- ),
- Pair.of(
- new MetricKey(
- "GET /api/users/:id",
- "web-service",
- "http.request",
- null,
- "web",
- 200,
- false,
- true,
- "server",
+ null, null, null,
+ 9),
+ entry(
+ "GET /api/users/:id", "web-service", "http.request", null, "web", 200,
+ false, true, "server",
[],
- "GET",
- "/api/users/:id",
- null
- ),
- new AggregateMetric().recordDurations(5, new AtomicLongArray(1L))
- )
+ null, null, null,
+ 5)
],
(0..10000).collect({ i ->
- Pair.of(
- new MetricKey(
- "resource" + i,
- "service" + i,
- "operation" + i,
- null,
- "type",
- 0,
- false,
- false,
- "producer",
+ entry(
+ "resource" + i, "service" + i, "operation" + i, null, "type", 0,
+ false, false, "producer",
[UTF8BytesString.create("messaging.destination:dest" + i)],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
- )
+ null, null, null,
+ 10)
})
]
withProcessTags << [true, false]
@@ -148,22 +123,18 @@ class SerializingMetricWriterTest extends DDSpecification {
long duration = SECONDS.toNanos(10)
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- // Create keys with different combinations of HTTP fields
- def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
- def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null)
+ def entryNoSource = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
+ def entryWithSource = entry("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, 1)
- def content = [
- Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- ]
+ def content = [entryNoSource, entryWithSource]
ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
when:
writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry e : content) {
+ writer.add(e)
}
writer.finishBucket()
@@ -177,34 +148,25 @@ class SerializingMetricWriterTest extends DDSpecification {
long duration = SECONDS.toNanos(10)
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- // Create keys with different combinations of HTTP fields
- def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
- def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null,null)
- def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders",null)
- def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null)
-
- def content = [
- Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
- ]
+ def entryWithBoth = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
+ def entryWithMethodOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null, 1)
+ def entryWithEndpointOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null, 1)
+ def entryWithNeither = entry("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, 1)
+
+ def content = [entryWithBoth, entryWithMethodOnly, entryWithEndpointOnly, entryWithNeither]
ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
when:
writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry e : content) {
+ writer.add(e)
}
writer.finishBucket()
then:
sink.validatedInput()
- // Test passes if validation in ValidatingSink succeeds
- // ValidatingSink verifies that map size matches actual number of fields
- // and that HTTPMethod/HTTPEndpoint are only present when non-empty
}
def "add git sha commit info when sha commit is #shaCommit"() {
@@ -216,40 +178,63 @@ class SerializingMetricWriterTest extends DDSpecification {
long duration = SECONDS.toNanos(10)
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- // Create keys with different combinations of HTTP fields
- def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
+ def e = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
- def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),]
+ def content = [e]
ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128, gitInfoProvider)
when:
-
writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry entryItem : content) {
+ writer.add(entryItem)
}
writer.finishBucket()
then:
-
sink.validatedInput()
where:
shaCommit << [null, "123456"]
}
+ def "GRPCStatusCode field is present in payload for rpc-type spans"() {
+ setup:
+ long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
+ long duration = SECONDS.toNanos(10)
+ WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
+
+ def entryWithGrpc = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK", 1)
+ def entryWithGrpcError = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND", 1)
+ def entryWithoutGrpc = entry("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null, 1)
+
+ def content = [entryWithGrpc, entryWithGrpcError, entryWithoutGrpc]
+
+ ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
+ SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
+
+ when:
+ writer.startBucket(content.size(), startTime, duration)
+ for (AggregateEntry e : content) {
+ writer.add(e)
+ }
+ writer.finishBucket()
+
+ then:
+ sink.validatedInput()
+ }
+
static class ValidatingSink implements Sink {
private final WellKnownTags wellKnownTags
private final long startTimeNanos
private final long duration
private boolean validated = false
- private List<Pair<MetricKey, AggregateMetric>> content
+ private List<AggregateEntry> content
ValidatingSink(WellKnownTags wellKnownTags, long startTimeNanos, long duration,
- List<Pair<MetricKey, AggregateMetric>> content) {
+ List<AggregateEntry> content) {
this.wellKnownTags = wellKnownTags
this.startTimeNanos = startTimeNanos
this.duration = duration
@@ -298,70 +283,69 @@ class SerializingMetricWriterTest extends DDSpecification {
assert unpacker.unpackString() == "Stats"
int statCount = unpacker.unpackArrayHeader()
assert statCount == content.size()
- for (Pair pair : content) {
- MetricKey key = pair.getLeft()
- AggregateMetric value = pair.getRight()
+ for (AggregateEntry entry : content) {
+ AggregateMetric value = entry.aggregate
int metricMapSize = unpacker.unpackMapHeader()
// Calculate expected map size based on optional fields
- boolean hasHttpMethod = key.getHttpMethod() != null
- boolean hasHttpEndpoint = key.getHttpEndpoint() != null
- boolean hasServiceSource = key.getServiceSource() != null
- boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null
+ boolean hasHttpMethod = entry.getHttpMethod() != null
+ boolean hasHttpEndpoint = entry.getHttpEndpoint() != null
+ boolean hasServiceSource = entry.getServiceSource() != null
+ boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null
int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0)
assert metricMapSize == expectedMapSize
int elementCount = 0
assert unpacker.unpackString() == "Name"
- assert unpacker.unpackString() == key.getOperationName() as String
+ assert unpacker.unpackString() == entry.getOperationName() as String
++elementCount
assert unpacker.unpackString() == "Service"
- assert unpacker.unpackString() == key.getService() as String
+ assert unpacker.unpackString() == entry.getService() as String
++elementCount
assert unpacker.unpackString() == "Resource"
- assert unpacker.unpackString() == key.getResource() as String
+ assert unpacker.unpackString() == entry.getResource() as String
++elementCount
assert unpacker.unpackString() == "Type"
- assert unpacker.unpackString() == key.getType() as String
+ assert unpacker.unpackString() == entry.getType() as String
++elementCount
assert unpacker.unpackString() == "HTTPStatusCode"
- assert unpacker.unpackInt() == key.getHttpStatusCode()
+ assert unpacker.unpackInt() == entry.getHttpStatusCode()
++elementCount
assert unpacker.unpackString() == "Synthetics"
- assert unpacker.unpackBoolean() == key.isSynthetics()
+ assert unpacker.unpackBoolean() == entry.isSynthetics()
++elementCount
assert unpacker.unpackString() == "IsTraceRoot"
- assert unpacker.unpackInt() == (key.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue)
+ assert unpacker.unpackInt() == (entry.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue)
++elementCount
assert unpacker.unpackString() == "SpanKind"
- assert unpacker.unpackString() == key.getSpanKind() as String
+ assert unpacker.unpackString() == entry.getSpanKind() as String
++elementCount
assert unpacker.unpackString() == "PeerTags"
int peerTagsLength = unpacker.unpackArrayHeader()
- assert peerTagsLength == key.getPeerTags().size()
+ assert peerTagsLength == entry.getPeerTags().size()
for (int i = 0; i < peerTagsLength; i++) {
def unpackedPeerTag = unpacker.unpackString()
- assert unpackedPeerTag == key.getPeerTags()[i].toString()
+ assert unpackedPeerTag == entry.getPeerTags()[i].toString()
}
++elementCount
// Service source is only present when the service name has been overridden by the tracer
if (hasServiceSource) {
assert unpacker.unpackString() == "srv_src"
- assert unpacker.unpackString() == key.getServiceSource().toString()
+ assert unpacker.unpackString() == entry.getServiceSource().toString()
++elementCount
}
// HTTPMethod and HTTPEndpoint are optional - only present if non-null
if (hasHttpMethod) {
assert unpacker.unpackString() == "HTTPMethod"
- assert unpacker.unpackString() == key.getHttpMethod() as String
+ assert unpacker.unpackString() == entry.getHttpMethod() as String
++elementCount
}
if (hasHttpEndpoint) {
assert unpacker.unpackString() == "HTTPEndpoint"
- assert unpacker.unpackString() == key.getHttpEndpoint() as String
+ assert unpacker.unpackString() == entry.getHttpEndpoint() as String
++elementCount
}
if (hasGrpcStatusCode) {
assert unpacker.unpackString() == "GRPCStatusCode"
- assert unpacker.unpackString() == key.getGrpcStatusCode() as String
+ assert unpacker.unpackString() == entry.getGrpcStatusCode() as String
++elementCount
}
assert unpacker.unpackString() == "Hits"
@@ -397,99 +381,4 @@ class SerializingMetricWriterTest extends DDSpecification {
return validated
}
}
-
- def "ServiceSource optional in the payload"() {
- setup:
- long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
- long duration = SECONDS.toNanos(10)
- WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
- // Create keys with different combinations of HTTP fields
- def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
- def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null)
-
- def content = [
- Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- ]
-
- ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
- SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
- when:
- writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
- }
- writer.finishBucket()
-
- then:
- sink.validatedInput()
- }
-
- def "GRPCStatusCode field is present in payload for rpc-type spans"() {
- setup:
- long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
- long duration = SECONDS.toNanos(10)
- WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
- def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK")
- def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND")
- def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null)
-
- def content = [
- Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithGrpcError, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithoutGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
- ]
-
- ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
- SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
- when:
- writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
- }
- writer.finishBucket()
-
- then:
- sink.validatedInput()
- }
-
- def "HTTPMethod and HTTPEndpoint fields are optional in payload"() {
- setup:
- long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
- long duration = SECONDS.toNanos(10)
- WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
-
- // Create keys with different combinations of HTTP fields
- def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
- def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null)
- def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null)
- def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null)
-
- def content = [
- Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),
- Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L)))
- ]
-
- ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
- SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
-
- when:
- writer.startBucket(content.size(), startTime, duration)
- for (Pair pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
- }
- writer.finishBucket()
-
- then:
- sink.validatedInput()
- // Test passes if validation in ValidatingSink succeeds
- // ValidatingSink verifies that map size matches actual number of fields
- // and that HTTPMethod/HTTPEndpoint are only present when non-empty
- }
}
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
new file mode 100644
index 00000000000..44f2b36cb6b
--- /dev/null
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
@@ -0,0 +1,234 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
+import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import datadog.metrics.agent.AgentMeter;
+import datadog.metrics.api.statsd.StatsDClient;
+import datadog.metrics.impl.DDSketchHistograms;
+import datadog.metrics.impl.MonitoringImpl;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class AggregateTableTest {
+
+ @BeforeAll
+ static void initAgentMeter() {
+ // AggregateMetric.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized.
+ // Mirror what AggregateMetricTest does.
+ MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS);
+ AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY);
+ monitoring.newTimer("test.init");
+ }
+
+ @Test
+ void insertOnMissReturnsNewAggregate() {
+ AggregateTable table = new AggregateTable(8);
+ SpanSnapshot s = snapshot("svc", "op", "client");
+
+ AggregateMetric agg = table.findOrInsert(s);
+
+ assertNotNull(agg);
+ assertEquals(1, table.size());
+ assertEquals(0, agg.getHitCount());
+ }
+
+ @Test
+ void hitReturnsSameAggregateInstance() {
+ AggregateTable table = new AggregateTable(8);
+ SpanSnapshot s1 = snapshot("svc", "op", "client");
+ SpanSnapshot s2 = snapshot("svc", "op", "client");
+
+ AggregateMetric first = table.findOrInsert(s1);
+ AggregateMetric second = table.findOrInsert(s2);
+
+ assertSame(first, second);
+ assertEquals(1, table.size());
+ }
+
+ @Test
+ void differentKindFieldsAreDistinct() {
+ AggregateTable table = new AggregateTable(8);
+
+ AggregateMetric clientAgg = table.findOrInsert(snapshot("svc", "op", "client"));
+ AggregateMetric serverAgg = table.findOrInsert(snapshot("svc", "op", "server"));
+
+ assertNotSame(clientAgg, serverAgg);
+ assertEquals(2, table.size());
+ }
+
+ @Test
+ void peerTagPairsParticipateInIdentity() {
+ AggregateTable table = new AggregateTable(8);
+ SpanSnapshot withTags =
+ builder("svc", "op", "client").peerTags("peer.hostname", "host-a").build();
+ SpanSnapshot otherTags =
+ builder("svc", "op", "client").peerTags("peer.hostname", "host-b").build();
+ SpanSnapshot noTags = builder("svc", "op", "client").build();
+
+ AggregateMetric a = table.findOrInsert(withTags);
+ AggregateMetric b = table.findOrInsert(otherTags);
+ AggregateMetric c = table.findOrInsert(noTags);
+
+ assertNotSame(a, b);
+ assertNotSame(a, c);
+ assertNotSame(b, c);
+ assertEquals(3, table.size());
+ }
+
+ @Test
+ void capOverrunEvictsStaleEntry() {
+ AggregateTable table = new AggregateTable(2);
+
+ AggregateMetric stale = table.findOrInsert(snapshot("svc-a", "op", "client"));
+ // do not record on stale -> hitCount stays at 0
+
+ AggregateMetric live = table.findOrInsert(snapshot("svc-b", "op", "client"));
+ live.recordOneDuration(10L | TOP_LEVEL_TAG); // hitCount=1, not evictable
+
+ // table is full (size=2). Inserting a third should evict the stale one and succeed.
+ AggregateMetric newcomer = table.findOrInsert(snapshot("svc-c", "op", "client"));
+ assertNotNull(newcomer);
+ assertEquals(2, table.size());
+
+ // re-inserting the stale snapshot should miss now (it was evicted) and produce a fresh entry
+ AggregateMetric staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client"));
+ assertNotSame(stale, staleAgain);
+ }
+
+ @Test
+ void capOverrunWithNoStaleReturnsNull() {
+ AggregateTable table = new AggregateTable(2);
+
+ AggregateMetric a = table.findOrInsert(snapshot("svc-a", "op", "client"));
+ AggregateMetric b = table.findOrInsert(snapshot("svc-b", "op", "client"));
+ a.recordOneDuration(10L);
+ b.recordOneDuration(20L);
+
+ AggregateMetric c = table.findOrInsert(snapshot("svc-c", "op", "client"));
+ assertNull(c);
+ assertEquals(2, table.size());
+ }
+
+ @Test
+ void expungeStaleAggregatesRemovesZeroHitsOnly() {
+ AggregateTable table = new AggregateTable(16);
+
+ AggregateMetric live = table.findOrInsert(snapshot("svc-live", "op", "client"));
+ live.recordOneDuration(10L);
+ AggregateMetric stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client"));
+ AggregateMetric stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client"));
+ assertEquals(3, table.size());
+ assertEquals(0, stale1.getHitCount());
+ assertEquals(0, stale2.getHitCount());
+
+ table.expungeStaleAggregates();
+
+ assertEquals(1, table.size());
+ // the live entry must still be reachable
+ assertSame(live, table.findOrInsert(snapshot("svc-live", "op", "client")));
+ }
+
+ @Test
+ void forEachVisitsEveryEntry() {
+ AggregateTable table = new AggregateTable(8);
+ table.findOrInsert(snapshot("a", "op", "client")).recordOneDuration(1L);
+ table.findOrInsert(snapshot("b", "op", "client")).recordOneDuration(2L);
+ table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG);
+
+ Map<String, Long> visited = new HashMap<>();
+ table.forEach(e -> visited.put(e.getService().toString(), e.aggregate.getDuration()));
+
+ assertEquals(3, visited.size());
+ assertEquals(1L, visited.get("a"));
+ assertEquals(2L, visited.get("b"));
+ assertEquals(3L, visited.get("c"));
+ }
+
+ @Test
+ void clearEmptiesTheTable() {
+ AggregateTable table = new AggregateTable(8);
+ table.findOrInsert(snapshot("a", "op", "client"));
+ table.findOrInsert(snapshot("b", "op", "client"));
+ assertEquals(2, table.size());
+
+ table.clear();
+
+ assertTrue(table.isEmpty());
+ assertEquals(0, table.size());
+ // and re-insertion works after clear
+ assertNotNull(table.findOrInsert(snapshot("a", "op", "client")));
+ }
+
+ @Test
+ void encodedLabelsAreBuiltOnInsert() {
+ AggregateTable table = new AggregateTable(4);
+ List<AggregateEntry> seen = new ArrayList<>();
+ table.findOrInsert(snapshot("svc", "op", "client"));
+ table.forEach(seen::add);
+
+ assertEquals(1, seen.size());
+ AggregateEntry e = seen.get(0);
+ assertEquals("svc", e.getService().toString());
+ assertEquals("op", e.getOperationName().toString());
+ assertEquals("client", e.getSpanKind().toString());
+ }
+
+ // ---------- helpers ----------
+
+ private static SpanSnapshot snapshot(String service, String operation, String spanKind) {
+ return builder(service, operation, spanKind).build();
+ }
+
+ private static SnapshotBuilder builder(String service, String operation, String spanKind) {
+ return new SnapshotBuilder(service, operation, spanKind);
+ }
+
+ private static final class SnapshotBuilder {
+ private final String service;
+ private final String operation;
+ private final String spanKind;
+ private String[] peerTagPairs;
+ private long tagAndDuration = 0L;
+
+ SnapshotBuilder(String service, String operation, String spanKind) {
+ this.service = service;
+ this.operation = operation;
+ this.spanKind = spanKind;
+ }
+
+ SnapshotBuilder peerTags(String... namesAndValues) {
+ this.peerTagPairs = namesAndValues;
+ return this;
+ }
+
+ SpanSnapshot build() {
+ return new SpanSnapshot(
+ "resource",
+ service,
+ operation,
+ null,
+ "web",
+ (short) 200,
+ false,
+ true,
+ spanKind,
+ peerTagPairs,
+ null,
+ null,
+ null,
+ tagAndDuration);
+ }
+ }
+}
diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
index 2972ffa2c18..81a476c67c8 100644
--- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
+++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
@@ -8,9 +8,8 @@ import datadog.metrics.impl.DDSketchHistograms
import datadog.trace.api.Config
import datadog.trace.api.WellKnownTags
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
-import datadog.trace.common.metrics.AggregateMetric
+import datadog.trace.common.metrics.AggregateEntry
import datadog.trace.common.metrics.EventListener
-import datadog.trace.common.metrics.MetricKey
import datadog.trace.common.metrics.OkHttpSink
import datadog.trace.common.metrics.SerializingMetricWriter
import java.util.concurrent.CopyOnWriteArrayList
@@ -39,14 +38,12 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest {
sink
)
writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10))
- writer.add(
- new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null),
- new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
- )
- writer.add(
- new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null),
- new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
- )
+ def entry1 = AggregateEntry.of("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
+ entry1.aggregate.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
+ writer.add(entry1)
+ def entry2 = AggregateEntry.of("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
+ entry2.aggregate.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
+ writer.add(entry2)
writer.finishBucket()
then:
diff --git a/internal-api/src/main/java/datadog/trace/util/Hashtable.java b/internal-api/src/main/java/datadog/trace/util/Hashtable.java
new file mode 100644
index 00000000000..d7f49dcae00
--- /dev/null
+++ b/internal-api/src/main/java/datadog/trace/util/Hashtable.java
@@ -0,0 +1,553 @@
+package datadog.trace.util;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+/**
+ * Lightweight, simple hashtable framework that can be useful when {@code HashMap}
+ * would be unnecessarily heavy.
+ *
+ * <p>Use cases include...
+ * <ul>
+ * <li> - primitive keys
+ * <li> - primitive values
+ * <li> - multi-part keys
+ * </ul>
+ *
+ * <p>Convenience classes are provided for lower key dimensions.
+ *
+ * <p>For higher key dimensions, client code must implement its own entry class,
+ * but can still use the {@link Support} class to ease the implementation complexity.
+ */
+public abstract class Hashtable {
+ /**
+ * Internal base class for entries. Stores the precomputed 64-bit keyHash and
+ * the chain-next pointer used to link colliding entries within a single bucket.
+ *
+ * Subclasses add the actual key field(s) and a {@code matches(...)} method
+ * tailored to their key arity. See {@link D1.Entry} and {@link D2.Entry}; for
+ * higher arities, client code can subclass this directly and use {@link Support}
+ * to drive the table mechanics.
+ */
+ public static abstract class Entry {
+ public final long keyHash;
+ Entry next = null;
+
+ protected Entry(long keyHash) {
+ this.keyHash = keyHash;
+ }
+
+ public final <TEntry extends Entry> void setNext(TEntry next) {
+ this.next = next;
+ }
+
+ @SuppressWarnings("unchecked")
+ public final <TEntry extends Entry> TEntry next() {
+ return (TEntry)this.next;
+ }
+ }
+
+ /**
+ * Single-key open hash table with chaining.
+ *
+ * The user supplies an {@link D1.Entry} subclass that carries the key and
+ * whatever value fields they want to mutate in place, then instantiates this
+ * class over that entry type. The main advantage over {@code HashMap}
+ * is that mutating an existing entry's value fields requires no allocation:
+ * call {@link #get} once and write directly to the returned entry's fields.
+ * For counter-style workloads this can be several times faster than
+ * {@code HashMap} and produces effectively zero GC pressure.
+ *
+ * Capacity is fixed at construction. The table does not resize, so the
+ * caller is responsible for choosing a capacity appropriate to the working
+ * set. Actual bucket-array length is rounded up to the next power of two.
+ *
+ * <p>Null keys are permitted; they collapse to a single bucket via the
+ * sentinel hash {@link Long#MIN_VALUE} defined in {@link D1.Entry#hash}.
+ *
+ * <p>Not thread-safe. Concurrent access (including mixing reads with
+ * writes) requires external synchronization.
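+ *
+ * <p>A minimal sketch of a counter-style entry (the names here are
+ * illustrative, not part of this API):
+ *
+ * <pre>{@code
+ * // illustrative client code
+ * final class CounterEntry extends Hashtable.D1.Entry<String> {
+ *   long count; // value field mutated in place, no per-update allocation
+ *
+ *   CounterEntry(String key) {
+ *     super(key);
+ *   }
+ * }
+ *
+ * Hashtable.D1<String, CounterEntry> counters = new Hashtable.D1<>(64);
+ * CounterEntry e = counters.get("requests");
+ * if (e == null) {
+ *   counters.insert(e = new CounterEntry("requests"));
+ * }
+ * e.count++;
+ * }</pre>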
+ *
+ * @param <K> the key type
+ * @param <TEntry> the user's {@link D1.Entry D1.Entry<K>} subclass
+ */
+ public static final class D1<K, TEntry extends D1.Entry<K>> {
+ /**
+ * Abstract base for {@link D1} entries. Subclass to add value fields you
+ * wish to mutate in place after retrieving the entry via {@link D1#get}.
+ *
+ * The key is captured at construction and stored alongside its
+ * precomputed 64-bit hash. {@link #matches(Object)} uses
+ * {@link Objects#equals} by default; override if a different equality
+ * semantics is needed (e.g. reference equality for interned keys).
+ *
+ * @param <K> the key type
+ */
+ public static abstract class Entry<K> extends Hashtable.Entry {
+ final K key;
+
+ protected Entry(K key) {
+ super(hash(key));
+ this.key = key;
+ }
+
+ public boolean matches(Object key) {
+ return Objects.equals(this.key, key);
+ }
+
+ public static long hash(Object key) {
+ return (key == null ) ? Long.MIN_VALUE : key.hashCode();
+ }
+ }
+
+ private final Hashtable.Entry[] buckets;
+ private int size;
+
+ public D1(int capacity) {
+ this.buckets = Support.create(capacity);
+ this.size = 0;
+ }
+
+ public int size() {
+ return this.size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public TEntry get(K key) {
+ long keyHash = D1.Entry.hash(key);
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) {
+ if (e.keyHash == keyHash) {
+ TEntry te = (TEntry) e;
+ if (te.matches(key)) return te;
+ }
+ }
+ return null;
+ }
+
+ public TEntry remove(K key) {
+ long keyHash = D1.Entry.hash(key);
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(key)) {
+ iter.remove();
+ this.size -= 1;
+ return curEntry;
+ }
+ }
+
+ return null;
+ }
+
+ public void insert(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+
+ this.size += 1;
+ }
+
+ public TEntry insertOrReplace(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(newEntry.key)) {
+ iter.replace(newEntry);
+ return curEntry;
+ }
+ }
+
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+ this.size += 1;
+ return null;
+ }
+
+ public void clear() {
+ Support.clear(this.buckets);
+ this.size = 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void forEach(Consumer<? super TEntry> consumer) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (int i = 0; i < thisBuckets.length; i++) {
+ for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) {
+ consumer.accept((TEntry) e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Two-key (composite-key) hash table with chaining.
+ *
+ * The user supplies a {@link D2.Entry} subclass carrying both key parts
+ * and any value fields. Compared to {@code HashMap} this avoids the
+ * per-lookup {@code Pair} (or record) allocation: both key parts are passed
+ * directly through {@link #get}, {@link #remove}, {@link #insert}, and
+ * {@link #insertOrReplace}. Combined with in-place value mutation, this
+ * makes {@code D2} substantially less GC-intensive than the equivalent
+ * {@code HashMap} for counter-style workloads.
+ *
+ * Capacity is fixed at construction; the table does not resize. Actual
+ * bucket-array length is rounded up to the next power of two.
+ *
+ * <p>Key parts are combined into a 64-bit hash via {@link LongHashingUtils};
+ * see {@link D2.Entry#hash(Object, Object)}.
+ *
+ * <p>Not thread-safe.
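+ *
+ * <p>A minimal sketch mirroring the {@link D1} example, with a two-part key
+ * (illustrative names only):
+ *
+ * <pre>{@code
+ * // illustrative client code
+ * final class EndpointEntry extends Hashtable.D2.Entry<String, String> {
+ *   long hits;
+ *
+ *   EndpointEntry(String service, String operation) {
+ *     super(service, operation);
+ *   }
+ * }
+ *
+ * Hashtable.D2<String, String, EndpointEntry> table = new Hashtable.D2<>(64);
+ * EndpointEntry e = table.get("svc", "op"); // no Pair allocation for the lookup
+ * if (e == null) {
+ *   table.insert(e = new EndpointEntry("svc", "op"));
+ * }
+ * e.hits++;
+ * }</pre>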
+ *
+ * @param <K1> first key type
+ * @param <K2> second key type
+ * @param <TEntry> the user's {@link D2.Entry D2.Entry<K1, K2>} subclass
+ */
+ public static final class D2<K1, K2, TEntry extends D2.Entry<K1, K2>> {
+ /**
+ * Abstract base for {@link D2} entries. Subclass to add value fields you
+ * wish to mutate in place.
+ *
+ * Both key parts are captured at construction and stored alongside their
+ * combined 64-bit hash. {@link #matches(Object, Object)} uses
+ * {@link Objects#equals} pairwise on the two parts.
+ *
+ * @param <K1> first key type
+ * @param <K2> second key type
+ */
+ public static abstract class Entry<K1, K2> extends Hashtable.Entry {
+ final K1 key1;
+ final K2 key2;
+
+ protected Entry(K1 key1, K2 key2) {
+ super(hash(key1, key2));
+ this.key1 = key1;
+ this.key2 = key2;
+ }
+
+ public boolean matches(K1 key1, K2 key2) {
+ return Objects.equals(this.key1, key1) && Objects.equals(this.key2, key2);
+ }
+
+ public static long hash(Object key1, Object key2) {
+ return LongHashingUtils.hash(key1, key2);
+ }
+ }
+
+ private final Hashtable.Entry[] buckets;
+ private int size;
+
+ public D2(int capacity) {
+ this.buckets = Support.create(capacity);
+ this.size = 0;
+ }
+
+ public int size() {
+ return this.size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public TEntry get(K1 key1, K2 key2) {
+ long keyHash = D2.Entry.hash(key1, key2);
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) {
+ if (e.keyHash == keyHash) {
+ TEntry te = (TEntry) e;
+ if (te.matches(key1, key2)) return te;
+ }
+ }
+ return null;
+ }
+
+ public TEntry remove(K1 key1, K2 key2) {
+ long keyHash = D2.Entry.hash(key1, key2);
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(key1, key2)) {
+ iter.remove();
+ this.size -= 1;
+ return curEntry;
+ }
+ }
+
+ return null;
+ }
+
+ public void insert(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+
+ this.size += 1;
+ }
+
+ public TEntry insertOrReplace(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(newEntry.key1, newEntry.key2)) {
+ iter.replace(newEntry);
+ return curEntry;
+ }
+ }
+
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+ this.size += 1;
+ return null;
+ }
+
+ public void clear() {
+ Support.clear(this.buckets);
+ this.size = 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void forEach(Consumer<? super TEntry> consumer) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (int i = 0; i < thisBuckets.length; i++) {
+ for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) {
+ consumer.accept((TEntry) e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Internal building blocks for hash-table operations.
+ *
+ * Used by {@link D1} and {@link D2}, and available to package code that
+ * wants to assemble its own higher-arity table (3+ key parts) without
+ * re-implementing the bucket-array mechanics. The typical recipe:
+ *
+ * <ul>
+ * <li> - Subclass {@link Hashtable.Entry} directly, adding the key fields and
+ * a {@code matches(...)} method of your chosen arity.
+ * <li> - Allocate a backing array with {@link #create(int)}.
+ * <li> - Use {@link #bucketIndex(Object[], long)} for the bucket lookup,
+ * {@link #bucketIterator(Hashtable.Entry[], long)} for read-only chain
+ * walks, and {@link #mutatingBucketIterator(Hashtable.Entry[], long)}
+ * when you also need {@code remove} / {@code replace}.
+ * <li> - Clear with {@link #clear(Hashtable.Entry[])}.
+ * </ul>
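+ *
+ * <p>A minimal sketch of the first step of that recipe (illustrative, not
+ * part of this file):
+ *
+ * <pre>{@code
+ * // illustrative three-part-key entry
+ * final class TripleKeyEntry extends Hashtable.Entry {
+ *   final String k1;
+ *   final String k2;
+ *   final int k3;
+ *
+ *   TripleKeyEntry(String k1, String k2, int k3) {
+ *     // combine all key parts into the precomputed 64-bit hash
+ *     super(LongHashingUtils.addToHash(LongHashingUtils.hash(k1, k2), k3));
+ *     this.k1 = k1;
+ *     this.k2 = k2;
+ *     this.k3 = k3;
+ *   }
+ *
+ *   boolean matches(String k1, String k2, int k3) {
+ *     return this.k3 == k3 && this.k1.equals(k1) && this.k2.equals(k2);
+ *   }
+ * }
+ * }</pre>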
+ *
+ * All bucket arrays produced by {@link #create(int)} have a power-of-two
+ * length, so {@link #bucketIndex(Object[], long)} can use a bit mask.
+ *
+ *
+ * <p>Methods on this class are public so that client code in other packages
+ * can assemble higher-arity tables; the {@link BucketIterator} and
+ * {@link MutatingBucketIterator} types it hands out are public for the same
+ * reason.
+ */
+ public static final class Support {
+ public static final Hashtable.Entry[] create(int capacity) {
+ return new Entry[sizeFor(capacity)];
+ }
+
+ static final int sizeFor(int requestedCapacity) {
+ int pow;
+ for ( pow = 1; pow < requestedCapacity; pow *= 2 );
+ return pow;
+ }
+
+ public static final void clear(Hashtable.Entry[] buckets) {
+ Arrays.fill(buckets, null);
+ }
+
+ public static final <TEntry extends Hashtable.Entry> BucketIterator<TEntry> bucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ return new BucketIterator<>(buckets, keyHash);
+ }
+
+ public static final <TEntry extends Hashtable.Entry> MutatingBucketIterator<TEntry> mutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ return new MutatingBucketIterator<>(buckets, keyHash);
+ }
+
+ public static final int bucketIndex(Object[] buckets, long keyHash) {
+ return (int) (keyHash & (buckets.length - 1));
+ }
+ }
+
+ /**
+ * Read-only iterator over entries in a single bucket whose {@code keyHash}
+ * matches a specific search hash. Cheaper than {@link MutatingBucketIterator}
+ * because it does not track the previous-node pointers required for
+ * splicing — use it when you only need to walk the chain.
+ *
+ * For {@code remove} or {@code replace} operations, use
+ * {@link MutatingBucketIterator} instead.
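+ *
+ * <p>Illustrative read-only walk ({@code MyEntry} stands in for a client
+ * entry type):
+ *
+ * <pre>{@code
+ * for (BucketIterator<MyEntry> it = Support.bucketIterator(buckets, keyHash); it.hasNext(); ) {
+ *   MyEntry e = it.next();
+ *   // inspect e; the chain cannot be modified through this iterator
+ * }
+ * }</pre>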
+ */
+ public static final class BucketIterator<TEntry extends Hashtable.Entry> implements Iterator<TEntry> {
+ private final long keyHash;
+ private Hashtable.Entry nextEntry;
+
+ BucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ this.keyHash = keyHash;
+ Hashtable.Entry cur = buckets[Support.bucketIndex(buckets, keyHash)];
+ while (cur != null && cur.keyHash != keyHash) cur = cur.next;
+ this.nextEntry = cur;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.nextEntry != null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public TEntry next() {
+ Hashtable.Entry cur = this.nextEntry;
+ if (cur == null) throw new NoSuchElementException("no next!");
+
+ Hashtable.Entry advance = cur.next;
+ while (advance != null && advance.keyHash != keyHash) advance = advance.next;
+ this.nextEntry = advance;
+
+ return (TEntry) cur;
+ }
+ }
+
+ /**
+ * Mutating iterator over entries in a single bucket whose {@code keyHash}
+ * matches a specific search hash. Supports {@link #remove()} and
+ * {@link #replace(Entry)} to splice the chain in place.
+ *
+ * Carries previous-node pointers for the current entry and the next-match
+ * entry so that {@code remove} and {@code replace} can fix up the chain in
+ * O(1) without re-walking from the bucket head. After {@code remove} or
+ * {@code replace}, iteration may continue with another {@link #next()}.
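+ *
+ * <p>Typical remove-by-key shape ({@code MyEntry} stands in for a client
+ * entry type):
+ *
+ * <pre>{@code
+ * for (MutatingBucketIterator<MyEntry> it =
+ *     Support.mutatingBucketIterator(buckets, keyHash); it.hasNext(); ) {
+ *   MyEntry e = it.next();
+ *   if (e.matches(key)) {
+ *     it.remove(); // splices the chain in place
+ *     break;
+ *   }
+ * }
+ * }</pre>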
+ */
+ public static final class MutatingBucketIterator<TEntry extends Hashtable.Entry> implements Iterator<TEntry> {
+ private final long keyHash;
+
+ private final Hashtable.Entry[] buckets;
+
+ /**
+ * The entry prior to the last entry returned by {@link #next()}.
+ * Used for mutating operations.
+ */
+ private Hashtable.Entry curPrevEntry;
+
+ /**
+ * The entry that was last returned by next
+ */
+ private Hashtable.Entry curEntry;
+
+ /**
+ * The entry prior to the next entry
+ */
+ private Hashtable.Entry nextPrevEntry;
+
+ /**
+ * The next entry to be returned by next
+ */
+ private Hashtable.Entry nextEntry;
+
+ MutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ this.buckets = buckets;
+ this.keyHash = keyHash;
+
+ int bucketIndex = Support.bucketIndex(buckets, keyHash);
+ Hashtable.Entry headEntry = this.buckets[bucketIndex];
+ if ( headEntry == null ) {
+ this.nextEntry = null;
+ this.nextPrevEntry = null;
+
+ this.curEntry = null;
+ this.curPrevEntry = null;
+ } else {
+ Hashtable.Entry prev, cur;
+ for ( prev = null, cur = headEntry; cur != null; prev = cur, cur = cur.next() ) {
+ if ( cur.keyHash == keyHash ) break;
+ }
+ this.nextPrevEntry = prev;
+ this.nextEntry = cur;
+
+ this.curEntry = null;
+ this.curPrevEntry = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (this.nextEntry != null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public TEntry next() {
+ Hashtable.Entry curEntry = this.nextEntry;
+ if ( curEntry == null ) throw new NoSuchElementException("no next!");
+
+ this.curEntry = curEntry;
+ this.curPrevEntry = this.nextPrevEntry;
+
+ Hashtable.Entry prev, cur;
+ for ( prev = this.nextEntry, cur = this.nextEntry.next(); cur != null; prev = cur, cur = prev.next() ) {
+ if ( cur.keyHash == keyHash ) break;
+ }
+ this.nextPrevEntry = prev;
+ this.nextEntry = cur;
+
+ return (TEntry) curEntry;
+ }
+
+ @Override
+ public void remove() {
+ Hashtable.Entry oldCurEntry = this.curEntry;
+ if ( oldCurEntry == null ) throw new IllegalStateException();
+
+ this.setPrevNext(oldCurEntry.next());
+
+ // If the next match was directly after oldCurEntry, its predecessor is now
+ // curPrevEntry (oldCurEntry was just unlinked from the chain).
+ if ( this.nextPrevEntry == oldCurEntry ) {
+ this.nextPrevEntry = this.curPrevEntry;
+ }
+ this.curEntry = null;
+ }
+
+ public void replace(TEntry replacementEntry) {
+ Hashtable.Entry oldCurEntry = this.curEntry;
+ if ( oldCurEntry == null ) throw new IllegalStateException();
+
+ replacementEntry.setNext(oldCurEntry.next());
+ this.setPrevNext(replacementEntry);
+
+ // If the next match was directly after oldCurEntry, its predecessor is now
+ // the replacement entry (which took oldCurEntry's chain slot).
+ if ( this.nextPrevEntry == oldCurEntry ) {
+ this.nextPrevEntry = replacementEntry;
+ }
+ this.curEntry = replacementEntry;
+ }
+
+ void setPrevNext(Hashtable.Entry nextEntry) {
+ if ( this.curPrevEntry == null ) {
+ Hashtable.Entry[] buckets = this.buckets;
+ buckets[Support.bucketIndex(buckets, this.keyHash)] = nextEntry;
+ } else {
+ this.curPrevEntry.setNext(nextEntry);
+ }
+ }
+ }
+}
diff --git a/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java
new file mode 100644
index 00000000000..bc53bc4ecb6
--- /dev/null
+++ b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java
@@ -0,0 +1,158 @@
+package datadog.trace.util;
+
+/**
+ * This class is intended to be a drop-in replacement for the hashing portions of java.util.Objects,
+ * but producing 64-bit hashes. It provides more convenience methods for hashing primitives and
+ * includes overloads of hash for many argument counts, to avoid var-args allocation.
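+ *
+ * <p>Illustrative usage (the variables are hypothetical):
+ *
+ * <pre>{@code
+ * long h = LongHashingUtils.hash(service, operation); // fixed-arity overload, no var-args array
+ * h = LongHashingUtils.addToHash(h, statusCode);      // fold in an int without boxing
+ * }</pre>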
+ */
+public final class LongHashingUtils {
+ private LongHashingUtils() {}
+
+ public static final long hashCodeX(Object obj) {
+ return obj == null ? Long.MIN_VALUE : obj.hashCode();
+ }
+
+ public static final long hash(boolean value) {
+ return Boolean.hashCode(value);
+ }
+
+ public static final long hash(char value) {
+ return Character.hashCode(value);
+ }
+
+ public static final long hash(byte value) {
+ return Byte.hashCode(value);
+ }
+
+ public static final long hash(short value) {
+ return Short.hashCode(value);
+ }
+
+ public static final long hash(int value) {
+ return Integer.hashCode(value);
+ }
+
+ public static final long hash(long value) {
+ return value;
+ }
+
+ public static final long hash(float value) {
+ return Float.hashCode(value);
+ }
+
+ public static final long hash(double value) {
+ return Double.doubleToRawLongBits(value);
+ }
+
+ public static final long hash(Object obj0, Object obj1) {
+ return hash(intHash(obj0), intHash(obj1));
+ }
+
+ public static final long hash(int hash0, int hash1) {
+ return 31L * hash0 + hash1;
+ }
+
+ private static final int intHash(Object obj) {
+ return obj == null ? 0 : obj.hashCode();
+ }
+
+ public static final long hash(Object obj0, Object obj1, Object obj2) {
+ return hash(intHash(obj0), intHash(obj1), intHash(obj2));
+ }
+
+ public static final long hash(long hash0, long hash1, long hash2) {
+ // DQH - Micro-optimizing, 31L * 31L will constant fold
+ // Since there are multiple execution ports for load & store,
+ // this will make good use of the core.
+ return 31L * 31L * hash0 + 31L * hash1 + hash2;
+ }
+
+ public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3) {
+ return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3));
+ }
+
+ public static final long hash(int hash0, int hash1, int hash2, int hash3) {
+ // DQH - Micro-optimizing, 31L * 31L will constant fold
+ // Since there are multiple execution ports for load & store,
+ // this will make good use of the core.
+ return 31L * 31L * 31L * hash0 + 31L * 31L * hash1 + 31L * hash2 + hash3;
+ }
+
+ public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3, Object obj4) {
+ return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3), intHash(obj4));
+ }
+
+ public static final long hash(int hash0, int hash1, int hash2, int hash3, int hash4) {
+ // DQH - Micro-optimizing, 31L * 31L will constant fold
+ // Since there are multiple execution ports for load & store,
+ // this will make good use of the core.
+ return 31L * 31L * 31L * 31L * hash0 + 31L * 31L * 31L * hash1 + 31L * 31L * hash2 + 31L * hash3 + hash4;
+ }
+
+ @Deprecated
+ public static final long hash(int[] hashes) {
+ long result = 0;
+ for (int hash : hashes) {
+ result = addToHash(result, hash);
+ }
+ return result;
+ }
+
+ public static final long addToHash(long hash, int value) {
+ return 31L * hash + value;
+ }
+
+ public static final long addToHash(long hash, Object obj) {
+ return addToHash(hash, intHash(obj));
+ }
+
+ public static final long addToHash(long hash, boolean value) {
+ return addToHash(hash, Boolean.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, char value) {
+ return addToHash(hash, Character.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, byte value) {
+ return addToHash(hash, Byte.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, short value) {
+ return addToHash(hash, Short.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, long value) {
+ return addToHash(hash, Long.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, float value) {
+ return addToHash(hash, Float.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, double value) {
+ return addToHash(hash, Double.hashCode(value));
+ }
+
+ public static final long hash(Iterable<?> objs) {
+ long result = 0;
+ for (Object obj : objs) {
+ result = addToHash(result, obj);
+ }
+ return result;
+ }
+
+ /**
+ * Calling this var-arg version can result in large amounts of allocation (see HashingBenchmark).
+ * Rather than calling this method, add another overload of hash that handles a larger number of
+ * arguments, or use calls to addToHash.
+ */
+ @Deprecated
+ public static final long hash(Object... objs) {
+ long result = 0;
+ for (Object obj : objs) {
+ result = addToHash(result, obj);
+ }
+ return result;
+ }
+}