Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 70 additions & 25 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,43 @@ async function fetchTableRunState(tableId: string, signal?: AbortSignal): Promis
}
}

/** Count groups flipped to in-flight (`pending`) by an optimistic schedule that
* weren't in-flight before — the delta to add to the run-state counter. */
function countNewlyInFlight(before: RowExecutions, after: RowExecutions): number {
let n = 0
for (const gid of Object.keys(after)) {
if (after[gid]?.status === 'pending' && !isExecInFlight(before[gid])) n++
}
return n
}

/** Add optimistically-stamped cells to the run-state counter so the "X running"
* badge + per-row gutter Stop reflect them instantly (the optimistic stamp
* eats the dispatcher's `pending` SSE, so `applyCell` never bumps the count).
* Returns the prior snapshot for rollback, or `null` when nothing was bumped. */
function bumpRunState(
queryClient: ReturnType<typeof useQueryClient>,
tableId: string,
stampedByRow: Record<string, number>
): { snapshot: TableRunState | undefined } | null {
const total = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
if (total === 0) return null
const snapshot = queryClient.getQueryData<TableRunState>(tableKeys.activeDispatches(tableId))
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
const nextByRow = { ...base.runningByRowId }
for (const [rid, n] of Object.entries(stampedByRow)) {
nextByRow[rid] = (nextByRow[rid] ?? 0) + n
}
return {
...base,
runningCellCount: base.runningCellCount + total,
runningByRowId: nextByRow,
}
})
return { snapshot }
}

/**
* Aggregate live state for a table: active dispatches (drives the "about to
* run" overlay), the running-cell count (top-right counter), and per-row
Expand Down Expand Up @@ -453,6 +490,11 @@ export function useCreateTableRow({ workspaceId, tableId }: RowMutationContext)
.workflowGroups ?? []
const stamped = withOptimisticAutoFireExec(groups, row)
reconcileCreatedRow(queryClient, tableId, stamped)
// Bump the run-state counter for any auto-fire groups stamped pending so
// the "X running" badge + gutter Stop show immediately (the row had no
// prior executions, so the stamped set is the full delta).
const stampedCount = countNewlyInFlight({}, stamped.executions ?? {})
if (stampedCount > 0) bumpRunState(queryClient, tableId, { [row.id]: stampedCount })
Comment thread
TheodoreSpeaks marked this conversation as resolved.
},
onError: (error) => {
if (isValidationError(error)) return
Expand Down Expand Up @@ -618,18 +660,27 @@ export function useUpdateTableRow({ workspaceId, tableId }: RowMutationContext)
queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema
.workflowGroups ?? []

const stampedByRow: Record<string, number> = {}
patchCachedRows(queryClient, tableId, (row) => {
if (row.id !== rowId) return row
const patch = data as Partial<RowData>
const nextExecutions = optimisticallyScheduleNewlyEligibleGroups(groups, row, patch)
if (nextExecutions) {
stampedByRow[row.id] = countNewlyInFlight(row.executions ?? {}, nextExecutions)
}
return {
...row,
data: { ...row.data, ...patch } as RowData,
...(nextExecutions ? { executions: nextExecutions } : {}),
}
})

return { previousQueries }
const bumped = bumpRunState(queryClient, tableId, stampedByRow)
return {
previousQueries,
runStateSnapshot: bumped?.snapshot,
didBumpRunState: bumped !== null,
}
},
onSuccess: (response, { rowId, data: mutatedData }) => {
const serverRow = response.data.row
Expand All @@ -655,6 +706,9 @@ export function useUpdateTableRow({ workspaceId, tableId }: RowMutationContext)
queryClient.setQueryData(queryKey, data)
}
}
if (context?.didBumpRunState) {
queryClient.setQueryData(tableKeys.activeDispatches(tableId), context.runStateSnapshot)
}
if (isValidationError(error)) return
toast.error(error.message, { duration: 5000 })
},
Expand Down Expand Up @@ -694,26 +748,38 @@ export function useBatchUpdateTableRows({ workspaceId, tableId }: RowMutationCon
queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema
.workflowGroups ?? []

const stampedByRow: Record<string, number> = {}
patchCachedRows(queryClient, tableId, (row) => {
const raw = updateMap.get(row.id)
if (!raw) return row
const patch = raw as Partial<RowData>
const nextExecutions = optimisticallyScheduleNewlyEligibleGroups(groups, row, patch)
if (nextExecutions) {
stampedByRow[row.id] = countNewlyInFlight(row.executions ?? {}, nextExecutions)
}
return {
...row,
data: { ...row.data, ...patch } as RowData,
...(nextExecutions ? { executions: nextExecutions } : {}),
}
})

return { previousQueries }
const bumped = bumpRunState(queryClient, tableId, stampedByRow)
return {
previousQueries,
runStateSnapshot: bumped?.snapshot,
didBumpRunState: bumped !== null,
}
},
onError: (error, _vars, context) => {
if (context?.previousQueries) {
for (const [queryKey, data] of context.previousQueries) {
queryClient.setQueryData(queryKey, data)
}
}
if (context?.didBumpRunState) {
queryClient.setQueryData(tableKeys.activeDispatches(tableId), context.runStateSnapshot)
}
if (isValidationError(error)) return
toast.error(error.message, { duration: 5000 })
},
Expand Down Expand Up @@ -1376,29 +1442,8 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
return { ...r, data: nextData, executions: next }
})

// Bump the counter to match the stamped cells. Without it the "X running"
// badge + gutter Stop stay at zero until a refetch: the optimistic stamp
// already marks the cell in-flight, so the dispatcher's `pending` SSE
// sees no `wasInFlight` transition and never bumps the counter.
const runStateSnapshot = queryClient.getQueryData<TableRunState>(
tableKeys.activeDispatches(tableId)
)
const totalStamped = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
if (totalStamped > 0) {
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
const nextByRow = { ...base.runningByRowId }
for (const [rid, n] of Object.entries(stampedByRow)) {
nextByRow[rid] = (nextByRow[rid] ?? 0) + n
}
return {
...base,
runningCellCount: base.runningCellCount + totalStamped,
runningByRowId: nextByRow,
}
})
}
return { snapshots, runStateSnapshot, didBumpRunState: totalStamped > 0 }
const bumped = bumpRunState(queryClient, tableId, stampedByRow)
return { snapshots, runStateSnapshot: bumped?.snapshot, didBumpRunState: bumped !== null }
},
onError: (_err, _variables, context) => {
if (context?.snapshots) restoreCachedWorkflowCells(queryClient, context.snapshots)
Expand Down
Loading