Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
address comments
  • Loading branch information
icecrasher321 committed Apr 11, 2026
commit e5f55af465b3dcb76667f17b95a7b23aaf0f3930
Original file line number Diff line number Diff line change
Expand Up @@ -59,40 +59,61 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
const hasOperationError = useOperationQueueStore((state) => state.hasOperationError)
const addNotification = useNotificationStore((state) => state.addNotification)
const removeNotification = useNotificationStore((state) => state.removeNotification)
const { isReconnecting } = useSocket()
const reconnectingNotificationIdRef = useRef<string | null>(null)
const { isReconnecting, isRetryingWorkflowJoin } = useSocket()
const realtimeStatusNotificationIdRef = useRef<string | null>(null)
const realtimeStatusNotificationMessageRef = useRef<string | null>(null)

const isOfflineMode = hasOperationError
const realtimeStatusMessage = isReconnecting
? 'Reconnecting...'
: isRetryingWorkflowJoin
? 'Joining workflow...'
: null

const clearRealtimeStatusNotification = useCallback(() => {
if (!realtimeStatusNotificationIdRef.current) {
return
}

removeNotification(realtimeStatusNotificationIdRef.current)
realtimeStatusNotificationIdRef.current = null
realtimeStatusNotificationMessageRef.current = null
}, [removeNotification])

useEffect(() => {
if (isReconnecting && !reconnectingNotificationIdRef.current && !isOfflineMode) {
const id = addNotification({
level: 'error',
message: 'Reconnecting...',
})
reconnectingNotificationIdRef.current = id
} else if (!isReconnecting && reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
if (isOfflineMode || !realtimeStatusMessage) {
clearRealtimeStatusNotification()
return
}

return () => {
if (reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
if (
realtimeStatusNotificationIdRef.current &&
realtimeStatusNotificationMessageRef.current === realtimeStatusMessage
) {
return
}
}, [isReconnecting, isOfflineMode, addNotification, removeNotification])

clearRealtimeStatusNotification()

const id = addNotification({
level: 'error',
message: realtimeStatusMessage,
})

realtimeStatusNotificationIdRef.current = id
realtimeStatusNotificationMessageRef.current = realtimeStatusMessage
}, [addNotification, clearRealtimeStatusNotification, isOfflineMode, realtimeStatusMessage])

useEffect(() => {
return clearRealtimeStatusNotification
}, [clearRealtimeStatusNotification])

useEffect(() => {
if (!isOfflineMode || hasShownOfflineNotification) {
return
}

if (reconnectingNotificationIdRef.current) {
removeNotification(reconnectingNotificationIdRef.current)
reconnectingNotificationIdRef.current = null
}
clearRealtimeStatusNotification()

try {
addNotification({
Expand All @@ -107,7 +128,7 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
} catch (error) {
logger.error('Failed to add offline notification', { error })
}
}, [addNotification, removeNotification, hasShownOfflineNotification, isOfflineMode])
}, [addNotification, clearRealtimeStatusNotification, hasShownOfflineNotification, isOfflineMode])

const {
data: workspacePermissions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ describe('SocketJoinController', () => {

expect(errorResult.apply).toBe(false)
expect(errorResult.retryScheduled).toBe(true)
expect(errorResult.retriesExhausted).toBe(false)
expect(errorResult.commands).toEqual([
{
type: 'schedule-retry',
Expand Down
48 changes: 15 additions & 33 deletions apps/sim/app/workspace/providers/socket-join-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ export type SocketJoinCommand =
delayMs: number
}

export interface SocketJoinSuccessResult {
interface SocketJoinSuccessResult {
apply: boolean
commands: SocketJoinCommand[]
ignored: boolean
workflowId: string
}

export interface SocketJoinErrorResult {
interface SocketJoinErrorResult {
apply: boolean
commands: SocketJoinCommand[]
ignored: boolean
retryScheduled: boolean
retriesExhausted: boolean
workflowId: string | null
}

export interface SocketJoinDeleteResult {
interface SocketJoinDeleteResult {
commands: SocketJoinCommand[]
shouldClearCurrent: boolean
}
Expand All @@ -45,14 +44,6 @@ export class SocketJoinController {
private retryAttempt = 0
private isConnected = false

getDesiredWorkflowId(): string | null {
return this.desiredWorkflowId
}

getPendingJoinWorkflowId(): string | null {
return this.pendingJoinWorkflowId
}

getJoinedWorkflowId(): string | null {
return this.joinedWorkflowId
}
Expand Down Expand Up @@ -171,20 +162,18 @@ export class SocketJoinController {
commands: [...baseCommands, ...this.flush()],
ignored: true,
retryScheduled: false,
retriesExhausted: false,
workflowId: resolvedWorkflowId,
}
}

if (retryable && resolvedWorkflowId) {
const retryResult = this.scheduleRetry(resolvedWorkflowId)
const commands = this.scheduleRetry(resolvedWorkflowId)

return {
apply: false,
commands: [...baseCommands, ...retryResult.commands],
commands: [...baseCommands, ...commands],
ignored: false,
retryScheduled: retryResult.retryScheduled,
retriesExhausted: false,
retryScheduled: true,
workflowId: resolvedWorkflowId,
}
}
Expand All @@ -196,7 +185,6 @@ export class SocketJoinController {
commands: [...this.clearRetryCommands(), ...leaveCommands, ...this.flush()],
ignored: false,
retryScheduled: false,
retriesExhausted: false,
workflowId: resolvedWorkflowId,
}
Comment thread
icecrasher321 marked this conversation as resolved.
}
Expand Down Expand Up @@ -240,10 +228,7 @@ export class SocketJoinController {
return [{ type: 'join', workflowId: this.desiredWorkflowId }]
}

private scheduleRetry(workflowId: string): {
commands: SocketJoinCommand[]
retryScheduled: boolean
} {
private scheduleRetry(workflowId: string): SocketJoinCommand[] {
const nextAttempt = this.retryWorkflowId === workflowId ? this.retryAttempt + 1 : 1
const delayMs = Math.min(
SOCKET_JOIN_RETRY_BASE_DELAY_MS * 2 ** Math.max(0, nextAttempt - 1),
Expand All @@ -253,17 +238,14 @@ export class SocketJoinController {
this.retryWorkflowId = workflowId
this.retryAttempt = nextAttempt

return {
commands: [
{
type: 'schedule-retry',
workflowId,
attempt: nextAttempt,
delayMs,
},
],
retryScheduled: true,
}
return [
{
type: 'schedule-retry',
workflowId,
attempt: nextAttempt,
delayMs,
},
]
}

private takeRetryResetCommands(nextWorkflowId?: string | null): SocketJoinCommand[] {
Expand Down
39 changes: 37 additions & 2 deletions apps/sim/app/workspace/providers/socket-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ interface SocketContextType {
isConnected: boolean
isConnecting: boolean
isReconnecting: boolean
isRetryingWorkflowJoin: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
Expand Down Expand Up @@ -110,6 +111,7 @@ const SocketContext = createContext<SocketContextType>({
isConnected: false,
isConnecting: false,
isReconnecting: false,
isRetryingWorkflowJoin: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
Expand Down Expand Up @@ -146,6 +148,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [isConnected, setIsConnected] = useState(false)
const [isConnecting, setIsConnecting] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [isRetryingWorkflowJoin, setIsRetryingWorkflowJoin] = useState(false)
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
Expand Down Expand Up @@ -236,10 +239,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
commands.forEach((command) => {
if (command.type === 'cancel-retry') {
clearJoinRetryTimeout()
setIsRetryingWorkflowJoin(false)
return
}

if (command.type === 'leave') {
setIsRetryingWorkflowJoin(false)
clearJoinedWorkflowState(true)

if (!socketInstance) {
Expand Down Expand Up @@ -278,6 +283,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}

clearJoinRetryTimeout()
setIsRetryingWorkflowJoin(true)
joinRetryTimeoutRef.current = setTimeout(() => {
joinRetryTimeoutRef.current = null
executeJoinCommands(joinControllerRef.current.retryJoin(command.workflowId))
Expand Down Expand Up @@ -376,6 +382,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('disconnect', (reason) => {
setIsConnected(false)
setIsConnecting(false)
setIsRetryingWorkflowJoin(false)
setCurrentSocketId(null)
executeJoinCommands(joinControllerRef.current.setConnected(false))
clearJoinedWorkflowState(false)
Expand Down Expand Up @@ -469,6 +476,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
if (result.ignored) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
} else {
setIsRetryingWorkflowJoin(false)
setVisibleWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
Expand All @@ -495,6 +503,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
code,
})
} else if (result.apply) {
setIsRetryingWorkflowJoin(false)
if (result.workflowId) {
useOperationQueueStore.getState().cancelOperationsForWorkflow(result.workflowId)
}
Expand Down Expand Up @@ -854,7 +863,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
workflowId !== currentWorkflowIdRef.current ||
!isWorkflowVisible(workflowId)
) {
logger.warn('Cannot emit subblock update: no socket connection', { workflowId, blockId })
const reason = !socket
? 'socket_unavailable'
: workflowId !== currentWorkflowIdRef.current
? 'joined_workflow_mismatch'
: 'workflow_not_visible'

logger.debug('Skipping subblock update emit', {
workflowId,
blockId,
subblockId,
reason,
currentWorkflowId: currentWorkflowIdRef.current,
})
return
Comment thread
icecrasher321 marked this conversation as resolved.
}
socket.emit('subblock-update', {
Expand Down Expand Up @@ -882,7 +903,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
workflowId !== currentWorkflowIdRef.current ||
!isWorkflowVisible(workflowId)
) {
logger.warn('Cannot emit variable update: no socket connection', { workflowId, variableId })
const reason = !socket
? 'socket_unavailable'
: workflowId !== currentWorkflowIdRef.current
? 'joined_workflow_mismatch'
: 'workflow_not_visible'

logger.debug('Skipping variable update emit', {
workflowId,
variableId,
field,
reason,
currentWorkflowId: currentWorkflowIdRef.current,
})
return
}
socket.emit('variable-update', {
Expand Down Expand Up @@ -975,6 +1008,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
isConnected,
isConnecting,
isReconnecting,
isRetryingWorkflowJoin,
authFailed,
currentWorkflowId,
currentSocketId,
Expand Down Expand Up @@ -1003,6 +1037,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
isConnected,
isConnecting,
isReconnecting,
isRetryingWorkflowJoin,
authFailed,
currentWorkflowId,
currentSocketId,
Expand Down
Loading