diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java similarity index 95% rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java index b9a2f7f8c54..b9d72eaf3ab 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java @@ -34,12 +34,12 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) -public class ConflatingMetricsAggregatorBenchmark { +public class ClientStatsAggregatorBenchmark { private final DDAgentFeaturesDiscovery featuresDiscovery = new FixedAgentFeaturesDiscovery( Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( + private final ClientStatsAggregator aggregator = + new ClientStatsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), featuresDiscovery, diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java similarity index 85% rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java index 02c6aaffc1a..06052c57ded 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java @@ -28,8 +28,8 @@ import org.openjdk.jmh.infra.Blackhole; /** - * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances - * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link + * Parallels {@link ClientStatsAggregatorBenchmark} but uses real {@link DDSpan} instances instead + * of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link * CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's * dispatch. 
*/ @@ -39,21 +39,21 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) -public class ConflatingMetricsAggregatorDDSpanBenchmark { +public class ClientStatsAggregatorDDSpanBenchmark { private static final CoreTracer TRACER = CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build(); private final DDAgentFeaturesDiscovery featuresDiscovery = - new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( + private final ClientStatsAggregator aggregator = + new ClientStatsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), featuresDiscovery, HealthMetrics.NO_OP, - new ConflatingMetricsAggregatorBenchmark.NullSink(), + new ClientStatsAggregatorBenchmark.NullSink(), 2048, 2048, false); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index e2fda9fde47..225f03197e5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -1,122 +1,106 @@ package datadog.trace.common.metrics; -import static datadog.trace.api.Functions.UTF8_ENCODE; -import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; - -import datadog.trace.api.Pair; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Function; +import java.util.Objects; /** * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}. * - *

{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's - * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code - * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry - * instead of splitting. + *

UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link + * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval. The + * critical property: hashing and matching happen after canonicalization, so when a field's + * cardinality budget is exhausted and overflow values collapse to a {@code blocked_by_tracer} + * sentinel, those values land in the same bucket and merge into a single entry rather than + * fragmenting. + * + *

The aggregator thread is the sole writer. {@link AggregateTable} holds a reusable {@link + * Canonical} scratch buffer so the canonicalization itself doesn't allocate per lookup; on a miss + * the buffer's references are copied into a fresh entry. On a hit nothing is allocated. + * + *

The handlers are reset on the aggregator thread every reporting cycle via {@link + * #resetCardinalityHandlers()}. * - *

The static UTF8 caches that used to live on {@code MetricKey} and {@code - * ConflatingMetricsAggregator} are consolidated here. + *

Thread-safety: the cardinality handlers and {@link Canonical} are not thread-safe. Only + * the aggregator thread may call {@link Canonical#populate} or {@link #resetCardinalityHandlers}. + * Test code uses {@link #of} which constructs entries without touching the handlers. */ final class AggregateEntry extends Hashtable.Entry { - // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split. - private static final DDCache RESOURCE_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache SERVICE_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache OPERATION_CACHE = - DDCaches.newFixedSizeCache(64); - private static final DDCache SERVICE_SOURCE_CACHE = - DDCaches.newFixedSizeCache(16); - private static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); - private static final DDCache SPAN_KIND_CACHE = - DDCaches.newFixedSizeCache(16); - private static final DDCache HTTP_METHOD_CACHE = - DDCaches.newFixedSizeCache(8); - private static final DDCache HTTP_ENDPOINT_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache GRPC_STATUS_CODE_CACHE = - DDCaches.newFixedSizeCache(32); - - /** - * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner - * cache produces the "name:value" encoded form the serializer writes. - */ - private static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); - - private static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); - - private final UTF8BytesString resource; - private final UTF8BytesString service; - private final UTF8BytesString operationName; - private final UTF8BytesString serviceSource; // nullable - private final UTF8BytesString type; - private final UTF8BytesString spanKind; - private final UTF8BytesString httpMethod; // nullable - private final UTF8BytesString httpEndpoint; // nullable - private final UTF8BytesString grpcStatusCode; // nullable - private final short httpStatusCode; - private final boolean synthetic; - private final boolean traceRoot; - - // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs, - // and pre-encoded List ("name:value") for the serializer. - private final String[] peerTagPairsRaw; - private final List peerTags; - + // Per-field cardinality limits. Identical to the prior DDCache sizes. 
+ static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32); + static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32); + static final PropertyCardinalityHandler OPERATION_HANDLER = new PropertyCardinalityHandler(64); + static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER = + new PropertyCardinalityHandler(16); + static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler SPAN_KIND_HANDLER = new PropertyCardinalityHandler(16); + static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER = + new PropertyCardinalityHandler(32); + static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = + new PropertyCardinalityHandler(32); + + final UTF8BytesString resource; + final UTF8BytesString service; + final UTF8BytesString operationName; + final UTF8BytesString serviceSource; // nullable + final UTF8BytesString type; + final UTF8BytesString spanKind; + final UTF8BytesString httpMethod; // nullable + final UTF8BytesString httpEndpoint; // nullable + final UTF8BytesString grpcStatusCode; // nullable + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final List peerTags; final AggregateMetric aggregate; - /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */ - private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) { + /** Field-bearing constructor used by both the hot path and the test factory. */ + private AggregateEntry( + long keyHash, + UTF8BytesString resource, + UTF8BytesString service, + UTF8BytesString operationName, + UTF8BytesString serviceSource, + UTF8BytesString type, + UTF8BytesString spanKind, + UTF8BytesString httpMethod, + UTF8BytesString httpEndpoint, + UTF8BytesString grpcStatusCode, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + List peerTags, + AggregateMetric aggregate) { super(keyHash); - this.resource = canonicalize(RESOURCE_CACHE, s.resourceName); - this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE); - this.operationName = canonicalize(OPERATION_CACHE, s.operationName); - this.serviceSource = - s.serviceNameSource == null - ? null - : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource); - this.type = canonicalize(TYPE_CACHE, s.spanType); - this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create); - this.httpMethod = - s.httpMethod == null - ? null - : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create); - this.httpEndpoint = - s.httpEndpoint == null - ? null - : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create); - this.grpcStatusCode = - s.grpcStatusCode == null - ? 
null - : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create); - this.httpStatusCode = s.httpStatusCode; - this.synthetic = s.synthetic; - this.traceRoot = s.traceRoot; - this.peerTagPairsRaw = s.peerTagPairs; - this.peerTags = materializePeerTags(s.peerTagPairs); + this.resource = resource; + this.service = service; + this.operationName = operationName; + this.serviceSource = serviceSource; + this.type = type; + this.spanKind = spanKind; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.peerTags = peerTags; this.aggregate = aggregate; } - /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */ + /** + * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Bypasses + * the cardinality handlers so tests don't pollute their state -- {@link UTF8BytesString}s are + * created directly. Content-equal entries from {@link Canonical#toEntry} still {@link #equals} an + * entry built via {@code of(...)}. + */ static AggregateEntry of( CharSequence resource, CharSequence service, @@ -131,76 +115,106 @@ static AggregateEntry of( CharSequence httpMethod, CharSequence httpEndpoint, CharSequence grpcStatusCode) { - String[] rawPairs = peerTagsToRawPairs(peerTags); - SpanSnapshot synthetic_snapshot = - new SpanSnapshot( - resource, - service == null ? null : service.toString(), - operationName, - serviceSource, - type, + UTF8BytesString resourceUtf = createUtf8(resource); + UTF8BytesString serviceUtf = createUtf8(service); + UTF8BytesString operationNameUtf = createUtf8(operationName); + UTF8BytesString serviceSourceUtf = serviceSource == null ? null : createUtf8(serviceSource); + UTF8BytesString typeUtf = createUtf8(type); + UTF8BytesString spanKindUtf = createUtf8(spanKind); + UTF8BytesString httpMethodUtf = httpMethod == null ? null : createUtf8(httpMethod); + UTF8BytesString httpEndpointUtf = httpEndpoint == null ? null : createUtf8(httpEndpoint); + UTF8BytesString grpcUtf = grpcStatusCode == null ? null : createUtf8(grpcStatusCode); + List peerTagsList = peerTags == null ? Collections.emptyList() : peerTags; + long keyHash = + hashOf( + resourceUtf, + serviceUtf, + operationNameUtf, + serviceSourceUtf, + typeUtf, + spanKindUtf, + httpMethodUtf, + httpEndpointUtf, + grpcUtf, (short) httpStatusCode, synthetic, traceRoot, - spanKind == null ? null : spanKind.toString(), - rawPairs, - httpMethod == null ? null : httpMethod.toString(), - httpEndpoint == null ? null : httpEndpoint.toString(), - grpcStatusCode == null ? null : grpcStatusCode.toString(), - 0L); + peerTagsList); return new AggregateEntry( - synthetic_snapshot, hashOf(synthetic_snapshot), new AggregateMetric()); - } - - /** Construct from a snapshot at consumer-thread miss time. 
*/ - static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) { - return new AggregateEntry(s, hashOf(s), aggregate); + keyHash, + resourceUtf, + serviceUtf, + operationNameUtf, + serviceSourceUtf, + typeUtf, + spanKindUtf, + httpMethodUtf, + httpEndpointUtf, + grpcUtf, + (short) httpStatusCode, + synthetic, + traceRoot, + peerTagsList, + new AggregateMetric()); } - boolean matches(SpanSnapshot s) { - return httpStatusCode == s.httpStatusCode - && synthetic == s.synthetic - && traceRoot == s.traceRoot - && contentEquals(resource, s.resourceName) - && stringContentEquals(service, s.serviceName) - && contentEquals(operationName, s.operationName) - && contentEquals(serviceSource, s.serviceNameSource) - && contentEquals(type, s.spanType) - && stringContentEquals(spanKind, s.spanKind) - && Arrays.equals(peerTagPairsRaw, s.peerTagPairs) - && stringContentEquals(httpMethod, s.httpMethod) - && stringContentEquals(httpEndpoint, s.httpEndpoint) - && stringContentEquals(grpcStatusCode, s.grpcStatusCode); + /** + * Resets every cardinality handler's working set. Must be called on the aggregator thread. + * Existing entries continue to hold their previously-issued {@link UTF8BytesString} references; + * matches via content-equality so snapshots delivered after a reset still resolve to the existing + * entries. + */ + static void resetCardinalityHandlers() { + RESOURCE_HANDLER.reset(); + SERVICE_HANDLER.reset(); + OPERATION_HANDLER.reset(); + SERVICE_SOURCE_HANDLER.reset(); + TYPE_HANDLER.reset(); + SPAN_KIND_HANDLER.reset(); + HTTP_METHOD_HANDLER.reset(); + HTTP_ENDPOINT_HANDLER.reset(); + GRPC_STATUS_CODE_HANDLER.reset(); + PeerTagSchema.resetAll(); } /** - * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no - * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's - * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes - * to the same bucket the snapshot itself looks up. - * - *

Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link - * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash. + * 64-bit lookup hash, computed over UTF8-encoded fields so that cardinality-blocked values (which + * all canonicalize to the same sentinel {@link UTF8BytesString}) collide in the same bucket. + * {@link UTF8BytesString#hashCode()} returns the underlying String hash, so entries built via + * {@link #of} produce the same hash as entries built from a snapshot with matching content. */ - static long hashOf(SpanSnapshot s) { + static long hashOf( + UTF8BytesString resource, + UTF8BytesString service, + UTF8BytesString operationName, + UTF8BytesString serviceSource, + UTF8BytesString type, + UTF8BytesString spanKind, + UTF8BytesString httpMethod, + UTF8BytesString httpEndpoint, + UTF8BytesString grpcStatusCode, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + List peerTags) { long h = 0; - h = LongHashingUtils.addToHash(h, s.resourceName); - h = LongHashingUtils.addToHash(h, s.serviceName); - h = LongHashingUtils.addToHash(h, s.operationName); - h = LongHashingUtils.addToHash(h, s.serviceNameSource); - h = LongHashingUtils.addToHash(h, s.spanType); - h = LongHashingUtils.addToHash(h, s.httpStatusCode); - h = LongHashingUtils.addToHash(h, s.synthetic); - h = LongHashingUtils.addToHash(h, s.traceRoot); - h = LongHashingUtils.addToHash(h, s.spanKind); - if (s.peerTagPairs != null) { - for (String p : s.peerTagPairs) { - h = LongHashingUtils.addToHash(h, p); - } + h = LongHashingUtils.addToHash(h, resource); + h = LongHashingUtils.addToHash(h, service); + h = LongHashingUtils.addToHash(h, operationName); + h = LongHashingUtils.addToHash(h, serviceSource); + h = LongHashingUtils.addToHash(h, type); + h = LongHashingUtils.addToHash(h, httpStatusCode); + h = LongHashingUtils.addToHash(h, synthetic); + h = LongHashingUtils.addToHash(h, traceRoot); + h = LongHashingUtils.addToHash(h, spanKind); + // indexed iteration -- avoids the iterator allocation a for-each over a List would do + int peerTagCount = peerTags.size(); + for (int i = 0; i < peerTagCount; i++) { + h = LongHashingUtils.addToHash(h, peerTags.get(i)); } - h = LongHashingUtils.addToHash(h, s.httpMethod); - h = LongHashingUtils.addToHash(h, s.httpEndpoint); - h = LongHashingUtils.addToHash(h, s.grpcStatusCode); + h = LongHashingUtils.addToHash(h, httpMethod); + h = LongHashingUtils.addToHash(h, httpEndpoint); + h = LongHashingUtils.addToHash(h, grpcStatusCode); return h; } @@ -259,8 +273,8 @@ List getPeerTags() { /** * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the - * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)} - * and never calls {@code equals}. + * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link Canonical#matches} and + * never calls {@code equals}. 
*/ @Override public boolean equals(Object o) { @@ -270,16 +284,16 @@ public boolean equals(Object o) { return httpStatusCode == that.httpStatusCode && synthetic == that.synthetic && traceRoot == that.traceRoot - && java.util.Objects.equals(resource, that.resource) - && java.util.Objects.equals(service, that.service) - && java.util.Objects.equals(operationName, that.operationName) - && java.util.Objects.equals(serviceSource, that.serviceSource) - && java.util.Objects.equals(type, that.type) - && java.util.Objects.equals(spanKind, that.spanKind) + && Objects.equals(resource, that.resource) + && Objects.equals(service, that.service) + && Objects.equals(operationName, that.operationName) + && Objects.equals(serviceSource, that.serviceSource) + && Objects.equals(type, that.type) + && Objects.equals(spanKind, that.spanKind) && peerTags.equals(that.peerTags) - && java.util.Objects.equals(httpMethod, that.httpMethod) - && java.util.Objects.equals(httpEndpoint, that.httpEndpoint) - && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode); + && Objects.equals(httpMethod, that.httpMethod) + && Objects.equals(httpEndpoint, that.httpEndpoint) + && Objects.equals(grpcStatusCode, that.grpcStatusCode); } @Override @@ -287,82 +301,175 @@ public int hashCode() { return (int) keyHash; } - // ----- helpers ----- - - private static UTF8BytesString canonicalize( - DDCache cache, CharSequence charSeq) { - if (charSeq == null) { - return EMPTY; - } - if (charSeq instanceof UTF8BytesString) { - return (UTF8BytesString) charSeq; + /** + * Reusable scratch buffer for canonicalizing a {@link SpanSnapshot} into UTF8 fields, computing + * its lookup hash, comparing against existing entries, and building a fresh entry on miss. + * + *

One instance is held by an {@link AggregateTable} and reused on every {@code findOrInsert} + * call. Single-threaded use only. Fields are deliberately mutable -- this is a hot-path scratch + * area, not a value class. + */ + static final class Canonical { + UTF8BytesString resource; + UTF8BytesString service; + UTF8BytesString operationName; + UTF8BytesString serviceSource; // nullable + UTF8BytesString type; + UTF8BytesString spanKind; + UTF8BytesString httpMethod; // nullable + UTF8BytesString httpEndpoint; // nullable + UTF8BytesString grpcStatusCode; // nullable + short httpStatusCode; + boolean synthetic; + boolean traceRoot; + + /** + * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link + * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own. + * Zero allocation on the hit path. + */ + final ArrayList peerTagsBuffer = new ArrayList<>(4); + + long keyHash; + + /** Canonicalize all fields from {@code s} through the handlers into this buffer. */ + void populate(SpanSnapshot s) { + this.resource = registerOrEmpty(RESOURCE_HANDLER, s.resourceName); + this.service = registerOrEmpty(SERVICE_HANDLER, s.serviceName); + this.operationName = registerOrEmpty(OPERATION_HANDLER, s.operationName); + this.serviceSource = + s.serviceNameSource == null ? null : SERVICE_SOURCE_HANDLER.register(s.serviceNameSource); + this.type = registerOrEmpty(TYPE_HANDLER, s.spanType); + this.spanKind = registerOrEmpty(SPAN_KIND_HANDLER, s.spanKind); + this.httpMethod = s.httpMethod == null ? null : HTTP_METHOD_HANDLER.register(s.httpMethod); + this.httpEndpoint = + s.httpEndpoint == null ? null : HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint); + this.grpcStatusCode = + s.grpcStatusCode == null ? null : GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode); + this.httpStatusCode = s.httpStatusCode; + this.synthetic = s.synthetic; + this.traceRoot = s.traceRoot; + populatePeerTags(s.peerTagSchema, s.peerTagValues); + this.keyHash = + hashOf( + resource, + service, + operationName, + serviceSource, + type, + spanKind, + httpMethod, + httpEndpoint, + grpcStatusCode, + httpStatusCode, + synthetic, + traceRoot, + peerTagsBuffer); } - return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); - } - /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */ - private static boolean contentEquals(UTF8BytesString a, CharSequence b) { - if (a == null) { - return b == null; - } - if (b == null) { - return false; - } - // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation. - String aStr = a.toString(); - if (b instanceof String) { - return aStr.equals(b); - } - if (b instanceof UTF8BytesString) { - return aStr.equals(b.toString()); + /** + * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying {@code schema.handler(i)} + * to each non-null value at the same index. No allocation when the schema/values are absent or + * all values are null (buffer is just cleared). 
+ */ + private void populatePeerTags(PeerTagSchema schema, String[] values) { + peerTagsBuffer.clear(); + if (schema == null || values == null) { + return; + } + int n = schema.size(); + for (int i = 0; i < n; i++) { + String v = values[i]; + if (v != null) { + peerTagsBuffer.add(schema.handler(i).register(v)); + } + } } - return aStr.contentEquals(b); - } - private static boolean stringContentEquals(UTF8BytesString a, String b) { - if (a == null) { - return b == null; + /** + * Whether this canonicalized snapshot matches the given entry. Compares UTF8 fields via + * content-equality (so an entry surviving a handler reset still matches a freshly-canonicalized + * snapshot of the same content). + */ + boolean matches(AggregateEntry e) { + return httpStatusCode == e.httpStatusCode + && synthetic == e.synthetic + && traceRoot == e.traceRoot + && Objects.equals(resource, e.resource) + && Objects.equals(service, e.service) + && Objects.equals(operationName, e.operationName) + && Objects.equals(serviceSource, e.serviceSource) + && Objects.equals(type, e.type) + && Objects.equals(spanKind, e.spanKind) + && peerTagsEqual(peerTagsBuffer, e.peerTags) + && Objects.equals(httpMethod, e.httpMethod) + && Objects.equals(httpEndpoint, e.httpEndpoint) + && Objects.equals(grpcStatusCode, e.grpcStatusCode); } - return b != null && a.toString().equals(b); - } - private static List materializePeerTags(String[] pairs) { - if (pairs == null || pairs.length == 0) { - return Collections.emptyList(); - } - if (pairs.length == 2) { - return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */ + private static boolean peerTagsEqual(List a, List b) { + int n = a.size(); + if (n != b.size()) { + return false; + } + for (int i = 0; i < n; i++) { + if (!a.get(i).equals(b.get(i))) { + return false; + } + } + return true; } - List tags = new ArrayList<>(pairs.length / 2); - for (int i = 0; i < pairs.length; i += 2) { - tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + + /** + * Build a new entry from the currently-populated canonical fields. The peer-tag buffer is + * copied into an immutable list so the entry's reference stays stable across subsequent {@link + * #populate} calls. + */ + AggregateEntry toEntry(AggregateMetric aggregate) { + List snapshottedPeerTags; + int n = peerTagsBuffer.size(); + if (n == 0) { + snapshottedPeerTags = Collections.emptyList(); + } else if (n == 1) { + snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0)); + } else { + snapshottedPeerTags = new ArrayList<>(peerTagsBuffer); + } + return new AggregateEntry( + keyHash, + resource, + service, + operationName, + serviceSource, + type, + spanKind, + httpMethod, + httpEndpoint, + grpcStatusCode, + httpStatusCode, + synthetic, + traceRoot, + snapshottedPeerTags, + aggregate); } - return tags; } - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + // ----- helpers ----- + + private static UTF8BytesString registerOrEmpty( + PropertyCardinalityHandler handler, CharSequence value) { + return value == null ? UTF8BytesString.EMPTY : handler.register(value); } - /** - * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw - * {@code [name0, value0, name1, value1, ...]} pairs. 
Used by the test factory {@link #of}, not by - * the hot path. - */ - private static String[] peerTagsToRawPairs(List peerTags) { - if (peerTags == null || peerTags.isEmpty()) { - return null; + /** Direct {@link UTF8BytesString} creation that bypasses the cardinality handlers. */ + private static UTF8BytesString createUtf8(CharSequence cs) { + if (cs == null) { + return UTF8BytesString.EMPTY; } - String[] pairs = new String[peerTags.size() * 2]; - int i = 0; - for (UTF8BytesString peerTag : peerTags) { - String s = peerTag.toString(); - int colon = s.indexOf(':'); - pairs[i++] = colon < 0 ? s : s.substring(0, colon); - pairs[i++] = colon < 0 ? "" : s.substring(colon + 1); + if (cs instanceof UTF8BytesString) { + return (UTF8BytesString) cs; } - return pairs; + return UTF8BytesString.create(cs.toString()); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java index 08300eab296..38d45ef5e85 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -4,13 +4,14 @@ import java.util.function.Consumer; /** - * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}. + * Consumer-side {@link AggregateMetric} store, keyed on the canonical UTF8-encoded labels of a + * {@link SpanSnapshot}. * - *

Replaces the prior {@code LRUCache}. The win is on the - * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise - * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache - * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link - * AggregateEntry} itself and are built once per unique key at insert time. + *

{@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so + * cardinality-blocked values share a sentinel and collapse into one entry) and then computes the + * lookup hash from that canonical form. Canonicalization runs into a reusable {@link + * AggregateEntry.Canonical} scratch buffer; on a hit nothing is allocated, on a miss the buffer's + * references are copied into a fresh entry and the buffer is overwritten on the next call. * *

Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be * routed through the inbox rather than called from arbitrary threads. @@ -19,6 +20,7 @@ final class AggregateTable { private final Hashtable.Entry[] buckets; private final int maxAggregates; + private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical(); private int size; AggregateTable(int maxAggregates) { @@ -40,12 +42,13 @@ boolean isEmpty() { * the caller should drop the data point in that case. */ AggregateMetric findOrInsert(SpanSnapshot snapshot) { - long keyHash = AggregateEntry.hashOf(snapshot); + canonical.populate(snapshot); + long keyHash = canonical.keyHash; int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) { if (e.keyHash == keyHash) { AggregateEntry candidate = (AggregateEntry) e; - if (candidate.matches(snapshot)) { + if (canonical.matches(candidate)) { return candidate.aggregate; } } @@ -53,7 +56,7 @@ AggregateMetric findOrInsert(SpanSnapshot snapshot) { if (size >= maxAggregates && !evictOneStale()) { return null; } - AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric()); + AggregateEntry entry = canonical.toEntry(new AggregateMetric()); entry.setNext(buckets[bucketIndex]); buckets[bucketIndex] = entry; size++; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index b4fc59d5a1d..8fe25288acd 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -66,10 +66,6 @@ final class Aggregator implements Runnable { this.healthMetrics = healthMetrics; } - public void clearAggregates() { - this.aggregates.clear(); - } - @Override public void run() { Thread currentThread = Thread.currentThread(); @@ -149,6 +145,9 @@ private void report(long when, SignalItem signal) { } dirty = false; } + // Reset cardinality handlers each report cycle so the per-field budgets refresh. + // Safe to call on this (aggregator) thread; handlers are HashMap-based and not thread-safe. 
+ AggregateEntry.resetCardinalityHandlers(); signal.complete(); if (skipped) { log.debug("skipped metrics reporting because no points have changed"); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java similarity index 77% rename from dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java rename to dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index c675fcb23c4..d08ce611100 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -2,10 +2,8 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; -import static datadog.trace.api.DDTags.BASE_SERVICE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; @@ -40,14 +38,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener { +public final class ClientStatsAggregator implements MetricsAggregator, EventListener { - private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class); + private static final Logger log = LoggerFactory.getLogger(ClientStatsAggregator.class); private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; + private static final String SYNTHETICS_ORIGIN = "synthetics"; private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = SpanKindFilter.builder() @@ -76,7 +74,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private volatile AgentTaskScheduler.Scheduled cancellation; - public ConflatingMetricsAggregator( + public ClientStatsAggregator( Config config, SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetrics) { @@ -97,7 +95,7 @@ public ConflatingMetricsAggregator( config.isTraceResourceRenamingEnabled()); } - ConflatingMetricsAggregator( + ClientStatsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, @@ -119,7 +117,7 @@ public ConflatingMetricsAggregator( includeEndpointInMetrics); } - ConflatingMetricsAggregator( + ClientStatsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, @@ -143,7 +141,7 @@ public ConflatingMetricsAggregator( includeEndpointInMetrics); } - ConflatingMetricsAggregator( + ClientStatsAggregator( Set ignoredResources, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, @@ -244,6 +242,14 @@ public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; if (features.supportsMetrics()) { + // Sync the peer-aggregation schema once per trace; peer-tag configuration is stable for + // the duration of a single trace publish in production 
(DDAgentFeaturesDiscovery returns + // the same Set instance until remote-config reconfiguration). + Set eligiblePeerTags = features.peerTags(); + PeerTagSchema peerAggSchema = + (eligiblePeerTags == null || eligiblePeerTags.isEmpty()) + ? null + : PeerTagSchema.currentSyncedTo(eligiblePeerTags); for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); if (shouldComputeMetric(span, isTopLevel)) { @@ -254,7 +260,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel); + forceKeep |= publish(span, isTopLevel, peerAggSchema); } } healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); @@ -269,7 +275,7 @@ private boolean shouldComputeMetric(CoreSpan span, boolean isTopLevel) { && span.getDurationNano() > 0; } - private boolean publish(CoreSpan span, boolean isTopLevel) { + private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peerAggSchema) { // Extract HTTP method and endpoint only if the feature is enabled String httpMethod = null; String httpEndpoint = null; @@ -286,14 +292,26 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } - // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString - // tag values don't trigger a ClassCastException on the String assignment. - final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); + // DDSpan resolves this from a cached span.kind ordinal via a small lookup array, skipping a + // tag-map lookup. Other CoreSpan impls fall back to the tag map by default. + String spanKind = span.getSpanKindString(); + if (spanKind == null) { + spanKind = ""; + } boolean error = span.getError() > 0; long tagAndDuration = span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); + PeerTagSchema peerTagSchema = peerTagSchemaFor(span, peerAggSchema); + String[] peerTagValues = + peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema); + if (peerTagValues == null) { + // capture returned no non-null values -- drop the schema reference so the consumer doesn't + // bother iterating an all-null array. + peerTagSchema = null; + } + SpanSnapshot snapshot = new SpanSnapshot( span.getResourceName(), @@ -305,7 +323,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { isSynthetic(span), span.getParentId() == 0, spanKind, - extractPeerTagPairs(span), + peerTagSchema, + peerTagValues, httpMethod, httpEndpoint, grpcStatusCode, @@ -317,43 +336,45 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return error; } - private String[] extractPeerTagPairs(CoreSpan span) { - if (span.isKind(PEER_AGGREGATION_KINDS)) { - final Set eligiblePeerTags = features.peerTags(); - String[] pairs = null; - int count = 0; - for (String peerTag : eligiblePeerTags) { - Object value = span.unsafeGetTag(peerTag); - if (value != null) { - if (pairs == null) { - // pairs are flattened [name, value, ...]; size for worst case - pairs = new String[eligiblePeerTags.size() * 2]; - } - pairs[count++] = peerTag; - pairs[count++] = value.toString(); + /** + * Picks the peer-tag schema for a span. The {@code peerAggSchema} argument is the per-trace + * cached schema (synced from {@code features.peerTags()} once in {@link #publish(List)}); it's + * {@code null} when no peer tags are configured. 
For internal-kind spans the static {@link + * PeerTagSchema#INTERNAL} schema is used regardless. + */ + private static PeerTagSchema peerTagSchemaFor(CoreSpan span, PeerTagSchema peerAggSchema) { + if (peerAggSchema != null && span.isKind(PEER_AGGREGATION_KINDS)) { + return peerAggSchema; + } + if (span.isKind(INTERNAL_KIND)) { + return PeerTagSchema.INTERNAL; + } + return null; + } + + /** + * Captures the span's peer tag values into a {@code String[]} parallel to {@code schema.names}. + * Returns {@code null} when none of the configured peer tags are set on the span. + */ + private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema schema) { + String[] names = schema.names; + int n = names.length; + String[] values = null; + for (int i = 0; i < n; i++) { + Object v = span.unsafeGetTag(names[i]); + if (v != null) { + if (values == null) { + values = new String[n]; } - } - if (pairs == null) { - return null; - } - if (count < pairs.length) { - String[] trimmed = new String[count]; - System.arraycopy(pairs, 0, trimmed, 0, count); - return trimmed; - } - return pairs; - } else if (span.isKind(INTERNAL_KIND)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.unsafeGetTag(BASE_SERVICE); - if (baseService != null) { - return new String[] {BASE_SERVICE, baseService.toString()}; + values[i] = v.toString(); } } - return null; + return values; } private static boolean isSynthetic(CoreSpan span) { - return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); + CharSequence origin = span.getOrigin(); + return origin != null && SYNTHETICS_ORIGIN.contentEquals(origin); } public void stop() { @@ -399,17 +420,16 @@ private void disable() { if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); // Route the clear through the inbox so the aggregator thread is the only writer. - // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread - // would race with Drainer.accept on the aggregator thread. + // AggregateTable is not thread-safe; clearing it directly from this thread would race + // with Drainer.accept on the aggregator thread. 
inbox.offer(CLEAR); } } - private static final class ReportTask - implements AgentTaskScheduler.Task { + private static final class ReportTask implements AgentTaskScheduler.Task { @Override - public void run(ConflatingMetricsAggregator target) { + public void run(ClientStatsAggregator target) { target.report(); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java index 09464310113..b9530871763 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java @@ -15,7 +15,7 @@ public static MetricsAggregator createMetricsAggregator( HealthMetrics healthMetrics) { if (config.isTracerMetricsEnabled()) { log.debug("tracer metrics enabled"); - return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics); + return new ClientStatsAggregator(config, sharedCommunicationObjects, healthMetrics); } log.debug("tracer metrics disabled"); return NoOpMetricsAggregator.INSTANCE; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java new file mode 100644 index 00000000000..4efaec4a0a2 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -0,0 +1,122 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.DDTags.BASE_SERVICE; + +import java.util.Set; + +/** + * Parallel arrays of peer-tag names and their {@link TagCardinalityHandler}s, indexed in lockstep. + * + *

Replaces the previous {@code Map} lookup with positional array + * access: the producer captures span tag values into a {@code String[]} parallel to {@link #names}, + * and the consumer applies {@link #handler(int)} at the same index to canonicalize. + * + *

Two schemas exist: + * + *

+ * <ul>
+ *   <li>{@link #CURRENT} -- the schema for peer-aggregation span kinds, kept in sync with the
+ *       peer-tag names from {@code DDAgentFeaturesDiscovery#peerTags()} via {@link #currentSyncedTo};
+ *   <li>{@link #INTERNAL} -- a fixed single-tag schema for internal-kind spans, carrying only the
+ *       base-service tag.
+ * </ul>

Each {@link SpanSnapshot} captures its own schema reference so producer and consumer agree on + * the indexing even if the current schema is replaced between capture and consumption. + * + *

Thread-safety: {@link #currentSyncedTo} may be called from producer threads; + * replacement of the volatile {@code CURRENT} reference is guarded by a lock. The {@link + * TagCardinalityHandler}s themselves are not thread-safe and must only be exercised on the + * aggregator thread (this is where the snapshot's schema is consumed). + */ +final class PeerTagSchema { + + private static final int VALUE_LIMIT_PER_TAG = 512; + + /** Singleton schema for internal-kind spans -- only {@code base.service}. */ + static final PeerTagSchema INTERNAL = new PeerTagSchema(new String[] {BASE_SERVICE}); + + /** Current schema for peer-aggregation kinds; replaced atomically when peer tag names change. */ + private static volatile PeerTagSchema CURRENT = new PeerTagSchema(new String[0]); + + /** + * Identity cache of the most recently observed {@code features.peerTags()} {@link Set} instance. + * The producer hot path checks this first and skips the {@code names}-vs-set comparison when the + * caller's set instance hasn't changed. In production this is the common case -- {@code + * DDAgentFeaturesDiscovery} returns the same Set instance until reconfiguration. + */ + private static volatile Set LAST_SYNCED_INPUT; + + final String[] names; + final TagCardinalityHandler[] handlers; + + private PeerTagSchema(String[] names) { + this.names = names; + this.handlers = new TagCardinalityHandler[names.length]; + for (int i = 0; i < names.length; i++) { + this.handlers[i] = new TagCardinalityHandler(names[i], VALUE_LIMIT_PER_TAG); + } + } + + /** + * Returns the current peer-aggregation schema, lazily refreshing it if the supplied {@code + * peerTagNames} differ from the cached set. Designed to be called from the producer hot path: the + * common case is a single volatile read and an array-length / set-contains comparison. + */ + static PeerTagSchema currentSyncedTo(Set peerTagNames) { + // Fast path: same Set instance as the last sync -> the cached schema is still valid, no + // matches() loop needed. In production this is the steady-state case. + if (peerTagNames == LAST_SYNCED_INPUT) { + return CURRENT; + } + PeerTagSchema cur = CURRENT; + if (matches(cur.names, peerTagNames)) { + LAST_SYNCED_INPUT = peerTagNames; + return cur; + } + synchronized (PeerTagSchema.class) { + cur = CURRENT; + if (!matches(cur.names, peerTagNames)) { + cur = new PeerTagSchema(peerTagNames.toArray(new String[0])); + CURRENT = cur; + } + LAST_SYNCED_INPUT = peerTagNames; + return cur; + } + } + + /** Resets the working sets of {@link #INTERNAL} and {@link #current()}. 
*/ + static void resetAll() { + PeerTagSchema cur = CURRENT; + for (TagCardinalityHandler h : cur.handlers) { + h.reset(); + } + for (TagCardinalityHandler h : INTERNAL.handlers) { + h.reset(); + } + } + + int size() { + return names.length; + } + + String name(int i) { + return names[i]; + } + + TagCardinalityHandler handler(int i) { + return handlers[i]; + } + + private static boolean matches(String[] cur, Set set) { + if (cur.length != set.size()) { + return false; + } + for (String n : cur) { + if (!set.contains(n)) { + return false; + } + } + return true; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java new file mode 100644 index 00000000000..61560a32a71 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java @@ -0,0 +1,45 @@ +package datadog.trace.common.metrics; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.HashMap; + +public final class PropertyCardinalityHandler { + private final int cardinalityLimit; + + private final HashMap curUtf8s; + + private UTF8BytesString cacheBlocked = null; + + public PropertyCardinalityHandler(int cardinalityLimit) { + this.cardinalityLimit = cardinalityLimit; + + // pre-sizing properly to avoid rehashing + this.curUtf8s = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + } + + public UTF8BytesString register(CharSequence value) { + if (this.curUtf8s.size() >= this.cardinalityLimit) { + return this.blockedByTracer(); + } + + UTF8BytesString existingUtf8 = this.curUtf8s.get(value); + if (existingUtf8 != null) return existingUtf8; + + // TODO: maybe use a fallback cache to reduce allocations across reset cycles + UTF8BytesString newUtf8 = UTF8BytesString.create(value); + this.curUtf8s.put(value, newUtf8); + return newUtf8; + } + + private UTF8BytesString blockedByTracer() { + UTF8BytesString cacheBlocked = this.cacheBlocked; + if (cacheBlocked != null) return cacheBlocked; + + this.cacheBlocked = cacheBlocked = UTF8BytesString.create("blocked_by_tracer"); + return cacheBlocked; + } + + public void reset() { + this.curUtf8s.clear(); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index b7f81712945..5967c1302c7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -21,10 +21,18 @@ final class SpanSnapshot implements InboxItem { final String spanKind; /** - * Flattened name/value pairs of peer-tag matches: {@code [name0, value0, name1, value1, ...]}. - * {@code null} when there are no matches (the common case). + * Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema + * carries the names + {@link TagCardinalityHandler}s in parallel array form; {@code + * peerTagValues} holds the per-span tag values at the same indices. */ - final String[] peerTagPairs; + final PeerTagSchema peerTagSchema; + + /** + * Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null} + * entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link + * #peerTagSchema} is {@code null}. 
+ */ + final String[] peerTagValues; final String httpMethod; final String httpEndpoint; @@ -43,7 +51,8 @@ final class SpanSnapshot implements InboxItem { boolean synthetic, boolean traceRoot, String spanKind, - String[] peerTagPairs, + PeerTagSchema peerTagSchema, + String[] peerTagValues, String httpMethod, String httpEndpoint, String grpcStatusCode, @@ -57,7 +66,8 @@ final class SpanSnapshot implements InboxItem { this.synthetic = synthetic; this.traceRoot = traceRoot; this.spanKind = spanKind; - this.peerTagPairs = peerTagPairs; + this.peerTagSchema = peerTagSchema; + this.peerTagValues = peerTagValues; this.httpMethod = httpMethod; this.httpEndpoint = httpEndpoint; this.grpcStatusCode = grpcStatusCode; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java new file mode 100644 index 00000000000..1fdfed5c7c4 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java @@ -0,0 +1,46 @@ +package datadog.trace.common.metrics; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.HashMap; + +public final class TagCardinalityHandler { + private final String tag; + private final int cardinalityLimit; + + private final HashMap curUtf8Pairs; + + private UTF8BytesString cacheBlocked = null; + + public TagCardinalityHandler(String tag, int cardinalityLimit) { + this.tag = tag; + this.cardinalityLimit = cardinalityLimit; + + // pre-sizing properly to avoid rehashing + this.curUtf8Pairs = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + } + + public UTF8BytesString register(String value) { + if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { + return this.blockedByTracer(); + } + + UTF8BytesString existing = this.curUtf8Pairs.get(value); + if (existing != null) return existing; + + UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + value); + this.curUtf8Pairs.put(value, newPair); + return newPair; + } + + private UTF8BytesString blockedByTracer() { + UTF8BytesString cacheBlocked = this.cacheBlocked; + if (cacheBlocked != null) return cacheBlocked; + + this.cacheBlocked = cacheBlocked = UTF8BytesString.create(this.tag + ":blocked_by_tracer"); + return cacheBlocked; + } + + public void reset() { + this.curUtf8Pairs.clear(); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java index 7d183670883..810b13884de 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java @@ -82,6 +82,16 @@ default U unsafeGetTag(CharSequence name) { boolean isKind(SpanKindFilter filter); + /** + * Returns the {@code span.kind} tag value as a String, or {@code null} if not set. Default + * implementation reads the tag map; {@link DDSpan} overrides to use a cached ordinal that + * resolves via a small lookup array, skipping the tag-map lookup on the hot path. + */ + default String getSpanKindString() { + Object v = unsafeGetTag(datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND); + return v == null ? 
null : v.toString(); + } + CharSequence getType(); /** diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 4c438e1c915..943776e7577 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -963,6 +963,11 @@ public boolean isKind(SpanKindFilter filter) { return filter.matches(context.getSpanKindOrdinal()); } + @Override + public String getSpanKindString() { + return context.getSpanKindString(); + } + @Override public void copyPropagationAndBaggage(final AgentSpan source) { if (source instanceof DDSpan) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 76051645fcb..db384a7e42e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -382,7 +382,7 @@ private static class Flush implements AgentTaskScheduler.Task empty = new HashSet<>() @@ -35,7 +35,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( wellKnownTags, empty, features, @@ -65,7 +65,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( wellKnownTags, [ignoredResourceName].toSet(), features, @@ -103,7 +103,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -149,7 +149,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -195,7 +195,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) 
aggregator.start() @@ -253,14 +253,17 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "client" | "GET" | "/external/api" | true } - def "should create bucket for each set of peer tags"() { + def "should create separate buckets for distinct peer tag values"() { + // Peer-tag NAMES are configured per-tracer and stable for the duration of a trace publish; + // peer-tag VALUES vary per-span. Two spans with the same names but different values should + // produce two distinct aggregate buckets. setup: MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.peerTags() >>> [["country"], ["country", "georegion"],] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features.peerTags() >> ["country", "georegion"] + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -270,7 +273,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"), new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) - .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe") + .setTag(SPAN_KIND, "client").setTag("country", "germany").setTag("georegion", "europe") ]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) @@ -289,7 +292,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "client", - [UTF8BytesString.create("country:france")], + [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], null, null, null @@ -307,7 +310,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { false, false, "client", - [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], + [UTF8BytesString.create("country:germany"), UTF8BytesString.create("georegion:europe")], null, null, null @@ -327,7 +330,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -380,7 +383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -432,7 +435,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new 
ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) long duration = 100 List trace = [ @@ -504,7 +507,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -631,7 +634,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -746,7 +749,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -816,7 +819,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -888,7 +891,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -956,7 +959,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -990,7 +993,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, healthMetrics, sink, writer, maxAggregates, queueSize, 
reportingInterval, SECONDS, false) aggregator.start() @@ -1035,7 +1038,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1137,7 +1140,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1197,7 +1200,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1248,7 +1251,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1279,7 +1282,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) aggregator.start() @@ -1301,7 +1304,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> false features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) @@ -1333,7 +1336,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) 
when: @@ -1366,7 +1369,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1413,7 +1416,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1468,7 +1471,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -1559,7 +1562,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1632,14 +1635,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.close() } - def reportAndWaitUntilEmpty(ConflatingMetricsAggregator aggregator) { + def reportAndWaitUntilEmpty(ClientStatsAggregator aggregator) { waitUntilEmpty(aggregator) aggregator.report() waitUntilEmpty(aggregator) } - def waitUntilEmpty(ConflatingMetricsAggregator aggregator) { + def waitUntilEmpty(ClientStatsAggregator aggregator) { int i = 0 while (!aggregator.inbox.isEmpty() && i++ < 100) { Thread.sleep(10) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index eceedeb1935..86a91c23b3f 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -37,7 +37,7 @@ class FootprintForkedTest extends DDSpecification { it.supportsMetrics() >> true it.peerTags() >> [] } - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, features, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy index 07f246bf9a9..dc9eb86fde3 100644 --- 
a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy @@ -28,6 +28,6 @@ class MetricsAggregatorFactoryTest extends DDSpecification { expect: def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP, ) - assert aggregator instanceof ConflatingMetricsAggregator + assert aggregator instanceof ClientStatsAggregator } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 44f2b36cb6b..7a4f84c30dd 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -87,6 +87,27 @@ void peerTagPairsParticipateInIdentity() { assertEquals(3, table.size()); } + @Test + void cardinalityBlockedValuesCollapseIntoOneEntry() { + // SERVICE_HANDLER has a cardinality limit of 32. With 50 distinct service names, services 33+ + // canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the canonical + // (post-handler) form, all blocked services land in the same bucket and merge into a single + // entry rather than fragmenting. + AggregateEntry.resetCardinalityHandlers(); + AggregateTable table = new AggregateTable(128); + + for (int i = 0; i < 50; i++) { + AggregateMetric agg = table.findOrInsert(snapshot("svc-" + i, "op", "client")); + assertNotNull(agg); + agg.recordOneDuration(1L); + } + + // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total. + assertEquals(33, table.size()); + + AggregateEntry.resetCardinalityHandlers(); + } + @Test void capOverrunEvictsStaleEntry() { AggregateTable table = new AggregateTable(2); @@ -199,7 +220,8 @@ private static final class SnapshotBuilder { private final String service; private final String operation; private final String spanKind; - private String[] peerTagPairs; + private PeerTagSchema peerTagSchema; + private String[] peerTagValues; private long tagAndDuration = 0L; SnapshotBuilder(String service, String operation, String spanKind) { @@ -209,7 +231,23 @@ private static final class SnapshotBuilder { } SnapshotBuilder peerTags(String... namesAndValues) { - this.peerTagPairs = namesAndValues; + // Build a schema from the (name, value, name, value, ...) input. Synced through the + // production singleton so canonicalization actually goes through the same handlers the + // aggregator would use in production -- which is the surface the test wants to exercise. 
+ java.util.LinkedHashSet names = new java.util.LinkedHashSet<>(); + for (int i = 0; i < namesAndValues.length; i += 2) { + names.add(namesAndValues[i]); + } + this.peerTagSchema = PeerTagSchema.currentSyncedTo(names); + this.peerTagValues = new String[peerTagSchema.size()]; + for (int i = 0; i < namesAndValues.length; i += 2) { + for (int j = 0; j < peerTagSchema.size(); j++) { + if (peerTagSchema.name(j).equals(namesAndValues[i])) { + peerTagValues[j] = namesAndValues[i + 1]; + break; + } + } + } return this; } @@ -224,7 +262,8 @@ SpanSnapshot build() { false, true, spanKind, - peerTagPairs, + peerTagSchema, + peerTagValues, null, null, null, diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java new file mode 100644 index 00000000000..bbdffb6061a --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java @@ -0,0 +1,88 @@ +package datadog.trace.common.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import org.junit.jupiter.api.Test; + +class CardinalityHandlerTest { + + @Test + void propertyReturnsSameInstanceForRepeatedValueUntilLimit() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(3); + UTF8BytesString a1 = h.register("a"); + UTF8BytesString a2 = h.register("a"); + assertSame(a1, a2); + assertEquals("a", a1.toString()); + } + + @Test + void propertyOverLimitReturnsBlockedSentinel() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(2); + UTF8BytesString a = h.register("a"); + UTF8BytesString b = h.register("b"); + UTF8BytesString blocked1 = h.register("c"); + UTF8BytesString blocked2 = h.register("d"); + + assertEquals("blocked_by_tracer", blocked1.toString()); + assertSame(blocked1, blocked2); // same sentinel for all overflow values + assertNotSame(blocked1, a); + assertNotSame(blocked1, b); + } + + @Test + void propertyResetRefreshesBudget() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(2); + h.register("a"); + h.register("b"); + UTF8BytesString blocked = h.register("c"); + assertEquals("blocked_by_tracer", blocked.toString()); + + h.reset(); + + // After reset, three distinct values fit again, but the previous instances aren't reused. 
+ UTF8BytesString afterReset = h.register("a"); + assertEquals("a", afterReset.toString()); + UTF8BytesString c = h.register("c"); + assertEquals("c", c.toString()); + UTF8BytesString blockedAgain = h.register("d"); + UTF8BytesString blockedYetAgain = h.register("e"); + assertEquals("blocked_by_tracer", blockedAgain.toString()); + assertSame(blockedAgain, blockedYetAgain); + } + + @Test + void tagPrefixesValuesAndReusesUnderLimit() { + TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4); + UTF8BytesString first = h.register("host-a"); + UTF8BytesString second = h.register("host-a"); + UTF8BytesString other = h.register("host-b"); + + assertSame(first, second); + assertNotSame(first, other); + assertEquals("peer.hostname:host-a", first.toString()); + assertEquals("peer.hostname:host-b", other.toString()); + } + + @Test + void tagOverLimitReturnsTaggedSentinel() { + TagCardinalityHandler h = new TagCardinalityHandler("peer.service", 1); + h.register("svc-1"); + UTF8BytesString blocked = h.register("svc-2"); + assertEquals("peer.service:blocked_by_tracer", blocked.toString()); + } + + @Test + void tagResetRefreshesBudgetAndSentinelStaysStable() { + TagCardinalityHandler h = new TagCardinalityHandler("x", 1); + h.register("v1"); + UTF8BytesString blockedBefore = h.register("v2"); + h.reset(); + h.register("v1"); + UTF8BytesString blockedAfter = h.register("v2"); + // Both are the same sentinel instance (cacheBlocked is not cleared on reset). + assertSame(blockedBefore, blockedAfter); + } +} diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md new file mode 100644 index 00000000000..489763fd413 --- /dev/null +++ b/docs/client_metrics_design.md @@ -0,0 +1,308 @@ +# Client-side metrics (stats aggregator) design + +This document describes the design of the **client-side metrics pipeline** that +lives under `dd-trace-core/.../common/metrics/`. The pipeline aggregates per-span +duration / count / error statistics on the tracer and sends rolled-up "client +stats" payloads to the Datadog Agent on a fixed reporting interval, so the agent +does not have to sample every span to know request rates and latencies. + +Code lives in package `datadog.trace.common.metrics`. + +## High-level shape + +``` + producer thread(s) aggregator thread + inbox + trace ─▶ ClientStatsAggregator.publish(trace) ──MPSC──▶ Aggregator.run + │ │ + │ per metrics-eligible span │ Drainer.accept + │ │ + │ allocates one SpanSnapshot ▼ + │ (immutable, ~15 refs) AggregateTable.findOrInsert + │ │ + │ inbox.offer(snapshot) │ canonicalize → hash + └────────────────────────────────────▶ │ → lookup or insert + │ + scheduled REPORT signal ──▶│ + │ Aggregator.report + │ → MetricWriter.add(entry) + │ → OkHttpSink (HTTP POST) + │ → reset cardinality handlers +``` + +Three rules govern the design: + +1. **The producer never touches shared state.** The hot path on the application + thread builds an immutable `SpanSnapshot` and offers it to a bounded MPSC + queue. No locks, no maps, no hashing of the metric key. +2. **The aggregator thread is the sole writer of every shared structure.** The + aggregate table, the cardinality handlers, the metric writer state — all of + them are accessed only from that thread. Control operations (clear, report, + stop) are themselves enqueued as `SignalItem`s so they serialize with data. +3. 
**Cardinality is bounded.** Per-field handlers cap the unique values; once a + field's budget is exhausted, overflow values collapse into a single + `blocked_by_tracer` sentinel so the aggregate table can't blow up. + +## Component map + +| Component | File | Role | +|---|---|---| +| `ClientStatsAggregator` | `ClientStatsAggregator.java` | Producer facade. Decides which spans are eligible, builds `SpanSnapshot`s, offers them to the inbox. Also owns the agent-feature check, the scheduled report timer, and the agent-downgrade handler. | +| `SpanSnapshot` | `SpanSnapshot.java` | Immutable, allocation-pooled-by-GC value posted from producer → aggregator. Carries raw label fields plus a duration word with `TOP_LEVEL` / `ERROR` bits OR-ed in. | +| `PeerTagSchema` | `PeerTagSchema.java` | Parallel `String[] names` + `TagCardinalityHandler[] handlers` describing the peer-aggregation tags in effect. One singleton for internal-kind spans; one volatile "current" schema for client/producer/consumer spans, refreshed from `DDAgentFeaturesDiscovery.peerTags()`. | +| `Aggregator` | `Aggregator.java` | Consumer thread `Runnable`. Drains the inbox; dispatches `SpanSnapshot`s into `AggregateTable`; processes signals (`REPORT`, `CLEAR`, `STOP`); calls the writer on report. | +| `AggregateTable` | `AggregateTable.java` | Hashtable-backed store keyed on the canonicalized labels. Owns a single reusable `Canonical` scratch buffer. Handles cap-overflow by evicting one stale entry or rejecting new ones. | +| `AggregateEntry` | `AggregateEntry.java` | `Hashtable.Entry` holding the 13 UTF8 label fields + the mutable `AggregateMetric`. Owns the static `PropertyCardinalityHandler`s for the fixed label fields, and `Canonical` for hot-path canonicalization. | +| `AggregateMetric` | `AggregateMetric.java` | Per-bucket accumulator: hit count, error count, top-level count, duration sum, ok/error latency histograms. Single-threaded; cleared each report. | +| `PropertyCardinalityHandler` | `PropertyCardinalityHandler.java` | Per-field UTF8 interner with a max-unique-values cap. Returns a `blocked_by_tracer` sentinel `UTF8BytesString` once the cap is hit. Reset by the aggregator each cycle. | +| `TagCardinalityHandler` | `TagCardinalityHandler.java` | Same pattern as the property handler, but the cached UTF8 form is the full `tag:value` pair (peer tags are wire-encoded as `tag:value`, not just the value). | +| `SerializingMetricWriter` / `OkHttpSink` | `SerializingMetricWriter.java`, `OkHttpSink.java` | Wire serialization (MessagePack) + HTTP POST to the agent's `/v0.6/stats` endpoint. | +| `MetricsAggregatorFactory` / `NoOpMetricsAggregator` | factory + no-op | Picks the real implementation when client stats are enabled and the agent supports the endpoint, no-op otherwise. | + +## Producer-side flow (`ClientStatsAggregator.publish`) + +The producer holds **no shared state**. Per trace it: + +1. Snapshots the current peer-aggregation schema **once per trace** (not per + span): + ```java + Set eligiblePeerTags = features.peerTags(); + PeerTagSchema peerAggSchema = + (eligiblePeerTags == null || eligiblePeerTags.isEmpty()) + ? null + : PeerTagSchema.currentSyncedTo(eligiblePeerTags); + ``` + `currentSyncedTo` has a fast path: identity-equal to the previously-synced + `Set` instance → return the cached schema (the common case, since + `DDAgentFeaturesDiscovery` returns the same `Set` until remote-config + reconfiguration). The cached schema is `volatile`; replacement is guarded by + a `synchronized` block. + +2. 
Iterates the trace; for each metrics-eligible span: + + - **Eligibility** (`shouldComputeMetric`): + ```java + (measured || isTopLevel || isKind(SERVER|CLIENT|PRODUCER|CONSUMER)) + && longRunningVersion <= 0 + && durationNano > 0 + ``` + `isMeasured` / `isTopLevel` are flag reads on `DDSpanContext`; `isKind` + reads the **cached `byte` span-kind ordinal** through a `SpanKindFilter` + bitmask test — no tag-map lookup. + + - **Resource-name ignore-list** breaks out of the trace early; the entire + trace is dropped on a match. + + - **Picks the peer-tag schema** (`peerTagSchemaFor`): for client/producer/ + consumer kinds → `peerAggSchema` (already synced for this trace); for + internal-kind spans → `PeerTagSchema.INTERNAL` (single `base.service` + entry); otherwise `null`. + + - **Captures peer-tag *values***, not pairs: walks `schema.names` and pulls + `unsafeGetTag(name)` for each, into a parallel `String[]`. Names + handlers + are the schema's job; the producer only carries raw values. Returns `null` + when no peer tags are set, in which case the schema reference is dropped + too so the consumer doesn't loop over an all-null array. + + - **Builds and offers** a `SpanSnapshot` to the MPSC inbox. The span-kind + string is taken from `CoreSpan.getSpanKindString()`, which DDSpan + overrides to resolve via the cached byte ordinal through a small lookup + array — **no tag-map lookup**. Origin equality uses `contentEquals`. + `httpMethod` / `httpEndpoint` are only fetched when + `traceClientStatsEndpoints=true`; `grpcStatusCode` only when span type is + `rpc`. + + - On inbox-full: the snapshot is dropped and `healthMetrics.onStatsInboxFull()` + fires. The producer never blocks. + +3. Reports `healthMetrics.onClientStatTraceComputed(counted, total, dropped)`. + + `forceKeep` is the only signal returned upward — `true` if any of the + trace's metrics-eligible spans had errors, so the trace writer keeps the + raw trace too. + +### Why the producer is lean + +The cumulative cost of running these checks on every finished span is the +single biggest concern. The producer deliberately avoids: + +- locking or synchronization of any kind on the hot path, +- hashing the metric key (deferred to the aggregator thread), +- map / cache lookups for label canonicalization (deferred), +- tag-map lookups when a span carries the relevant information on the context + itself (`span.kind` via the cached byte ordinal; `isMeasured`, `isTopLevel` + via flag reads), +- allocation beyond the `SpanSnapshot` itself and a single `String[]` for peer + tag values when any are present. + +## Aggregator-side flow (`Aggregator.run`) + +A single agent thread runs the `Aggregator.run` loop. The thread drains the +inbox via `inbox.drain(drainer)`; when the queue is empty it sleeps +`DEFAULT_SLEEP_MILLIS` (10 ms) and retries. The Drainer dispatches by item +type: + +- `SpanSnapshot` → `AggregateTable.findOrInsert(snapshot)` returns either an + existing or freshly-inserted `AggregateMetric`, then the snapshot's + `tagAndDuration` is recorded. If the table is at capacity and no stale entry + can be evicted, `healthMetrics.onStatsAggregateDropped()` fires. + +- `ReportSignal` → on the scheduled cadence (the default report interval is + 10 s; configurable via `tracerMetricsMaxAggregates` / reporting interval), + `Aggregator.report`: + 1. Expunges entries with `hitCount == 0` (stale). + 2. If anything remains, opens a bucket via `MetricWriter.startBucket(...)`, + walks `AggregateTable.forEach`, writes each entry, clears its metric. + 3. 
Calls `MetricWriter.finishBucket()` (which may do I/O and block). + 4. **Resets all cardinality handlers** so the next interval starts with a + fresh budget. Existing entries keep their previously-issued UTF8 + references, and matching is by content-equality, so canonicalizing a + post-reset snapshot against an existing entry still resolves to the + same bucket. + +- `ClearSignal` → drops the aggregate state. The downgrade handler + (`onEvent(DOWNGRADED, ...)`) offers `CLEAR` to the inbox rather than calling + `clearAggregates()` directly, so the aggregator thread remains the sole + writer of the table. + +- `StopSignal` → final report + thread exit. + +## The canonical-key trick (cardinality-safe deduplication) + +The lookup hash is computed from the **canonicalized** label fields, not the +raw `SpanSnapshot` fields. This is the property that makes +cardinality-blocking actually save space: + +```java +// AggregateTable.findOrInsert +canonical.populate(snapshot); // runs every field through its handler +long keyHash = canonical.keyHash; +int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); +for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) { + if (e.keyHash == keyHash) { + AggregateEntry candidate = (AggregateEntry) e; + if (canonical.matches(candidate)) { + return candidate.aggregate; + } + } +} +// miss → toEntry, splice into bucket head +``` + +`Canonical.populate` runs each label field through its +`PropertyCardinalityHandler` (or `TagCardinalityHandler` for peer tags). Once a +handler's working set is full, **every subsequent unique value resolves to the +same `UTF8BytesString` sentinel** — so the hash computed from the canonical +form is identical for all blocked values. They land in the same bucket and +merge into one `AggregateEntry` rather than fragmenting into N entries. + +The `Canonical` scratch buffer is reused per `findOrInsert` call. On a hit, +nothing is allocated. On a miss, `toEntry` snapshots the buffer's references +into a fresh entry; the buffer is overwritten on the next call. + +### Hash chain (no varargs) + +`AggregateEntry.hashOf` uses chained primitive calls into +`LongHashingUtils.addToHash(long, T)` rather than a varargs `addToHash(long, +Object...)`. This avoids the `Object[]` allocation and boxing of the primitive +fields (`httpStatusCode`, `synthetic`, `traceRoot`) that varargs would force. + +## Reporting cadence and cardinality reset + +Two distinct cadences: + +- **Reporting interval** (default 10 s): when the report timer fires, + `ReportTask` calls `report()` which `inbox.offer(REPORT)`. The aggregator + drains up to that signal, then writes the bucket and resets the cardinality + handlers. The handlers reset *every reporting cycle*, so the per-field + budgets refresh. + +- **Schema sync**: `PeerTagSchema.currentSyncedTo` runs on the producer thread + per trace, with an identity-check fast path. The schema reference is + replaced atomically when remote-config reconfigures the peer-tag set. + +## Memory and lifetime + +- `AggregateMetric` is **not thread-safe**. It is mutated only by the + aggregator thread. +- `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`, + schedule-driven `REPORT`, drainer-driven inserts) route through the inbox. +- `Canonical` and the cardinality handlers are aggregator-thread-only. 
+- `PeerTagSchema.CURRENT` is `volatile` with `synchronized` replacement; the + schema's `TagCardinalityHandler`s themselves are aggregator-thread-only and + are reset alongside the property handlers each cycle. +- Entries retain their `UTF8BytesString` references across handler resets; + matches via content-equality so post-reset snapshots still resolve. +- Cap: `tracerMetricsMaxAggregates` bounds table size. Cap-overrun policy: + evict one stale entry (`hitCount == 0`) or drop the new data point. + +## Health metrics + +The producer reports per-trace stats via `HealthMetrics`: + +- `onClientStatTraceComputed(counted, totalSpans, dropped)` — per `publish`. +- `onStatsInboxFull()` — when the MPSC queue rejects an offer. +- `onClientStatPayloadSent()` / `onClientStatDowngraded()` / + `onClientStatErrorReceived()` — on agent-side outcomes. +- `onStatsAggregateDropped()` — when the aggregator thread can't fit a new + entry. + +## Failure modes + +| Failure | Effect | +|---|---| +| Inbox full | Snapshot dropped, `onStatsInboxFull` increments, producer continues. | +| Agent unavailable / errors | `OkHttpSink` reports `BAD_PAYLOAD` / `ERROR`; metric reporting continues. | +| Agent downgrade (no /v0.6/stats) | `disable()` offers `CLEAR` to the inbox; the aggregator wipes its table. Producer's `features.supportsMetrics()` returns false on subsequent calls, so new snapshots are not built. | +| Aggregate table full, no stale entry | New snapshot dropped, `onStatsAggregateDropped` increments. Existing entries continue to accumulate. | +| Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. | +| Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. | + +## Why the redesign (history) + +The pipeline was previously `ConflatingMetricsAggregator` with: + +- producer-side `MetricKey` construction (string-canonicalization on the hot + path), +- a `LRUCache` of `MetricKey → AggregateMetric`, +- per-tag `DDCache` instances for canonicalization (one per label field), +- early computation of `tag:value` peer pairs on the producer thread. + +The current `ClientStatsAggregator` shape was motivated by JMH benchmarks that +showed the producer dominating CPU time. The major shifts: + +1. **Move all canonicalization off the producer.** Producer just shuffles + references into a `SpanSnapshot`. +2. **Replace `MetricKey` with inlined fields on `AggregateEntry`.** Removes a + per-snapshot allocation; lets us own the hash code on the entry itself. +3. **Replace the `LRUCache` with a `Hashtable`** keyed on canonicalized labels. + Hash is computed once per insert/lookup; chained primitive hashing avoids + boxing. +4. **Replace per-tag `DDCache`s with per-field `PropertyCardinalityHandler`s** + that share a `blocked_by_tracer` sentinel for cardinality overflow. Reset + each reporting cycle. +5. **Capture peer-tag values, not pairs.** Tag-name + handler live on + `PeerTagSchema`; the producer carries values in a parallel `String[]`. The + aggregator does the `tag:value` interning via `TagCardinalityHandler` on + its own thread. +6. **Sync peer-tag schema once per trace.** `currentSyncedTo` has an + identity-check fast path; the steady-state cost is one volatile read. +7. **Single owner of all shared state.** `disable()` routes through `CLEAR` + rather than mutating the aggregate table directly. 
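+
+Item 4 above is what keeps the aggregate table bounded. The handler classes are not
+reproduced in this document, but the behaviour pinned down by `CardinalityHandlerTest`
+(a per-field cap on unique values, a shared `blocked_by_tracer` sentinel for overflow,
+and a per-cycle `reset()`) can be sketched roughly as below. The internal `HashMap` and
+the `limit` field are illustrative assumptions, not the production layout.
+
+```java
+// Sketch only: mirrors the behaviour asserted in CardinalityHandlerTest, not the
+// production implementation. Aggregator-thread-only, so a plain HashMap suffices.
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.HashMap;
+import java.util.Map;
+
+final class SketchPropertyCardinalityHandler {
+  private static final UTF8BytesString BLOCKED = UTF8BytesString.create("blocked_by_tracer");
+
+  private final int limit;
+  private final Map<String, UTF8BytesString> interned = new HashMap<>();
+
+  SketchPropertyCardinalityHandler(int limit) {
+    this.limit = limit;
+  }
+
+  UTF8BytesString register(String value) {
+    UTF8BytesString cached = interned.get(value);
+    if (cached != null) {
+      return cached; // repeated value: same UTF8 instance, no re-encoding
+    }
+    if (interned.size() >= limit) {
+      return BLOCKED; // budget exhausted: every further unique value collapses to the sentinel
+    }
+    UTF8BytesString encoded = UTF8BytesString.create(value);
+    interned.put(value, encoded);
+    return encoded;
+  }
+
+  void reset() {
+    // Called by the aggregator thread after each report. The sentinel is static, so
+    // blocked values before and after a reset still resolve to the same instance.
+    interned.clear();
+  }
+}
+```
+
+`TagCardinalityHandler` follows the same pattern, except the interned form is the full
+`tag:value` pair and overflow resolves to a `tag:blocked_by_tracer` sentinel.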
+ +### Benchmark summary + +`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single +trace, real `CoreTracer` with a no-op writer): + +| Variant | µs/op | +|---|---| +| master (`ConflatingMetricsAggregator`, baseline) | 6.428 | +| with `SpanSnapshot` + background aggregation | 2.454 | +| with peer-tag schema hoist | 2.410 | +| with cached span-kind ordinal + isSynthetic fix | 1.995 | + +The remaining producer-thread hotspots (from JFR sampling) are tag-map +lookups for `peer.hostname` / other peer-tag values inside +`capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that +chunk further, but is a structural change beyond the current package.
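+
+For context on that hotspot, here is a minimal sketch of the per-span value capture
+described in the producer-flow section above. The method name comes from the JFR note;
+the receiver type and exact signature are illustrative, while `unsafeGetTag`,
+`PeerTagSchema.name(int)` and `PeerTagSchema.size()` are taken from this PR's own text
+and tests.
+
+```java
+// Illustrative sketch, not the production method. Each unsafeGetTag call is a
+// tag-map lookup -- the cost the JFR samples attribute to this step; a bulk
+// peer-tag accessor on DDSpan would fetch all schema names in one pass instead.
+private static String[] capturePeerTagValues(CoreSpan<?> span, PeerTagSchema schema) {
+  String[] values = null;
+  for (int i = 0; i < schema.size(); i++) {
+    Object tag = span.unsafeGetTag(schema.name(i)); // per-name tag-map lookup (the hotspot)
+    if (tag != null) {
+      if (values == null) {
+        values = new String[schema.size()]; // allocate the parallel array lazily
+      }
+      values[i] = tag.toString();
+    }
+  }
+  // null when no peer tags were set: the producer also drops the schema reference,
+  // so the aggregator never iterates an all-null array.
+  return values;
+}
+```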