From 8f4aa4daa4aa0e8dc77592b6701984a9a01e3d19 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 11 Jun 2026 16:02:54 -0700 Subject: [PATCH 1/3] improvement(sockets): make offline mode recoverable and stop transient races tripping it --- apps/realtime/src/handlers/subblocks.ts | 6 +- apps/realtime/src/handlers/variables.ts | 6 +- apps/realtime/src/middleware/permissions.ts | 9 +- .../workspace-permissions-provider.tsx | 37 +++++- .../workspace/providers/socket-provider.tsx | 64 ++++++---- apps/sim/hooks/use-collaborative-workflow.ts | 3 + apps/sim/stores/operation-queue/store.test.ts | 118 ++++++++++++++++++ apps/sim/stores/operation-queue/store.ts | 109 +++++++++++++--- apps/sim/stores/workflows/registry/store.ts | 4 + 9 files changed, 302 insertions(+), 54 deletions(-) diff --git a/apps/realtime/src/handlers/subblocks.ts b/apps/realtime/src/handlers/subblocks.ts index a54e3eb99e2..1ee35d3e722 100644 --- a/apps/realtime/src/handlers/subblocks.ts +++ b/apps/realtime/src/handlers/subblocks.ts @@ -130,7 +130,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: socket.emit('operation-failed', { operationId, error: 'User session not found', - retryable: false, + retryable: true, }) } return @@ -250,7 +250,7 @@ async function flushSubblockUpdate( io.to(socketId).emit('operation-failed', { operationId: opId, error: 'Workflow not found', - retryable: false, + retryable: true, }) }) return @@ -352,7 +352,7 @@ async function flushSubblockUpdate( io.to(socketId).emit('operation-failed', { operationId: opId, error: 'Block no longer exists', - retryable: false, + retryable: true, }) }) } diff --git a/apps/realtime/src/handlers/variables.ts b/apps/realtime/src/handlers/variables.ts index b7c7a529c80..41aeed3f83f 100644 --- a/apps/realtime/src/handlers/variables.ts +++ b/apps/realtime/src/handlers/variables.ts @@ -118,7 +118,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: socket.emit('operation-failed', { operationId, error: 'User session not found', - retryable: false, + retryable: true, }) } return @@ -236,7 +236,7 @@ async function flushVariableUpdate( io.to(socketId).emit('operation-failed', { operationId: opId, error: 'Workflow not found', - retryable: false, + retryable: true, }) }) return @@ -318,7 +318,7 @@ async function flushVariableUpdate( io.to(socketId).emit('operation-failed', { operationId: opId, error: 'Variable no longer exists', - retryable: false, + retryable: true, }) }) } diff --git a/apps/realtime/src/middleware/permissions.ts b/apps/realtime/src/middleware/permissions.ts index 661f4d52d44..26a4d8c8542 100644 --- a/apps/realtime/src/middleware/permissions.ts +++ b/apps/realtime/src/middleware/permissions.ts @@ -83,6 +83,13 @@ export function checkRolePermission( return { allowed: true } } +/** + * Verifies a user's access to a workflow via workspace permissions. + * + * Returns `hasAccess: false` only for genuine denials (workflow missing/archived + * or no workspace permission). Transient failures (DB errors) are rethrown so the + * caller can report them as retryable instead of a permanent access denial. + */ export async function verifyWorkflowAccess( userId: string, workflowId: string @@ -129,6 +136,6 @@ export async function verifyWorkflowAccess( `Error verifying workflow access for user ${userId}, workflow ${workflowId}:`, error ) - return { hasAccess: false } + throw error } } diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index 5423cae324a..8f3c98277ee 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -63,6 +63,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP const { isReconnecting, isRetryingWorkflowJoin } = useSocket() const realtimeStatusNotificationIdRef = useRef(null) const realtimeStatusNotificationMessageRef = useRef(null) + const offlineNotificationIdRef = useRef(null) const isOfflineMode = hasOperationError const realtimeStatusMessage = isReconnecting @@ -81,6 +82,13 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP realtimeStatusNotificationMessageRef.current = null }, []) + const clearOfflineNotification = useCallback(() => { + if (offlineNotificationIdRef.current) { + toast.dismiss(offlineNotificationIdRef.current) + offlineNotificationIdRef.current = null + } + }, []) + useEffect(() => { if (isOfflineMode || !realtimeStatusMessage) { clearRealtimeStatusNotification() @@ -103,8 +111,11 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP }, [clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage]) useEffect(() => { - return clearRealtimeStatusNotification - }, [clearRealtimeStatusNotification]) + return () => { + clearRealtimeStatusNotification() + clearOfflineNotification() + } + }, [clearRealtimeStatusNotification, clearOfflineNotification]) useRegisterGlobalCommands(() => createCommands([ @@ -121,14 +132,25 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP ) useEffect(() => { - if (!isOfflineMode || hasShownOfflineNotification) { + if (!isOfflineMode) { + // Offline mode can recover (successful room rejoin or workspace switch); + // dismiss the persistent toast and re-arm the notification for any future + // offline transition. + clearOfflineNotification() + if (hasShownOfflineNotification) { + setHasShownOfflineNotification(false) + } + return + } + + if (hasShownOfflineNotification) { return } clearRealtimeStatusNotification() try { - toast.error('Connection unavailable', { + offlineNotificationIdRef.current = toast.error('Connection unavailable', { duration: 0, persistAcrossRoutes: true, action: { label: 'Refresh', onClick: () => window.location.reload() }, @@ -137,7 +159,12 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP } catch (error) { logger.error('Failed to add offline notification', { error }) } - }, [clearRealtimeStatusNotification, hasShownOfflineNotification, isOfflineMode]) + }, [ + clearOfflineNotification, + clearRealtimeStatusNotification, + hasShownOfflineNotification, + isOfflineMode, + ]) const { data: workspacePermissions, diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index af9ee9befb8..8985dfe68ae 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -75,27 +75,33 @@ interface SocketContextType { joinWorkflow: (workflowId: string) => void leaveWorkflow: () => void retryConnection: () => void + /** + * Emit functions return whether the payload was actually sent over the socket. + * `false` means the emit was skipped because the target room is not currently + * joined/visible; the operation queue keeps such operations pending instead of + * waiting on a confirmation that will never arrive. + */ emitWorkflowOperation: ( workflowId: string, operation: string, target: string, payload: any, operationId?: string - ) => void + ) => boolean emitSubblockUpdate: ( blockId: string, subblockId: string, value: any, operationId: string | undefined, workflowId: string - ) => void + ) => boolean emitVariableUpdate: ( variableId: string, field: string, value: any, operationId: string | undefined, workflowId: string - ) => void + ) => boolean emitCursorUpdate: (cursor: { x: number; y: number } | null) => void emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void @@ -126,9 +132,9 @@ const SocketContext = createContext({ joinWorkflow: () => {}, leaveWorkflow: () => {}, retryConnection: () => {}, - emitWorkflowOperation: () => {}, - emitSubblockUpdate: () => {}, - emitVariableUpdate: () => {}, + emitWorkflowOperation: () => false, + emitSubblockUpdate: () => false, + emitVariableUpdate: () => false, emitCursorUpdate: () => {}, emitSelectionUpdate: () => {}, onWorkflowOperation: () => {}, @@ -519,6 +525,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setIsRetryingWorkflowJoin(false) setVisibleWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) + // A successful join is always followed by a workflow-state push that + // rehydrates local stores from server truth, so a previously tripped + // offline mode is safe to clear here. + useOperationQueueStore.getState().clearError() logger.info(`Successfully joined workflow room: ${workflowId}`, { presenceCount: presenceUsers?.length || 0, }) @@ -847,7 +857,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) { }, [authFailed]) const emitWorkflowOperation = useCallback( - (workflowId: string, operation: string, target: string, payload: any, operationId?: string) => { + ( + workflowId: string, + operation: string, + target: string, + payload: any, + operationId?: string + ): boolean => { if ( !socket || !currentWorkflowId || @@ -861,7 +877,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { operation, target, }) - return + return false } const isPositionUpdate = operation === 'update-position' && target === 'block' @@ -885,7 +901,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { clearTimeout(timeoutId) positionUpdateTimeouts.current.delete(blockId) } - return + return true } pendingPositionUpdates.current.set(blockId, { @@ -909,16 +925,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) { positionUpdateTimeouts.current.set(blockId, timeoutId) } - } else { - socket.emit('workflow-operation', { - workflowId, - operation, - target, - payload, - timestamp: Date.now(), - operationId, - }) + return true } + + socket.emit('workflow-operation', { + workflowId, + operation, + target, + payload, + timestamp: Date.now(), + operationId, + }) + return true }, [socket, currentWorkflowId, isWorkflowVisible] ) @@ -930,7 +948,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { value: any, operationId: string | undefined, workflowId: string - ) => { + ): boolean => { if ( !socket || workflowId !== currentWorkflowIdRef.current || @@ -949,7 +967,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { reason, currentWorkflowId: currentWorkflowIdRef.current, }) - return + return false } socket.emit('subblock-update', { workflowId, @@ -959,6 +977,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { timestamp: Date.now(), operationId, }) + return true }, [socket] ) @@ -970,7 +989,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { value: any, operationId: string | undefined, workflowId: string - ) => { + ): boolean => { if ( !socket || workflowId !== currentWorkflowIdRef.current || @@ -989,7 +1008,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { reason, currentWorkflowId: currentWorkflowIdRef.current, }) - return + return false } socket.emit('variable-update', { workflowId, @@ -999,6 +1018,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { timestamp: Date.now(), operationId, }) + return true }, [socket] ) diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 5825cb38a66..ddee1b798b1 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -378,6 +378,7 @@ export function useCollaborativeWorkflow() { } break case VARIABLE_OPERATIONS.REMOVE: + useOperationQueueStore.getState().cancelOperationsForVariable(payload.variableId) useVariablesStore.getState().deleteVariable(payload.variableId) break } @@ -443,6 +444,8 @@ export function useCollaborativeWorkflow() { }) if (ids && ids.length > 0) { + const operationQueue = useOperationQueueStore.getState() + ids.forEach((id: string) => operationQueue.cancelOperationsForBlock(id)) useWorkflowStore.getState().batchRemoveBlocks(ids) } diff --git a/apps/sim/stores/operation-queue/store.test.ts b/apps/sim/stores/operation-queue/store.test.ts index b18439f60cc..2380ed71036 100644 --- a/apps/sim/stores/operation-queue/store.test.ts +++ b/apps/sim/stores/operation-queue/store.test.ts @@ -74,6 +74,124 @@ describe('operation queue room gating', () => { useOperationQueueStore.getState().confirmOperation('op-1') }) + it('reverts the operation to pending without retrying when the emit is skipped', () => { + const skippingEmit = vi.fn(() => false) + registerEmitFunctions(skippingEmit, vi.fn(), vi.fn(), 'workflow-a') + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + expect(skippingEmit).toHaveBeenCalledTimes(1) + + const state = useOperationQueueStore.getState() + expect(state.isProcessing).toBe(false) + expect(state.hasOperationError).toBe(false) + expect(state.operations).toEqual([ + expect.objectContaining({ id: 'op-1', status: 'pending', retryCount: 0 }), + ]) + }) + + it('emits a previously skipped operation once the room becomes joinable', () => { + const skippingEmit = vi.fn(() => false) + registerEmitFunctions(skippingEmit, vi.fn(), vi.fn(), 'workflow-a') + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + expect(skippingEmit).toHaveBeenCalledTimes(1) + + const sendingEmit = vi.fn(() => true) + registerEmitFunctions(sendingEmit, vi.fn(), vi.fn(), 'workflow-a') + + expect(sendingEmit).toHaveBeenCalledWith( + 'workflow-a', + 'replace-state', + 'workflow', + { state: {} }, + 'op-1' + ) + expect(useOperationQueueStore.getState().operations).toEqual([ + expect.objectContaining({ id: 'op-1', status: 'processing' }), + ]) + + useOperationQueueStore.getState().confirmOperation('op-1') + }) + + it('triggers offline mode for a non-retryable failure and recovers via clearError', () => { + registerEmitFunctions( + vi.fn(() => true), + vi.fn(), + vi.fn(), + 'workflow-a' + ) + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + useOperationQueueStore.getState().failOperation('op-1', false) + + expect(useOperationQueueStore.getState().hasOperationError).toBe(true) + expect(useOperationQueueStore.getState().operations).toEqual([]) + + useOperationQueueStore.getState().clearError() + + expect(useOperationQueueStore.getState().hasOperationError).toBe(false) + }) + + it('triggers offline mode once retries exhaust for retryable failures', () => { + registerEmitFunctions( + vi.fn(() => true), + vi.fn(), + vi.fn(), + 'workflow-a' + ) + + useOperationQueueStore.getState().addToQueue({ + id: 'op-1', + workflowId: 'workflow-a', + userId: 'user-1', + operation: { + operation: 'replace-state', + target: 'workflow', + payload: { state: {} }, + }, + }) + + useOperationQueueStore.getState().failOperation('op-1', true) + useOperationQueueStore.getState().failOperation('op-1', true) + useOperationQueueStore.getState().failOperation('op-1', true) + expect(useOperationQueueStore.getState().hasOperationError).toBe(false) + + useOperationQueueStore.getState().failOperation('op-1', true) + + expect(useOperationQueueStore.getState().hasOperationError).toBe(true) + expect(useOperationQueueStore.getState().operations).toEqual([]) + }) + it('reports pending operations per workflow', () => { useOperationQueueStore.getState().addToQueue({ id: 'op-1', diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index bf0d9533e85..d3c91c32a0a 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -12,6 +12,16 @@ function isBlockStillPresent(blockId: string | undefined): boolean { } } +function isVariableStillPresent(variableId: string | undefined): boolean { + if (!variableId) return true + try { + const { useVariablesStore } = require('@/stores/variables/store') + return Boolean(useVariablesStore.getState().variables[variableId]) + } catch { + return true + } +} + const logger = createLogger('OperationQueue') /** Timeout for subblock/variable operations before considering them failed */ @@ -31,6 +41,11 @@ const retryTimeouts = new Map() const operationTimeouts = new Map() const DEFAULT_WORKFLOW_DRAIN_TIMEOUT_MS = 20000 +/** + * Emit functions return whether the operation was actually sent over the socket. + * A `false` return means the emit was skipped (room not joined/visible) and the + * operation should stay pending instead of waiting on a confirmation timeout. + */ let emitWorkflowOperation: | (( workflowId: string, @@ -38,7 +53,7 @@ let emitWorkflowOperation: target: string, payload: any, operationId?: string - ) => void) + ) => boolean) | null = null let emitSubblockUpdate: | (( @@ -47,7 +62,7 @@ let emitSubblockUpdate: value: any, operationId: string | undefined, workflowId: string - ) => void) + ) => boolean) | null = null let emitVariableUpdate: | (( @@ -56,7 +71,7 @@ let emitVariableUpdate: value: any, operationId: string | undefined, workflowId: string - ) => void) + ) => boolean) | null = null export function registerEmitFunctions( @@ -66,21 +81,21 @@ export function registerEmitFunctions( target: string, payload: any, operationId?: string - ) => void, + ) => boolean, subblockEmit: ( blockId: string, subblockId: string, value: any, operationId: string | undefined, workflowId: string - ) => void, + ) => boolean, variableEmit: ( variableId: string, field: string, value: any, operationId: string | undefined, workflowId: string - ) => void, + ) => boolean, workflowId: string | null ) { emitWorkflowOperation = workflowEmit @@ -94,6 +109,44 @@ export function registerEmitFunctions( let currentRegisteredWorkflowId: string | null = null +/** + * Drops a failed operation whose target block or variable no longer exists + * locally (e.g. it was removed by a remote collaborator while the operation + * was in flight). Returns true when the operation was dropped, in which case + * offline mode must not be triggered. + */ +function dropOperationForMissingTarget(operation: QueuedOperation): boolean { + const { target, payload } = operation.operation + + const isVariableOperation = target === 'variable' + const targetId = isVariableOperation + ? payload?.variableId || payload?.id + : payload?.blockId || payload?.id + const targetStillPresent = isVariableOperation + ? isVariableStillPresent(targetId) + : isBlockStillPresent(targetId) + + if (!targetId || targetStillPresent) { + return false + } + + logger.debug( + isVariableOperation + ? 'Dropping failed operation for deleted variable' + : 'Dropping failed operation for deleted block', + { + operationId: operation.id, + targetId, + } + ) + useOperationQueueStore.setState((s) => ({ + operations: s.operations.filter((op) => op.id !== operation.id), + isProcessing: false, + })) + useOperationQueueStore.getState().processNextOperation() + return true +} + export const useOperationQueueStore = create((set, get) => ({ operations: [], workflowOperationVersions: {}, @@ -240,17 +293,7 @@ export const useOperationQueueStore = create((set, get) => } if (!retryable) { - const targetBlockId = operation.operation.payload?.blockId || operation.operation.payload?.id - if (targetBlockId && !isBlockStillPresent(targetBlockId)) { - logger.debug('Dropping failed operation for deleted block', { - operationId, - blockId: targetBlockId, - }) - set((s) => ({ - operations: s.operations.filter((op) => op.id !== operationId), - isProcessing: false, - })) - get().processNextOperation() + if (dropOperationForMissingTarget(operation)) { return } @@ -307,6 +350,10 @@ export const useOperationQueueStore = create((set, get) => retryTimeouts.set(operationId, timeout) } else { + if (dropOperationForMissingTarget(operation)) { + return + } + logger.error('Operation failed after max retries, triggering offline mode', { operationId, operation: operation.operation.operation, @@ -363,9 +410,10 @@ export const useOperationQueueStore = create((set, get) => }) const { operation: op, target, payload } = nextOperation.operation + let emitted = false if (op === 'subblock-update' && target === 'subblock') { if (emitSubblockUpdate) { - emitSubblockUpdate( + emitted = emitSubblockUpdate( payload.blockId, payload.subblockId, payload.value, @@ -375,7 +423,7 @@ export const useOperationQueueStore = create((set, get) => } } else if (op === 'variable-update' && target === 'variable') { if (emitVariableUpdate) { - emitVariableUpdate( + emitted = emitVariableUpdate( payload.variableId, payload.field, payload.value, @@ -385,10 +433,31 @@ export const useOperationQueueStore = create((set, get) => } } else { if (emitWorkflowOperation) { - emitWorkflowOperation(nextOperation.workflowId, op, target, payload, nextOperation.id) + emitted = emitWorkflowOperation( + nextOperation.workflowId, + op, + target, + payload, + nextOperation.id + ) } } + if (!emitted) { + logger.debug('Emit skipped for operation - leaving it pending until the room is joinable', { + operationId: nextOperation.id, + operation: nextOperation.operation.operation, + workflowId: nextOperation.workflowId, + }) + set((state) => ({ + operations: state.operations.map((o) => + o.id === nextOperation.id ? { ...o, status: 'pending' as const } : o + ), + isProcessing: false, + })) + return + } + const isSubblockOrVariable = (nextOperation.operation.operation === 'subblock-update' && nextOperation.operation.target === 'subblock') || diff --git a/apps/sim/stores/workflows/registry/store.ts b/apps/sim/stores/workflows/registry/store.ts index 9cf4519ff6f..c6928749977 100644 --- a/apps/sim/stores/workflows/registry/store.ts +++ b/apps/sim/stores/workflows/registry/store.ts @@ -9,6 +9,7 @@ import { getQueryClient } from '@/app/_shell/providers/get-query-client' import type { WorkflowDeploymentInfo } from '@/hooks/queries/deployments' import { deploymentKeys } from '@/hooks/queries/deployments' import { invalidateWorkflowLists } from '@/hooks/queries/utils/invalidate-workflow-lists' +import { useOperationQueueStore } from '@/stores/operation-queue/store' import { useVariablesStore } from '@/stores/variables/store' import type { Variable } from '@/stores/variables/types' import type { HydrationState, WorkflowRegistry } from '@/stores/workflows/registry/types' @@ -56,6 +57,9 @@ export const useWorkflowRegistry = create()( logger.info(`Switching to workspace: ${workspaceId}`) resetWorkflowStores() + // Workflow stores are fully reset and reloaded from the server in the new + // workspace, so a previously tripped offline mode must not carry over. + useOperationQueueStore.getState().clearError() void invalidateWorkflowLists(getQueryClient(), workspaceId) set({ From 191f5b8129d999419e7a824f19c30879a6560066 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 11 Jun 2026 16:12:13 -0700 Subject: [PATCH 2/3] data persistence issues should trigger offline mode and force refresh --- .../providers/workspace-permissions-provider.tsx | 1 + apps/sim/app/workspace/providers/socket-provider.tsx | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index 8f3c98277ee..d1d910d5898 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -151,6 +151,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP try { offlineNotificationIdRef.current = toast.error('Connection unavailable', { + description: 'Recent changes may not have been saved. Refresh to resync.', duration: 0, persistAcrossRoutes: true, action: { label: 'Refresh', onClick: () => window.location.reload() }, diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 8985dfe68ae..e77b54019ec 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -525,10 +525,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) { setIsRetryingWorkflowJoin(false) setVisibleWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) - // A successful join is always followed by a workflow-state push that - // rehydrates local stores from server truth, so a previously tripped - // offline mode is safe to clear here. - useOperationQueueStore.getState().clearError() logger.info(`Successfully joined workflow room: ${workflowId}`, { presenceCount: presenceUsers?.length || 0, }) From b5a57fddbeba58c4b20342db868f278b48925179 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 11 Jun 2026 16:40:37 -0700 Subject: [PATCH 3/3] code cleanup --- .../workspace-permissions-provider.tsx | 175 ++++++++---------- .../workspace/providers/socket-provider.tsx | 52 +++--- apps/sim/stores/operation-queue/store.ts | 84 +++------ apps/sim/stores/operation-queue/types.ts | 29 +++ 4 files changed, 159 insertions(+), 181 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx index d1d910d5898..05d250acbd0 100644 --- a/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx +++ b/apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx @@ -1,7 +1,7 @@ 'use client' import type React from 'react' -import { createContext, useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react' +import { createContext, useCallback, useContext, useEffect, useMemo, useRef } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' import { useParams } from 'next/navigation' @@ -19,6 +19,60 @@ import { useOperationQueueStore } from '@/stores/operation-queue/store' const logger = createLogger('WorkspacePermissionsProvider') +interface PersistentToastOptions { + description?: string + action?: { label: string; onClick: () => void } +} + +/** + * Shows a persistent error toast while `message` is non-null, replaces it when + * the message changes, and dismisses it when the message becomes null or the + * owning component unmounts. + */ +function usePersistentErrorToast(message: string | null, options?: PersistentToastOptions) { + const { toast } = useToast() + const toastIdRef = useRef(null) + const shownMessageRef = useRef(null) + const optionsRef = useRef(options) + optionsRef.current = options + + const dismiss = useCallback(() => { + if (!toastIdRef.current) { + return + } + + toast.dismiss(toastIdRef.current) + toastIdRef.current = null + shownMessageRef.current = null + }, []) + + useEffect(() => { + if (!message) { + dismiss() + return + } + + if (toastIdRef.current && shownMessageRef.current === message) { + return + } + + dismiss() + + try { + toastIdRef.current = toast.error(message, { + ...optionsRef.current, + duration: 0, + persistAcrossRoutes: true, + }) + shownMessageRef.current = message + } catch (error) { + logger.error('Failed to show persistent notification', { error, message }) + } + }, [dismiss, message]) + + useEffect(() => dismiss, [dismiss]) +} + interface WorkspacePermissionsContextType { workspacePermissions: WorkspacePermissions | null permissionsLoading: boolean @@ -55,67 +109,34 @@ interface WorkspacePermissionsProviderProps { export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsProviderProps) { const params = useParams() const workspaceId = params?.workspaceId as string + const urlWorkflowId = params?.workflowId as string | undefined const queryClient = useQueryClient() const { toast } = useToast() - const [hasShownOfflineNotification, setHasShownOfflineNotification] = useState(false) const hasOperationError = useOperationQueueStore((state) => state.hasOperationError) - const { isReconnecting, isRetryingWorkflowJoin } = useSocket() - const realtimeStatusNotificationIdRef = useRef(null) - const realtimeStatusNotificationMessageRef = useRef(null) - const offlineNotificationIdRef = useRef(null) + const { isReconnecting, isRetryingWorkflowJoin, blockedJoinWorkflowId } = useSocket() const isOfflineMode = hasOperationError - const realtimeStatusMessage = isReconnecting - ? 'Reconnecting...' - : isRetryingWorkflowJoin - ? 'Joining workflow...' - : null - - const clearRealtimeStatusNotification = useCallback(() => { - if (!realtimeStatusNotificationIdRef.current) { - return - } - - toast.dismiss(realtimeStatusNotificationIdRef.current) - realtimeStatusNotificationIdRef.current = null - realtimeStatusNotificationMessageRef.current = null - }, []) - - const clearOfflineNotification = useCallback(() => { - if (offlineNotificationIdRef.current) { - toast.dismiss(offlineNotificationIdRef.current) - offlineNotificationIdRef.current = null - } - }, []) - - useEffect(() => { - if (isOfflineMode || !realtimeStatusMessage) { - clearRealtimeStatusNotification() - return - } - - if ( - realtimeStatusNotificationIdRef.current && - realtimeStatusNotificationMessageRef.current === realtimeStatusMessage - ) { - return - } - - clearRealtimeStatusNotification() - - const id = toast.error(realtimeStatusMessage, { duration: 0, persistAcrossRoutes: true }) - - realtimeStatusNotificationIdRef.current = id - realtimeStatusNotificationMessageRef.current = realtimeStatusMessage - }, [clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage]) - - useEffect(() => { - return () => { - clearRealtimeStatusNotification() - clearOfflineNotification() - } - }, [clearRealtimeStatusNotification, clearOfflineNotification]) + const isJoinBlocked = Boolean(blockedJoinWorkflowId) && blockedJoinWorkflowId === urlWorkflowId + const realtimeStatusMessage = isOfflineMode + ? null + : isReconnecting + ? 'Reconnecting...' + : isRetryingWorkflowJoin + ? 'Joining workflow...' + : null + + usePersistentErrorToast(realtimeStatusMessage) + // Offline mode only recovers via workspace switch or refresh; the join block + // lifts when the user targets a different workflow or refreshes. + usePersistentErrorToast(isOfflineMode ? 'Connection unavailable' : null, { + description: 'Recent changes may not have been saved. Refresh to resync.', + action: { label: 'Refresh', onClick: () => window.location.reload() }, + }) + usePersistentErrorToast(isJoinBlocked ? 'Unable to connect to workflow' : null, { + description: 'Changes cannot be saved. Refresh to retry.', + action: { label: 'Refresh', onClick: () => window.location.reload() }, + }) useRegisterGlobalCommands(() => createCommands([ @@ -131,42 +152,6 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP ]) ) - useEffect(() => { - if (!isOfflineMode) { - // Offline mode can recover (successful room rejoin or workspace switch); - // dismiss the persistent toast and re-arm the notification for any future - // offline transition. - clearOfflineNotification() - if (hasShownOfflineNotification) { - setHasShownOfflineNotification(false) - } - return - } - - if (hasShownOfflineNotification) { - return - } - - clearRealtimeStatusNotification() - - try { - offlineNotificationIdRef.current = toast.error('Connection unavailable', { - description: 'Recent changes may not have been saved. Refresh to resync.', - duration: 0, - persistAcrossRoutes: true, - action: { label: 'Refresh', onClick: () => window.location.reload() }, - }) - setHasShownOfflineNotification(true) - } catch (error) { - logger.error('Failed to add offline notification', { error }) - } - }, [ - clearOfflineNotification, - clearRealtimeStatusNotification, - hasShownOfflineNotification, - isOfflineMode, - ]) - const { data: workspacePermissions, isLoading: permissionsLoading, @@ -195,13 +180,13 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP ) const userPermissions = useMemo((): WorkspaceUserPermissions & { isOfflineMode?: boolean } => { - if (isOfflineMode) { + if (isOfflineMode || isJoinBlocked) { return { ...baseUserPermissions, canEdit: false, canAdmin: false, canRead: baseUserPermissions.canRead, - isOfflineMode: true, + isOfflineMode, } } @@ -209,7 +194,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP ...baseUserPermissions, isOfflineMode: false, } - }, [baseUserPermissions, isOfflineMode]) + }, [baseUserPermissions, isOfflineMode, isJoinBlocked]) const contextValue = useMemo( () => ({ diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index e77b54019ec..47587fe4f0e 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -25,6 +25,11 @@ import { resolveSocketWorkflowTarget, } from '@/app/workspace/providers/socket-join-target' import { useOperationQueueStore } from '@/stores/operation-queue/store' +import type { + SubblockUpdateEmit, + VariableUpdateEmit, + WorkflowOperationEmit, +} from '@/stores/operation-queue/types' import { useWorkflowRegistry as useWorkflowRegistryStore } from '@/stores/workflows/registry/store' const logger = createLogger('SocketContext') @@ -69,39 +74,21 @@ interface SocketContextType { isReconnecting: boolean isRetryingWorkflowJoin: boolean authFailed: boolean + /** + * Workflow whose room join failed non-retryably (e.g. access denied). The room + * is blocked until the user targets a different workflow or refreshes; edits made + * while blocked would never persist, so consumers should surface this and block edits. + */ + blockedJoinWorkflowId: string | null currentWorkflowId: string | null currentSocketId: string | null presenceUsers: PresenceUser[] joinWorkflow: (workflowId: string) => void leaveWorkflow: () => void retryConnection: () => void - /** - * Emit functions return whether the payload was actually sent over the socket. - * `false` means the emit was skipped because the target room is not currently - * joined/visible; the operation queue keeps such operations pending instead of - * waiting on a confirmation that will never arrive. - */ - emitWorkflowOperation: ( - workflowId: string, - operation: string, - target: string, - payload: any, - operationId?: string - ) => boolean - emitSubblockUpdate: ( - blockId: string, - subblockId: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean - emitVariableUpdate: ( - variableId: string, - field: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean + emitWorkflowOperation: WorkflowOperationEmit + emitSubblockUpdate: SubblockUpdateEmit + emitVariableUpdate: VariableUpdateEmit emitCursorUpdate: (cursor: { x: number; y: number } | null) => void emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void @@ -126,6 +113,7 @@ const SocketContext = createContext({ isReconnecting: false, isRetryingWorkflowJoin: false, authFailed: false, + blockedJoinWorkflowId: null, currentWorkflowId: null, currentSocketId: null, presenceUsers: [], @@ -167,6 +155,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { const [currentSocketId, setCurrentSocketId] = useState(null) const [presenceUsers, setPresenceUsers] = useState([]) const [authFailed, setAuthFailed] = useState(false) + const [blockedJoinWorkflowId, setBlockedJoinWorkflowId] = useState(null) const [explicitWorkflowId, setExplicitWorkflowId] = useState(null) const initializedRef = useRef(false) const socketRef = useRef(null) @@ -523,6 +512,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) } else { setIsRetryingWorkflowJoin(false) + setBlockedJoinWorkflowId(null) setVisibleWorkflowId(workflowId) setPresenceUsers(presenceUsers || []) logger.info(`Successfully joined workflow room: ${workflowId}`, { @@ -553,6 +543,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { if (result.workflowId) { useOperationQueueStore.getState().cancelOperationsForWorkflow(result.workflowId) } + setBlockedJoinWorkflowId(result.workflowId ?? null) logger.error('Failed to join workflow:', { workflowId: result.workflowId, @@ -812,7 +803,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { return } - executeJoinCommands(joinControllerRef.current.requestWorkflow(getRequestedWorkflowId())) + const requestedWorkflowId = getRequestedWorkflowId() + + setBlockedJoinWorkflowId((prev) => (prev && prev !== requestedWorkflowId ? null : prev)) + executeJoinCommands(joinControllerRef.current.requestWorkflow(requestedWorkflowId)) }, [ explicitWorkflowId, getRequestedWorkflowId, @@ -1103,6 +1097,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { isReconnecting, isRetryingWorkflowJoin, authFailed, + blockedJoinWorkflowId, currentWorkflowId, currentSocketId, presenceUsers, @@ -1133,6 +1128,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { isReconnecting, isRetryingWorkflowJoin, authFailed, + blockedJoinWorkflowId, currentWorkflowId, currentSocketId, presenceUsers, diff --git a/apps/sim/stores/operation-queue/store.ts b/apps/sim/stores/operation-queue/store.ts index d3c91c32a0a..200abdf2b3f 100644 --- a/apps/sim/stores/operation-queue/store.ts +++ b/apps/sim/stores/operation-queue/store.ts @@ -1,6 +1,12 @@ import { createLogger } from '@sim/logger' import { create } from 'zustand' -import type { OperationQueueState, QueuedOperation } from './types' +import type { + OperationQueueState, + QueuedOperation, + SubblockUpdateEmit, + VariableUpdateEmit, + WorkflowOperationEmit, +} from './types' function isBlockStillPresent(blockId: string | undefined): boolean { if (!blockId) return true @@ -41,61 +47,14 @@ const retryTimeouts = new Map() const operationTimeouts = new Map() const DEFAULT_WORKFLOW_DRAIN_TIMEOUT_MS = 20000 -/** - * Emit functions return whether the operation was actually sent over the socket. - * A `false` return means the emit was skipped (room not joined/visible) and the - * operation should stay pending instead of waiting on a confirmation timeout. - */ -let emitWorkflowOperation: - | (( - workflowId: string, - operation: string, - target: string, - payload: any, - operationId?: string - ) => boolean) - | null = null -let emitSubblockUpdate: - | (( - blockId: string, - subblockId: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean) - | null = null -let emitVariableUpdate: - | (( - variableId: string, - field: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean) - | null = null +let emitWorkflowOperation: WorkflowOperationEmit | null = null +let emitSubblockUpdate: SubblockUpdateEmit | null = null +let emitVariableUpdate: VariableUpdateEmit | null = null export function registerEmitFunctions( - workflowEmit: ( - workflowId: string, - operation: string, - target: string, - payload: any, - operationId?: string - ) => boolean, - subblockEmit: ( - blockId: string, - subblockId: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean, - variableEmit: ( - variableId: string, - field: string, - value: any, - operationId: string | undefined, - workflowId: string - ) => boolean, + workflowEmit: WorkflowOperationEmit, + subblockEmit: SubblockUpdateEmit, + variableEmit: VariableUpdateEmit, workflowId: string | null ) { emitWorkflowOperation = workflowEmit @@ -109,16 +68,25 @@ export function registerEmitFunctions( let currentRegisteredWorkflowId: string | null = null +/** Targets whose payload id refers to a canvas block (subflow ids are loop/parallel blocks). */ +const BLOCK_SCOPED_TARGETS = ['block', 'subblock', 'subflow'] + /** - * Drops a failed operation whose target block or variable no longer exists - * locally (e.g. it was removed by a remote collaborator while the operation - * was in flight). Returns true when the operation was dropped, in which case - * offline mode must not be triggered. + * Drops a failed operation whose target entity no longer exists locally (e.g. it + * was removed by a remote collaborator while the operation was in flight), so a + * stale per-entity failure does not trip offline mode. Applies only to block-, + * subblock-, subflow-, and variable-scoped operations; structural operations + * (workflow/blocks/edges) have no single target entity and always fall through + * to offline mode. Returns true when the operation was dropped. */ function dropOperationForMissingTarget(operation: QueuedOperation): boolean { const { target, payload } = operation.operation const isVariableOperation = target === 'variable' + if (!isVariableOperation && !BLOCK_SCOPED_TARGETS.includes(target)) { + return false + } + const targetId = isVariableOperation ? payload?.variableId || payload?.id : payload?.blockId || payload?.id diff --git a/apps/sim/stores/operation-queue/types.ts b/apps/sim/stores/operation-queue/types.ts index e59731a82b5..fb56a2a694b 100644 --- a/apps/sim/stores/operation-queue/types.ts +++ b/apps/sim/stores/operation-queue/types.ts @@ -1,3 +1,32 @@ +/** + * Emit functions return whether the payload was actually sent over the socket. + * A `false` return means the emit was skipped (room not joined/visible) and the + * operation should stay pending instead of waiting on a confirmation timeout. + */ +export type WorkflowOperationEmit = ( + workflowId: string, + operation: string, + target: string, + payload: any, + operationId?: string +) => boolean + +export type SubblockUpdateEmit = ( + blockId: string, + subblockId: string, + value: any, + operationId: string | undefined, + workflowId: string +) => boolean + +export type VariableUpdateEmit = ( + variableId: string, + field: string, + value: any, + operationId: string | undefined, + workflowId: string +) => boolean + export interface QueuedOperation { id: string operation: {