Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package datadog.trace.common.metrics;

import static datadog.trace.api.ProtocolVersion.V0_4;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -52,6 +54,7 @@ static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
span.setTag("peer.hostname", Strings.random(10));
trace.add(span);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package datadog.trace.common.metrics;

import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.trace.api.WellKnownTags;
import datadog.trace.common.writer.Writer;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.CoreTracer;
import datadog.trace.core.DDSpan;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.Strings;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
* Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances
* instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link
* CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's
* dispatch.
*/
@State(Scope.Benchmark)
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
@Measurement(iterations = 3, time = 30, timeUnit = SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ConflatingMetricsAggregatorDDSpanBenchmark {

private static final CoreTracer TRACER =
CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();

private final DDAgentFeaturesDiscovery featuresDiscovery =
new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ConflatingMetricsAggregator aggregator =
new ConflatingMetricsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
new ConflatingMetricsAggregatorBenchmark.NullSink(),
2048,
2048,
false);
private final List<CoreSpan<?>> spans = generateTrace(64);

static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
DDSpan span = (DDSpan) TRACER.startSpan("benchmark", "op");
span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
span.setTag("peer.hostname", Strings.random(10));
// Fix duration; bypasses the wall clock and avoids per-fork drift.
span.finishWithDuration(10);
trace.add(span);
}
return trace;
}

static class NoopWriter implements Writer {
@Override
public void write(List<DDSpan> trace) {}

@Override
public void start() {}

@Override
public boolean flush() {
return true;
}

@Override
public void close() {}

@Override
public void incrementDropCounts(int spanCount) {}
}

@Benchmark
public void benchmark(Blackhole blackhole) {
blackhole.consume(aggregator.publish(spans));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,13 @@
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.common.queue.Queues;
Expand All @@ -36,12 +30,11 @@
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.core.SpanKindFilter;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -50,7 +43,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jctools.queues.MessagePassingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,15 +74,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
value -> UTF8BytesString.create(key + ":" + value));
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
unmodifiableSet(
new HashSet<>(
Arrays.asList(
SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));
private static final SpanKindFilter METRICS_ELIGIBLE_KINDS =
SpanKindFilter.builder()
.includeServer()
.includeClient()
.includeProducer()
.includeConsumer()
.build();

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
unmodifiableSet(
new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER)));
private static final SpanKindFilter PEER_AGGREGATION_KINDS =
SpanKindFilter.builder().includeClient().includeProducer().includeConsumer().build();

private static final SpanKindFilter INTERNAL_KIND =
SpanKindFilter.builder().includeInternal().build();

private final Set<String> ignoredResources;
private final MessagePassingQueue<Batch> batchPool;
Expand Down Expand Up @@ -289,36 +285,30 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
if (features.supportsMetrics()) {
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, "");
if (shouldComputeMetric(span, spanKind)) {
if (shouldComputeMetric(span, isTopLevel)) {
final CharSequence resourceName = span.getResourceName();
if (resourceName != null && ignoredResources.contains(resourceName.toString())) {
// skip publishing all children
forceKeep = false;
break;
}
counted++;
forceKeep |= publish(span, isTopLevel, spanKind);
forceKeep |= publish(span, isTopLevel);
}
}
healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep);
}
return forceKeep;
}

private boolean shouldComputeMetric(CoreSpan<?> span, @Nonnull CharSequence spanKind) {
return (span.isMeasured() || span.isTopLevel() || spanKindEligible(spanKind))
private boolean shouldComputeMetric(CoreSpan<?> span, boolean isTopLevel) {
return (span.isMeasured() || isTopLevel || span.isKind(METRICS_ELIGIBLE_KINDS))
&& span.getLongRunningVersion()
<= 0 // either not long-running or unpublished long-running span
&& span.getDurationNano() > 0;
}

private boolean spanKindEligible(@Nonnull CharSequence spanKind) {
// use toString since it could be a CharSequence...
return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanKind) {
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
// Extract HTTP method and endpoint only if the feature is enabled
String httpMethod = null;
String httpEndpoint = null;
Expand All @@ -335,6 +325,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
}
// CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString
// tag values don't trigger a ClassCastException on the String assignment.
final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString();
MetricKey newKey =
new MetricKey(
span.getResourceName(),
Expand All @@ -347,7 +340,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
span.getParentId() == 0,
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()),
getPeerTags(span),
httpMethod,
httpEndpoint,
grpcStatusCode);
Expand Down Expand Up @@ -382,23 +375,26 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
return span.getError() > 0;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) {
if (span.isKind(PEER_AGGREGATION_KINDS)) {
final Set<String> eligiblePeerTags = features.peerTags();
List<UTF8BytesString> peerTags = new ArrayList<>(eligiblePeerTags.size());
List<UTF8BytesString> peerTags = null;
for (String peerTag : eligiblePeerTags) {
Object value = span.unsafeGetTag(peerTag);
if (value != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
if (peerTags == null) {
peerTags = new ArrayList<>(eligiblePeerTags.size());
}
peerTags.add(
cacheAndCreator
.getLeft()
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
}
}
return peerTags;
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
return peerTags == null ? Collections.emptyList() : peerTags;
} else if (span.isKind(INTERNAL_KIND)) {
// in this case only the base service should be aggregated if present
final Object baseService = span.unsafeGetTag(BASE_SERVICE);
if (baseService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ default <U> U unsafeGetTag(CharSequence name) {

boolean isForceKeep();

boolean isKind(SpanKindFilter filter);

CharSequence getType();

/**
Expand Down
4 changes: 4 additions & 0 deletions dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,10 @@ public boolean isOutbound() {
return ordinal == DDSpanContext.SPAN_KIND_CLIENT || ordinal == DDSpanContext.SPAN_KIND_PRODUCER;
}

public boolean isKind(SpanKindFilter filter) {
return filter.matches(context.getSpanKindOrdinal());
}

@Override
public void copyPropagationAndBaggage(final AgentSpan source) {
if (source instanceof DDSpan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,22 +771,26 @@ static boolean tagEquals(String tagValue, String tagLiteral) {
* span.kind is set.
*/
public void setSpanKindOrdinal(String kind) {
spanKindOrdinal = spanKindOrdinalOf(kind);
}

public static byte spanKindOrdinalOf(String kind) {
if (kind == null) {
spanKindOrdinal = SPAN_KIND_UNSET;
return SPAN_KIND_UNSET;
} else if (tagEquals(kind, Tags.SPAN_KIND_SERVER)) {
spanKindOrdinal = SPAN_KIND_SERVER;
return SPAN_KIND_SERVER;
} else if (tagEquals(kind, Tags.SPAN_KIND_CLIENT)) {
spanKindOrdinal = SPAN_KIND_CLIENT;
return SPAN_KIND_CLIENT;
} else if (tagEquals(kind, Tags.SPAN_KIND_PRODUCER)) {
spanKindOrdinal = SPAN_KIND_PRODUCER;
return SPAN_KIND_PRODUCER;
} else if (tagEquals(kind, Tags.SPAN_KIND_CONSUMER)) {
spanKindOrdinal = SPAN_KIND_CONSUMER;
return SPAN_KIND_CONSUMER;
} else if (tagEquals(kind, Tags.SPAN_KIND_INTERNAL)) {
spanKindOrdinal = SPAN_KIND_INTERNAL;
return SPAN_KIND_INTERNAL;
} else if (tagEquals(kind, Tags.SPAN_KIND_BROKER)) {
spanKindOrdinal = SPAN_KIND_BROKER;
return SPAN_KIND_BROKER;
} else {
spanKindOrdinal = SPAN_KIND_CUSTOM;
return SPAN_KIND_CUSTOM;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package datadog.trace.core;

public final class SpanKindFilter {
public static final class Builder {
private int kindMask;

public Builder includeServer() {
return this.include(DDSpanContext.SPAN_KIND_SERVER);
}

public Builder includeClient() {
return this.include(DDSpanContext.SPAN_KIND_CLIENT);
}

public Builder includeProducer() {
return this.include(DDSpanContext.SPAN_KIND_PRODUCER);
}

public Builder includeConsumer() {
return this.include(DDSpanContext.SPAN_KIND_CONSUMER);
}

public Builder includeInternal() {
return this.include(DDSpanContext.SPAN_KIND_INTERNAL);
}

public Builder includeBroker() {
return this.include(DDSpanContext.SPAN_KIND_BROKER);
}

public final SpanKindFilter build() {
return new SpanKindFilter(this.kindMask);
}

private Builder include(int spanKindConstant) {
this.kindMask |= (1 << spanKindConstant);
return this;
}
}

public static final Builder builder() {
return new Builder();
}

private final int kindMask;

private SpanKindFilter(int kindMask) {
this.kindMask = kindMask;
}

/** Test whether a span with the given span.kind string passes this filter. */
public boolean matches(String spanKind) {
return matches(DDSpanContext.spanKindOrdinalOf(spanKind));
}

/** Fast-path test for callers that already hold the span's cached kind ordinal. */
public boolean matches(byte spanKindOrdinal) {
return (kindMask & (1 << spanKindOrdinal)) != 0;
}
}
Loading
Loading