Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import { knowledgeKeys } from '@/hooks/queries/kb/knowledge'
import { logKeys } from '@/hooks/queries/logs'
import { mothershipChatKeys } from '@/hooks/queries/mothership-chats'
import { scheduleKeys } from '@/hooks/queries/schedules'
import { tableKeys } from '@/hooks/queries/tables'
import { folderKeys } from '@/hooks/queries/utils/folder-keys'
import { invalidateWorkflowLists } from '@/hooks/queries/utils/invalidate-workflow-lists'
import { tableKeys } from '@/hooks/queries/utils/table-keys'
import { workspaceFileFolderKeys } from '@/hooks/queries/workspace-file-folders'
import { workspaceFilesKeys } from '@/hooks/queries/workspace-files'

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/app/workspace/[workspaceId]/lib/prefetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import { prefetchHomeLists } from '@/app/workspace/[workspaceId]/home/prefetch'
import { prefetchKnowledgeBases } from '@/app/workspace/[workspaceId]/knowledge/prefetch'
import { prefetchTables } from '@/app/workspace/[workspaceId]/tables/prefetch'
import { knowledgeKeys } from '@/hooks/queries/kb/knowledge'
import { tableKeys } from '@/hooks/queries/tables'
import { folderKeys } from '@/hooks/queries/utils/folder-keys'
import { tableKeys } from '@/hooks/queries/utils/table-keys'
import { workspaceFileFolderKeys } from '@/hooks/queries/workspace-file-folders'
import { workspaceFilesKeys } from '@/hooks/queries/workspace-files'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {
downloadExportResult,
snapshotAndMutateRows,
type TableRunState,
tableKeys,
} from '@/hooks/queries/tables'
import { tableKeys } from '@/hooks/queries/utils/table-keys'

const logger = createLogger('useTableEventStream')

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/app/workspace/[workspaceId]/tables/prefetch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { QueryClient } from '@tanstack/react-query'
import type { TableDefinition } from '@/lib/table'
import { prefetchInternalJson } from '@/app/workspace/[workspaceId]/lib/prefetch-internal-fetch'
import { tableKeys } from '@/hooks/queries/tables'
import { tableKeys } from '@/hooks/queries/utils/table-keys'

/**
* Prefetches the workspace's tables list under the same query key the client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ export function Chat() {
}
}

let finalError: string | null = null
try {
await readSSEEvents<{ event?: string; data?: ExecutionResult; chunk?: string }>(reader, {
onParseError: (_data, e) => {
Expand All @@ -571,12 +572,7 @@ export function Chat() {

if (event === 'final' && eventData) {
if ('success' in eventData && !eventData.success) {
const errorMessage = eventData.error || 'Workflow execution failed'
flushChunks()
appendMessageContent(
responseMessageId,
`${accumulatedContent ? '\n\n' : ''}Error: ${errorMessage}`
)
finalError = eventData.error || 'Workflow execution failed'
}
return true
}
Expand All @@ -589,6 +585,12 @@ export function Chat() {
},
})
flushChunks()
if (finalError) {
appendMessageContent(
responseMessageId,
`${accumulatedContent ? '\n\n' : ''}Error: ${finalError}`
)
}
finalizeMessageStream(responseMessageId)
} catch (error) {
if ((error as Error)?.name !== 'AbortError') {
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/hooks/queries/tables.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ vi.mock('@/components/emcn', () => ({
}))

import {
tableKeys,
tableRowsInfiniteOptions,
tableRowsParamsKey,
useDeleteColumn,
useRestoreTable,
useUpdateColumn,
} from '@/hooks/queries/tables'
import { tableKeys } from '@/hooks/queries/utils/table-keys'

const TABLE_ID = 'tbl-1'
const WORKSPACE_ID = 'ws-1'
Expand Down
26 changes: 1 addition & 25 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,34 +96,10 @@ import {
optimisticallyScheduleNewlyEligibleGroups,
} from '@/lib/table/deps'
import { runUploadStrategy } from '@/lib/uploads/client/direct-upload'
import { type TableQueryScope, tableKeys } from '@/hooks/queries/utils/table-keys'

const logger = createLogger('TableQueries')

type TableQueryScope = 'active' | 'archived' | 'all'

export const tableKeys = {
all: ['tables'] as const,
lists: () => [...tableKeys.all, 'list'] as const,
list: (workspaceId?: string, scope: TableQueryScope = 'active') =>
[...tableKeys.lists(), workspaceId ?? '', scope] as const,
details: () => [...tableKeys.all, 'detail'] as const,
detail: (tableId: string) => [...tableKeys.details(), tableId] as const,
exportJobs: (workspaceId?: string) =>
[...tableKeys.all, 'export-jobs', workspaceId ?? ''] as const,
rowsRoot: (tableId: string) => [...tableKeys.detail(tableId), 'rows'] as const,
infiniteRows: (tableId: string, paramsKey: string) =>
[...tableKeys.rowsRoot(tableId), 'infinite', paramsKey] as const,
rowWrites: (tableId: string) => [...tableKeys.rowsRoot(tableId), 'write'] as const,
find: (tableId: string, paramsKey: string) =>
[...tableKeys.rowsRoot(tableId), 'find', paramsKey] as const,
activeDispatches: (tableId: string) =>
[...tableKeys.detail(tableId), 'active-dispatches'] as const,
enrichmentDetails: (tableId: string) =>
[...tableKeys.detail(tableId), 'enrichment-detail'] as const,
enrichmentDetail: (tableId: string, rowId: string, groupId: string) =>
[...tableKeys.enrichmentDetails(tableId), rowId, groupId] as const,
}

type TableRowsParams = Omit<TableRowsQueryInput, 'filter' | 'sort'> &
TableIdParamsInput & {
filter?: Filter | null
Expand Down
34 changes: 34 additions & 0 deletions apps/sim/hooks/queries/utils/table-keys.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* React Query key factory for user-defined tables.
*
* Lives in this standalone (non-`'use client'`) module — like
* {@link file://./folder-keys.ts} — so it can be imported from server
* components (e.g. the tables page prefetch) without pulling in the
* `'use client'` `@/hooks/queries/tables` module, whose exports would
* otherwise resolve to client-reference stubs on the server.
*/

