Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ClientStatsAggregatorBenchmark {
private final DDAgentFeaturesDiscovery featuresDiscovery =
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ClientStatsAggregator aggregator =
    new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.openjdk.jmh.infra.Blackhole;

/**
 * Parallels {@link ClientStatsAggregatorBenchmark} but uses real {@link DDSpan} instances instead
 * of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
* CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's
* dispatch.
*/
Expand All @@ -39,21 +39,21 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ClientStatsAggregatorDDSpanBenchmark {

private static final CoreTracer TRACER =
CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();

private final DDAgentFeaturesDiscovery featuresDiscovery =
new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ClientStatsAggregator aggregator =
    new ClientStatsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
new ConflatingMetricsAggregatorBenchmark.NullSink(),
new ClientStatsAggregatorBenchmark.NullSink(),
2048,
2048,
false);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import java.util.function.Consumer;

/**
 * Consumer-side {@link AggregateMetric} store, keyed on the canonical UTF8-encoded labels of a
 * {@link SpanSnapshot}.
 *
 * <p>{@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so
 * cardinality-blocked values share a sentinel and collapse into one entry) and then computes the
 * lookup hash from that canonical form. Canonicalization runs into a reusable {@link
 * AggregateEntry.Canonical} scratch buffer; on a hit nothing is allocated, on a miss the buffer's
 * references are copied into a fresh entry and the buffer is overwritten on the next call.
*
* <p><b>Not thread-safe.</b> The aggregator thread is the sole writer; {@link #clear()} must be
* routed through the inbox rather than called from arbitrary threads.
Expand All @@ -19,6 +20,7 @@ final class AggregateTable {

private final Hashtable.Entry[] buckets;
private final int maxAggregates;
private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical();
private int size;

AggregateTable(int maxAggregates) {
Expand All @@ -40,20 +42,21 @@ boolean isEmpty() {
* the caller should drop the data point in that case.
*/
AggregateMetric findOrInsert(SpanSnapshot snapshot) {
// Canonicalize into the reusable scratch buffer; the key hash is computed from the
// canonical form so cardinality-collapsed values land in the same bucket.
canonical.populate(snapshot);
long keyHash = canonical.keyHash;
int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash);
for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) {
if (e.keyHash == keyHash) {
AggregateEntry candidate = (AggregateEntry) e;
if (canonical.matches(candidate)) {
return candidate.aggregate;
}
}
}
if (size >= maxAggregates && !evictOneStale()) {
return null;
}
AggregateEntry entry = canonical.toEntry(new AggregateMetric());
entry.setNext(buckets[bucketIndex]);
buckets[bucketIndex] = entry;
size++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ final class Aggregator implements Runnable {
this.healthMetrics = healthMetrics;
}

public void clearAggregates() {
this.aggregates.clear();
}

@Override
public void run() {
Thread currentThread = Thread.currentThread();
Expand Down Expand Up @@ -149,6 +145,9 @@ private void report(long when, SignalItem signal) {
}
dirty = false;
}
// Reset cardinality handlers each report cycle so the per-field budgets refresh.
// Safe to call on this (aggregator) thread; handlers are HashMap-based and not thread-safe.
AggregateEntry.resetCardinalityHandlers();
signal.complete();
if (skipped) {
log.debug("skipped metrics reporting because no points have changed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT;
import static datadog.trace.api.DDSpanTypes.RPC;
import static datadog.trace.api.DDTags.BASE_SERVICE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR;
Expand Down Expand Up @@ -40,14 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ClientStatsAggregator implements MetricsAggregator, EventListener {

private static final Logger log = LoggerFactory.getLogger(ClientStatsAggregator.class);

private static final Map<String, String> DEFAULT_HEADERS =
Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION);

// String (not CharSequence) so isSynthetic can use contentEquals against any origin impl.
private static final String SYNTHETICS_ORIGIN = "synthetics";

private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
Expand Down Expand Up @@ -76,7 +74,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve

private volatile AgentTaskScheduler.Scheduled<?> cancellation;

public ConflatingMetricsAggregator(
public ClientStatsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
Expand All @@ -97,7 +95,7 @@ public ConflatingMetricsAggregator(
config.isTraceResourceRenamingEnabled());
}

ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
Expand All @@ -119,7 +117,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}

ClientStatsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
Expand All @@ -143,7 +141,7 @@ public ConflatingMetricsAggregator(
includeEndpointInMetrics);
}

