Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
Expand Down Expand Up @@ -54,8 +55,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
}

AgentScope parentScope = null;
final AgentScope.Continuation continuation =
ctx.channel().attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
final AgentScope.Continuation continuation = takeConnectContinuation(ctx.channel());
if (continuation != null) {
parentScope = continuation.activate();
}
Expand Down Expand Up @@ -111,4 +111,16 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
}
}
}

private static AgentScope.Continuation takeConnectContinuation(final Channel channel) {
AgentScope.Continuation continuation =
channel.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
if (continuation == null) {
final Channel parent = channel.parent();
if (parent != null) {
continuation = parent.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
}
}
return continuation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datadog.trace.instrumentation.reactor.netty;

import static datadog.trace.instrumentation.reactor.netty.CaptureConnectSpan.CONNECT_SPAN;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.netty.Connection;
import reactor.util.context.Context;

public final class ConnectSpanSubscriber implements CoreSubscriber<Connection> {

private final CoreSubscriber<? super Connection> actual;
private final AgentSpan span;

public ConnectSpanSubscriber(
final CoreSubscriber<? super Connection> actual, final AgentSpan span) {
this.actual = actual;
this.span = span;
}

@Override
public void onSubscribe(final Subscription subscription) {
actual.onSubscribe(subscription);
}

@Override
public void onNext(final Connection connection) {
actual.onNext(connection);
}

@Override
public void onError(final Throwable throwable) {
actual.onError(throwable);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public Context currentContext() {
return actual.currentContext().put(CONNECT_SPAN, span);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package datadog.trace.instrumentation.reactor.netty;

import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.CoreSubscriber;
import reactor.netty.Connection;

/**
* Suppresses generic async captures created while Reactor Netty subscribes to connection setup.
*
* <p>The subscriber is wrapped first so the active span is still available from Reactor context;
* {@link TransferConnectSpan} later turns that context value into the continuation consumed by
* Netty request tracing.
*/
@AutoService(InstrumenterModule.class)
public class MonoHttpConnectInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public MonoHttpConnectInstrumentation() {
super("reactor-netty", "reactor-netty-1");
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// Avoid matching pre-1.0 releases which are not compatible.
return hasClassNamed("reactor.netty.transport.AddressUtils");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ConnectSpanSubscriber",
};
}

@Override
public String instrumentedType() {
return "reactor.netty.http.client.HttpClientConnect$MonoHttpConnect";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
named("subscribe").and(takesArgument(0, named("reactor.core.CoreSubscriber"))),
getClass().getName() + "$SubscribeAdvice");
}

public static class SubscribeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean before(
@Advice.Argument(value = 0, readOnly = false)
CoreSubscriber<? super Connection> subscriber) {
final AgentSpan span = activeSpan();
if (span != null) {
subscriber = new ConnectSpanSubscriber(subscriber, span);
}
if (isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(false);
return true;
}
return false;
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void after(@Advice.Enter final boolean wasDisabled) {
if (wasDisabled) {
setAsyncPropagationEnabled(true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.netty.channel.Channel;
import java.util.function.BiConsumer;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientRequest;
Expand All @@ -16,14 +17,24 @@ public void accept(HttpClientRequest httpClientRequest, Connection connection) {
final AgentSpan span = httpClientRequest.currentContextView().getOrDefault(CONNECT_SPAN, null);
final Continuation continuation = null == span ? null : captureSpan(span);
if (null != continuation) {
Continuation current =
connection
.channel()
.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY)
.getAndSet(continuation);
final Channel channel = connection.channel();
final Continuation current =
channel.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndSet(continuation);
if (null != current) {
current.cancel();
}

// HTTP/2 requests operate on stream channels. Netty captures the TCP connect continuation on
// the parent channel, but request tracing consumes the request-specific continuation from the
// stream channel, so the parent copy must not keep the trace open.
final Channel parent = channel.parent();
if (parent != null) {
final Continuation parentCurrent =
parent.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
if (null != parentCurrent) {
parentCurrent.cancel();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ class ReactorNettyHttp2ClientTest extends InstrumentationSpecification {
.handle { req, res -> res.status(200).send() }
.bindNow()

@Override
boolean useStrictTraceWrites() {
false
}

@Override
def cleanupSpec() {
server?.disposeNow()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datadog.environment.JavaVirtualMachine
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.agent.test.naming.TestingNettyHttpNamingConventions
import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator
import io.netty.handler.codec.http.HttpMethod
Expand Down Expand Up @@ -70,6 +71,40 @@ class ReactorNettyHttpClientTest extends HttpClientTest implements TestingNettyH
false
}

def "connection error produces netty.connect error span"() {
given:
def uri = new URI("http://localhost:${PortUtils.UNUSABLE_PORT}/")

when:
runUnderTrace("parent") {
httpClient.get()
.uri(uri)
.responseSingle({ r, b -> b.asString() })
.block()
}

then:
def ex = thrown(Exception)
assertTraces(1) {
trace(2) {
basicSpan(it, "parent", null, ex)
span {
operationName "netty.connect"
resourceName "netty.connect"
childOf span(0)
errored true
tags {
"component" "netty"
"error.type" String
"error.message" String
"error.stack" String
defaultTags()
}
}
}
}
}

@Override
boolean testRemoteConnection() {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ class SpringWebfluxTest extends InstrumentationSpecification {

WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build()

@Override
boolean useStrictTraceWrites() {
false
}

def "Basic GET test #testName"() {
setup:
String url = "http://localhost:$port$urlPath"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan

abstract class SpringWebfluxHttpClientBase extends HttpClientTest implements TestingGenericHttpNamingConventions.ClientV0 {

@Override
boolean useStrictTraceWrites() {
// TODO fix this by making sure that spans get closed properly
return false
}

abstract WebClient createClient(CharSequence component)

abstract void check()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan

abstract class SpringWebfluxHttpClientBase extends HttpClientTest implements TestingGenericHttpNamingConventions.ClientV0 {

@Override
boolean useStrictTraceWrites() {
// TODO fix this by making sure that spans get closed properly
return false
}

abstract WebClient createClient(CharSequence component)

abstract void check()
Expand Down