Skip to content

Commit 7462bfe

Browse files
committed
feat(webapp): give the realtime runs feed its own ClickHouse pool
Resolve tag/batch run ids on a dedicated REALTIME_RUNS_CLICKHOUSE_* pool (falling back to CLICKHOUSE_URL) so the feed can't contend with the shared analytics client.
1 parent 3e3a821 commit 7462bfe

4 files changed

Lines changed: 64 additions & 3 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,6 +1633,20 @@ const EnvironmentSchema = z
16331633
.enum(["log", "error", "warn", "info", "debug"])
16341634
.default("info"),
16351635
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1636+
// ClickHouse client used by the realtime runs feed for tag/batch id resolution.
1637+
// Kept on its own URL + pool so the feed's reads can't contend with the main
1638+
// analytics client (CLICKHOUSE_URL). Falls back to the main URL when unset.
1639+
REALTIME_RUNS_CLICKHOUSE_URL: z
1640+
.string()
1641+
.optional()
1642+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1643+
REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1644+
REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1645+
REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1646+
REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL: z
1647+
.enum(["log", "error", "warn", "info", "debug"])
1648+
.default("info"),
1649+
REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
16361650
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
16371651
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
16381652
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),

apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,36 @@ function initializeRunEngineClickhouseClient(): ClickHouse {
211211
});
212212
}
213213

214+
/** Realtime runs feed tag/batch id resolution (`REALTIME_RUNS_CLICKHOUSE_URL`);
215+
* falls back to the default client if unset. */
216+
const defaultRealtimeClickhouseClient = singleton(
217+
"realtimeClickhouseClient",
218+
initializeRealtimeClickhouseClient
219+
);
220+
221+
function initializeRealtimeClickhouseClient(): ClickHouse {
222+
if (!env.REALTIME_RUNS_CLICKHOUSE_URL) {
223+
return defaultClickhouseClient;
224+
}
225+
226+
const url = new URL(env.REALTIME_RUNS_CLICKHOUSE_URL);
227+
url.searchParams.delete("secure");
228+
229+
return new ClickHouse({
230+
url: url.toString(),
231+
name: "realtime-runs-clickhouse",
232+
keepAlive: {
233+
enabled: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
234+
idleSocketTtl: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
235+
},
236+
logLevel: env.REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL,
237+
compression: {
238+
request: env.REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
239+
},
240+
maxOpenConnections: env.REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
241+
});
242+
}
243+
214244
/** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */
215245
const defaultEventsClickhouseClient = singleton(
216246
"eventsClickhouseClient",
@@ -257,7 +287,8 @@ export type ClientType =
257287
| "logs"
258288
| "query"
259289
| "admin"
260-
| "engine";
290+
| "engine"
291+
| "realtime";
261292

262293
function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse {
263294
const parsed = new URL(url);
@@ -330,6 +361,20 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou
330361
},
331362
maxOpenConnections: env.RUN_ENGINE_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
332363
});
364+
case "realtime":
365+
return new ClickHouse({
366+
url: parsed.toString(),
367+
name,
368+
keepAlive: {
369+
enabled: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
370+
idleSocketTtl: env.REALTIME_RUNS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
371+
},
372+
logLevel: env.REALTIME_RUNS_CLICKHOUSE_LOG_LEVEL,
373+
compression: {
374+
request: env.REALTIME_RUNS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
375+
},
376+
maxOpenConnections: env.REALTIME_RUNS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
377+
});
333378
case "standard":
334379
case "query":
335380
case "admin":
@@ -398,6 +443,8 @@ export class ClickhouseFactory {
398443
return defaultAdminClickhouseClient;
399444
case "engine":
400445
return defaultRunEngineClickhouseClient;
446+
case "realtime":
447+
return defaultRealtimeClickhouseClient;
401448
}
402449
}
403450

apps/webapp/app/services/realtime/notifierRealtimeClientInstance.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ function initializeNotifierRealtimeClient(): NotifierRealtimeClient {
5555
runReader: new RunHydrator({ replica: $replica }),
5656
runListResolver: new ClickHouseRunListResolver({
5757
getClickhouse: (organizationId) =>
58-
clickhouseFactory.getClickhouseForOrganization(organizationId, "standard"),
58+
clickhouseFactory.getClickhouseForOrganization(organizationId, "realtime"),
5959
prisma: $replica,
6060
}),
6161
notifier: getRunChangeNotifier(),

apps/webapp/app/services/realtime/shadowRealtimeClientInstance.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ function initializeShadowRealtimeClient(): ShadowRealtimeClient {
2626
runReader: new RunHydrator({ replica: $replica }),
2727
runListResolver: new ClickHouseRunListResolver({
2828
getClickhouse: (organizationId) =>
29-
clickhouseFactory.getClickhouseForOrganization(organizationId, "standard"),
29+
clickhouseFactory.getClickhouseForOrganization(organizationId, "realtime"),
3030
prisma: $replica,
3131
}),
3232
});

0 commit comments

Comments
 (0)