Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@ import { getSecretStore } from "../secrets/secretStore.server";
import { ClickhouseConnectionSchema } from "../clickhouse/clickhouseSecretSchemas.server";

export class OrganizationDataStoresRegistry {
private _prisma: PrismaClient | PrismaReplicaClient;
/**
* Writer client — used by every method that mutates state
* (`addDataStore` / `updateDataStore` / `deleteDataStore` and their backing
* SecretStore writes). Must be the primary connection; replica-targeted
* writes are rejected by Postgres with code 25006 (read-only transaction).
*/
private _writer: PrismaClient;
/**
* Read client used by the polling `loadFromDatabase()` (and its
* `SecretStore.getSecret` lookups). Can be a replica — these are
* cache-fillers, not on hot user-facing paths.
*/
private _replica: PrismaClient | PrismaReplicaClient;
/** Keyed by `${organizationId}:${kind}` */
private _lookup: Map<string, ParsedDataStore> = new Map();
private _loaded = false;
Expand All @@ -21,8 +33,9 @@ export class OrganizationDataStoresRegistry {
*/
readonly isReady: Promise<void>;

constructor(prisma: PrismaClient | PrismaReplicaClient) {
this._prisma = prisma;
constructor(writer: PrismaClient, replica: PrismaClient | PrismaReplicaClient) {
this._writer = writer;
this._replica = replica;
this.isReady = new Promise<void>((resolve) => {
this._readyResolve = resolve;
});
Expand All @@ -37,10 +50,10 @@ export class OrganizationDataStoresRegistry {
// same `${orgId}:${kind}` appears in multiple rows. The registry must never
// throw on overlap — failing the load would break every customer, not just the
// misconfigured orgs — so we keep the first entry and log an error instead.
const rows = await this._prisma.organizationDataStore.findMany({
const rows = await this._replica.organizationDataStore.findMany({
orderBy: { key: "asc" },
});
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._replica });
Comment thread
matt-aitken marked this conversation as resolved.

const lookup = new Map<string, ParsedDataStore>();
/** Tracks which row's `key` already owns each `${orgId}:${kind}` so we can log conflicts. */
Expand Down Expand Up @@ -125,10 +138,10 @@ export class OrganizationDataStoresRegistry {
}) {
const secretKey = this.#secretKey(key, kind);

const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.setSecret(secretKey, config);

return this._prisma.organizationDataStore.create({
return this._writer.organizationDataStore.create({
data: {
key,
organizationIds,
Expand All @@ -153,11 +166,11 @@ export class OrganizationDataStoresRegistry {
const secretKey = this.#secretKey(key, kind);

if (config) {
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.setSecret(secretKey, config);
}

return this._prisma.organizationDataStore.update({
return this._writer.organizationDataStore.update({
where: {
key,
},
Expand All @@ -170,12 +183,12 @@ export class OrganizationDataStoresRegistry {

async deleteDataStore({ key, kind }: { key: string; kind: DataStoreKind }) {
const secretKey = this.#secretKey(key, kind);
const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma });
const secretStore = getSecretStore("DATABASE", { prismaClient: this._writer });
await secretStore.deleteSecret(secretKey).catch(() => {
// Secret may not exist — proceed with deletion
});

await this._prisma.organizationDataStore.delete({ where: { key } });
await this._writer.organizationDataStore.delete({ where: { key } });
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import pRetry from "p-retry";
import { $replica } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { signalsEmitter } from "~/services/signals.server";
import { singleton } from "~/utils/singleton";
import { OrganizationDataStoresRegistry } from "./organizationDataStoresRegistry.server";

export const organizationDataStoresRegistry = singleton("organizationDataStoresRegistry", () => {
const registry = new OrganizationDataStoresRegistry($replica);
const registry = new OrganizationDataStoresRegistry(prisma, $replica);

// Runs as soon as this singleton is created (first import of this module). The
// registry’s `isReady` promise resolves when this eventually succeeds.
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/test/clickhouseFactory.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"returns default client when org has no data store",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
await registry.loadFromDatabase();

const factory = new ClickhouseFactory(registry);
Expand All @@ -28,7 +28,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"returns org-specific client when a data store is configured",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "factory-store",
Expand All @@ -52,7 +52,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"two orgs sharing the same data store get the same cached client",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "shared-factory-store",
Expand All @@ -75,7 +75,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"two data stores with different URLs produce different clients",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "store-a",
Expand Down Expand Up @@ -104,7 +104,7 @@ describe("ClickHouse Factory", () => {
postgresTest(
"after reload with a deleted store, org falls back to default",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "removable-store",
Expand Down
26 changes: 13 additions & 13 deletions apps/webapp/test/organizationDataStoresRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ const TEST_URL_2 = "https://default:password@clickhouse2.example.com:8443";

describe("OrganizationDataStoresRegistry", () => {
postgresTest("isLoaded is false before loadFromDatabase", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
expect(registry.isLoaded).toBe(false);
expect(registry.get("any-org", "CLICKHOUSE")).toBeNull();
});

postgresTest("isLoaded is true after loadFromDatabase", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
await registry.loadFromDatabase();
expect(registry.isLoaded).toBe(true);
});

postgresTest("isReady resolves after loadFromDatabase", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
let resolved = false;
registry.isReady.then(() => {
resolved = true;
Expand All @@ -36,13 +36,13 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("get returns null when no data stores exist", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
await registry.loadFromDatabase();
expect(registry.get("org-1", "CLICKHOUSE")).toBeNull();
});

postgresTest("addDataStore creates a row and stores the secret", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "test-store",
Expand All @@ -63,7 +63,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("loadFromDatabase resolves secrets and makes orgs available via get", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "hipaa-store",
Expand All @@ -81,7 +81,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("get returns null for orgs not in any data store", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "partial-store",
Expand All @@ -97,7 +97,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("multiple orgs can share the same data store", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "shared-store",
Expand All @@ -120,7 +120,7 @@ describe("OrganizationDataStoresRegistry", () => {
postgresTest(
"when an org appears in multiple data stores, first row by id asc wins",
async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
const sharedOrg = "org-dup-overlap";

await registry.addDataStore({
Expand Down Expand Up @@ -151,7 +151,7 @@ describe("OrganizationDataStoresRegistry", () => {
);

postgresTest("updateDataStore updates organizationIds and rotates the secret", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "update-store",
Expand All @@ -176,7 +176,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("reload picks up changes made after initial load", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);
await registry.loadFromDatabase();
expect(registry.get("org-reload", "CLICKHOUSE")).toBeNull();

Expand All @@ -194,7 +194,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("deleteDataStore removes the row and its secret", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "delete-store",
Expand All @@ -210,7 +210,7 @@ describe("OrganizationDataStoresRegistry", () => {
});

postgresTest("after delete and reload, org no longer has a data store", async ({ prisma }) => {
const registry = new OrganizationDataStoresRegistry(prisma);
const registry = new OrganizationDataStoresRegistry(prisma, prisma);

await registry.addDataStore({
key: "ephemeral-store",
Expand Down