Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4f91839
Bulldozer DB
N2D4 Mar 24, 2026
9694c33
declareGroupByTable
N2D4 Mar 24, 2026
09ba416
Fix Prisma schema
N2D4 Mar 24, 2026
4232a94
declareMapTable
N2D4 Mar 24, 2026
31b6ac6
Performance tests
N2D4 Mar 24, 2026
2f7f09a
Load tests
N2D4 Mar 24, 2026
863ee05
Interface updates
N2D4 Mar 24, 2026
109cf5d
Bulldozer Studio
N2D4 Mar 25, 2026
53f7302
Remove unnecessary table
N2D4 Mar 25, 2026
006cf5e
Add flat map interface
N2D4 Mar 25, 2026
4cdc057
FlatMap table
N2D4 Mar 25, 2026
49dc922
Flat map fuzz tests
N2D4 Mar 25, 2026
3eccc22
Build MapTable from FlatMapTable
N2D4 Mar 25, 2026
e041e7d
Filter tables
N2D4 Mar 25, 2026
e2b0d3d
Limit tables
N2D4 Mar 25, 2026
f7f21aa
Speed up fuzzing
N2D4 Mar 26, 2026
90f61ec
Concat table
N2D4 Mar 26, 2026
69b3d4f
Bulldozer Studio: Better node placing algorithm
N2D4 Mar 26, 2026
e306770
Add left-join table
N2D4 Mar 26, 2026
5036c2a
Sort table
N2D4 Mar 27, 2026
d55470b
LFold table
N2D4 Mar 27, 2026
43d7304
Left join table
N2D4 Mar 27, 2026
a33faa0
Improve left join performance
N2D4 Mar 27, 2026
aaeb7d1
Improved performance for most tables
N2D4 Mar 27, 2026
ef9915a
Refactor Bulldozer into individual files
N2D4 Mar 27, 2026
037d20f
Merge branch 'dev' into bulldozer-db
N2D4 Mar 27, 2026
2403c17
Update apps/backend/src/lib/bulldozer/db/tables/group-by-table.ts
N2D4 Mar 27, 2026
d3a2daa
Performance improvements
N2D4 Mar 28, 2026
b459240
PR comments
N2D4 Mar 29, 2026
c01d931
Some more perf changes
N2D4 Mar 30, 2026
2d49d23
Fix various comparison key bugs
N2D4 Mar 31, 2026
2bb89c2
Performance improvements
N2D4 Mar 31, 2026
199cdf2
Lint fixes
N2D4 Apr 1, 2026
1dc76dc
Merge remote-tracking branch 'origin/dev' into bulldozer-db
N2D4 Apr 1, 2026
e4a5221
More fixes...
N2D4 Apr 3, 2026
e353232
Various changes
N2D4 Apr 6, 2026
4b400e2
Comments from Aman
N2D4 Apr 8, 2026
5b69a18
Fix tests
N2D4 Apr 8, 2026
a7f999f
Improve perf tests
N2D4 Apr 10, 2026
19abfb3
TimeFold table
N2D4 Apr 11, 2026
9cb5f5b
Clean up Prisma schema
N2D4 Apr 11, 2026
e3c3865
Make migration warnings into errors
N2D4 Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Filter tables
  • Loading branch information
