Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ public DDLLMObsSpan(
spanName = kind;
}

// If no explicit session_id was passed, inherit it from the active LLMObs parent.
// This matches dd-trace-py and dd-trace-js, and the public SDK docs which state that
// session_id only needs to be set on the root span — descendants inherit it.
if (sessionId == null || sessionId.isEmpty()) {
String inherited = LLMObsContext.currentSessionId();
if (inherited != null && !inherited.isEmpty()) {
sessionId = inherited;
}
}

AgentTracer.SpanBuilder spanBuilder =
AgentTracer.get()
.buildSpan(LLM_OBS_INSTRUMENTATION_NAME, spanName)
Expand Down Expand Up @@ -109,7 +119,8 @@ public DDLLMObsSpan(
}
}
span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, parentSpanID);
scope = LLMObsContext.attach(span.context());
// Propagate the effective sessionId to descendant LLMObs spans via the context.
scope = LLMObsContext.attach(span.context(), this.hasSessionId ? sessionId : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,40 @@ class DDLLMObsSpanTest extends DDSpecification{
null | "has_session_id:0"
}

// Verifies that an LLMObs span created with a null session id carries the session id that was
// given to the LLMObs span created just before it (the parent is still unfinished at that point).
def "child LLMObs span inherits session_id from parent context when none is passed"() {
  setup:
  def expectedSessionId = "session-abc-123"
  def parent = llmObsSpan(Tags.LLMOBS_WORKFLOW_SPAN_KIND, "parent-workflow", expectedSessionId)

  when:
  // Child created with null sessionId — should inherit from the parent's LLMObsContext.
  def child = llmObsSpan(Tags.LLMOBS_LLM_SPAN_KIND, "child-llm", null)

  then:
  def innerChild = (AgentSpan) child.span
  expectedSessionId == innerChild.getTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID)

  cleanup:
  // cleanup: also runs when setup:/when: threw before these locals were assigned, so
  // dereference null-safely; otherwise an NPE on `child` would skip finishing the parent.
  child?.finish()
  parent?.finish()
}

// Verifies that no session id tag is set on a child LLMObs span when both the parent and the
// child are created with a null session id.
def "child LLMObs span has no session_id when neither parent nor child passes one"() {
  setup:
  def parent = llmObsSpan(Tags.LLMOBS_WORKFLOW_SPAN_KIND, "parent-workflow", null)

  when:
  def child = llmObsSpan(Tags.LLMOBS_LLM_SPAN_KIND, "child-llm", null)

  then:
  def innerChild = (AgentSpan) child.span
  null == innerChild.getTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID)

  cleanup:
  // cleanup: also runs when setup:/when: threw before these locals were assigned, so
  // dereference null-safely; otherwise an NPE on `child` would skip finishing the parent.
  child?.finish()
  parent?.finish()
}

