Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
spark: use OpenLineage context to generate trace/span id if present
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
  • Loading branch information
mobuchowski committed Apr 23, 2025
commit cc7cdde36a3f040d75694e4259c0887b599b2eb5
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,7 @@ private void captureOpenlineageContextIfPresent(
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
builder.asChildOf(context);

builder.withSpanId(context.getChildRootSpanId());

log.debug(
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
context,
context.getTraceId(),
context.getChildRootSpanId());
log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId());

builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class OpenlineageParentContext implements AgentSpanContext {

private final DDTraceId traceId;
private final long spanId;
private final long childRootSpanId;

private final String parentJobNamespace;
private final String parentJobName;
Expand All @@ -50,23 +49,29 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
return Optional.empty();
}

if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
Comment thread
mobuchowski marked this conversation as resolved.
Outdated
return Optional.empty();
}

String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);

if (!UUID.matcher(parentRunId).matches()) {
log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
Comment thread
mobuchowski marked this conversation as resolved.
Outdated
return Optional.empty();
}

if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE);
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME);
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);

if (!UUID.matcher(rootParentRunId).matches()) {
log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
Comment thread
mobuchowski marked this conversation as resolved.
Outdated
return Optional.empty();
}

String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE, "");
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME, "");
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID, "");

return Optional.of(
new OpenlineageParentContext(
parentJobNamespace,
Expand Down Expand Up @@ -101,19 +106,15 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
this.rootParentJobName = rootParentJobName;
this.rootParentRunId = rootParentRunId;

if (this.parentRunId != null) {
if (this.rootParentRunId != null) {
traceId = computeTraceId(this.rootParentRunId);
spanId = computeSpanId(this.parentRunId);
} else if (this.parentRunId != null) {
traceId = computeTraceId(this.parentRunId);
spanId = DDSpanId.ZERO;

if (this.rootParentRunId != null) {
childRootSpanId = computeSpanId(this.rootParentRunId);
} else {
childRootSpanId = DDSpanId.ZERO;
}
spanId = computeSpanId(this.parentRunId);
} else {
traceId = DDTraceId.ZERO;
spanId = DDSpanId.ZERO;
childRootSpanId = DDSpanId.ZERO;
}

log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
Expand All @@ -124,6 +125,7 @@ private long computeSpanId(String runId) {
}

private DDTraceId computeTraceId(String runId) {
log.debug("Generating traceID from runId: {}", runId);
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
}

Expand All @@ -137,10 +139,6 @@ public long getSpanId() {
return spanId;
}

public long getChildRootSpanId() {
return childRootSpanId;
}

@Override
public AgentTraceCollector getTraceCollector() {
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
package datadog.trace.instrumentation.spark

import datadog.trace.api.DDSpanId
import org.apache.spark.SparkConf
import spock.lang.Specification

class OpenlineageParentContextTest extends Specification {
def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () {
def "should create OpenLineageParentContext with particular trace id based on root parent id" () {
given:
SparkConf mockSparkConf = Mock(SparkConf)

when:
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId

then:
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
parentContext.isPresent()

parentContext.get().getParentJobNamespace() == "default"
parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3"
parentContext.get().getParentRunId() == expectedParentRunId
parentContext.get().getRootParentRunId() == rootParentRunId
parentContext.get().getParentRunId() == parentRunId

parentContext.get().traceId.toLong() == expectedTraceId
parentContext.get().spanId == DDSpanId.ZERO
parentContext.get().childRootSpanId == expectedRootSpanId
parentContext.get().traceId.toString() == expectedTraceId
parentContext.get().spanId.toString() == expectedSpanId

where:
parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId
"ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL
"ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL
rootParentRunId | parentRunId | expectedTraceId | expectedSpanId
"01964820-5280-7674-b04e-82fbed085f39" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "13959090542865903119" | "2903780135964948649"
"1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e" | "6f6f6f6f-5e5e-4d4d-3c3c-2b2b2b2b2b2b" | "15830118871223350489" | "8020087091656517257"
}

def "should create empty OpenLineageParentContext if any required field is missing" () {
Expand All @@ -43,20 +44,24 @@ class OpenlineageParentContextTest extends Specification {
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentIdPresent

then:
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
parentContext.isPresent() == expected

where:
jobNamespacePresent | jobNamePresent | runIdPresent | expected
true | true | false | false
true | false | true | false
false | true | true | false
true | false | false | false
false | true | false | false
false | false | true | false
false | false | false | false
jobNamespacePresent | jobNamePresent | runIdPresent | rootParentIdPresent | expected
true | true | true | false | false
true | true | false | false | false
true | true | true | false | false
true | true | false | true | false
true | false | true | false | false
false | true | true | true | false
true | false | false | false | false
false | true | false | false | false
false | false | true | true | false
false | false | false | false | false
}

def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () {
Expand All @@ -67,9 +72,12 @@ class OpenlineageParentContextTest extends Specification {
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> runId


then:
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
Expand All @@ -83,5 +91,33 @@ class OpenlineageParentContextTest extends Specification {
"6afeb6ee-729d-37f7-b8e6f47ca694" | false
"6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
}

def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () {
given:
SparkConf mockSparkConf = Mock(SparkConf)

when:
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> "6afeb6ee-729d-37f7-ad73-b8e6f47ca694"
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId


then:
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
parentContext.isPresent() == expected

where:
rootParentRunId | expected
"6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true
" " | false
"invalid-uuid" | false
"6afeb6ee-729d-37f7-b8e6f47ca694" | false
"6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
}
}