From 2b071d50a5f48ce7476a5e7cc58b6224d5eae0a9 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 13 May 2026 16:54:15 +0100
Subject: [PATCH 1/5] fix(webapp): auto-recover replication services after stream errors

When the underlying logical-replication client errored (e.g. after a
Postgres failover), the runs and sessions replication services logged the
error and left the stream stopped. The host process kept running, the WAL
backed up, and ClickHouse silently fell behind.

Both services now run a configurable recovery strategy on stream errors,
defaulting to in-process reconnect with exponential backoff so a fresh
self-hosted setup heals on its own:

- "reconnect" (default) re-subscribes via the existing subscribe(lastLsn)
  path with exponential backoff (1s -> 60s cap, unlimited attempts), which
  re-validates the publication, re-acquires the leader lock, and resumes
  from the last acknowledged LSN.
- "exit" calls process.exit after a short flush window so a host's
  supervisor (Docker restart=always, systemd, k8s, etc.) can replace the
  process.
- "log" preserves the historical behaviour.

Per-service strategy + exit knobs are env-driven via
RUN_REPLICATION_ERROR_STRATEGY / SESSION_REPLICATION_ERROR_STRATEGY plus
matching *_EXIT_DELAY_MS / *_EXIT_CODE. Reconnect tuning is shared across
both services via REPLICATION_RECONNECT_INITIAL_DELAY_MS / _MAX_DELAY_MS /
_MAX_ATTEMPTS (0 = unlimited).
---
 .server-changes/replication-error-recovery.md |   6 +
 apps/webapp/app/env.server.ts                 |  23 ++
 .../replicationErrorRecovery.server.ts        | 190 +++++++++++
 .../runsReplicationInstance.server.ts         |   9 +
 .../services/runsReplicationService.server.ts |  21 ++
 .../sessionsReplicationInstance.server.ts     |   9 +
 .../sessionsReplicationService.server.ts      |  21 ++
 ...nsReplicationService.errorRecovery.test.ts | 303 ++++++++++++++++++
 8 files changed, 582 insertions(+)
 create mode 100644 .server-changes/replication-error-recovery.md
 create mode 100644 apps/webapp/app/services/replicationErrorRecovery.server.ts
 create mode 100644 apps/webapp/test/runsReplicationService.errorRecovery.test.ts

diff --git a/.server-changes/replication-error-recovery.md b/.server-changes/replication-error-recovery.md
new file mode 100644
index 00000000000..f5c8dfc6223
--- /dev/null
+++ b/.server-changes/replication-error-recovery.md
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Runs and sessions replication services now auto-recover from stream errors (e.g. after a Postgres failover) instead of silently leaving replication stopped. Behaviour is configurable per service — reconnect (default), exit so a process supervisor can restart the host, or log.
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 8eacb9634e1..4a3d13b6886 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -1330,6 +1330,16 @@ const EnvironmentSchema = z
     RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
     RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
     RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),
+    // What to do when the runs replication client errors (e.g. after a
+    // Postgres failover). `reconnect` (default) re-subscribes in-process with
+    // exponential backoff; `exit` exits the process so a supervisor restarts
+    // it; `log` preserves the old no-op behaviour. Reconnect tuning is
+    // shared across both replication services via REPLICATION_RECONNECT_*.
+    RUN_REPLICATION_ERROR_STRATEGY: z
+      .enum(["reconnect", "exit", "log"])
+      .default("reconnect"),
+    RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000),
+    RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1),

     // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
     // with the runs replicator for leader locking but has its own slot and
@@ -1362,6 +1372,19 @@ const EnvironmentSchema = z
     SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
     SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
     SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
+    // Error recovery — same semantics as RUN_REPLICATION_ERROR_STRATEGY.
+    SESSION_REPLICATION_ERROR_STRATEGY: z
+      .enum(["reconnect", "exit", "log"])
+      .default("reconnect"),
+    SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000),
+    SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1),
+
+    // Reconnect tuning shared across both replication services. Only
+    // applies when error strategy is `reconnect`. Max attempts of 0 means
+    // unlimited (default).
+    REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().default(1_000),
+    REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().default(60_000),
+    REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().default(0),

     // Clickhouse
     CLICKHOUSE_URL: z.string(),
diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts
new file mode 100644
index 00000000000..ea3f2c0bf47
--- /dev/null
+++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts
@@ -0,0 +1,190 @@
+import { Logger } from "@trigger.dev/core/logger";
+
+// When the LogicalReplicationClient's WAL stream errors (e.g. after a
+// Postgres failover) it calls stop() on itself and stays stopped. The host
+// service has to decide how to recover. Three strategies are available:
+//
+// - "reconnect" — re-subscribe in-process with exponential backoff. Default;
+//   works without a process supervisor.
+// - "exit" — exit the process so an external supervisor (Docker
+//   restart=always, ECS, systemd, k8s, ...) replaces it. Recommended when a
+//   supervisor is present because it gets a clean slate every time.
+// - "log" — preserve the historical no-op behaviour. Useful for
+//   debugging or in test environments where you want to observe the
+//   silent-death failure mode.
+export type ReplicationErrorRecoveryStrategy =
+  | {
+      type: "reconnect";
+      initialDelayMs?: number;
+      maxDelayMs?: number;
+      // 0 (or undefined) means retry forever.
+      maxAttempts?: number;
+    }
+  | {
+      type: "exit";
+      exitDelayMs?: number;
+      exitCode?: number;
+    }
+  | { type: "log" };
+
+export interface ReplicationErrorRecoveryDeps {
+  strategy: ReplicationErrorRecoveryStrategy;
+  logger: Logger;
+  // Re-subscribe the underlying replication client. Implementations should
+  // call client.subscribe(lastAcknowledgedLsn) and resolve once that returns.
+  reconnect: () => Promise<void>;
+  // True once the host service has begun graceful shutdown — recovery
+  // suppresses all work in that state.
+  isShuttingDown: () => boolean;
+}
+
+export interface ReplicationErrorRecovery {
+  // Called from the replication client's "error" event handler.
+  handle(error: unknown): void;
+  // Called from the replication client's "start" event handler. Resets the
+  // reconnect attempt counter so the next failure starts from initialDelayMs.
+ notifyStreamStarted(): void; + // Cancel any pending reconnect/exit timer. Called from shutdown(). + dispose(): void; +} + +export function createReplicationErrorRecovery( + deps: ReplicationErrorRecoveryDeps +): ReplicationErrorRecovery { + const { strategy, logger, reconnect, isShuttingDown } = deps; + let attempt = 0; + let pendingReconnect: NodeJS.Timeout | null = null; + let pendingExit: NodeJS.Timeout | null = null; + let exiting = false; + + function scheduleReconnect(error: unknown): void { + if (strategy.type !== "reconnect") return; + if (pendingReconnect) return; + + attempt += 1; + const maxAttempts = strategy.maxAttempts ?? 0; + if (maxAttempts > 0 && attempt > maxAttempts) { + logger.error("Replication reconnect exceeded maxAttempts; giving up", { + attempt, + maxAttempts, + error, + }); + return; + } + + const initialDelay = strategy.initialDelayMs ?? 1_000; + const maxDelay = strategy.maxDelayMs ?? 60_000; + const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay); + + logger.error("Replication stream lost — scheduling reconnect", { + attempt, + delayMs: delay, + error, + }); + + pendingReconnect = setTimeout(async () => { + pendingReconnect = null; + if (isShuttingDown()) return; + + try { + await reconnect(); + // Success path is handled by notifyStreamStarted, which fires from + // the replication client's "start" event after the stream is live. + } catch (err) { + // subscribe() emits an "error" event of its own on failure, so the + // next attempt is scheduled by handle(). Log here anyway so reconnect + // failures stay visible even if the error event is suppressed. + logger.error("Replication reconnect attempt failed", { + attempt, + error: err, + }); + } + }, delay); + } + + function scheduleExit(): void { + if (strategy.type !== "exit") return; + if (exiting) return; + exiting = true; + + const delay = strategy.exitDelayMs ?? 5_000; + const code = strategy.exitCode ?? 1; + + logger.error("Fatal replication error — exiting to let process supervisor restart", { + exitCode: code, + exitDelayMs: delay, + }); + + pendingExit = setTimeout(() => { + // eslint-disable-next-line no-process-exit + process.exit(code); + }, delay); + // Don't hold a clean shutdown back on this timer. + pendingExit.unref(); + } + + return { + handle(error) { + if (isShuttingDown()) return; + switch (strategy.type) { + case "log": + return; + case "exit": + return scheduleExit(); + case "reconnect": + return scheduleReconnect(error); + } + }, + notifyStreamStarted() { + if (attempt > 0) { + logger.info("Replication reconnect succeeded", { attempt }); + attempt = 0; + } + }, + dispose() { + if (pendingReconnect) { + clearTimeout(pendingReconnect); + pendingReconnect = null; + } + if (pendingExit) { + clearTimeout(pendingExit); + pendingExit = null; + } + }, + }; +} + +// Shape of the env-driven configuration object the instance bootstrap files +// build from process.env. Kept separate from the strategy union above so the +// instance code can pass a single object regardless of which strategy is set. 
+export type ReplicationErrorRecoveryEnv = { + strategy: "reconnect" | "exit" | "log"; + reconnectInitialDelayMs?: number; + reconnectMaxDelayMs?: number; + reconnectMaxAttempts?: number; + exitDelayMs?: number; + exitCode?: number; +}; + +export function strategyFromEnv( + env: ReplicationErrorRecoveryEnv +): ReplicationErrorRecoveryStrategy { + switch (env.strategy) { + case "exit": + return { + type: "exit", + exitDelayMs: env.exitDelayMs, + exitCode: env.exitCode, + }; + case "log": + return { type: "log" }; + case "reconnect": + default: + return { + type: "reconnect", + initialDelayMs: env.reconnectInitialDelayMs, + maxDelayMs: env.reconnectMaxDelayMs, + maxAttempts: env.reconnectMaxAttempts, + }; + } +} diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 0a8ab5e1bde..a614793ccc9 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { RunsReplicationService } from "./runsReplicationService.server"; import { signalsEmitter } from "./signals.server"; @@ -69,6 +70,14 @@ function initializeRunsReplicationInstance() { insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1", + errorRecovery: strategyFromEnv({ + strategy: env.RUN_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: env.RUN_REPLICATION_EXIT_DELAY_MS, + exitCode: env.RUN_REPLICATION_EXIT_CODE, + }), }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 167564572eb..b873b4da5d3 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -29,6 +29,11 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -73,6 +78,9 @@ export type RunsReplicationServiceOptions = { insertMaxDelayMs?: number; disablePayloadInsert?: boolean; disableErrorFingerprinting?: boolean; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. 
+ errorRecovery?: ReplicationErrorRecoveryStrategy; }; type PostgresTaskRun = TaskRun & { masterQueue: string }; @@ -119,6 +127,7 @@ export class RunsReplicationService { private _insertStrategy: "insert" | "insert_async"; private _disablePayloadInsert: boolean; private _disableErrorFingerprinting: boolean; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -250,14 +259,25 @@ export class RunsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -278,6 +298,7 @@ export class RunsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of runs replication service"); diff --git a/apps/webapp/app/services/sessionsReplicationInstance.server.ts b/apps/webapp/app/services/sessionsReplicationInstance.server.ts index c6ed1b6b088..5954d50df57 100644 --- a/apps/webapp/app/services/sessionsReplicationInstance.server.ts +++ b/apps/webapp/app/services/sessionsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { SessionsReplicationService } from "./sessionsReplicationService.server"; export const sessionsReplicationInstance = singleton( @@ -66,6 +67,14 @@ function initializeSessionsReplicationInstance() { insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS, insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY, + errorRecovery: strategyFromEnv({ + strategy: env.SESSION_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: env.SESSION_REPLICATION_EXIT_DELAY_MS, + exitCode: env.SESSION_REPLICATION_EXIT_CODE, + }), }); return service; diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index f7f384faffc..2465bd11de8 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -23,6 +23,11 @@ import { tryCatch } from "@trigger.dev/core/utils"; import { type Session } from "@trigger.dev/database"; import EventEmitter from "node:events"; import { ConcurrentFlushScheduler } from "./runsReplicationService.server"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | 
"update" | "delete"; @@ -65,6 +70,9 @@ export type SessionsReplicationServiceOptions = { insertMaxRetries?: number; insertBaseDelayMs?: number; insertMaxDelayMs?: number; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. + errorRecovery?: ReplicationErrorRecoveryStrategy; }; type SessionInsert = { @@ -105,6 +113,7 @@ export class SessionsReplicationService { private _insertBaseDelayMs: number; private _insertMaxDelayMs: number; private _insertStrategy: "insert" | "insert_async"; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -231,14 +240,25 @@ export class SessionsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -259,6 +279,7 @@ export class SessionsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of sessions replication service"); diff --git a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts new file mode 100644 index 00000000000..be3965ebebe --- /dev/null +++ b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts @@ -0,0 +1,303 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { containerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; + +vi.setConfig({ testTimeout: 120_000 }); + +// These tests force a replication-stream disconnect (the same shape Postgres +// reports during an RDS failover) and verify each error-recovery strategy +// behaves correctly: +// - "reconnect" (default) auto-resubscribes and resumes from the last LSN +// - "exit" exits the process so a supervisor restarts it +// - "log" keeps historical behaviour (silent death of the stream) +describe("RunsReplicationService error recovery", () => { + containerTest( + "reconnect strategy auto-recovers after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 
5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + // Tight backoff so the test doesn't wait minutes. + errorRecovery: { + type: "reconnect", + initialDelayMs: 200, + maxDelayMs: 1000, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Insert a row pre-failure and verify it replicates. + const runA = await createTaskRun(prisma, seed, "run_pre_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + // Kill the WAL sender backend — same shape as the RDS failover that + // dropped both replication clients on test cloud. + await killReplicationBackend(prisma, "runs-replication"); + + // Insert a row after the kill. With the reconnect strategy the + // service should automatically resubscribe and pick this up. + const runB = await createTaskRun(prisma, seed, "run_post_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id, runB.id], { timeoutMs: 30_000 }); + } finally { + await service.shutdown(); + } + } + ); + + containerTest( + "exit strategy calls process.exit after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + // Stub process.exit so the test process itself doesn't terminate. + // mockImplementation returns never; cast to satisfy the signature. + const exitSpy = vi + .spyOn(process, "exit") + .mockImplementation(((code?: number) => undefined as never) as typeof process.exit); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { + type: "exit", + // Short delay so the test stays quick; the flush window doesn't + // matter here because we're stubbing the actual exit call. + exitDelayMs: 100, + exitCode: 1, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Sanity check: replication is alive before the kill. + const runA = await createTaskRun(prisma, seed, "run_pre_exit"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Wait long enough for the error event to fire and the exit timer + // to elapse, plus slack. 
+ await setTimeout(2000); + + expect(exitSpy).toHaveBeenCalledWith(1); + } finally { + exitSpy.mockRestore(); + await service.shutdown(); + } + } + ); + + containerTest( + "log strategy leaves replication permanently stopped", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { type: "log" }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + const runA = await createTaskRun(prisma, seed, "run_pre_log"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Give the service time to attempt (and not) any recovery. + await setTimeout(2000); + + // Insert a row after the kill — under the log strategy nothing + // brings the stream back, so this should not appear in ClickHouse. + const runB = await createTaskRun(prisma, seed, "run_post_log"); + await setTimeout(3000); + + const ids = await readReplicatedRunIds(clickhouse); + expect(ids).toContain(runA.id); + expect(ids).not.toContain(runB.id); + } finally { + await service.shutdown(); + } + } + ); +}); + +// -------------------------------------------------------------------------- +// helpers +// -------------------------------------------------------------------------- + +type SeedRefs = { + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; +}; + +async function seedOrgProjectEnv(prisma: any): Promise { + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + return { + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: runtimeEnvironment.id, + }; +} + +async function createTaskRun(prisma: any, seed: SeedRefs, friendlyId: string) { + return prisma.taskRun.create({ + data: { + friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: friendlyId, + spanId: friendlyId, + queue: "test", + runtimeEnvironmentId: seed.runtimeEnvironmentId, + projectId: seed.projectId, + organizationId: seed.organizationId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); +} + +// Kills any active WAL-sender backends whose application_name matches the +// service. This mirrors the failover-style disconnect that surfaced the bug: +// the WAL stream connection drops and the LogicalReplicationClient errors. 
+async function killReplicationBackend(prisma: any, applicationName: string) {
+  // Wait briefly for the WAL sender to appear in pg_stat_replication after
+  // subscribe() completes — there's a small async gap between
+  // replicationStart firing and the row being visible to other sessions.
+  for (let attempt = 0; attempt < 20; attempt++) {
+    const rows = await prisma.$queryRawUnsafe<{ pid: number }[]>(
+      `SELECT pid FROM pg_stat_replication WHERE application_name = $1`,
+      applicationName
+    );
+    if (rows.length > 0) {
+      for (const { pid } of rows) {
+        await prisma.$executeRawUnsafe(`SELECT pg_terminate_backend(${pid})`);
+      }
+      return;
+    }
+    await setTimeout(100);
+  }
+  throw new Error(
+    `No active replication backend found for application_name=${applicationName} after 2s`
+  );
+}
+
+async function readReplicatedRunIds(clickhouse: ClickHouse): Promise<string[]> {
+  const queryRuns = clickhouse.reader.query({
+    name: "runs-replication",
+    query: "SELECT run_id FROM trigger_dev.task_runs_v2",
+    schema: z.object({ run_id: z.string() }),
+  });
+  const [queryError, result] = await queryRuns({});
+  if (queryError) throw queryError;
+  return (result ?? []).map((row) => row.run_id);
+}
+
+async function waitForRunIdsInClickHouse(
+  clickhouse: ClickHouse,
+  expectedIds: string[],
+  options: { timeoutMs?: number; pollIntervalMs?: number } = {}
+) {
+  const timeoutMs = options.timeoutMs ?? 10_000;
+  const pollIntervalMs = options.pollIntervalMs ?? 250;
+  const deadline = Date.now() + timeoutMs;
+  let lastIds: string[] = [];
+  while (Date.now() < deadline) {
+    lastIds = await readReplicatedRunIds(clickhouse);
+    if (expectedIds.every((id) => lastIds.includes(id))) return;
+    await setTimeout(pollIntervalMs);
+  }
+  throw new Error(
+    `Timed out waiting for run ids ${JSON.stringify(expectedIds)} to land in ClickHouse. ` +
+      `Last seen: ${JSON.stringify(lastIds)}`
+  );
+}

From 72d40794ca2b225d7091077d1d639ea8d2e7f561 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Wed, 13 May 2026 22:31:30 +0100
Subject: [PATCH 2/5] fix(webapp): reschedule reconnect when subscribe() throws
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Addresses PR review feedback:

- LogicalReplicationClient.subscribe() can throw before its internal
  "error" listener is wired up (notably when pg client.connect() fails
  mid-failover). The reconnect strategy's catch block only logged, so
  recovery silently stopped. Now also calls scheduleReconnect(err) — the
  pendingReconnect guard makes it idempotent if an error event was also
  emitted.
- Reject negative values for the new replication-recovery env vars and cap
  exit codes at 255.
- Convert the new ReplicationErrorRecovery{Deps,} interfaces to type
  aliases to match the repo's TypeScript style.
- Tighten the reconnect dep comment to drop a stale "lastAcknowledgedLsn"
  reference (the wrapper-tracked resume LSN is what callers actually pass).
- Restore process.exit after service.shutdown() in the exit-strategy test
  so a delayed exit timer can't terminate the test worker.
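
In sketch form, the retry path the first bullet fixes, condensed from the
hunk below rather than quoted verbatim:

    pendingReconnect = setTimeout(async () => {
      pendingReconnect = null;
      if (isShuttingDown()) return;
      try {
        await reconnect(); // may throw before any "error" event is emitted
      } catch (err) {
        logger.error("Replication reconnect attempt failed", { attempt, error: err });
        // Reschedule ourselves; if an "error" event also fired, the
        // pendingReconnect guard makes the duplicate call a no-op.
        scheduleReconnect(err);
      }
    }, delay);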
--- apps/webapp/app/env.server.ts | 14 ++++++------- .../replicationErrorRecovery.server.ts | 20 +++++++++++-------- ...nsReplicationService.errorRecovery.test.ts | 4 +++- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 4a3d13b6886..e5b02e3dcad 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1338,8 +1338,8 @@ const EnvironmentSchema = z RUN_REPLICATION_ERROR_STRATEGY: z .enum(["reconnect", "exit", "log"]) .default("reconnect"), - RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), - RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), + RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis // with the runs replicator for leader locking but has its own slot and @@ -1376,15 +1376,15 @@ const EnvironmentSchema = z SESSION_REPLICATION_ERROR_STRATEGY: z .enum(["reconnect", "exit", "log"]) .default("reconnect"), - SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), - SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), + SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), // Reconnect tuning shared across both replication services. Only // applies when error strategy is `reconnect`. Max attempts of 0 means // unlimited (default). - REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().default(1_000), - REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().default(60_000), - REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().default(0), + REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000), + REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000), + REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts index ea3f2c0bf47..55f7495959d 100644 --- a/apps/webapp/app/services/replicationErrorRecovery.server.ts +++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts @@ -27,18 +27,18 @@ export type ReplicationErrorRecoveryStrategy = } | { type: "log" }; -export interface ReplicationErrorRecoveryDeps { +export type ReplicationErrorRecoveryDeps = { strategy: ReplicationErrorRecoveryStrategy; logger: Logger; // Re-subscribe the underlying replication client. Implementations should - // call client.subscribe(lastAcknowledgedLsn) and resolve once that returns. + // call client.subscribe(...) and resolve once the stream is started. reconnect: () => Promise; // True once the host service has begun graceful shutdown — recovery // suppresses all work in that state. isShuttingDown: () => boolean; -} +}; -export interface ReplicationErrorRecovery { +export type ReplicationErrorRecovery = { // Called from the replication client's "error" event handler. handle(error: unknown): void; // Called from the replication client's "start" event handler. Resets the @@ -46,7 +46,7 @@ export interface ReplicationErrorRecovery { notifyStreamStarted(): void; // Cancel any pending reconnect/exit timer. Called from shutdown(). 
dispose(): void; -} +}; export function createReplicationErrorRecovery( deps: ReplicationErrorRecoveryDeps @@ -91,13 +91,17 @@ export function createReplicationErrorRecovery( // Success path is handled by notifyStreamStarted, which fires from // the replication client's "start" event after the stream is live. } catch (err) { - // subscribe() emits an "error" event of its own on failure, so the - // next attempt is scheduled by handle(). Log here anyway so reconnect - // failures stay visible even if the error event is suppressed. + // subscribe() can throw without first emitting an "error" event — + // notably when the initial pg client.connect() fails because Postgres + // is still unreachable mid-failover. Schedule the next attempt + // ourselves so recovery doesn't silently stop. If subscribe() did + // also emit an "error" event, handle() will call scheduleReconnect() + // first; the guard on pendingReconnect makes this idempotent. logger.error("Replication reconnect attempt failed", { attempt, error: err, }); + scheduleReconnect(err); } }, delay); } diff --git a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts index be3965ebebe..fc25c3b9eef 100644 --- a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts +++ b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts @@ -126,8 +126,10 @@ describe("RunsReplicationService error recovery", () => { expect(exitSpy).toHaveBeenCalledWith(1); } finally { - exitSpy.mockRestore(); + // shutdown() before mockRestore() so any in-flight exit timer can + // be disposed without terminating the Vitest worker. await service.shutdown(); + exitSpy.mockRestore(); } } ); From 1e6617cd11e5292d9ed9ea09a0255fb6329f870b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 16:31:28 +0100 Subject: [PATCH 3/5] fix(webapp): reschedule reconnect when subscribe() returns stopped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LogicalReplicationClient.subscribe() can resolve without throwing or emitting an "error" event when leader-lock acquisition fails — it just calls this.stop() and returns. The reconnect callback now checks isStopped after subscribe() and throws so the recovery handler can schedule the next attempt instead of silently giving up. --- apps/webapp/app/services/runsReplicationService.server.ts | 7 +++++++ .../app/services/sessionsReplicationService.server.ts | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index b873b4da5d3..00efb82222e 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -264,6 +264,13 @@ export class RunsReplicationService { logger: this.logger, reconnect: async () => { await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + if (this._replicationClient.isStopped) { + // subscribe() can resolve without throwing or emitting an "error" + // event when leader-lock acquisition fails (see LogicalReplication- + // Client.subscribe leader-election branch). Throw here so the + // recovery handler reschedules the next attempt. 
+ throw new Error("Replication client stopped after subscribe()"); + } }, isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, }); diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index 2465bd11de8..588c65b9c41 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -245,6 +245,11 @@ export class SessionsReplicationService { logger: this.logger, reconnect: async () => { await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + if (this._replicationClient.isStopped) { + // See RunsReplicationService for the rationale: subscribe() can + // resolve without throwing when leader-lock acquisition fails. + throw new Error("Replication client stopped after subscribe()"); + } }, isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, }); From a2eaf3e80d8c521e44e83f2f9db3a2aae7a4737f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 17:08:58 +0100 Subject: [PATCH 4/5] fix(webapp): drop bogus isStopped check, route leader-lock failure through handle() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous post-subscribe() isStopped check was always true on the happy path: subscribe() calls stop() up front (setting _isStopped=true) and only resets the flag inside the replicationStart event, which fires asynchronously after subscribe() returns. So the check threw on every successful reconnect, the catch rescheduled, the next attempt tore down the just-built client, and the cycle continued — replication briefly worked between teardowns, which is why the integration test passed. Replace it with the correct nudge: subscribe to leaderElection and call the recovery handler on isLeader=false. That's the only subscribe() exit path that doesn't either throw or emit an "error" event (the other silent-return paths emit "error" first via createPublication/createSlot failures). --- .../services/runsReplicationService.server.ts | 16 +++++++++------- .../sessionsReplicationService.server.ts | 11 ++++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 00efb82222e..d1f4f8e2bd9 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -264,13 +264,6 @@ export class RunsReplicationService { logger: this.logger, reconnect: async () => { await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); - if (this._replicationClient.isStopped) { - // subscribe() can resolve without throwing or emitting an "error" - // event when leader-lock acquisition fails (see LogicalReplication- - // Client.subscribe leader-election branch). Throw here so the - // recovery handler reschedules the next attempt. - throw new Error("Replication client stopped after subscribe()"); - } }, isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, }); @@ -293,6 +286,15 @@ export class RunsReplicationService { this._replicationClient.events.on("leaderElection", (isLeader) => { this.logger.info("Leader election", { isLeader }); + if (!isLeader) { + // Failed leader election doesn't throw or emit an "error" event — + // subscribe() just emits leaderElection(false), calls stop(), and + // returns. 
Nudge the recovery handler so reconnect doesn't silently + // stall when another instance holds the lock. + this._errorRecovery.handle( + new Error("Failed to acquire replication leader lock") + ); + } }); // Initialize retry configuration diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index 588c65b9c41..9cf7fd69a7a 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -245,11 +245,6 @@ export class SessionsReplicationService { logger: this.logger, reconnect: async () => { await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); - if (this._replicationClient.isStopped) { - // See RunsReplicationService for the rationale: subscribe() can - // resolve without throwing when leader-lock acquisition fails. - throw new Error("Replication client stopped after subscribe()"); - } }, isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, }); @@ -272,6 +267,12 @@ export class SessionsReplicationService { this._replicationClient.events.on("leaderElection", (isLeader) => { this.logger.info("Leader election", { isLeader }); + if (!isLeader) { + // See RunsReplicationService for the rationale. + this._errorRecovery.handle( + new Error("Failed to acquire replication leader lock") + ); + } }); // Initialize retry configuration From 499060eebdf32b484d499bfce20ffd7c428b4b2f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 15 May 2026 18:43:11 +0100 Subject: [PATCH 5/5] fix(webapp): scope leaderElection-lost recovery to reconnect strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit routed leaderElection(false) through handle(), which under the exit strategy schedules process.exit. In a multi-instance deployment that turns lost leader election — a normal operational state — into a restart loop: exit, supervisor restarts, election fails again, exit, and so on. Add a dedicated notifyLeaderElectionLost() on ReplicationErrorRecovery that the reconnect strategy treats as another retry trigger, while exit and log strategies no-op. Wire the wrapper services through the new method. --- .../app/services/replicationErrorRecovery.server.ts | 13 +++++++++++++ .../app/services/runsReplicationService.server.ts | 7 ++++--- .../services/sessionsReplicationService.server.ts | 2 +- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts index 55f7495959d..46cf05a3181 100644 --- a/apps/webapp/app/services/replicationErrorRecovery.server.ts +++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts @@ -44,6 +44,11 @@ export type ReplicationErrorRecovery = { // Called from the replication client's "start" event handler. Resets the // reconnect attempt counter so the next failure starts from initialDelayMs. notifyStreamStarted(): void; + // Called from the replication client's "leaderElection" event handler with + // isLeader=false. Only the reconnect strategy acts on this; exit and log + // strategies treat losing the lock as a normal multi-instance state (an + // "exit" instance would otherwise restart-loop whenever a peer holds it). + notifyLeaderElectionLost(error: unknown): void; // Cancel any pending reconnect/exit timer. Called from shutdown(). 
dispose(): void; }; @@ -145,6 +150,14 @@ export function createReplicationErrorRecovery( attempt = 0; } }, + notifyLeaderElectionLost(error) { + if (isShuttingDown()) return; + // Only the reconnect strategy should react. For exit, losing the + // lock to a peer would otherwise trigger a restart loop. For log, + // we keep historical no-op semantics. + if (strategy.type !== "reconnect") return; + scheduleReconnect(error); + }, dispose() { if (pendingReconnect) { clearTimeout(pendingReconnect); diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index d1f4f8e2bd9..4bdc2551dd8 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -289,9 +289,10 @@ export class RunsReplicationService { if (!isLeader) { // Failed leader election doesn't throw or emit an "error" event — // subscribe() just emits leaderElection(false), calls stop(), and - // returns. Nudge the recovery handler so reconnect doesn't silently - // stall when another instance holds the lock. - this._errorRecovery.handle( + // returns. Route through a dedicated handler so only the reconnect + // strategy acts; the exit strategy must not restart-loop when + // another instance holds the lock. + this._errorRecovery.notifyLeaderElectionLost( new Error("Failed to acquire replication leader lock") ); } diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index 9cf7fd69a7a..95b386f9686 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -269,7 +269,7 @@ export class SessionsReplicationService { this.logger.info("Leader election", { isLeader }); if (!isLeader) { // See RunsReplicationService for the rationale. - this._errorRecovery.handle( + this._errorRecovery.notifyLeaderElectionLost( new Error("Failed to acquire replication leader lock") ); }
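
For completeness, a minimal sketch of how the env knobs from patch 1 resolve
into a strategy object via strategyFromEnv (illustrative values, not new
defaults):

    import { strategyFromEnv } from "./replicationErrorRecovery.server";

    // e.g. RUN_REPLICATION_ERROR_STRATEGY=exit on a supervised deployment
    const strategy = strategyFromEnv({
      strategy: "exit",    // RUN_REPLICATION_ERROR_STRATEGY
      exitDelayMs: 5_000,  // RUN_REPLICATION_EXIT_DELAY_MS
      exitCode: 1,         // RUN_REPLICATION_EXIT_CODE
    });
    // => { type: "exit", exitDelayMs: 5000, exitCode: 1 }
    // The default strategy remains "reconnect"; with this patch, a lost
    // leader election only schedules retries under "reconnect" and never
    // triggers an exit.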