diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts index dff741cab0f..07a4f1d3592 100644 --- a/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts @@ -7,7 +7,19 @@ import { getSecretStore } from "../secrets/secretStore.server"; import { ClickhouseConnectionSchema } from "../clickhouse/clickhouseSecretSchemas.server"; export class OrganizationDataStoresRegistry { - private _prisma: PrismaClient | PrismaReplicaClient; + /** + * Writer client — used by every method that mutates state + * (`addDataStore` / `updateDataStore` / `deleteDataStore` and their backing + * SecretStore writes). Must be the primary connection; replica-targeted + * writes are rejected by Postgres with code 25006 (read-only transaction). + */ + private _writer: PrismaClient; + /** + * Read client used by the polling `loadFromDatabase()` (and its + * `SecretStore.getSecret` lookups). Can be a replica — these are + * cache-fillers, not on hot user-facing paths. + */ + private _replica: PrismaClient | PrismaReplicaClient; /** Keyed by `${organizationId}:${kind}` */ private _lookup: Map = new Map(); private _loaded = false; @@ -21,8 +33,9 @@ export class OrganizationDataStoresRegistry { */ readonly isReady: Promise; - constructor(prisma: PrismaClient | PrismaReplicaClient) { - this._prisma = prisma; + constructor(writer: PrismaClient, replica: PrismaClient | PrismaReplicaClient) { + this._writer = writer; + this._replica = replica; this.isReady = new Promise((resolve) => { this._readyResolve = resolve; }); @@ -37,10 +50,10 @@ export class OrganizationDataStoresRegistry { // same `${orgId}:${kind}` appears in multiple rows. The registry must never // throw on overlap — failing the load would break every customer, not just the // misconfigured orgs — so we keep the first entry and log an error instead. - const rows = await this._prisma.organizationDataStore.findMany({ + const rows = await this._replica.organizationDataStore.findMany({ orderBy: { key: "asc" }, }); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._replica }); const lookup = new Map(); /** Tracks which row's `key` already owns each `${orgId}:${kind}` so we can log conflicts. */ @@ -125,10 +138,10 @@ export class OrganizationDataStoresRegistry { }) { const secretKey = this.#secretKey(key, kind); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.setSecret(secretKey, config); - return this._prisma.organizationDataStore.create({ + return this._writer.organizationDataStore.create({ data: { key, organizationIds, @@ -153,11 +166,11 @@ export class OrganizationDataStoresRegistry { const secretKey = this.#secretKey(key, kind); if (config) { - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.setSecret(secretKey, config); } - return this._prisma.organizationDataStore.update({ + return this._writer.organizationDataStore.update({ where: { key, }, @@ -170,12 +183,12 @@ export class OrganizationDataStoresRegistry { async deleteDataStore({ key, kind }: { key: string; kind: DataStoreKind }) { const secretKey = this.#secretKey(key, kind); - const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); + const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer }); await secretStore.deleteSecret(secretKey).catch(() => { // Secret may not exist — proceed with deletion }); - await this._prisma.organizationDataStore.delete({ where: { key } }); + await this._writer.organizationDataStore.delete({ where: { key } }); } /** diff --git a/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts index 24ec572c5d5..239683965ea 100644 --- a/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts +++ b/apps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.ts @@ -1,5 +1,5 @@ import pRetry from "p-retry"; -import { $replica } from "~/db.server"; +import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { signalsEmitter } from "~/services/signals.server"; @@ -7,7 +7,7 @@ import { singleton } from "~/utils/singleton"; import { OrganizationDataStoresRegistry } from "./organizationDataStoresRegistry.server"; export const organizationDataStoresRegistry = singleton("organizationDataStoresRegistry", () => { - const registry = new OrganizationDataStoresRegistry($replica); + const registry = new OrganizationDataStoresRegistry(prisma, $replica); // Runs as soon as this singleton is created (first import of this module). The // registry’s `isReady` promise resolves when this eventually succeeds. diff --git a/apps/webapp/test/clickhouseFactory.test.ts b/apps/webapp/test/clickhouseFactory.test.ts index 501462ab677..7f19ea1f218 100644 --- a/apps/webapp/test/clickhouseFactory.test.ts +++ b/apps/webapp/test/clickhouseFactory.test.ts @@ -16,7 +16,7 @@ describe("ClickHouse Factory", () => { postgresTest( "returns default client when org has no data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); const factory = new ClickhouseFactory(registry); @@ -28,7 +28,7 @@ describe("ClickHouse Factory", () => { postgresTest( "returns org-specific client when a data store is configured", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "factory-store", @@ -52,7 +52,7 @@ describe("ClickHouse Factory", () => { postgresTest( "two orgs sharing the same data store get the same cached client", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "shared-factory-store", @@ -75,7 +75,7 @@ describe("ClickHouse Factory", () => { postgresTest( "two data stores with different URLs produce different clients", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "store-a", @@ -104,7 +104,7 @@ describe("ClickHouse Factory", () => { postgresTest( "after reload with a deleted store, org falls back to default", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "removable-store", diff --git a/apps/webapp/test/organizationDataStoresRegistry.test.ts b/apps/webapp/test/organizationDataStoresRegistry.test.ts index f8ff9dde41f..9d94e0447fe 100644 --- a/apps/webapp/test/organizationDataStoresRegistry.test.ts +++ b/apps/webapp/test/organizationDataStoresRegistry.test.ts @@ -13,19 +13,19 @@ const TEST_URL_2 = "https://default:password@clickhouse2.example.com:8443"; describe("OrganizationDataStoresRegistry", () => { postgresTest("isLoaded is false before loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); expect(registry.isLoaded).toBe(false); expect(registry.get("any-org", "CLICKHOUSE")).toBeNull(); }); postgresTest("isLoaded is true after loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.isLoaded).toBe(true); }); postgresTest("isReady resolves after loadFromDatabase", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); let resolved = false; registry.isReady.then(() => { resolved = true; @@ -36,13 +36,13 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("get returns null when no data stores exist", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.get("org-1", "CLICKHOUSE")).toBeNull(); }); postgresTest("addDataStore creates a row and stores the secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "test-store", @@ -63,7 +63,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("loadFromDatabase resolves secrets and makes orgs available via get", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "hipaa-store", @@ -81,7 +81,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("get returns null for orgs not in any data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "partial-store", @@ -97,7 +97,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("multiple orgs can share the same data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "shared-store", @@ -120,7 +120,7 @@ describe("OrganizationDataStoresRegistry", () => { postgresTest( "when an org appears in multiple data stores, first row by id asc wins", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); const sharedOrg = "org-dup-overlap"; await registry.addDataStore({ @@ -151,7 +151,7 @@ describe("OrganizationDataStoresRegistry", () => { ); postgresTest("updateDataStore updates organizationIds and rotates the secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "update-store", @@ -176,7 +176,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("reload picks up changes made after initial load", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.loadFromDatabase(); expect(registry.get("org-reload", "CLICKHOUSE")).toBeNull(); @@ -194,7 +194,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("deleteDataStore removes the row and its secret", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "delete-store", @@ -210,7 +210,7 @@ describe("OrganizationDataStoresRegistry", () => { }); postgresTest("after delete and reload, org no longer has a data store", async ({ prisma }) => { - const registry = new OrganizationDataStoresRegistry(prisma); + const registry = new OrganizationDataStoresRegistry(prisma, prisma); await registry.addDataStore({ key: "ephemeral-store",