diff --git a/apps/realtime/src/handlers/operations.ts b/apps/realtime/src/handlers/operations.ts index 3fa4ac0f8e7..49d5bbcd0b2 100644 --- a/apps/realtime/src/handlers/operations.ts +++ b/apps/realtime/src/handlers/operations.ts @@ -15,7 +15,7 @@ import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { ZodError } from 'zod' import { persistWorkflowOperation } from '@/database/operations' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { checkWorkflowOperationPermission } from '@/middleware/permissions' import type { IRoomManager, UserSession } from '@/rooms' const logger = createLogger('OperationsHandlers') @@ -125,11 +125,17 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() }) - // Check permissions using cached role (no DB query) - const permissionCheck = checkRolePermission(userPresence.role, operation) + // Re-validate the workspace role against the DB (cached per pod for a short + // window) so revoked or downgraded collaborators lose write access live. + const permissionCheck = await checkWorkflowOperationPermission( + session.userId, + workflowId, + operation, + userPresence.role + ) if (!permissionCheck.allowed) { logger.warn( - `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` + `User ${session.userId} (role: ${permissionCheck.role ?? 'none'}) forbidden from ${operation} on ${target}` ) emitOperationError({ type: 'INSUFFICIENT_PERMISSIONS', diff --git a/apps/realtime/src/handlers/subblocks.ts b/apps/realtime/src/handlers/subblocks.ts index 1ee35d3e722..0295aff458c 100644 --- a/apps/realtime/src/handlers/subblocks.ts +++ b/apps/realtime/src/handlers/subblocks.ts @@ -7,7 +7,7 @@ import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { isWorkflowBlockProtected } from '@sim/workflow-types/workflow' import { and, eq } from 'drizzle-orm' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { checkWorkflowOperationPermission } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('SubblocksHandlers') @@ -136,7 +136,12 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: return } - const permissionCheck = checkRolePermission(userPresence.role, SUBBLOCK_OPERATIONS.UPDATE) + const permissionCheck = await checkWorkflowOperationPermission( + session.userId, + workflowId, + SUBBLOCK_OPERATIONS.UPDATE, + userPresence.role + ) if (!permissionCheck.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', diff --git a/apps/realtime/src/handlers/variables.ts b/apps/realtime/src/handlers/variables.ts index 41aeed3f83f..f9b1c1f0c68 100644 --- a/apps/realtime/src/handlers/variables.ts +++ b/apps/realtime/src/handlers/variables.ts @@ -6,7 +6,7 @@ import { getErrorMessage } from '@sim/utils/errors' import { assertWorkflowMutable, WorkflowLockedError } from '@sim/workflow-authz' import { eq } from 'drizzle-orm' import type { AuthenticatedSocket } from '@/middleware/auth' -import { checkRolePermission } from '@/middleware/permissions' +import { checkWorkflowOperationPermission } from '@/middleware/permissions' import type { IRoomManager } from '@/rooms' const logger = createLogger('VariablesHandlers') @@ -124,7 +124,12 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: return } - const permissionCheck = checkRolePermission(userPresence.role, VARIABLE_OPERATIONS.UPDATE) + const permissionCheck = await checkWorkflowOperationPermission( + session.userId, + workflowId, + VARIABLE_OPERATIONS.UPDATE, + userPresence.role + ) if (!permissionCheck.allowed) { socket.emit('operation-forbidden', { type: 'INSUFFICIENT_PERMISSIONS', diff --git a/apps/realtime/src/index.test.ts b/apps/realtime/src/index.test.ts index f2d6d8e6868..92ddc101c69 100644 --- a/apps/realtime/src/index.test.ts +++ b/apps/realtime/src/index.test.ts @@ -73,6 +73,10 @@ vi.mock('@/middleware/permissions', () => ({ checkRolePermission: vi.fn().mockReturnValue({ allowed: true, }), + checkWorkflowOperationPermission: vi.fn().mockResolvedValue({ + allowed: true, + role: 'admin', + }), })) vi.mock('@/database/operations', () => ({ diff --git a/apps/realtime/src/middleware/permissions.test.ts b/apps/realtime/src/middleware/permissions.test.ts index 0aa9cada905..554ba8355fd 100644 --- a/apps/realtime/src/middleware/permissions.test.ts +++ b/apps/realtime/src/middleware/permissions.test.ts @@ -13,8 +13,17 @@ import { ROLE_ALLOWED_OPERATIONS, SOCKET_OPERATIONS, } from '@sim/testing' -import { describe, expect, it } from 'vitest' -import { checkRolePermission } from '@/middleware/permissions' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { mockAuthorize } = vi.hoisted(() => ({ + mockAuthorize: vi.fn(), +})) + +vi.mock('@sim/workflow-authz', () => ({ + authorizeWorkflowByWorkspacePermission: mockAuthorize, +})) + +import { checkRolePermission, checkWorkflowOperationPermission } from '@/middleware/permissions' describe('checkRolePermission', () => { describe('admin role', () => { @@ -279,3 +288,129 @@ describe('checkRolePermission', () => { }) }) }) + +describe('checkWorkflowOperationPermission', () => { + const userId = 'user-1' + let workflowCounter = 0 + let workflowId: string + + beforeEach(() => { + vi.clearAllMocks() + // Unique workflowId per test so the module-level role cache never leaks across tests + workflowCounter += 1 + workflowId = `wf-${workflowCounter}` + }) + + it('allows a write operation when the user still has write access', async () => { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' }) + + const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + + expect(result.allowed).toBe(true) + expect(result.role).toBe('write') + }) + + it('denies all writes once workspace access has been revoked', async () => { + mockAuthorize.mockResolvedValue({ allowed: false, workspacePermission: null }) + + const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write') + + expect(result.allowed).toBe(false) + expect(result.role).toBeNull() + expect(result.reason).toMatch(/revoked/i) + }) + + it('denies writes after a downgrade to read but still allows position updates', async () => { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'read' }) + + const denied = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write') + expect(denied.allowed).toBe(false) + expect(denied.role).toBe('read') + + const allowed = await checkWorkflowOperationPermission( + userId, + workflowId, + 'update-position', + 'write' + ) + expect(allowed.allowed).toBe(true) + expect(allowed.role).toBe('read') + }) + + it('caches the role within the TTL to avoid a DB read on every operation', async () => { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' }) + + await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + + expect(mockAuthorize).toHaveBeenCalledTimes(1) + }) + + it('re-reads the role after the cache TTL expires', async () => { + vi.useFakeTimers() + try { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' }) + await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + + // Downgraded to read after the first check + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'read' }) + vi.advanceTimersByTime(31_000) + + const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write') + expect(mockAuthorize).toHaveBeenCalledTimes(2) + expect(result.allowed).toBe(false) + expect(result.role).toBe('read') + } finally { + vi.useRealTimers() + } + }) + + it('falls back to the join-time role on a transient DB error when nothing is cached yet', async () => { + mockAuthorize.mockRejectedValue(new Error('db unavailable')) + + const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'write') + + expect(result.allowed).toBe(true) + expect(result.role).toBe('write') + }) + + it('preserves a recorded revocation through a later transient DB error', async () => { + vi.useFakeTimers() + try { + // First check records the revocation (null) in the cache + mockAuthorize.mockResolvedValue({ allowed: false, workspacePermission: null }) + const first = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'admin') + expect(first.allowed).toBe(false) + expect(first.role).toBeNull() + + // TTL expires, then the DB blips on the next re-validation. The stale join-time + // role ('admin') must NOT resurrect access — the recorded revocation wins. + vi.advanceTimersByTime(31_000) + mockAuthorize.mockRejectedValue(new Error('db unavailable')) + + const second = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'admin') + expect(second.allowed).toBe(false) + expect(second.role).toBeNull() + } finally { + vi.useRealTimers() + } + }) + + it('uses the last cached role (not the join-time role) on a transient DB error', async () => { + vi.useFakeTimers() + try { + mockAuthorize.mockResolvedValue({ allowed: true, workspacePermission: 'write' }) + await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + + vi.advanceTimersByTime(31_000) + mockAuthorize.mockRejectedValue(new Error('db unavailable')) + + // fallbackRole is 'read', but the last recorded decision was 'write' — use that + const result = await checkWorkflowOperationPermission(userId, workflowId, 'update', 'read') + expect(result.allowed).toBe(true) + expect(result.role).toBe('write') + } finally { + vi.useRealTimers() + } + }) +}) diff --git a/apps/realtime/src/middleware/permissions.ts b/apps/realtime/src/middleware/permissions.ts index 26a4d8c8542..23069ff51de 100644 --- a/apps/realtime/src/middleware/permissions.ts +++ b/apps/realtime/src/middleware/permissions.ts @@ -83,6 +83,111 @@ export function checkRolePermission( return { allowed: true } } +/** + * TTL for the per-pod role cache backing live re-validation of mutating operations. + * Bounds how long a revoked or downgraded collaborator can retain write access on an + * already-connected socket. + */ +const ROLE_REVALIDATION_TTL_MS = 30_000 + +/** Soft cap on cached entries before an opportunistic purge of expired ones runs. */ +const MAX_ROLE_CACHE_ENTRIES = 5_000 + +interface CachedRole { + /** Authoritative workspace role, or `null` when the user has no access. */ + role: string | null + expiresAt: number +} + +/** + * Per-pod cache of authoritative workspace roles, keyed by `${userId}:${workflowId}`. + * + * Socket connections are sticky to a single pod, so a socket's mutating operations are + * always gated by the same pod's cache. We rely on TTL expiry (not cross-pod + * invalidation) to bound stale authorization to {@link ROLE_REVALIDATION_TTL_MS}, which + * keeps this correct under a multi-pod deployment without any shared state. + */ +const roleCache = new Map() + +function purgeExpiredRoles(now: number): void { + for (const [key, entry] of roleCache) { + if (entry.expiresAt <= now) { + roleCache.delete(key) + } + } +} + +/** + * Resolves a user's current workspace role for a workflow, re-reading the `permissions` + * table at most once per {@link ROLE_REVALIDATION_TTL_MS} per pod. + * + * Returns `null` when the user genuinely has no access (removed/revoked). On a transient + * DB failure it reuses the last recorded decision for this (user, workflow) — including a + * previously recorded revocation (`null`) — and only falls back to `fallbackRole` when no + * decision has been recorded yet, so a blip neither blocks legitimate editors nor + * resurrects already-revoked access. + */ +export async function resolveCurrentWorkflowRole( + userId: string, + workflowId: string, + fallbackRole: string +): Promise { + const now = Date.now() + const key = `${userId}:${workflowId}` + const cached = roleCache.get(key) + if (cached && cached.expiresAt > now) { + return cached.role + } + + try { + const authorization = await authorizeWorkflowByWorkspacePermission({ + workflowId, + userId, + action: 'read', + }) + const role = authorization.allowed ? (authorization.workspacePermission ?? null) : null + if (roleCache.size >= MAX_ROLE_CACHE_ENTRIES) { + purgeExpiredRoles(now) + } + roleCache.set(key, { role, expiresAt: now + ROLE_REVALIDATION_TTL_MS }) + return role + } catch (error) { + logger.warn( + `Failed to re-validate role for user ${userId} on workflow ${workflowId}; using last known role`, + error + ) + // Prefer the last recorded decision — even if expired, and even if it is `null` for an + // already-revoked user — so a recorded revocation survives a transient DB failure + // instead of reverting to the stale join-time role. Only trust `fallbackRole` when + // nothing has been recorded for this (user, workflow) yet. + const lastKnown = roleCache.get(key) + return lastKnown !== undefined ? lastKnown.role : fallbackRole + } +} + +/** + * Live permission gate for mutating socket operations. Re-validates the user's workspace + * role against the database (cached per pod for {@link ROLE_REVALIDATION_TTL_MS}) so that + * revoked or downgraded collaborators lose write access on an open connection without + * needing to rejoin the workflow. + */ +export async function checkWorkflowOperationPermission( + userId: string, + workflowId: string, + operation: string, + fallbackRole: string +): Promise<{ allowed: boolean; reason?: string; role: string | null }> { + const role = await resolveCurrentWorkflowRole(userId, workflowId, fallbackRole) + if (!role) { + return { + allowed: false, + reason: 'Access to this workflow has been revoked', + role: null, + } + } + return { ...checkRolePermission(role, operation), role } +} + /** * Verifies a user's access to a workflow via workspace permissions. *