export type TableQueryScope = 'active' | 'archived' | 'all'

export const tableKeys = {
all: ['tables'] as const,
lists: () => [...tableKeys.all, 'list'] as const,
list: (workspaceId?: string, scope: TableQueryScope = 'active') =>
[...tableKeys.lists(), workspaceId ?? '', scope] as const,
details: () => [...tableKeys.all, 'detail'] as const,
detail: (tableId: string) => [...tableKeys.details(), tableId] as const,
exportJobs: (workspaceId?: string) =>
[...tableKeys.all, 'export-jobs', workspaceId ?? ''] as const,
rowsRoot: (tableId: string) => [...tableKeys.detail(tableId), 'rows'] as const,
infiniteRows: (tableId: string, paramsKey: string) =>
[...tableKeys.rowsRoot(tableId), 'infinite', paramsKey] as const,
rowWrites: (tableId: string) => [...tableKeys.rowsRoot(tableId), 'write'] as const,
find: (tableId: string, paramsKey: string) =>
[...tableKeys.rowsRoot(tableId), 'find', paramsKey] as const,
activeDispatches: (tableId: string) =>
[...tableKeys.detail(tableId), 'active-dispatches'] as const,
enrichmentDetails: (tableId: string) =>
[...tableKeys.detail(tableId), 'enrichment-detail'] as const,
enrichmentDetail: (tableId: string, rowId: string, groupId: string) =>
[...tableKeys.enrichmentDetails(tableId), rowId, groupId] as const,
}
26 changes: 26 additions & 0 deletions apps/sim/lib/core/utils/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,32 @@ describe('readSSEEvents', () => {
expect(events).toEqual([{ msg: 'hello' }])
})

it('emits a final data: line that has no trailing newline (stream tail)', async () => {
const stream = streamFromStringChunks(['data: {"n":1}\n', 'data: {"n":2}'])
const events: number[] = []
await readSSEEvents<{ n: number }>(stream, {
onEvent: (e) => {
events.push(e.n)
},
})
expect(events).toEqual([1, 2])
})

it('flushes a multi-byte character in the final unterminated line', async () => {
const encoder = new TextEncoder()
const euro = encoder.encode('€')
const chunk1 = new Uint8Array([...encoder.encode('data: {"s":"'), euro[0], euro[1]])
const chunk2 = new Uint8Array([euro[2], ...encoder.encode('"}')])
const stream = createStreamFromChunks([chunk1, chunk2])
const events: Array<{ s: string }> = []
await readSSEEvents<{ s: string }>(stream, {
onEvent: (e) => {
events.push(e)
},
})
expect(events).toEqual([{ s: '€' }])
})

it('skips the [DONE] sentinel', async () => {
const stream = streamFromStringChunks(['data: {"n":1}\n\n', 'data: [DONE]\n\n'])
const events: number[] = []
Expand Down
7 changes: 4 additions & 3 deletions apps/sim/lib/core/utils/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ export async function readSSELines(source: SSESource, options: ReadSSELinesOptio
if (signal?.aborted) break

const { done, value } = await reader.read()
if (done) break

buffer += decoder.decode(value, { stream: true })
buffer += done ? decoder.decode() : decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
buffer = done ? '' : (lines.pop() ?? '')

for (const rawLine of lines) {
if (signal?.aborted) return
Expand All @@ -156,6 +155,8 @@ export async function readSSELines(source: SSESource, options: ReadSSELinesOptio

if ((await onData(data)) === true) return
}

if (done) break
}
} finally {
if (ownsLock) reader.releaseLock()
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/triggers/table/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { requestJson } from '@/lib/api/client/request'
import { listTablesContract } from '@/lib/api/contracts/tables'
import type { TableDefinition } from '@/lib/table'
import { getQueryClient } from '@/app/_shell/providers/get-query-client'
import { tableKeys } from '@/hooks/queries/tables'
import { tableKeys } from '@/hooks/queries/utils/table-keys'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import type { TriggerConfig } from '@/triggers/types'
Expand Down
Loading