diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java new file mode 100644 index 00000000000..e2fda9fde47 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -0,0 +1,368 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; + +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.util.Hashtable; +import datadog.trace.util.LongHashingUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data + * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}. + * + *
<p>
{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's + * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code + * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry + * instead of splitting. + * + *
<p>
The static UTF8 caches that used to live on {@code MetricKey} and {@code
+ * ConflatingMetricsAggregator} are consolidated here.
+ */
+final class AggregateEntry extends Hashtable.Entry {
+
+  // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split.
+  private static final DDCache<String, UTF8BytesString> RESOURCE_CACHE =
+      DDCaches.newFixedSizeCache(32);
+  private static final DDCache<String, UTF8BytesString> SERVICE_CACHE =
+      DDCaches.newFixedSizeCache(32);
+  private static final DDCache<String, UTF8BytesString> OPERATION_CACHE =
+      DDCaches.newFixedSizeCache(64);
+  private static final DDCache<String, UTF8BytesString> SERVICE_SOURCE_CACHE =
+      DDCaches.newFixedSizeCache(16);
+  private static final DDCache<String, UTF8BytesString> TYPE_CACHE =
+      DDCaches.newFixedSizeCache(8);
+  private static final DDCache<String, UTF8BytesString> SPAN_KIND_CACHE =
+      DDCaches.newFixedSizeCache(16);
+  private static final DDCache<String, UTF8BytesString> HTTP_METHOD_CACHE =
+      DDCaches.newFixedSizeCache(8);
+  private static final DDCache<String, UTF8BytesString> HTTP_ENDPOINT_CACHE =
+      DDCaches.newFixedSizeCache(32);
+  private static final DDCache<String, UTF8BytesString> GRPC_STATUS_CODE_CACHE =
+      DDCaches.newFixedSizeCache(32);
+
+  /**
+   * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner
+   * cache produces the "name:value" encoded form the serializer writes.
+   */
+  private static final DDCache<
+          String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+      PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
+
+  private static final Function<
+          String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+      PEER_TAGS_CACHE_ADDER =
+          key ->
+              Pair.of(
+                  DDCaches.newFixedSizeCache(512),
+                  value -> UTF8BytesString.create(key + ":" + value));
+
+  private final UTF8BytesString resource;
+  private final UTF8BytesString service;
+  private final UTF8BytesString operationName;
+  private final UTF8BytesString serviceSource; // nullable
+  private final UTF8BytesString type;
+  private final UTF8BytesString spanKind;
+  private final UTF8BytesString httpMethod; // nullable
+  private final UTF8BytesString httpEndpoint; // nullable
+  private final UTF8BytesString grpcStatusCode; // nullable
+  private final short httpStatusCode;
+  private final boolean synthetic;
+  private final boolean traceRoot;
+
+  // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs,
+  // and pre-encoded List<UTF8BytesString> ("name:value") for the serializer.
+  private final String[] peerTagPairsRaw;
+  private final List<UTF8BytesString> peerTags;
+
+  final AggregateMetric aggregate;
+
+  /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */
+  private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) {
+    super(keyHash);
+    this.resource = canonicalize(RESOURCE_CACHE, s.resourceName);
+    this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE);
+    this.operationName = canonicalize(OPERATION_CACHE, s.operationName);
+    this.serviceSource =
+        s.serviceNameSource == null
+            ? null
+            : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource);
+    this.type = canonicalize(TYPE_CACHE, s.spanType);
+    this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create);
+    this.httpMethod =
+        s.httpMethod == null
+            ? null
+            : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create);
+    this.httpEndpoint =
+        s.httpEndpoint == null
+            ? null
+            : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create);
+    this.grpcStatusCode =
+        s.grpcStatusCode == null
+            ? null
+            : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create);
+    this.httpStatusCode = s.httpStatusCode;
+    this.synthetic = s.synthetic;
+    this.traceRoot = s.traceRoot;
+    this.peerTagPairsRaw = s.peerTagPairs;
+    this.peerTags = materializePeerTags(s.peerTagPairs);
+    this.aggregate = aggregate;
+  }
+
+  /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */
+  static AggregateEntry of(
+      CharSequence resource,
+      CharSequence service,
+      CharSequence operationName,
+      CharSequence serviceSource,
+      CharSequence type,
+      int httpStatusCode,
+      boolean synthetic,
+      boolean traceRoot,
+      CharSequence spanKind,
+      List<UTF8BytesString> peerTags,
+      CharSequence httpMethod,
+      CharSequence httpEndpoint,
+      CharSequence grpcStatusCode) {
+    String[] rawPairs = peerTagsToRawPairs(peerTags);
+    SpanSnapshot snapshot =
+        new SpanSnapshot(
+            resource,
+            service == null ? null : service.toString(),
+            operationName,
+            serviceSource,
+            type,
+            (short) httpStatusCode,
+            synthetic,
+            traceRoot,
+            spanKind == null ? null : spanKind.toString(),
+            rawPairs,
+            httpMethod == null ? null : httpMethod.toString(),
+            httpEndpoint == null ? null : httpEndpoint.toString(),
+            grpcStatusCode == null ? null : grpcStatusCode.toString(),
+            0L);
+    return new AggregateEntry(snapshot, hashOf(snapshot), new AggregateMetric());
+  }
+
+  /** Construct from a snapshot at consumer-thread miss time. */
+  static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) {
+    return new AggregateEntry(s, hashOf(s), aggregate);
+  }
+
+  boolean matches(SpanSnapshot s) {
+    return httpStatusCode == s.httpStatusCode
+        && synthetic == s.synthetic
+        && traceRoot == s.traceRoot
+        && contentEquals(resource, s.resourceName)
+        && stringContentEquals(service, s.serviceName)
+        && contentEquals(operationName, s.operationName)
+        && contentEquals(serviceSource, s.serviceNameSource)
+        && contentEquals(type, s.spanType)
+        && stringContentEquals(spanKind, s.spanKind)
+        && Arrays.equals(peerTagPairsRaw, s.peerTagPairs)
+        && stringContentEquals(httpMethod, s.httpMethod)
+        && stringContentEquals(httpEndpoint, s.httpEndpoint)
+        && stringContentEquals(grpcStatusCode, s.grpcStatusCode);
+  }
+
+  /**
+   * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no
+   * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's
+   * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot
+   * hashes to the same bucket the snapshot itself looks up.
+   *
+   * <p>
Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link
+   * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash.
+   */
+  static long hashOf(SpanSnapshot s) {
+    long h = 0;
+    h = LongHashingUtils.addToHash(h, s.resourceName);
+    h = LongHashingUtils.addToHash(h, s.serviceName);
+    h = LongHashingUtils.addToHash(h, s.operationName);
+    h = LongHashingUtils.addToHash(h, s.serviceNameSource);
+    h = LongHashingUtils.addToHash(h, s.spanType);
+    h = LongHashingUtils.addToHash(h, s.httpStatusCode);
+    h = LongHashingUtils.addToHash(h, s.synthetic);
+    h = LongHashingUtils.addToHash(h, s.traceRoot);
+    h = LongHashingUtils.addToHash(h, s.spanKind);
+    if (s.peerTagPairs != null) {
+      for (String p : s.peerTagPairs) {
+        h = LongHashingUtils.addToHash(h, p);
+      }
+    }
+    h = LongHashingUtils.addToHash(h, s.httpMethod);
+    h = LongHashingUtils.addToHash(h, s.httpEndpoint);
+    h = LongHashingUtils.addToHash(h, s.grpcStatusCode);
+    return h;
+  }
+
+  // Accessors for SerializingMetricWriter.
+  UTF8BytesString getResource() {
+    return resource;
+  }
+
+  UTF8BytesString getService() {
+    return service;
+  }
+
+  UTF8BytesString getOperationName() {
+    return operationName;
+  }
+
+  UTF8BytesString getServiceSource() {
+    return serviceSource;
+  }
+
+  UTF8BytesString getType() {
+    return type;
+  }
+
+  UTF8BytesString getSpanKind() {
+    return spanKind;
+  }
+
+  UTF8BytesString getHttpMethod() {
+    return httpMethod;
+  }
+
+  UTF8BytesString getHttpEndpoint() {
+    return httpEndpoint;
+  }
+
+  UTF8BytesString getGrpcStatusCode() {
+    return grpcStatusCode;
+  }
+
+  int getHttpStatusCode() {
+    return httpStatusCode;
+  }
+
+  boolean isSynthetics() {
+    return synthetic;
+  }
+
+  boolean isTraceRoot() {
+    return traceRoot;
+  }
+
+  List<UTF8BytesString> getPeerTags() {
+    return peerTags;
+  }
+
+  /**
+   * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the
+   * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)}
+   * and never calls {@code equals}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof AggregateEntry)) return false;
+    AggregateEntry that = (AggregateEntry) o;
+    return httpStatusCode == that.httpStatusCode
+        && synthetic == that.synthetic
+        && traceRoot == that.traceRoot
+        && java.util.Objects.equals(resource, that.resource)
+        && java.util.Objects.equals(service, that.service)
+        && java.util.Objects.equals(operationName, that.operationName)
+        && java.util.Objects.equals(serviceSource, that.serviceSource)
+        && java.util.Objects.equals(type, that.type)
+        && java.util.Objects.equals(spanKind, that.spanKind)
+        && peerTags.equals(that.peerTags)
+        && java.util.Objects.equals(httpMethod, that.httpMethod)
+        && java.util.Objects.equals(httpEndpoint, that.httpEndpoint)
+        && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode);
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) keyHash;
+  }
+
+  // ----- helpers -----
+
+  private static UTF8BytesString canonicalize(
+      DDCache<String, UTF8BytesString> cache, CharSequence charSeq) {
+    if (charSeq == null) {
+      return EMPTY;
+    }
+    if (charSeq instanceof UTF8BytesString) {
+      return (UTF8BytesString) charSeq;
+    }
+    return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create);
+  }
+
+  /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case.
+   */
+  private static boolean contentEquals(UTF8BytesString a, CharSequence b) {
+    if (a == null) {
+      return b == null;
+    }
+    if (b == null) {
+      return false;
+    }
+    // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation.
+    String aStr = a.toString();
+    if (b instanceof String) {
+      return aStr.equals(b);
+    }
+    if (b instanceof UTF8BytesString) {
+      return aStr.equals(b.toString());
+    }
+    return aStr.contentEquals(b);
+  }
+
+  private static boolean stringContentEquals(UTF8BytesString a, String b) {
+    if (a == null) {
+      return b == null;
+    }
+    return b != null && a.toString().equals(b);
+  }
+
+  private static List<UTF8BytesString> materializePeerTags(String[] pairs) {
+    if (pairs == null || pairs.length == 0) {
+      return Collections.emptyList();
+    }
+    if (pairs.length == 2) {
+      return Collections.singletonList(encodePeerTag(pairs[0], pairs[1]));
+    }
+    List<UTF8BytesString> tags = new ArrayList<>(pairs.length / 2);
+    for (int i = 0; i < pairs.length; i += 2) {
+      tags.add(encodePeerTag(pairs[i], pairs[i + 1]));
+    }
+    return tags;
+  }
+
+  private static UTF8BytesString encodePeerTag(String name, String value) {
+    final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
+        cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER);
+    return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight());
+  }
+
+  /**
+   * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the
+   * raw {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of},
+   * not by the hot path.
+   */
+  private static String[] peerTagsToRawPairs(List<UTF8BytesString> peerTags) {
+    if (peerTags == null || peerTags.isEmpty()) {
+      return null;
+    }
+    String[] pairs = new String[peerTags.size() * 2];
+    int i = 0;
+    for (UTF8BytesString peerTag : peerTags) {
+      String s = peerTag.toString();
+      int colon = s.indexOf(':');
+      pairs[i++] = colon < 0 ? s : s.substring(0, colon);
+      pairs[i++] = colon < 0 ? "" : s.substring(colon + 1);
+    }
+    return pairs;
+  }
+}
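The two-level peer-tag cache above deserves a note: peer-tag names come from a small agent-controlled set while values have higher cardinality, so the outer cache is keyed by name and each name owns an inner value cache whose loader captures the name once and builds the "name:value" form on first sight of a value. A sketch of the shape, using plain HashMaps as stand-ins for the bounded DDCache (an assumption for brevity; only the aggregator thread encodes, so no synchronization is shown):

    import java.util.HashMap;
    import java.util.Map;

    final class PeerTagEncoderSketch {
      // outer map: tag name -> (tag value -> canonical "name:value" string)
      private final Map<String, Map<String, String>> byName = new HashMap<>();

      String encode(String name, String value) {
        return byName
            .computeIfAbsent(name, n -> new HashMap<>())
            .computeIfAbsent(value, v -> name + ":" + v);
      }

      public static void main(String[] args) {
        PeerTagEncoderSketch enc = new PeerTagEncoderSketch();
        String a = enc.encode("peer.service", "remote-service");
        String b = enc.encode("peer.service", "remote-service");
        System.out.println(a.equals("peer.service:remote-service") && a == b);
        // true: the second hit returns the same canonical instance, no re-encoding
      }
    }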
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
new file mode 100644
index 00000000000..08300eab296
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java
@@ -0,0 +1,132 @@
+package datadog.trace.common.metrics;
+
+import datadog.trace.util.Hashtable;
+import java.util.function.Consumer;
+
+/**
+ * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}.
+ *
+ * <p>
Replaces the prior {@code LRUCache<MetricKey, AggregateMetric>}. The win is on the
+ * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise
+ * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache
+ * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link
+ * AggregateEntry} itself and are built once per unique key at insert time.
+ *
+ * <p>
Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be
+ * routed through the inbox rather than called from arbitrary threads.
+ */
+final class AggregateTable {
+
+  private final Hashtable.Entry[] buckets;
+  private final int maxAggregates;
+  private int size;
+
+  AggregateTable(int maxAggregates) {
+    this.buckets = Hashtable.Support.create(maxAggregates * 4 / 3);
+    this.maxAggregates = maxAggregates;
+  }
+
+  int size() {
+    return size;
+  }
+
+  boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Returns the {@link AggregateMetric} to update for {@code snapshot}, lazily creating an entry
+   * on miss. Returns {@code null} when the table is at capacity and no stale entry can be
+   * evicted -- the caller should drop the data point in that case.
+   */
+  AggregateMetric findOrInsert(SpanSnapshot snapshot) {
+    long keyHash = AggregateEntry.hashOf(snapshot);
+    int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
+    for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
+      if (e.keyHash == keyHash) {
+        AggregateEntry candidate = (AggregateEntry) e;
+        if (candidate.matches(snapshot)) {
+          return candidate.aggregate;
+        }
+      }
+    }
+    if (size >= maxAggregates && !evictOneStale()) {
+      return null;
+    }
+    AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric());
+    entry.setNext(buckets[bucketIndex]);
+    buckets[bucketIndex] = entry;
+    size++;
+    return entry.aggregate;
+  }
+
+  /** Unlink the first entry whose {@code AggregateMetric.getHitCount() == 0}. */
+  private boolean evictOneStale() {
+    for (int i = 0; i < buckets.length; i++) {
+      Hashtable.Entry head = buckets[i];
+      if (head == null) {
+        continue;
+      }
+      if (((AggregateEntry) head).aggregate.getHitCount() == 0) {
+        buckets[i] = head.next();
+        size--;
+        return true;
+      }
+      Hashtable.Entry prev = head;
+      Hashtable.Entry cur = head.next();
+      while (cur != null) {
+        if (((AggregateEntry) cur).aggregate.getHitCount() == 0) {
+          prev.setNext(cur.next());
+          size--;
+          return true;
+        }
+        prev = cur;
+        cur = cur.next();
+      }
+    }
+    return false;
+  }
+
+  void forEach(Consumer<AggregateEntry> consumer) {
+    for (int i = 0; i < buckets.length; i++) {
+      for (Hashtable.Entry e = buckets[i]; e != null; e = e.next()) {
+        consumer.accept((AggregateEntry) e);
+      }
+    }
+  }
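The capacity policy in findOrInsert differs from the LRU it replaces: nothing is evicted just for being old; only entries that are stale (no hits recorded since the last flush) may be unlinked to make room, and when every entry is live the new key is rejected instead. A toy model of the policy (hypothetical names, plain HashMap instead of the tracer's Hashtable):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    final class CapacityPolicyDemo {
      static final class Counter {
        int hits;
      }

      private final Map<String, Counter> table = new HashMap<>();
      private final int cap;

      CapacityPolicyDemo(int cap) {
        this.cap = cap;
      }

      // Hit -> existing counter; miss -> insert, unless the table is full and
      // no stale (hits == 0) entry can be unlinked.
      Counter findOrInsert(String key) {
        Counter c = table.get(key);
        if (c != null) {
          return c;
        }
        if (table.size() >= cap && !evictOneStale()) {
          return null; // every entry is live: drop the new key, keep what we have
        }
        c = new Counter();
        table.put(key, c);
        return c;
      }

      private boolean evictOneStale() {
        for (Iterator<Counter> it = table.values().iterator(); it.hasNext(); ) {
          if (it.next().hits == 0) {
            it.remove();
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        CapacityPolicyDemo d = new CapacityPolicyDemo(2);
        d.findOrInsert("a").hits++;
        d.findOrInsert("b").hits++;
        System.out.println(d.findOrInsert("c") == null); // true: full, all live
        d.findOrInsert("a").hits = 0; // "a" goes stale, as after a flush
        System.out.println(d.findOrInsert("c") != null); // true: stale "a" evicted
      }
    }

This is also why clear() (below) must arrive via the inbox: the table has no internal locking, so correctness rests on the aggregator thread being the only mutator.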
+
+  /** Removes entries whose {@code AggregateMetric.getHitCount() == 0}. */
+  void expungeStaleAggregates() {
+    for (int i = 0; i < buckets.length; i++) {
+      // unlink leading stale entries
+      Hashtable.Entry head = buckets[i];
+      while (head != null && ((AggregateEntry) head).aggregate.getHitCount() == 0) {
+        head = head.next();
+        size--;
+      }
+      buckets[i] = head;
+      if (head == null) {
+        continue;
+      }
+      // unlink stale entries in the chain
+      Hashtable.Entry prev = head;
+      Hashtable.Entry cur = head.next();
+      while (cur != null) {
+        if (((AggregateEntry) cur).aggregate.getHitCount() == 0) {
+          Hashtable.Entry skipped = cur.next();
+          prev.setNext(skipped);
+          size--;
+          cur = skipped;
+        } else {
+          prev = cur;
+          cur = cur.next();
+        }
+      }
+    }
+  }
+
+  void clear() {
+    Hashtable.Support.clear(buckets);
+    size = 0;
+  }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
index e632555cc21..b4fc59d5a1d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
@@ -1,26 +1,12 @@
 package datadog.trace.common.metrics;
 
-import static datadog.trace.api.Functions.UTF8_ENCODE;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES;
-import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import datadog.trace.api.Pair;
-import datadog.trace.api.cache.DDCache;
-import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import datadog.trace.common.metrics.SignalItem.ClearSignal;
 import datadog.trace.common.metrics.SignalItem.StopSignal;
 import datadog.trace.core.monitor.HealthMetrics;
-import datadog.trace.core.util.LRUCache;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import org.jctools.queues.MessagePassingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,8 +18,9 @@ final class Aggregator implements Runnable {
   private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
 
   private final MessagePassingQueue<InboxItem> inbox;
-  private final LRUCache<MetricKey, AggregateMetric> aggregates;
+  private final AggregateTable aggregates;
   private final MetricWriter writer;
+  private final HealthMetrics healthMetrics;
   // the reporting interval controls how much history will be buffered
   // when the agent is unresponsive (only 10 pending requests will be
   // buffered by OkHttpSink)
@@ -73,27 +60,10 @@ final class Aggregator implements Runnable {
       HealthMetrics healthMetrics) {
     this.writer = writer;
     this.inbox = inbox;
-    this.aggregates =
-        new LRUCache<>(
-            new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates);
+    this.aggregates = new AggregateTable(maxAggregates);
     this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval);
     this.sleepMillis = sleepMillis;
-  }
-
-  private static final class AggregateExpiry
-      implements LRUCache.ExpiryListener<MetricKey, AggregateMetric> {
-    private final HealthMetrics healthMetrics;
-
-    AggregateExpiry(HealthMetrics healthMetrics) {
-      this.healthMetrics = healthMetrics;
-    }
-
-    @Override
-    public void
accept(Map.Entry expired) { - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } + this.healthMetrics = healthMetrics; } public void clearAggregates() { @@ -126,7 +96,13 @@ private final class Drainer implements MessagePassingQueue.Consumer { @Override public void accept(InboxItem item) { - if (item instanceof SignalItem) { + if (item == ClearSignal.CLEAR) { + if (!stopped) { + aggregates.clear(); + inbox.clear(); + } + ((SignalItem) item).complete(); + } else if (item instanceof SignalItem) { SignalItem signal = (SignalItem) item; if (!stopped) { report(wallClockTime(), signal); @@ -139,64 +115,31 @@ public void accept(InboxItem item) { } } else if (item instanceof SpanSnapshot && !stopped) { SpanSnapshot snapshot = (SpanSnapshot) item; - MetricKey key = buildMetricKey(snapshot); - AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - aggregate.recordOneDuration(snapshot.tagAndDuration); - dirty = true; + AggregateMetric aggregate = aggregates.findOrInsert(snapshot); + if (aggregate != null) { + aggregate.recordOneDuration(snapshot.tagAndDuration); + dirty = true; + } else { + // table at cap with no stale entry available to evict + healthMetrics.onStatsAggregateDropped(); + } } } } - private static MetricKey buildMetricKey(SpanSnapshot s) { - return new MetricKey( - s.resourceName, - SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), - s.operationName, - s.serviceNameSource, - s.spanType, - s.httpStatusCode, - s.synthetic, - s.traceRoot, - SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), - materializePeerTags(s.peerTagPairs), - s.httpMethod, - s.httpEndpoint, - s.grpcStatusCode); - } - - private static List materializePeerTags(String[] pairs) { - if (pairs == null || pairs.length == 0) { - return Collections.emptyList(); - } - if (pairs.length == 2) { - // single-entry fast path (matches the original singletonList shape for INTERNAL spans) - return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); - } - List tags = new ArrayList<>(pairs.length / 2); - for (int i = 0; i < pairs.length; i += 2) { - tags.add(encodePeerTag(pairs[i], pairs[i + 1])); - } - return tags; - } - - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); - } - private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { try { - expungeStaleAggregates(); + aggregates.expungeStaleAggregates(); if (!aggregates.isEmpty()) { skipped = false; writer.startBucket(aggregates.size(), when, reportingIntervalNanos); - for (Map.Entry aggregate : aggregates.entrySet()) { - writer.add(aggregate.getKey(), aggregate.getValue()); - aggregate.getValue().clear(); - } + aggregates.forEach( + entry -> { + writer.add(entry); + entry.aggregate.clear(); + }); // note that this may do IO and block writer.finishBucket(); } @@ -212,17 +155,6 @@ private void report(long when, SignalItem signal) { } } - private void expungeStaleAggregates() { - Iterator> it = aggregates.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = it.next(); - AggregateMetric metric = pair.getValue(); - if (metric.getHitCount() == 0) { - it.remove(); - } - } - } - private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } diff --git 
a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 9ea77140113..c675fcb23c4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -8,6 +8,7 @@ import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; @@ -19,12 +20,8 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; -import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.ReportSignal; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; @@ -39,7 +36,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,22 +47,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - - static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); - static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = - DDCaches.newFixedSizeCache( - 64); // it can be unbounded since those values are returned by the agent and should be - // under control. 64 entries is enough in this case to contain all the peer tags. - static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = @@ -418,8 +398,10 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.inbox.clear(); - this.aggregator.clearAggregates(); + // Route the clear through the inbox so the aggregator thread is the only writer. + // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread + // would race with Drainer.accept on the aggregator thread. 
+ inbox.offer(CLEAR); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java index 7d66cad6a15..a0625be095b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/InboxItem.java @@ -28,4 +28,15 @@ private StopSignal() {} static final class ReportSignal extends SignalItem { static final ReportSignal REPORT = new ReportSignal(); } + + /** + * Posted from arbitrary threads (e.g. the Sink event thread during agent downgrade) so the + * aggregator thread is the one that actually performs the table reset. Keeps {@link + * AggregateTable} and {@code inbox.clear()} single-writer. + */ + static final class ClearSignal extends SignalItem { + static final ClearSignal CLEAR = new ClearSignal(); + + private ClearSignal() {} + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java deleted file mode 100644 index 9e2e2098d1f..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ /dev/null @@ -1,178 +0,0 @@ -package datadog.trace.common.metrics; - -import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; - -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; -import datadog.trace.util.HashingUtils; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -/** The aggregation key for tracked metrics. */ -public final class MetricKey { - static final DDCache RESOURCE_CACHE = DDCaches.newFixedSizeCache(32); - static final DDCache SERVICE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache SERVICE_SOURCE_CACHE = - DDCaches.newFixedSizeCache(16); - static final DDCache OPERATION_CACHE = DDCaches.newFixedSizeCache(64); - static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache KIND_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_METHOD_CACHE = DDCaches.newFixedSizeCache(8); - static final DDCache HTTP_ENDPOINT_CACHE = - DDCaches.newFixedSizeCache(32); - static final DDCache GRPC_STATUS_CODE_CACHE = - DDCaches.newFixedSizeCache(32); - - private final UTF8BytesString resource; - private final UTF8BytesString service; - private final UTF8BytesString serviceSource; - private final UTF8BytesString operationName; - private final UTF8BytesString type; - private final int httpStatusCode; - private final boolean synthetics; - private final int hash; - private final boolean isTraceRoot; - private final UTF8BytesString spanKind; - private final List peerTags; - private final UTF8BytesString httpMethod; - private final UTF8BytesString httpEndpoint; - private final UTF8BytesString grpcStatusCode; - - public MetricKey( - CharSequence resource, - CharSequence service, - CharSequence operationName, - CharSequence serviceSource, - CharSequence type, - int httpStatusCode, - boolean synthetics, - boolean isTraceRoot, - CharSequence spanKind, - List peerTags, - CharSequence httpMethod, - CharSequence httpEndpoint, - CharSequence grpcStatusCode) { - this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource); - this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service); - this.serviceSource = null == serviceSource ? 
null : utf8(SERVICE_SOURCE_CACHE, serviceSource); - this.operationName = null == operationName ? EMPTY : utf8(OPERATION_CACHE, operationName); - this.type = null == type ? EMPTY : utf8(TYPE_CACHE, type); - this.httpStatusCode = httpStatusCode; - this.synthetics = synthetics; - this.isTraceRoot = isTraceRoot; - this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind); - this.peerTags = peerTags == null ? Collections.emptyList() : peerTags; - this.httpMethod = httpMethod == null ? null : utf8(HTTP_METHOD_CACHE, httpMethod); - this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint); - this.grpcStatusCode = - grpcStatusCode == null ? null : utf8(GRPC_STATUS_CODE_CACHE, grpcStatusCode); - - int tmpHash = 0; - tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot); - tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind); - tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags); - tmpHash = HashingUtils.addToHash(tmpHash, this.resource); - tmpHash = HashingUtils.addToHash(tmpHash, this.service); - tmpHash = HashingUtils.addToHash(tmpHash, this.operationName); - tmpHash = HashingUtils.addToHash(tmpHash, this.type); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpStatusCode); - tmpHash = HashingUtils.addToHash(tmpHash, this.synthetics); - tmpHash = HashingUtils.addToHash(tmpHash, this.serviceSource); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint); - tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod); - tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode); - this.hash = tmpHash; - } - - static UTF8BytesString utf8(DDCache cache, CharSequence charSeq) { - if (charSeq instanceof UTF8BytesString) { - return (UTF8BytesString) charSeq; - } else { - return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); - } - } - - public UTF8BytesString getResource() { - return resource; - } - - public UTF8BytesString getService() { - return service; - } - - public UTF8BytesString getServiceSource() { - return serviceSource; - } - - public UTF8BytesString getOperationName() { - return operationName; - } - - public UTF8BytesString getType() { - return type; - } - - public int getHttpStatusCode() { - return httpStatusCode; - } - - public boolean isSynthetics() { - return synthetics; - } - - public boolean isTraceRoot() { - return isTraceRoot; - } - - public UTF8BytesString getSpanKind() { - return spanKind; - } - - public List getPeerTags() { - return peerTags; - } - - public UTF8BytesString getHttpMethod() { - return httpMethod; - } - - public UTF8BytesString getHttpEndpoint() { - return httpEndpoint; - } - - public UTF8BytesString getGrpcStatusCode() { - return grpcStatusCode; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if ((o instanceof MetricKey)) { - MetricKey metricKey = (MetricKey) o; - return hash == metricKey.hash - && synthetics == metricKey.synthetics - && httpStatusCode == metricKey.httpStatusCode - && resource.equals(metricKey.resource) - && service.equals(metricKey.service) - && operationName.equals(metricKey.operationName) - && type.equals(metricKey.type) - && isTraceRoot == metricKey.isTraceRoot - && spanKind.equals(metricKey.spanKind) - && peerTags.equals(metricKey.peerTags) - && Objects.equals(serviceSource, metricKey.serviceSource) - && Objects.equals(httpMethod, metricKey.httpMethod) - && Objects.equals(httpEndpoint, metricKey.httpEndpoint) - && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode); - } - return false; - } - - @Override 
- public int hashCode() { - return hash; - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java index fa26ed2e5db..c31825f6af8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricWriter.java @@ -3,7 +3,11 @@ public interface MetricWriter { void startBucket(int metricCount, long start, long duration); - void add(MetricKey key, AggregateMetric aggregate); + /** + * Serialize one aggregate. The {@link AggregateEntry} carries both the label fields (resource, + * service, span.kind, peer tags, etc.) and the {@link AggregateMetric} counters being reported. + */ + void add(AggregateEntry entry); void finishBucket(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 0f84964e9db..ba6ae6c2699 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -142,12 +142,13 @@ public void startBucket(int metricCount, long start, long duration) { } @Override - public void add(MetricKey key, AggregateMetric aggregate) { + public void add(AggregateEntry entry) { + final AggregateMetric aggregate = entry.aggregate; // Calculate dynamic map size based on optional fields - final boolean hasHttpMethod = key.getHttpMethod() != null; - final boolean hasHttpEndpoint = key.getHttpEndpoint() != null; - final boolean hasServiceSource = key.getServiceSource() != null; - final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; + final boolean hasHttpMethod = entry.getHttpMethod() != null; + final boolean hasHttpEndpoint = entry.getHttpEndpoint() != null; + final boolean hasServiceSource = entry.getServiceSource() != null; + final boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null; final int mapSize = 15 + (hasServiceSource ? 1 : 0) @@ -158,31 +159,31 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.startMap(mapSize); writer.writeUTF8(NAME); - writer.writeUTF8(key.getOperationName()); + writer.writeUTF8(entry.getOperationName()); writer.writeUTF8(SERVICE); - writer.writeUTF8(key.getService()); + writer.writeUTF8(entry.getService()); writer.writeUTF8(RESOURCE); - writer.writeUTF8(key.getResource()); + writer.writeUTF8(entry.getResource()); writer.writeUTF8(TYPE); - writer.writeUTF8(key.getType()); + writer.writeUTF8(entry.getType()); writer.writeUTF8(HTTP_STATUS_CODE); - writer.writeInt(key.getHttpStatusCode()); + writer.writeInt(entry.getHttpStatusCode()); writer.writeUTF8(SYNTHETICS); - writer.writeBoolean(key.isSynthetics()); + writer.writeBoolean(entry.isSynthetics()); writer.writeUTF8(IS_TRACE_ROOT); - writer.writeInt(key.isTraceRoot() ? TRISTATE_TRUE : TRISTATE_FALSE); + writer.writeInt(entry.isTraceRoot() ? 
TRISTATE_TRUE : TRISTATE_FALSE); writer.writeUTF8(SPAN_KIND); - writer.writeUTF8(key.getSpanKind()); + writer.writeUTF8(entry.getSpanKind()); writer.writeUTF8(PEER_TAGS); - final List peerTags = key.getPeerTags(); + final List peerTags = entry.getPeerTags(); writer.startArray(peerTags.size()); for (UTF8BytesString peerTag : peerTags) { @@ -191,24 +192,24 @@ public void add(MetricKey key, AggregateMetric aggregate) { if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); - writer.writeUTF8(key.getServiceSource()); + writer.writeUTF8(entry.getServiceSource()); } // Only include HTTPMethod if present if (hasHttpMethod) { writer.writeUTF8(HTTP_METHOD); - writer.writeUTF8(key.getHttpMethod()); + writer.writeUTF8(entry.getHttpMethod()); } // Only include HTTPEndpoint if present if (hasHttpEndpoint) { writer.writeUTF8(HTTP_ENDPOINT); - writer.writeUTF8(key.getHttpEndpoint()); + writer.writeUTF8(entry.getHttpEndpoint()); } // Only include GRPCStatusCode if present (rpc-type spans) if (hasGrpcStatusCode) { writer.writeUTF8(GRPC_STATUS_CODE); - writer.writeUTF8(key.getGrpcStatusCode()); + writer.writeUTF8(entry.getGrpcStatusCode()); } writer.writeUTF8(HITS); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index 2816fad0411..b7f81712945 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -2,7 +2,8 @@ /** * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw - * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * inputs the aggregator needs to build an {@link AggregateEntry} and update its {@link + * AggregateMetric}. * *
<p>
All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the * aggregator thread; the producer just shuffles references. diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 962ad2ce892..4dd0155443a 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -119,7 +119,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( null, "service", "operation", @@ -133,8 +133,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -165,7 +165,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -179,8 +179,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -217,7 +217,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered == statsComputed (statsComputed ? 1 : 0) * writer.startBucket(1, _, _) (statsComputed ? 1 : 0) * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -231,9 +231,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { httpMethod, httpEndpoint, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } (statsComputed ? 
1 : 0) * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -279,7 +279,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(2, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -293,11 +293,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -311,9 +311,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -344,7 +344,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -358,9 +358,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 0 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -396,7 +396,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -410,9 +410,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == topLevelCount && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -455,7 +455,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.finishBucket() >> { latch.countDown() } 1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval)) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -469,10 +469,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + } + 1 * writer.add(AggregateEntry.of( "resource2", "service2", "operation2", @@ -486,9 +486,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 
count && value.getDuration() == count * duration * 2 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration * 2 + } cleanup: aggregator.close() @@ -526,7 +526,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should aggregate into single metric" latchTriggered 1 * writer.startBucket(1, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -540,9 +540,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == count && value.getDuration() == count * duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == count && e.aggregate.getDuration() == count * duration + } 1 * writer.finishBucket() >> { latch.countDown() } when: "publish spans with different endpoints" @@ -567,7 +567,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metrics for each endpoint/method combination" latchTriggered2 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -581,10 +581,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -598,10 +598,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -615,9 +615,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + } 1 * writer.finishBucket() >> { latch2.countDown() } cleanup: @@ -665,7 +665,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create 4 separate metrics" latchTriggered 1 * writer.startBucket(4, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -679,10 +679,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -696,10 +696,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + 
} + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -713,10 +713,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 3 - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 3 + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -730,9 +730,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 4 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 4 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -769,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create separate metric keys for spans with and without HTTP tags" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -783,10 +783,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -800,9 +800,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration * 2 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration * 2 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -837,7 +837,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "should create the different metric keys for spans with and without sources" latchTriggered 1 * writer.startBucket(2, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -851,10 +851,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 2 && value.getDuration() == 2 * duration - }) - 1 * writer.add(new MetricKey( + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 2 && e.aggregate.getDuration() == 2 * duration + } + 1 * writer.add(AggregateEntry.of( "resource", "service", "operation", @@ -868,16 +868,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: aggregator.close() } - def "test least recently written to aggregate flushed when size limit exceeded"() { + def "new aggregates beyond size limit are dropped when no stale entries can be evicted"() { + // The table only evicts entries with hitCount == 0 to make room. When all entries are live + // (all have been recorded against), an over-cap insert drops the new key rather than evicting + // an established one. 
This protects the data we've already collected from a burst of new keys. setup: int maxAggregates = 10 MetricWriter writer = Mock(MetricWriter) @@ -901,11 +904,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.report() def latchTriggered = latch.await(2, SECONDS) - then: "the first aggregate should be dropped but the rest reported" + then: "the established service0..service9 are reported; service10 is dropped" latchTriggered 1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval)) - for (int i = 1; i < 11; ++i) { - 1 * writer.add(new MetricKey( + for (int i = 0; i < 10; ++i) { + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -919,13 +922,13 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration } } - 0 * writer.add(new MetricKey( + 0 * writer.add(AggregateEntry.of( "resource", - "service0", + "service10", "operation", null, "type", @@ -937,7 +940,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1052,7 +1055,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1066,9 +1069,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1087,7 +1090,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval)) for (int i = 1; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1101,11 +1104,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } - 0 * writer.add(new MetricKey( + 0 * writer.add(AggregateEntry.of( "resource", "service0", "operation", @@ -1119,7 +1122,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1154,7 +1157,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1168,9 +1171,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } 1 * writer.finishBucket() >> { 
latch.countDown() } @@ -1180,7 +1183,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: "aggregate not updated in cycle is not reported" 0 * writer.finishBucket() 0 * writer.startBucket(_, _, _) - 0 * writer.add(_, _) + 0 * writer.add(_) cleanup: aggregator.close() @@ -1213,7 +1216,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(5, _, SECONDS.toNanos(1)) for (int i = 0; i < 5; ++i) { - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "resource", "service" + i, "operation", @@ -1227,9 +1230,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> - value.getHitCount() == 1 && value.getDuration() == duration - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getDuration() == duration + } } 1 * writer.finishBucket() >> { latch.countDown() } @@ -1380,7 +1383,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1394,9 +1397,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1435,7 +1438,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(1, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1449,9 +1452,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 3 && aggregateMetric.getTopLevelCount() == 3 && aggregateMetric.getDuration() == 450 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 3 && e.aggregate.getTopLevelCount() == 3 && e.aggregate.getDuration() == 450 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1490,7 +1493,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { latchTriggered 1 * writer.startBucket(3, _, _) 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1504,11 +1507,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 100 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", "service", "operation", @@ -1522,11 +1525,11 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/orders", null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 200 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 200 + } 1 * writer.add( - new MetricKey( + AggregateEntry.of( "resource", 
"service", "operation", @@ -1540,9 +1543,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> - aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 150 - }) + )) >> { AggregateEntry e -> + e.aggregate.getHitCount() == 1 && e.aggregate.getTopLevelCount() == 1 && e.aggregate.getDuration() == 150 + } 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1578,7 +1581,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { then: latchTriggered 1 * writer.startBucket(3, _, _) - 1 * writer.add(new MetricKey( + 1 * writer.add(AggregateEntry.of( "grpc.service/Method", "service", "grpc.server", @@ -1592,8 +1595,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "0" - ), _) - 1 * writer.add(new MetricKey( + )) + 1 * writer.add(AggregateEntry.of( "grpc.service/Method", "service", "grpc.server", @@ -1607,8 +1610,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "5" - ), _) - 1 * writer.add(new MetricKey( + )) + 1 * writer.add(AggregateEntry.of( "GET /api", "service", "http.request", @@ -1622,7 +1625,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + )) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 3ff81de9851..08f0f7cbb92 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -7,7 +7,6 @@ import static java.util.concurrent.TimeUnit.SECONDS import datadog.metrics.api.Histograms import datadog.metrics.impl.DDSketchHistograms import datadog.trace.api.Config -import datadog.trace.api.Pair import datadog.trace.api.ProcessTags import datadog.trace.api.WellKnownTags import datadog.trace.api.git.CommitInfo @@ -26,6 +25,30 @@ class SerializingMetricWriterTest extends DDSpecification { Histograms.register(DDSketchHistograms.FACTORY) } + /** Build an {@link AggregateEntry} with a pre-recorded duration count. 
*/
+ private static AggregateEntry entry(
+ CharSequence resource,
+ CharSequence service,
+ CharSequence operationName,
+ CharSequence serviceSource,
+ CharSequence type,
+ int httpStatusCode,
+ boolean synthetic,
+ boolean traceRoot,
+ CharSequence spanKind,
+ List<UTF8BytesString> peerTags,
+ CharSequence httpMethod,
+ CharSequence httpEndpoint,
+ CharSequence grpcStatusCode,
+ int hitCount) {
+ AggregateEntry e = AggregateEntry.of(
+ resource, service, operationName, serviceSource, type,
+ httpStatusCode, synthetic, traceRoot, spanKind, peerTags,
+ httpMethod, httpEndpoint, grpcStatusCode)
+ e.aggregate.recordDurations(hitCount, new AtomicLongArray(1L))
+ return e
+ }
+
 def "should produce correct message #iterationIndex with process tags enabled #withProcessTags" () { setup: if (!withProcessTags) { @@ -40,8 +63,8 @@ class SerializingMetricWriterTest extends DDSpecification { when: writer.startBucket(content.size(), startTime, duration)
- for (Pair<MetricKey, AggregateMetric> pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry e : content) {
+ writer.add(e)
 } writer.finishBucket() @@ -55,88 +78,40 @@ class SerializingMetricWriterTest extends DDSpecification { where: content << [ [
- Pair.of(
- new MetricKey(
- "resource1",
- "service1",
- "operation1",
- null,
- "type",
- 0,
- false,
- false,
- "client",
+ entry(
+ "resource1", "service1", "operation1", null, "type", 0,
+ false, false, "client",
 [
 UTF8BytesString.create("country:canada"),
 UTF8BytesString.create("georegion:amer"),
 UTF8BytesString.create("peer.service:remote-service")
 ],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
- ),
- Pair.of(
- new MetricKey(
- "resource2",
- "service2",
- "operation2",
- null,
- "type2",
- 200,
- true,
- false,
- "producer",
+ null, null, null,
+ 10),
+ entry(
+ "resource2", "service2", "operation2", null, "type2", 200,
+ true, false, "producer",
 [
 UTF8BytesString.create("country:canada"),
 UTF8BytesString.create("georegion:amer"),
 UTF8BytesString.create("peer.service:remote-service")
 ],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(9, new AtomicLongArray(1L))
- ),
- Pair.of(
- new MetricKey(
- "GET /api/users/:id",
- "web-service",
- "http.request",
- null,
- "web",
- 200,
- false,
- true,
- "server",
+ null, null, null,
+ 9),
+ entry(
+ "GET /api/users/:id", "web-service", "http.request", null, "web", 200,
+ false, true, "server",
 [],
- "GET",
- "/api/users/:id",
- null
- ),
- new AggregateMetric().recordDurations(5, new AtomicLongArray(1L))
- )
+ "GET", "/api/users/:id", null,
+ 5)
 ],
 (0..10000).collect({ i ->
- Pair.of(
- new MetricKey(
- "resource" + i,
- "service" + i,
- "operation" + i,
- null,
- "type",
- 0,
- false,
- false,
- "producer",
+ entry(
+ "resource" + i, "service" + i, "operation" + i, null, "type", 0,
+ false, false, "producer",
 [UTF8BytesString.create("messaging.destination:dest" + i)],
- null,
- null,
- null
- ),
- new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
- )
+ null, null, null,
+ 10)
 })
 ]
 withProcessTags << [true, false]
 @@ -148,22 +123,18 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- // Create keys with different combinations of HTTP fields
- def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
- def keyWithSource = new MetricKey("resource",
"service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) + def entryNoSource = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1) + def entryWithSource = entry("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, 1) - def content = [ - Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - ] + def content = [entryNoSource, entryWithSource] ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() @@ -177,34 +148,25 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null,null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders",null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) - - def content = [ - Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] + def entryWithBoth = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1) + def entryWithMethodOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null, 1) + def entryWithEndpointOnly = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null, 1) + def entryWithNeither = entry("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, 1) + + def content = [entryWithBoth, entryWithMethodOnly, entryWithEndpointOnly, entryWithNeither] ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) when: writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) + for (AggregateEntry e : content) { + writer.add(e) } writer.finishBucket() then: sink.validatedInput() - // Test passes if validation in ValidatingSink succeeds - // ValidatingSink verifies that map size matches actual number of fields - // and that HTTPMethod/HTTPEndpoint are only 
present when non-empty
 }
 def "add git sha commit info when sha commit is #shaCommit"() { @@ -216,40 +178,65 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
- // Create keys with different combinations of HTTP fields
- def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null)
+ def e = entry("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, 1)
- def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),]
+ def content = [e]
 ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128, gitInfoProvider)
 when:
 writer.startBucket(content.size(), startTime, duration)
- for (Pair<MetricKey, AggregateMetric> pair : content) {
- writer.add(pair.getLeft(), pair.getRight())
+ for (AggregateEntry entryItem : content) {
+ writer.add(entryItem)
 }
 writer.finishBucket()
 then:
 sink.validatedInput()
 where:
 shaCommit << [null, "123456"]
 }
+
+ def "GRPCStatusCode field is present in payload for rpc-type spans"() {
+ setup:
+ long startTime = MILLISECONDS.toNanos(System.currentTimeMillis())
+ long duration = SECONDS.toNanos(10)
+ WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
+
+ def entryWithGrpc = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK", 1)
+ def entryWithGrpcError = entry("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND", 1)
+ def entryWithoutGrpc = entry("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null, 1)
+
+ def content = [entryWithGrpc, entryWithGrpcError, entryWithoutGrpc]
+
+ ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content)
+ SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128)
+
+ when:
+ writer.startBucket(content.size(), startTime, duration)
+ for (AggregateEntry e : content) {
+ writer.add(e)
+ }
+ writer.finishBucket()
+
+ then:
+ sink.validatedInput()
+ }
+
 static class ValidatingSink implements Sink { private final WellKnownTags wellKnownTags private final long startTimeNanos private final long duration private boolean validated = false
- private List<Pair<MetricKey, AggregateMetric>> content
+ private List<AggregateEntry> content
 ValidatingSink(WellKnownTags wellKnownTags, long startTimeNanos, long duration,
- List<Pair<MetricKey, AggregateMetric>> content) {
+ List<AggregateEntry> content) {
 this.wellKnownTags = wellKnownTags this.startTimeNanos = startTimeNanos this.duration = duration @@ -298,70 +283,69 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpacker.unpackString() == "Stats" int statCount = unpacker.unpackArrayHeader() assert statCount == content.size()
- for (Pair<MetricKey, AggregateMetric> pair : content) {
- MetricKey key = pair.getLeft()
- AggregateMetric value = pair.getRight()
+ for (AggregateEntry entry : content) {
+ AggregateMetric value = entry.aggregate
 int metricMapSize = unpacker.unpackMapHeader() // Calculate expected map size based on optional fields
- boolean hasHttpMethod = key.getHttpMethod() != null
- boolean hasHttpEndpoint = key.getHttpEndpoint() != null
- boolean hasServiceSource = key.getServiceSource() !=
null - boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null + boolean hasHttpMethod = entry.getHttpMethod() != null + boolean hasHttpEndpoint = entry.getHttpEndpoint() != null + boolean hasServiceSource = entry.getServiceSource() != null + boolean hasGrpcStatusCode = entry.getGrpcStatusCode() != null int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) assert metricMapSize == expectedMapSize int elementCount = 0 assert unpacker.unpackString() == "Name" - assert unpacker.unpackString() == key.getOperationName() as String + assert unpacker.unpackString() == entry.getOperationName() as String ++elementCount assert unpacker.unpackString() == "Service" - assert unpacker.unpackString() == key.getService() as String + assert unpacker.unpackString() == entry.getService() as String ++elementCount assert unpacker.unpackString() == "Resource" - assert unpacker.unpackString() == key.getResource() as String + assert unpacker.unpackString() == entry.getResource() as String ++elementCount assert unpacker.unpackString() == "Type" - assert unpacker.unpackString() == key.getType() as String + assert unpacker.unpackString() == entry.getType() as String ++elementCount assert unpacker.unpackString() == "HTTPStatusCode" - assert unpacker.unpackInt() == key.getHttpStatusCode() + assert unpacker.unpackInt() == entry.getHttpStatusCode() ++elementCount assert unpacker.unpackString() == "Synthetics" - assert unpacker.unpackBoolean() == key.isSynthetics() + assert unpacker.unpackBoolean() == entry.isSynthetics() ++elementCount assert unpacker.unpackString() == "IsTraceRoot" - assert unpacker.unpackInt() == (key.isTraceRoot() ? TriState.TRUE.serialValue : TriState.FALSE.serialValue) + assert unpacker.unpackInt() == (entry.isTraceRoot() ? 
TriState.TRUE.serialValue : TriState.FALSE.serialValue) ++elementCount assert unpacker.unpackString() == "SpanKind" - assert unpacker.unpackString() == key.getSpanKind() as String + assert unpacker.unpackString() == entry.getSpanKind() as String ++elementCount assert unpacker.unpackString() == "PeerTags" int peerTagsLength = unpacker.unpackArrayHeader() - assert peerTagsLength == key.getPeerTags().size() + assert peerTagsLength == entry.getPeerTags().size() for (int i = 0; i < peerTagsLength; i++) { def unpackedPeerTag = unpacker.unpackString() - assert unpackedPeerTag == key.getPeerTags()[i].toString() + assert unpackedPeerTag == entry.getPeerTags()[i].toString() } ++elementCount // Service source is only present when the service name has been overridden by the tracer if (hasServiceSource) { assert unpacker.unpackString() == "srv_src" - assert unpacker.unpackString() == key.getServiceSource().toString() + assert unpacker.unpackString() == entry.getServiceSource().toString() ++elementCount } // HTTPMethod and HTTPEndpoint are optional - only present if non-null if (hasHttpMethod) { assert unpacker.unpackString() == "HTTPMethod" - assert unpacker.unpackString() == key.getHttpMethod() as String + assert unpacker.unpackString() == entry.getHttpMethod() as String ++elementCount } if (hasHttpEndpoint) { assert unpacker.unpackString() == "HTTPEndpoint" - assert unpacker.unpackString() == key.getHttpEndpoint() as String + assert unpacker.unpackString() == entry.getHttpEndpoint() as String ++elementCount } if (hasGrpcStatusCode) { assert unpacker.unpackString() == "GRPCStatusCode" - assert unpacker.unpackString() == key.getGrpcStatusCode() as String + assert unpacker.unpackString() == entry.getGrpcStatusCode() as String ++elementCount } assert unpacker.unpackString() == "Hits" @@ -397,99 +381,4 @@ class SerializingMetricWriterTest extends DDSpecification { return validated } } - - def "ServiceSource optional in the payload"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - // Create keys with different combinations of HTTP fields - def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) - - def content = [ - Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - } - - def "GRPCStatusCode field is present in payload for rpc-type spans"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, 
false, false, "server", [], null, null, "OK") - def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND") - def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null) - - def content = [ - Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithGrpcError, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithoutGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - } - - def "HTTPMethod and HTTPEndpoint fields are optional in payload"() { - setup: - long startTime = MILLISECONDS.toNanos(System.currentTimeMillis()) - long duration = SECONDS.toNanos(10) - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - - // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) - - def content = [ - Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithMethodOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithEndpointOnly, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), - Pair.of(keyWithNeither, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))) - ] - - ValidatingSink sink = new ValidatingSink(wellKnownTags, startTime, duration, content) - SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128) - - when: - writer.startBucket(content.size(), startTime, duration) - for (Pair pair : content) { - writer.add(pair.getLeft(), pair.getRight()) - } - writer.finishBucket() - - then: - sink.validatedInput() - // Test passes if validation in ValidatingSink succeeds - // ValidatingSink verifies that map size matches actual number of fields - // and that HTTPMethod/HTTPEndpoint are only present when non-empty - } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java new file mode 100644 index 00000000000..44f2b36cb6b --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -0,0 +1,234 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; +import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateTableTest { + + @BeforeAll + static void initAgentMeter() { + // AggregateMetric.recordOneDuration -> Histogram.accept needs AgentMeter to be initialized. + // Mirror what AggregateMetricTest does. + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void insertOnMissReturnsNewAggregate() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s = snapshot("svc", "op", "client"); + + AggregateMetric agg = table.findOrInsert(s); + + assertNotNull(agg); + assertEquals(1, table.size()); + assertEquals(0, agg.getHitCount()); + } + + @Test + void hitReturnsSameAggregateInstance() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot s1 = snapshot("svc", "op", "client"); + SpanSnapshot s2 = snapshot("svc", "op", "client"); + + AggregateMetric first = table.findOrInsert(s1); + AggregateMetric second = table.findOrInsert(s2); + + assertSame(first, second); + assertEquals(1, table.size()); + } + + @Test + void differentKindFieldsAreDistinct() { + AggregateTable table = new AggregateTable(8); + + AggregateMetric clientAgg = table.findOrInsert(snapshot("svc", "op", "client")); + AggregateMetric serverAgg = table.findOrInsert(snapshot("svc", "op", "server")); + + assertNotSame(clientAgg, serverAgg); + assertEquals(2, table.size()); + } + + @Test + void peerTagPairsParticipateInIdentity() { + AggregateTable table = new AggregateTable(8); + SpanSnapshot withTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-a").build(); + SpanSnapshot otherTags = + builder("svc", "op", "client").peerTags("peer.hostname", "host-b").build(); + SpanSnapshot noTags = builder("svc", "op", "client").build(); + + AggregateMetric a = table.findOrInsert(withTags); + AggregateMetric b = table.findOrInsert(otherTags); + AggregateMetric c = table.findOrInsert(noTags); + + assertNotSame(a, b); + assertNotSame(a, c); + assertNotSame(b, c); + assertEquals(3, table.size()); + } + + @Test + void capOverrunEvictsStaleEntry() { + AggregateTable table = new AggregateTable(2); + + AggregateMetric stale = table.findOrInsert(snapshot("svc-a", "op", "client")); + // do not record on stale -> hitCount stays at 0 + + AggregateMetric live = table.findOrInsert(snapshot("svc-b", "op", "client")); + live.recordOneDuration(10L | TOP_LEVEL_TAG); // hitCount=1, not evictable + + // table is full (size=2). Inserting a third should evict the stale one and succeed. 
+ AggregateMetric newcomer = table.findOrInsert(snapshot("svc-c", "op", "client"));
+ assertNotNull(newcomer);
+ assertEquals(2, table.size());
+
+ // re-inserting the stale snapshot should miss now (it was evicted) and produce a fresh entry
+ AggregateMetric staleAgain = table.findOrInsert(snapshot("svc-a", "op", "client"));
+ assertNotSame(stale, staleAgain);
+ }
+
+ @Test
+ void capOverrunWithNoStaleReturnsNull() {
+ AggregateTable table = new AggregateTable(2);
+
+ AggregateMetric a = table.findOrInsert(snapshot("svc-a", "op", "client"));
+ AggregateMetric b = table.findOrInsert(snapshot("svc-b", "op", "client"));
+ a.recordOneDuration(10L);
+ b.recordOneDuration(20L);
+
+ AggregateMetric c = table.findOrInsert(snapshot("svc-c", "op", "client"));
+ assertNull(c);
+ assertEquals(2, table.size());
+ }
+
+ @Test
+ void expungeStaleAggregatesRemovesZeroHitsOnly() {
+ AggregateTable table = new AggregateTable(16);
+
+ AggregateMetric live = table.findOrInsert(snapshot("svc-live", "op", "client"));
+ live.recordOneDuration(10L);
+ AggregateMetric stale1 = table.findOrInsert(snapshot("svc-stale1", "op", "client"));
+ AggregateMetric stale2 = table.findOrInsert(snapshot("svc-stale2", "op", "client"));
+ assertEquals(3, table.size());
+ assertEquals(0, stale1.getHitCount());
+ assertEquals(0, stale2.getHitCount());
+
+ table.expungeStaleAggregates();
+
+ assertEquals(1, table.size());
+ // the live entry must still be reachable
+ assertSame(live, table.findOrInsert(snapshot("svc-live", "op", "client")));
+ }
+
+ @Test
+ void forEachVisitsEveryEntry() {
+ AggregateTable table = new AggregateTable(8);
+ table.findOrInsert(snapshot("a", "op", "client")).recordOneDuration(1L);
+ table.findOrInsert(snapshot("b", "op", "client")).recordOneDuration(2L);
+ table.findOrInsert(snapshot("c", "op", "client")).recordOneDuration(3L | ERROR_TAG);
+
+ Map<String, Long> visited = new HashMap<>();
+ table.forEach(e -> visited.put(e.getService().toString(), e.aggregate.getDuration()));
+
+ assertEquals(3, visited.size());
+ assertEquals(1L, visited.get("a"));
+ assertEquals(2L, visited.get("b"));
+ assertEquals(3L, visited.get("c"));
+ }
+
+ @Test
+ void clearEmptiesTheTable() {
+ AggregateTable table = new AggregateTable(8);
+ table.findOrInsert(snapshot("a", "op", "client"));
+ table.findOrInsert(snapshot("b", "op", "client"));
+ assertEquals(2, table.size());
+
+ table.clear();
+
+ assertTrue(table.isEmpty());
+ assertEquals(0, table.size());
+ // and re-insertion works after clear
+ assertNotNull(table.findOrInsert(snapshot("a", "op", "client")));
+ }
+
+ @Test
+ void encodedLabelsAreBuiltOnInsert() {
+ AggregateTable table = new AggregateTable(4);
+ List<AggregateEntry> seen = new ArrayList<>();
+ table.findOrInsert(snapshot("svc", "op", "client"));
+ table.forEach(seen::add);
+
+ assertEquals(1, seen.size());
+ AggregateEntry e = seen.get(0);
+ assertEquals("svc", e.getService().toString());
+ assertEquals("op", e.getOperationName().toString());
+ assertEquals("client", e.getSpanKind().toString());
+ }
+
+ // ---------- helpers ----------
+
+ private static SpanSnapshot snapshot(String service, String operation, String spanKind) {
+ return builder(service, operation, spanKind).build();
+ }
+
+ private static SnapshotBuilder builder(String service, String operation, String spanKind) {
+ return new SnapshotBuilder(service, operation, spanKind);
+ }
+
+ private static final class SnapshotBuilder {
+ private final String service;
+ private final String operation;
+ private final String spanKind;
+ private String[] peerTagPairs;
+ private
long tagAndDuration = 0L; + + SnapshotBuilder(String service, String operation, String spanKind) { + this.service = service; + this.operation = operation; + this.spanKind = spanKind; + } + + SnapshotBuilder peerTags(String... namesAndValues) { + this.peerTagPairs = namesAndValues; + return this; + } + + SpanSnapshot build() { + return new SpanSnapshot( + "resource", + service, + operation, + null, + "web", + (short) 200, + false, + true, + spanKind, + peerTagPairs, + null, + null, + null, + tagAndDuration); + } + } +} diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index 2972ffa2c18..81a476c67c8 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -8,9 +8,8 @@ import datadog.metrics.impl.DDSketchHistograms import datadog.trace.api.Config import datadog.trace.api.WellKnownTags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString -import datadog.trace.common.metrics.AggregateMetric +import datadog.trace.common.metrics.AggregateEntry import datadog.trace.common.metrics.EventListener -import datadog.trace.common.metrics.MetricKey import datadog.trace.common.metrics.OkHttpSink import datadog.trace.common.metrics.SerializingMetricWriter import java.util.concurrent.CopyOnWriteArrayList @@ -39,14 +38,12 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { sink ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) - writer.add( - new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), - new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) - ) - writer.add( - new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), - new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) - ) + def entry1 = AggregateEntry.of("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null) + entry1.aggregate.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) + writer.add(entry1) + def entry2 = AggregateEntry.of("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null) + entry2.aggregate.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) + writer.add(entry2) writer.finishBucket() then: diff --git a/internal-api/src/main/java/datadog/trace/util/Hashtable.java b/internal-api/src/main/java/datadog/trace/util/Hashtable.java new file mode 100644 index 00000000000..d7f49dcae00 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/Hashtable.java @@ -0,0 +1,553 @@ +package datadog.trace.util; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * Light weight simple Hashtable system that can be useful when HashMap would + * be unnecessarily heavy. + * + *
<p>
+ * + * Convenience classes are provided for lower key dimensions. + * + * For higher key dimensions, client code must implement its own class, + * but can still use the support class to ease the implementation complexity. + */ +public abstract class Hashtable { + /** + * Internal base class for entries. Stores the precomputed 64-bit keyHash and + * the chain-next pointer used to link colliding entries within a single bucket. + * + *
<p>Subclasses add the actual key field(s) and a {@code matches(...)} method
+ * tailored to their key arity. See {@link D1.Entry} and {@link D2.Entry}; for
+ * higher arities, client code can subclass this directly and use {@link Support}
+ * to drive the table mechanics.
+ */
+ public static abstract class Entry {
+ public final long keyHash;
+ Entry next = null;
+
+ protected Entry(long keyHash) {
+ this.keyHash = keyHash;
+ }
+
+ public final <TEntry extends Entry> void setNext(TEntry next) {
+ this.next = next;
+ }
+
+ @SuppressWarnings("unchecked")
+ public final <TEntry extends Entry> TEntry next() {
+ return (TEntry) this.next;
+ }
+ }
+
+ /**
+ * Single-key open hash table with chaining.
+ *
+ *
<p>
The user supplies an {@link D1.Entry} subclass that carries the key and + * whatever value fields they want to mutate in place, then instantiates this + * class over that entry type. The main advantage over {@code HashMap} + * is that mutating an existing entry's value fields requires no allocation: + * call {@link #get} once and write directly to the returned entry's fields. + * For counter-style workloads this can be several times faster than + * {@code HashMap} and produces effectively zero GC pressure. + * + *
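<p>As an illustration only ({@code WordCountEntry} below is hypothetical,
+ * not shipped with this patch), the intended counter pattern is:
+ *
+ * <pre>{@code
+ * final class WordCountEntry extends Hashtable.D1.Entry<String> {
+ *   long count; // mutated in place; no allocation once inserted
+ *   WordCountEntry(String word) { super(word); }
+ * }
+ *
+ * static void count(Hashtable.D1<String, WordCountEntry> counts, String word) {
+ *   WordCountEntry e = counts.get(word);
+ *   if (e == null) {
+ *     e = new WordCountEntry(word);
+ *     counts.insert(e);
+ *   }
+ *   e.count++; // in-place update, no node churn or rehash
+ * }
+ * }</pre>
+ *
+ *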
<p>
Capacity is fixed at construction. The table does not resize, so the + * caller is responsible for choosing a capacity appropriate to the working + * set. Actual bucket-array length is rounded up to the next power of two. + * + *
<p>
Null keys are permitted; they collapse to a single bucket via the + * sentinel hash {@link Long#MIN_VALUE} defined in {@link D1.Entry#hash}. + * + *
<p>Not thread-safe. Concurrent access (including mixing reads with
+ * writes) requires external synchronization.
+ *
+ * @param <K> the key type
+ * @param <TEntry> the user's {@link D1.Entry D1.Entry<K>} subclass
+ */
+ public static final class D1<K, TEntry extends D1.Entry<K>> {
+ /**
+ * Abstract base for {@link D1} entries. Subclass to add value fields you
+ * wish to mutate in place after retrieving the entry via {@link D1#get}.
+ *
+ *
<p>The key is captured at construction and stored alongside its
+ * precomputed 64-bit hash. {@link #matches(Object)} uses
+ * {@link Objects#equals} by default; override if a different equality
+ * semantics is needed (e.g. reference equality for interned keys).
+ *
+ * @param <K> the key type
+ */
+ public static abstract class Entry<K> extends Hashtable.Entry {
+ final K key;
+
+ protected Entry(K key) {
+ super(hash(key));
+ this.key = key;
+ }
+
+ public boolean matches(Object key) {
+ return Objects.equals(this.key, key);
+ }
+
+ public static long hash(Object key) {
+ return (key == null) ? Long.MIN_VALUE : key.hashCode();
+ }
+ }
+
+ private final Hashtable.Entry[] buckets;
+ private int size;
+
+ public D1(int capacity) {
+ this.buckets = Support.create(capacity);
+ this.size = 0;
+ }
+
+ public int size() {
+ return this.size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public TEntry get(K key) {
+ long keyHash = D1.Entry.hash(key);
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) {
+ if (e.keyHash == keyHash) {
+ TEntry te = (TEntry) e;
+ if (te.matches(key)) return te;
+ }
+ }
+ return null;
+ }
+
+ public TEntry remove(K key) {
+ long keyHash = D1.Entry.hash(key);
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(key)) {
+ iter.remove();
+ this.size -= 1;
+ return curEntry;
+ }
+ }
+
+ return null;
+ }
+
+ public void insert(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+
+ this.size += 1;
+ }
+
+ public TEntry insertOrReplace(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(newEntry.key)) {
+ iter.replace(newEntry);
+ return curEntry;
+ }
+ }
+
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+ this.size += 1;
+ return null;
+ }
+
+ public void clear() {
+ Support.clear(this.buckets);
+ this.size = 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void forEach(Consumer<TEntry> consumer) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (int i = 0; i < thisBuckets.length; i++) {
+ for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) {
+ consumer.accept((TEntry) e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Two-key (composite-key) hash table with chaining.
+ *
+ *
<p>
The user supplies a {@link D2.Entry} subclass carrying both key parts + * and any value fields. Compared to {@code HashMap} this avoids the + * per-lookup {@code Pair} (or record) allocation: both key parts are passed + * directly through {@link #get}, {@link #remove}, {@link #insert}, and + * {@link #insertOrReplace}. Combined with in-place value mutation, this + * makes {@code D2} substantially less GC-intensive than the equivalent + * {@code HashMap} for counter-style workloads. + * + *
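<p>A hypothetical sketch of the two-key pattern ({@code RouteStatsEntry} is
+ * illustrative, not part of this patch):
+ *
+ * <pre>{@code
+ * final class RouteStatsEntry extends Hashtable.D2.Entry<String, Integer> {
+ *   long hits;
+ *   RouteStatsEntry(String route, Integer status) { super(route, status); }
+ * }
+ *
+ * static void hit(Hashtable.D2<String, Integer, RouteStatsEntry> stats,
+ *     String route, int status) {
+ *   RouteStatsEntry e = stats.get(route, status); // no Pair allocated for the probe
+ *   if (e == null) {
+ *     e = new RouteStatsEntry(route, status);
+ *     stats.insert(e);
+ *   }
+ *   e.hits++;
+ * }
+ * }</pre>
+ *
+ *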
<p>
Capacity is fixed at construction; the table does not resize. Actual + * bucket-array length is rounded up to the next power of two. + * + *
<p>
Key parts are combined into a 64-bit hash via {@link LongHashingUtils}; + * see {@link D2.Entry#hash(Object, Object)}. + * + *
<p>Not thread-safe.
+ *
+ * @param <K1> first key type
+ * @param <K2> second key type
+ * @param <TEntry> the user's {@link D2.Entry D2.Entry<K1, K2>} subclass
+ */
+ public static final class D2<K1, K2, TEntry extends D2.Entry<K1, K2>> {
+ /**
+ * Abstract base for {@link D2} entries. Subclass to add value fields you
+ * wish to mutate in place.
+ *
+ *
<p>Both key parts are captured at construction and stored alongside their
+ * combined 64-bit hash. {@link #matches(Object, Object)} uses
+ * {@link Objects#equals} pairwise on the two parts.
+ *
+ * @param <K1> first key type
+ * @param <K2> second key type
+ */
+ public static abstract class Entry<K1, K2> extends Hashtable.Entry {
+ final K1 key1;
+ final K2 key2;
+
+ protected Entry(K1 key1, K2 key2) {
+ super(hash(key1, key2));
+ this.key1 = key1;
+ this.key2 = key2;
+ }
+
+ public boolean matches(K1 key1, K2 key2) {
+ return Objects.equals(this.key1, key1) && Objects.equals(this.key2, key2);
+ }
+
+ public static long hash(Object key1, Object key2) {
+ return LongHashingUtils.hash(key1, key2);
+ }
+ }
+
+ private final Hashtable.Entry[] buckets;
+ private int size;
+
+ public D2(int capacity) {
+ this.buckets = Support.create(capacity);
+ this.size = 0;
+ }
+
+ public int size() {
+ return this.size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public TEntry get(K1 key1, K2 key2) {
+ long keyHash = D2.Entry.hash(key1, key2);
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (Hashtable.Entry e = thisBuckets[Support.bucketIndex(thisBuckets, keyHash)]; e != null; e = e.next) {
+ if (e.keyHash == keyHash) {
+ TEntry te = (TEntry) e;
+ if (te.matches(key1, key2)) return te;
+ }
+ }
+ return null;
+ }
+
+ public TEntry remove(K1 key1, K2 key2) {
+ long keyHash = D2.Entry.hash(key1, key2);
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(key1, key2)) {
+ iter.remove();
+ this.size -= 1;
+ return curEntry;
+ }
+ }
+
+ return null;
+ }
+
+ public void insert(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+
+ this.size += 1;
+ }
+
+ public TEntry insertOrReplace(TEntry newEntry) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+
+ for (MutatingBucketIterator<TEntry> iter = Support.mutatingBucketIterator(this.buckets, newEntry.keyHash); iter.hasNext(); ) {
+ TEntry curEntry = iter.next();
+
+ if (curEntry.matches(newEntry.key1, newEntry.key2)) {
+ iter.replace(newEntry);
+ return curEntry;
+ }
+ }
+
+ int bucketIndex = Support.bucketIndex(thisBuckets, newEntry.keyHash);
+
+ Hashtable.Entry curHead = thisBuckets[bucketIndex];
+ newEntry.setNext(curHead);
+ thisBuckets[bucketIndex] = newEntry;
+ this.size += 1;
+ return null;
+ }
+
+ public void clear() {
+ Support.clear(this.buckets);
+ this.size = 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void forEach(Consumer<TEntry> consumer) {
+ Hashtable.Entry[] thisBuckets = this.buckets;
+ for (int i = 0; i < thisBuckets.length; i++) {
+ for (Hashtable.Entry e = thisBuckets[i]; e != null; e = e.next()) {
+ consumer.accept((TEntry) e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Internal building blocks for hash-table operations.
+ *
+ *
<p>
Used by {@link D1} and {@link D2}, and available to client code that
+ * wants to assemble its own higher-arity table (3+ key parts) without
+ * re-implementing the bucket-array mechanics. The typical recipe:
+ *
+ * <ol>
+ * <li>subclass {@link Hashtable.Entry} with the key fields and a {@code matches(...)} method for the key arity;
+ * <li>combine the key parts into one 64-bit hash (for example via {@link LongHashingUtils}) and pass it to the entry constructor;
+ * <li>allocate the bucket array with {@link #create(int)};
+ * <li>implement lookup, insert, and remove against {@link #bucketIndex(Object[], long)} and the bucket iterators, mirroring {@link D1} and {@link D2}.
+ * </ol>
+ *
+ * <p>
All bucket arrays produced by {@link #create(int)} have a power-of-two + * length, so {@link #bucketIndex(Object[], long)} can use a bit mask. + * + *
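<p>For illustration only (a hypothetical three-part key; {@code TripleEntry}
+ * is not part of this patch), a lookup built directly on these helpers
+ * might read:
+ *
+ * <pre>{@code
+ * final class TripleEntry extends Hashtable.Entry {
+ *   final String a; final String b; final int c;
+ *   TripleEntry(String a, String b, int c) {
+ *     // fold the three key parts into the precomputed 64-bit hash
+ *     super(LongHashingUtils.hash(a.hashCode(), b.hashCode(), c));
+ *     this.a = a; this.b = b; this.c = c;
+ *   }
+ *   boolean matches(String a, String b, int c) {
+ *     return this.c == c && this.a.equals(a) && this.b.equals(b);
+ *   }
+ * }
+ *
+ * static TripleEntry find(Hashtable.Entry[] buckets, String a, String b, int c) {
+ *   long keyHash = LongHashingUtils.hash(a.hashCode(), b.hashCode(), c);
+ *   for (Hashtable.Entry e = buckets[Hashtable.Support.bucketIndex(buckets, keyHash)];
+ *       e != null; e = e.next()) {
+ *     if (e.keyHash == keyHash && ((TripleEntry) e).matches(a, b, c)) {
+ *       return (TripleEntry) e; // hit: caller mutates value fields in place
+ *     }
+ *   }
+ *   return null; // miss: caller constructs a TripleEntry and links it in
+ * }
+ * }</pre>
+ *
+ *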
<p>Methods on this class are static helpers; the class and its methods are
+ * public so that client code in other packages can build custom tables and
+ * reference the nested {@link BucketIterator}.
+ */
+ public static final class Support {
+ public static final Hashtable.Entry[] create(int capacity) {
+ return new Entry[sizeFor(capacity)];
+ }
+
+ static final int sizeFor(int requestedCapacity) {
+ int pow;
+ for ( pow = 1; pow < requestedCapacity; pow *= 2 );
+ return pow;
+ }
+
+ public static final void clear(Hashtable.Entry[] buckets) {
+ Arrays.fill(buckets, null);
+ }
+
+ public static final <TEntry extends Hashtable.Entry> BucketIterator<TEntry> bucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ return new BucketIterator<>(buckets, keyHash);
+ }
+
+ public static final <TEntry extends Hashtable.Entry> MutatingBucketIterator<TEntry> mutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ return new MutatingBucketIterator<>(buckets, keyHash);
+ }
+
+ public static final int bucketIndex(Object[] buckets, long keyHash) {
+ return (int) (keyHash & (buckets.length - 1));
+ }
+ }
+
+ /**
+ * Read-only iterator over entries in a single bucket whose {@code keyHash}
+ * matches a specific search hash. Cheaper than {@link MutatingBucketIterator}
+ * because it does not track the previous-node pointers required for
+ * splicing; use it when you only need to walk the chain.
+ *
+ *
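<p>A hypothetical walk of one chain, reusing the {@code TripleEntry} sketch
+ * from the {@link Support} docs:
+ *
+ * <pre>{@code
+ * Hashtable.BucketIterator<TripleEntry> it =
+ *     Hashtable.Support.bucketIterator(buckets, keyHash);
+ * while (it.hasNext()) {
+ *   TripleEntry e = it.next(); // yields only entries whose keyHash matches
+ *   // still confirm matches(...) in case of 64-bit hash collisions
+ * }
+ * }</pre>
+ *
+ *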
<p>For {@code remove} or {@code replace} operations, use
+ * {@link MutatingBucketIterator} instead.
+ */
+ public static final class BucketIterator<TEntry extends Hashtable.Entry> implements Iterator<TEntry> {
+ private final long keyHash;
+ private Hashtable.Entry nextEntry;
+
+ BucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ this.keyHash = keyHash;
+ Hashtable.Entry cur = buckets[Support.bucketIndex(buckets, keyHash)];
+ while (cur != null && cur.keyHash != keyHash) cur = cur.next;
+ this.nextEntry = cur;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.nextEntry != null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public TEntry next() {
+ Hashtable.Entry cur = this.nextEntry;
+ if (cur == null) throw new NoSuchElementException("no next!");
+
+ Hashtable.Entry advance = cur.next;
+ while (advance != null && advance.keyHash != keyHash) advance = advance.next;
+ this.nextEntry = advance;
+
+ return (TEntry) cur;
+ }
+ }
+
+ /**
+ * Mutating iterator over entries in a single bucket whose {@code keyHash}
+ * matches a specific search hash. Supports {@link #remove()} and
+ * {@link #replace(Entry)} to splice the chain in place.
+ *
+ *
<p>Carries previous-node pointers for the current entry and the next-match
+ * entry so that {@code remove} and {@code replace} can fix up the chain in
+ * O(1) without re-walking from the bucket head. After {@code remove} or
+ * {@code replace}, iteration may continue with another {@link #next()}.
+ */
+ public static final class MutatingBucketIterator<TEntry extends Hashtable.Entry> implements Iterator<TEntry> {
+ private final long keyHash;
+
+ private final Hashtable.Entry[] buckets;
+
+ /**
+ * The entry prior to the last entry returned by {@code next()}.
+ * Used for mutating operations.
+ */
+ private Hashtable.Entry curPrevEntry;
+
+ /**
+ * The entry that was last returned by {@code next()}.
+ */
+ private Hashtable.Entry curEntry;
+
+ /**
+ * The entry prior to the next entry.
+ */
+ private Hashtable.Entry nextPrevEntry;
+
+ /**
+ * The next entry to be returned by {@code next()}.
+ */
+ private Hashtable.Entry nextEntry;
+
+ MutatingBucketIterator(Hashtable.Entry[] buckets, long keyHash) {
+ this.buckets = buckets;
+ this.keyHash = keyHash;
+
+ int bucketIndex = Support.bucketIndex(buckets, keyHash);
+ Hashtable.Entry headEntry = this.buckets[bucketIndex];
+ if ( headEntry == null ) {
+ this.nextEntry = null;
+ this.nextPrevEntry = null;
+
+ this.curEntry = null;
+ this.curPrevEntry = null;
+ } else {
+ Hashtable.Entry prev, cur;
+ for ( prev = null, cur = headEntry; cur != null; prev = cur, cur = cur.next() ) {
+ if ( cur.keyHash == keyHash ) break;
+ }
+ this.nextPrevEntry = prev;
+ this.nextEntry = cur;
+
+ this.curEntry = null;
+ this.curPrevEntry = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (this.nextEntry != null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public TEntry next() {
+ Hashtable.Entry curEntry = this.nextEntry;
+ if ( curEntry == null ) throw new NoSuchElementException("no next!");
+
+ this.curEntry = curEntry;
+ this.curPrevEntry = this.nextPrevEntry;
+
+ Hashtable.Entry prev, cur;
+ for ( prev = this.nextEntry, cur = this.nextEntry.next(); cur != null; prev = cur, cur = prev.next() ) {
+ if ( cur.keyHash == keyHash ) break;
+ }
+ this.nextPrevEntry = prev;
+ this.nextEntry = cur;
+
+ return (TEntry) curEntry;
+ }
+
+ @Override
+ public void remove() {
+ Hashtable.Entry oldCurEntry = this.curEntry;
+ if ( oldCurEntry == null ) throw new IllegalStateException();
+
+ this.setPrevNext(oldCurEntry.next());
+
+ // If the next match was directly after oldCurEntry, its predecessor is now
+ // curPrevEntry (oldCurEntry was just unlinked from the chain).
+ if ( this.nextPrevEntry == oldCurEntry ) {
+ this.nextPrevEntry = this.curPrevEntry;
+ }
+ this.curEntry = null;
+ }
+
+ public void replace(TEntry replacementEntry) {
+ Hashtable.Entry oldCurEntry = this.curEntry;
+ if ( oldCurEntry == null ) throw new IllegalStateException();
+
+ replacementEntry.setNext(oldCurEntry.next());
+ this.setPrevNext(replacementEntry);
+
+ // If the next match was directly after oldCurEntry, its predecessor is now
+ // the replacement entry (which took oldCurEntry's chain slot).
+ if ( this.nextPrevEntry == oldCurEntry ) { + this.nextPrevEntry = replacementEntry; + } + this.curEntry = replacementEntry; + } + + void setPrevNext(Hashtable.Entry nextEntry) { + if ( this.curPrevEntry == null ) { + Hashtable.Entry[] buckets = this.buckets; + buckets[Support.bucketIndex(buckets, this.keyHash)] = nextEntry; + } else { + this.curPrevEntry.setNext(nextEntry); + } + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java new file mode 100644 index 00000000000..bc53bc4ecb6 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/LongHashingUtils.java @@ -0,0 +1,158 @@ +package datadog.trace.util; + +/** + * This class is intended to be a drop-in replacement for the hashing portions of java.util.Objects. + * This class provides more convenience methods for hashing primitives and includes overrides for + * hash that take many argument lengths to avoid var-args allocation. + */ +public final class LongHashingUtils { + private LongHashingUtils() {} + + public static final long hashCodeX(Object obj) { + return obj == null ? Long.MIN_VALUE : obj.hashCode(); + } + + public static final long hash(boolean value) { + return Boolean.hashCode(value); + } + + public static final long hash(char value) { + return Character.hashCode(value); + } + + public static final long hash(byte value) { + return Byte.hashCode(value); + } + + public static final long hash(short value) { + return Short.hashCode(value); + } + + public static final long hash(int value) { + return Integer.hashCode(value); + } + + public static final long hash(long value) { + return value; + } + + public static final long hash(float value) { + return Float.hashCode(value); + } + + public static final long hash(double value) { + return Double.doubleToRawLongBits(value); + } + + public static final long hash(Object obj0, Object obj1) { + return hash(intHash(obj0), intHash(obj1)); + } + + public static final long hash(int hash0, int hash1) { + return 31L * hash0 + hash1; + } + + private static final int intHash(Object obj) { + return obj == null ? 0 : obj.hashCode(); + } + + public static final long hash(Object obj0, Object obj1, Object obj2) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2)); + } + + public static final long hash(long hash0, long hash1, long hash2) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. + return 31L * 31L * hash0 + 31L * hash1 + hash2; + } + + public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3)); + } + + public static final long hash(int hash0, int hash1, int hash2, int hash3) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. + return 31L * 31L * 31L * hash0 + 31L * 31L * hash1 + 31L * hash2 + hash3; + } + + public static final long hash(Object obj0, Object obj1, Object obj2, Object obj3, Object obj4) { + return hash(intHash(obj0), intHash(obj1), intHash(obj2), intHash(obj3), intHash(obj4)); + } + + public static final long hash(int hash0, int hash1, int hash2, int hash3, int hash4) { + // DQH - Micro-optimizing, 31L * 31L will constant fold + // Since there are multiple execution ports for load & store, + // this will make good use of the core. 
+ return 31L * 31L * 31L * 31L * hash0 + 31L * 31L * 31L * hash1 + 31L * 31L * hash2 + 31L * hash3 + hash4;
+ }
+
+ @Deprecated
+ public static final long hash(int[] hashes) {
+ long result = 0;
+ for (int hash : hashes) {
+ result = addToHash(result, hash);
+ }
+ return result;
+ }
+
+ public static final long addToHash(long hash, int value) {
+ return 31L * hash + value;
+ }
+
+ public static final long addToHash(long hash, Object obj) {
+ return addToHash(hash, intHash(obj));
+ }
+
+ public static final long addToHash(long hash, boolean value) {
+ return addToHash(hash, Boolean.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, char value) {
+ return addToHash(hash, Character.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, byte value) {
+ return addToHash(hash, Byte.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, short value) {
+ return addToHash(hash, Short.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, long value) {
+ return addToHash(hash, Long.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, float value) {
+ return addToHash(hash, Float.hashCode(value));
+ }
+
+ public static final long addToHash(long hash, double value) {
+ return addToHash(hash, Double.hashCode(value));
+ }
+
+ public static final long hash(Iterable<?> objs) {
+ long result = 0;
+ for (Object obj : objs) {
+ result = addToHash(result, obj);
+ }
+ return result;
+ }
+
+ /**
+ * Calling this array-based version can result in large amounts of allocation (see HashingBenchmark).
+ * Rather than calling this method, add another override of {@code hash} that handles a larger
+ * number of arguments, or use calls to {@code addToHash}.
+ */
+ @Deprecated
+ public static final long hash(Object[] objs) {
+ long result = 0;
+ for (Object obj : objs) {
+ result = addToHash(result, obj);
+ }
+ return result;
+ }
+}
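A quick illustration of the intended call pattern (identifiers below are illustrative, not from the patch): the fixed-arity overloads and the addToHash chain fold a composite key into one 64-bit hash without the Object[] allocation that java.util.Objects.hash(...) incurs.

    long h = LongHashingUtils.hash(resourceName, serviceName); // two-part key
    h = LongHashingUtils.addToHash(h, httpStatusCode);         // fold in an int
    h = LongHashingUtils.addToHash(h, synthetic);              // fold in a boolean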