Skip to content

Commit 8f4aa4d

Browse files
committed
improvement(sockets): make offline mode recoverable and stop transient races tripping it
1 parent bc55fc3 commit 8f4aa4d

9 files changed

Lines changed: 302 additions & 54 deletions

File tree

apps/realtime/src/handlers/subblocks.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
130130
socket.emit('operation-failed', {
131131
operationId,
132132
error: 'User session not found',
133-
retryable: false,
133+
retryable: true,
134134
})
135135
}
136136
return
@@ -250,7 +250,7 @@ async function flushSubblockUpdate(
250250
io.to(socketId).emit('operation-failed', {
251251
operationId: opId,
252252
error: 'Workflow not found',
253-
retryable: false,
253+
retryable: true,
254254
})
255255
})
256256
return
@@ -352,7 +352,7 @@ async function flushSubblockUpdate(
352352
io.to(socketId).emit('operation-failed', {
353353
operationId: opId,
354354
error: 'Block no longer exists',
355-
retryable: false,
355+
retryable: true,
356356
})
357357
})
358358
}

apps/realtime/src/handlers/variables.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
118118
socket.emit('operation-failed', {
119119
operationId,
120120
error: 'User session not found',
121-
retryable: false,
121+
retryable: true,
122122
})
123123
}
124124
return
@@ -236,7 +236,7 @@ async function flushVariableUpdate(
236236
io.to(socketId).emit('operation-failed', {
237237
operationId: opId,
238238
error: 'Workflow not found',
239-
retryable: false,
239+
retryable: true,
240240
})
241241
})
242242
return
@@ -318,7 +318,7 @@ async function flushVariableUpdate(
318318
io.to(socketId).emit('operation-failed', {
319319
operationId: opId,
320320
error: 'Variable no longer exists',
321-
retryable: false,
321+
retryable: true,
322322
})
323323
})
324324
}

apps/realtime/src/middleware/permissions.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ export function checkRolePermission(
8383
return { allowed: true }
8484
}
8585

86+
/**
87+
* Verifies a user's access to a workflow via workspace permissions.
88+
*
89+
* Returns `hasAccess: false` only for genuine denials (workflow missing/archived
90+
* or no workspace permission). Transient failures (DB errors) are rethrown so the
91+
* caller can report them as retryable instead of a permanent access denial.
92+
*/
8693
export async function verifyWorkflowAccess(
8794
userId: string,
8895
workflowId: string
@@ -129,6 +136,6 @@ export async function verifyWorkflowAccess(
129136
`Error verifying workflow access for user ${userId}, workflow ${workflowId}:`,
130137
error
131138
)
132-
return { hasAccess: false }
139+
throw error
133140
}
134141
}

apps/sim/app/workspace/[workspaceId]/providers/workspace-permissions-provider.tsx

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
6363
const { isReconnecting, isRetryingWorkflowJoin } = useSocket()
6464
const realtimeStatusNotificationIdRef = useRef<string | null>(null)
6565
const realtimeStatusNotificationMessageRef = useRef<string | null>(null)
66+
const offlineNotificationIdRef = useRef<string | null>(null)
6667

6768
const isOfflineMode = hasOperationError
6869
const realtimeStatusMessage = isReconnecting
@@ -81,6 +82,13 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
8182
realtimeStatusNotificationMessageRef.current = null
8283
}, [])
8384

85+
const clearOfflineNotification = useCallback(() => {
86+
if (offlineNotificationIdRef.current) {
87+
toast.dismiss(offlineNotificationIdRef.current)
88+
offlineNotificationIdRef.current = null
89+
}
90+
}, [])
91+
8492
useEffect(() => {
8593
if (isOfflineMode || !realtimeStatusMessage) {
8694
clearRealtimeStatusNotification()
@@ -103,8 +111,11 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
103111
}, [clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage])
104112

105113
useEffect(() => {
106-
return clearRealtimeStatusNotification
107-
}, [clearRealtimeStatusNotification])
114+
return () => {
115+
clearRealtimeStatusNotification()
116+
clearOfflineNotification()
117+
}
118+
}, [clearRealtimeStatusNotification, clearOfflineNotification])
108119