def "global dd_tags are included in LLMObs span tags"() {
setup:
injectSysConfig("trace.global.tags", "team:backend,owner:ml-platform")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class LLMObsSpanMapper implements RemoteMapper {
private static final byte[] DD = "_dd".getBytes(StandardCharsets.UTF_8);
private static final byte[] APM_TRACE_ID = "apm_trace_id".getBytes(StandardCharsets.UTF_8);
private static final byte[] PARENT_ID = "parent_id".getBytes(StandardCharsets.UTF_8);
private static final byte[] SESSION_ID = "session_id".getBytes(StandardCharsets.UTF_8);
private static final byte[] NAME = "name".getBytes(StandardCharsets.UTF_8);
private static final byte[] DURATION = "duration".getBytes(StandardCharsets.UTF_8);
private static final byte[] START_NS = "start_ns".getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -88,6 +89,8 @@ public class LLMObsSpanMapper implements RemoteMapper {
private static final byte[] LLM_TOOL_RESULT_RESULT = "result".getBytes(StandardCharsets.UTF_8);

private static final String PARENT_ID_TAG_INTERNAL_FULL = LLMOBS_TAG_PREFIX + "parent_id";
private static final String SESSION_ID_TAG_INTERNAL_FULL =
LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID;

private final MetaWriter metaWriter = new MetaWriter();
private final int size;
Expand Down Expand Up @@ -126,7 +129,13 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
}

for (CoreSpan<?> span : llmobsSpans) {
writable.startMap(11);
// Read session_id off the span before opening the map so we can size it correctly.
// We deliberately do NOT remove the tag (unlike parent_id) — the session_id:<value>
// entry must remain in the tags[] array to match dd-trace-py and dd-trace-js behavior.
String sessionId = span.getTag(SESSION_ID_TAG_INTERNAL_FULL);
boolean hasSessionId = sessionId != null && !sessionId.isEmpty();

writable.startMap(hasSessionId ? 12 : 11);
// 1
writable.writeUTF8(SPAN_ID);
writable.writeString(String.valueOf(span.getSpanId()), null);
Expand Down Expand Up @@ -166,7 +175,14 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
writable.writeUTF8(APM_TRACE_ID);
writable.writeString(span.getTraceId().toHexString(), null);

/* 9 (metrics), 10 (tags), 11 meta */
// 9 — optional top-level session_id field. Required by the LLMObs HTTP intake schema
// and by the LLM Trace Explorer's Sessions filter, which keys off this field.
if (hasSessionId) {
writable.writeUTF8(SESSION_ID);
writable.writeString(sessionId, null);
}

/* metrics, tags, meta: keys 9-11, or 10-12 when session_id was written above */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe keep position numbers for following keys in the comment?

span.processTagsAndBaggage(metaWriter.withWritable(writable, getErrorsMap(span)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,126 @@ class LLMObsSpanMapperTest extends DDCoreSpecification {
spanNames.contains("chat-completion-3")
}

// Checks that a span tagged with _ml_obs_tag.session_id is serialized with a top-level
// "session_id" field and that the "session_id:<value>" entry is still present in "tags".
def "test LLMObsSpanMapper writes top-level session_id when set"() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need separate test cases that are very repetitive and can instead be just part of "test LLMObsSpanMapper serialization".

setup:
def mapper = new LLMObsSpanMapper()
def tracer = tracerBuilder().writer(new ListWriter()).build()

def sessionId = "abc-123-session"

// Build and finish a single span typed as LLMOBS that carries the session id tag.
def llmSpan = tracer.buildSpan("datadog", "openai.request")
.withResourceName("createCompletion")
.withTag("_ml_obs_tag.span.kind", Tags.LLMOBS_LLM_SPAN_KIND)
.withTag("_ml_obs_tag.model_name", "gpt-4")
.withTag("_ml_obs_tag.model_provider", "openai")
.withTag("_ml_obs_tag.session_id", sessionId)
.start()
llmSpan.setSpanType(InternalSpanTypes.LLMOBS)
llmSpan.finish()

def trace = [llmSpan]
CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer()
MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(16 * 1024, sink))

when:
// Serialize the one-span trace through the mapper; flushing hands the bytes to the sink.
packer.format(trace, mapper)
packer.flush()

then:
sink.captured != null
// Wrap the captured bytes in a payload and drain it into an in-memory stream.
def payload = mapper.newPayload()
payload.withBody(1, sink.captured)

def channel = new ByteArrayOutputStream()
// Minimal channel that copies every buffer it receives into the byte stream above.
payload.writeTo(new WritableByteChannel() {
@Override
int write(ByteBuffer src) throws IOException {
def bytes = new byte[src.remaining()]
src.get(bytes)
channel.write(bytes)
return bytes.length
}

@Override
boolean isOpen() {
return true
}

@Override
void close() throws IOException { }
})

// Decode the written payload into a Map and pick the first entry under "spans".
def result = objectMapper.readValue(channel.toByteArray(), Map)
def spanData = result["spans"][0]

then:
// Top-level session_id field is present with the right value — this is what
// the LLM Trace Explorer's Sessions filter queries.
spanData.containsKey("session_id")
spanData["session_id"] == sessionId

// The session_id:<value> entry is ALSO present in the tags[] array, matching
// dd-trace-py and dd-trace-js wire-format behavior.
spanData["tags"].contains("session_id:${sessionId}".toString())
}

// Checks that a span without the _ml_obs_tag.session_id tag is serialized with no top-level
// "session_id" field and with no "session_id:" entry in "tags".
def "test LLMObsSpanMapper omits top-level session_id when not set"() {
setup:
def mapper = new LLMObsSpanMapper()
def tracer = tracerBuilder().writer(new ListWriter()).build()

// Build and finish a single span typed as LLMOBS; no session id tag is set on it.
def llmSpan = tracer.buildSpan("datadog", "openai.request")
.withResourceName("createCompletion")
.withTag("_ml_obs_tag.span.kind", Tags.LLMOBS_LLM_SPAN_KIND)
.withTag("_ml_obs_tag.model_name", "gpt-4")
.withTag("_ml_obs_tag.model_provider", "openai")
.start()
llmSpan.setSpanType(InternalSpanTypes.LLMOBS)
llmSpan.finish()

def trace = [llmSpan]
CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer()
MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(16 * 1024, sink))

when:
// Serialize the one-span trace through the mapper; flushing hands the bytes to the sink.
packer.format(trace, mapper)
packer.flush()

then:
sink.captured != null
// Wrap the captured bytes in a payload and drain it into an in-memory stream.
def payload = mapper.newPayload()
payload.withBody(1, sink.captured)

def channel = new ByteArrayOutputStream()
// Minimal channel that copies every buffer it receives into the byte stream above.
payload.writeTo(new WritableByteChannel() {
@Override
int write(ByteBuffer src) throws IOException {
def bytes = new byte[src.remaining()]
src.get(bytes)
channel.write(bytes)
return bytes.length
}

@Override
boolean isOpen() {
return true
}

@Override
void close() throws IOException { }
})

// Decode the written payload into a Map and pick the first entry under "spans".
def result = objectMapper.readValue(channel.toByteArray(), Map)
def spanData = result["spans"][0]

then:
// No top-level session_id field when the tag was never set.
!spanData.containsKey("session_id")

// And no session_id entry leaks into tags[] either.
spanData["tags"].every { !it.startsWith("session_id:") }
}

static class CapturingByteBufferConsumer implements ByteBufferConsumer {

ByteBuffer captured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ private LLMObsContext() {
}

// Key under which the LLMObs span context is stored in the Context (see attach/current).
private static final ContextKey<AgentSpanContext> CONTEXT_KEY = ContextKey.named("llmobs_span");
// Key under which the propagated session id is stored in the Context (see currentSessionId).
private static final ContextKey<String> SESSION_ID_KEY = ContextKey.named("llmobs_session_id");

/**
 * Attaches the given LLMObs span context without supplying a session id.
 *
 * <p>Delegates to {@link #attach(AgentSpanContext, String)} with a {@code null} session id, so
 * this overload never stores a session id of its own.
 *
 * @param ctx span context to store under the LLMObs span key
 * @return the scope returned by attaching the updated context
 */
public static ContextScope attach(AgentSpanContext ctx) {
  // Single exit point: the pre-change direct attach was left above this line by the diff and
  // made this statement unreachable.
  return attach(ctx, null);
}

/**
 * Attaches {@code ctx} as the LLMObs span context and, when a usable session id is supplied,
 * stores that id alongside it so it can be read back through {@link #currentSessionId()} by
 * LLMObs spans started under this context that do not specify their own session id.
 *
 * @param ctx span context to store under the LLMObs span key
 * @param sessionId session id to store; ignored when {@code null} or empty
 * @return the scope returned by attaching the derived context
 */
public static ContextScope attach(AgentSpanContext ctx, String sessionId) {
  final Context withSpan = Context.current().with(CONTEXT_KEY, ctx);
  final boolean noSession = sessionId == null || sessionId.isEmpty();
  return (noSession ? withSpan : withSpan.with(SESSION_ID_KEY, sessionId)).attach();
}

/** Looks up the value stored under the LLMObs span key in the current context. */
public static AgentSpanContext current() {
  final Context active = Context.current();
  return active.get(CONTEXT_KEY);
}

/**
 * Looks up the session id stored in the current context by an enclosing LLMObs span.
 *
 * @return the propagated session id, or null if no parent set one
 */
public static String currentSessionId() {
  final Context active = Context.current();
  return active.get(SESSION_ID_KEY);
}
}