N2D4 committed Mar 25, 2026
commit e041e7d708df8bf8d08d58030285bd5b596afc5c
34 changes: 29 additions & 5 deletions apps/backend/scripts/run-bulldozer-studio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ function getStudioPageHtml(): string {
--text: #f2f2f2;
--muted: #b0b0b0;
--accent: #66a3ff;
--filter: #f7b955;
--danger: #ff5f56;
--ok: #35c769;
--mono: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;
Expand All @@ -350,6 +351,7 @@ function getStudioPageHtml(): string {
--text: #111111;
--muted: #555555;
--accent: #245ee9;
--filter: #b06b00;
--danger: #d72638;
--ok: #118a3e;
}
Expand Down Expand Up @@ -476,6 +478,18 @@ function getStudioPageHtml(): string {
.node-type.derived {
color: var(--accent);
}
.node-type.filter {
color: var(--filter);
}
.node-type.map {
color: var(--accent);
}
.node-type.flatmap {
color: color-mix(in srgb, var(--accent) 70%, var(--ok));
}
.node-type.groupby {
color: color-mix(in srgb, var(--accent) 80%, white);
}
.node-name {
font-size: 13px;
font-weight: 700;
Expand All @@ -489,6 +503,9 @@ function getStudioPageHtml(): string {
.node-name.derived {
color: var(--text);
}
.node-name.filter {
color: var(--filter);
}
.node-meta {
font-size: 11px;
color: var(--muted);
Expand Down Expand Up @@ -955,18 +972,24 @@ function getStudioPageHtml(): string {
for (const table of tables) {
const pos = positions.get(table.id);
if (!pos) continue;
const isStoredTable = String(table.operator || "").toLowerCase() === "stored";
const operatorClass = (() => {
const normalized = String(table.operator || "unknown").toLowerCase();
if (normalized === "stored" || normalized === "map" || normalized === "flatmap" || normalized === "groupby" || normalized === "filter") {
return normalized;
}
return "derived";
})();
const node = document.createElement("div");
node.className = "node" + (state.selectedTableId === table.id ? " active" : "");
node.style.left = pos.x + "px";
node.style.top = pos.y + "px";

const type = document.createElement("div");
type.className = "node-type " + (isStoredTable ? "stored" : "derived");
type.className = "node-type " + operatorClass;
type.textContent = String(table.operator || "unknown");

const name = document.createElement("div");
name.className = "node-name mono " + (isStoredTable ? "stored" : "derived");
name.className = "node-name mono " + operatorClass;
name.textContent = table.name;

const meta = document.createElement("div");
Expand All @@ -980,7 +1003,7 @@ function getStudioPageHtml(): string {
left.className = "row";
if (!table.initialized) {
const initBtn = document.createElement("button");
initBtn.className = "btn good";
initBtn.className = "btn bad";
initBtn.textContent = "🚀 init";
initBtn.onclick = (event) => {
event.stopPropagation();
Expand All @@ -989,6 +1012,7 @@ function getStudioPageHtml(): string {
});
};
left.appendChild(initBtn);
node.style.borderColor = "red";
}
const focusBtn = document.createElement("button");
focusBtn.className = "btn icon";
Expand Down Expand Up @@ -1021,7 +1045,7 @@ function getStudioPageHtml(): string {
}

function getRawInputDefault() {
return "{\\n \\"accountId\\": \\"acct-demo\\",\\n \\"asset\\": \\"USD\\",\\n \\"amount\\": \\"10.00\\",\\n \\"side\\": \\"credit\\",\\n \\"txHash\\": \\"0xdemo\\",\\n \\"blockNumber\\": 1,\\n \\"timestamp\\": \\"2026-01-01T00:00:00Z\\",\\n \\"counterparty\\": null,\\n \\"memo\\": null\\n}";
return "{\\n \\"accountId\\": \\"acct-demo\\",\\n \\"asset\\": \\"USD\\",\\n \\"amount\\": \\"1500.00\\",\\n \\"side\\": \\"credit\\",\\n \\"txHash\\": \\"0xdemo\\",\\n \\"blockNumber\\": 1,\\n \\"timestamp\\": \\"2026-01-01T00:00:00Z\\",\\n \\"counterparty\\": \\"acct-peer\\",\\n \\"memo\\": null\\n}";
}

async function loadSchema() {
Expand Down
27 changes: 26 additions & 1 deletion apps/backend/src/lib/bulldozer/db/example-schema.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable } from "./index";
import { declareFilterTable, declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable } from "./index";

// Small tagged-object constructors for the SQL fragments used by the example schema:
// `mapper` wraps a row-transforming SELECT-list fragment, `predicate` wraps a boolean
// WHERE-style fragment. The `as const` narrows `type` to its literal for discrimination.
const mapper = (sql: string) => {
  return { type: "mapper" as const, sql };
};
const predicate = (sql: string) => {
  return { type: "predicate" as const, sql };
};

/**
* Example fungible-asset ledger schema composed from Bulldozer table operators.
Expand Down Expand Up @@ -99,6 +100,27 @@ export const exampleFungibleLedgerSchema = (() => {
`),
});

// Filter view over entriesByAccount: keeps only entries whose JSON `counterparty`
// field is present and non-null (jsonb `->` yields SQL NULL for missing keys, so
// `IS NOT NULL` covers both cases), for suspicious-flow style inspections.
const accountEntriesWithCounterparty = declareFilterTable({
  tableId: "bulldozer-example-ledger-account-entries-with-counterparty",
  fromTable: entriesByAccount,
  filter: predicate(`("rowData"->'counterparty') IS NOT NULL`),
});
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.

// Filter view over entriesByAsset: keeps only entries with amount >= 1000.
// `amount` is stored as a JSON string (e.g. "10.00"), hence the `->>` text
// extraction plus `::numeric` cast before comparing. Models risk/alerting subsets.
const highValueEntriesByAsset = declareFilterTable({
  tableId: "bulldozer-example-ledger-high-value-entries-by-asset",
  fromTable: entriesByAsset,
  filter: predicate(`(("rowData"->>'amount')::numeric) >= 1000`),
});

// Re-partition the high-value subset by accountId for analyst-friendly slices.
// Uses `->` (not `->>`) so the group key stays jsonb, matching the other
// group-by declarations in this schema.
const highValueEntriesByAssetAccount = declareGroupByTable({
  tableId: "bulldozer-example-ledger-high-value-entries-by-asset-account",
  fromTable: highValueEntriesByAsset,
  groupBy: mapper(`"rowData"->'accountId' AS "groupKey"`),
});

// Enrich asset-grouped rows for downstream analytics views.
const assetEntriesNormalized = declareMapTable({
tableId: "bulldozer-example-ledger-asset-entries-normalized",
Expand All @@ -123,6 +145,9 @@ export const exampleFungibleLedgerSchema = (() => {
accountEntriesNormalized,
accountEntryLegs,
accountAssetPartitions,
accountEntriesWithCounterparty,
highValueEntriesByAsset,
highValueEntriesByAssetAccount,
assetEntriesNormalized,
};
})();
119 changes: 118 additions & 1 deletion apps/backend/src/lib/bulldozer/db/index.fuzz.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { stringCompare, templateIdentity } from "@stackframe/stack-shared/dist/utils/strings";
import postgres from "postgres";
import { afterAll, beforeAll, beforeEach, describe, expect, test } from "vitest";
import { declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index";
import { declareFilterTable, declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index";

type TestDb = { full: string, base: string };

Expand Down Expand Up @@ -146,6 +146,21 @@ function flatMapGroups<OldRow extends Record<string, unknown>, NewRow extends Re
}
return mapped;
}
// Reference (in-memory) model of a filter table: keeps every group, but drops the
// rows within each group that fail `predicateFn`. Kept row identifiers gain a ":1"
// suffix, matching the identifier scheme the derived filter table produces (see the
// fuzz assertions below). Groups are never removed, even when all their rows are
// filtered out.
function filterGroups<Row extends Record<string, unknown>>(
  groups: GroupedRows<Row>,
  predicateFn: (row: Row) => boolean,
): GroupedRows<Row> {
  const result: GroupedRows<Row> = new Map();
  for (const [groupKey, group] of groups) {
    const keptRows = new Map<string, Row>();
    for (const [rowIdentifier, rowData] of group.rows) {
      if (predicateFn(rowData)) {
        keptRows.set(`${rowIdentifier}:1`, rowData);
      }
    }
    result.set(groupKey, { groupKey: group.groupKey, rows: keptRows });
  }
  return result;
}

describe.sequential("bulldozer db fuzz composition (real postgres)", () => {
const dbUrls = getTestDbUrls();
Expand Down Expand Up @@ -535,6 +550,108 @@ describe.sequential("bulldozer db fuzz composition (real postgres)", () => {
}
}, 120_000);

// Fuzz test: drives a stored -> group-by -> filter -> map pipeline with seeded-random
// mutations (upserts, deletes, pipeline teardown/re-init) and checks the derived
// tables against an in-memory reference model at regular intervals.
test("fuzz: filter/map pipelines preserve invariants under random mutations and re-inits", async () => {
  // Identifiers deliberately include ':' and ' ' characters — presumably to exercise
  // identifier escaping/suffixing in derived tables (see the ":1" suffix in
  // filterGroups above) — TODO confirm against the table implementations.
  const identifiers = ["ff1", "ff2", "ff3", "ff:4", "ff 5"] as const;
  const teams = ["alpha", "beta", "gamma", null] as const;

  // Single fixed seed keeps the run deterministic and fast; add seeds here to widen coverage.
  for (const seed of [701]) {
    const rng = createRng(seed);
    // In-memory mirror of the stored table, used to compute expected derived state.
    const sourceRows = new Map<string, SourceRow>();
    // Tracks whether the filter+map tail of the pipeline is currently initialized
    // (it gets torn down and re-created mid-run, unlike the group-by table).
    let filterPipelineInitialized = true;

    const fromTable = declareStoredTable<{ value: number, team: string | null }>({ tableId: `filter-fuzz-users-${seed}` });
    const groupedTable = declareGroupByTable({
      tableId: `filter-fuzz-users-by-team-${seed}`,
      fromTable,
      groupBy: mapper(`"rowData"->'team' AS "groupKey"`),
    });
    // Filter keeps non-null teams with value >= 10; must match the JS predicate
    // passed to filterGroups in the assertion phase below.
    const filterTable = declareFilterTable({
      tableId: `filter-fuzz-users-threshold-${seed}`,
      fromTable: groupedTable,
      filter: { type: "predicate", sql: `("rowData"->'team') IS NOT NULL AND (("rowData"->>'value')::int) >= 10` },
    });
    // Map scales value by 10; must match the JS mapper in the assertion phase below.
    const mappedAfterFilter = declareMapTable({
      tableId: `filter-fuzz-users-mapped-${seed}`,
      fromTable: filterTable,
      mapper: mapper(`
        ("rowData"->'team') AS "team",
        (("rowData"->>'value')::int * 10) AS "scaledValue"
      `),
    });

    await runStatements(fromTable.init());
    await runStatements(groupedTable.init());
    await runStatements(filterTable.init());
    await runStatements(mappedAfterFilter.init());

    // Mutation loop: roll in [0,1) picks upsert (60%), delete (22%),
    // pipeline teardown (8%), or pipeline re-init (10%).
    for (let step = 0; step < 28; step++) {
      const roll = rng();
      if (roll < 0.6) {
        // Upsert a random row; value in [-5, 30) straddles the >= 10 threshold.
        const rowIdentifier = choose(rng, identifiers);
        const rowData: SourceRow = {
          team: choose(rng, teams),
          value: Math.floor(rng() * 35) - 5,
        };
        sourceRows.set(rowIdentifier, rowData);
        await runStatements(fromTable.setRow(rowIdentifier, expr(jsonbLiteral(rowData))));
      } else if (roll < 0.82) {
        // Delete a random row (may be a no-op if the identifier was never set).
        const rowIdentifier = choose(rng, identifiers);
        sourceRows.delete(rowIdentifier);
        await runStatements(fromTable.deleteRow(rowIdentifier));
      } else if (roll < 0.9) {
        // Tear down the filter+map tail; note deletion order is downstream-first.
        if (filterPipelineInitialized) {
          await runStatements(mappedAfterFilter.delete());
          await runStatements(filterTable.delete());
          filterPipelineInitialized = false;
        }
      } else {
        // Re-create the tail; init order is upstream-first (mirror of teardown).
        if (!filterPipelineInitialized) {
          await runStatements(filterTable.init());
          await runStatements(mappedAfterFilter.init());
          filterPipelineInitialized = true;
        }
      }

      // Verify every third step and always on the final step.
      if (step % 3 === 0 || step === 27) {
        const expectedGrouped = computeTeamGroups(sourceRows);
        // JS mirror of the SQL filter predicate declared above.
        const expectedFiltered = filterGroups(expectedGrouped, (row) => row.team != null && row.value >= 10);
        const expectedMapped = mapGroups(expectedFiltered, (row) => {
          if (row.team == null) {
            // Unreachable if filterGroups honored the predicate; guards the model itself.
            throw new Error("expected non-null team after filter predicate");
          }
          return {
            team: row.team,
            scaledValue: row.value * 10,
          };
        });

        // The group-by table is never torn down, so it must always match.
        await assertTableMatches(groupedTable, expectedGrouped);
        if (filterPipelineInitialized) {
          expect(await readBoolean(filterTable.isInitialized())).toBe(true);
          expect(await readBoolean(mappedAfterFilter.isInitialized())).toBe(true);
          await assertTableMatches(filterTable, expectedFiltered);
          await assertTableMatches(mappedAfterFilter, expectedMapped);
        } else {
          // When torn down, the tables must report uninitialized and list no groups.
          expect(await readBoolean(filterTable.isInitialized())).toBe(false);
          expect(await readBoolean(mappedAfterFilter.isInitialized())).toBe(false);
          expect(await readRows(filterTable.listGroups({
            start: "start",
            end: "end",
            startInclusive: true,
            endInclusive: true,
          }))).toEqual([]);
          expect(await readRows(mappedAfterFilter.listGroups({
            start: "start",
            end: "end",
            startInclusive: true,
            endInclusive: true,
          }))).toEqual([]);
        }
      }
    }
  }
}, 120_000);

test("fuzz: parallel map tables remain isolated with independent re-inits", async () => {
const identifiers = ["m1", "m2", "m3", "m 4", "m:5"] as const;
const teams = ["alpha", "beta", null] as const;
Expand Down
45 changes: 42 additions & 3 deletions apps/backend/src/lib/bulldozer/db/index.perf.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { stringCompare } from "@stackframe/stack-shared/dist/utils/strings";
import postgres from "postgres";
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
import { declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index";
import { declareFilterTable, declareFlatMapTable, declareGroupByTable, declareMapTable, declareStoredTable, toExecutableSqlTransaction, toQueryableSqlQuery } from "./index";

type TestDb = { full: string, base: string };
type SqlExpression<T> = { type: "expression", sql: string };
Expand Down Expand Up @@ -32,6 +32,8 @@ const LOAD_DERIVED_COUNT_QUERY_MAX_MS = 10_000;
const LOAD_EXPANDING_INIT_MAX_MS = 120_000;
const LOAD_EXPANDING_COUNT_QUERY_MAX_MS = 15_000;
const LOAD_FILTERED_QUERY_MAX_MS = 4_000;
const LOAD_FILTER_TABLE_INIT_MAX_MS = 90_000;
const LOAD_FILTER_TABLE_COUNT_QUERY_MAX_MS = 8_000;

function getTestDbUrls(): TestDb {
const env = Reflect.get(import.meta, "env");
Expand Down Expand Up @@ -422,6 +424,11 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
fromTable: mappedTwice,
groupBy: { type: "mapper", sql: `"rowData"->'bucket' AS "groupKey"` },
});
const filteredHighValue = declareFilterTable({
tableId: "load-prefilled-users-high-value",
fromTable: groupedByTeam,
filter: { type: "predicate", sql: `( ("rowData"->>'value')::int ) >= 700` },
});
const expandedByTeam = declareFlatMapTable({
tableId: "load-prefilled-users-expanded",
fromTable: groupedByTeam,
Expand Down Expand Up @@ -457,6 +464,10 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
await runStatements(groupedByBucket.init());
});
expect(bucketInit.elapsedMs).toBeLessThan(LOAD_DERIVED_INIT_MAX_MS);
const filterInit = await measureMs("load init filteredHighValue", async () => {
await runStatements(filteredHighValue.init());
});
expect(filterInit.elapsedMs).toBeLessThan(LOAD_FILTER_TABLE_INIT_MAX_MS);
const expandInit = await measureMs("load init expandedByTeam", async () => {
await runStatements(expandedByTeam.init());
});
Expand Down Expand Up @@ -486,19 +497,37 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
startInclusive: true,
endInclusive: true,
});
const filteredHighValueCountQuery = filteredHighValue.listRowsInGroup({
start: "start",
end: "end",
startInclusive: true,
endInclusive: true,
});
const derivedCounts = await measureMs("load count derived tables", async () => {
return await Promise.all([
sql.unsafe(`SELECT COUNT(*)::int AS "count" FROM (${toQueryableSqlQuery(groupedCountQuery)}) AS "rows"`),
sql.unsafe(`SELECT COUNT(*)::int AS "count" FROM (${toQueryableSqlQuery(mappedCountQuery)}) AS "rows"`),
sql.unsafe(`SELECT COUNT(*)::int AS "count" FROM (${toQueryableSqlQuery(bucketCountQuery)}) AS "rows"`),
sql.unsafe(`SELECT COUNT(*)::int AS "count" FROM (${toQueryableSqlQuery(filteredHighValueCountQuery)}) AS "rows"`),
sql.unsafe(`SELECT COUNT(*)::int AS "count" FROM (${toQueryableSqlQuery(expandedCountQuery)}) AS "rows"`),
]);
});
expect(derivedCounts.elapsedMs).toBeLessThan(LOAD_DERIVED_COUNT_QUERY_MAX_MS);
expect(Number(derivedCounts.result[0][0].count)).toBe(loadRowCount - 1);
expect(Number(derivedCounts.result[1][0].count)).toBe(loadRowCount - 1);
expect(Number(derivedCounts.result[2][0].count)).toBe(loadRowCount - 1);
expect(Number(derivedCounts.result[3][0].count)).toBe((loadRowCount - 1) * 2);
expect(Number(derivedCounts.result[3][0].count)).toBeGreaterThan(0);
expect(Number(derivedCounts.result[3][0].count)).toBeLessThan(loadRowCount);
expect(Number(derivedCounts.result[4][0].count)).toBe((loadRowCount - 1) * 2);

const filteredHighValueCountOnly = await measureMs("load count filteredHighValue table only", async () => {
return await sql.unsafe(`
SELECT COUNT(*)::int AS "count"
FROM (${toQueryableSqlQuery(filteredHighValueCountQuery)}) AS "rows"
`);
});
expect(filteredHighValueCountOnly.elapsedMs).toBeLessThan(LOAD_FILTER_TABLE_COUNT_QUERY_MAX_MS);
expect(Number(filteredHighValueCountOnly.result[0].count)).toBeGreaterThan(0);

const expandedCountOnly = await measureMs("load count expanded table only", async () => {
return await sql.unsafe(`
Expand Down Expand Up @@ -564,6 +593,16 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
{ rowIdentifier: "seed-100000:1", rowData: { team: "delta", kind: "base", mappedValue: 1009 } },
{ rowIdentifier: "seed-100000:2", rowData: { team: "delta", kind: "double", mappedValue: 1998 } },
]);
const filteredDeltaRows = await readRows(filteredHighValue.listRowsInGroup({
groupKey: expr(`to_jsonb('delta'::text)`),
start: "start",
end: "end",
startInclusive: true,
endInclusive: true,
}));
expect(filteredDeltaRows.map((row) => ({ rowIdentifier: row.rowidentifier, rowData: row.rowdata }))).toEqual([
{ rowIdentifier: "seed-100000:1", rowData: { team: "delta", value: 999 } },
]);

const bulkDelete = await measureMs("load full table delete", async () => {
await runStatements(table.delete());
Expand All @@ -583,7 +622,7 @@ describe.sequential("bulldozer db performance (real postgres)", () => {
`;
expect(isInitializedRows[0].initialized).toBe(false);

logLine(`[bulldozer-perf] load thresholds(ms): prefill<=${LOAD_PREFILL_MAX_MS}, baseCount<=${LOAD_COUNT_QUERY_MAX_MS}, setRowAvg<=${LOAD_SET_ROW_AVG_MAX_MS} over ${LOAD_SET_ROW_AVG_ITERATIONS}, pointDelete<=${LOAD_POINT_MUTATION_MAX_MS}, derivedInit<=${LOAD_DERIVED_INIT_MAX_MS}, expandingInit<=${LOAD_EXPANDING_INIT_MAX_MS}, derivedCount<=${LOAD_DERIVED_COUNT_QUERY_MAX_MS}, expandingCount<=${LOAD_EXPANDING_COUNT_QUERY_MAX_MS}, filteredQuery<=${LOAD_FILTERED_QUERY_MAX_MS}, tableDelete<=${LOAD_TABLE_DELETE_MAX_MS}`);
logLine(`[bulldozer-perf] load thresholds(ms): prefill<=${LOAD_PREFILL_MAX_MS}, baseCount<=${LOAD_COUNT_QUERY_MAX_MS}, setRowAvg<=${LOAD_SET_ROW_AVG_MAX_MS} over ${LOAD_SET_ROW_AVG_ITERATIONS}, pointDelete<=${LOAD_POINT_MUTATION_MAX_MS}, derivedInit<=${LOAD_DERIVED_INIT_MAX_MS}, filterInit<=${LOAD_FILTER_TABLE_INIT_MAX_MS}, expandingInit<=${LOAD_EXPANDING_INIT_MAX_MS}, derivedCount<=${LOAD_DERIVED_COUNT_QUERY_MAX_MS}, filterCount<=${LOAD_FILTER_TABLE_COUNT_QUERY_MAX_MS}, expandingCount<=${LOAD_EXPANDING_COUNT_QUERY_MAX_MS}, filteredQuery<=${LOAD_FILTERED_QUERY_MAX_MS}, tableDelete<=${LOAD_TABLE_DELETE_MAX_MS}`);
}, 180_000);
});

Loading
Loading