Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat(dsm): Unify activation check using dynamic config only
Remove active span coupling and use trace dynamic config instead
  • Loading branch information
PerfectSlayer committed Jun 23, 2025
commit e67e2172433eede13e9c1e2b565212eccefef807
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;

import datadog.context.Context;
import datadog.context.propagation.CarrierSetter;
import datadog.trace.api.Config;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.Tags;
Expand All @@ -29,18 +29,10 @@ public void onURI(@Nonnull final AgentSpan span, @Nonnull final URI uri) {

public <C> void injectContext(Context context, final C request, CarrierSetter<C> setter) {
// Add additional default DSM context for HTTP clients if missing but DSM is enabled
if (isDataStreamsEnabled(context) && DataStreamsContext.fromContext(context) == null) {
if (DataStreamsContext.fromContext(context) == null && traceConfig().isDataStreamsEnabled()) {
context = context.with(DataStreamsContext.forHttpClient());
}
// Inject context into carrier
defaultPropagator().inject(context, request, setter);
}

private static boolean isDataStreamsEnabled(Context context) {
final AgentSpan agentSpan;
final TraceConfig tracerConfig;
return (agentSpan = AgentSpan.fromContext(context)) != null
&& (tracerConfig = agentSpan.traceConfig()) != null
&& tracerConfig.isDataStreamsEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
Expand Down Expand Up @@ -44,7 +45,6 @@ private static DataStreamsContext createDsmContext() {

private static final Set<String> IGNORED_METHODS = Config.get().getGrpcIgnoredOutboundMethods();
private static final BitSet CLIENT_ERROR_STATUSES = Config.get().getGrpcClientErrorStatuses();
private static final boolean DATA_STREAMS_ENABLED = Config.get().isDataStreamsEnabled();

private static final ClassValue<UTF8BytesString> MESSAGE_TYPES =
GenericClassValue.of(
Expand Down Expand Up @@ -111,7 +111,7 @@ public <ReqT, RespT> AgentSpan startCall(MethodDescriptor<ReqT, RespT> method) {
}

public <C> void injectContext(Context context, final C request, CarrierSetter<C> setter) {
if (DATA_STREAMS_ENABLED) {
if (traceConfig().isDataStreamsEnabled()) {
context = context.with(createDsmContext());
}
defaultPropagator().inject(context, request, setter);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;

Expand Down Expand Up @@ -94,8 +95,7 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
CharSequence awsRequestName = AwsNameCache.getQualifiedName(request);
span.setResourceName(awsRequestName, RPC_COMMAND_NAME);

if ("s3".equalsIgnoreCase(awsSimplifiedServiceName)
&& span.traceConfig().isDataStreamsEnabled()) {
if ("s3".equalsIgnoreCase(awsSimplifiedServiceName) && traceConfig().isDataStreamsEnabled()) {
span.setTag(Tags.HTTP_REQUEST_CONTENT_LENGTH, getRequestContentLength(request));
}

Expand Down Expand Up @@ -192,7 +192,7 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
}

// DSM
if (span.traceConfig().isDataStreamsEnabled()) {
if (traceConfig().isDataStreamsEnabled()) {
if (null != streamArn && "AmazonKinesis".equals(awsServiceName)) {
switch (awsOperation.getSimpleName()) {
case PUT_RECORD_OPERATION_NAME:
Expand Down Expand Up @@ -242,7 +242,7 @@ public AgentSpan onRequest(final AgentSpan span, final Request request) {
public AgentSpan onServiceResponse(
final AgentSpan span, final String awsService, final Response response) {
if ("s3".equalsIgnoreCase(simplifyServiceName(awsService))
&& span.traceConfig().isDataStreamsEnabled()) {
&& traceConfig().isDataStreamsEnabled()) {
long responseSize = getResponseContentLength(response);
span.setTag(Tags.HTTP_RESPONSE_CONTENT_LENGTH, responseSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpanWithoutScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
Expand Down Expand Up @@ -107,12 +108,12 @@ public void afterResponse(final Request<?> request, final Response<?> response)
}
}
if (span != null
&& span.traceConfig().isDataStreamsEnabled()
&& traceConfig().isDataStreamsEnabled()
&& "AmazonKinesis".equals(request.getServiceName())
&& "GetRecords".equals(requestAccess.getOperationNameFromType())) {
String streamArn = requestAccess.getStreamARN(originalRequest);
if (null != streamArn) {
List records =
List<?> records =
GetterAccess.of(response.getAwsResponse()).getRecords(response.getAwsResponse());
if (null != records) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.instrumentation.aws.v2;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
Expand Down Expand Up @@ -131,7 +132,7 @@ public AgentSpan onSdkRequest(

// S3
request.getValueForField("Bucket", String.class).ifPresent(name -> setBucketName(span, name));
if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
if ("s3".equalsIgnoreCase(awsServiceName) && traceConfig().isDataStreamsEnabled()) {
request
.getValueForField("Key", String.class)
.ifPresent(key -> span.setTag(InstrumentationTags.AWS_OBJECT_KEY, key));
Expand Down Expand Up @@ -169,7 +170,7 @@ public AgentSpan onSdkRequest(
Optional<String> kinesisStreamArn = request.getValueForField("StreamARN", String.class);
kinesisStreamArn.ifPresent(
streamArn -> {
if (span.traceConfig().isDataStreamsEnabled()) {
if (traceConfig().isDataStreamsEnabled()) {
attributes.putAttribute(KINESIS_STREAM_ARN_ATTRIBUTE, streamArn);
}
int streamNameStart = streamArn.indexOf(":stream/");
Expand All @@ -182,7 +183,7 @@ public AgentSpan onSdkRequest(
request.getValueForField("TableName", String.class).ifPresent(name -> setTableName(span, name));

// DSM
if (span.traceConfig().isDataStreamsEnabled()) {
if (traceConfig().isDataStreamsEnabled()) {
if (kinesisStreamArn.isPresent()
&& "kinesis".equalsIgnoreCase(awsServiceName)
&& KINESIS_PUT_RECORD_OPERATION_NAMES.contains(awsOperationName)) {
Expand Down Expand Up @@ -324,7 +325,7 @@ public AgentSpan onSdkResponse(

final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
final String awsOperationName = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
if (span.traceConfig().isDataStreamsEnabled()
if (traceConfig().isDataStreamsEnabled()
&& "kinesis".equalsIgnoreCase(awsServiceName)
&& "GetRecords".equals(awsOperationName)) {
// https://github.com/DataDog/dd-trace-py/blob/864abb6c99e1cb0449904260bac93e8232261f2a/ddtrace/contrib/botocore/patch.py#L350
Expand Down Expand Up @@ -373,7 +374,7 @@ public AgentSpan onSdkResponse(
}
}

if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
if ("s3".equalsIgnoreCase(awsServiceName) && traceConfig().isDataStreamsEnabled()) {
long responseSize = getResponseContentLength(httpResponse);
span.setTag(Tags.HTTP_RESPONSE_CONTENT_LENGTH, responseSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
Expand Down Expand Up @@ -44,7 +45,6 @@ private static DataStreamsContext createDsmContext() {

private static final Set<String> IGNORED_METHODS = Config.get().getGrpcIgnoredOutboundMethods();
private static final BitSet CLIENT_ERROR_STATUSES = Config.get().getGrpcClientErrorStatuses();
private static final boolean DATA_STREAMS_ENABLED = Config.get().isDataStreamsEnabled();

private static final ClassValue<UTF8BytesString> MESSAGE_TYPES =
GenericClassValue.of(
Expand Down Expand Up @@ -111,7 +111,7 @@ public <ReqT, RespT> AgentSpan startCall(MethodDescriptor<ReqT, RespT> method) {
}

public <C> void injectContext(Context context, final C request, CarrierSetter<C> setter) {
if (DATA_STREAMS_ENABLED) {
if (traceConfig().isDataStreamsEnabled()) {
context = context.with(createDsmContext());
}
defaultPropagator().inject(context, request, setter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_POLL;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
Expand Down Expand Up @@ -205,13 +205,7 @@ public static void muzzleCheck(ConsumerRecord record) {
public static class RecordsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter() {
boolean dataStreamsEnabled;
if (activeSpan() != null) {
dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
} else {
dataStreamsEnabled = Config.get().isDataStreamsEnabled();
}
if (dataStreamsEnabled) {
if (traceConfig().isDataStreamsEnabled()) {
final AgentSpan span = startSpan(KAFKA_POLL);
return activateSpan(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
Expand Down Expand Up @@ -110,7 +111,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
sortedTags.put(TYPE_TAG, "kafka");

final long payloadSize =
span.traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(val) : 0;
traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(val) : 0;
if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_POLL;

import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand All @@ -22,13 +21,7 @@
public class RecordsAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onEnter() {
boolean dataStreamsEnabled;
if (activeSpan() != null) {
dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
} else {
dataStreamsEnabled = Config.get().isDataStreamsEnabled();
}
if (dataStreamsEnabled) {
if (traceConfig().isDataStreamsEnabled()) {
final AgentSpan span = startSpan(KAFKA_POLL);
return activateSpan(span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
Expand Down Expand Up @@ -110,7 +111,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
sortedTags.put(TYPE_TAG, "kafka");

final long payloadSize =
span.traceConfig().isDataStreamsEnabled() ? Utils.computePayloadSizeBytes(val) : 0;
traceConfig().isDataStreamsEnabled() ? Utils.computePayloadSizeBytes(val) : 0;
if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
Expand Down Expand Up @@ -262,7 +263,7 @@ public static void start(
sortedTags.put(TYPE_TAG, "kafka");

final long payloadSize =
span.traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(record.value) : 0;
traceConfig().isDataStreamsEnabled() ? computePayloadSizeBytes(record.value) : 0;
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package datadog.trace.instrumentation.servlet.dispatcher;

import static datadog.context.propagation.Propagators.defaultPropagator;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;

import datadog.context.Context;
import datadog.context.propagation.CarrierSetter;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
Expand All @@ -17,7 +17,6 @@ public class RequestDispatcherDecorator extends BaseDecorator {
UTF8BytesString.create("java-web-servlet-dispatcher");
public static final String DD_CONTEXT_PATH_ATTRIBUTE = "datadog.context.path";
public static final String DD_SERVLET_PATH_ATTRIBUTE = "datadog.servlet.path";
private static final boolean DATA_STREAMS_ENABLED = Config.get().isDataStreamsEnabled();

@Override
protected String[] instrumentationNames() {
Expand Down Expand Up @@ -46,7 +45,7 @@ public AgentSpan onError(final AgentSpan span, final Throwable throwable) {

public <C> void injectContext(Context context, final C request, CarrierSetter<C> setter) {
// Add additional default DSM context for HTTP clients if missing but DSM is enabled
if (DATA_STREAMS_ENABLED) {
if (traceConfig().isDataStreamsEnabled()) {
context = context.with(DataStreamsContext.forHttpClient());
}
// Inject context into carrier
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.spark;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.CONSUMER_GROUP_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.PARTITION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
Expand Down Expand Up @@ -1297,7 +1298,7 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat

private static void reportKafkaOffsets(
final String appName, final AgentSpan span, final SourceProgress progress) {
if (!span.traceConfig().isDataStreamsEnabled()
if (!traceConfig().isDataStreamsEnabled()
|| progress == null
|| progress.description() == null) {
return;
Expand Down
Loading