From 328963ece0631cfc66bc6e29de92605fc0706b73 Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 11 Jun 2026 10:26:53 -0700 Subject: [PATCH 1/5] improvement(db): route additional staleness-tolerant reads to the read replica --- .../[id]/metrics/executions/route.ts | 8 ++++---- apps/sim/lib/copilot/chat/process-contents.ts | 8 ++++---- apps/sim/lib/copilot/chat/workspace-context.ts | 18 +++++++++--------- apps/sim/lib/knowledge/chunks/service.ts | 6 +++--- apps/sim/lib/knowledge/tags/service.ts | 12 ++++++------ apps/sim/lib/workspace-events/no-activity.ts | 8 ++++---- apps/sim/lib/workspace-events/rules.ts | 10 +++++----- 7 files changed, 35 insertions(+), 35 deletions(-) diff --git a/apps/sim/app/api/workspaces/[id]/metrics/executions/route.ts b/apps/sim/app/api/workspaces/[id]/metrics/executions/route.ts index 9562343261c..2bf216a930f 100644 --- a/apps/sim/app/api/workspaces/[id]/metrics/executions/route.ts +++ b/apps/sim/app/api/workspaces/[id]/metrics/executions/route.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { pausedExecutions, permissions, workflow, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, gte, inArray, isNotNull, isNull, lte, or, type SQL, sql } from 'drizzle-orm' @@ -60,7 +60,7 @@ export const GET = withRouteHandler( wfWhere.push(inArray(workflow.id, wfList)) } - const workflows = await db + const workflows = await dbReplica .select({ id: workflow.id, name: workflow.name }) .from(workflow) .where(and(...wfWhere)) @@ -124,7 +124,7 @@ export const GET = withRouteHandler( } if (isAllTime) { - const boundsQuery = db + const boundsQuery = dbReplica .select({ minDate: sql`MIN(${workflowExecutionLogs.startedAt})`, maxDate: sql`MAX(${workflowExecutionLogs.startedAt})`, @@ -168,7 +168,7 @@ export const GET = withRouteHandler( lte(workflowExecutionLogs.startedAt, end), ] - const logs = await db + const logs = await dbReplica .select({ workflowId: workflowExecutionLogs.workflowId, level: workflowExecutionLogs.level, diff --git a/apps/sim/lib/copilot/chat/process-contents.ts b/apps/sim/lib/copilot/chat/process-contents.ts index 076d77f5707..9bc4a101c50 100644 --- a/apps/sim/lib/copilot/chat/process-contents.ts +++ b/apps/sim/lib/copilot/chat/process-contents.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { knowledgeBase } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { @@ -454,7 +454,7 @@ async function processKnowledgeFromDb( if (currentWorkspaceId) { conditions.push(eq(knowledgeBase.workspaceId, currentWorkspaceId)) } - const kbRows = await db + const kbRows = await dbReplica .select({ id: knowledgeBase.id, name: knowledgeBase.name, @@ -562,8 +562,8 @@ async function processExecutionLogFromDb( ): Promise { try { const { workflowExecutionLogs, workflow } = await import('@sim/db/schema') - const { db } = await import('@sim/db') - const rows = await db + const { dbReplica } = await import('@sim/db') + const rows = await dbReplica .select({ id: workflowExecutionLogs.id, workflowId: workflowExecutionLogs.workflowId, diff --git a/apps/sim/lib/copilot/chat/workspace-context.ts b/apps/sim/lib/copilot/chat/workspace-context.ts index 09c9a02c51d..d1a4b4ded82 100644 --- a/apps/sim/lib/copilot/chat/workspace-context.ts +++ b/apps/sim/lib/copilot/chat/workspace-context.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { knowledgeBase, knowledgeConnector, @@ -302,7 +302,7 @@ export async function generateWorkspaceContext( ] = await Promise.all([ getUsersWithPermissions(workspaceId), - db + dbReplica .select({ id: workflow.id, name: workflow.name, @@ -314,7 +314,7 @@ export async function generateWorkspaceContext( .from(workflow) .where(and(eq(workflow.workspaceId, workspaceId), isNull(workflow.archivedAt))), - db + dbReplica .select({ id: workflowFolder.id, name: workflowFolder.name, @@ -323,7 +323,7 @@ export async function generateWorkspaceContext( .from(workflowFolder) .where(and(eq(workflowFolder.workspaceId, workspaceId), isNull(workflowFolder.archivedAt))), - db + dbReplica .select({ id: knowledgeBase.id, name: knowledgeBase.name, @@ -332,7 +332,7 @@ export async function generateWorkspaceContext( .from(knowledgeBase) .where(and(eq(knowledgeBase.workspaceId, workspaceId), isNull(knowledgeBase.deletedAt))), - db + dbReplica .select({ id: userTableDefinitions.id, name: userTableDefinitions.name, @@ -352,7 +352,7 @@ export async function generateWorkspaceContext( listCustomTools({ userId, workspaceId }), - db + dbReplica .select({ id: mcpServers.id, name: mcpServers.name, @@ -364,7 +364,7 @@ export async function generateWorkspaceContext( listSkills({ workspaceId, includeBuiltins: false }), - db + dbReplica .select({ id: workflowSchedule.id, jobTitle: workflowSchedule.jobTitle, @@ -388,7 +388,7 @@ export async function generateWorkspaceContext( tables.length > 0 ? await Promise.all( tables.map(async (t) => { - const [row] = await db + const [row] = await dbReplica .select({ count: count() }) .from(userTableRows) .where(eq(userTableRows.tableId, t.id)) @@ -400,7 +400,7 @@ export async function generateWorkspaceContext( const kbIds = kbs.map((kb) => kb.id) const connectorRows = kbIds.length > 0 - ? await db + ? await dbReplica .select({ knowledgeBaseId: knowledgeConnector.knowledgeBaseId, connectorType: knowledgeConnector.connectorType, diff --git a/apps/sim/lib/knowledge/chunks/service.ts b/apps/sim/lib/knowledge/chunks/service.ts index ab5c307397b..e2e5d03cdb7 100644 --- a/apps/sim/lib/knowledge/chunks/service.ts +++ b/apps/sim/lib/knowledge/chunks/service.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { document, embedding, knowledgeBase } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { sha256Hex } from '@sim/security/hash' @@ -46,7 +46,7 @@ export async function queryChunks( conditions.push(ilike(embedding.content, `%${search}%`)) } - const chunks = await db + const chunks = await dbReplica .select({ id: embedding.id, chunkIndex: embedding.chunkIndex, @@ -82,7 +82,7 @@ export async function queryChunks( .limit(limit) .offset(offset) - const totalCount = await db + const totalCount = await dbReplica .select({ count: sql`count(*)` }) .from(embedding) .where(and(...conditions)) diff --git a/apps/sim/lib/knowledge/tags/service.ts b/apps/sim/lib/knowledge/tags/service.ts index 86b5b9e3d02..08687daed9d 100644 --- a/apps/sim/lib/knowledge/tags/service.ts +++ b/apps/sim/lib/knowledge/tags/service.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { document, embedding, knowledgeBaseTagDefinitions } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' @@ -99,7 +99,7 @@ export async function getNextAvailableSlot( export async function getDocumentTagDefinitions( knowledgeBaseId: string ): Promise { - const definitions = await db + const definitions = await dbReplica .select({ id: knowledgeBaseTagDefinitions.id, knowledgeBaseId: knowledgeBaseTagDefinitions.knowledgeBaseId, @@ -123,7 +123,7 @@ export async function getDocumentTagDefinitions( * Get all tag definitions for a knowledge base (alias for compatibility) */ export async function getTagDefinitions(knowledgeBaseId: string): Promise { - const tagDefinitions = await db + const tagDefinitions = await dbReplica .select({ id: knowledgeBaseTagDefinitions.id, tagSlot: knowledgeBaseTagDefinitions.tagSlot, @@ -655,7 +655,7 @@ export async function getTagUsage( whereConditions.push(sql`${sql.raw(tagSlot)} != ''`) } - const documentsWithTag = await db + const documentsWithTag = await dbReplica .select({ id: document.id, filename: document.filename, @@ -703,7 +703,7 @@ export async function getTagUsageStats( const tagSlot = def.tagSlot validateTagSlot(tagSlot) - const docCountResult = await db + const docCountResult = await dbReplica .select({ count: sql`count(*)` }) .from(document) .where( @@ -716,7 +716,7 @@ export async function getTagUsageStats( ) ) - const chunkCountResult = await db + const chunkCountResult = await dbReplica .select({ count: sql`count(*)` }) .from(embedding) .innerJoin(document, eq(embedding.documentId, document.id)) diff --git a/apps/sim/lib/workspace-events/no-activity.ts b/apps/sim/lib/workspace-events/no-activity.ts index de69868997a..840daedc385 100644 --- a/apps/sim/lib/workspace-events/no-activity.ts +++ b/apps/sim/lib/workspace-events/no-activity.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { webhook, workflow, workflowDeploymentVersion, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, asc, eq, gt, gte, inArray, isNull, ne, or, sql } from 'drizzle-orm' @@ -42,7 +42,7 @@ export interface NoActivityPollResult { async function fetchNoActivitySubscriptionPage( afterWebhookId: string | null ): Promise { - const rows = await db + const rows = await dbReplica .select({ webhook, workflow }) .from(webhook) .innerJoin(workflow, eq(webhook.workflowId, workflow.id)) @@ -105,7 +105,7 @@ async function fetchWatchedWorkflowPage( conditions.push(gt(workflow.id, afterWorkflowId)) } - return db + return dbReplica .select({ id: workflow.id, name: workflow.name }) .from(workflow) .where(and(...conditions)) @@ -120,7 +120,7 @@ async function hasRecentActivity( ): Promise { const windowStart = new Date(Date.now() - config.inactivityHours * 60 * 60 * 1000) - const recentLogs = await db + const recentLogs = await dbReplica .select({ id: workflowExecutionLogs.id }) .from(workflowExecutionLogs) .where( diff --git a/apps/sim/lib/workspace-events/rules.ts b/apps/sim/lib/workspace-events/rules.ts index 674d446076b..7204f0ea8ba 100644 --- a/apps/sim/lib/workspace-events/rules.ts +++ b/apps/sim/lib/workspace-events/rules.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, avg, count, desc, eq, gte, ne, type SQL, sql } from 'drizzle-orm' @@ -22,7 +22,7 @@ export function excludeSimExecutionsCondition(): SQL { } async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise { - const recentLogs = await db + const recentLogs = await dbReplica .select({ level: workflowExecutionLogs.level }) .from(workflowExecutionLogs) .where(and(eq(workflowExecutionLogs.workflowId, workflowId), excludeSimExecutionsCondition())) @@ -51,7 +51,7 @@ async function checkFailureRate( // Single DB-side aggregate: the window is user-configured and this runs on // the execution-completion path, so never materialize the in-window rows. - const result = await db + const result = await dbReplica .select({ total: count(), errors: count(sql`case when ${workflowExecutionLogs.level} = 'error' then 1 end`), @@ -82,7 +82,7 @@ async function checkLatencySpike( ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) - const result = await db + const result = await dbReplica .select({ avgDuration: avg(workflowExecutionLogs.totalDurationMs), count: count(), @@ -114,7 +114,7 @@ async function checkErrorCount( ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) - const result = await db + const result = await dbReplica .select({ count: count() }) .from(workflowExecutionLogs) .where( From 48e5ec2532e589fd041d6c3557f26fa558e5465b Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 11 Jun 2026 10:31:35 -0700 Subject: [PATCH 2/5] fix(db): keep event-rule and tag-slot reads on the primary --- apps/sim/lib/knowledge/tags/service.ts | 4 ++-- apps/sim/lib/workspace-events/rules.ts | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/sim/lib/knowledge/tags/service.ts b/apps/sim/lib/knowledge/tags/service.ts index 08687daed9d..a0a8652e004 100644 --- a/apps/sim/lib/knowledge/tags/service.ts +++ b/apps/sim/lib/knowledge/tags/service.ts @@ -99,7 +99,7 @@ export async function getNextAvailableSlot( export async function getDocumentTagDefinitions( knowledgeBaseId: string ): Promise { - const definitions = await dbReplica + const definitions = await db .select({ id: knowledgeBaseTagDefinitions.id, knowledgeBaseId: knowledgeBaseTagDefinitions.knowledgeBaseId, @@ -123,7 +123,7 @@ export async function getDocumentTagDefinitions( * Get all tag definitions for a knowledge base (alias for compatibility) */ export async function getTagDefinitions(knowledgeBaseId: string): Promise { - const tagDefinitions = await dbReplica + const tagDefinitions = await db .select({ id: knowledgeBaseTagDefinitions.id, tagSlot: knowledgeBaseTagDefinitions.tagSlot, diff --git a/apps/sim/lib/workspace-events/rules.ts b/apps/sim/lib/workspace-events/rules.ts index 7204f0ea8ba..674d446076b 100644 --- a/apps/sim/lib/workspace-events/rules.ts +++ b/apps/sim/lib/workspace-events/rules.ts @@ -1,4 +1,4 @@ -import { dbReplica } from '@sim/db' +import { db } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, avg, count, desc, eq, gte, ne, type SQL, sql } from 'drizzle-orm' @@ -22,7 +22,7 @@ export function excludeSimExecutionsCondition(): SQL { } async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise { - const recentLogs = await dbReplica + const recentLogs = await db .select({ level: workflowExecutionLogs.level }) .from(workflowExecutionLogs) .where(and(eq(workflowExecutionLogs.workflowId, workflowId), excludeSimExecutionsCondition())) @@ -51,7 +51,7 @@ async function checkFailureRate( // Single DB-side aggregate: the window is user-configured and this runs on // the execution-completion path, so never materialize the in-window rows. - const result = await dbReplica + const result = await db .select({ total: count(), errors: count(sql`case when ${workflowExecutionLogs.level} = 'error' then 1 end`), @@ -82,7 +82,7 @@ async function checkLatencySpike( ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) - const result = await dbReplica + const result = await db .select({ avgDuration: avg(workflowExecutionLogs.totalDurationMs), count: count(), @@ -114,7 +114,7 @@ async function checkErrorCount( ): Promise { const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000) - const result = await dbReplica + const result = await db .select({ count: count() }) .from(workflowExecutionLogs) .where( From 8ecc8457ead031587322811509cf6ff69f6ce44d Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 11 Jun 2026 10:46:30 -0700 Subject: [PATCH 3/5] fix(db): keep chunk listing and tag-usage counts on the primary --- apps/sim/lib/copilot/chat/process-contents.test.ts | 2 +- apps/sim/lib/copilot/chat/process-contents.ts | 1 - apps/sim/lib/knowledge/chunks/service.ts | 6 +++--- apps/sim/lib/knowledge/tags/service.ts | 8 ++++---- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/apps/sim/lib/copilot/chat/process-contents.test.ts b/apps/sim/lib/copilot/chat/process-contents.test.ts index c55f0e83a90..76033ef908d 100644 --- a/apps/sim/lib/copilot/chat/process-contents.test.ts +++ b/apps/sim/lib/copilot/chat/process-contents.test.ts @@ -7,7 +7,7 @@ import type { ChatContext } from '@/stores/panel' const { getSkillById } = vi.hoisted(() => ({ getSkillById: vi.fn() })) -vi.mock('@sim/db', () => ({ db: {} })) +vi.mock('@sim/db', () => ({ db: {}, dbReplica: {} })) vi.mock('@sim/db/schema', () => ({ document: {}, knowledgeBase: {} })) vi.mock('@/lib/workflows/skills/operations', () => ({ getSkillById })) diff --git a/apps/sim/lib/copilot/chat/process-contents.ts b/apps/sim/lib/copilot/chat/process-contents.ts index 9bc4a101c50..cfb5abd962a 100644 --- a/apps/sim/lib/copilot/chat/process-contents.ts +++ b/apps/sim/lib/copilot/chat/process-contents.ts @@ -562,7 +562,6 @@ async function processExecutionLogFromDb( ): Promise { try { const { workflowExecutionLogs, workflow } = await import('@sim/db/schema') - const { dbReplica } = await import('@sim/db') const rows = await dbReplica .select({ id: workflowExecutionLogs.id, diff --git a/apps/sim/lib/knowledge/chunks/service.ts b/apps/sim/lib/knowledge/chunks/service.ts index e2e5d03cdb7..ab5c307397b 100644 --- a/apps/sim/lib/knowledge/chunks/service.ts +++ b/apps/sim/lib/knowledge/chunks/service.ts @@ -1,4 +1,4 @@ -import { db, dbReplica } from '@sim/db' +import { db } from '@sim/db' import { document, embedding, knowledgeBase } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { sha256Hex } from '@sim/security/hash' @@ -46,7 +46,7 @@ export async function queryChunks( conditions.push(ilike(embedding.content, `%${search}%`)) } - const chunks = await dbReplica + const chunks = await db .select({ id: embedding.id, chunkIndex: embedding.chunkIndex, @@ -82,7 +82,7 @@ export async function queryChunks( .limit(limit) .offset(offset) - const totalCount = await dbReplica + const totalCount = await db .select({ count: sql`count(*)` }) .from(embedding) .where(and(...conditions)) diff --git a/apps/sim/lib/knowledge/tags/service.ts b/apps/sim/lib/knowledge/tags/service.ts index a0a8652e004..86b5b9e3d02 100644 --- a/apps/sim/lib/knowledge/tags/service.ts +++ b/apps/sim/lib/knowledge/tags/service.ts @@ -1,4 +1,4 @@ -import { db, dbReplica } from '@sim/db' +import { db } from '@sim/db' import { document, embedding, knowledgeBaseTagDefinitions } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' @@ -655,7 +655,7 @@ export async function getTagUsage( whereConditions.push(sql`${sql.raw(tagSlot)} != ''`) } - const documentsWithTag = await dbReplica + const documentsWithTag = await db .select({ id: document.id, filename: document.filename, @@ -703,7 +703,7 @@ export async function getTagUsageStats( const tagSlot = def.tagSlot validateTagSlot(tagSlot) - const docCountResult = await dbReplica + const docCountResult = await db .select({ count: sql`count(*)` }) .from(document) .where( @@ -716,7 +716,7 @@ export async function getTagUsageStats( ) ) - const chunkCountResult = await dbReplica + const chunkCountResult = await db .select({ count: sql`count(*)` }) .from(embedding) .innerJoin(document, eq(embedding.documentId, document.id)) From 4f92b15997abe79e7b694027e1d62141b14e007d Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 11 Jun 2026 10:51:30 -0700 Subject: [PATCH 4/5] fix(db): execution-log mention lookup stays on the primary --- apps/sim/lib/copilot/chat/process-contents.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/copilot/chat/process-contents.ts b/apps/sim/lib/copilot/chat/process-contents.ts index cfb5abd962a..cc930889f13 100644 --- a/apps/sim/lib/copilot/chat/process-contents.ts +++ b/apps/sim/lib/copilot/chat/process-contents.ts @@ -1,4 +1,4 @@ -import { dbReplica } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { knowledgeBase } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { @@ -562,7 +562,7 @@ async function processExecutionLogFromDb( ): Promise { try { const { workflowExecutionLogs, workflow } = await import('@sim/db/schema') - const rows = await dbReplica + const rows = await db .select({ id: workflowExecutionLogs.id, workflowId: workflowExecutionLogs.workflowId, From 293dc79b2e361234eef7ab22cad612202f2e5b1b Mon Sep 17 00:00:00 2001 From: waleed Date: Thu, 11 Jun 2026 11:06:23 -0700 Subject: [PATCH 5/5] fix(db): no-activity decision read stays on the primary --- apps/sim/lib/workspace-events/no-activity.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/workspace-events/no-activity.ts b/apps/sim/lib/workspace-events/no-activity.ts index 840daedc385..a9fbc744c14 100644 --- a/apps/sim/lib/workspace-events/no-activity.ts +++ b/apps/sim/lib/workspace-events/no-activity.ts @@ -1,4 +1,4 @@ -import { dbReplica } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { webhook, workflow, workflowDeploymentVersion, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, asc, eq, gt, gte, inArray, isNull, ne, or, sql } from 'drizzle-orm' @@ -120,7 +120,7 @@ async function hasRecentActivity( ): Promise { const windowStart = new Date(Date.now() - config.inactivityHours * 60 * 60 * 1000) - const recentLogs = await dbReplica + const recentLogs = await db .select({ id: workflowExecutionLogs.id }) .from(workflowExecutionLogs) .where(