Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 12 additions & 19 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import { isValidDuration } from "./services/realtime/duration.server";
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
function durationString() {
return z
.string()
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
return z.string().refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
}

// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
// and its _REQUIRED variant. Adds zod issues for empty input or unknown names.
const parseMachinePresetCsv = (
raw: string,
ctx: z.RefinementCtx
): MachinePresetName[] => {
const parseMachinePresetCsv = (raw: string, ctx: z.RefinementCtx): MachinePresetName[] => {
const names = raw
.split(",")
.map((s) => s.trim())
Expand Down Expand Up @@ -496,10 +491,7 @@ const EnvironmentSchema = z
.string()
.optional()
.transform((v, ctx) =>
parseMachinePresetCsv(
v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x",
ctx
)
parseMachinePresetCsv(v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", ctx)
),

DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"),
Expand Down Expand Up @@ -671,6 +663,7 @@ const EnvironmentSchema = z
ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

LOOPS_API_KEY: z.string().optional(),
ATTIO_API_KEY: z.string().optional(),
MARQS_DISABLE_REBALANCING: BoolEnv.default(false),
MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down Expand Up @@ -1154,7 +1147,9 @@ const EnvironmentSchema = z
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a buffer
// when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z
.string()
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
.string()
Expand All @@ -1164,7 +1159,7 @@ const EnvironmentSchema = z
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
),
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
.string()
Expand All @@ -1174,7 +1169,9 @@ const EnvironmentSchema = z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
Expand Down Expand Up @@ -1238,11 +1235,7 @@ const EnvironmentSchema = z
// (retrieve, trace) have a safety net while PG replica lag settles.
TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30),
// ioredis per-request retry limit on the buffer's Redis client.
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce
.number()
.int()
.positive()
.default(20),
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce.number().int().positive().default(20),
// ioredis reconnect backoff envelope for the buffer client: the base
// grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered.
TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50),
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/models/organization.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { env } from "~/env.server";
import { featuresForUrl } from "~/features.server";
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
import { enqueueAttioWorkspaceSync } from "~/services/attio.server";
export type { Organization };

const nanoid = customAlphabet("1234567890abcdef", 4);
Expand Down Expand Up @@ -82,6 +83,16 @@ export async function createOrganization(
},
});

// Fire-and-forget; never blocks org creation.
void enqueueAttioWorkspaceSync({
orgId: organization.id,
title: organization.title,
slug: organization.slug,
companySize: organization.companySize,
createdAt: organization.createdAt,
adminUserId: userId,
});

return { ...organization };
}

Expand Down
121 changes: 121 additions & 0 deletions apps/webapp/app/services/attio.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "./logger.server";

// Syncs new orgs/users into Attio (workspaces/users objects) at signup, via the
// common worker so a slow Attio never blocks signup. Ongoing field updates are
// handled by the scheduled sync, not here. No-op without ATTIO_API_KEY.

const ATTIO_API = "https://api.attio.com/v2";
const IS_TEST = env.APP_ENV !== "production";

export const AttioWorkspaceSyncSchema = z.object({
orgId: z.string(),
title: z.string(),
slug: z.string(),
companySize: z.string().nullish(),
createdAt: z.coerce.date(),
adminUserId: z.string(),
});
export type AttioWorkspaceSync = z.infer<typeof AttioWorkspaceSyncSchema>;

export const AttioUserSyncSchema = z.object({
userId: z.string(),
email: z.string(),
referralSource: z.string().nullish(),
marketingEmails: z.boolean(),
createdAt: z.coerce.date(),
});
export type AttioUserSync = z.infer<typeof AttioUserSyncSchema>;

class AttioClient {
constructor(private readonly apiKey: string) {}

// Create-or-update by unique attribute; returns the record id. Throws on failure so the worker retries.
async #assert(object: string, matchingAttribute: string, values: Record<string, unknown>): Promise<string> {
const url = `${ATTIO_API}/objects/${object}/records?matching_attribute=${matchingAttribute}`;
const response = await fetch(url, {
method: "PUT",
headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" },
body: JSON.stringify({ data: { values } }),
});

if (!response.ok) {
const body = await response.text();
logger.error("Attio assert failed", { object, matchingAttribute, status: response.status, body });
throw new Error(`Attio assert ${object} failed with status ${response.status}`);
}

return ((await response.json()) as any).data?.id?.record_id as string;
}