ClientStatsAggregator(
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Expand Down Expand Up @@ -244,6 +242,14 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
boolean forceKeep = false;
int counted = 0;
if (features.supportsMetrics()) {
// Sync the peer-aggregation schema once per trace; peer-tag configuration is stable for
// the duration of a single trace publish in production (DDAgentFeaturesDiscovery returns
// the same Set instance until remote-config reconfiguration).
Set<String> eligiblePeerTags = features.peerTags();
PeerTagSchema peerAggSchema =
(eligiblePeerTags == null || eligiblePeerTags.isEmpty())
? null
: PeerTagSchema.currentSyncedTo(eligiblePeerTags);
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span, isTopLevel)) {
Expand All @@ -254,7 +260,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
break;
}
counted++;
forceKeep |= publish(span, isTopLevel);
forceKeep |= publish(span, isTopLevel, peerAggSchema);
}
}
healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
Expand All @@ -269,7 +275,7 @@ private boolean shouldComputeMetric(CoreSpan<?> span, boolean isTopLevel) {
&& span.getDurationNano() > 0;
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
private boolean publish(CoreSpan<?> span, boolean isTopLevel, PeerTagSchema peerAggSchema) {
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
Expand All @@ -286,14 +292,26 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
// DDSpan resolves this from a cached span.kind ordinal via a small lookup array, skipping a
// tag-map lookup. Other CoreSpan impls fall back to the tag map by default.
String spanKind = span.getSpanKindString();
if (spanKind == null) {
  spanKind = "";
}

boolean error = span.getError() > 0;
long tagAndDuration =
span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L);

PeerTagSchema peerTagSchema = peerTagSchemaFor(span, peerAggSchema);
String[] peerTagValues =
peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema);
if (peerTagValues == null) {
// capture returned no non-null values -- drop the schema reference so the consumer doesn't
// bother iterating an all-null array.
peerTagSchema = null;
}

SpanSnapshot snapshot =
new SpanSnapshot(
span.getResourceName(),
Expand All @@ -305,7 +323,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
isSynthetic(span),
span.getParentId() == 0,
spanKind,
peerTagSchema,
peerTagValues,
httpMethod,
httpEndpoint,
grpcStatusCode,
Expand All @@ -317,43 +336,45 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
return error;
}

private String[] extractPeerTagPairs(CoreSpan<?> span) {
if (span.isKind(PEER_AGGREGATION_KINDS)) {
final Set<String> eligiblePeerTags = features.peerTags();
String[] pairs = null;
int count = 0;
for (String peerTag : eligiblePeerTags) {
Object value = span.unsafeGetTag(peerTag);
if (value != null) {
if (pairs == null) {
// pairs are flattened [name, value, ...]; size for worst case
pairs = new String[eligiblePeerTags.size() * 2];
}
pairs[count++] = peerTag;
pairs[count++] = value.toString();
/**
* Picks the peer-tag schema for a span. The {@code peerAggSchema} argument is the per-trace
* cached schema (synced from {@code features.peerTags()} once in {@link #publish(List)}); it's
* {@code null} when no peer tags are configured. For internal-kind spans the static {@link
* PeerTagSchema#INTERNAL} schema is used regardless.
*/
private static PeerTagSchema peerTagSchemaFor(CoreSpan<?> span, PeerTagSchema peerAggSchema) {
if (peerAggSchema != null && span.isKind(PEER_AGGREGATION_KINDS)) {
return peerAggSchema;
}
if (span.isKind(INTERNAL_KIND)) {
return PeerTagSchema.INTERNAL;
}
return null;
}

/**
* Captures the span's peer tag values into a {@code String[]} parallel to {@code schema.names}.
* Returns {@code null} when none of the configured peer tags are set on the span.
*/
private static String[] capturePeerTagValues(CoreSpan<?> span, PeerTagSchema schema) {
String[] names = schema.names;
int n = names.length;
String[] values = null;
for (int i = 0; i < n; i++) {
Object v = span.unsafeGetTag(names[i]);
if (v != null) {
if (values == null) {
values = new String[n];
}
}
if (pairs == null) {
return null;
}
if (count < pairs.length) {
String[] trimmed = new String[count];
System.arraycopy(pairs, 0, trimmed, 0, count);
return trimmed;
}
return pairs;
} else if (span.isKind(INTERNAL_KIND)) {
// in this case only the base service should be aggregated if present
final Object baseService = span.unsafeGetTag(BASE_SERVICE);
if (baseService != null) {
return new String[] {BASE_SERVICE, baseService.toString()};
values[i] = v.toString();
}
}
return null;
return values;
}

/** True when the span's origin tag equals {@code "synthetics"}. */
private static boolean isSynthetic(CoreSpan<?> span) {
  CharSequence origin = span.getOrigin();
  // contentEquals compares against any CharSequence origin without forcing a toString().
  return origin != null && SYNTHETICS_ORIGIN.contentEquals(origin);
}

public void stop() {
Expand Down Expand Up @@ -399,17 +420,16 @@ private void disable() {
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
// Route the clear through the inbox so the aggregator thread is the only writer.
// AggregateTable is not thread-safe; clearing it directly from this thread would race
// with Drainer.accept on the aggregator thread.
inbox.offer(CLEAR);
}
}

private static final class ReportTask
implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {
private static final class ReportTask implements AgentTaskScheduler.Task<ClientStatsAggregator> {

@Override
public void run(ConflatingMetricsAggregator target) {
public void run(ClientStatsAggregator target) {
target.report();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static MetricsAggregator createMetricsAggregator(
HealthMetrics healthMetrics) {
if (config.isTracerMetricsEnabled()) {
log.debug("tracer metrics enabled");
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
return new ClientStatsAggregator(config, sharedCommunicationObjects, healthMetrics);
}
log.debug("tracer metrics disabled");
return NoOpMetricsAggregator.INSTANCE;
Expand Down
Loading
Loading