diff --git a/packages/core/migration/20260603040000_session_message_projection_order/migration.sql b/packages/core/migration/20260603040000_session_message_projection_order/migration.sql index a56b07b67613..9b7b5a8a85f4 100644 --- a/packages/core/migration/20260603040000_session_message_projection_order/migration.sql +++ b/packages/core/migration/20260603040000_session_message_projection_order/migration.sql @@ -1,4 +1,20 @@ ALTER TABLE `session_message` ADD `seq` integer NOT NULL;--> statement-breakpoint +UPDATE `session_message` +SET `seq` = COALESCE( + (SELECT `seq` FROM `event` WHERE `event`.`id` = `session_message`.`id`), + ( + SELECT COUNT(*) - 1 + FROM `session_message` AS `ordered` + WHERE `ordered`.`session_id` = `session_message`.`session_id` + AND ( + `ordered`.`time_created` < `session_message`.`time_created` + OR ( + `ordered`.`time_created` = `session_message`.`time_created` + AND `ordered`.`id` <= `session_message`.`id` + ) + ) + ) +);--> statement-breakpoint DROP INDEX IF EXISTS `session_message_session_time_created_id_idx`;--> statement-breakpoint DROP INDEX IF EXISTS `session_message_session_type_time_created_id_idx`;--> statement-breakpoint CREATE INDEX `session_message_session_seq_idx` ON `session_message` (`session_id`,`seq`);--> statement-breakpoint diff --git a/packages/core/migration/20260604120000_dedupe_project_worktrees/migration.sql b/packages/core/migration/20260604120000_dedupe_project_worktrees/migration.sql new file mode 100644 index 000000000000..34d813ab1117 --- /dev/null +++ b/packages/core/migration/20260604120000_dedupe_project_worktrees/migration.sql @@ -0,0 +1,66 @@ +CREATE TEMP TABLE `project_worktree_canonical` ( + `old_id` text PRIMARY KEY, + `new_id` text NOT NULL +);--> statement-breakpoint +INSERT INTO `project_worktree_canonical` (`old_id`, `new_id`) +SELECT + `project`.`id`, + ( + SELECT `canonical`.`id` + FROM `project` AS `canonical` + WHERE `canonical`.`worktree` = `project`.`worktree` + ORDER BY + `canonical`.`time_updated` DESC, + `canonical`.`time_created` DESC, + `canonical`.`id` DESC + LIMIT 1 + ) +FROM `project` +WHERE `project`.`worktree` IN ( + SELECT `worktree` + FROM `project` + GROUP BY `worktree` + HAVING COUNT(*) > 1 +);--> statement-breakpoint +DELETE FROM `project_worktree_canonical` +WHERE `old_id` = `new_id`;--> statement-breakpoint +UPDATE `session` +SET `project_id` = ( + SELECT `new_id` + FROM `project_worktree_canonical` + WHERE `old_id` = `session`.`project_id` +) +WHERE `project_id` IN ( + SELECT `old_id` + FROM `project_worktree_canonical` +);--> statement-breakpoint +UPDATE `workspace` +SET `project_id` = ( + SELECT `new_id` + FROM `project_worktree_canonical` + WHERE `old_id` = `workspace`.`project_id` +) +WHERE `project_id` IN ( + SELECT `old_id` + FROM `project_worktree_canonical` +);--> statement-breakpoint +INSERT OR IGNORE INTO `project_directory` (`project_id`, `directory`, `type`, `time_created`) +SELECT + `project_worktree_canonical`.`new_id`, + `project_directory`.`directory`, + `project_directory`.`type`, + `project_directory`.`time_created` +FROM `project_directory` +JOIN `project_worktree_canonical` +ON `project_worktree_canonical`.`old_id` = `project_directory`.`project_id`;--> statement-breakpoint +DELETE FROM `project_directory` +WHERE `project_id` IN ( + SELECT `old_id` + FROM `project_worktree_canonical` +);--> statement-breakpoint +DELETE FROM `project` +WHERE `id` IN ( + SELECT `old_id` + FROM `project_worktree_canonical` +);--> statement-breakpoint +DROP TABLE `project_worktree_canonical`; diff --git a/packages/core/src/database/migration.gen.ts b/packages/core/src/database/migration.gen.ts index 9f2ba82eb3f1..77a52fc2618a 100644 --- a/packages/core/src/database/migration.gen.ts +++ b/packages/core/src/database/migration.gen.ts @@ -31,5 +31,6 @@ export const migrations = ( import("./migration/20260603040000_session_message_projection_order"), import("./migration/20260603141458_session_input_inbox"), import("./migration/20260603160727_jittery_ezekiel_stane"), + import("./migration/20260604120000_dedupe_project_worktrees"), ]) ).map((module) => module.default) satisfies DatabaseMigration.Migration[] diff --git a/packages/core/src/database/migration/20260603040000_session_message_projection_order.ts b/packages/core/src/database/migration/20260603040000_session_message_projection_order.ts index bb539d93d2dc..718f549d1390 100644 --- a/packages/core/src/database/migration/20260603040000_session_message_projection_order.ts +++ b/packages/core/src/database/migration/20260603040000_session_message_projection_order.ts @@ -7,14 +7,23 @@ export default { return Effect.gen(function* () { yield* tx.run(`ALTER TABLE \`session_message\` ADD COLUMN \`seq\` integer NOT NULL DEFAULT 0;`) yield* tx.run( - `UPDATE \`session_message\` SET \`seq\` = COALESCE((SELECT \`seq\` + 1 FROM \`event\` WHERE \`event\`.\`id\` = \`session_message\`.\`id\`), 0);`, + `UPDATE \`session_message\` + SET \`seq\` = COALESCE( + (SELECT \`seq\` FROM \`event\` WHERE \`event\`.\`id\` = \`session_message\`.\`id\`), + ( + SELECT COUNT(*) - 1 + FROM \`session_message\` AS \`ordered\` + WHERE \`ordered\`.\`session_id\` = \`session_message\`.\`session_id\` + AND ( + \`ordered\`.\`time_created\` < \`session_message\`.\`time_created\` + OR ( + \`ordered\`.\`time_created\` = \`session_message\`.\`time_created\` + AND \`ordered\`.\`id\` <= \`session_message\`.\`id\` + ) + ) + ) + );`, ) - const unmatched = yield* tx.get<{ count: number }>( - `SELECT COUNT(*) AS \`count\` FROM \`session_message\` WHERE \`seq\` = 0;`, - ) - if ((unmatched?.count ?? 0) > 0) - return yield* Effect.die("Cannot migrate session_message projections without matching durable events") - yield* tx.run(`UPDATE \`session_message\` SET \`seq\` = \`seq\` - 1;`) yield* tx.run(`DROP INDEX IF EXISTS \`session_message_session_type_time_created_id_idx\`;`) yield* tx.run(`CREATE INDEX \`session_message_session_seq_idx\` ON \`session_message\` (\`session_id\`,\`seq\`);`) yield* tx.run( diff --git a/packages/core/src/database/migration/20260604120000_dedupe_project_worktrees.ts b/packages/core/src/database/migration/20260604120000_dedupe_project_worktrees.ts new file mode 100644 index 000000000000..d47f8273a28e --- /dev/null +++ b/packages/core/src/database/migration/20260604120000_dedupe_project_worktrees.ts @@ -0,0 +1,92 @@ +import { Effect } from "effect" +import type { DatabaseMigration } from "../migration" + +export default { + id: "20260604120000_dedupe_project_worktrees", + up(tx) { + return Effect.gen(function* () { + yield* tx.run(` + CREATE TEMP TABLE \`project_worktree_canonical\` ( + \`old_id\` text PRIMARY KEY, + \`new_id\` text NOT NULL + ); + `) + yield* tx.run(` + INSERT INTO \`project_worktree_canonical\` (\`old_id\`, \`new_id\`) + SELECT + \`project\`.\`id\`, + ( + SELECT \`canonical\`.\`id\` + FROM \`project\` AS \`canonical\` + WHERE \`canonical\`.\`worktree\` = \`project\`.\`worktree\` + ORDER BY + \`canonical\`.\`time_updated\` DESC, + \`canonical\`.\`time_created\` DESC, + \`canonical\`.\`id\` DESC + LIMIT 1 + ) + FROM \`project\` + WHERE \`project\`.\`worktree\` IN ( + SELECT \`worktree\` + FROM \`project\` + GROUP BY \`worktree\` + HAVING COUNT(*) > 1 + ); + `) + yield* tx.run(` + DELETE FROM \`project_worktree_canonical\` + WHERE \`old_id\` = \`new_id\`; + `) + yield* tx.run(` + UPDATE \`session\` + SET \`project_id\` = ( + SELECT \`new_id\` + FROM \`project_worktree_canonical\` + WHERE \`old_id\` = \`session\`.\`project_id\` + ) + WHERE \`project_id\` IN ( + SELECT \`old_id\` + FROM \`project_worktree_canonical\` + ); + `) + yield* tx.run(` + UPDATE \`workspace\` + SET \`project_id\` = ( + SELECT \`new_id\` + FROM \`project_worktree_canonical\` + WHERE \`old_id\` = \`workspace\`.\`project_id\` + ) + WHERE \`project_id\` IN ( + SELECT \`old_id\` + FROM \`project_worktree_canonical\` + ); + `) + yield* tx.run(` + INSERT OR IGNORE INTO \`project_directory\` (\`project_id\`, \`directory\`, \`type\`, \`time_created\`) + SELECT + \`project_worktree_canonical\`.\`new_id\`, + \`project_directory\`.\`directory\`, + \`project_directory\`.\`type\`, + \`project_directory\`.\`time_created\` + FROM \`project_directory\` + JOIN \`project_worktree_canonical\` + ON \`project_worktree_canonical\`.\`old_id\` = \`project_directory\`.\`project_id\`; + `) + yield* tx.run(` + DELETE FROM \`project_directory\` + WHERE \`project_id\` IN ( + SELECT \`old_id\` + FROM \`project_worktree_canonical\` + ); + `) + yield* tx.run(` + DELETE FROM \`project\` + WHERE \`id\` IN ( + SELECT \`old_id\` + FROM \`project_worktree_canonical\` + ); + `) + yield* tx.run(`DROP TABLE \`project_worktree_canonical\`;`) + }) + }, +} satisfies DatabaseMigration.Migration diff --git a/packages/core/test/database-migration.test.ts b/packages/core/test/database-migration.test.ts index 0a3ce9ef574c..bf76fdb90092 100644 --- a/packages/core/test/database-migration.test.ts +++ b/packages/core/test/database-migration.test.ts @@ -10,6 +10,7 @@ import { DatabaseMigration } from "@opencode-ai/core/database/migration" import sessionUsageMigration from "@opencode-ai/core/database/migration/20260510033149_session_usage" import normalizeStoragePathsMigration from "@opencode-ai/core/database/migration/20260601010001_normalize_storage_paths" import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/migration/20260603040000_session_message_projection_order" +import dedupeProjectWorktreesMigration from "@opencode-ai/core/database/migration/20260604120000_dedupe_project_worktrees" import { ProjectV2 } from "@opencode-ai/core/project" import { ProjectTable } from "@opencode-ai/core/project/sql" import { AbsolutePath } from "@opencode-ai/core/schema" @@ -62,7 +63,7 @@ describe("DatabaseMigration", () => { expect( yield* db.get(sql`SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'session_input'`), ).toEqual({ name: "session_input" }) - expect(yield* db.get(sql`SELECT count(*) as count FROM migration`)).toEqual({ count: 29 }) + expect(yield* db.get(sql`SELECT count(*) as count FROM migration`)).toEqual({ count: 30 }) expect( yield* db.all( sql`SELECT name FROM sqlite_master WHERE type = 'index' AND name IN ('event_aggregate_seq_idx', 'event_aggregate_type_seq_idx', 'session_input_session_pending_seq_idx', 'session_input_session_pending_delivery_seq_idx', 'session_message_session_idx', 'session_message_session_type_idx', 'session_message_session_seq_idx', 'session_message_session_type_seq_idx', 'session_message_session_time_created_id_idx') ORDER BY name`, @@ -108,23 +109,71 @@ describe("DatabaseMigration", () => { ) }) - test("fails projected Session message order backfill without a durable event", async () => { - await expect( - run( - Effect.gen(function* () { - const db = yield* makeDb - yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`) - yield* db.run( - sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`, - ) - yield* db.run( - sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_missing', 'session', 'user', 0, '{}')`, - ) - - yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration]) - }), - ), - ).rejects.toThrow("Cannot migrate session_message projections without matching durable events") + test("falls back to projected Session message order without durable events", async () => { + await run( + Effect.gen(function* () { + const db = yield* makeDb + yield* db.run(sql`CREATE TABLE event (id text PRIMARY KEY, seq integer NOT NULL)`) + yield* db.run( + sql`CREATE TABLE session_message (id text PRIMARY KEY, session_id text NOT NULL, type text NOT NULL, time_created integer NOT NULL, data text NOT NULL)`, + ) + yield* db.run( + sql`INSERT INTO session_message (id, session_id, type, time_created, data) VALUES ('evt_later', 'session', 'user', 2, '{}'), ('evt_same_b', 'session', 'user', 1, '{}'), ('evt_same_a', 'session', 'user', 1, '{}')`, + ) + + yield* DatabaseMigration.applyOnly(db, [sessionMessageProjectionOrderMigration]) + + expect(yield* db.all(sql`SELECT id, seq FROM session_message ORDER BY seq`)).toEqual([ + { id: "evt_same_a", seq: 0 }, + { id: "evt_same_b", seq: 1 }, + { id: "evt_later", seq: 2 }, + ]) + }), + ) + }) + + test("deduplicates projects with the same worktree", async () => { + await run( + Effect.gen(function* () { + const db = yield* makeDb + yield* db.run( + sql`CREATE TABLE project (id text PRIMARY KEY, worktree text NOT NULL, time_created integer NOT NULL, time_updated integer NOT NULL)`, + ) + yield* db.run( + sql`CREATE TABLE session (id text PRIMARY KEY, project_id text NOT NULL, directory text NOT NULL)`, + ) + yield* db.run(sql`CREATE TABLE workspace (id text PRIMARY KEY, project_id text NOT NULL)`) + yield* db.run( + sql`CREATE TABLE project_directory (project_id text NOT NULL, directory text NOT NULL, type text NOT NULL, time_created integer NOT NULL, PRIMARY KEY (project_id, directory))`, + ) + yield* db.run( + sql`INSERT INTO project (id, worktree, time_created, time_updated) VALUES ('old', '/repo', 1, 10), ('new', '/repo', 2, 20), ('other', '/other', 3, 30)`, + ) + yield* db.run( + sql`INSERT INTO session (id, project_id, directory) VALUES ('old-session', 'old', '/repo'), ('new-session', 'new', '/repo'), ('other-session', 'other', '/other')`, + ) + yield* db.run(sql`INSERT INTO workspace (id, project_id) VALUES ('old-workspace', 'old')`) + yield* db.run( + sql`INSERT INTO project_directory (project_id, directory, type, time_created) VALUES ('old', '/repo/packages/app', 'root', 11), ('old', '/repo', 'main', 12), ('new', '/repo', 'main', 22)`, + ) + + yield* DatabaseMigration.applyOnly(db, [dedupeProjectWorktreesMigration]) + + expect(yield* db.all(sql`SELECT id, project_id FROM session ORDER BY id`)).toEqual([ + { id: "new-session", project_id: "new" }, + { id: "old-session", project_id: "new" }, + { id: "other-session", project_id: "other" }, + ]) + expect(yield* db.all(sql`SELECT id, project_id FROM workspace`)).toEqual([ + { id: "old-workspace", project_id: "new" }, + ]) + expect(yield* db.all(sql`SELECT id FROM project ORDER BY id`)).toEqual([{ id: "new" }, { id: "other" }]) + expect(yield* db.all(sql`SELECT project_id, directory FROM project_directory ORDER BY directory`)).toEqual([ + { project_id: "new", directory: "/repo" }, + { project_id: "new", directory: "/repo/packages/app" }, + ]) + }), + ) }) test("runs session usage backfill in order with schema changes", async () => { diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts index 80cde325e210..586624786829 100644 --- a/packages/opencode/src/project/project.ts +++ b/packages/opencode/src/project/project.ts @@ -1,4 +1,4 @@ -import { and, eq, sql } from "drizzle-orm" +import { and, eq, ne, sql } from "drizzle-orm" import { Database } from "@opencode-ai/core/database/database" import { ProjectDirectoryTable, ProjectTable } from "@opencode-ai/core/project/sql" import { SessionTable } from "@opencode-ai/core/session/sql" @@ -209,6 +209,20 @@ export const layer = Layer.effect( .set({ project_id: newID }) .where(eq(WorkspaceTable.project_id, oldID)) .run() + yield* d.run(sql` + INSERT OR IGNORE INTO project_directory (project_id, directory, type, time_created) + SELECT + ${newID}, + directory, + type, + time_created + FROM project_directory + WHERE project_id = ${oldID} + `) + yield* d + .delete(ProjectDirectoryTable) + .where(eq(ProjectDirectoryTable.project_id, oldID)) + .run() if (oldProject) yield* d.delete(ProjectTable).where(eq(ProjectTable.id, oldID)).run() }), @@ -217,6 +231,20 @@ export const layer = Layer.effect( .pipe(Effect.orDie) }) + const migrateProjectWorktreeIds = Effect.fn("Project.migrateProjectWorktreeIds")(function* ( + worktree: string, + projectID: ProjectV2.ID, + ) { + if (projectID === ProjectV2.ID.global) return + const rows = yield* db + .select({ id: ProjectTable.id }) + .from(ProjectTable) + .where(and(eq(ProjectTable.worktree, AbsolutePath.make(worktree)), ne(ProjectTable.id, projectID))) + .all() + .pipe(Effect.orDie) + yield* Effect.forEach(rows, (row) => migrateProjectId(row.id, projectID), { discard: true }) + }) + const saveProjectDirectory = Effect.fn("Project.saveProjectDirectory")(function* (input: { projectID: ProjectV2.ID directory: string @@ -260,6 +288,7 @@ export const layer = Layer.effect( // Phase 2: upsert const projectID = ProjectV2.ID.make(data.id) yield* migrateProjectId(data.previous ? ProjectV2.ID.make(data.previous) : undefined, projectID) + yield* migrateProjectWorktreeIds(worktree, projectID) const row = yield* db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get().pipe(Effect.orDie) const existing = row ? fromRow(row) diff --git a/packages/opencode/test/project/project.test.ts b/packages/opencode/test/project/project.test.ts index 09e12eb6be37..fa839a26fe91 100644 --- a/packages/opencode/test/project/project.test.ts +++ b/packages/opencode/test/project/project.test.ts @@ -7,7 +7,7 @@ import path from "path" import { tmpdirScoped } from "../fixture/fixture" import { GlobalBus } from "../../src/bus/global" import { Database } from "@opencode-ai/core/database/database" -import { ProjectTable } from "@opencode-ai/core/project/sql" +import { ProjectDirectoryTable, ProjectTable } from "@opencode-ai/core/project/sql" import { SessionTable } from "@opencode-ai/core/session/sql" import { WorkspaceTable } from "@opencode-ai/core/control-plane/workspace.sql" import { eq } from "drizzle-orm" @@ -244,6 +244,72 @@ describe("Project.fromDirectory", () => { ).toBe(remoteID) }), ) + + it.live("migrates same-worktree project data when cache already points at remote ID", () => + Effect.gen(function* () { + const { db } = yield* Database.Service + const tmp = yield* tmpdirScoped({ git: true }) + const projects = yield* Project.Service + const rootResult = yield* projects.fromDirectory(tmp) + const rootProject = rootResult.project + const remoteID = remoteProjectID("github.com/acme/restored") + const sessionID = crypto.randomUUID() as SessionID + const workspaceID = WorkspaceV2.ID.ascending() + const directory = path.join(tmp, "packages", "app") + + yield* db + .insert(SessionTable) + .values({ + id: sessionID, + project_id: rootProject.id, + slug: sessionID, + directory: tmp, + title: "test", + version: "0.0.0-test", + time_created: Date.now(), + time_updated: Date.now(), + }) + .run() + .pipe(Effect.orDie) + yield* db + .insert(WorkspaceTable) + .values({ id: workspaceID, type: "local", name: "test", project_id: rootProject.id }) + .run() + .pipe(Effect.orDie) + yield* db + .insert(ProjectDirectoryTable) + .values({ project_id: rootProject.id, directory, type: "root", time_created: Date.now() }) + .run() + .pipe(Effect.orDie) + yield* Effect.promise(() => $`git remote add origin git@github.com:acme/restored.git`.cwd(tmp).quiet()) + yield* Effect.promise(() => Bun.write(path.join(tmp, ".git", "opencode"), remoteID)) + + const result = yield* projects.fromDirectory(tmp) + + expect(result.project.id).toBe(remoteID) + expect( + yield* db.select().from(ProjectTable).where(eq(ProjectTable.id, rootProject.id)).get().pipe(Effect.orDie), + ).toBeUndefined() + expect( + (yield* db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get().pipe(Effect.orDie)) + ?.project_id, + ).toBe(remoteID) + expect( + (yield* db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, workspaceID)).get().pipe(Effect.orDie)) + ?.project_id, + ).toBe(remoteID) + expect( + ( + yield* db + .select() + .from(ProjectDirectoryTable) + .where(eq(ProjectDirectoryTable.directory, directory)) + .get() + .pipe(Effect.orDie) + )?.project_id, + ).toBe(remoteID) + }), + ) }) describe("Project.fromDirectory git failure paths", () => {