Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
ALTER TABLE `session_message` ADD `seq` integer NOT NULL;--> statement-breakpoint
UPDATE `session_message`
SET `seq` = COALESCE(
(SELECT `seq` FROM `event` WHERE `event`.`id` = `session_message`.`id`),
(
SELECT COUNT(*) - 1
FROM `session_message` AS `ordered`
WHERE `ordered`.`session_id` = `session_message`.`session_id`
AND (
`ordered`.`time_created` < `session_message`.`time_created`
OR (
`ordered`.`time_created` = `session_message`.`time_created`
AND `ordered`.`id` <= `session_message`.`id`
)
)
)
);--> statement-breakpoint
Comment on lines 1 to +17
DROP INDEX IF EXISTS `session_message_session_time_created_id_idx`;--> statement-breakpoint
DROP INDEX IF EXISTS `session_message_session_type_time_created_id_idx`;--> statement-breakpoint
CREATE INDEX `session_message_session_seq_idx` ON `session_message` (`session_id`,`seq`);--> statement-breakpoint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
CREATE TEMP TABLE `project_worktree_canonical` (
`old_id` text PRIMARY KEY,
`new_id` text NOT NULL
);--> statement-breakpoint
INSERT INTO `project_worktree_canonical` (`old_id`, `new_id`)
SELECT
`project`.`id`,
(
SELECT `canonical`.`id`
FROM `project` AS `canonical`
WHERE `canonical`.`worktree` = `project`.`worktree`
ORDER BY
`canonical`.`time_updated` DESC,
`canonical`.`time_created` DESC,
`canonical`.`id` DESC
LIMIT 1
)
FROM `project`
WHERE `project`.`worktree` IN (
SELECT `worktree`
FROM `project`
GROUP BY `worktree`
HAVING COUNT(*) > 1
);--> statement-breakpoint
DELETE FROM `project_worktree_canonical`
WHERE `old_id` = `new_id`;--> statement-breakpoint
UPDATE `session`
SET `project_id` = (
SELECT `new_id`
FROM `project_worktree_canonical`
WHERE `old_id` = `session`.`project_id`
)
WHERE `project_id` IN (
SELECT `old_id`
FROM `project_worktree_canonical`
);--> statement-breakpoint
UPDATE `workspace`
SET `project_id` = (
SELECT `new_id`
FROM `project_worktree_canonical`
WHERE `old_id` = `workspace`.`project_id`
)
WHERE `project_id` IN (
SELECT `old_id`
FROM `project_worktree_canonical`
);--> statement-breakpoint
INSERT OR IGNORE INTO `project_directory` (`project_id`, `directory`, `type`, `time_created`)
SELECT
`project_worktree_canonical`.`new_id`,
`project_directory`.`directory`,
`project_directory`.`type`,
`project_directory`.`time_created`
FROM `project_directory`
JOIN `project_worktree_canonical`
ON `project_worktree_canonical`.`old_id` = `project_directory`.`project_id`;--> statement-breakpoint
DELETE FROM `project_directory`
WHERE `project_id` IN (
SELECT `old_id`
FROM `project_worktree_canonical`
);--> statement-breakpoint
DELETE FROM `project`
WHERE `id` IN (
SELECT `old_id`
FROM `project_worktree_canonical`
);--> statement-breakpoint
DROP TABLE `project_worktree_canonical`;
1 change: 1 addition & 0 deletions packages/core/src/database/migration.gen.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@ export default {
return Effect.gen(function* () {
yield* tx.run(`ALTER TABLE \`session_message\` ADD COLUMN \`seq\` integer NOT NULL DEFAULT 0;`)
yield* tx.run(
`UPDATE \`session_message\` SET \`seq\` = COALESCE((SELECT \`seq\` + 1 FROM \`event\` WHERE \`event\`.\`id\` = \`session_message\`.\`id\`), 0);`,
`UPDATE \`session_message\`
SET \`seq\` = COALESCE(
(SELECT \`seq\` FROM \`event\` WHERE \`event\`.\`id\` = \`session_message\`.\`id\`),
(
SELECT COUNT(*) - 1
FROM \`session_message\` AS \`ordered\`
WHERE \`ordered\`.\`session_id\` = \`session_message\`.\`session_id\`
AND (
\`ordered\`.\`time_created\` < \`session_message\`.\`time_created\`
OR (
\`ordered\`.\`time_created\` = \`session_message\`.\`time_created\`
AND \`ordered\`.\`id\` <= \`session_message\`.\`id\`
)
)
)
);`,
Comment on lines +10 to +25
)
const unmatched = yield* tx.get<{ count: number }>(
`SELECT COUNT(*) AS \`count\` FROM \`session_message\` WHERE \`seq\` = 0;`,
)
if ((unmatched?.count ?? 0) > 0)
return yield* Effect.die("Cannot migrate session_message projections without matching durable events")
yield* tx.run(`UPDATE \`session_message\` SET \`seq\` = \`seq\` - 1;`)
yield* tx.run(`DROP INDEX IF EXISTS \`session_message_session_type_time_created_id_idx\`;`)
yield* tx.run(`CREATE INDEX \`session_message_session_seq_idx\` ON \`session_message\` (\`session_id\`,\`seq\`);`)
yield* tx.run(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"

export default {
id: "20260604120000_dedupe_project_worktrees",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`
CREATE TEMP TABLE \`project_worktree_canonical\` (
\`old_id\` text PRIMARY KEY,
\`new_id\` text NOT NULL
);
`)
yield* tx.run(`
INSERT INTO \`project_worktree_canonical\` (\`old_id\`, \`new_id\`)
SELECT
\`project\`.\`id\`,
(
SELECT \`canonical\`.\`id\`
FROM \`project\` AS \`canonical\`
WHERE \`canonical\`.\`worktree\` = \`project\`.\`worktree\`
ORDER BY
\`canonical\`.\`time_updated\` DESC,
\`canonical\`.\`time_created\` DESC,
\`canonical\`.\`id\` DESC
LIMIT 1
)
FROM \`project\`
WHERE \`project\`.\`worktree\` IN (
SELECT \`worktree\`
FROM \`project\`
GROUP BY \`worktree\`
HAVING COUNT(*) > 1
);
`)
yield* tx.run(`
DELETE FROM \`project_worktree_canonical\`
WHERE \`old_id\` = \`new_id\`;
`)
yield* tx.run(`
UPDATE \`session\`
SET \`project_id\` = (
SELECT \`new_id\`
FROM \`project_worktree_canonical\`
WHERE \`old_id\` = \`session\`.\`project_id\`
)
WHERE \`project_id\` IN (
SELECT \`old_id\`
FROM \`project_worktree_canonical\`
);
`)
yield* tx.run(`
UPDATE \`workspace\`
SET \`project_id\` = (
SELECT \`new_id\`
FROM \`project_worktree_canonical\`
WHERE \`old_id\` = \`workspace\`.\`project_id\`
)
WHERE \`project_id\` IN (
SELECT \`old_id\`
FROM \`project_worktree_canonical\`
);
`)
yield* tx.run(`
INSERT OR IGNORE INTO \`project_directory\` (\`project_id\`, \`directory\`, \`type\`, \`time_created\`)
SELECT
\`project_worktree_canonical\`.\`new_id\`,
\`project_directory\`.\`directory\`,
\`project_directory\`.\`type\`,
\`project_directory\`.\`time_created\`
FROM \`project_directory\`
JOIN \`project_worktree_canonical\`
ON \`project_worktree_canonical\`.\`old_id\` = \`project_directory\`.\`project_id\`;
`)
yield* tx.run(`
DELETE FROM \`project_directory\`
WHERE \`project_id\` IN (
SELECT \`old_id\`
FROM \`project_worktree_canonical\`
);
`)
yield* tx.run(`
DELETE FROM \`project\`
WHERE \`id\` IN (
SELECT \`old_id\`
FROM \`project_worktree_canonical\`
);
`)
yield* tx.run(`DROP TABLE \`project_worktree_canonical\`;`)
})
},
} satisfies DatabaseMigration.Migration
85 changes: 67 additions & 18 deletions packages/core/test/database-migration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { DatabaseMigration } from "@opencode-ai/core/database/migration"
import sessionUsageMigration from "@opencode-ai/core/database/migration/20260510033149_session_usage"
import normalizeStoragePathsMigration from "@opencode-ai/core/database/migration/20260601010001_normalize_storage_paths"
import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/migration/20260603040000_session_message_projection_order"
import dedupeProjectWorktreesMigration from "@opencode-ai/core/database/migration/20260604120000_dedupe_project_worktrees"
import { ProjectV2 } from "@opencode-ai/core/project"
import { ProjectTable } from "@opencode-ai/core/project/sql"
import { AbsolutePath } from "@opencode-ai/core/schema"
Expand Down Expand Up @@ -62,7 +63,7 @@ describe("DatabaseMigration", () => {
expect(
yield* db.get(sql`SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'session_input'`),
).toEqual({ name: "session_input" })
expect(yield* db.get(sql`SELECT count(*) as count FROM migration`)).toEqual({ count: 29 })
expect(yield* db.get(sql`SELECT count(*) as count FROM migration`)).toEqual({ count: 30 })
expect(
yield* db.all(
sql`SELECT name FROM sqlite_master WHERE type = 'index' AND name IN ('event_aggregate_seq_idx', 'event_aggregate_type_seq_idx', 'session_input_session_pending_seq_idx', 'session_input_session_pending_delivery_seq_idx', 'session_message_session_idx', 'session_message_session_type_idx', 'session_message_session_seq_idx', 'session_message_session_type_seq_idx', 'session_message_session_time_created_id_idx') ORDER BY name`,
Expand Down Expand Up @@ -108,23 +109,71 @@ describe("DatabaseMigration", () => {
)
})

test("fails projected Session message order backfill without a durable event", async () => {
await expect(
run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`)
yield* db.run(
sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_missing', 'session', 'user', 0, '{}')`,
)

yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration])
}),
),
).rejects.toThrow("Cannot migrate session_message projections without matching durable events")
test("falls back to projected Session message order without durable events", async () => {
await run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`)
yield* db.run(
sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`,
)
yield* db.run(
sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_later', 'session', 'user', 2, '{}'), ('evt_same_b', 'session', 'user', 1, '{}'), ('evt_same_a', 'session', 'user', 1, '{}')`,
)

yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration])