async upsertWorkspace(payload: AttioWorkspaceSync) {
// The creating user is an admin of the new org — set their role and link them to the workspace.
const adminRecordId = await this.#assert("users", "user_id", {
user_id: payload.adminUserId,
role: "Admin",
is_test: IS_TEST,
});

await this.#assert("workspaces", "workspace_id", {
workspace_id: payload.orgId,
name: payload.title,
org_slug: payload.slug,
company_size: payload.companySize ?? undefined,
signup_date: toDate(payload.createdAt),
plan: "Free",
account_status: "Active",
is_test: IS_TEST,
users: [{ target_object: "users", target_record_id: adminRecordId }],
});
}

async upsertUser(payload: AttioUserSync) {
await this.#assert("users", "user_id", {
user_id: payload.userId,
primary_email_address: payload.email,
marketing_opt_in: payload.marketingEmails,
referral_source: payload.referralSource ?? undefined,
signup_date: toDate(payload.createdAt),
is_test: IS_TEST,
});
}
}

// Attio `date` attributes want a bare YYYY-MM-DD value.
function toDate(date: Date): string {
return date.toISOString().slice(0, 10);
}

export const attioClient = env.ATTIO_API_KEY ? new AttioClient(env.ATTIO_API_KEY) : null;

export async function enqueueAttioWorkspaceSync(payload: AttioWorkspaceSync) {
if (!attioClient) return;
try {
// Lazy import to avoid a circular dependency with commonWorker (which imports this module's schemas).
const { commonWorker } = await import("~/v3/commonWorker.server");
await commonWorker.enqueue({ id: `attio:workspace:${payload.orgId}`, job: "attio.syncWorkspace", payload });
} catch (error) {
logger.error("Failed to enqueue Attio workspace sync", { orgId: payload.orgId, error });
}
}

export async function enqueueAttioUserSync(payload: AttioUserSync) {
if (!attioClient) return;
try {
const { commonWorker } = await import("~/v3/commonWorker.server");
await commonWorker.enqueue({ id: `attio:user:${payload.userId}`, job: "attio.syncUser", payload });
} catch (error) {
logger.error("Failed to enqueue Attio user sync", { userId: payload.userId, error });
}
}

export async function runAttioWorkspaceSync(payload: AttioWorkspaceSync) {
if (!attioClient) return;
await attioClient.upsertWorkspace(payload);
}

export async function runAttioUserSync(payload: AttioUserSync) {
if (!attioClient) return;
await attioClient.upsertUser(payload);
}
9 changes: 9 additions & 0 deletions apps/webapp/app/services/telemetry.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Organization } from "~/models/organization.server";
import type { Project } from "~/models/project.server";
import type { User } from "~/models/user.server";
import { singleton } from "~/utils/singleton";
import { enqueueAttioUserSync } from "./attio.server";
import { loopsClient } from "./loops.server";

type Options = {
Expand Down Expand Up @@ -74,6 +75,14 @@ class Telemetry {
email: user.email,
name: user.name,
});

enqueueAttioUserSync({
userId: user.id,
email: user.email,
referralSource: referralSource ?? user.referralSource,
marketingEmails: user.marketingEmails,
createdAt: user.createdAt,
});
}
},
};
Expand Down
26 changes: 26 additions & 0 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { z } from "zod";
import { env } from "~/env.server";
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
import { sendEmail } from "~/services/email.server";
import {
AttioUserSyncSchema,
AttioWorkspaceSyncSchema,
runAttioUserSync,
runAttioWorkspaceSync,
} from "~/services/attio.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
Expand Down Expand Up @@ -46,6 +52,20 @@ function initializeWorker() {
maxAttempts: 3,
},
},
"attio.syncWorkspace": {
schema: AttioWorkspaceSyncSchema,
visibilityTimeoutMs: 30_000,
retry: {
maxAttempts: 3,
},
},
"attio.syncUser": {
schema: AttioUserSyncSchema,
visibilityTimeoutMs: 30_000,
retry: {
maxAttempts: 3,
},
},
"v3.resumeBatchRun": {
schema: z.object({
batchRunId: z.string(),
Expand Down Expand Up @@ -213,6 +233,12 @@ function initializeWorker() {
scheduleEmail: async ({ payload }) => {
await sendEmail(payload);
},
"attio.syncWorkspace": async ({ payload }) => {
await runAttioWorkspaceSync(payload);
},
"attio.syncUser": async ({ payload }) => {
await runAttioUserSync(payload);
},
"v3.resumeBatchRun": async ({ payload }) => {
const service = new ResumeBatchRunService();
await service.call(payload.batchRunId);
Expand Down
Loading