diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f5cd30dd27..2bed3b05d9 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -8,18 +8,13 @@ import { isValidDuration } from "./services/realtime/duration.server"; // `z.string()` constrained to a `parseDuration`-parseable string (e.g. // `7d`, `1h`). Validated at boot so a typo'd duration fails fast. function durationString() { - return z - .string() - .refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y"); + return z.string().refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y"); } // Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a // non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS // and its _REQUIRED variant. Adds zod issues for empty input or unknown names. -const parseMachinePresetCsv = ( - raw: string, - ctx: z.RefinementCtx -): MachinePresetName[] => { +const parseMachinePresetCsv = (raw: string, ctx: z.RefinementCtx): MachinePresetName[] => { const names = raw .split(",") .map((s) => s.trim()) @@ -496,10 +491,7 @@ const EnvironmentSchema = z .string() .optional() .transform((v, ctx) => - parseMachinePresetCsv( - v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", - ctx - ) + parseMachinePresetCsv(v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", ctx) ), DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"), @@ -671,6 +663,7 @@ const EnvironmentSchema = z ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), LOOPS_API_KEY: z.string().optional(), + ATTIO_API_KEY: z.string().optional(), MARQS_DISABLE_REBALANCING: BoolEnv.default(false), MARQS_VISIBILITY_TIMEOUT_MS: z.coerce .number() @@ -1154,7 +1147,9 @@ const EnvironmentSchema = z // setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a // no-op because the gate-side singleton refuses to construct a buffer // when the system is off. - TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), + TRIGGER_MOLLIFIER_DRAINER_ENABLED: z + .string() + .default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"), TRIGGER_MOLLIFIER_REDIS_HOST: z .string() @@ -1164,7 +1159,7 @@ const EnvironmentSchema = z .number() .optional() .transform( - (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined), + (v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined) ), TRIGGER_MOLLIFIER_REDIS_USERNAME: z .string() @@ -1174,7 +1169,9 @@ const EnvironmentSchema = z .string() .optional() .transform((v) => v ?? process.env.REDIS_PASSWORD), - TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200), TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100), TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500), @@ -1238,11 +1235,7 @@ const EnvironmentSchema = z // (retrieve, trace) have a safety net while PG replica lag settles. TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30), // ioredis per-request retry limit on the buffer's Redis client. - TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce - .number() - .int() - .positive() - .default(20), + TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce.number().int().positive().default(20), // ioredis reconnect backoff envelope for the buffer client: the base // grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered. TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50), diff --git a/apps/webapp/app/models/organization.server.ts b/apps/webapp/app/models/organization.server.ts index 14315dd337..dfca917c09 100644 --- a/apps/webapp/app/models/organization.server.ts +++ b/apps/webapp/app/models/organization.server.ts @@ -14,6 +14,7 @@ import { env } from "~/env.server"; import { featuresForUrl } from "~/features.server"; import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server"; import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server"; +import { enqueueAttioWorkspaceSync } from "~/services/attio.server"; export type { Organization }; const nanoid = customAlphabet("1234567890abcdef", 4); @@ -82,6 +83,16 @@ export async function createOrganization( }, }); + // Fire-and-forget; never blocks org creation. + void enqueueAttioWorkspaceSync({ + orgId: organization.id, + title: organization.title, + slug: organization.slug, + companySize: organization.companySize, + createdAt: organization.createdAt, + adminUserId: userId, + }); + return { ...organization }; } diff --git a/apps/webapp/app/services/attio.server.ts b/apps/webapp/app/services/attio.server.ts new file mode 100644 index 0000000000..9c5aad407d --- /dev/null +++ b/apps/webapp/app/services/attio.server.ts @@ -0,0 +1,121 @@ +import { z } from "zod"; +import { env } from "~/env.server"; +import { logger } from "./logger.server"; + +// Syncs new orgs/users into Attio (workspaces/users objects) at signup, via the +// common worker so a slow Attio never blocks signup. Ongoing field updates are +// handled by the scheduled sync, not here. No-op without ATTIO_API_KEY. + +const ATTIO_API = "https://api.attio.com/v2"; +const IS_TEST = env.APP_ENV !== "production"; + +export const AttioWorkspaceSyncSchema = z.object({ + orgId: z.string(), + title: z.string(), + slug: z.string(), + companySize: z.string().nullish(), + createdAt: z.coerce.date(), + adminUserId: z.string(), +}); +export type AttioWorkspaceSync = z.infer; + +export const AttioUserSyncSchema = z.object({ + userId: z.string(), + email: z.string(), + referralSource: z.string().nullish(), + marketingEmails: z.boolean(), + createdAt: z.coerce.date(), +}); +export type AttioUserSync = z.infer; + +class AttioClient { + constructor(private readonly apiKey: string) {} + + // Create-or-update by unique attribute; returns the record id. Throws on failure so the worker retries. + async #assert(object: string, matchingAttribute: string, values: Record): Promise { + const url = `${ATTIO_API}/objects/${object}/records?matching_attribute=${matchingAttribute}`; + const response = await fetch(url, { + method: "PUT", + headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" }, + body: JSON.stringify({ data: { values } }), + }); + + if (!response.ok) { + const body = await response.text(); + logger.error("Attio assert failed", { object, matchingAttribute, status: response.status, body }); + throw new Error(`Attio assert ${object} failed with status ${response.status}`); + } + + return ((await response.json()) as any).data?.id?.record_id as string; + } + + async upsertWorkspace(payload: AttioWorkspaceSync) { + // The creating user is an admin of the new org — set their role and link them to the workspace. + const adminRecordId = await this.#assert("users", "user_id", { + user_id: payload.adminUserId, + role: "Admin", + is_test: IS_TEST, + }); + + await this.#assert("workspaces", "workspace_id", { + workspace_id: payload.orgId, + name: payload.title, + org_slug: payload.slug, + company_size: payload.companySize ?? undefined, + signup_date: toDate(payload.createdAt), + plan: "Free", + account_status: "Active", + is_test: IS_TEST, + users: [{ target_object: "users", target_record_id: adminRecordId }], + }); + } + + async upsertUser(payload: AttioUserSync) { + await this.#assert("users", "user_id", { + user_id: payload.userId, + primary_email_address: payload.email, + marketing_opt_in: payload.marketingEmails, + referral_source: payload.referralSource ?? undefined, + signup_date: toDate(payload.createdAt), + is_test: IS_TEST, + }); + } +} + +// Attio `date` attributes want a bare YYYY-MM-DD value. +function toDate(date: Date): string { + return date.toISOString().slice(0, 10); +} + +export const attioClient = env.ATTIO_API_KEY ? new AttioClient(env.ATTIO_API_KEY) : null; + +export async function enqueueAttioWorkspaceSync(payload: AttioWorkspaceSync) { + if (!attioClient) return; + try { + // Lazy import to avoid a circular dependency with commonWorker (which imports this module's schemas). + const { commonWorker } = await import("~/v3/commonWorker.server"); + await commonWorker.enqueue({ id: `attio:workspace:${payload.orgId}`, job: "attio.syncWorkspace", payload }); + } catch (error) { + logger.error("Failed to enqueue Attio workspace sync", { orgId: payload.orgId, error }); + } +} + +export async function enqueueAttioUserSync(payload: AttioUserSync) { + if (!attioClient) return; + try { + const { commonWorker } = await import("~/v3/commonWorker.server"); + await commonWorker.enqueue({ id: `attio:user:${payload.userId}`, job: "attio.syncUser", payload }); + } catch (error) { + logger.error("Failed to enqueue Attio user sync", { userId: payload.userId, error }); + } +} + +export async function runAttioWorkspaceSync(payload: AttioWorkspaceSync) { + if (!attioClient) return; + await attioClient.upsertWorkspace(payload); +} + +export async function runAttioUserSync(payload: AttioUserSync) { + if (!attioClient) return; + await attioClient.upsertUser(payload); +} diff --git a/apps/webapp/app/services/telemetry.server.ts b/apps/webapp/app/services/telemetry.server.ts index f8bd3d3d99..ad67871c19 100644 --- a/apps/webapp/app/services/telemetry.server.ts +++ b/apps/webapp/app/services/telemetry.server.ts @@ -5,6 +5,7 @@ import type { Organization } from "~/models/organization.server"; import type { Project } from "~/models/project.server"; import type { User } from "~/models/user.server"; import { singleton } from "~/utils/singleton"; +import { enqueueAttioUserSync } from "./attio.server"; import { loopsClient } from "./loops.server"; type Options = { @@ -74,6 +75,14 @@ class Telemetry { email: user.email, name: user.name, }); + + enqueueAttioUserSync({ + userId: user.id, + email: user.email, + referralSource: referralSource ?? user.referralSource, + marketingEmails: user.marketingEmails, + createdAt: user.createdAt, + }); } }, }; diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index a2fae9c73c..16dc388a6d 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -5,6 +5,12 @@ import { z } from "zod"; import { env } from "~/env.server"; import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; import { sendEmail } from "~/services/email.server"; +import { + AttioUserSyncSchema, + AttioWorkspaceSyncSchema, + runAttioUserSync, + runAttioWorkspaceSync, +} from "~/services/attio.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { DeliverAlertService } from "./services/alerts/deliverAlert.server"; @@ -46,6 +52,20 @@ function initializeWorker() { maxAttempts: 3, }, }, + "attio.syncWorkspace": { + schema: AttioWorkspaceSyncSchema, + visibilityTimeoutMs: 30_000, + retry: { + maxAttempts: 3, + }, + }, + "attio.syncUser": { + schema: AttioUserSyncSchema, + visibilityTimeoutMs: 30_000, + retry: { + maxAttempts: 3, + }, + }, "v3.resumeBatchRun": { schema: z.object({ batchRunId: z.string(), @@ -213,6 +233,12 @@ function initializeWorker() { scheduleEmail: async ({ payload }) => { await sendEmail(payload); }, + "attio.syncWorkspace": async ({ payload }) => { + await runAttioWorkspaceSync(payload); + }, + "attio.syncUser": async ({ payload }) => { + await runAttioUserSync(payload); + }, "v3.resumeBatchRun": async ({ payload }) => { const service = new ResumeBatchRunService(); await service.call(payload.batchRunId);