diff --git a/.server-changes/trace-view-large-runs-subtree.md b/.server-changes/trace-view-large-runs-subtree.md new file mode 100644 index 00000000000..c28d5197099 --- /dev/null +++ b/.server-changes/trace-view-large-runs-subtree.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix empty trace views for child and nested runs in very large traces. The dashboard and retrieve-trace API now return the requested run's span subtree. diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index c4c3ac88c48..6eac6ca35bc 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -1,6 +1,7 @@ import { millisecondsToNanoseconds, RunAnnotations } from "@trigger.dev/core/v3"; import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; import { prisma, type PrismaClient } from "~/db.server"; +import { logger } from "~/services/logger.server"; import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; import { getUsername } from "~/utils/username"; import { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; @@ -179,16 +180,49 @@ export class RunPresenter { run.runtimeEnvironment.organizationId ); - // get the events + const traceTimeBounds = { + startCreatedAt: run.rootTaskRun?.createdAt ?? run.createdAt, + endCreatedAt: run.completedAt ?? undefined, + }; + + // Fast path: full trace summary. Slow path: subtree fetch when the anchor + // span fell past the row cap (large traces ordered by start_time ASC). let traceSummary = await repository.getTraceSummary( getTaskEventStoreTableForRun(run), run.runtimeEnvironment.id, run.traceId, - run.rootTaskRun?.createdAt ?? run.createdAt, - run.completedAt ?? undefined, + traceTimeBounds.startCreatedAt, + traceTimeBounds.endCreatedAt, { includeDebugLogs: showDebug } ); + let isTruncated = traceSummary?.isTruncated ?? false; + const hasAnchorSpan = traceSummary?.spans.some((span) => span.id === run.spanId) ?? false; + + if (traceSummary && !hasAnchorSpan) { + logger.warn("Trace summary missing anchor span, falling back to subtree fetch", { + runId: run.friendlyId, + spanId: run.spanId, + traceId: run.traceId, + spanCount: traceSummary.spans.length, + }); + + const subtreeSummary = await repository.getTraceSubtreeSummary( + getTaskEventStoreTableForRun(run), + run.runtimeEnvironment.id, + run.traceId, + run.spanId, + traceTimeBounds.startCreatedAt, + traceTimeBounds.endCreatedAt, + { includeDebugLogs: showDebug } + ); + + if (subtreeSummary) { + traceSummary = subtreeSummary; + isTruncated = subtreeSummary.isTruncated ?? false; + } + } + if (!traceSummary) { const spanSummary: SpanSummary = { id: run.spanId, @@ -237,11 +271,22 @@ export class RunPresenter { // Resolve agent-kind once so the tree renderer can swap icon/colour for // the current run's spans without doing per-row lookups. - const isAgentRun = - RunAnnotations.safeParse(run.annotations).data?.taskKind === "AGENT"; + const isAgentRun = RunAnnotations.safeParse(run.annotations).data?.taskKind === "AGENT"; //this tree starts at the passed in span (hides parent elements if there are any) const tree = createTreeFromFlatItems(traceSummary.spans, run.spanId); + const missingAnchor = !traceSummary.spans.some((span) => span.id === run.spanId) || !tree; + + if (missingAnchor) { + logger.warn("Trace view anchor span not found in trace summary", { + runId: run.friendlyId, + spanId: run.spanId, + traceId: run.traceId, + spanCount: traceSummary.spans.length, + }); + + isTruncated = true; + } //we need the start offset for each item, and the total duration of the entire tree const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0; @@ -313,6 +358,8 @@ export class RunPresenter { : undefined, overridesBySpanId: traceSummary.overridesBySpanId, linkedRunIdBySpanId, + isTruncated, + missingAnchor, }, maximumLiveReloadingSetting: repository.maximumLiveReloadingSetting, }; diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index de23b935cd6..7ee6c540bec 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -36,6 +36,7 @@ import { AdminDebugTooltip } from "~/components/admin/debugTooltip"; import { PageBody } from "~/components/layout/AppLayout"; import { Badge } from "~/components/primitives/Badge"; import { Button, LinkButton } from "~/components/primitives/Buttons"; +import { Callout } from "~/components/primitives/Callout"; import { CopyableText } from "~/components/primitives/CopyableText"; import { DateTimeShort } from "~/components/primitives/DateTime"; import { Dialog, DialogTrigger } from "~/components/primitives/Dialog"; @@ -599,8 +600,16 @@ function TraceView({ return <>; } - const { events, duration, rootSpanStatus, rootStartedAt, queuedDuration, overridesBySpanId } = - trace; + const { + events, + duration, + rootSpanStatus, + rootStartedAt, + queuedDuration, + overridesBySpanId, + isTruncated = false, + missingAnchor = false, + } = trace; const changeToSpan = useDebounce((selectedSpan: string) => { replaceSearchParam("span", selectedSpan, { replace: true }); @@ -647,31 +656,44 @@ function TraceView({ id={resizableSettings.parent.main.id} min={resizableSettings.parent.main.min} > - { - //instantly close the panel if no span is selected - if (!selectedSpan) { - replaceSearchParam("span"); - return; - } - - changeToSpan(selectedSpan); - }} - totalDuration={duration} - rootSpanStatus={rootSpanStatus} - rootStartedAt={rootStartedAt ? new Date(rootStartedAt) : undefined} - queuedDuration={queuedDuration} - environmentType={run.environment.type} - shouldLiveReload={isLiveReloading} - maximumLiveReloadingSetting={maximumLiveReloadingSetting} - rootRun={run.rootTaskRun} - parentRun={run.parentTaskRun} - isCompleted={run.completedAt !== null} - treeSnapshot={resizable.tree as ResizableSnapshot} - /> +
+ {isTruncated && ( +
+ + {missingAnchor + ? "Trace too large to display completely." + : "This run's trace is partially displayed because it exceeds the view limit."} + +
+ )} +
+ { + //instantly close the panel if no span is selected + if (!selectedSpan) { + replaceSearchParam("span"); + return; + } + + changeToSpan(selectedSpan); + }} + totalDuration={duration} + rootSpanStatus={rootSpanStatus} + rootStartedAt={rootStartedAt ? new Date(rootStartedAt) : undefined} + queuedDuration={queuedDuration} + environmentType={run.environment.type} + shouldLiveReload={isLiveReloading} + maximumLiveReloadingSetting={maximumLiveReloadingSetting} + rootRun={run.rootTaskRun} + parentRun={run.parentTaskRun} + isCompleted={run.completedAt !== null} + treeSnapshot={resizable.tree as ResizableSnapshot} + /> +
+
> & {} } - | { source: "buffer"; run: NonNullable>> }; + | { + source: "buffer"; + run: NonNullable>>; + }; async function findPgRun(runId: string, environmentId: string) { - return runStore.findRun( - { friendlyId: runId, runtimeEnvironmentId: environmentId }, - $replica - ); + return runStore.findRun({ friendlyId: runId, runtimeEnvironmentId: environmentId }, $replica); } export const loader = createLoaderApiRoute( @@ -96,10 +93,11 @@ export const loader = createLoaderApiRoute( authentication.environment.organization.id ); - const traceSummary = await eventRepository.getTraceDetailedSummary( + const traceSummary = await eventRepository.getTraceDetailedSubtreeSummary( getTaskEventStoreTableForRun(run), authentication.environment.id, run.traceId, + run.spanId, run.createdAt, run.completedAt ?? undefined ); diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 1e091944e6a..0b2d0872988 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -1298,44 +1298,335 @@ export class ClickhouseEventRepository implements IEventRepository { endCreatedAt?: Date, options?: { includeDebugLogs?: boolean } ): Promise { - const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 60_000); - const endCreatedAtWithBuffer = endCreatedAt - ? new Date(endCreatedAt.getTime() + 60_000) - : undefined; + const records = await this.#fetchTraceSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + limit: this._config.maximumTraceSummaryViewCount, + }); - const queryBuilder = - this._version === "v2" - ? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder() - : this._clickhouse.taskEvents.traceSummaryQueryBuilder(); + if (!records) { + return; + } - queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); - queryBuilder.where("trace_id = {traceId: String}", { traceId }); - queryBuilder.where("start_time >= {startCreatedAt: String}", { - startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), + return this.#buildTraceSummaryFromRecords(records); + } + + async getTraceSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise { + const { records, isTruncated, missingAnchor } = await this.#fetchTraceSubtreeRecords({ + environmentId, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + options, + limit: this._config.maximumTraceSummaryViewCount, }); - if (endCreatedAtWithBuffer) { - queryBuilder.where("start_time <= {endCreatedAt: String}", { - endCreatedAt: convertDateToNanoseconds(endCreatedAtWithBuffer).toString(), + if (missingAnchor) { + return; + } + + const summary = this.#buildTraceSummaryFromRecords(records, { + rootSpanId: anchorSpanId, + }); + + if (!summary) { + return; + } + + return { + ...summary, + isTruncated, + }; + } + + async #fetchTraceSubtreeRecords({ + environmentId, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + options, + limit: maxRows, + }: { + environmentId: string; + traceId: string; + anchorSpanId: string; + startCreatedAt: Date; + endCreatedAt?: Date; + options?: { includeDebugLogs?: boolean }; + limit?: number; + }): Promise<{ + records: TaskEventSummaryV1Result[]; + isTruncated: boolean; + missingAnchor: boolean; + }> { + return this.#collectTraceSubtreeRecords({ + anchorSpanId, + maxRows, + fetchAncestor: (batch) => + this.#fetchTraceSummaryRecords({ + environmentId, + traceId, + options, + skipTimeWindow: true, + ...batch, + }), + fetchDescendant: (batch) => + this.#fetchTraceSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + ...batch, + }), + }); + } + + async #fetchTraceDetailedSubtreeRecords({ + environmentId, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + options, + limit: maxRows, + }: { + environmentId: string; + traceId: string; + anchorSpanId: string; + startCreatedAt: Date; + endCreatedAt?: Date; + options?: { includeDebugLogs?: boolean }; + limit?: number; + }): Promise<{ + records: TaskEventDetailedSummaryV1Result[]; + isTruncated: boolean; + missingAnchor: boolean; + }> { + return this.#collectTraceSubtreeRecords({ + anchorSpanId, + maxRows, + fetchAncestor: (batch) => + this.#fetchTraceDetailedSummaryRecords({ + environmentId, + traceId, + options, + skipTimeWindow: true, + ...batch, + }), + fetchDescendant: (batch) => + this.#fetchTraceDetailedSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + ...batch, + }), + }); + } + + async #collectTraceSubtreeRecords({ + anchorSpanId, + maxRows, + fetchAncestor, + fetchDescendant, + }: { + anchorSpanId: string; + maxRows?: number; + fetchAncestor: (batch: { spanIds: string[]; limit?: number }) => Promise; + fetchDescendant: (batch: { + spanIds?: string[]; + parentSpanIds?: string[]; + limit?: number; + }) => Promise; + }): Promise<{ + records: T[]; + isTruncated: boolean; + missingAnchor: boolean; + }> { + const allRecords: T[] = []; + const collectedSpanIds = new Set(); + let isTruncated = false; + + const anchorRecords = await fetchDescendant({ + spanIds: [anchorSpanId], + limit: maxRows, + }); + + if (!anchorRecords || anchorRecords.length === 0) { + return { records: [], isTruncated: false, missingAnchor: true }; + } + + if (maxRows && anchorRecords.length >= maxRows) { + isTruncated = true; + } + + allRecords.push(...anchorRecords); + collectedSpanIds.add(anchorSpanId); + + let parentSpanId = this.#parentSpanIdFromRecords(anchorRecords, anchorSpanId); + while (parentSpanId) { + if (collectedSpanIds.has(parentSpanId)) { + break; + } + + if (maxRows && allRecords.length >= maxRows) { + isTruncated = true; + break; + } + + const parentRecords = await fetchAncestor({ + spanIds: [parentSpanId], + limit: maxRows ? maxRows - allRecords.length : undefined, }); + + if (!parentRecords || parentRecords.length === 0) { + break; + } + + allRecords.push(...parentRecords); + collectedSpanIds.add(parentSpanId); + parentSpanId = this.#parentSpanIdFromRecords(parentRecords, parentSpanId); } - // For v2, add inserted_at filtering for partition pruning - if (this._version === "v2") { - queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { - insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + let frontier = [anchorSpanId]; + while (frontier.length > 0) { + if (maxRows && allRecords.length >= maxRows) { + isTruncated = true; + break; + } + + const remaining = maxRows ? maxRows - allRecords.length : undefined; + const childRecords = await fetchDescendant({ + parentSpanIds: frontier, + limit: remaining, }); - // No upper bound on inserted_at - we want all events inserted up to now + + if (!childRecords || childRecords.length === 0) { + break; + } + + if (remaining !== undefined && childRecords.length >= remaining) { + isTruncated = true; + } + + allRecords.push(...childRecords); + + const nextFrontier: string[] = []; + for (const record of childRecords) { + if (!collectedSpanIds.has(record.span_id)) { + collectedSpanIds.add(record.span_id); + nextFrontier.push(record.span_id); + } + } + + frontier = nextFrontier; + } + + return { + records: allRecords, + isTruncated, + missingAnchor: false, + }; + } + + #parentSpanIdFromRecords( + records: Array<{ span_id: string; parent_span_id: string }>, + spanId: string + ): string | undefined { + const parentSpanId = records.find((record) => record.span_id === spanId)?.parent_span_id; + return parentSpanId ? parentSpanId : undefined; + } + + #createTraceSummaryQueryBuilder() { + return this._version === "v2" + ? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder() + : this._clickhouse.taskEvents.traceSummaryQueryBuilder(); + } + + async #fetchTraceSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + spanIds, + parentSpanIds, + limit, + skipTimeWindow, + }: { + environmentId: string; + traceId: string; + startCreatedAt?: Date; + endCreatedAt?: Date; + options?: { includeDebugLogs?: boolean }; + spanIds?: string[]; + parentSpanIds?: string[]; + limit?: number; + skipTimeWindow?: boolean; + }): Promise { + const queryBuilder = this.#createTraceSummaryQueryBuilder(); + + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + queryBuilder.where("trace_id = {traceId: String}", { traceId }); + + if (!skipTimeWindow) { + if (!startCreatedAt) { + throw new Error("startCreatedAt is required when skipTimeWindow is false"); + } + + const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 60_000); + const endCreatedAtWithBuffer = endCreatedAt + ? new Date(endCreatedAt.getTime() + 60_000) + : undefined; + + queryBuilder.where("start_time >= {startCreatedAt: String}", { + startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), + }); + + if (endCreatedAtWithBuffer) { + queryBuilder.where("start_time <= {endCreatedAt: String}", { + endCreatedAt: convertDateToNanoseconds(endCreatedAtWithBuffer).toString(), + }); + } + + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } } if (options?.includeDebugLogs === false) { queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" }); } + if (spanIds && spanIds.length > 0) { + queryBuilder.where("span_id IN {spanIds: Array(String)}", { spanIds }); + } + + if (parentSpanIds && parentSpanIds.length > 0) { + queryBuilder.where("parent_span_id IN {parentSpanIds: Array(String)}", { parentSpanIds }); + } + queryBuilder.orderBy("start_time ASC"); - if (this._config.maximumTraceSummaryViewCount) { - queryBuilder.limit(this._config.maximumTraceSummaryViewCount); + if (limit) { + queryBuilder.limit(limit); } const [queryError, records] = await queryBuilder.execute(); @@ -1344,11 +1635,17 @@ export class ClickhouseEventRepository implements IEventRepository { throw queryError; } - if (!records) { + return records; + } + + #buildTraceSummaryFromRecords( + records: TaskEventSummaryV1Result[], + options?: { rootSpanId?: string } + ): TraceSummary | undefined { + if (records.length === 0) { return; } - // O(n) grouping instead of O(n²) array spreading const recordsGroupedBySpanId: Record = {}; for (const record of records) { if (!recordsGroupedBySpanId[record.span_id]) { @@ -1358,9 +1655,8 @@ export class ClickhouseEventRepository implements IEventRepository { } const spanSummaries = new Map(); - let rootSpanId: string | undefined; + let rootSpanId: string | undefined = options?.rootSpanId; - // Create temporary metadata cache for this query const metadataCache = new Map>(); for (const [spanId, spanRecords] of Object.entries(recordsGroupedBySpanId)) { @@ -1865,48 +2161,78 @@ export class ClickhouseEventRepository implements IEventRepository { return result; } - async getTraceDetailedSummary( - storeTable: TaskEventStoreTable, - environmentId: string, - traceId: string, - startCreatedAt: Date, - endCreatedAt?: Date, - options?: { includeDebugLogs?: boolean } - ): Promise { - const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); + #createTraceDetailedSummaryQueryBuilder() { + return this._version === "v2" + ? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder() + : this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder(); + } - const queryBuilder = - this._version === "v2" - ? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder() - : this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder(); + async #fetchTraceDetailedSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + spanIds, + parentSpanIds, + limit, + skipTimeWindow, + }: { + environmentId: string; + traceId: string; + startCreatedAt?: Date; + endCreatedAt?: Date; + options?: { includeDebugLogs?: boolean }; + spanIds?: string[]; + parentSpanIds?: string[]; + limit?: number; + skipTimeWindow?: boolean; + }): Promise { + const queryBuilder = this.#createTraceDetailedSummaryQueryBuilder(); queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); queryBuilder.where("trace_id = {traceId: String}", { traceId }); - queryBuilder.where("start_time >= {startCreatedAt: String}", { - startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), - }); - if (endCreatedAt) { - queryBuilder.where("start_time <= {endCreatedAt: String}", { - endCreatedAt: convertDateToNanoseconds(endCreatedAt).toString(), - }); - } + if (!skipTimeWindow) { + if (!startCreatedAt) { + throw new Error("startCreatedAt is required when skipTimeWindow is false"); + } - // For v2, add inserted_at filtering for partition pruning - if (this._version === "v2") { - queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { - insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000); + + queryBuilder.where("start_time >= {startCreatedAt: String}", { + startCreatedAt: convertDateToNanoseconds(startCreatedAtWithBuffer).toString(), }); + + if (endCreatedAt) { + queryBuilder.where("start_time <= {endCreatedAt: String}", { + endCreatedAt: convertDateToNanoseconds(endCreatedAt).toString(), + }); + } + + if (this._version === "v2") { + queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", { + insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer), + }); + } } if (options?.includeDebugLogs === false) { queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" }); } + if (spanIds && spanIds.length > 0) { + queryBuilder.where("span_id IN {spanIds: Array(String)}", { spanIds }); + } + + if (parentSpanIds && parentSpanIds.length > 0) { + queryBuilder.where("parent_span_id IN {parentSpanIds: Array(String)}", { parentSpanIds }); + } + queryBuilder.orderBy("start_time ASC"); - if (this._config.maximumTraceDetailedSummaryViewCount) { - queryBuilder.limit(this._config.maximumTraceDetailedSummaryViewCount); + if (limit) { + queryBuilder.limit(limit); } const [queryError, records] = await queryBuilder.execute(); @@ -1915,11 +2241,18 @@ export class ClickhouseEventRepository implements IEventRepository { throw queryError; } - if (!records) { + return records; + } + + #buildTraceDetailedSummaryFromRecords( + traceId: string, + records: TaskEventDetailedSummaryV1Result[], + rootSpanId?: string + ): TraceDetailedSummary | undefined { + if (records.length === 0) { return; } - // O(n) grouping instead of O(n²) array spreading const recordsGroupedBySpanId: Record = {}; for (const record of records) { if (!recordsGroupedBySpanId[record.span_id]) { @@ -1929,9 +2262,8 @@ export class ClickhouseEventRepository implements IEventRepository { } const spanSummaries = new Map(); - let rootSpanId: string | undefined; + let resolvedRootSpanId: string | undefined = rootSpanId; - // Create temporary metadata cache for this query const metadataCache = new Map>(); for (const [spanId, spanRecords] of Object.entries(recordsGroupedBySpanId)) { @@ -1947,12 +2279,12 @@ export class ClickhouseEventRepository implements IEventRepository { spanSummaries.set(spanId, spanSummary); - if (!rootSpanId && !spanSummary.parentId) { - rootSpanId = spanId; + if (!resolvedRootSpanId && !spanSummary.parentId) { + resolvedRootSpanId = spanId; } } - if (!rootSpanId) { + if (!resolvedRootSpanId) { return; } @@ -1967,7 +2299,6 @@ export class ClickhouseEventRepository implements IEventRepository { return finalSpan; }); - // Second pass: build parent-child relationships for (const finalSpan of finalSpans) { if (finalSpan.parentId) { const parent = spanDetailedSummaryMap.get(finalSpan.parentId); @@ -1977,7 +2308,7 @@ export class ClickhouseEventRepository implements IEventRepository { } } - const rootSpan = spanDetailedSummaryMap.get(rootSpanId); + const rootSpan = spanDetailedSummaryMap.get(resolvedRootSpanId); if (!rootSpan) { return; @@ -1989,6 +2320,73 @@ export class ClickhouseEventRepository implements IEventRepository { }; } + async getTraceDetailedSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise { + const limit = this._config.maximumTraceDetailedSummaryViewCount; + const records = await this.#fetchTraceDetailedSummaryRecords({ + environmentId, + traceId, + startCreatedAt, + endCreatedAt, + options, + limit, + }); + + if (!records) { + return; + } + + const summary = this.#buildTraceDetailedSummaryFromRecords(traceId, records); + if (!summary) { + return; + } + + return { + ...summary, + isTruncated: limit !== undefined && records.length >= limit, + }; + } + + async getTraceDetailedSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise { + const { records, isTruncated, missingAnchor } = await this.#fetchTraceDetailedSubtreeRecords({ + environmentId, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + options, + limit: this._config.maximumTraceDetailedSummaryViewCount, + }); + + if (missingAnchor) { + return; + } + + const summary = this.#buildTraceDetailedSummaryFromRecords(traceId, records, anchorSpanId); + if (!summary) { + return; + } + + return { + ...summary, + isTruncated, + }; + } + async *streamTraceEvents( storeTable: TaskEventStoreTable, environmentId: string, diff --git a/apps/webapp/app/v3/eventRepository/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository/eventRepository.server.ts index 268c955fd25..ef0d44f469c 100644 --- a/apps/webapp/app/v3/eventRepository/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/eventRepository.server.ts @@ -30,6 +30,7 @@ import { singleton } from "~/utils/singleton"; import { DynamicFlushScheduler } from "../dynamicFlushScheduler.server"; import { tracePubSub } from "../services/tracePubSub.server"; import { DetailedTraceEvent, TaskEventStore, TaskEventStoreTable } from "../taskEventStore.server"; +import { clampToEmergencySpanCap } from "./emergencySpanCap.server"; import { startActiveSpan } from "../tracer.server"; import { startSpan } from "../tracing.server"; import { @@ -438,107 +439,62 @@ export class EventRepository implements IEventRepository { { includeDebugLogs: options?.includeDebugLogs } ); - let preparedEvents: Array = []; - let rootSpanId: string | undefined; - const eventsBySpanId = new Map(); - - for (const event of events) { - preparedEvents.push(prepareEvent(event)); - - if (!rootSpanId && !event.parentId) { - rootSpanId = event.spanId; - } - } - - for (const event of preparedEvents) { - const existingEvent = eventsBySpanId.get(event.spanId); - - if (!existingEvent) { - eventsBySpanId.set(event.spanId, event); - continue; - } - - // This is an invisible event, and we just want to keep the original event but concat together - // the event.events with the existingEvent.events - if (event.kind === "UNSPECIFIED") { - eventsBySpanId.set(event.spanId, { - ...existingEvent, - events: [...(existingEvent.events ?? []), ...(event.events ?? [])], - }); - continue; - } - - if (event.isCancelled || !event.isPartial) { - const mergedEvent: PreparedEvent = { - ...event, - // Preserve style from the original partial event - style: existingEvent.style, - events: [...(existingEvent.events ?? []), ...(event.events ?? [])], - }; - eventsBySpanId.set(event.spanId, mergedEvent); - continue; - } - } - - preparedEvents = Array.from(eventsBySpanId.values()); - - const spansBySpanId = new Map(); - - const spans = preparedEvents.map((event) => { - const overrides = getAncestorOverrides({ - spansById: eventsBySpanId, - span: event, - }); - - const ancestorCancelled = overrides?.isCancelled ?? false; - const ancestorIsError = overrides?.isError ?? false; - const duration = overrides?.duration ?? event.duration; - const events = [...(overrides?.events ?? []), ...(event.events ?? [])]; - const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; - const isCancelled = - event.isCancelled === true ? true : event.isPartial && ancestorCancelled; - const isError = isCancelled - ? false - : typeof overrides?.isError === "boolean" - ? overrides.isError - : event.isError; - - const span = { - id: event.spanId, - parentId: event.parentId ?? undefined, - runId: event.runId, - data: { - message: event.message, - style: event.style, - duration, - isError, - isPartial, - isCancelled, - isDebug: event.kind === TaskEventKind.LOG, - startTime: getDateFromNanoseconds(event.startTime), - level: event.level, - events, - }, - }; - - spansBySpanId.set(event.spanId, span); + return buildTraceSummaryFromQueriedEvents(events); + }); + } - return span; - }); + public async getTraceSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise { + return await startActiveSpan("getTraceSubtreeSummary", async () => { + const maxRows = clampToEmergencySpanCap(env.MAXIMUM_TRACE_SUMMARY_VIEW_COUNT); + const traceEventSelect = { + spanId: true, + parentId: true, + runId: true, + message: true, + style: true, + startTime: true, + duration: true, + isError: true, + isPartial: true, + isCancelled: true, + level: true, + events: true, + kind: true, + attemptNumber: true, + } as const; + + const { events, isTruncated, missingAnchor } = await collectTraceSubtreeEvents( + this.taskEventStore, + storeTable, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + traceEventSelect, + options, + maxRows + ); - if (!rootSpanId) { + if (missingAnchor) { return; } - const rootSpan = spansBySpanId.get(rootSpanId); - - if (!rootSpan) { + const summary = buildTraceSummaryFromQueriedEvents(events, anchorSpanId); + if (!summary) { return; } return { - rootSpan, - spans, + ...summary, + isTruncated, }; }); } @@ -560,126 +516,69 @@ export class EventRepository implements IEventRepository { { includeDebugLogs: options?.includeDebugLogs } ); - let preparedEvents: Array = []; - let rootSpanId: string | undefined; - const eventsBySpanId = new Map(); - - for (const event of events) { - preparedEvents.push(prepareDetailedEvent(event)); - - if (!rootSpanId && !event.parentId) { - rootSpanId = event.spanId; - } - } - - for (const event of preparedEvents) { - const existingEvent = eventsBySpanId.get(event.spanId); - - if (!existingEvent) { - eventsBySpanId.set(event.spanId, event); - continue; - } - - // This is an invisible event, and we just want to keep the original event but concat together - // the event.events with the existingEvent.events - if (event.kind === "UNSPECIFIED") { - eventsBySpanId.set(event.spanId, { - ...existingEvent, - events: [...(existingEvent.events ?? []), ...(event.events ?? [])], - }); - continue; - } - - if (event.isCancelled || !event.isPartial) { - // If we have a cancelled event and an existing partial event, - // merge them: use cancelled event data but preserve style from the partial event - if (event.isCancelled && existingEvent.isPartial && !existingEvent.isCancelled) { - const mergedEvent: PreparedDetailedEvent = { - ...event, // Use cancelled event as base (has correct timing, status, events) - // Preserve style from the original partial event - style: existingEvent.style, - events: [...(existingEvent.events ?? []), ...(event.events ?? [])], - }; - eventsBySpanId.set(event.spanId, mergedEvent); - continue; - } - } - } + return buildTraceDetailedSummaryFromQueriedEvents(traceId, events); + }); + } - preparedEvents = Array.from(eventsBySpanId.values()); + public async getTraceDetailedSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise { + return await startActiveSpan("getTraceDetailedSubtreeSummary", async () => { + const maxRows = clampToEmergencySpanCap(env.MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT); + const detailedEventSelect = { + spanId: true, + parentId: true, + runId: true, + message: true, + style: true, + startTime: true, + duration: true, + isError: true, + isPartial: true, + isCancelled: true, + level: true, + events: true, + kind: true, + taskSlug: true, + properties: true, + attemptNumber: true, + } as const; + + const { events, isTruncated, missingAnchor } = await collectTraceSubtreeEvents( + this.taskEventStore, + storeTable, + traceId, + anchorSpanId, + startCreatedAt, + endCreatedAt, + detailedEventSelect, + options, + maxRows + ); - if (!rootSpanId) { + if (missingAnchor) { return; } - // Build hierarchical structure - const spanDetailedSummaryMap = new Map(); - - // First pass: create all span detailed summaries - for (const event of preparedEvents) { - const overrides = getAncestorOverrides({ - spansById: eventsBySpanId, - span: event, - }); - - const ancestorCancelled = overrides?.isCancelled ?? false; - const ancestorIsError = overrides?.isError ?? false; - const duration = overrides?.duration ?? event.duration; - const events = [...(overrides?.events ?? []), ...(event.events ?? [])]; - const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; - const isCancelled = - event.isCancelled === true ? true : event.isPartial && ancestorCancelled; - const isError = isCancelled - ? false - : typeof overrides?.isError === "boolean" - ? overrides.isError - : event.isError; - - const properties = event.properties - ? removePrivateProperties(event.properties as Attributes) - : {}; - - const spanDetailedSummary: SpanDetailedSummary = { - id: event.spanId, - parentId: event.parentId ?? undefined, - runId: event.runId, - data: { - message: event.message, - taskSlug: event.taskSlug ?? undefined, - events: events?.filter((e) => !e.name.startsWith("trigger.dev")), - startTime: getDateFromNanoseconds(event.startTime), - duration: nanosecondsToMilliseconds(duration), - isError, - isPartial, - isCancelled, - level: event.level, - properties, - }, - children: [], - }; - - spanDetailedSummaryMap.set(event.spanId, spanDetailedSummary); - } - - // Second pass: build parent-child relationships - for (const spanSummary of spanDetailedSummaryMap.values()) { - if (spanSummary.parentId) { - const parent = spanDetailedSummaryMap.get(spanSummary.parentId); - if (parent) { - parent.children.push(spanSummary); - } - } - } - - const rootSpan = spanDetailedSummaryMap.get(rootSpanId); + const summary = buildTraceDetailedSummaryFromQueriedEvents( + traceId, + events as DetailedTraceEvent[], + anchorSpanId + ); - if (!rootSpan) { + if (!summary) { return; } return { - traceId, - rootSpan, + ...summary, + isTruncated, }; }); } @@ -1590,6 +1489,353 @@ function prepareEvent(event: QueriedEvent): PreparedEvent { }; } +function buildTraceSummaryFromQueriedEvents( + events: QueriedEvent[], + rootSpanId?: string +): TraceSummary | undefined { + let preparedEvents: Array = []; + let resolvedRootSpanId: string | undefined = rootSpanId; + const eventsBySpanId = new Map(); + + for (const event of events) { + preparedEvents.push(prepareEvent(event)); + + if (!resolvedRootSpanId && !event.parentId) { + resolvedRootSpanId = event.spanId; + } + } + + for (const event of preparedEvents) { + const existingEvent = eventsBySpanId.get(event.spanId); + + if (!existingEvent) { + eventsBySpanId.set(event.spanId, event); + continue; + } + + if (event.kind === "UNSPECIFIED") { + eventsBySpanId.set(event.spanId, { + ...existingEvent, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }); + continue; + } + + if (event.isCancelled || !event.isPartial) { + const mergedEvent: PreparedEvent = { + ...event, + style: existingEvent.style, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }; + eventsBySpanId.set(event.spanId, mergedEvent); + continue; + } + } + + preparedEvents = Array.from(eventsBySpanId.values()); + + const spansBySpanId = new Map(); + + const spans = preparedEvents.map((event) => { + const overrides = getAncestorOverrides({ + spansById: eventsBySpanId, + span: event, + }); + + const ancestorCancelled = overrides?.isCancelled ?? false; + const ancestorIsError = overrides?.isError ?? false; + const duration = overrides?.duration ?? event.duration; + const spanEvents = [...(overrides?.events ?? []), ...(event.events ?? [])]; + const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; + const isCancelled = event.isCancelled === true ? true : event.isPartial && ancestorCancelled; + const isError = isCancelled + ? false + : typeof overrides?.isError === "boolean" + ? overrides.isError + : event.isError; + + const span = { + id: event.spanId, + parentId: event.parentId ?? undefined, + runId: event.runId, + data: { + message: event.message, + style: event.style, + duration, + isError, + isPartial, + isCancelled, + isDebug: event.kind === TaskEventKind.LOG, + startTime: getDateFromNanoseconds(event.startTime), + level: event.level, + events: spanEvents, + }, + }; + + spansBySpanId.set(event.spanId, span); + + return span; + }); + + if (!resolvedRootSpanId) { + return; + } + + const rootSpan = spansBySpanId.get(resolvedRootSpanId); + + if (!rootSpan) { + return; + } + + return { + rootSpan, + spans, + }; +} + +async function collectTraceSubtreeEvents( + taskEventStore: TaskEventStore, + storeTable: TaskEventStoreTable, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt: Date | undefined, + select: Prisma.TaskEventSelect, + options: { includeDebugLogs?: boolean } | undefined, + maxRows: number +): Promise<{ + events: QueriedEvent[]; + isTruncated: boolean; + missingAnchor: boolean; +}> { + type SubtreeEvent = QueriedEvent & { parentId: string | null }; + + const allEvents: SubtreeEvent[] = []; + const collectedSpanIds = new Set(); + let isTruncated = false; + + const findDescendantEvents = (where: Prisma.TaskEventWhereInput, limit?: number) => + taskEventStore.findMany( + storeTable, + where, + startCreatedAt, + endCreatedAt, + select, + { startTime: "asc" }, + { + includeDebugLogs: options?.includeDebugLogs, + limit, + } + ) as Promise; + + const findAncestorEvents = (spanId: string, limit?: number) => + taskEventStore.findMany( + storeTable, + { traceId, spanId }, + startCreatedAt, + endCreatedAt, + select, + { startTime: "asc" }, + { + includeDebugLogs: options?.includeDebugLogs, + limit, + skipTimeWindow: true, + } + ) as Promise; + + const anchorEvents = await findDescendantEvents({ traceId, spanId: anchorSpanId }, maxRows); + + if (anchorEvents.length === 0) { + return { events: [], isTruncated: false, missingAnchor: true }; + } + + if (anchorEvents.length >= maxRows) { + isTruncated = true; + } + + allEvents.push(...anchorEvents); + collectedSpanIds.add(anchorSpanId); + + let parentSpanId = anchorEvents.find((event) => event.spanId === anchorSpanId)?.parentId; + while (parentSpanId) { + if (collectedSpanIds.has(parentSpanId)) { + break; + } + + if (allEvents.length >= maxRows) { + isTruncated = true; + break; + } + + const parentEvents = await findAncestorEvents(parentSpanId, maxRows - allEvents.length); + + if (parentEvents.length === 0) { + break; + } + + allEvents.push(...parentEvents); + collectedSpanIds.add(parentSpanId); + parentSpanId = parentEvents.find((event) => event.spanId === parentSpanId)?.parentId ?? null; + } + + let frontier = [anchorSpanId]; + while (frontier.length > 0) { + if (allEvents.length >= maxRows) { + isTruncated = true; + break; + } + + const remaining = maxRows - allEvents.length; + const childEvents = await findDescendantEvents( + { traceId, parentId: { in: frontier } }, + remaining + ); + + if (childEvents.length === 0) { + break; + } + + if (childEvents.length >= remaining) { + isTruncated = true; + } + + allEvents.push(...childEvents); + + const nextFrontier: string[] = []; + for (const event of childEvents) { + if (!collectedSpanIds.has(event.spanId)) { + collectedSpanIds.add(event.spanId); + nextFrontier.push(event.spanId); + } + } + + frontier = nextFrontier; + } + + return { + events: allEvents, + isTruncated, + missingAnchor: false, + }; +} + +function buildTraceDetailedSummaryFromQueriedEvents( + traceId: string, + events: DetailedTraceEvent[], + rootSpanId?: string +): TraceDetailedSummary | undefined { + let preparedEvents: Array = []; + let resolvedRootSpanId: string | undefined = rootSpanId; + const eventsBySpanId = new Map(); + + for (const event of events) { + preparedEvents.push(prepareDetailedEvent(event)); + + if (!resolvedRootSpanId && !event.parentId) { + resolvedRootSpanId = event.spanId; + } + } + + for (const event of preparedEvents) { + const existingEvent = eventsBySpanId.get(event.spanId); + + if (!existingEvent) { + eventsBySpanId.set(event.spanId, event); + continue; + } + + if (event.kind === "UNSPECIFIED") { + eventsBySpanId.set(event.spanId, { + ...existingEvent, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }); + continue; + } + + if (event.isCancelled || !event.isPartial) { + const mergedEvent: PreparedDetailedEvent = { + ...event, + style: existingEvent.style, + events: [...(existingEvent.events ?? []), ...(event.events ?? [])], + }; + eventsBySpanId.set(event.spanId, mergedEvent); + continue; + } + } + + preparedEvents = Array.from(eventsBySpanId.values()); + + if (!resolvedRootSpanId) { + return; + } + + const spanDetailedSummaryMap = new Map(); + + for (const event of preparedEvents) { + const overrides = getAncestorOverrides({ + spansById: eventsBySpanId as Map, + span: event as PreparedEvent, + }); + + const ancestorCancelled = overrides?.isCancelled ?? false; + const ancestorIsError = overrides?.isError ?? false; + const duration = overrides?.duration ?? event.duration; + const spanEvents = [...(overrides?.events ?? []), ...(event.events ?? [])]; + const isPartial = ancestorCancelled || ancestorIsError ? false : event.isPartial; + const isCancelled = event.isCancelled === true ? true : event.isPartial && ancestorCancelled; + const isError = isCancelled + ? false + : typeof overrides?.isError === "boolean" + ? overrides.isError + : event.isError; + + const properties = event.properties + ? removePrivateProperties(event.properties as Attributes) + : {}; + + const spanDetailedSummary: SpanDetailedSummary = { + id: event.spanId, + parentId: event.parentId ?? undefined, + runId: event.runId, + data: { + message: event.message, + taskSlug: event.taskSlug ?? undefined, + events: spanEvents?.filter((e) => !e.name.startsWith("trigger.dev")), + startTime: getDateFromNanoseconds(event.startTime), + duration: nanosecondsToMilliseconds(duration), + isError, + isPartial, + isCancelled, + level: event.level, + properties, + }, + children: [], + }; + + spanDetailedSummaryMap.set(event.spanId, spanDetailedSummary); + } + + for (const spanSummary of spanDetailedSummaryMap.values()) { + if (spanSummary.parentId) { + const parent = spanDetailedSummaryMap.get(spanSummary.parentId); + if (parent) { + parent.children.push(spanSummary); + } + } + } + + const rootSpan = spanDetailedSummaryMap.get(resolvedRootSpanId); + + if (!rootSpan) { + return; + } + + return { + traceId, + rootSpan, + }; +} + function prepareDetailedEvent(event: DetailedTraceEvent): PreparedDetailedEvent { return { ...event, diff --git a/apps/webapp/app/v3/eventRepository/eventRepository.types.ts b/apps/webapp/app/v3/eventRepository/eventRepository.types.ts index 591c7927a58..4b7db3f870a 100644 --- a/apps/webapp/app/v3/eventRepository/eventRepository.types.ts +++ b/apps/webapp/app/v3/eventRepository/eventRepository.types.ts @@ -308,6 +308,8 @@ export type TraceSummary = { rootSpan: SpanSummary; spans: Array; overridesBySpanId?: Record; + /** Set when a subtree fetch hit the row cap before collecting all descendants. */ + isTruncated?: boolean; }; export type SpanDetailedSummary = { @@ -351,6 +353,8 @@ export type StreamedTraceEvent = { export type TraceDetailedSummary = { traceId: string; rootSpan: SpanDetailedSummary; + /** Set when a fetch hit the row cap before collecting all spans. */ + isTruncated?: boolean; }; // ============================================================================ @@ -416,6 +420,17 @@ export interface IEventRepository { options?: { includeDebugLogs?: boolean } ): Promise; + /** Fetch the anchor span, its ancestors (for override propagation), and all descendants. */ + getTraceSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise; + getTraceDetailedSummary( storeTable: TaskEventStoreTable, environmentId: string, @@ -425,6 +440,17 @@ export interface IEventRepository { options?: { includeDebugLogs?: boolean } ): Promise; + /** Fetch the anchor span subtree as a detailed hierarchical trace rooted at anchorSpanId. */ + getTraceDetailedSubtreeSummary( + storeTable: TaskEventStoreTable, + environmentId: string, + traceId: string, + anchorSpanId: string, + startCreatedAt: Date, + endCreatedAt?: Date, + options?: { includeDebugLogs?: boolean } + ): Promise; + // Streams a trace's events in start_time order, one at a time, without ever // materialising the full result set or a tree. Powers the streaming trace // export so arbitrarily large traces download with bounded memory. diff --git a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts index 03b6e03ca1c..4a1c6129f7d 100644 --- a/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticTrace.server.ts @@ -41,9 +41,7 @@ export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { const events = tree ? flattenTree(tree).map((n) => { - const offset = millisecondsToNanoseconds( - n.data.startTime.getTime() - treeRootStartTimeMs - ); + const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs); // Mirror RunPresenter: raw span events stay server-side, only // timelineEvents ship to the client. const { events: spanEvents, ...data } = n.data; @@ -51,7 +49,11 @@ export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { ...n, data: { ...data, - timelineEvents: createTimelineSpanEventsFromSpanEvents(spanEvents, false, treeRootStartTimeMs), + timelineEvents: createTimelineSpanEventsFromSpanEvents( + spanEvents, + false, + treeRootStartTimeMs + ), duration: n.data.isPartial ? null : n.data.duration, offset, isRoot: n.id === spanId, @@ -82,5 +84,7 @@ export function buildSyntheticTraceForBufferedRun(run: SyntheticRun) { queuedDuration: undefined, overridesBySpanId: undefined, linkedRunIdBySpanId: {} as Record, + isTruncated: false, + missingAnchor: false, }; } diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index ed580b40d0e..19bbe746b43 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -56,7 +56,10 @@ export function getTaskEventStore(): TaskEventStoreTable { } export class TaskEventStore { - constructor(private db: PrismaClient, private readReplica: PrismaReplicaClient) {} + constructor( + private db: PrismaClient, + private readReplica: PrismaReplicaClient + ) {} /** * Insert one record. @@ -97,11 +100,11 @@ export class TaskEventStore { endCreatedAt?: Date, select?: TSelect, orderBy?: Prisma.TaskEventOrderByWithRelationInput, - options?: { includeDebugLogs?: boolean; limit?: number } + options?: { includeDebugLogs?: boolean; limit?: number; skipTimeWindow?: boolean } ): Promise[]> { let finalWhere: Prisma.TaskEventWhereInput = where; - if (table === "taskEventPartitioned") { + if (table === "taskEventPartitioned" && !options?.skipTimeWindow) { // Add buffer to start and end of the range to make sure we include all events in the range. const end = endCreatedAt ? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000) diff --git a/apps/webapp/test/getTraceDetailedSubtreeSummary.integration.test.ts b/apps/webapp/test/getTraceDetailedSubtreeSummary.integration.test.ts new file mode 100644 index 00000000000..e532d387db2 --- /dev/null +++ b/apps/webapp/test/getTraceDetailedSubtreeSummary.integration.test.ts @@ -0,0 +1,304 @@ +import { ClickHouse, type TaskEventV2Input } from "@internal/clickhouse"; +import { clickhouseTest } from "@internal/testcontainers"; +import { describe, expect } from "vitest"; +import type { SpanDetailedSummary } from "~/v3/eventRepository/eventRepository.types"; +import { + ClickhouseEventRepository, + convertDateToClickhouseDateTime, +} from "~/v3/eventRepository/clickhouseEventRepository.server"; + +/** + * Proves getTraceDetailedSubtreeSummary (used by GET /api/v1/runs/:runId/trace) + * returns a tree rooted at the requested run's span — not the trace-wide root. + * + * Reproduces the large-trace failure mode: a full-trace fetch is capped by + * ORDER BY start_time ASC LIMIT N, so late spans are excluded. Subtree fetch + * looks up the anchor span directly and still returns the run-scoped tree. + */ +const INTEGRATION_TIMEOUT_MS = 60_000; +const TRACE_ROW_LIMIT = 50; +const FILLER_COUNT = 60; + +function startTimeNs(baseMs: number, offsetMs: number): string { + return ((BigInt(baseMs) + BigInt(offsetMs)) * 1_000_000n).toString(); +} + +function formatClickhouseStartTime(baseMs: number, offsetMs: number): string { + const nanoseconds = startTimeNs(baseMs, offsetMs); + if (nanoseconds.length !== 19) { + return nanoseconds; + } + + return `${nanoseconds.substring(0, 10)}.${nanoseconds.substring(10)}`; +} + +function findSpan( + span: SpanDetailedSummary | undefined, + spanId: string +): SpanDetailedSummary | undefined { + if (!span) { + return undefined; + } + if (span.id === spanId) { + return span; + } + for (const child of span.children) { + const found = findSpan(child, spanId); + if (found) { + return found; + } + } + return undefined; +} + +describe("getTraceDetailedSubtreeSummary", () => { + clickhouseTest( + "roots the API trace at the requested run span even when it is outside the full-trace row cap", + async ({ clickhouseContainer }) => { + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + logLevel: "warn", + }); + + const repository = new ClickhouseEventRepository({ + clickhouse, + version: "v2", + maximumTraceDetailedSummaryViewCount: TRACE_ROW_LIMIT, + }); + + const environmentId = "env_trace_subtree_test"; + const organizationId = "org_trace_subtree_test"; + const projectId = "proj_trace_subtree_test"; + const traceId = "a".repeat(32); + const spanRoot = "rootspan00000001"; + const spanChild = "childspan0000001"; + const spanGrandchild = "grandchildspan01"; + const runRoot = "run_root_task_run"; + const runChild = "run_child_task_run"; + const baseMs = Date.now(); + const runCreatedAt = new Date(baseMs - 60_000); + const expiresAt = convertDateToClickhouseDateTime( + new Date(baseMs + 365 * 24 * 60 * 60 * 1000) + ); + + function makeSpanRow({ + spanId, + parentSpanId, + runId, + startOffsetMs, + message, + }: { + spanId: string; + parentSpanId: string; + runId: string; + startOffsetMs: number; + message: string; + }): TaskEventV2Input { + return { + environment_id: environmentId, + organization_id: organizationId, + project_id: projectId, + task_identifier: "subtree-test-task", + run_id: runId, + start_time: formatClickhouseStartTime(baseMs, startOffsetMs), + duration: "1000000", + trace_id: traceId, + span_id: spanId, + parent_span_id: parentSpanId, + message, + kind: "SPAN", + status: "OK", + attributes: {}, + metadata: "{}", + expires_at: expiresAt, + }; + } + + const rows: TaskEventV2Input[] = [ + makeSpanRow({ + spanId: spanRoot, + parentSpanId: "", + runId: runRoot, + startOffsetMs: 0, + message: "root task", + }), + ...Array.from({ length: FILLER_COUNT }, (_, index) => + makeSpanRow({ + spanId: `filler${String(index).padStart(10, "0")}`, + parentSpanId: spanRoot, + runId: runRoot, + startOffsetMs: index + 1, + message: `filler span ${index}`, + }) + ), + makeSpanRow({ + spanId: spanChild, + parentSpanId: spanRoot, + runId: runChild, + startOffsetMs: 100_000, + message: "child task", + }), + makeSpanRow({ + spanId: spanGrandchild, + parentSpanId: spanChild, + runId: runChild, + startOffsetMs: 100_001, + message: "grandchild span", + }), + ]; + + const [insertError] = await clickhouse.taskEventsV2.insert(rows, { + clickhouse_settings: { async_insert: 0 }, + }); + expect(insertError).toBeNull(); + + const fullTrace = await repository.getTraceDetailedSummary( + "taskEvent", + environmentId, + traceId, + runCreatedAt + ); + + expect(fullTrace?.rootSpan.id).toBe(spanRoot); + expect(fullTrace?.isTruncated).toBe(true); + expect(findSpan(fullTrace?.rootSpan, spanChild)).toBeUndefined(); + + const subtree = await repository.getTraceDetailedSubtreeSummary( + "taskEvent", + environmentId, + traceId, + spanChild, + runCreatedAt + ); + + expect(subtree).toBeDefined(); + expect(subtree!.isTruncated).toBe(false); + expect(subtree!.traceId).toBe(traceId); + expect(subtree!.rootSpan.id).toBe(spanChild); + expect(subtree!.rootSpan.runId).toBe(runChild); + expect(subtree!.rootSpan.parentId).toBe(spanRoot); + expect(subtree!.rootSpan.children).toHaveLength(1); + expect(subtree!.rootSpan.children[0]?.id).toBe(spanGrandchild); + expect(subtree!.rootSpan.children[0]?.runId).toBe(runChild); + }, + INTEGRATION_TIMEOUT_MS + ); + + clickhouseTest( + "loads ancestors outside the anchor run time window for override propagation", + async ({ clickhouseContainer }) => { + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + logLevel: "warn", + }); + + const repository = new ClickhouseEventRepository({ + clickhouse, + version: "v2", + maximumTraceDetailedSummaryViewCount: TRACE_ROW_LIMIT, + }); + + const environmentId = "env_trace_subtree_ancestor"; + const organizationId = "org_trace_subtree_ancestor"; + const projectId = "proj_trace_subtree_ancestor"; + const traceId = "b".repeat(32); + const spanRoot = "rootspan00000002"; + const spanChild = "childspan0000002"; + const runRoot = "run_root_cancelled"; + const runChild = "run_child_partial"; + const baseMs = Date.now(); + const childRunCreatedAt = new Date(baseMs + 100_000); + const expiresAt = convertDateToClickhouseDateTime( + new Date(baseMs + 365 * 24 * 60 * 60 * 1000) + ); + + function makeSpanRow({ + spanId, + parentSpanId, + runId, + startOffsetMs, + insertedOffsetMs, + message, + status, + }: { + spanId: string; + parentSpanId: string; + runId: string; + startOffsetMs: number; + insertedOffsetMs: number; + message: string; + status: string; + }): TaskEventV2Input { + return { + environment_id: environmentId, + organization_id: organizationId, + project_id: projectId, + task_identifier: "subtree-ancestor-test-task", + run_id: runId, + start_time: formatClickhouseStartTime(baseMs, startOffsetMs), + inserted_at: convertDateToClickhouseDateTime(new Date(baseMs + insertedOffsetMs)), + duration: "5000000000", + trace_id: traceId, + span_id: spanId, + parent_span_id: parentSpanId, + message, + kind: "SPAN", + status, + attributes: {}, + metadata: "{}", + expires_at: expiresAt, + }; + } + + const rows: TaskEventV2Input[] = [ + makeSpanRow({ + spanId: spanRoot, + parentSpanId: "", + runId: runRoot, + startOffsetMs: 0, + insertedOffsetMs: 0, + message: "root task", + status: "PARTIAL", + }), + makeSpanRow({ + spanId: spanRoot, + parentSpanId: "", + runId: runRoot, + startOffsetMs: 5_000, + insertedOffsetMs: 5_000, + message: "root task", + status: "CANCELLED", + }), + makeSpanRow({ + spanId: spanChild, + parentSpanId: spanRoot, + runId: runChild, + startOffsetMs: 100_000, + insertedOffsetMs: 100_000, + message: "child task", + status: "PARTIAL", + }), + ]; + + const [insertError] = await clickhouse.taskEventsV2.insert(rows, { + clickhouse_settings: { async_insert: 0 }, + }); + expect(insertError).toBeNull(); + + const subtree = await repository.getTraceDetailedSubtreeSummary( + "taskEvent", + environmentId, + traceId, + spanChild, + childRunCreatedAt + ); + + expect(subtree).toBeDefined(); + expect(subtree!.rootSpan.id).toBe(spanChild); + expect(subtree!.rootSpan.parentId).toBe(spanRoot); + expect(subtree!.rootSpan.data.isPartial).toBe(false); + expect(subtree!.rootSpan.data.isCancelled).toBe(true); + }, + INTEGRATION_TIMEOUT_MS + ); +}); diff --git a/docs/management/runs/retrieve-trace.mdx b/docs/management/runs/retrieve-trace.mdx index 668718cf76a..9aa237108d1 100644 --- a/docs/management/runs/retrieve-trace.mdx +++ b/docs/management/runs/retrieve-trace.mdx @@ -2,3 +2,7 @@ title: "Retrieve run trace" openapi: "v3-openapi GET /api/v1/runs/{runId}/trace" --- + +Returns the OpenTelemetry trace subtree for the run you request. The response `trace.rootSpan` is that run's span — not necessarily the trace-wide root — with its descendant spans nested under `children`. + +For a child or nested run inside a large trace, this endpoint scopes the tree to that run so you still get a useful subtree even when the full trace has more spans than the platform can return in one response. diff --git a/docs/v3-openapi.yaml b/docs/v3-openapi.yaml index 63700c90dc0..7c6fd78c1dc 100644 --- a/docs/v3-openapi.yaml +++ b/docs/v3-openapi.yaml @@ -448,7 +448,7 @@ paths: get: operationId: get_run_trace_v1 summary: Retrieve run trace - description: Returns the full OTel trace tree for a run, including all spans and their children. + description: Returns the OTel trace subtree for the requested run — the run's span as `rootSpan`, its ancestor chain, and its descendant spans. For child or nested runs in a large trace, this is scoped to that run rather than the trace-wide root. responses: "200": description: Successful request @@ -464,7 +464,9 @@ paths: type: string description: The OTel trace ID. rootSpan: - $ref: "#/components/schemas/SpanDetailedSummary" + allOf: + - $ref: "#/components/schemas/SpanDetailedSummary" + description: The requested run's span, with nested descendant spans as `children`. Not necessarily the trace-wide root span. "401": description: Unauthorized request content: