diff --git a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java index 6c35a87edbb..df5bc74668f 100644 --- a/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java +++ b/dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java @@ -18,6 +18,7 @@ import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -54,8 +55,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann } AgentScope parentScope = null; - final AgentScope.Continuation continuation = - ctx.channel().attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove(); + final AgentScope.Continuation continuation = takeConnectContinuation(ctx.channel()); if (continuation != null) { parentScope = continuation.activate(); } @@ -111,4 +111,16 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann } } } + + private static AgentScope.Continuation takeConnectContinuation(final Channel channel) { + AgentScope.Continuation continuation = + channel.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove(); + if (continuation == null) { + final Channel parent = channel.parent(); + if (parent != null) { + continuation = parent.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove(); + } + } + return continuation; + } } diff --git a/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/ConnectSpanSubscriber.java b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/ConnectSpanSubscriber.java new file mode 100644 index 00000000000..0d712560ff9 --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/ConnectSpanSubscriber.java @@ -0,0 +1,46 @@ +package datadog.trace.instrumentation.reactor.netty; + +import static datadog.trace.instrumentation.reactor.netty.CaptureConnectSpan.CONNECT_SPAN; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.netty.Connection; +import reactor.util.context.Context; + +public final class ConnectSpanSubscriber implements CoreSubscriber { + + private final CoreSubscriber actual; + private final AgentSpan span; + + public ConnectSpanSubscriber( + final CoreSubscriber actual, final AgentSpan span) { + this.actual = actual; + this.span = span; + } + + @Override + public void onSubscribe(final Subscription subscription) { + actual.onSubscribe(subscription); + } + + @Override + public void onNext(final Connection connection) { + actual.onNext(connection); + } + + @Override + public void onError(final Throwable throwable) { + actual.onError(throwable); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public Context currentContext() { + return actual.currentContext().put(CONNECT_SPAN, span); + } +} diff --git a/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/MonoHttpConnectInstrumentation.java b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/MonoHttpConnectInstrumentation.java new file mode 100644 index 00000000000..d0bfd2706c8 --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/MonoHttpConnectInstrumentation.java @@ -0,0 +1,82 @@ +package datadog.trace.instrumentation.reactor.netty; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.CoreSubscriber; +import reactor.netty.Connection; + +/** + * Suppresses generic async captures created while Reactor Netty subscribes to connection setup. + * + *

The subscriber is wrapped first so the active span is still available from Reactor context; + * {@link TransferConnectSpan} later turns that context value into the continuation consumed by + * Netty request tracing. + */ +@AutoService(InstrumenterModule.class) +public class MonoHttpConnectInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public MonoHttpConnectInstrumentation() { + super("reactor-netty", "reactor-netty-1"); + } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // Avoid matching pre-1.0 releases which are not compatible. + return hasClassNamed("reactor.netty.transport.AddressUtils"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".ConnectSpanSubscriber", + }; + } + + @Override + public String instrumentedType() { + return "reactor.netty.http.client.HttpClientConnect$MonoHttpConnect"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + named("subscribe").and(takesArgument(0, named("reactor.core.CoreSubscriber"))), + getClass().getName() + "$SubscribeAdvice"); + } + + public static class SubscribeAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static boolean before( + @Advice.Argument(value = 0, readOnly = false) + CoreSubscriber subscriber) { + final AgentSpan span = activeSpan(); + if (span != null) { + subscriber = new ConnectSpanSubscriber(subscriber, span); + } + if (isAsyncPropagationEnabled()) { + setAsyncPropagationEnabled(false); + return true; + } + return false; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after(@Advice.Enter final boolean wasDisabled) { + if (wasDisabled) { + setAsyncPropagationEnabled(true); + } + } + } +} diff --git a/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/TransferConnectSpan.java b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/TransferConnectSpan.java index 5d07e5c3c26..e558398e36d 100644 --- a/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/TransferConnectSpan.java +++ b/dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/TransferConnectSpan.java @@ -6,6 +6,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentScope.Continuation; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.channel.Channel; import java.util.function.BiConsumer; import reactor.netty.Connection; import reactor.netty.http.client.HttpClientRequest; @@ -16,14 +17,24 @@ public void accept(HttpClientRequest httpClientRequest, Connection connection) { final AgentSpan span = httpClientRequest.currentContextView().getOrDefault(CONNECT_SPAN, null); final Continuation continuation = null == span ? null : captureSpan(span); if (null != continuation) { - Continuation current = - connection - .channel() - .attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY) - .getAndSet(continuation); + final Channel channel = connection.channel(); + final Continuation current = + channel.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndSet(continuation); if (null != current) { current.cancel(); } + + // HTTP/2 requests operate on stream channels. Netty captures the TCP connect continuation on + // the parent channel, but request tracing consumes the request-specific continuation from the + // stream channel, so the parent copy must not keep the trace open. + final Channel parent = channel.parent(); + if (parent != null) { + final Continuation parentCurrent = + parent.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove(); + if (null != parentCurrent) { + parentCurrent.cancel(); + } + } } } } diff --git a/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttp2ClientTest.groovy b/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttp2ClientTest.groovy index 7c5c40f8bfa..a81fa12c8f5 100644 --- a/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttp2ClientTest.groovy +++ b/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttp2ClientTest.groovy @@ -24,11 +24,6 @@ class ReactorNettyHttp2ClientTest extends InstrumentationSpecification { .handle { req, res -> res.status(200).send() } .bindNow() - @Override - boolean useStrictTraceWrites() { - false - } - @Override def cleanupSpec() { server?.disposeNow() diff --git a/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttpClientTest.groovy b/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttpClientTest.groovy index 2949a87dc6c..4a0a4cf10cf 100644 --- a/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttpClientTest.groovy +++ b/dd-java-agent/instrumentation/reactor-netty-1.0/src/test/groovy/ReactorNettyHttpClientTest.groovy @@ -1,5 +1,6 @@ import datadog.environment.JavaVirtualMachine import datadog.trace.agent.test.base.HttpClientTest +import datadog.trace.agent.test.utils.PortUtils import datadog.trace.agent.test.naming.TestingNettyHttpNamingConventions import datadog.trace.instrumentation.netty41.client.NettyHttpClientDecorator import io.netty.handler.codec.http.HttpMethod @@ -70,6 +71,40 @@ class ReactorNettyHttpClientTest extends HttpClientTest implements TestingNettyH false } + def "connection error produces netty.connect error span"() { + given: + def uri = new URI("http://localhost:${PortUtils.UNUSABLE_PORT}/") + + when: + runUnderTrace("parent") { + httpClient.get() + .uri(uri) + .responseSingle({ r, b -> b.asString() }) + .block() + } + + then: + def ex = thrown(Exception) + assertTraces(1) { + trace(2) { + basicSpan(it, "parent", null, ex) + span { + operationName "netty.connect" + resourceName "netty.connect" + childOf span(0) + errored true + tags { + "component" "netty" + "error.type" String + "error.message" String + "error.stack" String + defaultTags() + } + } + } + } + } + @Override boolean testRemoteConnection() { return false diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/bootTest/groovy/SpringWebfluxTest.groovy b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/bootTest/groovy/SpringWebfluxTest.groovy index f34edaf1f33..598a5498f5e 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/bootTest/groovy/SpringWebfluxTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/bootTest/groovy/SpringWebfluxTest.groovy @@ -53,11 +53,6 @@ class SpringWebfluxTest extends InstrumentationSpecification { WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build() - @Override - boolean useStrictTraceWrites() { - false - } - def "Basic GET test #testName"() { setup: String url = "http://localhost:$port$urlPath" diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientBase.groovy b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientBase.groovy index 8c60fddbff3..3b8fca1f2a9 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientBase.groovy +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/test/groovy/dd/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientBase.groovy @@ -21,12 +21,6 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan abstract class SpringWebfluxHttpClientBase extends HttpClientTest implements TestingGenericHttpNamingConventions.ClientV0 { - @Override - boolean useStrictTraceWrites() { - // TODO fix this by making sure that spans get closed properly - return false - } - abstract WebClient createClient(CharSequence component) abstract void check() diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-6.0/src/test/groovy/dd/trace/instrumentation/springwebflux6/client/SpringWebfluxHttpClientBase.groovy b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-6.0/src/test/groovy/dd/trace/instrumentation/springwebflux6/client/SpringWebfluxHttpClientBase.groovy index 0048091d901..b419398ca6a 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-6.0/src/test/groovy/dd/trace/instrumentation/springwebflux6/client/SpringWebfluxHttpClientBase.groovy +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-6.0/src/test/groovy/dd/trace/instrumentation/springwebflux6/client/SpringWebfluxHttpClientBase.groovy @@ -22,12 +22,6 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan abstract class SpringWebfluxHttpClientBase extends HttpClientTest implements TestingGenericHttpNamingConventions.ClientV0 { - @Override - boolean useStrictTraceWrites() { - // TODO fix this by making sure that spans get closed properly - return false - } - abstract WebClient createClient(CharSequence component) abstract void check()