expect(yield* db.all(sql`SELECT id, seq FROM session_message ORDER BY seq`)).toEqual([
{ id: "evt_same_a", seq: 0 },
{ id: "evt_same_b", seq: 1 },
{ id: "evt_later", seq: 2 },
])
}),
)
})

test("deduplicates projects with the same worktree", async () => {
await run(
Effect.gen(function* () {
const db = yield* makeDb
yield* db.run(
sql`CREATE TABLE project (id text PRIMARY KEY, worktree text NOT NULL, time_created integer NOT NULL, time_updated integer NOT NULL)`,
)
yield* db.run(
sql`CREATE TABLE session (id text PRIMARY KEY, project_id text NOT NULL, directory text NOT NULL)`,
)
yield* db.run(sql`CREATE TABLE workspace (id text PRIMARY KEY, project_id text NOT NULL)`)
yield* db.run(
sql`CREATE TABLE project_directory (project_id text NOT NULL, directory text NOT NULL, type text NOT NULL, time_created integer NOT NULL, PRIMARY KEY (project_id, directory))`,
)
yield* db.run(
sql`INSERT INTO project (id, worktree, time_created, time_updated) VALUES ('old', '/repo', 1, 10), ('new', '/repo', 2, 20), ('other', '/other', 3, 30)`,
)
yield* db.run(
sql`INSERT INTO session (id, project_id, directory) VALUES ('old-session', 'old', '/repo'), ('new-session', 'new', '/repo'), ('other-session', 'other', '/other')`,
)
yield* db.run(sql`INSERT INTO workspace (id, project_id) VALUES ('old-workspace', 'old')`)
yield* db.run(
sql`INSERT INTO project_directory (project_id, directory, type, time_created) VALUES ('old', '/repo/packages/app', 'root', 11), ('old', '/repo', 'main', 12), ('new', '/repo', 'main', 22)`,
)

yield* DatabaseMigration.applyOnly(db, [dedupeProjectWorktreesMigration])

expect(yield* db.all(sql`SELECT id, project_id FROM session ORDER BY id`)).toEqual([
{ id: "new-session", project_id: "new" },
{ id: "old-session", project_id: "new" },
{ id: "other-session", project_id: "other" },
])
expect(yield* db.all(sql`SELECT id, project_id FROM workspace`)).toEqual([
{ id: "old-workspace", project_id: "new" },
])
expect(yield* db.all(sql`SELECT id FROM project ORDER BY id`)).toEqual([{ id: "new" }, { id: "other" }])
expect(yield* db.all(sql`SELECT project_id, directory FROM project_directory ORDER BY directory`)).toEqual([
{ project_id: "new", directory: "/repo" },
{ project_id: "new", directory: "/repo/packages/app" },
])
}),
)
})