109120
useRegisterGlobalCommands(() =>
110121
createCommands([
@@ -121,14 +132,25 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
121132
)
122133

123134
useEffect(() => {
124-
if (!isOfflineMode || hasShownOfflineNotification) {
135+
if (!isOfflineMode) {
136+
// Offline mode can recover (successful room rejoin or workspace switch);
137+
// dismiss the persistent toast and re-arm the notification for any future
138+
// offline transition.
139+
clearOfflineNotification()
140+
if (hasShownOfflineNotification) {
141+
setHasShownOfflineNotification(false)
142+
}
143+
return
144+
}
145+
146+
if (hasShownOfflineNotification) {
125147
return
126148
}
127149

128150
clearRealtimeStatusNotification()
129151

130152
try {
131-
toast.error('Connection unavailable', {
153+
offlineNotificationIdRef.current = toast.error('Connection unavailable', {
132154
duration: 0,
133155
persistAcrossRoutes: true,
134156
action: { label: 'Refresh', onClick: () => window.location.reload() },
@@ -137,7 +159,12 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
137159
} catch (error) {
138160
logger.error('Failed to add offline notification', { error })
139161
}
140-
}, [clearRealtimeStatusNotification, hasShownOfflineNotification, isOfflineMode])
162+
}, [
163+
clearOfflineNotification,
164+
clearRealtimeStatusNotification,
165+
hasShownOfflineNotification,
166+
isOfflineMode,
167+
])
141168

142169
const {
143170
data: workspacePermissions,

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -75,27 +75,33 @@ interface SocketContextType {
7575
joinWorkflow: (workflowId: string) => void
7676
leaveWorkflow: () => void
7777
retryConnection: () => void
78+
/**
79+
* Emit functions return whether the payload was actually sent over the socket.
80+
* `false` means the emit was skipped because the target room is not currently
81+
* joined/visible; the operation queue keeps such operations pending instead of
82+
* waiting on a confirmation that will never arrive.
83+
*/
7884
emitWorkflowOperation: (
7985
workflowId: string,
8086
operation: string,
8187
target: string,
8288
payload: any,
8389
operationId?: string
84-
) => void
90+
) => boolean
8591
emitSubblockUpdate: (
8692
blockId: string,
8793
subblockId: string,
8894
value: any,
8995
operationId: string | undefined,
9096
workflowId: string
91-
) => void
97+
) => boolean
9298
emitVariableUpdate: (
9399
variableId: string,
94100
field: string,
95101
value: any,
96102
operationId: string | undefined,
97103
workflowId: string
98-
) => void
104+
) => boolean
99105

100106
emitCursorUpdate: (cursor: { x: number; y: number } | null) => void
101107
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
@@ -126,9 +132,9 @@ const SocketContext = createContext<SocketContextType>({
126132
joinWorkflow: () => {},
127133
leaveWorkflow: () => {},
128134
retryConnection: () => {},
129-
emitWorkflowOperation: () => {},
130-
emitSubblockUpdate: () => {},
131-
emitVariableUpdate: () => {},
135+
emitWorkflowOperation: () => false,
136+
emitSubblockUpdate: () => false,
137+
emitVariableUpdate: () => false,
132138
emitCursorUpdate: () => {},
133139
emitSelectionUpdate: () => {},
134140
onWorkflowOperation: () => {},
@@ -519,6 +525,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
519525
setIsRetryingWorkflowJoin(false)
520526
setVisibleWorkflowId(workflowId)
521527
setPresenceUsers(presenceUsers || [])
528+
// A successful join is always followed by a workflow-state push that
529+
// rehydrates local stores from server truth, so a previously tripped
530+
// offline mode is safe to clear here.
531+
useOperationQueueStore.getState().clearError()
522532
logger.info(`Successfully joined workflow room: ${workflowId}`, {
523533
presenceCount: presenceUsers?.length || 0,
524534
})
@@ -847,7 +857,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
847857
}, [authFailed])
848858

849859
const emitWorkflowOperation = useCallback(
850-
(workflowId: string, operation: string, target: string, payload: any, operationId?: string) => {
860+
(
861+
workflowId: string,
862+
operation: string,
863+
target: string,
864+
payload: any,
865+
operationId?: string
866+
): boolean => {
851867
if (
852868
!socket ||
853869
!currentWorkflowId ||
@@ -861,7 +877,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
861877
operation,
862878
target,
863879
})
864-
return
880+
return false
865881
}
866882

867883
const isPositionUpdate = operation === 'update-position' && target === 'block'
@@ -885,7 +901,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
885901
clearTimeout(timeoutId)
886902
positionUpdateTimeouts.current.delete(blockId)
887903
}
888-
return
904+
return true
889905
}
890906

891907
pendingPositionUpdates.current.set(blockId, {
@@ -909,16 +925,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
909925

910926
positionUpdateTimeouts.current.set(blockId, timeoutId)
911927
}
912-
} else {
913-
socket.emit('workflow-operation', {
914-
workflowId,
915-
operation,
916-
target,
917-
payload,
918-
timestamp: Date.now(),
919-
operationId,
920-
})
928+
return true
921929
}
930+
931+
socket.emit('workflow-operation', {
932+
workflowId,
933+
operation,
934+
target,
935+
payload,
936+
timestamp: Date.now(),
937+
operationId,
938+
})
939+
return true
922940
},
923941
[socket, currentWorkflowId, isWorkflowVisible]
924942
)
@@ -930,7 +948,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
930948
value: any,
931949
operationId: string | undefined,
932950
workflowId: string
933-
) => {
951+
): boolean => {
934952
if (
935953
!socket ||
936954
workflowId !== currentWorkflowIdRef.current ||
@@ -949,7 +967,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
949967
reason,
950968
currentWorkflowId: currentWorkflowIdRef.current,
951969
})
952-
return
970+
return false
953971
}
954972
socket.emit('subblock-update', {
955973
workflowId,
@@ -959,6 +977,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
959977
timestamp: Date.now(),
960978
operationId,
961979
})
980+
return true
962981
},
963982
[socket]
964983
)
@@ -970,7 +989,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
970989
value: any,
971990
operationId: string | undefined,
972991
workflowId: string
973-
) => {
992+
): boolean => {
974993
if (
975994
!socket ||
976995
workflowId !== currentWorkflowIdRef.current ||
@@ -989,7 +1008,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
9891008
reason,
9901009
currentWorkflowId: currentWorkflowIdRef.current,
9911010
})
992-
return
1011+
return false
9931012
}
9941013
socket.emit('variable-update', {
9951014
workflowId,
@@ -999,6 +1018,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
9991018
timestamp: Date.now(),
10001019
operationId,
10011020
})
1021+
return true
10021022
},
10031023
[socket]
10041024
)

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ export function useCollaborativeWorkflow() {
378378
}
379379
break
380380
case VARIABLE_OPERATIONS.REMOVE:
381+
useOperationQueueStore.getState().cancelOperationsForVariable(payload.variableId)
381382
useVariablesStore.getState().deleteVariable(payload.variableId)
382383
break
383384
}
@@ -443,6 +444,8 @@ export function useCollaborativeWorkflow() {
443444
})
444445

445446
if (ids && ids.length > 0) {
447+
const operationQueue = useOperationQueueStore.getState()
448+
ids.forEach((id: string) => operationQueue.cancelOperationsForBlock(id))
446449
useWorkflowStore.getState().batchRemoveBlocks(ids)
447450
}
448451

0 commit comments

Comments
 (0)