From 5357b823149ccd22655fcc89abe3230fb9c3f4ac Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 11 Jun 2026 12:04:43 -0700 Subject: [PATCH 1/2] fix(db-part-1): eliminate pool self-deadlock from nested checkouts inside transactions --- apps/realtime/src/database/operations.ts | 25 +- .../api/credential-sets/[id]/members/route.ts | 20 +- .../credential-sets/invite/[token]/route.ts | 16 +- .../api/credential-sets/memberships/route.ts | 18 +- .../v1/admin/workspaces/[id]/members/route.ts | 8 +- .../api/workspaces/[id]/permissions/route.ts | 9 +- apps/sim/lib/billing/core/usage-log.ts | 46 ++-- apps/sim/lib/invitations/core.test.ts | 6 + apps/sim/lib/invitations/core.ts | 16 +- apps/sim/lib/knowledge/chunks/service.ts | 215 ++++++++++-------- apps/sim/lib/knowledge/service.test.ts | 10 +- apps/sim/lib/knowledge/service.ts | 27 ++- apps/sim/lib/logs/execution/logger.test.ts | 4 + apps/sim/lib/logs/execution/logger.ts | 12 +- apps/sim/lib/mcp/workflow-mcp-sync.ts | 43 ++-- apps/sim/lib/permission-groups/auto-add.ts | 11 +- apps/sim/lib/table/service.ts | 3 +- apps/sim/lib/table/validation.ts | 9 +- apps/sim/lib/webhooks/deploy.ts | 14 +- apps/sim/lib/webhooks/utils.server.ts | 25 +- .../sim/lib/workflows/orchestration/deploy.ts | 7 +- .../workflows/persistence/duplicate.test.ts | 19 +- .../lib/workflows/persistence/duplicate.ts | 67 ++++-- .../lib/workflows/persistence/utils.test.ts | 1 + apps/sim/lib/workflows/persistence/utils.ts | 41 +++- apps/sim/lib/workflows/utils.ts | 10 +- bun.lock | 1 + packages/db/db.ts | 10 +- packages/db/index.ts | 1 + packages/db/package.json | 1 + packages/db/tx-tripwire.test.ts | 186 +++++++++++++++ packages/db/tx-tripwire.ts | 167 ++++++++++++++ packages/db/vitest.config.ts | 2 +- packages/testing/src/mocks/database.mock.ts | 4 + 34 files changed, 813 insertions(+), 241 deletions(-) create mode 100644 packages/db/tx-tripwire.test.ts create mode 100644 packages/db/tx-tripwire.ts diff --git a/apps/realtime/src/database/operations.ts b/apps/realtime/src/database/operations.ts index 8e301acf9bc..c5474a5f6f8 100644 --- a/apps/realtime/src/database/operations.ts +++ b/apps/realtime/src/database/operations.ts @@ -1,6 +1,12 @@ import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' import * as schema from '@sim/db' -import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@sim/db' +import { + instrumentPoolClient, + workflow, + workflowBlocks, + workflowEdges, + workflowSubflows, +} from '@sim/db' import { createLogger } from '@sim/logger' import { BLOCK_OPERATIONS, @@ -27,13 +33,16 @@ const logger = createLogger('SocketDatabase') const connectionString = env.DATABASE_URL const socketDb = drizzle( - postgres(connectionString, { - prepare: false, - idle_timeout: 10, - connect_timeout: 20, - max: 15, - onnotice: () => {}, - }), + instrumentPoolClient( + postgres(connectionString, { + prepare: false, + idle_timeout: 10, + connect_timeout: 20, + max: 15, + onnotice: () => {}, + }), + 'socketDb' + ), { schema } ) diff --git a/apps/sim/app/api/credential-sets/[id]/members/route.ts b/apps/sim/app/api/credential-sets/[id]/members/route.ts index 9b94debf958..b49aadcfae0 100644 --- a/apps/sim/app/api/credential-sets/[id]/members/route.ts +++ b/apps/sim/app/api/credential-sets/[id]/members/route.ts @@ -185,16 +185,24 @@ export const DELETE = withRouteHandler( const requestId = generateId().slice(0, 8) - // Use transaction to ensure member deletion + webhook sync are atomic - await db.transaction(async (tx) => { - await tx.delete(credentialSetMember).where(eq(credentialSetMember.id, memberId)) - - const syncResult = await syncAllWebhooksForCredentialSet(id, requestId, tx) + await db.delete(credentialSetMember).where(eq(credentialSetMember.id, memberId)) + + // Runs after the deletion commits: the sync performs external HTTP + // (OAuth refresh, provider unsubscribe) and must not hold a pooled + // connection. A sync failure must not fail the committed mutation — + // it self-heals on the next membership change/deploy. + try { + const syncResult = await syncAllWebhooksForCredentialSet(id, requestId) logger.info('Synced webhooks after member removed', { credentialSetId: id, ...syncResult, }) - }) + } catch (syncError) { + logger.error('Webhook sync failed after member removal', { + credentialSetId: id, + error: syncError, + }) + } logger.info('Removed member from credential set', { credentialSetId: id, diff --git a/apps/sim/app/api/credential-sets/invite/[token]/route.ts b/apps/sim/app/api/credential-sets/invite/[token]/route.ts index 43bd6df4e38..efaff3de3af 100644 --- a/apps/sim/app/api/credential-sets/invite/[token]/route.ts +++ b/apps/sim/app/api/credential-sets/invite/[token]/route.ts @@ -194,17 +194,27 @@ export const POST = withRouteHandler( ) ) } + }) + // Runs after the membership commits: the sync performs external HTTP + // (OAuth refresh, provider unsubscribe) and must not hold a pooled + // connection. A sync failure must not fail the committed mutation — + // it self-heals on the next membership change/deploy. + try { const syncResult = await syncAllWebhooksForCredentialSet( invitation.credentialSetId, - requestId, - tx + requestId ) logger.info('Synced webhooks after member joined', { credentialSetId: invitation.credentialSetId, ...syncResult, }) - }) + } catch (syncError) { + logger.error('Webhook sync failed after invitation accept', { + credentialSetId: invitation.credentialSetId, + error: syncError, + }) + } logger.info('Accepted credential set invitation', { invitationId: invitation.id, diff --git a/apps/sim/app/api/credential-sets/memberships/route.ts b/apps/sim/app/api/credential-sets/memberships/route.ts index ee3a5ec3e41..8ce556f5b5d 100644 --- a/apps/sim/app/api/credential-sets/memberships/route.ts +++ b/apps/sim/app/api/credential-sets/memberships/route.ts @@ -74,7 +74,6 @@ export const DELETE = withRouteHandler(async (req: NextRequest) => { try { const requestId = generateId().slice(0, 8) - // Use transaction to ensure revocation + webhook sync are atomic await db.transaction(async (tx) => { // Find and verify membership const [membership] = await tx @@ -104,15 +103,26 @@ export const DELETE = withRouteHandler(async (req: NextRequest) => { updatedAt: new Date(), }) .where(eq(credentialSetMember.id, membership.id)) + }) - // Sync webhooks to remove this user's credential webhooks - const syncResult = await syncAllWebhooksForCredentialSet(credentialSetId, requestId, tx) + // Runs after the revocation commits: the sync performs external HTTP + // (OAuth refresh, provider unsubscribe) and must not hold a pooled + // connection. A sync failure must not fail the committed mutation — + // it self-heals on the next membership change/deploy. + try { + const syncResult = await syncAllWebhooksForCredentialSet(credentialSetId, requestId) logger.info('Synced webhooks after member left', { credentialSetId, userId: session.user.id, ...syncResult, }) - }) + } catch (syncError) { + logger.error('Webhook sync failed after member left', { + credentialSetId, + userId: session.user.id, + error: syncError, + }) + } logger.info('User left credential set', { credentialSetId, diff --git a/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts b/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts index 9cf5c2e3866..b6a4387a11f 100644 --- a/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts +++ b/apps/sim/app/api/v1/admin/workspaces/[id]/members/route.ts @@ -47,6 +47,7 @@ import { adminV1ListWorkspaceMembersContract, } from '@/lib/api/contracts/v1/admin' import { parseRequest } from '@/lib/api/server' +import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { revokeWorkspaceCredentialMembershipsTx } from '@/lib/credentials/access' import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment' @@ -247,7 +248,12 @@ export const POST = withRouteHandler( updatedAt: now, }) - await applyWorkspaceAutoAddGroup(db, workspaceId, userId) + await applyWorkspaceAutoAddGroup( + db, + workspaceId, + userId, + await isWorkspaceOnEnterprisePlan(workspaceId) + ) logger.info(`Admin API: Added user ${userId} to workspace ${workspaceId}`, { permissions: permissionLevel, diff --git a/apps/sim/app/api/workspaces/[id]/permissions/route.ts b/apps/sim/app/api/workspaces/[id]/permissions/route.ts index c2452fdbfde..bda0bbf81a0 100644 --- a/apps/sim/app/api/workspaces/[id]/permissions/route.ts +++ b/apps/sim/app/api/workspaces/[id]/permissions/route.ts @@ -8,6 +8,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { updateWorkspacePermissionsContract } from '@/lib/api/contracts/workspaces' import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' +import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment' import { applyWorkspaceAutoAddGroup } from '@/lib/permission-groups/auto-add' @@ -159,6 +160,12 @@ export const PATCH = withRouteHandler( existingPerms.map((p) => [p.userId, { permission: p.permissionType, email: p.email }]) ) + // Resolved before the transaction: the entitlement check reads billing + // tables on the global pool and must not run while the tx holds a + // pooled connection. + const hasNewMembers = body.updates.some((update) => !permLookup.has(update.userId)) + const autoAddEntitled = hasNewMembers ? await isWorkspaceOnEnterprisePlan(workspaceId) : false + await db.transaction(async (tx) => { for (const update of body.updates) { const isNew = !permLookup.has(update.userId) @@ -184,7 +191,7 @@ export const PATCH = withRouteHandler( }) if (isNew) { - await applyWorkspaceAutoAddGroup(tx, workspaceId, update.userId) + await applyWorkspaceAutoAddGroup(tx, workspaceId, update.userId, autoAddEntitled) } } }) diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index 147e0406146..985713bebcd 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -77,11 +77,7 @@ interface UsageEntry { metadata?: UsageLogMetadata } -/** - * Parameters for the central recordUsage function. - * This is the single entry point for all billing mutations. - */ -export interface RecordUsageParams { +interface RecordUsageBaseParams { /** The user being charged */ userId: string /** One or more usage_log entries to record. Total cost is derived from these. */ @@ -92,19 +88,37 @@ export interface RecordUsageParams { workflowId?: string /** Execution context */ executionId?: string - /** Billing entity scope, resolved by caller when already known. */ - billingEntity?: BillingEntity - /** Billing period bounds, resolved by caller when already known. */ - billingPeriod?: { start: Date; end: Date } - /** - * Optional transaction to run the ledger INSERT in. Callers that reconcile a - * read-then-insert under a lock (e.g. the per-execution advisory lock in the - * workflow completion path) pass their tx so the insert participates in the - * same locked transaction. Defaults to the pooled db. - */ - tx?: DbOrTx } +/** + * Parameters for the central recordUsage function. + * This is the single entry point for all billing mutations. + * + * Callers that pass `tx` (e.g. the per-execution advisory-lock reconciliation + * in the workflow completion path) must pre-resolve the billing context before + * opening the transaction: resolving it inside would run the subscription + * lookups on the global pool while the tx already holds a pooled connection, + * starving the pool under load (see recordCumulativeUsage for the history). + */ +export type RecordUsageParams = RecordUsageBaseParams & + ( + | { + /** Transaction the ledger INSERT participates in. */ + tx: DbOrTx + /** Billing entity scope, resolved before the transaction opened. */ + billingEntity: BillingEntity + /** Billing period bounds, resolved before the transaction opened. */ + billingPeriod: { start: Date; end: Date } + } + | { + tx?: undefined + /** Billing entity scope, resolved by caller when already known. */ + billingEntity?: BillingEntity + /** Billing period bounds, resolved by caller when already known. */ + billingPeriod?: { start: Date; end: Date } + } + ) + export function stableEventKey(parts: Record): string { const payload = Object.keys(parts) .sort() diff --git a/apps/sim/lib/invitations/core.test.ts b/apps/sim/lib/invitations/core.test.ts index a71d9773f3c..6c81664db2e 100644 --- a/apps/sim/lib/invitations/core.test.ts +++ b/apps/sim/lib/invitations/core.test.ts @@ -15,6 +15,7 @@ const { mockSyncUsageLimitsFromSubscription, mockSyncWorkspaceEnvCredentials, mockApplyWorkspaceAutoAddGroup, + mockIsWorkspaceOnEnterprisePlan, mockFeatureFlags, } = vi.hoisted(() => ({ mockEnsureUserInOrganization: vi.fn(), @@ -27,6 +28,7 @@ const { mockSyncUsageLimitsFromSubscription: vi.fn(), mockSyncWorkspaceEnvCredentials: vi.fn(), mockApplyWorkspaceAutoAddGroup: vi.fn(), + mockIsWorkspaceOnEnterprisePlan: vi.fn(async () => true), mockFeatureFlags: { isBillingEnabled: true }, })) @@ -60,6 +62,10 @@ vi.mock('@/lib/auth/active-organization', () => ({ setActiveOrganizationForCurrentSession: mockSetActiveOrganizationForCurrentSession, })) +vi.mock('@/lib/billing/core/subscription', () => ({ + isWorkspaceOnEnterprisePlan: mockIsWorkspaceOnEnterprisePlan, +})) + vi.mock('@/lib/billing/core/usage', () => ({ syncUsageLimitsFromSubscription: mockSyncUsageLimitsFromSubscription, })) diff --git a/apps/sim/lib/invitations/core.ts b/apps/sim/lib/invitations/core.ts index f1dabbfef5d..979eb11cf53 100644 --- a/apps/sim/lib/invitations/core.ts +++ b/apps/sim/lib/invitations/core.ts @@ -17,6 +17,7 @@ import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' import { and, eq, inArray, lte } from 'drizzle-orm' import { setActiveOrganizationForCurrentSession } from '@/lib/auth/active-organization' +import { isWorkspaceOnEnterprisePlan } from '@/lib/billing/core/subscription' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { acquireOrgMembershipLock, @@ -433,6 +434,14 @@ export async function acceptInvitation( const acceptedWorkspaceIds: string[] = [] + // Resolved before the transaction: the entitlement check reads billing + // tables on the global pool, which must not run while the tx below holds a + // pooled connection and the org-membership advisory lock. + const autoAddEntitlementByWorkspace = new Map() + for (const workspaceId of new Set(inv.grants.map((grant) => grant.workspaceId))) { + autoAddEntitlementByWorkspace.set(workspaceId, await isWorkspaceOnEnterprisePlan(workspaceId)) + } + try { await db.transaction(async (tx) => { /** @@ -502,7 +511,12 @@ export async function acceptInvitation( }) } - await applyWorkspaceAutoAddGroup(tx, grant.workspaceId, input.userId) + await applyWorkspaceAutoAddGroup( + tx, + grant.workspaceId, + input.userId, + autoAddEntitlementByWorkspace.get(grant.workspaceId) ?? false + ) acceptedWorkspaceIds.push(grant.workspaceId) } diff --git a/apps/sim/lib/knowledge/chunks/service.ts b/apps/sim/lib/knowledge/chunks/service.ts index ab5c307397b..fe34a40b543 100644 --- a/apps/sim/lib/knowledge/chunks/service.ts +++ b/apps/sim/lib/knowledge/chunks/service.ts @@ -336,135 +336,152 @@ export async function updateChunk( requestId: string, workspaceId?: string | null ): Promise { - const dbUpdateData: { - updatedAt: Date - content?: string - contentLength?: number - tokenCount?: number - chunkHash?: string - embedding?: number[] - enabled?: boolean - } = { - updatedAt: new Date(), - } - - // Use transaction if content is being updated to ensure consistent document statistics + // Content updates run in a transaction to keep document statistics + // consistent. The embedding API call happens BEFORE the transaction opens so + // a held pooled connection never waits on external I/O; the transaction then + // re-reads the chunk under a row lock and retries the whole flow in the rare + // case a concurrent edit invalidated the regeneration decision. if (updateData.content !== undefined && typeof updateData.content === 'string') { - return await db.transaction(async (tx) => { - // Get current chunk data for character count calculation and content comparison - const currentChunk = await tx - .select({ - documentId: embedding.documentId, - content: embedding.content, - contentLength: embedding.contentLength, - tokenCount: embedding.tokenCount, - }) + const content = updateData.content + const MAX_UPDATE_ATTEMPTS = 3 + + for (let attempt = 1; attempt <= MAX_UPDATE_ATTEMPTS; attempt++) { + const [preRead] = await db + .select({ documentId: embedding.documentId, content: embedding.content }) .from(embedding) .where(eq(embedding.id, chunkId)) .limit(1) - if (currentChunk.length === 0) { + if (!preRead) { throw new Error(`Chunk ${chunkId} not found`) } - const oldContentLength = currentChunk[0].contentLength - const oldTokenCount = currentChunk[0].tokenCount - const content = updateData.content! // We know it's defined from the if check above - const newContentLength = content.length - - // Only regenerate embedding if content actually changed - if (content !== currentChunk[0].content) { - logger.info(`[${requestId}] Content changed, regenerating embedding for chunk ${chunkId}`) - - const kbRow = await tx + // The embedding is a function of the new content alone, so generating it + // outside the transaction is always valid. + let regenerated: { embedding: number[]; tokenCount: number } | null = null + if (content !== preRead.content) { + const kbRow = await db .select({ embeddingModel: knowledgeBase.embeddingModel }) .from(knowledgeBase) .innerJoin(document, eq(document.knowledgeBaseId, knowledgeBase.id)) - .where(eq(document.id, currentChunk[0].documentId)) + .where(eq(document.id, preRead.documentId)) .limit(1) const chunkEmbeddingModel = kbRow[0]?.embeddingModel if (!chunkEmbeddingModel) { throw new Error('Knowledge base for chunk not found') } + + logger.info(`[${requestId}] Content changed, regenerating embedding for chunk ${chunkId}`) const { embeddings } = await generateEmbeddings([content], chunkEmbeddingModel, workspaceId) + regenerated = { + embedding: embeddings[0], + tokenCount: estimateTokenCount( + content, + getEmbeddingModelInfo(chunkEmbeddingModel).tokenizerProvider + ).count, + } + } + + const result = await db.transaction(async (tx) => { + const currentChunk = await tx + .select({ + documentId: embedding.documentId, + content: embedding.content, + contentLength: embedding.contentLength, + tokenCount: embedding.tokenCount, + }) + .from(embedding) + .where(eq(embedding.id, chunkId)) + .limit(1) + .for('update') + + if (currentChunk.length === 0) { + throw new Error(`Chunk ${chunkId} not found`) + } - const tokenCount = estimateTokenCount( + // A concurrent edit landed between the pre-read and this row lock and + // we skipped regeneration based on stale content; retry so the + // decision is re-made against the committed content. + if (!regenerated && currentChunk[0].content !== content) { + return null + } + + const oldContentLength = currentChunk[0].contentLength + const oldTokenCount = currentChunk[0].tokenCount + const newContentLength = content.length + + const chunkUpdate = { + updatedAt: new Date(), content, - getEmbeddingModelInfo(chunkEmbeddingModel).tokenizerProvider - ) + contentLength: newContentLength, + chunkHash: sha256Hex(content), + tokenCount: regenerated ? regenerated.tokenCount : oldTokenCount, + ...(regenerated ? { embedding: regenerated.embedding } : {}), + ...(updateData.enabled !== undefined ? { enabled: updateData.enabled } : {}), + } - dbUpdateData.content = content - dbUpdateData.contentLength = newContentLength - dbUpdateData.tokenCount = tokenCount.count - dbUpdateData.chunkHash = sha256Hex(content) - // Add the embedding field to the update data - dbUpdateData.embedding = embeddings[0] - } else { - // Content hasn't changed, just update other fields if needed - dbUpdateData.content = content - dbUpdateData.contentLength = newContentLength - dbUpdateData.tokenCount = oldTokenCount // Keep the same token count if content is identical - dbUpdateData.chunkHash = sha256Hex(content) - } + await tx.update(embedding).set(chunkUpdate).where(eq(embedding.id, chunkId)) - if (updateData.enabled !== undefined) { - dbUpdateData.enabled = updateData.enabled - } + const charDiff = newContentLength - oldContentLength + const tokenDiff = chunkUpdate.tokenCount - oldTokenCount - // Update the chunk - await tx.update(embedding).set(dbUpdateData).where(eq(embedding.id, chunkId)) + await tx + .update(document) + .set({ + characterCount: sql`${document.characterCount} + ${charDiff}`, + tokenCount: sql`${document.tokenCount} + ${tokenDiff}`, + }) + .where(eq(document.id, currentChunk[0].documentId)) - // Update document statistics for the character and token count changes - const charDiff = newContentLength - oldContentLength - const tokenDiff = dbUpdateData.tokenCount! - oldTokenCount + const updatedChunk = await tx + .select({ + id: embedding.id, + chunkIndex: embedding.chunkIndex, + content: embedding.content, + contentLength: embedding.contentLength, + tokenCount: embedding.tokenCount, + enabled: embedding.enabled, + startOffset: embedding.startOffset, + endOffset: embedding.endOffset, + tag1: embedding.tag1, + tag2: embedding.tag2, + tag3: embedding.tag3, + tag4: embedding.tag4, + tag5: embedding.tag5, + tag6: embedding.tag6, + tag7: embedding.tag7, + createdAt: embedding.createdAt, + updatedAt: embedding.updatedAt, + }) + .from(embedding) + .where(eq(embedding.id, chunkId)) + .limit(1) - await tx - .update(document) - .set({ - characterCount: sql`${document.characterCount} + ${charDiff}`, - tokenCount: sql`${document.tokenCount} + ${tokenDiff}`, - }) - .where(eq(document.id, currentChunk[0].documentId)) + logger.info( + `[${requestId}] Updated chunk: ${chunkId}${regenerated ? ' (regenerated embedding)' : ''}` + ) - // Fetch and return the updated chunk - const updatedChunk = await tx - .select({ - id: embedding.id, - chunkIndex: embedding.chunkIndex, - content: embedding.content, - contentLength: embedding.contentLength, - tokenCount: embedding.tokenCount, - enabled: embedding.enabled, - startOffset: embedding.startOffset, - endOffset: embedding.endOffset, - tag1: embedding.tag1, - tag2: embedding.tag2, - tag3: embedding.tag3, - tag4: embedding.tag4, - tag5: embedding.tag5, - tag6: embedding.tag6, - tag7: embedding.tag7, - createdAt: embedding.createdAt, - updatedAt: embedding.updatedAt, - }) - .from(embedding) - .where(eq(embedding.id, chunkId)) - .limit(1) + return updatedChunk[0] as ChunkData + }) - logger.info( - `[${requestId}] Updated chunk: ${chunkId}${updateData.content !== currentChunk[0].content ? ' (regenerated embedding)' : ''}` - ) + if (result) { + return result + } + } - return updatedChunk[0] as ChunkData - }) + throw new Error( + `Chunk ${chunkId} was concurrently modified ${MAX_UPDATE_ATTEMPTS} times; retry the update` + ) } // If only enabled status is being updated, no need for transaction - if (updateData.enabled !== undefined) { - dbUpdateData.enabled = updateData.enabled - } - - await db.update(embedding).set(dbUpdateData).where(eq(embedding.id, chunkId)) + await db + .update(embedding) + .set({ + updatedAt: new Date(), + ...(updateData.enabled !== undefined ? { enabled: updateData.enabled } : {}), + }) + .where(eq(embedding.id, chunkId)) // Fetch the updated chunk const updatedChunk = await db diff --git a/apps/sim/lib/knowledge/service.test.ts b/apps/sim/lib/knowledge/service.test.ts index e0c12cd6520..be1fff5ba87 100644 --- a/apps/sim/lib/knowledge/service.test.ts +++ b/apps/sim/lib/knowledge/service.test.ts @@ -95,10 +95,16 @@ describe('updateKnowledgeBase — workspace transfer authorization', () => { actorUserId: 'u-1', }) ).rejects.toThrow('Knowledge base kb-missing not found') - expect(permissionsMockFns.mockGetUserEntityPermissions).not.toHaveBeenCalled() + // The target-workspace permission is resolved before the transaction + // opens (pool safety), so the lookup runs even when the KB is missing. + expect(permissionsMockFns.mockGetUserEntityPermissions).toHaveBeenCalledWith( + 'u-1', + 'workspace', + 'ws-target' + ) }) - it('locks the knowledge base row (SELECT … FOR UPDATE) before the permission check', async () => { + it('locks the knowledge base row (SELECT … FOR UPDATE) and enforces the pre-resolved permission', async () => { dbChainMockFns.limit.mockResolvedValueOnce([{ workspaceId: 'ws-current', userId: 'u-1' }]) permissionsMockFns.mockGetUserEntityPermissions.mockResolvedValueOnce(null) diff --git a/apps/sim/lib/knowledge/service.ts b/apps/sim/lib/knowledge/service.ts index ea8a0040ca0..084e6feb020 100644 --- a/apps/sim/lib/knowledge/service.ts +++ b/apps/sim/lib/knowledge/service.ts @@ -272,6 +272,17 @@ export async function updateKnowledgeBase( ) } + // Resolved before the transaction: the target workspace comes from the + // request input, so checking it inside the FOR UPDATE tx would only issue a + // second pooled-connection checkout while the first is held. + const targetWorkspacePermission = updates.workspaceId + ? await getUserEntityPermissions( + options?.actorUserId as string, + 'workspace', + updates.workspaceId + ) + : null + try { await db.transaction(async (tx) => { const [currentKb] = await tx @@ -297,17 +308,13 @@ export async function updateKnowledgeBase( 'Only the knowledge base owner can remove it from a workspace' ) } - } else { - const targetPermission = await getUserEntityPermissions( - actorUserId, - 'workspace', - targetWorkspaceId + } else if ( + targetWorkspacePermission !== 'write' && + targetWorkspacePermission !== 'admin' + ) { + throw new KnowledgeBasePermissionError( + 'User does not have permission on the target workspace' ) - if (targetPermission !== 'write' && targetPermission !== 'admin') { - throw new KnowledgeBasePermissionError( - 'User does not have permission on the target workspace' - ) - } } } } diff --git a/apps/sim/lib/logs/execution/logger.test.ts b/apps/sim/lib/logs/execution/logger.test.ts index f016533b1ee..3acb14dcae6 100644 --- a/apps/sim/lib/logs/execution/logger.test.ts +++ b/apps/sim/lib/logs/execution/logger.test.ts @@ -50,6 +50,10 @@ vi.mock('@/lib/billing/core/usage', () => ({ vi.mock('@/lib/billing/core/usage-log', () => ({ recordUsage: vi.fn(() => Promise.resolve()), stableEventKey: vi.fn((parts: Record) => JSON.stringify(parts)), + deriveBillingContext: vi.fn((userId: string) => ({ + billingEntity: { type: 'user', id: userId }, + billingPeriod: { start: new Date('2024-01-01'), end: new Date('2024-02-01') }, + })), })) vi.mock('@/lib/billing/threshold-billing', () => ({ diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 5c46172adcb..9733a5550d6 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -1111,6 +1111,12 @@ export class ExecutionLogger implements IExecutionLoggerService { return 0 } + // Resolved before the advisory-locked transaction below: resolving inside + // it would run the subscription lookups on the global pool while the tx + // already holds a pooled connection (see recordCumulativeUsage). + const resolvedBillingContext = + billingContext ?? deriveBillingContext(userId, await getHighestPrioritySubscription(userId)) + // Build the run's *cumulative* target ledger lines from the cost summary. // The usage_log is then reconciled to these targets: at each completion // boundary (pause or terminal) we record only the increment versus what @@ -1262,7 +1268,8 @@ export class ExecutionLogger implements IExecutionLoggerService { workflowId, executionId, tx, - ...(billingContext ?? {}), + billingEntity: resolvedBillingContext.billingEntity, + billingPeriod: resolvedBillingContext.billingPeriod, }) recordedIncrement = entries.reduce((acc, e) => acc + e.cost, 0) @@ -1291,7 +1298,8 @@ export class ExecutionLogger implements IExecutionLoggerService { entries, workspaceId: workflowRecord.workspaceId ?? undefined, workflowId, - ...(billingContext ?? {}), + billingEntity: resolvedBillingContext.billingEntity, + billingPeriod: resolvedBillingContext.billingPeriod, }) recordedIncrement = entries.reduce((acc, e) => acc + e.cost, 0) } diff --git a/apps/sim/lib/mcp/workflow-mcp-sync.ts b/apps/sim/lib/mcp/workflow-mcp-sync.ts index ca0c20fb97e..7a6a25d4eb0 100644 --- a/apps/sim/lib/mcp/workflow-mcp-sync.ts +++ b/apps/sim/lib/mcp/workflow-mcp-sync.ts @@ -138,18 +138,26 @@ export async function getDeployedWorkflowInputFormat( return extractInputFormatFromBlocks(deployed.blocks as Record) ?? [] } -interface SyncOptions { +interface SyncOptionsBase { workflowId: string requestId: string - /** If provided, use this state instead of loading from DB */ - state?: { blocks?: Record } /** Context for logging (e.g., 'deploy', 'revert', 'activate') */ context?: string - tx?: DbOrTx notify?: boolean throwOnError?: boolean } +/** + * Callers running inside a transaction must preload the workflow state: + * loading it lazily would issue queries on the global pool while the + * transaction already holds a pooled connection. + */ +type SyncOptions = SyncOptionsBase & + ( + | { tx: DbOrTx; state: { blocks?: Record } } + | { tx?: undefined; state?: { blocks?: Record } } + ) + /** * Sync MCP tools for a workflow with the latest parameter schema. * - If the workflow has no start block, removes all MCP tools @@ -164,8 +172,22 @@ export async function syncMcpToolsForWorkflow( options: SyncOptions ): Promise> { if (!options.tx) { + let state = options.state + if (!state) { + try { + state = await loadDeployedWorkflowState(options.workflowId) + } catch (error) { + logger.error( + `[${options.requestId}] Error loading deployed state for MCP tool sync (${options.context ?? 'sync'}):`, + error + ) + if (options.throwOnError) throw error + return [] + } + } + const resolvedState = state const tools = await db.transaction((tx) => - syncMcpToolsForWorkflow({ ...options, tx, notify: false }) + syncMcpToolsForWorkflow({ ...options, state: resolvedState, tx, notify: false }) ) if (options.notify ?? true) notifyMcpToolServers(tools) return tools @@ -182,19 +204,14 @@ export async function syncMcpToolsForWorkflow( } = options try { - let workflowState: { blocks?: Record } | null = state ?? null - if (!workflowState) { - workflowState = await loadDeployedWorkflowState(workflowId) - } - - if (!hasValidStartBlockInState(workflowState as WorkflowState | null)) { + if (!hasValidStartBlockInState(state as WorkflowState | null)) { const affectedTools = await removeMcpToolsForWorkflow(workflowId, requestId, tx, false, true) if (notify) notifyMcpToolServers(affectedTools) return affectedTools } - const generatedParameterSchema = workflowState?.blocks - ? generateSchemaFromBlocks(workflowState.blocks) + const generatedParameterSchema = state.blocks + ? generateSchemaFromBlocks(state.blocks) : EMPTY_SCHEMA const schemaLimitError = validateMcpToolMetadataForStorage({ parameterSchema: generatedParameterSchema, diff --git a/apps/sim/lib/permission-groups/auto-add.ts b/apps/sim/lib/permission-groups/auto-add.ts index 8eae5876680..cde464a8054 100644 --- a/apps/sim/lib/permission-groups/auto-add.ts +++ b/apps/sim/lib/permission-groups/auto-add.ts @@ -3,7 +3,6 @@ import { permissionGroup, permissionGroupMember } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' import { and, eq } from 'drizzle-orm' -import { isWorkspaceOnEnterprisePlan } from '@/lib/billing' const logger = createLogger('PermissionGroupsAutoAdd') @@ -17,14 +16,20 @@ type Client = DbClient | TransactionClient * already a member of a group in that workspace. Safe to call unconditionally * on any workspace-permission grant; no-ops when access control isn't * available or no auto-add group is configured. + * + * `entitled` is the result of `isWorkspaceOnEnterprisePlan(workspaceId)`, + * resolved by the caller. Callers running inside a transaction must resolve it + * before opening the tx: the entitlement check queries billing tables on the + * global pool, which would require a second pooled connection while the + * transaction holds the first. */ export async function applyWorkspaceAutoAddGroup( client: Client, workspaceId: string, - userId: string + userId: string, + entitled: boolean ): Promise { try { - const entitled = await isWorkspaceOnEnterprisePlan(workspaceId) if (!entitled) return const [autoAddGroup] = await client diff --git a/apps/sim/lib/table/service.ts b/apps/sim/lib/table/service.ts index 429ac7bb04e..46dd031efcc 100644 --- a/apps/sim/lib/table/service.ts +++ b/apps/sim/lib/table/service.ts @@ -2278,7 +2278,8 @@ export async function upsertRow( data.tableId, data.data, schema, - existingRow?.id // exclude the matched row on updates + existingRow?.id, // exclude the matched row on updates + trx ) if (!uniqueValidation.valid) { throw new Error(`Unique constraint violation: ${uniqueValidation.errors.join(', ')}`) diff --git a/apps/sim/lib/table/validation.ts b/apps/sim/lib/table/validation.ts index c5d61e19f42..72b077ecc55 100644 --- a/apps/sim/lib/table/validation.ts +++ b/apps/sim/lib/table/validation.ts @@ -405,12 +405,17 @@ export function validateUniqueConstraints( * Checks unique constraints using targeted database queries. * Only queries for specific conflicting values instead of loading all rows. * This reduces memory usage from O(n) to O(1) where n is the number of rows. + * + * Pass a transaction as `executor` when running inside an open tx so the + * lookup runs on the transaction's connection and observes its uncommitted + * writes; otherwise the default `db` connection only observes committed state. */ export async function checkUniqueConstraintsDb( tableId: string, data: RowData, schema: TableSchema, - excludeRowId?: string + excludeRowId?: string, + executor: UniqueCheckExecutor = db ): Promise { const errors: string[] = [] const uniqueColumns = getUniqueColumns(schema) @@ -459,7 +464,7 @@ export async function checkUniqueConstraintsDb( ? and(baseCondition, sql`${userTableRows.id} != ${excludeRowId}`) : baseCondition - const conflictingRow = await db + const conflictingRow = await executor .select({ id: userTableRows.id, position: userTableRows.position }) .from(userTableRows) .where(whereClause) diff --git a/apps/sim/lib/webhooks/deploy.ts b/apps/sim/lib/webhooks/deploy.ts index f3fa642685b..8b204288b2a 100644 --- a/apps/sim/lib/webhooks/deploy.ts +++ b/apps/sim/lib/webhooks/deploy.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { generateShortId } from '@sim/utils/id' import { and, eq, inArray, isNotNull, isNull, or } from 'drizzle-orm' import type { NextRequest } from 'next/server' +import type { DbOrTx } from '@/lib/db/types' import { getProviderIdFromServiceId } from '@/lib/oauth' import { PendingWebhookVerificationTracker } from '@/lib/webhooks/pending-verification' import { @@ -37,7 +38,8 @@ interface TriggerSaveResult { } export async function validateTriggerWebhookConfigForDeploy( - blocks: Record + blocks: Record, + executor: DbOrTx = db ): Promise { const triggerBlocks = Object.values(blocks || {}).filter((b) => b && b.enabled !== false) @@ -74,7 +76,8 @@ export async function validateTriggerWebhookConfigForDeploy( const oauthProviderId = getProviderIdFromServiceId(provider) const hasCredential = await credentialSetHasProviderCredential( providerConfig.credentialSetId as string, - oauthProviderId + oauthProviderId, + executor ) if (!hasCredential) { return { @@ -93,9 +96,10 @@ export async function validateTriggerWebhookConfigForDeploy( async function credentialSetHasProviderCredential( credentialSetId: string, - providerId: string + providerId: string, + executor: DbOrTx ): Promise { - const members = await db + const members = await executor .select({ userId: credentialSetMember.userId }) .from(credentialSetMember) .where( @@ -107,7 +111,7 @@ async function credentialSetHasProviderCredential( if (members.length === 0) return false - const [credential] = await db + const [credential] = await executor .select({ id: account.id }) .from(account) .where( diff --git a/apps/sim/lib/webhooks/utils.server.ts b/apps/sim/lib/webhooks/utils.server.ts index eab8f9cccfe..8a34efc72a7 100644 --- a/apps/sim/lib/webhooks/utils.server.ts +++ b/apps/sim/lib/webhooks/utils.server.ts @@ -81,6 +81,11 @@ export interface CredentialSetWebhookSyncResult { * 3. Creates webhooks for new credentials * 4. Updates config for existing webhooks (preserving state) * 5. Deletes webhooks for credentials no longer in the set + * + * Must run OUTSIDE any open transaction: credential resolution can trigger an + * OAuth token refresh (external HTTP + its own committed writes) and webhook + * cleanup can call external provider APIs, neither of which may execute while + * a pooled connection is held. */ export async function syncWebhooksForCredentialSet(params: { workflowId: string @@ -91,7 +96,6 @@ export async function syncWebhooksForCredentialSet(params: { oauthProviderId: string providerConfig: Record requestId: string - tx?: DbOrTx deploymentVersionId?: string }): Promise { const { @@ -103,12 +107,9 @@ export async function syncWebhooksForCredentialSet(params: { oauthProviderId, providerConfig, requestId, - tx, deploymentVersionId, } = params - const dbCtx = tx ?? db - const syncLogger = createLogger('CredentialSetWebhookSync') syncLogger.info( `[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}` @@ -129,7 +130,7 @@ export async function syncWebhooksForCredentialSet(params: { `[${requestId}] Found ${credentials.length} credentials in set ${credentialSetId}` ) - const existingWebhooks = await dbCtx + const existingWebhooks = await db .select() .from(webhook) .where( @@ -201,7 +202,7 @@ export async function syncWebhooksForCredentialSet(params: { userId: cred.userId, } - await dbCtx + await db .update(webhook) .set({ ...(deploymentVersionId ? { deploymentVersionId } : {}), @@ -235,7 +236,7 @@ export async function syncWebhooksForCredentialSet(params: { userId: cred.userId, } - await dbCtx.insert(webhook).values({ + await db.insert(webhook).values({ id: webhookId, workflowId, blockId, @@ -278,7 +279,7 @@ export async function syncWebhooksForCredentialSet(params: { if (workflowRecord) { await cleanupExternalWebhook(existingWebhook, workflowRecord, requestId) } - await dbCtx.delete(webhook).where(eq(webhook.id, existingWebhook.id)) + await db.delete(webhook).where(eq(webhook.id, existingWebhook.id)) result.deleted++ syncLogger.debug( @@ -309,17 +310,16 @@ export async function syncWebhooksForCredentialSet(params: { * Called when credential set membership changes (member added/removed). * * This finds all workflows with webhooks using this credential set and resyncs them. + * Must run OUTSIDE any open transaction — see {@link syncWebhooksForCredentialSet}. */ export async function syncAllWebhooksForCredentialSet( credentialSetId: string, - requestId: string, - tx?: DbOrTx + requestId: string ): Promise<{ workflowsUpdated: number; totalCreated: number; totalDeleted: number }> { - const dbCtx = tx ?? db const syncLogger = createLogger('CredentialSetMembershipSync') syncLogger.info(`[${requestId}] Syncing all webhooks for credential set ${credentialSetId}`) - const webhooksForSet = await dbCtx + const webhooksForSet = await db .select({ webhook }) .from(webhook) .leftJoin( @@ -385,7 +385,6 @@ export async function syncAllWebhooksForCredentialSet( oauthProviderId, providerConfig: baseConfig, requestId, - tx: dbCtx, deploymentVersionId: representativeWebhook.deploymentVersionId || undefined, }) diff --git a/apps/sim/lib/workflows/orchestration/deploy.ts b/apps/sim/lib/workflows/orchestration/deploy.ts index 16ba0be08a3..b2e5b4ec45c 100644 --- a/apps/sim/lib/workflows/orchestration/deploy.ts +++ b/apps/sim/lib/workflows/orchestration/deploy.ts @@ -122,7 +122,7 @@ export async function performFullDeploy( workflowName: workflowName || workflowRecord.name || undefined, description: params.versionDescription, name: params.versionName, - validateWorkflowState: async (workflowState) => { + validateWorkflowState: async (workflowState, executor) => { const scheduleValidation = validateWorkflowSchedules(workflowState.blocks) if (!scheduleValidation.isValid) { return { @@ -131,7 +131,10 @@ export async function performFullDeploy( errorCode: 'validation', } } - const triggerValidation = await validateTriggerWebhookConfigForDeploy(workflowState.blocks) + const triggerValidation = await validateTriggerWebhookConfigForDeploy( + workflowState.blocks, + executor + ) if (!triggerValidation.success) { return { success: false, diff --git a/apps/sim/lib/workflows/persistence/duplicate.test.ts b/apps/sim/lib/workflows/persistence/duplicate.test.ts index 47af9421b75..5d7be01ed4f 100644 --- a/apps/sim/lib/workflows/persistence/duplicate.test.ts +++ b/apps/sim/lib/workflows/persistence/duplicate.test.ts @@ -1,19 +1,12 @@ /** * @vitest-environment node */ -import { - permissionsMock, - permissionsMockFns, - workflowAuthzMockFns, - workflowsUtilsMock, - workflowsUtilsMockFns, -} from '@sim/testing' +import { workflowAuthzMockFns, workflowsUtilsMock, workflowsUtilsMockFns } from '@sim/testing' import { drizzleOrmMock } from '@sim/testing/mocks' import { beforeEach, describe, expect, it, vi } from 'vitest' const mockAuthorizeWorkflowByWorkspacePermission = workflowAuthzMockFns.mockAuthorizeWorkflowByWorkspacePermission -const mockGetUserEntityPermissions = permissionsMockFns.mockGetUserEntityPermissions const { mockDb } = vi.hoisted(() => ({ mockDb: { @@ -27,8 +20,6 @@ vi.mock('drizzle-orm', () => ({ })) vi.mock('@/lib/workflows/utils', () => workflowsUtilsMock) -vi.mock('@/lib/workspaces/permissions/utils', () => permissionsMock) - vi.mock('@sim/db', () => ({ db: mockDb, })) @@ -85,11 +76,15 @@ describe('duplicateWorkflow ordering', () => { randomUUID: vi.fn().mockReturnValue('new-workflow-id'), }) - mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true }) + mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ + allowed: true, + status: 200, + workflow: { id: 'source-workflow-id', workspaceId: 'workspace-123' }, + workspacePermission: 'write', + }) workflowsUtilsMockFns.mockDeduplicateWorkflowName.mockImplementation( async (name: string) => name ) - mockGetUserEntityPermissions.mockResolvedValue('write') }) it('uses mixed-sibling top insertion sort order', async () => { diff --git a/apps/sim/lib/workflows/persistence/duplicate.ts b/apps/sim/lib/workflows/persistence/duplicate.ts index dbec61663a1..0d409629f77 100644 --- a/apps/sim/lib/workflows/persistence/duplicate.ts +++ b/apps/sim/lib/workflows/persistence/duplicate.ts @@ -13,7 +13,6 @@ import { and, eq, isNull, min } from 'drizzle-orm' import type { DbOrTx } from '@/lib/db/types' import { remapConditionBlockIds, remapConditionEdgeHandle } from '@/lib/workflows/condition-ids' import { deduplicateWorkflowName } from '@/lib/workflows/utils' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' import type { Variable } from '@/stores/variables/types' import type { LoopConfig, ParallelConfig } from '@/stores/workflows/workflow/types' import { SYSTEM_SUBBLOCK_IDS, TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants' @@ -29,6 +28,13 @@ interface DuplicateWorkflowOptions { folderId?: string | null requestId?: string newWorkflowId?: string + /** + * Run inside the caller's transaction. Callers that pass `tx` must have + * already authorized the user on the source workflow's workspace: the + * authorization helpers query through the global pool, so running them here + * would require a second pooled connection while the caller's transaction + * holds the first. + */ tx?: DbOrTx workflowIdMap?: Map } @@ -281,6 +287,41 @@ export async function duplicateWorkflow( const newWorkflowId = clientNewWorkflowId || workflowIdMap?.get(sourceWorkflowId) || generateId() const now = new Date() + // Authorization runs before the transaction opens so its global-pool + // queries never execute while a pooled connection is held. Callers that + // pass `tx` authorize the workspace themselves (see DuplicateWorkflowOptions). + if (!providedTx) { + const sourceAuthorization = await authorizeWorkflowByWorkspacePermission({ + workflowId: sourceWorkflowId, + userId, + action: 'read', + }) + if (!sourceAuthorization.allowed || !sourceAuthorization.workflow) { + throw new Error('Source workflow not found or access denied') + } + + const sourceWorkspaceId = sourceAuthorization.workflow.workspaceId + if (!sourceWorkspaceId) { + throw new Error( + 'This workflow is not attached to a workspace. Personal workflows are deprecated and cannot be duplicated.' + ) + } + + const targetWorkspaceId = workspaceId || sourceWorkspaceId + if (targetWorkspaceId !== sourceWorkspaceId) { + throw new Error('Cross-workspace workflow duplication is not supported') + } + + // The target workspace equals the source workspace, so the permission + // resolved by the authorization above is the target permission. + if ( + sourceAuthorization.workspacePermission !== 'admin' && + sourceAuthorization.workspacePermission !== 'write' + ) { + throw new Error('Write or admin access required for target workspace') + } + } + const duplicateWithinTransaction = async (tx: DbOrTx) => { // First verify the source workflow exists const sourceWorkflowRow = await tx @@ -300,28 +341,11 @@ export async function duplicateWorkflow( ) } - const sourceAuthorization = await authorizeWorkflowByWorkspacePermission({ - workflowId: sourceWorkflowId, - userId, - action: 'read', - }) - if (!sourceAuthorization.allowed) { - throw new Error('Source workflow not found or access denied') - } - const targetWorkspaceId = workspaceId || source.workspaceId if (targetWorkspaceId !== source.workspaceId) { throw new Error('Cross-workspace workflow duplication is not supported') } - const targetWorkspacePermission = await getUserEntityPermissions( - userId, - 'workspace', - targetWorkspaceId - ) - if (targetWorkspacePermission !== 'admin' && targetWorkspacePermission !== 'write') { - throw new Error('Write or admin access required for target workspace') - } const targetFolderId = folderId !== undefined ? folderId : source.folderId await assertTargetFolderMutable(tx, targetFolderId, targetWorkspaceId) @@ -354,7 +378,12 @@ export async function duplicateWorkflow( // Mapping from old variable IDs to new variable IDs (populated during variable duplication) const varIdMapping = new Map() - const deduplicatedName = await deduplicateWorkflowName(name, targetWorkspaceId, targetFolderId) + const deduplicatedName = await deduplicateWorkflowName( + name, + targetWorkspaceId, + targetFolderId, + tx + ) await tx.insert(workflow).values({ id: newWorkflowId, diff --git a/apps/sim/lib/workflows/persistence/utils.test.ts b/apps/sim/lib/workflows/persistence/utils.test.ts index 1059c61caf7..afcc4081fad 100644 --- a/apps/sim/lib/workflows/persistence/utils.test.ts +++ b/apps/sim/lib/workflows/persistence/utils.test.ts @@ -95,6 +95,7 @@ const { mockDb, mockWorkflowBlocks, mockWorkflowEdges, mockWorkflowSubflows } = vi.mock('@sim/db', () => ({ db: mockDb, + runOutsideTransactionContext: (fn: () => T): T => fn(), workflowBlocks: mockWorkflowBlocks, workflowEdges: mockWorkflowEdges, workflowSubflows: mockWorkflowSubflows, diff --git a/apps/sim/lib/workflows/persistence/utils.ts b/apps/sim/lib/workflows/persistence/utils.ts index 1974ef12991..25b77d380d1 100644 --- a/apps/sim/lib/workflows/persistence/utils.ts +++ b/apps/sim/lib/workflows/persistence/utils.ts @@ -1,4 +1,4 @@ -import { db, workflow, workflowDeploymentVersion } from '@sim/db' +import { db, runOutsideTransactionContext, workflow, workflowDeploymentVersion } from '@sim/db' import { credential } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' @@ -159,6 +159,7 @@ export async function loadDeployedWorkflowState( interface MigrationContext { blocks: Record workspaceId: string + executor: DbOrTx migrated: boolean } @@ -167,9 +168,10 @@ type BlockMigration = (ctx: MigrationContext) => MigrationContext | Promise, - workspaceId: string + workspaceId: string, + executor: DbOrTx = db ): Promise<{ blocks: Record; migrated: boolean }> => { - let ctx: MigrationContext = { blocks, workspaceId, migrated: false } + let ctx: MigrationContext = { blocks, workspaceId, executor, migrated: false } for (const migration of migrations) { ctx = await migration(ctx) } @@ -194,7 +196,11 @@ const applyBlockMigrations = createMigrationPipeline([ }, async (ctx) => { - const { blocks, migrated } = await migrateCredentialIds(ctx.blocks, ctx.workspaceId) + const { blocks, migrated } = await migrateCredentialIds( + ctx.blocks, + ctx.workspaceId, + ctx.executor + ) return { ...ctx, blocks, migrated: ctx.migrated || migrated } }, @@ -273,7 +279,8 @@ export const CREDENTIAL_SUBBLOCK_IDS = new Set([ async function migrateCredentialIds( blocks: Record, - workspaceId: string + workspaceId: string, + executor: DbOrTx ): Promise<{ blocks: Record; migrated: boolean }> { const potentialLegacyIds = new Set() @@ -305,7 +312,7 @@ async function migrateCredentialIds( return { blocks, migrated: false } } - const rows = await db + const rows = await executor .select({ id: credential.id, accountId: credential.accountId }) .from(credential) .where( @@ -379,12 +386,21 @@ export async function loadWorkflowFromNormalizedTables( const raw = await loadWorkflowFromNormalizedTablesRaw(workflowId, externalTx) if (!raw) return null - const { blocks: finalBlocks, migrated } = await applyBlockMigrations(raw.blocks, raw.workspaceId) + const { blocks: finalBlocks, migrated } = await applyBlockMigrations( + raw.blocks, + raw.workspaceId, + externalTx ?? db + ) if (migrated) { - Promise.resolve().then(() => - persistMigratedBlocks(workflowId, raw.blocks, finalBlocks, raw.blockUpdatedAtById) - ) + // Deliberate fire-and-forget persistence on the global pool: it must not + // join (or block) a read transaction this load may be running inside, so + // it escapes the transaction context instead of tripping the wire. + runOutsideTransactionContext(() => { + Promise.resolve().then(() => + persistMigratedBlocks(workflowId, raw.blocks, finalBlocks, raw.blockUpdatedAtById) + ) + }) } const patchedLoops: Record = { ...raw.loops } @@ -556,7 +572,8 @@ export async function deployWorkflow(params: { name?: string | null workflowState?: WorkflowState validateWorkflowState?: ( - workflowState: WorkflowState + workflowState: WorkflowState, + executor: DbOrTx ) => DeployWorkflowValidationResult | Promise onDeployTransaction?: ( tx: DbOrTx, @@ -596,7 +613,7 @@ export async function deployWorkflow(params: { } } - const validationError = await params.validateWorkflowState?.(currentState) + const validationError = await params.validateWorkflowState?.(currentState, tx) if (validationError && !validationError.success) { return { success: false as const, diff --git a/apps/sim/lib/workflows/utils.ts b/apps/sim/lib/workflows/utils.ts index 4f653da8af0..219c194e080 100644 --- a/apps/sim/lib/workflows/utils.ts +++ b/apps/sim/lib/workflows/utils.ts @@ -53,17 +53,21 @@ export async function listWorkflows(workspaceId: string, options?: { scope?: Wor /** * Generates a unique workflow name within a workspace+folder scope. * If the name already exists among active workflows, appends (2), (3), etc. + * + * Pass a transaction as `executor` when running inside an open tx so the + * lookup observes workflows inserted earlier in the same transaction. */ export async function deduplicateWorkflowName( name: string, workspaceId: string, - folderId: string | null | undefined + folderId: string | null | undefined, + executor: Pick = db ): Promise { const folderCondition = folderId ? eq(workflowTable.folderId, folderId) : isNull(workflowTable.folderId) - const [existing] = await db + const [existing] = await executor .select({ id: workflowTable.id }) .from(workflowTable) .where( @@ -82,7 +86,7 @@ export async function deduplicateWorkflowName( for (let i = 2; i < 100; i++) { const candidate = `${name} (${i})` - const [dup] = await db + const [dup] = await executor .select({ id: workflowTable.id }) .from(workflowTable) .where( diff --git a/bun.lock b/bun.lock index 5848be606fe..2160341cb2e 100644 --- a/bun.lock +++ b/bun.lock @@ -350,6 +350,7 @@ "name": "@sim/db", "version": "0.1.0", "dependencies": { + "@sim/logger": "workspace:*", "@sim/utils": "workspace:*", "drizzle-orm": "^0.45.2", "postgres": "^3.4.5", diff --git a/packages/db/db.ts b/packages/db/db.ts index 397e11a894c..dae0815ed15 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -1,6 +1,7 @@ import { drizzle } from 'drizzle-orm/postgres-js' import postgres from 'postgres' import * as schema from './schema' +import { instrumentPoolClient } from './tx-tripwire' const connectionString = process.env.DATABASE_URL! if (!connectionString) { @@ -14,7 +15,10 @@ const poolOptions = { onnotice: () => {}, } -const postgresClient = postgres(connectionString, { ...poolOptions, max: 15 }) +const postgresClient = instrumentPoolClient( + postgres(connectionString, { ...poolOptions, max: 15 }), + 'db' +) export const db = drizzle(postgresClient, { schema }) @@ -32,5 +36,7 @@ if (replicaUrl && !/^postgres(ql)?:\/\//.test(replicaUrl)) { } export const dbReplica: typeof db = replicaUrl - ? drizzle(postgres(replicaUrl, { ...poolOptions, max: 10 }), { schema }) + ? drizzle(instrumentPoolClient(postgres(replicaUrl, { ...poolOptions, max: 10 }), 'dbReplica'), { + schema, + }) : db diff --git a/packages/db/index.ts b/packages/db/index.ts index 8278709aa2d..a28cf889527 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -1,3 +1,4 @@ export * from './db' export * from './schema' export * from './triggers' +export { instrumentPoolClient, runOutsideTransactionContext } from './tx-tripwire' diff --git a/packages/db/package.json b/packages/db/package.json index c4e794ce07c..9b4a5c07b79 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -30,6 +30,7 @@ "format:check": "biome format ." }, "dependencies": { + "@sim/logger": "workspace:*", "@sim/utils": "workspace:*", "drizzle-orm": "^0.45.2", "postgres": "^3.4.5", diff --git a/packages/db/tx-tripwire.test.ts b/packages/db/tx-tripwire.test.ts new file mode 100644 index 00000000000..52e1d466071 --- /dev/null +++ b/packages/db/tx-tripwire.test.ts @@ -0,0 +1,186 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' + +const { mockError } = vi.hoisted(() => ({ mockError: vi.fn() })) + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + error: mockError, + warn: vi.fn(), + info: vi.fn(), + debug: vi.fn(), + }), +})) + +import { + instrumentPoolClient, + isInsideDbTransaction, + runOutsideTransactionContext, +} from './tx-tripwire' + +interface FakeReserved { + unsafe: (query: string) => Promise +} + +function createFakeClient() { + const rootQueries: string[] = [] + const reservedQueries: string[] = [] + + const client = instrumentPoolClient( + { + unsafe(query: string) { + rootQueries.push(query) + return Promise.resolve([]) + }, + // Mirrors postgres-js: begin issues its internal BEGIN through the root + // client's unsafe (the instrumented one) before running the callback on + // a reserved connection. + begin(...args: unknown[]) { + const callback = args[args.length - 1] as (reserved: FakeReserved) => unknown + const reserved: FakeReserved = { + unsafe: (query: string) => { + reservedQueries.push(query) + return Promise.resolve([]) + }, + } + return Promise.resolve(this.unsafe('begin')).then(() => callback(reserved)) + }, + }, + 'test-pool' + ) + + return { client, rootQueries, reservedQueries } +} + +afterEach(() => { + vi.unstubAllEnvs() + mockError.mockClear() +}) + +describe('tx tripwire', () => { + it('marks the transaction context for the duration of a begin callback', async () => { + const { client } = createFakeClient() + + expect(isInsideDbTransaction()).toBe(false) + await client.begin(async () => { + expect(isInsideDbTransaction()).toBe(true) + await Promise.resolve() + expect(isInsideDbTransaction()).toBe(true) + }) + expect(isInsideDbTransaction()).toBe(false) + }) + + it('throws when the root client is queried inside a transaction callback, at any await depth', async () => { + const { client } = createFakeClient() + const deeplyNestedHelper = async () => { + await Promise.resolve() + return client.unsafe('select 1 as nested_checkout') + } + + await expect( + client.begin(async () => { + await deeplyNestedHelper() + }) + ).rejects.toThrow(/inside a transaction callback/) + }) + + it('allows queries on the reserved connection inside the callback', async () => { + const { client, reservedQueries } = createFakeClient() + + await client.begin(async (reserved: FakeReserved) => { + await reserved.unsafe('select 1 as tx_handle_query') + }) + + expect(reservedQueries).toEqual(['select 1 as tx_handle_query']) + }) + + it('allows root-client queries outside any transaction', async () => { + const { client, rootQueries } = createFakeClient() + + await client.unsafe('select 1 as plain_query') + + expect(rootQueries).toEqual(['select 1 as plain_query']) + }) + + it('throws when a nested transaction is opened on the root client', async () => { + const { client } = createFakeClient() + + await expect( + client.begin(async () => { + await client.begin(async () => {}) + }) + ).rejects.toThrow(/nested transaction/i) + }) + + it('runOutsideTransactionContext escapes lazy thenables awaited by the caller', async () => { + const { client, rootQueries } = createFakeClient() + + await client.begin(async () => { + // Mirrors a drizzle query builder: no work until .then is invoked. The + // helper must assimilate it inside the exited context so the caller's + // await (inside the tx context) does not trip the wire. + const lazyQuery = { + then(resolve: (value: T) => void) { + resolve(client.unsafe('select 1 as lazy_escaped') as T) + }, + } + await runOutsideTransactionContext(() => lazyQuery) + }) + + expect(rootQueries).toEqual(['begin', 'select 1 as lazy_escaped']) + }) + + it('runOutsideTransactionContext escapes the context, including scheduled promises', async () => { + const { client, rootQueries } = createFakeClient() + + await client.begin(async () => { + await runOutsideTransactionContext(() => { + expect(isInsideDbTransaction()).toBe(false) + return Promise.resolve().then(() => client.unsafe('select 1 as escaped_query')) + }) + expect(isInsideDbTransaction()).toBe(true) + }) + + expect(rootQueries).toEqual(['begin', 'select 1 as escaped_query']) + }) + + it('DB_TX_TRIPWIRE=off disables detection', async () => { + vi.stubEnv('DB_TX_TRIPWIRE', 'off') + const { client, rootQueries } = createFakeClient() + + await client.begin(async () => { + await client.unsafe('select 1 as off_mode_query') + }) + + expect(rootQueries).toEqual(['begin', 'select 1 as off_mode_query']) + expect(mockError).not.toHaveBeenCalled() + }) + + it('DB_TX_TRIPWIRE=warn logs instead of throwing and dedupes repeats', async () => { + vi.stubEnv('DB_TX_TRIPWIRE', 'warn') + const { client, rootQueries } = createFakeClient() + + await client.begin(async () => { + await client.unsafe('select 1 as warn_mode_query') + await client.unsafe('select 1 as warn_mode_query') + }) + + expect(rootQueries).toEqual([ + 'begin', + 'select 1 as warn_mode_query', + 'select 1 as warn_mode_query', + ]) + expect(mockError).toHaveBeenCalledTimes(1) + expect(mockError.mock.calls[0][1]).toMatchObject({ poolName: 'test-pool' }) + }) + + it('DB_TX_TRIPWIRE=warn reports a nested transaction exactly once', async () => { + vi.stubEnv('DB_TX_TRIPWIRE', 'warn') + const { client } = createFakeClient() + + await client.begin(async () => { + await client.begin(async () => {}) + }) + + expect(mockError).toHaveBeenCalledTimes(1) + }) +}) diff --git a/packages/db/tx-tripwire.ts b/packages/db/tx-tripwire.ts new file mode 100644 index 00000000000..88e768279e5 --- /dev/null +++ b/packages/db/tx-tripwire.ts @@ -0,0 +1,167 @@ +import { AsyncLocalStorage } from 'node:async_hooks' +import { createLogger } from '@sim/logger' + +const logger = createLogger('DbTxTripwire') + +/** + * Ambient marker set for the duration of a transaction callback on an + * instrumented pool. Everything awaited or scheduled inside the callback + * inherits it, at any call depth. + */ +const transactionContext = new AsyncLocalStorage() + +type TripwireMode = 'off' | 'warn' | 'throw' + +/** + * Tripwire mode resolution: + * - `DB_TX_TRIPWIRE=off|warn|throw` overrides everything + * - otherwise `throw` outside production (bugs fail loudly in dev/CI) and + * `warn` in production (rate-limited error log, never breaks traffic) + */ +function resolveMode(): TripwireMode { + const override = process.env.DB_TX_TRIPWIRE + if (override === 'off' || override === 'warn' || override === 'throw') return override + return process.env.NODE_ENV === 'production' ? 'warn' : 'throw' +} + +/** + * True while the current async context is inside a transaction callback on an + * instrumented pool (any `db.transaction(...)` / `sql.begin(...)`). + */ +export function isInsideDbTransaction(): boolean { + return transactionContext.getStore() === true +} + +function isThenable(value: unknown): value is PromiseLike { + return ( + typeof value === 'object' && + value !== null && + typeof (value as PromiseLike).then === 'function' + ) +} + +/** + * Escape hatch: run `fn` outside the ambient transaction context. + * + * For DELIBERATE global-pool work initiated inside a transaction callback — + * e.g. fire-and-forget writes that must not join (or block) the transaction. + * Promises and timers scheduled inside `fn` also escape the context. + * + * Lazy thenables (drizzle query builders) only begin executing when their + * `.then` is first invoked, so a returned query is assimilated here, inside + * the exited context — otherwise the caller's `await` would start the query + * back inside the transaction context and trip the wire. + */ +export function runOutsideTransactionContext(fn: () => T): T { + return transactionContext.exit(() => { + const result = fn() + if (isThenable(result)) { + return Promise.resolve().then(() => result) as T + } + return result + }) +} + +const WARN_DEDUPE_WINDOW_MS = 5 * 60 * 1000 +const WARN_DEDUPE_MAX_KEYS = 256 +const recentWarnings = new Map() + +function report(poolName: string, query: string): void { + const mode = resolveMode() + if (mode === 'off') return + + const queryPreview = query.replace(/\s+/g, ' ').slice(0, 160) + const message = + `Query on the global "${poolName}" pool issued inside a transaction callback. ` + + 'The transaction already holds a pooled connection, so this checks out a second one ' + + 'and can deadlock the pool at saturation. Pass the tx handle to the query, hoist the ' + + 'work above the transaction, or wrap deliberate fire-and-forget global-pool work in ' + + 'runOutsideTransactionContext().' + + if (mode === 'throw') { + throw new Error(`${message} Query: ${queryPreview}`) + } + + const now = Date.now() + const lastLogged = recentWarnings.get(queryPreview) + if (lastLogged !== undefined && now - lastLogged < WARN_DEDUPE_WINDOW_MS) return + if (recentWarnings.size >= WARN_DEDUPE_MAX_KEYS) { + for (const [key, loggedAt] of recentWarnings) { + if (now - loggedAt >= WARN_DEDUPE_WINDOW_MS) recentWarnings.delete(key) + } + if (recentWarnings.size >= WARN_DEDUPE_MAX_KEYS) recentWarnings.clear() + } + recentWarnings.set(queryPreview, now) + logger.error(message, { poolName, queryPreview, stack: new Error().stack }) +} + +/** + * Minimal structural surface of a postgres-js root client needed by the + * instrumentation. Method syntax keeps the real `Sql` client assignable and + * lets tests pass lightweight fakes. + */ +export interface InstrumentablePoolClient { + unsafe(query: string, ...rest: never[]): unknown + begin(...args: never[]): unknown +} + +interface CallableClient { + unsafe(...args: unknown[]): unknown + begin(...args: unknown[]): unknown +} + +/** + * Instrument a postgres-js ROOT client so nested pool checkouts inside + * transactions are detected at runtime: + * + * - `begin(...)` (which `db.transaction(...)` delegates to) runs its callback + * inside the ambient transaction context. + * - `unsafe(...)` — the funnel for every drizzle query issued on the root + * client — reports when called while that context is set: the caller is + * inside a transaction but querying the global pool instead of the tx + * handle. + * - `begin(...)` itself reports when called inside the context: a nested + * `db.transaction(...)` on the global client (savepoints via + * `tx.transaction` use the reserved connection and are unaffected). + * + * Queries on the reserved connection that `begin` hands to its callback go + * through that connection's own methods, not the root client's, so + * transaction-handle queries are exempt by construction. + */ +export function instrumentPoolClient( + client: T, + poolName: string +): T { + // double-cast-allowed: widens the postgres client's generic method signatures to plain callables so the wrappers can forward arbitrary arguments unchanged + const target = client as unknown as CallableClient + const rawUnsafe = target.unsafe.bind(target) + const rawBegin = target.begin.bind(target) + + target.unsafe = (...args: unknown[]) => { + if (transactionContext.getStore()) { + report(poolName, typeof args[0] === 'string' ? args[0] : '(non-string query)') + } + return rawUnsafe(...args) + } + + target.begin = (...args: unknown[]) => { + if (transactionContext.getStore()) { + report(poolName, 'BEGIN (nested transaction opened on the global client)') + } + const callbackIndex = args.length - 1 + const callback = args[callbackIndex] + if (typeof callback === 'function') { + args[callbackIndex] = (...callbackArgs: unknown[]) => + transactionContext.run(true, () => + (callback as (...inner: unknown[]) => unknown)(...callbackArgs) + ) + } + // postgres-js issues its internal BEGIN/COMMIT statements through the + // root client's `unsafe`; running `begin` outside the ambient context + // keeps those from re-reporting a nested transaction the wrapper above + // already reported. The callback wrapper re-enters the context itself. + return transactionContext.exit(() => rawBegin(...args)) + } + + return client +} diff --git a/packages/db/vitest.config.ts b/packages/db/vitest.config.ts index ce441ea3306..997ab96aefe 100644 --- a/packages/db/vitest.config.ts +++ b/packages/db/vitest.config.ts @@ -4,6 +4,6 @@ export default defineConfig({ test: { globals: false, environment: 'node', - include: ['scripts/**/*.test.ts'], + include: ['scripts/**/*.test.ts', '*.test.ts'], }, }) diff --git a/packages/testing/src/mocks/database.mock.ts b/packages/testing/src/mocks/database.mock.ts index 7c7b9d6b489..bb34193d505 100644 --- a/packages/testing/src/mocks/database.mock.ts +++ b/packages/testing/src/mocks/database.mock.ts @@ -250,6 +250,8 @@ export const dbChainMock = { db: dbChainInstance, /** Same instance as `db` so per-test chain overrides cover both clients. */ dbReplica: dbChainInstance, + runOutsideTransactionContext: (fn: () => T): T => fn(), + instrumentPoolClient: (client: T): T => client, } /** @@ -320,6 +322,8 @@ export const databaseMock = { /** Same instance as `db` so per-test overrides cover both clients. */ dbReplica: mockDbInstance, sql: createMockSql(), + runOutsideTransactionContext: (fn: () => T): T => fn(), + instrumentPoolClient: (client: T): T => client, ...createMockSqlOperators(), } From a1ddd02dc68e731238ca83b1db33299f98f114bc Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 11 Jun 2026 17:10:56 -0700 Subject: [PATCH 2/2] update docs --- apps/docs/content/docs/en/platform/costs.mdx | 4 ++-- apps/docs/content/docs/en/platform/permissions.mdx | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/docs/content/docs/en/platform/costs.mdx b/apps/docs/content/docs/en/platform/costs.mdx index 8a360e5a2e1..934cc91fd8f 100644 --- a/apps/docs/content/docs/en/platform/costs.mdx +++ b/apps/docs/content/docs/en/platform/costs.mdx @@ -317,9 +317,9 @@ By default, your usage is capped at the credits included in your plan. To allow | **Free** | 1 | — | | **Pro** | Up to 3 | — | | **Max** | Up to 10 | — | -| **Team / Enterprise** | Unlimited | Unlimited | +| **Team / Enterprise** | — | Unlimited (Owners and Admins) | -Team and Enterprise plans unlock shared workspaces that belong to your organization. Internal members invited to a shared workspace join the organization and count toward your seat total. Existing Sim users who already belong to another organization can be added as external workspace members; they get workspace access without joining your organization or using one of your seats. When a Team or Enterprise subscription is cancelled or downgraded, existing shared workspaces remain accessible to current members but new invites are disabled until the organization is upgraded again. +Team and Enterprise plans unlock shared workspaces that belong to your organization. Every workspace created under a Team or Enterprise plan is organization-owned: Owners and Admins can create unlimited shared workspaces, while organization Members cannot create workspaces (personal workspaces created before joining the organization remain accessible). Internal members invited to a shared workspace join the organization and count toward your seat total — Enterprise invites require an available seat at invite time, while Team plans add a seat automatically when the invitee accepts. Existing Sim users who already belong to another organization can be added as external workspace members; they get workspace access without joining your organization or using one of your seats. When a Team or Enterprise subscription is cancelled or downgraded, existing shared workspaces remain accessible to current members but new invites are disabled until the organization is upgraded again. ### Rate Limits diff --git a/apps/docs/content/docs/en/platform/permissions.mdx b/apps/docs/content/docs/en/platform/permissions.mdx index 22024e2eb73..df92c38bef4 100644 --- a/apps/docs/content/docs/en/platform/permissions.mdx +++ b/apps/docs/content/docs/en/platform/permissions.mdx @@ -23,7 +23,9 @@ Sim has two kinds of workspaces: | **Free** | 1 | — | | **Pro** | Up to 3 | — | | **Max** | Up to 10 | — | -| **Team / Enterprise** | Unlimited | Unlimited (seat-gated invites) | +| **Team / Enterprise** | — | Unlimited (Owners and Admins) | + +On Team and Enterprise plans, every workspace you create belongs to the organization. Organization Owners and Admins can create unlimited shared workspaces; organization Members cannot create workspaces. Personal workspaces created before joining the organization remain accessible. Enterprise invites require an available seat at invite time; on Team plans, a seat is added automatically when the invitee accepts. When a Team or Enterprise subscription is cancelled or downgraded, existing shared workspaces stay accessible to current members. New invitations are blocked until the organization is upgraded again.