test("runs session usage backfill in order with schema changes", async () => {
Expand Down
31 changes: 30 additions & 1 deletion packages/opencode/src/project/project.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { and, eq, sql } from "drizzle-orm"
import { and, eq, ne, sql } from "drizzle-orm"
import { Database } from "@opencode-ai/core/database/database"
import { ProjectDirectoryTable, ProjectTable } from "@opencode-ai/core/project/sql"
import { SessionTable } from "@opencode-ai/core/session/sql"
Expand Down Expand Up @@ -209,6 +209,20 @@ export const layer = Layer.effect(
.set({ project_id: newID })
.where(eq(WorkspaceTable.project_id, oldID))
.run()
yield* d.run(sql`
INSERT OR IGNORE INTO project_directory (project_id, directory, type, time_created)
SELECT
${newID},
directory,
type,
time_created
FROM project_directory
WHERE project_id = ${oldID}
`)
Comment on lines +212 to +221
yield* d
.delete(ProjectDirectoryTable)
.where(eq(ProjectDirectoryTable.project_id, oldID))
.run()

if (oldProject) yield* d.delete(ProjectTable).where(eq(ProjectTable.id, oldID)).run()
}),
Expand All @@ -217,6 +231,20 @@ export const layer = Layer.effect(
.pipe(Effect.orDie)
})

const migrateProjectWorktreeIds = Effect.fn("Project.migrateProjectWorktreeIds")(function* (
worktree: string,
projectID: ProjectV2.ID,
) {
if (projectID === ProjectV2.ID.global) return
const rows = yield* db
.select({ id: ProjectTable.id })
.from(ProjectTable)
.where(and(eq(ProjectTable.worktree, AbsolutePath.make(worktree)), ne(ProjectTable.id, projectID)))
.all()
.pipe(Effect.orDie)
yield* Effect.forEach(rows, (row) => migrateProjectId(row.id, projectID), { discard: true })
})

const saveProjectDirectory = Effect.fn("Project.saveProjectDirectory")(function* (input: {
projectID: ProjectV2.ID
directory: string
Expand Down Expand Up @@ -260,6 +288,7 @@ export const layer = Layer.effect(
// Phase 2: upsert
const projectID = ProjectV2.ID.make(data.id)
yield* migrateProjectId(data.previous ? ProjectV2.ID.make(data.previous) : undefined, projectID)
yield* migrateProjectWorktreeIds(worktree, projectID)
const row = yield* db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get().pipe(Effect.orDie)
const existing = row
? fromRow(row)
Expand Down
Loading
Loading