@@ -4,12 +4,16 @@ import {
44 type FilterRunsOptions ,
55 type IRunsRepository ,
66 type ListRunsOptions ,
7+ type RunIdsPage ,
78 type RunListInputOptions ,
89 type RunsRepositoryOptions ,
910 type TagListOptions ,
1011 convertRunListInputOptionsToFilterRunsOptions ,
1112} from "./runsRepository.server" ;
1213import parseDuration from "parse-duration" ;
14+ import { decodeRunsCursor , encodeRunsCursor } from "./runsCursor.server" ;
15+
16+ type RunCursorRow = { runId : string ; createdAt : number } ;
1317
1418export class ClickHouseRunsRepository implements IRunsRepository {
1519 constructor ( private readonly options : RunsRepositoryOptions ) { }
@@ -18,25 +22,52 @@ export class ClickHouseRunsRepository implements IRunsRepository {
1822 return "clickhouse" ;
1923 }
2024
21- async listRunIds ( options : ListRunsOptions ) {
25+ /**
26+ * Runs the keyset-paginated query and returns `{ runId, createdAt }` rows
27+ * (one extra beyond `page.size` to signal "has more"). The ordering is always
28+ * the composite `(created_at, run_id)`; the cursor predicate must match it.
29+ *
30+ * Composite cursors carry both components, so we cut on the
31+ * `(created_at, run_id)` tuple — sound regardless of how run_id order relates
32+ * to created_at order. Legacy bare-run_id cursors fall back to the old
33+ * `run_id`-only predicate (knowingly unsound) for backwards compatibility
34+ * with in-flight cursors.
35+ */
36+ private async listRunRows ( options : ListRunsOptions ) : Promise < RunCursorRow [ ] > {
2237 const queryBuilder = this . options . clickhouse . taskRuns . queryBuilder ( ) ;
2338 applyRunFiltersToQueryBuilder (
2439 queryBuilder ,
2540 await convertRunListInputOptionsToFilterRunsOptions ( options , this . options . prisma )
2641 ) ;
2742
43+ const forward = options . page . direction === "forward" || ! options . page . direction ;
44+
2845 if ( options . page . cursor ) {
29- if ( options . page . direction === "forward" || ! options . page . direction ) {
30- queryBuilder
31- . where ( "run_id < {runId: String}" , { runId : options . page . cursor } )
32- . orderBy ( "created_at DESC, run_id DESC" )
33- . limit ( options . page . size + 1 ) ;
46+ const decoded = decodeRunsCursor ( options . page . cursor ) ;
47+
48+ if ( forward ) {
49+ if ( decoded . kind === "composite" ) {
50+ queryBuilder . where (
51+ "(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})" ,
52+ { cursorCreatedAt : decoded . createdAt , runId : decoded . runId }
53+ ) ;
54+ } else {
55+ queryBuilder . where ( "run_id < {runId: String}" , { runId : decoded . runId } ) ;
56+ }
57+ queryBuilder . orderBy ( "created_at DESC, run_id DESC" ) ;
3458 } else {
35- queryBuilder
36- . where ( "run_id > {runId: String}" , { runId : options . page . cursor } )
37- . orderBy ( "created_at ASC, run_id ASC" )
38- . limit ( options . page . size + 1 ) ;
59+ if ( decoded . kind === "composite" ) {
60+ queryBuilder . where (
61+ "(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})" ,
62+ { cursorCreatedAt : decoded . createdAt , runId : decoded . runId }
63+ ) ;
64+ } else {
65+ queryBuilder . where ( "run_id > {runId: String}" , { runId : decoded . runId } ) ;
66+ }
67+ queryBuilder . orderBy ( "created_at ASC, run_id ASC" ) ;
3968 }
69+
70+ queryBuilder . limit ( options . page . size + 1 ) ;
4071 } else {
4172 // Initial page - no cursor provided
4273 queryBuilder . orderBy ( "created_at DESC, run_id DESC" ) . limit ( options . page . size + 1 ) ;
@@ -48,75 +79,89 @@ export class ClickHouseRunsRepository implements IRunsRepository {
4879 throw queryError ;
4980 }
5081
51- const runIds = result . map ( ( row ) => row . run_id ) ;
52- return runIds ;
82+ return result . map ( ( row ) => ( { runId : row . run_id , createdAt : row . created_at_ms } ) ) ;
5383 }
5484
55- async listFriendlyRunIds ( options : ListRunsOptions ) {
56- // First get internal IDs from ClickHouse
57- const internalIds = await this . listRunIds ( options ) ;
85+ /**
86+ * A keyset-paginated page of run ids ordered by `(created_at, run_id)`, plus
87+ * the cursors to page forward/backward. Cursors are composite tokens that
88+ * match the ordering, so pagination can't duplicate or skip runs even when
89+ * run_id order diverges from created_at order. This is the single source of
90+ * cursor construction — `listRuns` and bulk actions both build on it.
91+ */
92+ async listRunIds ( options : ListRunsOptions ) : Promise < RunIdsPage > {
93+ const rows = await this . listRunRows ( options ) ;
5894
59- if ( internalIds . length === 0 ) {
60- return [ ] ;
61- }
62-
63- // Then get friendly IDs from Prisma
64- const runs = await this . options . prisma . taskRun . findMany ( {
65- where : {
66- id : {
67- in : internalIds ,
68- } ,
69- } ,
70- select : {
71- friendlyId : true ,
72- } ,
73- } ) ;
74-
75- return runs . map ( ( run ) => run . friendlyId ) ;
76- }
77-
78- async listRuns ( options : ListRunsOptions ) {
79- const runIds = await this . listRunIds ( options ) ;
95+ // listRunRows fetches one extra row beyond page.size to detect "has more".
96+ const hasMore = rows . length > options . page . size ;
8097
81- // If there are more runs than the page size, we need to fetch the next page
82- const hasMore = runIds . length > options . page . size ;
98+ const cursorFor = ( row : RunCursorRow | undefined ) : string | null =>
99+ row ? encodeRunsCursor ( row . createdAt , row . runId ) : null ;
83100
84101 let nextCursor : string | null = null ;
85102 let previousCursor : string | null = null ;
86103
87- //get cursors for next and previous pages
88104 const direction = options . page . direction ?? "forward" ;
89105 switch ( direction ) {
90106 case "forward" : {
91- previousCursor = options . page . cursor ? runIds . at ( 0 ) ?? null : null ;
107+ previousCursor = options . page . cursor ? cursorFor ( rows . at ( 0 ) ) : null ;
92108 if ( hasMore ) {
93- // The next cursor should be the last run ID from this page
94- nextCursor = runIds [ options . page . size - 1 ] ;
109+ // The next cursor is the last run on this page.
110+ nextCursor = cursorFor ( rows [ options . page . size - 1 ] ) ;
95111 }
96112 break ;
97113 }
98114 case "backward" : {
99- const reversedRunIds = [ ...runIds ] . reverse ( ) ;
115+ const reversedRows = [ ...rows ] . reverse ( ) ;
100116 if ( hasMore ) {
101- previousCursor = reversedRunIds . at ( 1 ) ?? null ;
102- nextCursor = reversedRunIds . at ( options . page . size ) ?? null ;
117+ previousCursor = cursorFor ( reversedRows . at ( 1 ) ) ;
118+ nextCursor = cursorFor ( reversedRows . at ( options . page . size ) ) ;
103119 } else {
104- nextCursor = reversedRunIds . at ( options . page . size - 1 ) ?? null ;
120+ nextCursor = cursorFor ( reversedRows . at ( options . page . size - 1 ) ) ;
105121 }
106-
107122 break ;
108123 }
109124 }
110125
111- const runIdsToReturn =
112- options . page . direction === "backward" && hasMore
113- ? runIds . slice ( 1 , options . page . size + 1 )
114- : runIds . slice ( 0 , options . page . size ) ;
126+ const runIds = (
127+ direction === "backward" && hasMore
128+ ? rows . slice ( 1 , options . page . size + 1 )
129+ : rows . slice ( 0 , options . page . size )
130+ ) . map ( ( row ) => row . runId ) ;
131+
132+ return { runIds, pagination : { nextCursor, previousCursor } } ;
133+ }
134+
135+ async listFriendlyRunIds ( options : ListRunsOptions ) {
136+ // First get internal IDs from ClickHouse
137+ const { runIds } = await this . listRunIds ( options ) ;
138+
139+ if ( runIds . length === 0 ) {
140+ return [ ] ;
141+ }
142+
143+ // Then get friendly IDs from Prisma
144+ const runs = await this . options . prisma . taskRun . findMany ( {
145+ where : {
146+ id : {
147+ in : runIds ,
148+ } ,
149+ } ,
150+ select : {
151+ friendlyId : true ,
152+ } ,
153+ } ) ;
154+
155+ return runs . map ( ( run ) => run . friendlyId ) ;
156+ }
157+
158+ async listRuns ( options : ListRunsOptions ) {
159+ const { runIds, pagination } = await this . listRunIds ( options ) ;
115160
116161 let runs = await this . options . prisma . taskRun . findMany ( {
117162 where : {
118163 id : {
119- in : runIdsToReturn ,
164+ in : runIds ,
120165 } ,
121166 } ,
122167 orderBy : {
@@ -163,10 +208,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
163208
164209 return {
165210 runs,
166- pagination : {
167- nextCursor,
168- previousCursor,
169- } ,
211+ pagination,
170212 } ;
171213 }
172214
0 commit comments