diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry/resource-registry.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry/resource-registry.tsx index 460950dfa18..7c3f6a4db5e 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry/resource-registry.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry/resource-registry.tsx @@ -25,9 +25,9 @@ import { knowledgeKeys } from '@/hooks/queries/kb/knowledge' import { logKeys } from '@/hooks/queries/logs' import { mothershipChatKeys } from '@/hooks/queries/mothership-chats' import { scheduleKeys } from '@/hooks/queries/schedules' -import { tableKeys } from '@/hooks/queries/tables' import { folderKeys } from '@/hooks/queries/utils/folder-keys' import { invalidateWorkflowLists } from '@/hooks/queries/utils/invalidate-workflow-lists' +import { tableKeys } from '@/hooks/queries/utils/table-keys' import { workspaceFileFolderKeys } from '@/hooks/queries/workspace-file-folders' import { workspaceFilesKeys } from '@/hooks/queries/workspace-files' diff --git a/apps/sim/app/workspace/[workspaceId]/lib/prefetch.test.ts b/apps/sim/app/workspace/[workspaceId]/lib/prefetch.test.ts index d031c0648ef..2f8375c836f 100644 --- a/apps/sim/app/workspace/[workspaceId]/lib/prefetch.test.ts +++ b/apps/sim/app/workspace/[workspaceId]/lib/prefetch.test.ts @@ -21,8 +21,8 @@ import { prefetchHomeLists } from '@/app/workspace/[workspaceId]/home/prefetch' import { prefetchKnowledgeBases } from '@/app/workspace/[workspaceId]/knowledge/prefetch' import { prefetchTables } from '@/app/workspace/[workspaceId]/tables/prefetch' import { knowledgeKeys } from '@/hooks/queries/kb/knowledge' -import { tableKeys } from '@/hooks/queries/tables' import { folderKeys } from '@/hooks/queries/utils/folder-keys' +import { tableKeys } from '@/hooks/queries/utils/table-keys' import { workspaceFileFolderKeys } from '@/hooks/queries/workspace-file-folders' import { workspaceFilesKeys } from '@/hooks/queries/workspace-files' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index 34789dff546..6a0efe9695d 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -13,8 +13,8 @@ import { downloadExportResult, snapshotAndMutateRows, type TableRunState, - tableKeys, } from '@/hooks/queries/tables' +import { tableKeys } from '@/hooks/queries/utils/table-keys' const logger = createLogger('useTableEventStream') diff --git a/apps/sim/app/workspace/[workspaceId]/tables/prefetch.ts b/apps/sim/app/workspace/[workspaceId]/tables/prefetch.ts index 60d6a79a735..8d41a1d6680 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/prefetch.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/prefetch.ts @@ -1,7 +1,7 @@ import type { QueryClient } from '@tanstack/react-query' import type { TableDefinition } from '@/lib/table' import { prefetchInternalJson } from '@/app/workspace/[workspaceId]/lib/prefetch-internal-fetch' -import { tableKeys } from '@/hooks/queries/tables' +import { tableKeys } from '@/hooks/queries/utils/table-keys' /** * Prefetches the workspace's tables list under the same query key the client diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx index d4caa19424f..fe3892ec05c 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx @@ -561,6 +561,7 @@ export function Chat() { } } + let finalError: string | null = null try { await readSSEEvents<{ event?: string; data?: ExecutionResult; chunk?: string }>(reader, { onParseError: (_data, e) => { @@ -571,12 +572,7 @@ export function Chat() { if (event === 'final' && eventData) { if ('success' in eventData && !eventData.success) { - const errorMessage = eventData.error || 'Workflow execution failed' - flushChunks() - appendMessageContent( - responseMessageId, - `${accumulatedContent ? '\n\n' : ''}Error: ${errorMessage}` - ) + finalError = eventData.error || 'Workflow execution failed' } return true } @@ -589,6 +585,12 @@ export function Chat() { }, }) flushChunks() + if (finalError) { + appendMessageContent( + responseMessageId, + `${accumulatedContent ? '\n\n' : ''}Error: ${finalError}` + ) + } finalizeMessageStream(responseMessageId) } catch (error) { if ((error as Error)?.name !== 'AbortError') { diff --git a/apps/sim/hooks/queries/tables.test.ts b/apps/sim/hooks/queries/tables.test.ts index da4c8c1cc04..d8245e50c31 100644 --- a/apps/sim/hooks/queries/tables.test.ts +++ b/apps/sim/hooks/queries/tables.test.ts @@ -83,13 +83,13 @@ vi.mock('@/components/emcn', () => ({ })) import { - tableKeys, tableRowsInfiniteOptions, tableRowsParamsKey, useDeleteColumn, useRestoreTable, useUpdateColumn, } from '@/hooks/queries/tables' +import { tableKeys } from '@/hooks/queries/utils/table-keys' const TABLE_ID = 'tbl-1' const WORKSPACE_ID = 'ws-1' diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index df49de32b17..cbf69d8c57a 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -96,34 +96,10 @@ import { optimisticallyScheduleNewlyEligibleGroups, } from '@/lib/table/deps' import { runUploadStrategy } from '@/lib/uploads/client/direct-upload' +import { type TableQueryScope, tableKeys } from '@/hooks/queries/utils/table-keys' const logger = createLogger('TableQueries') -type TableQueryScope = 'active' | 'archived' | 'all' - -export const tableKeys = { - all: ['tables'] as const, - lists: () => [...tableKeys.all, 'list'] as const, - list: (workspaceId?: string, scope: TableQueryScope = 'active') => - [...tableKeys.lists(), workspaceId ?? '', scope] as const, - details: () => [...tableKeys.all, 'detail'] as const, - detail: (tableId: string) => [...tableKeys.details(), tableId] as const, - exportJobs: (workspaceId?: string) => - [...tableKeys.all, 'export-jobs', workspaceId ?? ''] as const, - rowsRoot: (tableId: string) => [...tableKeys.detail(tableId), 'rows'] as const, - infiniteRows: (tableId: string, paramsKey: string) => - [...tableKeys.rowsRoot(tableId), 'infinite', paramsKey] as const, - rowWrites: (tableId: string) => [...tableKeys.rowsRoot(tableId), 'write'] as const, - find: (tableId: string, paramsKey: string) => - [...tableKeys.rowsRoot(tableId), 'find', paramsKey] as const, - activeDispatches: (tableId: string) => - [...tableKeys.detail(tableId), 'active-dispatches'] as const, - enrichmentDetails: (tableId: string) => - [...tableKeys.detail(tableId), 'enrichment-detail'] as const, - enrichmentDetail: (tableId: string, rowId: string, groupId: string) => - [...tableKeys.enrichmentDetails(tableId), rowId, groupId] as const, -} - type TableRowsParams = Omit & TableIdParamsInput & { filter?: Filter | null diff --git a/apps/sim/hooks/queries/utils/table-keys.ts b/apps/sim/hooks/queries/utils/table-keys.ts new file mode 100644 index 00000000000..cf27bddd013 --- /dev/null +++ b/apps/sim/hooks/queries/utils/table-keys.ts @@ -0,0 +1,34 @@ +/** + * React Query key factory for user-defined tables. + * + * Lives in this standalone (non-`'use client'`) module — like + * {@link file://./folder-keys.ts} — so it can be imported from server + * components (e.g. the tables page prefetch) without pulling in the + * `'use client'` `@/hooks/queries/tables` module, whose exports would + * otherwise resolve to client-reference stubs on the server. + */ + +export type TableQueryScope = 'active' | 'archived' | 'all' + +export const tableKeys = { + all: ['tables'] as const, + lists: () => [...tableKeys.all, 'list'] as const, + list: (workspaceId?: string, scope: TableQueryScope = 'active') => + [...tableKeys.lists(), workspaceId ?? '', scope] as const, + details: () => [...tableKeys.all, 'detail'] as const, + detail: (tableId: string) => [...tableKeys.details(), tableId] as const, + exportJobs: (workspaceId?: string) => + [...tableKeys.all, 'export-jobs', workspaceId ?? ''] as const, + rowsRoot: (tableId: string) => [...tableKeys.detail(tableId), 'rows'] as const, + infiniteRows: (tableId: string, paramsKey: string) => + [...tableKeys.rowsRoot(tableId), 'infinite', paramsKey] as const, + rowWrites: (tableId: string) => [...tableKeys.rowsRoot(tableId), 'write'] as const, + find: (tableId: string, paramsKey: string) => + [...tableKeys.rowsRoot(tableId), 'find', paramsKey] as const, + activeDispatches: (tableId: string) => + [...tableKeys.detail(tableId), 'active-dispatches'] as const, + enrichmentDetails: (tableId: string) => + [...tableKeys.detail(tableId), 'enrichment-detail'] as const, + enrichmentDetail: (tableId: string, rowId: string, groupId: string) => + [...tableKeys.enrichmentDetails(tableId), rowId, groupId] as const, +} diff --git a/apps/sim/lib/core/utils/sse.test.ts b/apps/sim/lib/core/utils/sse.test.ts index 579e23fd607..7ddf99cbd3b 100644 --- a/apps/sim/lib/core/utils/sse.test.ts +++ b/apps/sim/lib/core/utils/sse.test.ts @@ -361,6 +361,32 @@ describe('readSSEEvents', () => { expect(events).toEqual([{ msg: 'hello' }]) }) + it('emits a final data: line that has no trailing newline (stream tail)', async () => { + const stream = streamFromStringChunks(['data: {"n":1}\n', 'data: {"n":2}']) + const events: number[] = [] + await readSSEEvents<{ n: number }>(stream, { + onEvent: (e) => { + events.push(e.n) + }, + }) + expect(events).toEqual([1, 2]) + }) + + it('flushes a multi-byte character in the final unterminated line', async () => { + const encoder = new TextEncoder() + const euro = encoder.encode('€') + const chunk1 = new Uint8Array([...encoder.encode('data: {"s":"'), euro[0], euro[1]]) + const chunk2 = new Uint8Array([euro[2], ...encoder.encode('"}')]) + const stream = createStreamFromChunks([chunk1, chunk2]) + const events: Array<{ s: string }> = [] + await readSSEEvents<{ s: string }>(stream, { + onEvent: (e) => { + events.push(e) + }, + }) + expect(events).toEqual([{ s: '€' }]) + }) + it('skips the [DONE] sentinel', async () => { const stream = streamFromStringChunks(['data: {"n":1}\n\n', 'data: [DONE]\n\n']) const events: number[] = [] diff --git a/apps/sim/lib/core/utils/sse.ts b/apps/sim/lib/core/utils/sse.ts index 50c758f0013..2651147136e 100644 --- a/apps/sim/lib/core/utils/sse.ts +++ b/apps/sim/lib/core/utils/sse.ts @@ -138,11 +138,10 @@ export async function readSSELines(source: SSESource, options: ReadSSELinesOptio if (signal?.aborted) break const { done, value } = await reader.read() - if (done) break - buffer += decoder.decode(value, { stream: true }) + buffer += done ? decoder.decode() : decoder.decode(value, { stream: true }) const lines = buffer.split('\n') - buffer = lines.pop() ?? '' + buffer = done ? '' : (lines.pop() ?? '') for (const rawLine of lines) { if (signal?.aborted) return @@ -156,6 +155,8 @@ export async function readSSELines(source: SSESource, options: ReadSSELinesOptio if ((await onData(data)) === true) return } + + if (done) break } } finally { if (ownsLock) reader.releaseLock() diff --git a/apps/sim/triggers/table/poller.ts b/apps/sim/triggers/table/poller.ts index 6fe6ad17f81..e922c0502ec 100644 --- a/apps/sim/triggers/table/poller.ts +++ b/apps/sim/triggers/table/poller.ts @@ -3,7 +3,7 @@ import { requestJson } from '@/lib/api/client/request' import { listTablesContract } from '@/lib/api/contracts/tables' import type { TableDefinition } from '@/lib/table' import { getQueryClient } from '@/app/_shell/providers/get-query-client' -import { tableKeys } from '@/hooks/queries/tables' +import { tableKeys } from '@/hooks/queries/utils/table-keys' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' import type { TriggerConfig } from '@/triggers/types'