diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
similarity index 95%
rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
index b9a2f7f8c54..b9d72eaf3ab 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java
@@ -34,12 +34,12 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
-public class ConflatingMetricsAggregatorBenchmark {
+public class ClientStatsAggregatorBenchmark {
private final DDAgentFeaturesDiscovery featuresDiscovery =
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
- private final ConflatingMetricsAggregator aggregator =
- new ConflatingMetricsAggregator(
+ private final ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
similarity index 85%
rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java
rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
index 02c6aaffc1a..06052c57ded 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java
@@ -28,8 +28,8 @@
import org.openjdk.jmh.infra.Blackhole;
/**
- * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances
- * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
+ * Parallels {@link ClientStatsAggregatorBenchmark} but uses real {@link DDSpan} instances instead
+ * of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
* CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's
* dispatch.
*/
@@ -39,21 +39,21 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
-public class ConflatingMetricsAggregatorDDSpanBenchmark {
+public class ClientStatsAggregatorDDSpanBenchmark {
private static final CoreTracer TRACER =
CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();
private final DDAgentFeaturesDiscovery featuresDiscovery =
- new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
- private final ConflatingMetricsAggregator aggregator =
- new ConflatingMetricsAggregator(
+ private final ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
- new ConflatingMetricsAggregatorBenchmark.NullSink(),
+ new ClientStatsAggregatorBenchmark.NullSink(),
2048,
2048,
false);
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
index e2fda9fde47..225f03197e5 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
@@ -1,122 +1,106 @@
package datadog.trace.common.metrics;
-import static datadog.trace.api.Functions.UTF8_ENCODE;
-import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;
-
-import datadog.trace.api.Pair;
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.api.cache.DDCaches;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.util.Hashtable;
import datadog.trace.util.LongHashingUtils;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.function.Function;
+import java.util.Objects;
/**
* Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data
* {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}.
*
- * <p>{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's
- * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code
- * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry
- * instead of splitting.
+ * <p>UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link
+ * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval. The
+ * critical property: hashing and matching happen after canonicalization, so when a field's
+ * cardinality budget is exhausted and overflow values collapse to a {@code blocked_by_tracer}
+ * sentinel, those values land in the same bucket and merge into a single entry rather than
+ * fragmenting.
+ *
+ * <p>The aggregator thread is the sole writer. {@link AggregateTable} holds a reusable {@link
+ * Canonical} scratch buffer so the canonicalization itself doesn't allocate per lookup; on a miss
+ * the buffer's references are copied into a fresh entry. On a hit nothing is allocated.
+ *
+ * <p>The handlers are reset on the aggregator thread every reporting cycle via {@link
+ * #resetCardinalityHandlers()}.
*
- * <p>The static UTF8 caches that used to live on {@code MetricKey} and {@code
- * ConflatingMetricsAggregator} are consolidated here.
+ * <p>Thread-safety: the cardinality handlers and {@link Canonical} are not thread-safe. Only
+ * the aggregator thread may call {@link Canonical#populate} or {@link #resetCardinalityHandlers}.
+ * Test code uses {@link #of} which constructs entries without touching the handlers.
*/
final class AggregateEntry extends Hashtable.Entry {
- // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split.
- private static final DDCache<String, UTF8BytesString> RESOURCE_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache<String, UTF8BytesString> SERVICE_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache<String, UTF8BytesString> OPERATION_CACHE =
- DDCaches.newFixedSizeCache(64);
- private static final DDCache<String, UTF8BytesString> SERVICE_SOURCE_CACHE =
- DDCaches.newFixedSizeCache(16);
- private static final DDCache<String, UTF8BytesString> TYPE_CACHE = DDCaches.newFixedSizeCache(8);
- private static final DDCache<String, UTF8BytesString> SPAN_KIND_CACHE =
- DDCaches.newFixedSizeCache(16);
- private static final DDCache<String, UTF8BytesString> HTTP_METHOD_CACHE =
- DDCaches.newFixedSizeCache(8);
- private static final DDCache<String, UTF8BytesString> HTTP_ENDPOINT_CACHE =
- DDCaches.newFixedSizeCache(32);
- private static final DDCache<String, UTF8BytesString> GRPC_STATUS_CODE_CACHE =
- DDCaches.newFixedSizeCache(32);
-
- /**
- * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner
- * cache produces the "name:value" encoded form the serializer writes.
- */
- private static final DDCache<
- String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
- PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
-
- private static final Function<
- String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
- PEER_TAGS_CACHE_ADDER =
- key ->
- Pair.of(
- DDCaches.newFixedSizeCache(512),
- value -> UTF8BytesString.create(key + ":" + value));
-
- private final UTF8BytesString resource;
- private final UTF8BytesString service;
- private final UTF8BytesString operationName;
- private final UTF8BytesString serviceSource; // nullable
- private final UTF8BytesString type;
- private final UTF8BytesString spanKind;
- private final UTF8BytesString httpMethod; // nullable
- private final UTF8BytesString httpEndpoint; // nullable
- private final UTF8BytesString grpcStatusCode; // nullable
- private final short httpStatusCode;
- private final boolean synthetic;
- private final boolean traceRoot;
-
- // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs,
- // and pre-encoded List<UTF8BytesString> ("name:value") for the serializer.
- private final String[] peerTagPairsRaw;
- private final List<UTF8BytesString> peerTags;
-
+ // Per-field cardinality limits. Identical to the prior DDCache sizes.
+ static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32);
+ static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32);
+ static final PropertyCardinalityHandler OPERATION_HANDLER = new PropertyCardinalityHandler(64);
+ static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER =
+ new PropertyCardinalityHandler(16);
+ static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8);
+ static final PropertyCardinalityHandler SPAN_KIND_HANDLER = new PropertyCardinalityHandler(16);
+ static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = new PropertyCardinalityHandler(8);
+ static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER =
+ new PropertyCardinalityHandler(32);
+ static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER =
+ new PropertyCardinalityHandler(32);
+
+ final UTF8BytesString resource;
+ final UTF8BytesString service;
+ final UTF8BytesString operationName;
+ final UTF8BytesString serviceSource; // nullable
+ final UTF8BytesString type;
+ final UTF8BytesString spanKind;
+ final UTF8BytesString httpMethod; // nullable
+ final UTF8BytesString httpEndpoint; // nullable
+ final UTF8BytesString grpcStatusCode; // nullable
+ final short httpStatusCode;
+ final boolean synthetic;
+ final boolean traceRoot;
+ final List<UTF8BytesString> peerTags;
final AggregateMetric aggregate;
- /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */
- private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) {
+ /** Field-bearing constructor used by both the hot path and the test factory. */
+ private AggregateEntry(
+ long keyHash,
+ UTF8BytesString resource,
+ UTF8BytesString service,
+ UTF8BytesString operationName,
+ UTF8BytesString serviceSource,
+ UTF8BytesString type,
+ UTF8BytesString spanKind,
+ UTF8BytesString httpMethod,
+ UTF8BytesString httpEndpoint,
+ UTF8BytesString grpcStatusCode,
+ short httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ List<UTF8BytesString> peerTags,
+ AggregateMetric aggregate) {
super(keyHash);
- this.resource = canonicalize(RESOURCE_CACHE, s.resourceName);
- this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE);
- this.operationName = canonicalize(OPERATION_CACHE, s.operationName);
- this.serviceSource =
- s.serviceNameSource == null
- ? null
- : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource);
- this.type = canonicalize(TYPE_CACHE, s.spanType);
- this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create);
- this.httpMethod =
- s.httpMethod == null
- ? null
- : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create);
- this.httpEndpoint =
- s.httpEndpoint == null
- ? null
- : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create);
- this.grpcStatusCode =
- s.grpcStatusCode == null
- ? null
- : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create);
- this.httpStatusCode = s.httpStatusCode;
- this.synthetic = s.synthetic;
- this.traceRoot = s.traceRoot;
- this.peerTagPairsRaw = s.peerTagPairs;
- this.peerTags = materializePeerTags(s.peerTagPairs);
+ this.resource = resource;
+ this.service = service;
+ this.operationName = operationName;
+ this.serviceSource = serviceSource;
+ this.type = type;
+ this.spanKind = spanKind;
+ this.httpMethod = httpMethod;
+ this.httpEndpoint = httpEndpoint;
+ this.grpcStatusCode = grpcStatusCode;
+ this.httpStatusCode = httpStatusCode;
+ this.synthetic = synthetic;
+ this.traceRoot = traceRoot;
+ this.peerTags = peerTags;
this.aggregate = aggregate;
}
- /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */
+ /**
+ * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Bypasses
+ * the cardinality handlers so tests don't pollute their state -- {@link UTF8BytesString}s are
+ * created directly. Content-equal entries from {@link Canonical#toEntry} still {@link #equals} an
+ * entry built via {@code of(...)}.
+ */
static AggregateEntry of(
CharSequence resource,
CharSequence service,
@@ -131,76 +115,106 @@ static AggregateEntry of(
CharSequence httpMethod,
CharSequence httpEndpoint,
CharSequence grpcStatusCode) {
- String[] rawPairs = peerTagsToRawPairs(peerTags);
- SpanSnapshot synthetic_snapshot =
- new SpanSnapshot(
- resource,
- service == null ? null : service.toString(),
- operationName,
- serviceSource,
- type,
+ UTF8BytesString resourceUtf = createUtf8(resource);
+ UTF8BytesString serviceUtf = createUtf8(service);
+ UTF8BytesString operationNameUtf = createUtf8(operationName);
+ UTF8BytesString serviceSourceUtf = serviceSource == null ? null : createUtf8(serviceSource);
+ UTF8BytesString typeUtf = createUtf8(type);
+ UTF8BytesString spanKindUtf = createUtf8(spanKind);
+ UTF8BytesString httpMethodUtf = httpMethod == null ? null : createUtf8(httpMethod);
+ UTF8BytesString httpEndpointUtf = httpEndpoint == null ? null : createUtf8(httpEndpoint);
+ UTF8BytesString grpcUtf = grpcStatusCode == null ? null : createUtf8(grpcStatusCode);
+ List<UTF8BytesString> peerTagsList = peerTags == null ? Collections.emptyList() : peerTags;
+ long keyHash =
+ hashOf(
+ resourceUtf,
+ serviceUtf,
+ operationNameUtf,
+ serviceSourceUtf,
+ typeUtf,
+ spanKindUtf,
+ httpMethodUtf,
+ httpEndpointUtf,
+ grpcUtf,
(short) httpStatusCode,
synthetic,
traceRoot,
- spanKind == null ? null : spanKind.toString(),
- rawPairs,
- httpMethod == null ? null : httpMethod.toString(),
- httpEndpoint == null ? null : httpEndpoint.toString(),
- grpcStatusCode == null ? null : grpcStatusCode.toString(),
- 0L);
+ peerTagsList);
return new AggregateEntry(
- synthetic_snapshot, hashOf(synthetic_snapshot), new AggregateMetric());
- }
-
- /** Construct from a snapshot at consumer-thread miss time. */
- static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) {
- return new AggregateEntry(s, hashOf(s), aggregate);
+ keyHash,
+ resourceUtf,
+ serviceUtf,
+ operationNameUtf,
+ serviceSourceUtf,
+ typeUtf,
+ spanKindUtf,
+ httpMethodUtf,
+ httpEndpointUtf,
+ grpcUtf,
+ (short) httpStatusCode,
+ synthetic,
+ traceRoot,
+ peerTagsList,
+ new AggregateMetric());
}
- boolean matches(SpanSnapshot s) {
- return httpStatusCode == s.httpStatusCode
- && synthetic == s.synthetic
- && traceRoot == s.traceRoot
- && contentEquals(resource, s.resourceName)
- && stringContentEquals(service, s.serviceName)
- && contentEquals(operationName, s.operationName)
- && contentEquals(serviceSource, s.serviceNameSource)
- && contentEquals(type, s.spanType)
- && stringContentEquals(spanKind, s.spanKind)
- && Arrays.equals(peerTagPairsRaw, s.peerTagPairs)
- && stringContentEquals(httpMethod, s.httpMethod)
- && stringContentEquals(httpEndpoint, s.httpEndpoint)
- && stringContentEquals(grpcStatusCode, s.grpcStatusCode);
+ /**
+ * Resets every cardinality handler's working set. Must be called on the aggregator thread.
+ * Existing entries continue to hold their previously-issued {@link UTF8BytesString} references;
+ * matches via content-equality so snapshots delivered after a reset still resolve to the existing
+ * entries.
+ */
+ static void resetCardinalityHandlers() {
+ RESOURCE_HANDLER.reset();
+ SERVICE_HANDLER.reset();
+ OPERATION_HANDLER.reset();
+ SERVICE_SOURCE_HANDLER.reset();
+ TYPE_HANDLER.reset();
+ SPAN_KIND_HANDLER.reset();
+ HTTP_METHOD_HANDLER.reset();
+ HTTP_ENDPOINT_HANDLER.reset();
+ GRPC_STATUS_CODE_HANDLER.reset();
+ PeerTagSchema.resetAll();
}
/**
- * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no
- * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's
- * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes
- * to the same bucket the snapshot itself looks up.
- *
- * Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link
- * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash.
+ * 64-bit lookup hash, computed over UTF8-encoded fields so that cardinality-blocked values (which
+ * all canonicalize to the same sentinel {@link UTF8BytesString}) collide in the same bucket.
+ * {@link UTF8BytesString#hashCode()} returns the underlying String hash, so entries built via
+ * {@link #of} produce the same hash as entries built from a snapshot with matching content.
*/
- static long hashOf(SpanSnapshot s) {
+ static long hashOf(
+ UTF8BytesString resource,
+ UTF8BytesString service,
+ UTF8BytesString operationName,
+ UTF8BytesString serviceSource,
+ UTF8BytesString type,
+ UTF8BytesString spanKind,
+ UTF8BytesString httpMethod,
+ UTF8BytesString httpEndpoint,
+ UTF8BytesString grpcStatusCode,
+ short httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ List<UTF8BytesString> peerTags) {
long h = 0;
- h = LongHashingUtils.addToHash(h, s.resourceName);
- h = LongHashingUtils.addToHash(h, s.serviceName);
- h = LongHashingUtils.addToHash(h, s.operationName);
- h = LongHashingUtils.addToHash(h, s.serviceNameSource);
- h = LongHashingUtils.addToHash(h, s.spanType);
- h = LongHashingUtils.addToHash(h, s.httpStatusCode);
- h = LongHashingUtils.addToHash(h, s.synthetic);
- h = LongHashingUtils.addToHash(h, s.traceRoot);
- h = LongHashingUtils.addToHash(h, s.spanKind);
- if (s.peerTagPairs != null) {
- for (String p : s.peerTagPairs) {
- h = LongHashingUtils.addToHash(h, p);
- }
+ h = LongHashingUtils.addToHash(h, resource);
+ h = LongHashingUtils.addToHash(h, service);
+ h = LongHashingUtils.addToHash(h, operationName);
+ h = LongHashingUtils.addToHash(h, serviceSource);
+ h = LongHashingUtils.addToHash(h, type);
+ h = LongHashingUtils.addToHash(h, httpStatusCode);
+ h = LongHashingUtils.addToHash(h, synthetic);
+ h = LongHashingUtils.addToHash(h, traceRoot);
+ h = LongHashingUtils.addToHash(h, spanKind);
+ // indexed iteration -- avoids the iterator allocation a for-each over a List would do
+ int peerTagCount = peerTags.size();
+ for (int i = 0; i < peerTagCount; i++) {
+ h = LongHashingUtils.addToHash(h, peerTags.get(i));
}
- h = LongHashingUtils.addToHash(h, s.httpMethod);
- h = LongHashingUtils.addToHash(h, s.httpEndpoint);
- h = LongHashingUtils.addToHash(h, s.grpcStatusCode);
+ h = LongHashingUtils.addToHash(h, httpMethod);
+ h = LongHashingUtils.addToHash(h, httpEndpoint);
+ h = LongHashingUtils.addToHash(h, grpcStatusCode);
return h;
}
@@ -259,8 +273,8 @@ List<UTF8BytesString> getPeerTags() {
/**
* Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the
- * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)}
- * and never calls {@code equals}.
+ * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link Canonical#matches} and
+ * never calls {@code equals}.
*/
@Override
public boolean equals(Object o) {
@@ -270,16 +284,16 @@ public boolean equals(Object o) {
return httpStatusCode == that.httpStatusCode
&& synthetic == that.synthetic
&& traceRoot == that.traceRoot
- && java.util.Objects.equals(resource, that.resource)
- && java.util.Objects.equals(service, that.service)
- && java.util.Objects.equals(operationName, that.operationName)
- && java.util.Objects.equals(serviceSource, that.serviceSource)
- && java.util.Objects.equals(type, that.type)
- && java.util.Objects.equals(spanKind, that.spanKind)
+ && Objects.equals(resource, that.resource)
+ && Objects.equals(service, that.service)
+ && Objects.equals(operationName, that.operationName)
+ && Objects.equals(serviceSource, that.serviceSource)
+ && Objects.equals(type, that.type)
+ && Objects.equals(spanKind, that.spanKind)
&& peerTags.equals(that.peerTags)
- && java.util.Objects.equals(httpMethod, that.httpMethod)
- && java.util.Objects.equals(httpEndpoint, that.httpEndpoint)
- && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode);
+ && Objects.equals(httpMethod, that.httpMethod)
+ && Objects.equals(httpEndpoint, that.httpEndpoint)
+ && Objects.equals(grpcStatusCode, that.grpcStatusCode);
}
@Override
@@ -287,82 +301,175 @@ public int hashCode() {
return (int) keyHash;
}
- // ----- helpers -----
-
- private static UTF8BytesString canonicalize(
- DDCache<String, UTF8BytesString> cache, CharSequence charSeq) {
- if (charSeq == null) {
- return EMPTY;
- }
- if (charSeq instanceof UTF8BytesString) {
- return (UTF8BytesString) charSeq;
+ /**
+ * Reusable scratch buffer for canonicalizing a {@link SpanSnapshot} into UTF8 fields, computing
+ * its lookup hash, comparing against existing entries, and building a fresh entry on miss.
+ *
+ * One instance is held by an {@link AggregateTable} and reused on every {@code findOrInsert}
+ * call. Single-threaded use only. Fields are deliberately mutable -- this is a hot-path scratch
+ * area, not a value class.
+ */
+ static final class Canonical {
+ UTF8BytesString resource;
+ UTF8BytesString service;
+ UTF8BytesString operationName;
+ UTF8BytesString serviceSource; // nullable
+ UTF8BytesString type;
+ UTF8BytesString spanKind;
+ UTF8BytesString httpMethod; // nullable
+ UTF8BytesString httpEndpoint; // nullable
+ UTF8BytesString grpcStatusCode; // nullable
+ short httpStatusCode;
+ boolean synthetic;
+ boolean traceRoot;
+
+ /**
+ * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link
+ * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own.
+ * Zero allocation on the hit path.
+ */
+ final ArrayList<UTF8BytesString> peerTagsBuffer = new ArrayList<>(4);
+
+ long keyHash;
+
+ /** Canonicalize all fields from {@code s} through the handlers into this buffer. */
+ void populate(SpanSnapshot s) {
+ this.resource = registerOrEmpty(RESOURCE_HANDLER, s.resourceName);
+ this.service = registerOrEmpty(SERVICE_HANDLER, s.serviceName);
+ this.operationName = registerOrEmpty(OPERATION_HANDLER, s.operationName);
+ this.serviceSource =
+ s.serviceNameSource == null ? null : SERVICE_SOURCE_HANDLER.register(s.serviceNameSource);
+ this.type = registerOrEmpty(TYPE_HANDLER, s.spanType);
+ this.spanKind = registerOrEmpty(SPAN_KIND_HANDLER, s.spanKind);
+ this.httpMethod = s.httpMethod == null ? null : HTTP_METHOD_HANDLER.register(s.httpMethod);
+ this.httpEndpoint =
+ s.httpEndpoint == null ? null : HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint);
+ this.grpcStatusCode =
+ s.grpcStatusCode == null ? null : GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode);
+ this.httpStatusCode = s.httpStatusCode;
+ this.synthetic = s.synthetic;
+ this.traceRoot = s.traceRoot;
+ populatePeerTags(s.peerTagSchema, s.peerTagValues);
+ this.keyHash =
+ hashOf(
+ resource,
+ service,
+ operationName,
+ serviceSource,
+ type,
+ spanKind,
+ httpMethod,
+ httpEndpoint,
+ grpcStatusCode,
+ httpStatusCode,
+ synthetic,
+ traceRoot,
+ peerTagsBuffer);
}
- return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create);
- }
- /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */
- private static boolean contentEquals(UTF8BytesString a, CharSequence b) {
- if (a == null) {
- return b == null;
- }
- if (b == null) {
- return false;
- }
- // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation.
- String aStr = a.toString();
- if (b instanceof String) {
- return aStr.equals(b);
- }
- if (b instanceof UTF8BytesString) {
- return aStr.equals(b.toString());
+ /**
+ * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying {@code schema.handler(i)}
+ * to each non-null value at the same index. No allocation when the schema/values are absent or
+ * all values are null (buffer is just cleared).
+ */
+ private void populatePeerTags(PeerTagSchema schema, String[] values) {
+ peerTagsBuffer.clear();
+ if (schema == null || values == null) {
+ return;
+ }
+ int n = schema.size();
+ for (int i = 0; i < n; i++) {
+ String v = values[i];
+ if (v != null) {
+ peerTagsBuffer.add(schema.handler(i).register(v));
+ }
+ }
}
- return aStr.contentEquals(b);
- }
- private static boolean stringContentEquals(UTF8BytesString a, String b) {
- if (a == null) {
- return b == null;
+ /**
+ * Whether this canonicalized snapshot matches the given entry. Compares UTF8 fields via
+ * content-equality (so an entry surviving a handler reset still matches a freshly-canonicalized
+ * snapshot of the same content).
+ */
+ boolean matches(AggregateEntry e) {
+ return httpStatusCode == e.httpStatusCode
+ && synthetic == e.synthetic
+ && traceRoot == e.traceRoot
+ && Objects.equals(resource, e.resource)
+ && Objects.equals(service, e.service)
+ && Objects.equals(operationName, e.operationName)
+ && Objects.equals(serviceSource, e.serviceSource)
+ && Objects.equals(type, e.type)
+ && Objects.equals(spanKind, e.spanKind)
+ && peerTagsEqual(peerTagsBuffer, e.peerTags)
+ && Objects.equals(httpMethod, e.httpMethod)
+ && Objects.equals(httpEndpoint, e.httpEndpoint)
+ && Objects.equals(grpcStatusCode, e.grpcStatusCode);
}
- return b != null && a.toString().equals(b);
- }
- private static List<UTF8BytesString> materializePeerTags(String[] pairs) {
- if (pairs == null || pairs.length == 0) {
- return Collections.emptyList();
- }
- if (pairs.length == 2) {
- return Collections.singletonList(encodePeerTag(pairs[0], pairs[1]));
+ /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */
+ private static boolean peerTagsEqual(List<UTF8BytesString> a, List<UTF8BytesString> b) {
+ int n = a.size();
+ if (n != b.size()) {
+ return false;
+ }
+ for (int i = 0; i < n; i++) {
+ if (!a.get(i).equals(b.get(i))) {
+ return false;
+ }
+ }
+ return true;
}
- List<UTF8BytesString> tags = new ArrayList<>(pairs.length / 2);
- for (int i = 0; i < pairs.length; i += 2) {
- tags.add(encodePeerTag(pairs[i], pairs[i + 1]));
+
+ /**
+ * Build a new entry from the currently-populated canonical fields. The peer-tag buffer is
+ * copied into an immutable list so the entry's reference stays stable across subsequent {@link
+ * #populate} calls.
+ */
+ AggregateEntry toEntry(AggregateMetric aggregate) {
+ List<UTF8BytesString> snapshottedPeerTags;
+ int n = peerTagsBuffer.size();
+ if (n == 0) {
+ snapshottedPeerTags = Collections.emptyList();
+ } else if (n == 1) {
+ snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0));
+ } else {
+ snapshottedPeerTags = new ArrayList<>(peerTagsBuffer);
+ }
+ return new AggregateEntry(
+ keyHash,
+ resource,
+ service,
+ operationName,
+ serviceSource,
+ type,
+ spanKind,
+ httpMethod,
+ httpEndpoint,
+ grpcStatusCode,
+ httpStatusCode,
+ synthetic,
+ traceRoot,
+ snapshottedPeerTags,
+ aggregate);
}
- return tags;
}
- private static UTF8BytesString encodePeerTag(String name, String value) {
- final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
- cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
- return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
+ // ----- helpers -----
+
+ private static UTF8BytesString registerOrEmpty(
+ PropertyCardinalityHandler handler, CharSequence value) {
+ return value == null ? UTF8BytesString.EMPTY : handler.register(value);
}
- /**
- * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw
- * {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of}, not by
- * the hot path.
- */
- private static String[] peerTagsToRawPairs(List<UTF8BytesString> peerTags) {
- if (peerTags == null || peerTags.isEmpty()) {
- return null;
+ /** Direct {@link UTF8BytesString} creation that bypasses the cardinality handlers. */
+ private static UTF8BytesString createUtf8(CharSequence cs) {
+ if (cs == null) {
+ return UTF8BytesString.EMPTY;
}
- String[] pairs = new String[peerTags.size() * 2];
- int i = 0;
- for (UTF8BytesString peerTag : peerTags) {
- String s = peerTag.toString();
- int colon = s.indexOf(':');
- pairs[i++] = colon < 0 ? s : s.substring(0, colon);
- pairs[i++] = colon < 0 ? "" : s.substring(colon + 1);
+ if (cs instanceof UTF8BytesString) {
+ return (UTF8BytesString) cs;
}
- return pairs;
+ return UTF8BytesString.create(cs.toString());
}
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
index 08300eab296..38d45ef5e85 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
@@ -4,13 +4,14 @@
import java.util.function.Consumer;
/**
- * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}.
+ * Consumer-side {@link AggregateMetric} store, keyed on the canonical UTF8-encoded labels of a
+ * {@link SpanSnapshot}.
*
- * Replaces the prior {@code LRUCache}. The win is on the
- * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise
- * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache
- * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link
- * AggregateEntry} itself and are built once per unique key at insert time.
+ * {@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so
+ * cardinality-blocked values share a sentinel and collapse into one entry) and then computes the
+ * lookup hash from that canonical form. Canonicalization runs into a reusable {@link
+ * AggregateEntry.Canonical} scratch buffer; on a hit nothing is allocated, on a miss the buffer's
+ * references are copied into a fresh entry and the buffer is overwritten on the next call.
*
* <p>Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be
* routed through the inbox rather than called from arbitrary threads.
@@ -19,6 +20,7 @@ final class AggregateTable {
private final Hashtable.Entry[] buckets;
private final int maxAggregates;
+ private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical();
private int size;
AggregateTable(int maxAggregates) {
@@ -40,12 +42,13 @@ boolean isEmpty() {
* the caller should drop the data point in that case.
*/
AggregateMetric findOrInsert(SpanSnapshot snapshot) {
- long keyHash = AggregateEntry.hashOf(snapshot);
+ canonical.populate(snapshot);
+ long keyHash = canonical.keyHash;
int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
if (e.keyHash == keyHash) {
AggregateEntry candidate = (AggregateEntry) e;
- if (candidate.matches(snapshot)) {
+ if (canonical.matches(candidate)) {
return candidate.aggregate;
}
}
@@ -53,7 +56,7 @@ AggregateMetric findOrInsert(SpanSnapshot snapshot) {
if (size >= maxAggregates && !evictOneStale()) {
return null;
}
- AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric());
+ AggregateEntry entry = canonical.toEntry(new AggregateMetric());
entry.setNext(buckets[bucketIndex]);
buckets[bucketIndex] = entry;
size++;
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
index b4fc59d5a1d..8fe25288acd 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
@@ -66,10 +66,6 @@ final class Aggregator implements Runnable {
this.healthMetrics = healthMetrics;
}
- public void clearAggregates() {
- this.aggregates.clear();
- }
-
@Override
public void run() {
Thread currentThread = Thread.currentThread();
@@ -149,6 +145,9 @@ private void report(long when, SignalItem signal) {
}
dirty = false;
}
+ // Reset cardinality handlers each report cycle so the per-field budgets refresh.
+ // Safe to call on this (aggregator) thread; handlers are HashMap-based and not thread-safe.
+ AggregateEntry.resetCardinalityHandlers();
signal.complete();
if (skipped) {
log.debug("skipped metrics reporting because no points have changed");
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
similarity index 77%
rename from dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
rename to dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
index c675fcb23c4..d08ce611100 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java
@@ -2,10 +2,8 @@
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT;
import static datadog.trace.api.DDSpanTypes.RPC;
-import static datadog.trace.api.DDTags.BASE_SERVICE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
-import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR;
@@ -40,14 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {
+public final class ClientStatsAggregator implements MetricsAggregator, EventListener {
- private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class);
+ private static final Logger log = LoggerFactory.getLogger(ClientStatsAggregator.class);
private static final Map<String, String> DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);
- private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
+ private static final String SYNTHETICS_ORIGIN = "synthetics";
private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
@@ -76,7 +74,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private volatile AgentTaskScheduler.Scheduled<?> cancellation;
- public ConflatingMetricsAggregator(
+ public ClientStatsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
@@ -97,7 +95,7 @@ public ConflatingMetricsAggregator(
config.isTraceResourceRenamingEnabled());
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
@@ -119,7 +117,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
@@ -143,7 +141,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}
- ConflatingMetricsAggregator(
+ ClientStatsAggregator(
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
@@ -244,6 +242,14 @@ public boolean publish(List extends CoreSpan>> trace) {
boolean forceKeep = false;
int counted = 0;
if (features.supportsMetrics()) {
+ // Sync the peer-aggregation schema once per trace; peer-tag configuration is stable for
+ // the duration of a single trace publish in production (DDAgentFeaturesDiscovery returns
+ // the same Set instance until remote-config reconfiguration).
+      Set<String> eligiblePeerTags = features.peerTags();
+ PeerTagSchema peerAggSchema =
+ (eligiblePeerTags == null || eligiblePeerTags.isEmpty())
+ ? null
+ : PeerTagSchema.currentSyncedTo(eligiblePeerTags);
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span, isTopLevel)) {
@@ -254,7 +260,7 @@ public boolean publish(List extends CoreSpan>> trace) {
break;
}
counted++;
- forceKeep |= publish(span, isTopLevel);
+ forceKeep |= publish(span, isTopLevel, peerAggSchema);
}
}
healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
@@ -269,7 +275,7 @@ private boolean shouldComputeMetric(CoreSpan> span, boolean isTopLevel) {
&& span.getDurationNano() > 0;
}
- private boolean publish(CoreSpan> span, boolean isTopLevel) {
+ private boolean publish(CoreSpan> span, boolean isTopLevel, PeerTagSchema peerAggSchema) {
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
@@ -286,14 +292,26 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
- // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString
- // tag values don't trigger a ClassCastException on the String assignment.
- final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString();
+ // DDSpan resolves this from a cached span.kind ordinal via a small lookup array, skipping a
+ // tag-map lookup. Other CoreSpan impls fall back to the tag map by default.
+ String spanKind = span.getSpanKindString();
+ if (spanKind == null) {
+ spanKind = "";
+ }
boolean error = span.getError() > 0;
long tagAndDuration =
span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);
+ PeerTagSchema peerTagSchema = peerTagSchemaFor(span, peerAggSchema);
+ String[] peerTagValues =
+ peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema);
+ if (peerTagValues == null) {
+ // capture returned no non-null values -- drop the schema reference so the consumer doesn't
+ // bother iterating an all-null array.
+ peerTagSchema = null;
+ }
+
SpanSnapshot snapshot =
new SpanSnapshot(
span.getResourceName(),
@@ -305,7 +323,8 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
isSynthetic(span),
span.getParentId() == 0,
spanKind,
- extractPeerTagPairs(span),
+ peerTagSchema,
+ peerTagValues,
httpMethod,
httpEndpoint,
grpcStatusCode,
@@ -317,43 +336,45 @@ private boolean publish(CoreSpan> span, boolean isTopLevel) {
return error;
}
- private String[] extractPeerTagPairs(CoreSpan> span) {
- if (span.isKind(PEER_AGGREGATION_KINDS)) {
- final Set eligiblePeerTags = features.peerTags();
- String[] pairs = null;
- int count = 0;
- for (String peerTag : eligiblePeerTags) {
- Object value = span.unsafeGetTag(peerTag);
- if (value != null) {
- if (pairs == null) {
- // pairs are flattened [name, value, ...]; size for worst case
- pairs = new String[eligiblePeerTags.size() * 2];
- }
- pairs[count++] = peerTag;
- pairs[count++] = value.toString();
+  /**
+   * Picks the peer-tag schema for a span. The {@code peerAggSchema} argument is the per-trace
+   * cached schema (synced from {@code features.peerTags()} once in {@link #publish(List)}); it's
+   * {@code null} when no peer tags are configured. Peer-aggregation kinds take precedence when a
+   * schema is configured; for internal-kind spans the static {@link PeerTagSchema#INTERNAL}
+   * schema is used regardless of configuration. Returns {@code null} when neither applies.
+   */
+  private static PeerTagSchema peerTagSchemaFor(CoreSpan<?> span, PeerTagSchema peerAggSchema) {
+    if (peerAggSchema != null && span.isKind(PEER_AGGREGATION_KINDS)) {
+      return peerAggSchema;
+    }
+    if (span.isKind(INTERNAL_KIND)) {
+      return PeerTagSchema.INTERNAL;
+    }
+    return null;
+  }
+
+ /**
+ * Captures the span's peer tag values into a {@code String[]} parallel to {@code schema.names}.
+ * Returns {@code null} when none of the configured peer tags are set on the span.
+ */
+ private static String[] capturePeerTagValues(CoreSpan> span, PeerTagSchema schema) {
+ String[] names = schema.names;
+ int n = names.length;
+ String[] values = null;
+ for (int i = 0; i < n; i++) {
+ Object v = span.unsafeGetTag(names[i]);
+ if (v != null) {
+ if (values == null) {
+ values = new String[n];
}
- }
- if (pairs == null) {
- return null;
- }
- if (count < pairs.length) {
- String[] trimmed = new String[count];
- System.arraycopy(pairs, 0, trimmed, 0, count);
- return trimmed;
- }
- return pairs;
- } else if (span.isKind(INTERNAL_KIND)) {
- // in this case only the base service should be aggregated if present
- final Object baseService = span.unsafeGetTag(BASE_SERVICE);
- if (baseService != null) {
- return new String[] {BASE_SERVICE, baseService.toString()};
+ values[i] = v.toString();
}
}
- return null;
+ return values;
}
private static boolean isSynthetic(CoreSpan> span) {
- return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
+ CharSequence origin = span.getOrigin();
+ return origin != null && SYNTHETICS_ORIGIN.contentEquals(origin);
}
public void stop() {
@@ -399,17 +420,16 @@ private void disable() {
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
// Route the clear through the inbox so the aggregator thread is the only writer.
- // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread
- // would race with Drainer.accept on the aggregator thread.
+ // AggregateTable is not thread-safe; clearing it directly from this thread would race
+ // with Drainer.accept on the aggregator thread.
inbox.offer(CLEAR);
}
}
- private static final class ReportTask
- implements AgentTaskScheduler.Task {
+  private static final class ReportTask implements AgentTaskScheduler.Task<ClientStatsAggregator> {
@Override
- public void run(ConflatingMetricsAggregator target) {
+ public void run(ClientStatsAggregator target) {
target.report();
}
}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
index 09464310113..b9530871763 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java
@@ -15,7 +15,7 @@ public static MetricsAggregator createMetricsAggregator(
HealthMetrics healthMetrics) {
if (config.isTracerMetricsEnabled()) {
log.debug("tracer metrics enabled");
- return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
+ return new ClientStatsAggregator(config, sharedCommunicationObjects, healthMetrics);
}
log.debug("tracer metrics disabled");
return NoOpMetricsAggregator.INSTANCE;
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
new file mode 100644
index 00000000000..4efaec4a0a2
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java
@@ -0,0 +1,122 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.api.DDTags.BASE_SERVICE;
+
+import java.util.Set;
+
+/**
+ * Parallel arrays of peer-tag names and their {@link TagCardinalityHandler}s, indexed in lockstep.
+ *
+ * <p>Replaces the previous {@code Map}-based lookup with positional array access: the producer
+ * captures span tag values into a {@code String[]} parallel to {@link #names}, and the consumer
+ * applies {@link #handler(int)} at the same index to canonicalize.
+ *
+ * <p>Two schemas exist:
+ *
+ * <ul>
+ *   <li>{@link #INTERNAL} — a singleton with one entry for {@code base.service}, used for
+ *       internal-kind spans where only the base service is aggregated.
+ *   <li>{@code CURRENT} — the schema for {@code client}/{@code producer}/{@code consumer} spans,
+ *       refreshed lazily when {@code DDAgentFeaturesDiscovery.peerTags()} changes via {@link
+ *       #currentSyncedTo(Set)}.
+ * </ul>
+ *
+ * <p>Each {@link SpanSnapshot} captures its own schema reference so producer and consumer agree
+ * on the indexing even if the current schema is replaced between capture and consumption.
+ *
+ * <p>Thread-safety: {@link #currentSyncedTo} may be called from producer threads; the
+ * set-to-schema pairing is published through a single volatile reference so the fast path can
+ * never pair a caller's set with a schema built for a different set. The {@link
+ * TagCardinalityHandler}s themselves are not thread-safe and must only be exercised on the
+ * aggregator thread (where the snapshot's schema is consumed).
+ */
+final class PeerTagSchema {
+
+  private static final int VALUE_LIMIT_PER_TAG = 512;
+
+  /** Singleton schema for internal-kind spans -- only {@code base.service}. */
+  static final PeerTagSchema INTERNAL = new PeerTagSchema(new String[] {BASE_SERVICE});
+
+  /** Current schema for peer-aggregation kinds; replaced under the class lock when names change. */
+  private static volatile PeerTagSchema CURRENT = new PeerTagSchema(new String[0]);
+
+  /** Immutable pairing of an observed {@code peerTags()} Set instance with its synced schema. */
+  private static final class Synced {
+    final Set<String> input;
+    final PeerTagSchema schema;
+
+    Synced(Set<String> input, PeerTagSchema schema) {
+      this.input = input;
+      this.schema = schema;
+    }
+  }
+
+  /**
+   * Cache of the most recently observed {@code features.peerTags()} Set instance together with
+   * the schema validated/built for it. Held in ONE volatile field so the producer fast path reads
+   * a consistent (set, schema) pair -- with two separate volatiles a concurrent resync could let
+   * the fast path match the old set but return the newly installed schema. In production the same
+   * Set instance is returned by {@code DDAgentFeaturesDiscovery} until reconfiguration, so the
+   * identity check almost always hits.
+   */
+  private static volatile Synced LAST_SYNCED;
+
+  final String[] names;
+  final TagCardinalityHandler[] handlers;
+
+  private PeerTagSchema(String[] names) {
+    this.names = names;
+    this.handlers = new TagCardinalityHandler[names.length];
+    for (int i = 0; i < names.length; i++) {
+      this.handlers[i] = new TagCardinalityHandler(names[i], VALUE_LIMIT_PER_TAG);
+    }
+  }
+
+  /**
+   * Returns the current peer-aggregation schema, lazily refreshing it if the supplied {@code
+   * peerTagNames} differ from the cached set. Designed for the producer hot path: the common case
+   * is a single volatile read plus an identity comparison. On a set-instance change the sync (and
+   * any rebuild) happens under the class lock, after which the lock-free fast path is restored.
+   */
+  static PeerTagSchema currentSyncedTo(Set<String> peerTagNames) {
+    // Fast path: same Set instance as the last sync -> its schema is still valid. The single
+    // volatile read makes the (set, schema) pairing atomic.
+    Synced last = LAST_SYNCED;
+    if (last != null && last.input == peerTagNames) {
+      return last.schema;
+    }
+    synchronized (PeerTagSchema.class) {
+      PeerTagSchema cur = CURRENT;
+      if (!matches(cur.names, peerTagNames)) {
+        cur = new PeerTagSchema(peerTagNames.toArray(new String[0]));
+        CURRENT = cur;
+      }
+      LAST_SYNCED = new Synced(peerTagNames, cur);
+      return cur;
+    }
+  }
+
+  /**
+   * Resets the working sets of {@link #INTERNAL} and the current schema. NOTE(review): schemas
+   * replaced before this call but still referenced by in-flight snapshots are not reset here --
+   * confirm that is acceptable for the per-report-cycle budget refresh.
+   */
+  static void resetAll() {
+    PeerTagSchema cur = CURRENT;
+    for (TagCardinalityHandler h : cur.handlers) {
+      h.reset();
+    }
+    for (TagCardinalityHandler h : INTERNAL.handlers) {
+      h.reset();
+    }
+  }
+
+  int size() {
+    return names.length;
+  }
+
+  String name(int i) {
+    return names[i];
+  }
+
+  TagCardinalityHandler handler(int i) {
+    return handlers[i];
+  }
+
+  private static boolean matches(String[] cur, Set<String> set) {
+    if (cur.length != set.size()) {
+      return false;
+    }
+    for (String n : cur) {
+      if (!set.contains(n)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java
new file mode 100644
index 00000000000..61560a32a71
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java
@@ -0,0 +1,45 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.HashMap;
+
+/**
+ * Caps the number of distinct values canonicalized for one property per reset cycle.
+ *
+ * <p>Each distinct {@link CharSequence} value is interned to a {@link UTF8BytesString}; once
+ * {@code cardinalityLimit} distinct values have been seen, NEW values collapse to a shared
+ * {@code "blocked_by_tracer"} sentinel. Values admitted before the limit was reached keep
+ * resolving to their canonical form.
+ *
+ * <p>Not thread-safe: backed by a plain {@link HashMap}; confine to a single thread.
+ */
+public final class PropertyCardinalityHandler {
+  private final int cardinalityLimit;
+
+  // Keyed by the raw CharSequence value. NOTE(review): this assumes the CharSequence
+  // implementations used as keys are immutable and have value-based equals/hashCode compatible
+  // with each other (e.g. String vs UTF8BytesString) -- confirm against the callers.
+  private final HashMap<CharSequence, UTF8BytesString> curUtf8s;
+
+  // Lazily-created sentinel returned once the distinct-value budget is exhausted.
+  private UTF8BytesString cacheBlocked = null;
+
+  public PropertyCardinalityHandler(int cardinalityLimit) {
+    this.cardinalityLimit = cardinalityLimit;
+
+    // pre-sizing properly to avoid rehashing
+    this.curUtf8s = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1);
+  }
+
+  /**
+   * Canonicalizes {@code value}, or returns the blocked sentinel when {@code value} is new and
+   * the distinct-value budget is already spent.
+   */
+  public UTF8BytesString register(CharSequence value) {
+    // Look up BEFORE enforcing the limit: a value admitted earlier in the cycle must keep its
+    // canonical form even after the budget is exhausted. (Checking the limit first would send
+    // already-registered values to the blocked sentinel once the map is full.)
+    UTF8BytesString existingUtf8 = this.curUtf8s.get(value);
+    if (existingUtf8 != null) return existingUtf8;
+
+    if (this.curUtf8s.size() >= this.cardinalityLimit) {
+      return this.blockedByTracer();
+    }
+
+    // TODO: maybe use a fallback cache to reduce allocations across reset cycles
+    UTF8BytesString newUtf8 = UTF8BytesString.create(value);
+    this.curUtf8s.put(value, newUtf8);
+    return newUtf8;
+  }
+
+  private UTF8BytesString blockedByTracer() {
+    UTF8BytesString cacheBlocked = this.cacheBlocked;
+    if (cacheBlocked != null) return cacheBlocked;
+
+    this.cacheBlocked = cacheBlocked = UTF8BytesString.create("blocked_by_tracer");
+    return cacheBlocked;
+  }
+
+  /** Clears the interned values so the next cycle starts with a fresh budget. */
+  public void reset() {
+    this.curUtf8s.clear();
+  }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
index b7f81712945..5967c1302c7 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java
@@ -21,10 +21,18 @@ final class SpanSnapshot implements InboxItem {
final String spanKind;
/**
- * Flattened name/value pairs of peer-tag matches: {@code [name0, value0, name1, value1, ...]}.
- * {@code null} when there are no matches (the common case).
+ * Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema
+ * carries the names + {@link TagCardinalityHandler}s in parallel array form; {@code
+ * peerTagValues} holds the per-span tag values at the same indices.
*/
- final String[] peerTagPairs;
+ final PeerTagSchema peerTagSchema;
+
+ /**
+ * Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null}
+ * entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link
+ * #peerTagSchema} is {@code null}.
+ */
+ final String[] peerTagValues;
final String httpMethod;
final String httpEndpoint;
@@ -43,7 +51,8 @@ final class SpanSnapshot implements InboxItem {
boolean synthetic,
boolean traceRoot,
String spanKind,
- String[] peerTagPairs,
+ PeerTagSchema peerTagSchema,
+ String[] peerTagValues,
String httpMethod,
String httpEndpoint,
String grpcStatusCode,
@@ -57,7 +66,8 @@ final class SpanSnapshot implements InboxItem {
this.synthetic = synthetic;
this.traceRoot = traceRoot;
this.spanKind = spanKind;
- this.peerTagPairs = peerTagPairs;
+ this.peerTagSchema = peerTagSchema;
+ this.peerTagValues = peerTagValues;
this.httpMethod = httpMethod;
this.httpEndpoint = httpEndpoint;
this.grpcStatusCode = grpcStatusCode;
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java
new file mode 100644
index 00000000000..1fdfed5c7c4
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java
@@ -0,0 +1,46 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.HashMap;
+
+/**
+ * Caps the number of distinct values aggregated for one tag per reset cycle, interning each
+ * admitted value as a pre-encoded {@code "tag:value"} {@link UTF8BytesString} pair.
+ *
+ * <p>Once {@code cardinalityLimit} distinct values have been seen, NEW values collapse to a
+ * shared {@code "tag:blocked_by_tracer"} sentinel; values admitted before the limit was reached
+ * keep resolving to their canonical pair.
+ *
+ * <p>Not thread-safe: backed by a plain {@link HashMap}; must only be exercised on the
+ * aggregator thread.
+ */
+public final class TagCardinalityHandler {
+  private final String tag;
+  private final int cardinalityLimit;
+
+  // value -> pre-encoded "tag:value" pair for the current cycle.
+  private final HashMap<String, UTF8BytesString> curUtf8Pairs;
+
+  // Lazily-created sentinel returned once the distinct-value budget is exhausted.
+  private UTF8BytesString cacheBlocked = null;
+
+  public TagCardinalityHandler(String tag, int cardinalityLimit) {
+    this.tag = tag;
+    this.cardinalityLimit = cardinalityLimit;
+
+    // pre-sizing properly to avoid rehashing
+    this.curUtf8Pairs = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1);
+  }
+
+  /**
+   * Canonicalizes {@code value} into its {@code "tag:value"} pair, or returns the blocked
+   * sentinel when {@code value} is new and the distinct-value budget is already spent.
+   */
+  public UTF8BytesString register(String value) {
+    // Look up BEFORE enforcing the limit: a value admitted earlier in the cycle must keep its
+    // canonical pair even after the budget is exhausted. (Checking the limit first would send
+    // already-registered values to the blocked sentinel once the map is full.)
+    UTF8BytesString existing = this.curUtf8Pairs.get(value);
+    if (existing != null) return existing;
+
+    if (this.curUtf8Pairs.size() >= this.cardinalityLimit) {
+      return this.blockedByTracer();
+    }
+
+    UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + value);
+    this.curUtf8Pairs.put(value, newPair);
+    return newPair;
+  }
+
+  private UTF8BytesString blockedByTracer() {
+    UTF8BytesString cacheBlocked = this.cacheBlocked;
+    if (cacheBlocked != null) return cacheBlocked;
+
+    this.cacheBlocked = cacheBlocked = UTF8BytesString.create(this.tag + ":blocked_by_tracer");
+    return cacheBlocked;
+  }
+
+  /** Clears the interned pairs so the next cycle starts with a fresh budget. */
+  public void reset() {
+    this.curUtf8Pairs.clear();
+  }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
index 7d183670883..810b13884de 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java
@@ -82,6 +82,16 @@ default U unsafeGetTag(CharSequence name) {
boolean isKind(SpanKindFilter filter);
+  /**
+   * Returns the {@code span.kind} tag value as a String, or {@code null} if not set. Default
+   * implementation reads the tag map; {@link DDSpan} overrides to use a cached ordinal that
+   * resolves via a small lookup array, skipping the tag-map lookup on the hot path.
+   */
+  default String getSpanKindString() {
+    // toString() also normalizes non-String tag values (e.g. UTF8BytesString) to a String.
+    Object v = unsafeGetTag(datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND);
+    return v == null ? null : v.toString();
+  }
+
CharSequence getType();
/**
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
index 4c438e1c915..943776e7577 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
@@ -963,6 +963,11 @@ public boolean isKind(SpanKindFilter filter) {
return filter.matches(context.getSpanKindOrdinal());
}
+ @Override
+ public String getSpanKindString() {
+ return context.getSpanKindString();
+ }
+
@Override
public void copyPropagationAndBaggage(final AgentSpan source) {
if (source instanceof DDSpan) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
index 76051645fcb..db384a7e42e 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java
@@ -382,7 +382,7 @@ private static class Flush implements AgentTaskScheduler.Task empty = new HashSet<>()
@@ -35,7 +35,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
wellKnownTags,
empty,
features,
@@ -65,7 +65,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
wellKnownTags,
[ignoredResourceName].toSet(),
features,
@@ -103,7 +103,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -149,7 +149,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -195,7 +195,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -253,14 +253,17 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
"client" | "GET" | "/external/api" | true
}
- def "should create bucket for each set of peer tags"() {
+ def "should create separate buckets for distinct peer tag values"() {
+ // Peer-tag NAMES are configured per-tracer and stable for the duration of a trace publish;
+ // peer-tag VALUES vary per-span. Two spans with the same names but different values should
+ // produce two distinct aggregate buckets.
setup:
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- features.peerTags() >>> [["country"], ["country", "georegion"],]
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ features.peerTags() >> ["country", "georegion"]
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -270,7 +273,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
- .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
+ .setTag(SPAN_KIND, "client").setTag("country", "germany").setTag("georegion", "europe")
])
aggregator.report()
def latchTriggered = latch.await(2, SECONDS)
@@ -289,7 +292,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
false,
false,
"client",
- [UTF8BytesString.create("country:france")],
+ [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")],
null,
null,
null
@@ -307,7 +310,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
false,
false,
"client",
- [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")],
+ [UTF8BytesString.create("country:germany"), UTF8BytesString.create("georegion:europe")],
null,
null,
null
@@ -327,7 +330,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -380,7 +383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP,
sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -432,7 +435,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
long duration = 100
List trace = [
@@ -504,7 +507,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -631,7 +634,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -746,7 +749,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -816,7 +819,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -888,7 +891,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -956,7 +959,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -990,7 +993,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
HealthMetrics healthMetrics = Mock(HealthMetrics)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1035,7 +1038,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1137,7 +1140,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1197,7 +1200,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1248,7 +1251,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
long duration = 100
aggregator.start()
@@ -1279,7 +1282,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
aggregator.start()
@@ -1301,7 +1304,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> false
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false)
final spans = [
new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
@@ -1333,7 +1336,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
when:
@@ -1366,7 +1369,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1413,7 +1416,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1468,7 +1471,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()
@@ -1559,7 +1562,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()
@@ -1632,14 +1635,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}
- def reportAndWaitUntilEmpty(ConflatingMetricsAggregator aggregator) {
+ def reportAndWaitUntilEmpty(ClientStatsAggregator aggregator) {
waitUntilEmpty(aggregator)
aggregator.report()
waitUntilEmpty(aggregator)
}
- def waitUntilEmpty(ConflatingMetricsAggregator aggregator) {
+ def waitUntilEmpty(ClientStatsAggregator aggregator) {
int i = 0
while (!aggregator.inbox.isEmpty() && i++ < 100) {
Thread.sleep(10)
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
index eceedeb1935..86a91c23b3f 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
@@ -37,7 +37,7 @@ class FootprintForkedTest extends DDSpecification {
it.supportsMetrics() >> true
it.peerTags() >> []
}
- ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
+ ClientStatsAggregator aggregator = new ClientStatsAggregator(
new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"),
[].toSet() as Set,
features,
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
index 07f246bf9a9..dc9eb86fde3 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy
@@ -28,6 +28,6 @@ class MetricsAggregatorFactoryTest extends DDSpecification {
expect:
def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP,
)
- assert aggregator instanceof ConflatingMetricsAggregator
+ assert aggregator instanceof ClientStatsAggregator
}
}
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
index 44f2b36cb6b..7a4f84c30dd 100644
--- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
@@ -87,6 +87,27 @@ void peerTagPairsParticipateInIdentity() {
assertEquals(3, table.size());
}
+ @Test
+ void cardinalityBlockedValuesCollapseIntoOneEntry() {
+ // SERVICE_HANDLER has a cardinality limit of 32. With 50 distinct service names, services 33+
+ // canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the canonical
+ // (post-handler) form, all blocked services land in the same bucket and merge into a single
+ // entry rather than fragmenting.
+ AggregateEntry.resetCardinalityHandlers();
+ AggregateTable table = new AggregateTable(128);
+
+ for (int i = 0; i < 50; i++) {
+ AggregateMetric agg = table.findOrInsert(snapshot("svc-" + i, "op", "client"));
+ assertNotNull(agg);
+ agg.recordOneDuration(1L);
+ }
+
+ // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total.
+ assertEquals(33, table.size());
+
+ AggregateEntry.resetCardinalityHandlers();
+ }
+
@Test
void capOverrunEvictsStaleEntry() {
AggregateTable table = new AggregateTable(2);
@@ -199,7 +220,8 @@ private static final class SnapshotBuilder {
private final String service;
private final String operation;
private final String spanKind;
- private String[] peerTagPairs;
+ private PeerTagSchema peerTagSchema;
+ private String[] peerTagValues;
private long tagAndDuration = 0L;
SnapshotBuilder(String service, String operation, String spanKind) {
@@ -209,7 +231,23 @@ private static final class SnapshotBuilder {
}
SnapshotBuilder peerTags(String... namesAndValues) {
- this.peerTagPairs = namesAndValues;
+ // Build a schema from the (name, value, name, value, ...) input. Synced through the
+ // production singleton so canonicalization actually goes through the same handlers the
+ // aggregator would use in production -- which is the surface the test wants to exercise.
+ java.util.LinkedHashSet names = new java.util.LinkedHashSet<>();
+ for (int i = 0; i < namesAndValues.length; i += 2) {
+ names.add(namesAndValues[i]);
+ }
+ this.peerTagSchema = PeerTagSchema.currentSyncedTo(names);
+ this.peerTagValues = new String[peerTagSchema.size()];
+ for (int i = 0; i < namesAndValues.length; i += 2) {
+ for (int j = 0; j < peerTagSchema.size(); j++) {
+ if (peerTagSchema.name(j).equals(namesAndValues[i])) {
+ peerTagValues[j] = namesAndValues[i + 1];
+ break;
+ }
+ }
+ }
return this;
}
@@ -224,7 +262,8 @@ SpanSnapshot build() {
false,
true,
spanKind,
- peerTagPairs,
+ peerTagSchema,
+ peerTagValues,
null,
null,
null,
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java
new file mode 100644
index 00000000000..bbdffb6061a
--- /dev/null
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java
@@ -0,0 +1,88 @@
+package datadog.trace.common.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import org.junit.jupiter.api.Test;
+
+class CardinalityHandlerTest {
+
+ @Test
+ void propertyReturnsSameInstanceForRepeatedValueUntilLimit() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(3);
+ UTF8BytesString a1 = h.register("a");
+ UTF8BytesString a2 = h.register("a");
+ assertSame(a1, a2);
+ assertEquals("a", a1.toString());
+ }
+
+ @Test
+ void propertyOverLimitReturnsBlockedSentinel() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(2);
+ UTF8BytesString a = h.register("a");
+ UTF8BytesString b = h.register("b");
+ UTF8BytesString blocked1 = h.register("c");
+ UTF8BytesString blocked2 = h.register("d");
+
+ assertEquals("blocked_by_tracer", blocked1.toString());
+ assertSame(blocked1, blocked2); // same sentinel for all overflow values
+ assertNotSame(blocked1, a);
+ assertNotSame(blocked1, b);
+ }
+
+ @Test
+ void propertyResetRefreshesBudget() {
+ PropertyCardinalityHandler h = new PropertyCardinalityHandler(2);
+ h.register("a");
+ h.register("b");
+ UTF8BytesString blocked = h.register("c");
+ assertEquals("blocked_by_tracer", blocked.toString());
+
+ h.reset();
+
+ // After reset, three distinct values fit again, but the previous instances aren't reused.
+ UTF8BytesString afterReset = h.register("a");
+ assertEquals("a", afterReset.toString());
+ UTF8BytesString c = h.register("c");
+ assertEquals("c", c.toString());
+ UTF8BytesString blockedAgain = h.register("d");
+ UTF8BytesString blockedYetAgain = h.register("e");
+ assertEquals("blocked_by_tracer", blockedAgain.toString());
+ assertSame(blockedAgain, blockedYetAgain);
+ }
+
+ @Test
+ void tagPrefixesValuesAndReusesUnderLimit() {
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4);
+ UTF8BytesString first = h.register("host-a");
+ UTF8BytesString second = h.register("host-a");
+ UTF8BytesString other = h.register("host-b");
+
+ assertSame(first, second);
+ assertNotSame(first, other);
+ assertEquals("peer.hostname:host-a", first.toString());
+ assertEquals("peer.hostname:host-b", other.toString());
+ }
+
+ @Test
+ void tagOverLimitReturnsTaggedSentinel() {
+ TagCardinalityHandler h = new TagCardinalityHandler("peer.service", 1);
+ h.register("svc-1");
+ UTF8BytesString blocked = h.register("svc-2");
+ assertEquals("peer.service:blocked_by_tracer", blocked.toString());
+ }
+
+ @Test
+ void tagResetRefreshesBudgetAndSentinelStaysStable() {
+ TagCardinalityHandler h = new TagCardinalityHandler("x", 1);
+ h.register("v1");
+ UTF8BytesString blockedBefore = h.register("v2");
+ h.reset();
+ h.register("v1");
+ UTF8BytesString blockedAfter = h.register("v2");
+ // Both are the same sentinel instance (cacheBlocked is not cleared on reset).
+ assertSame(blockedBefore, blockedAfter);
+ }
+}
diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md
new file mode 100644
index 00000000000..489763fd413
--- /dev/null
+++ b/docs/client_metrics_design.md
@@ -0,0 +1,308 @@
+# Client-side metrics (stats aggregator) design
+
+This document describes the design of the **client-side metrics pipeline** that
+lives under `dd-trace-core/.../common/metrics/`. The pipeline aggregates per-span
+duration / count / error statistics on the tracer and sends rolled-up "client
+stats" payloads to the Datadog Agent on a fixed reporting interval, so the agent
+does not have to sample every span to know request rates and latencies.
+
+Code lives in package `datadog.trace.common.metrics`.
+
+## High-level shape
+
+```
+ producer thread(s) aggregator thread
+ inbox
+ trace ─▶ ClientStatsAggregator.publish(trace) ──MPSC──▶ Aggregator.run
+ │ │
+ │ per metrics-eligible span │ Drainer.accept
+ │ │
+ │ allocates one SpanSnapshot ▼
+ │ (immutable, ~15 refs) AggregateTable.findOrInsert
+ │ │
+ │ inbox.offer(snapshot) │ canonicalize → hash
+ └────────────────────────────────────▶ │ → lookup or insert
+ │
+ scheduled REPORT signal ──▶│
+ │ Aggregator.report
+ │ → MetricWriter.add(entry)
+ │ → OkHttpSink (HTTP POST)
+ │ → reset cardinality handlers
+```
+
+Three rules govern the design:
+
+1. **The producer never touches shared state.** The hot path on the application
+ thread builds an immutable `SpanSnapshot` and offers it to a bounded MPSC
+ queue. No locks, no maps, no hashing of the metric key.
+2. **The aggregator thread is the sole writer of every shared structure.** The
+ aggregate table, the cardinality handlers, the metric writer state — all of
+ them are accessed only from that thread. Control operations (clear, report,
+ stop) are themselves enqueued as `SignalItem`s so they serialize with data.
+3. **Cardinality is bounded.** Per-field handlers cap the unique values; once a
+ field's budget is exhausted, overflow values collapse into a single
+ `blocked_by_tracer` sentinel so the aggregate table can't blow up.
+
+## Component map
+
+| Component | File | Role |
+|---|---|---|
+| `ClientStatsAggregator` | `ClientStatsAggregator.java` | Producer facade. Decides which spans are eligible, builds `SpanSnapshot`s, offers them to the inbox. Also owns the agent-feature check, the scheduled report timer, and the agent-downgrade handler. |
| `SpanSnapshot` | `SpanSnapshot.java` | Immutable, short-lived (GC-reclaimed) value posted from producer → aggregator. Carries raw label fields plus a duration word with `TOP_LEVEL` / `ERROR` bits OR-ed in. |
+| `PeerTagSchema` | `PeerTagSchema.java` | Parallel `String[] names` + `TagCardinalityHandler[] handlers` describing the peer-aggregation tags in effect. One singleton for internal-kind spans; one volatile "current" schema for client/producer/consumer spans, refreshed from `DDAgentFeaturesDiscovery.peerTags()`. |
+| `Aggregator` | `Aggregator.java` | Consumer thread `Runnable`. Drains the inbox; dispatches `SpanSnapshot`s into `AggregateTable`; processes signals (`REPORT`, `CLEAR`, `STOP`); calls the writer on report. |
+| `AggregateTable` | `AggregateTable.java` | Hashtable-backed store keyed on the canonicalized labels. Owns a single reusable `Canonical` scratch buffer. Handles cap-overflow by evicting one stale entry or rejecting new ones. |
+| `AggregateEntry` | `AggregateEntry.java` | `Hashtable.Entry` holding the 13 UTF8 label fields + the mutable `AggregateMetric`. Owns the static `PropertyCardinalityHandler`s for the fixed label fields, and `Canonical` for hot-path canonicalization. |
+| `AggregateMetric` | `AggregateMetric.java` | Per-bucket accumulator: hit count, error count, top-level count, duration sum, ok/error latency histograms. Single-threaded; cleared each report. |
+| `PropertyCardinalityHandler` | `PropertyCardinalityHandler.java` | Per-field UTF8 interner with a max-unique-values cap. Returns a `blocked_by_tracer` sentinel `UTF8BytesString` once the cap is hit. Reset by the aggregator each cycle. |
+| `TagCardinalityHandler` | `TagCardinalityHandler.java` | Same pattern as the property handler, but the cached UTF8 form is the full `tag:value` pair (peer tags are wire-encoded as `tag:value`, not just the value). |
+| `SerializingMetricWriter` / `OkHttpSink` | `SerializingMetricWriter.java`, `OkHttpSink.java` | Wire serialization (MessagePack) + HTTP POST to the agent's `/v0.6/stats` endpoint. |
+| `MetricsAggregatorFactory` / `NoOpMetricsAggregator` | factory + no-op | Picks the real implementation when client stats are enabled and the agent supports the endpoint, no-op otherwise. |
+
+## Producer-side flow (`ClientStatsAggregator.publish`)
+
+The producer holds **no shared state**. Per trace it:
+
+1. Snapshots the current peer-aggregation schema **once per trace** (not per
+ span):
+ ```java
+ Set eligiblePeerTags = features.peerTags();
+ PeerTagSchema peerAggSchema =
+ (eligiblePeerTags == null || eligiblePeerTags.isEmpty())
+ ? null
+ : PeerTagSchema.currentSyncedTo(eligiblePeerTags);
+ ```
+ `currentSyncedTo` has a fast path: identity-equal to the previously-synced
+ `Set` instance → return the cached schema (the common case, since
+ `DDAgentFeaturesDiscovery` returns the same `Set` until remote-config
+ reconfiguration). The cached schema is `volatile`; replacement is guarded by
+ a `synchronized` block.
+
+2. Iterates the trace; for each metrics-eligible span:
+
+ - **Eligibility** (`shouldComputeMetric`):
+ ```java
+ (measured || isTopLevel || isKind(SERVER|CLIENT|PRODUCER|CONSUMER))
+ && longRunningVersion <= 0
+ && durationNano > 0
+ ```
+ `isMeasured` / `isTopLevel` are flag reads on `DDSpanContext`; `isKind`
+ reads the **cached `byte` span-kind ordinal** through a `SpanKindFilter`
+ bitmask test — no tag-map lookup.
+
+ - **Resource-name ignore-list** breaks out of the trace early; the entire
+ trace is dropped on a match.
+
+ - **Picks the peer-tag schema** (`peerTagSchemaFor`): for client/producer/
+ consumer kinds → `peerAggSchema` (already synced for this trace); for
+ internal-kind spans → `PeerTagSchema.INTERNAL` (single `base.service`
+ entry); otherwise `null`.
+
+ - **Captures peer-tag *values***, not pairs: walks `schema.names` and pulls
+ `unsafeGetTag(name)` for each, into a parallel `String[]`. Names + handlers
+ are the schema's job; the producer only carries raw values. Returns `null`
+ when no peer tags are set, in which case the schema reference is dropped
+ too so the consumer doesn't loop over an all-null array.
+
+ - **Builds and offers** a `SpanSnapshot` to the MPSC inbox. The span-kind
+ string is taken from `CoreSpan.getSpanKindString()`, which DDSpan
+ overrides to resolve via the cached byte ordinal through a small lookup
+ array — **no tag-map lookup**. Origin equality uses `contentEquals`.
+ `httpMethod` / `httpEndpoint` are only fetched when
+ `traceClientStatsEndpoints=true`; `grpcStatusCode` only when span type is
+ `rpc`.
+
+ - On inbox-full: the snapshot is dropped and `healthMetrics.onStatsInboxFull()`
+ fires. The producer never blocks.
+
+3. Reports `healthMetrics.onClientStatTraceComputed(counted, total, dropped)`.
+
+ `forceKeep` is the only signal returned upward — `true` if any of the
+ trace's metrics-eligible spans had errors, so the trace writer keeps the
+ raw trace too.
+
+### Why the producer is lean
+
+The cumulative cost of running these checks on every finished span is the
+single biggest concern. The producer deliberately avoids:
+
+- locking or synchronization of any kind on the hot path,
+- hashing the metric key (deferred to the aggregator thread),
+- map / cache lookups for label canonicalization (deferred),
+- tag-map lookups when a span carries the relevant information on the context
+ itself (`span.kind` via the cached byte ordinal; `isMeasured`, `isTopLevel`
+ via flag reads),
+- allocation beyond the `SpanSnapshot` itself and a single `String[]` for peer
+ tag values when any are present.
+
+## Aggregator-side flow (`Aggregator.run`)
+
+A single agent thread runs the `Aggregator.run` loop. The thread drains the
+inbox via `inbox.drain(drainer)`; when the queue is empty it sleeps
+`DEFAULT_SLEEP_MILLIS` (10 ms) and retries. The Drainer dispatches by item
+type:
+
+- `SpanSnapshot` → `AggregateTable.findOrInsert(snapshot)` returns either an
+ existing or freshly-inserted `AggregateMetric`, then the snapshot's
+ `tagAndDuration` is recorded. If the table is at capacity and no stale entry
+ can be evicted, `healthMetrics.onStatsAggregateDropped()` fires.
+
+- `ReportSignal` → on the scheduled cadence (the default report interval is
  10 s; configurable via the tracer's metrics reporting-interval setting),
+ `Aggregator.report`:
+ 1. Expunges entries with `hitCount == 0` (stale).
+ 2. If anything remains, opens a bucket via `MetricWriter.startBucket(...)`,
+ walks `AggregateTable.forEach`, writes each entry, clears its metric.
+ 3. Calls `MetricWriter.finishBucket()` (which may do I/O and block).
+ 4. **Resets all cardinality handlers** so the next interval starts with a
+ fresh budget. Existing entries keep their previously-issued UTF8
+ references, and matching is by content-equality, so canonicalizing a
+ post-reset snapshot against an existing entry still resolves to the
+ same bucket.
+
+- `ClearSignal` → drops the aggregate state. The downgrade handler
+ (`onEvent(DOWNGRADED, ...)`) offers `CLEAR` to the inbox rather than calling
+ `clearAggregates()` directly, so the aggregator thread remains the sole
+ writer of the table.
+
+- `StopSignal` → final report + thread exit.
+
+## The canonical-key trick (cardinality-safe deduplication)
+
+The lookup hash is computed from the **canonicalized** label fields, not the
+raw `SpanSnapshot` fields. This is the property that makes
+cardinality-blocking actually save space:
+
+```java
+// AggregateTable.findOrInsert
+canonical.populate(snapshot); // runs every field through its handler
+long keyHash = canonical.keyHash;
+int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
+for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
+ if (e.keyHash == keyHash) {
+ AggregateEntry candidate = (AggregateEntry) e;
+ if (canonical.matches(candidate)) {
+ return candidate.aggregate;
+ }
+ }
+}
+// miss → toEntry, splice into bucket head
+```
+
+`Canonical.populate` runs each label field through its
+`PropertyCardinalityHandler` (or `TagCardinalityHandler` for peer tags). Once a
+handler's working set is full, **every subsequent unique value resolves to the
+same `UTF8BytesString` sentinel** — so the hash computed from the canonical
+form is identical for all blocked values. They land in the same bucket and
+merge into one `AggregateEntry` rather than fragmenting into N entries.
+
+The `Canonical` scratch buffer is reused per `findOrInsert` call. On a hit,
+nothing is allocated. On a miss, `toEntry` snapshots the buffer's references
+into a fresh entry; the buffer is overwritten on the next call.
+
+### Hash chain (no varargs)
+
+`AggregateEntry.hashOf` uses chained primitive calls into
+`LongHashingUtils.addToHash(long, T)` rather than a varargs `addToHash(long,
+Object...)`. This avoids the `Object[]` allocation and boxing of the primitive
+fields (`httpStatusCode`, `synthetic`, `traceRoot`) that varargs would force.
+
+## Reporting cadence and cardinality reset
+
+Two distinct cadences:
+
+- **Reporting interval** (default 10 s): when the report timer fires,
+ `ReportTask` calls `report()` which `inbox.offer(REPORT)`. The aggregator
+ drains up to that signal, then writes the bucket and resets the cardinality
+ handlers. The handlers reset *every reporting cycle*, so the per-field
+ budgets refresh.
+
+- **Schema sync**: `PeerTagSchema.currentSyncedTo` runs on the producer thread
+ per trace, with an identity-check fast path. The schema reference is
+ replaced atomically when remote-config reconfigures the peer-tag set.
+
+## Memory and lifetime
+
+- `AggregateMetric` is **not thread-safe**. It is mutated only by the
+ aggregator thread.
+- `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`,
+ schedule-driven `REPORT`, drainer-driven inserts) route through the inbox.
+- `Canonical` and the cardinality handlers are aggregator-thread-only.
+- `PeerTagSchema.CURRENT` is `volatile` with `synchronized` replacement; the
+ schema's `TagCardinalityHandler`s themselves are aggregator-thread-only and
+ are reset alongside the property handlers each cycle.
+- Entries retain their `UTF8BytesString` references across handler resets;
+ matches via content-equality so post-reset snapshots still resolve.
+- Cap: `tracerMetricsMaxAggregates` bounds table size. Cap-overrun policy:
+ evict one stale entry (`hitCount == 0`) or drop the new data point.
+
+## Health metrics
+
+The producer reports per-trace stats via `HealthMetrics`:
+
+- `onClientStatTraceComputed(counted, totalSpans, dropped)` — per `publish`.
+- `onStatsInboxFull()` — when the MPSC queue rejects an offer.
+- `onClientStatPayloadSent()` / `onClientStatDowngraded()` /
+ `onClientStatErrorReceived()` — on agent-side outcomes.
+- `onStatsAggregateDropped()` — when the aggregator thread can't fit a new
+ entry.
+
+## Failure modes
+
+| Failure | Effect |
+|---|---|
+| Inbox full | Snapshot dropped, `onStatsInboxFull` increments, producer continues. |
+| Agent unavailable / errors | `OkHttpSink` reports `BAD_PAYLOAD` / `ERROR`; metric reporting continues. |
+| Agent downgrade (no /v0.6/stats) | `disable()` offers `CLEAR` to the inbox; the aggregator wipes its table. Producer's `features.supportsMetrics()` returns false on subsequent calls, so new snapshots are not built. |
+| Aggregate table full, no stale entry | New snapshot dropped, `onStatsAggregateDropped` increments. Existing entries continue to accumulate. |
+| Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. |
+| Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. |
+
+## Why the redesign (history)
+
+The pipeline was previously `ConflatingMetricsAggregator` with:
+
+- producer-side `MetricKey` construction (string-canonicalization on the hot
+ path),
+- a `LRUCache` of `MetricKey → AggregateMetric`,
+- per-tag `DDCache` instances for canonicalization (one per label field),
+- early computation of `tag:value` peer pairs on the producer thread.
+
+The current `ClientStatsAggregator` shape was motivated by JMH benchmarks that
+showed the producer dominating CPU time. The major shifts:
+
+1. **Move all canonicalization off the producer.** Producer just shuffles
+ references into a `SpanSnapshot`.
+2. **Replace `MetricKey` with inlined fields on `AggregateEntry`.** Removes a
+ per-snapshot allocation; lets us own the hash code on the entry itself.
+3. **Replace the `LRUCache` with a `Hashtable`** keyed on canonicalized labels.
+ Hash is computed once per insert/lookup; chained primitive hashing avoids
+ boxing.
+4. **Replace per-tag `DDCache`s with per-field `PropertyCardinalityHandler`s**
+ that share a `blocked_by_tracer` sentinel for cardinality overflow. Reset
+ each reporting cycle.
+5. **Capture peer-tag values, not pairs.** Tag-name + handler live on
+ `PeerTagSchema`; the producer carries values in a parallel `String[]`. The
+ aggregator does the `tag:value` interning via `TagCardinalityHandler` on
+ its own thread.
+6. **Sync peer-tag schema once per trace.** `currentSyncedTo` has an
+ identity-check fast path; the steady-state cost is one volatile read.
+7. **Single owner of all shared state.** `disable()` routes through `CLEAR`
+ rather than mutating the aggregate table directly.
+
+### Benchmark summary
+
+`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single
+trace, real `CoreTracer` with a no-op writer):
+
+| Variant | µs/op |
+|---|---|
+| master (`ConflatingMetricsAggregator`, baseline) | 6.428 |
+| with `SpanSnapshot` + background aggregation | 2.454 |
+| with peer-tag schema hoist | 2.410 |
+| with cached span-kind ordinal + isSynthetic fix | 1.995 |
+
+The remaining producer-thread hotspots (from JFR sampling) are tag-map
+lookups for `peer.hostname` / other peer-tag values inside
+`capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that
+chunk further, but is a structural change beyond the current package.