-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(webapp): add a new backend for the realtime runs feed #3864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a6db506
023588a
058311a
9a32dd1
905b7de
3d07c47
d3730d9
d2987a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| /** | ||
| * Tiny in-process bounded TTL cache shared by the realtime feeds. | ||
| * | ||
| * Entries expire after `ttlMs`. An expired entry is evicted when read (`get`); on | ||
| * write, if the cache is at `maxEntries`, expired entries are swept and, if it's | ||
| * still full (pathologically all live), the oldest insertion is dropped. Node is | ||
| * single-threaded so no locking is needed. Used where a miss is cheap and | ||
| * correctness-safe (read-through hydration, per-handle working sets, per-org flag | ||
| * resolution). | ||
| * | ||
| * A stored value of `undefined` cannot be distinguished from a miss; callers that | ||
| * need to cache "absence" should store an explicit sentinel (e.g. `null`). | ||
| */ | ||
| export class BoundedTtlCache<V> { | ||
| readonly #entries = new Map<string, { value: V; expiresAt: number }>(); | ||
|
|
||
| constructor( | ||
| private readonly ttlMs: number, | ||
| private readonly maxEntries: number | ||
| ) {} | ||
|
|
||
| get(key: string): V | undefined { | ||
| const entry = this.#entries.get(key); | ||
| if (!entry) { | ||
| return undefined; | ||
| } | ||
| if (entry.expiresAt > Date.now()) { | ||
| return entry.value; | ||
| } | ||
| // Evict on read so expired entries don't linger until the next at-capacity | ||
| // sweep — important for read-heavy / low-churn caches (per-handle working sets). | ||
| this.#entries.delete(key); | ||
| return undefined; | ||
| } | ||
|
|
||
| set(key: string, value: V): void { | ||
| // Only run capacity eviction when inserting a NEW key — updating an existing key | ||
| // doesn't grow the map, so it must never drop an unrelated live entry. | ||
| if (!this.#entries.has(key) && this.#entries.size >= this.maxEntries) { | ||
| const now = Date.now(); | ||
| for (const [key, entry] of this.#entries) { | ||
| if (entry.expiresAt <= now) { | ||
| this.#entries.delete(key); | ||
| } | ||
| } | ||
| if (this.#entries.size >= this.maxEntries) { | ||
| const oldest = this.#entries.keys().next().value; | ||
| if (oldest !== undefined) { | ||
| this.#entries.delete(oldest); | ||
| } | ||
| } | ||
| } | ||
| this.#entries.set(key, { value, expiresAt: Date.now() + this.ttlMs }); | ||
| } | ||
|
|
||
| get size(): number { | ||
| return this.#entries.size; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| import { type ClickHouse } from "@internal/clickhouse"; | ||
| import { type PrismaClientOrTransaction } from "~/db.server"; | ||
| import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; | ||
| import { type RunListFilter, type RunListResolver } from "./runReader.server"; | ||
|
|
||
| export type ClickHouseRunListResolverOptions = { | ||
| /** Resolves the per-organization ClickHouse client (multi-tenant routing). */ | ||
| getClickhouse: (organizationId: string) => Promise<ClickHouse>; | ||
| prisma: PrismaClientOrTransaction; | ||
| }; | ||
|
|
||
| /** | ||
| * Resolves the realtime tag/list filter into matching run ids via ClickHouse | ||
| * `listRunIds`. Tag matching is contains-ANY (OR), the same | ||
| * semantics the dashboard runs list uses. Filter-only: ids only, hydrated from | ||
| * Postgres by id afterward. This keeps the realtime tag feed off the Postgres | ||
| * `runTags` GIN index entirely. | ||
| * | ||
| * (Multi-tag subscribeToRunsWithTag is therefore OR, not the AND that Electric's | ||
| * `runTags @> ARRAY[...]` shape used. Restoring AND is a follow-up: add a | ||
| * `hasAll` mode to the ClickHouse runs filter and use it here.) | ||
|
ericallam marked this conversation as resolved.
|
||
| */ | ||
| export class ClickHouseRunListResolver implements RunListResolver { | ||
| constructor(private readonly options: ClickHouseRunListResolverOptions) {} | ||
|
|
||
| async resolveMatchingRunIds(filter: RunListFilter): Promise<string[]> { | ||
| const clickhouse = await this.options.getClickhouse(filter.organizationId); | ||
| const repository = new RunsRepository({ clickhouse, prisma: this.options.prisma }); | ||
|
|
||
| const { runIds } = await repository.listRunIds({ | ||
| organizationId: filter.organizationId, | ||
| projectId: filter.projectId, | ||
| environmentId: filter.environmentId, | ||
| tags: filter.tags && filter.tags.length > 0 ? filter.tags : undefined, | ||
| batchId: filter.batchId, | ||
| from: filter.createdAtAfter?.getTime(), | ||
| page: { size: filter.limit }, | ||
| }); | ||
|
Comment on lines
+30
to
+38
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 ClickHouseRunListResolver destructures listRunIds result as { runIds } At Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| // listRunIds is keyset-paginated; runIds is already capped to page.size (= limit). | ||
| return runIds; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.