Skip to content

Commit 620b838

Browse files
authored
Implement manually invokable jobs through the invokeTrigger (triggerdotdev#700)
* Implement manually invokable jobs through the invokeTrigger Also implemented a job run notification system, that will POST details of a run on completion. This combines with the task callbackUrl system to implement the invokeAndWait * Document the invoke trigger * batch invoke and wait * background fetch timeouts * Use @whatwg-node/fetch instead of the polyfilled fetch * Fix some outdated dependencies in webapp * Improved subtask error propogation messages * Document the OpenAI changes and the batch invoke stuff * Fix dequeuing jobs * Don’t retry the OpenAI completion background task * Added OpenAI changesets * Use the new ResumeTaskService in ProcessCallbackTimeout as well
1 parent c743412 commit 620b838

File tree

73 files changed

+3328
-513
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+3328
-513
lines changed

.changeset/happy-dryers-guess.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Added invokeTrigger(), which allows jobs to be manually invoked

.changeset/lazy-schools-grin.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/integration-kit": patch
3+
"@trigger.dev/openai": patch
4+
---
5+
6+
Allow customizing OpenAI background retries and timeouts

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,5 @@ apps/**/public/build
5252
/test-results/
5353
/playwright-report/
5454
/playwright/.cache/
55+
56+
.cosine

apps/webapp/app/components/primitives/LabelValueStack.tsx

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { cn } from "~/utils/cn";
22
import { Paragraph } from "./Paragraph";
33
import { ArrowTopRightOnSquareIcon } from "@heroicons/react/20/solid";
44
import { SimpleTooltip } from "./Tooltip";
5+
import { Link } from "@remix-run/react";
56

67
const variations = {
78
primary: {
@@ -45,22 +46,48 @@ export function LabelValueStack({
4546
<Paragraph variant={variation.label}>{label}</Paragraph>
4647
<>
4748
{href ? (
48-
<SimpleTooltip
49-
side="bottom"
50-
button={
51-
<Paragraph variant={variation.value}>
52-
<a href={href} className="underline underline-offset-2" target="_blank">
53-
{value}
54-
<ArrowTopRightOnSquareIcon className="ml-1 inline-block h-4 w-4 text-dimmed" />
55-
</a>
56-
</Paragraph>
57-
}
58-
content={href}
59-
/>
49+
<ValueButton value={value} href={href} variant={variant} />
6050
) : (
6151
<Paragraph variant={variation.value}>{value}</Paragraph>
6252
)}
6353
</>
6454
</div>
6555
);
6656
}
57+
58+
type ValueButtonStackProps = {
59+
value: React.ReactNode;
60+
href: string;
61+
variant?: keyof typeof variations;
62+
};
63+
64+
function ValueButton({ value, href, variant = "secondary" }: ValueButtonStackProps) {
65+
const variation = variations[variant];
66+
67+
const isExternalUrl = href.startsWith("http");
68+
69+
if (!isExternalUrl) {
70+
return (
71+
<Paragraph variant={variation.value}>
72+
<Link to={href} reloadDocument className="underline underline-offset-2">
73+
{value}
74+
</Link>
75+
</Paragraph>
76+
);
77+
}
78+
79+
return (
80+
<SimpleTooltip
81+
side="bottom"
82+
button={
83+
<Paragraph variant={variation.value}>
84+
<a href={href} className="underline underline-offset-2" target="_blank">
85+
{value}
86+
<ArrowTopRightOnSquareIcon className="ml-1 inline-block h-4 w-4 text-dimmed" />
87+
</a>
88+
</Paragraph>
89+
}
90+
content={href}
91+
/>
92+
);
93+
}

apps/webapp/app/db.server.ts

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import invariant from "tiny-invariant";
33
import { z } from "zod";
44
import { logger } from "./services/logger.server";
55
import { env } from "./env.server";
6+
import { singleton } from "./utils/singleton";
67

78
export type PrismaTransactionClient = Omit<
89
PrismaClient,
@@ -66,24 +67,7 @@ export async function $transaction<R>(
6667

6768
export { Prisma };
6869

69-
let prisma: PrismaClient;
70-
71-
declare global {
72-
var __db__: PrismaClient;
73-
}
74-
75-
// this is needed because in development we don't want to restart
76-
// the server with every change, but we want to make sure we don't
77-
// create a new connection to the DB with every change either.
78-
// in production we'll have a single connection to the DB.
79-
if (process.env.NODE_ENV === "production") {
80-
prisma = getClient();
81-
} else {
82-
if (!global.__db__) {
83-
global.__db__ = getClient();
84-
}
85-
prisma = global.__db__;
86-
}
70+
export const prisma = singleton("prisma", getClient);
8771

8872
function getClient() {
8973
const { DATABASE_URL } = process.env;
@@ -143,7 +127,6 @@ function getClient() {
143127
return client;
144128
}
145129

146-
export { prisma };
147130
export type { PrismaClient } from "@trigger.dev/database";
148131

149132
export const PrismaErrorSchema = z.object({
Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,47 @@
1-
import { JobRun, JobRunExecution } from "@trigger.dev/database";
1+
import { JobRun } from "@trigger.dev/database";
22
import { PrismaClientOrTransaction } from "~/db.server";
33
import { executionWorker } from "~/services/worker.server";
44

5-
export type EnqueueRunExecutionV2Options = {
5+
export async function dequeueRunExecutionV2(run: JobRun, tx: PrismaClientOrTransaction) {
6+
return await executionWorker.dequeue(`job_run:${run.id}`, {
7+
tx,
8+
});
9+
}
10+
11+
export type EnqueueRunExecutionV3Options = {
612
runAt?: Date;
7-
resumeTaskId?: string;
8-
isRetry?: boolean;
913
skipRetrying?: boolean;
10-
executionCount?: number;
1114
};
1215

13-
export async function enqueueRunExecutionV2(
16+
export async function enqueueRunExecutionV3(
1417
run: JobRun,
1518
tx: PrismaClientOrTransaction,
16-
options: EnqueueRunExecutionV2Options = {}
19+
options: EnqueueRunExecutionV3Options = {}
1720
) {
18-
const job = await executionWorker.enqueue(
19-
"performRunExecutionV2",
21+
const reason = run.status === "PREPROCESSING" ? "PREPROCESS" : "EXECUTE_JOB";
22+
23+
return await executionWorker.enqueue(
24+
"performRunExecutionV3",
2025
{
2126
id: run.id,
22-
reason: run.status === "PREPROCESSING" ? "PREPROCESS" : "EXECUTE_JOB",
23-
resumeTaskId: options.resumeTaskId,
24-
isRetry: typeof options.isRetry === "boolean" ? options.isRetry : false,
27+
reason: reason,
2528
},
2629
{
2730
tx,
2831
runAt: options.runAt,
29-
jobKey: `job_run:${run.id}:${options.executionCount ?? 0}${
30-
options.resumeTaskId ? `:task:${options.resumeTaskId}` : ""
31-
}`,
32+
queueName: `job_run:${run.id}`,
33+
jobKey: `job_run:${reason}:${run.id}`,
3234
maxAttempts: options.skipRetrying ? 1 : undefined,
3335
}
3436
);
3537
}
3638

37-
export async function dequeueRunExecutionV2(run: JobRun, tx: PrismaClientOrTransaction) {
38-
return await executionWorker.dequeue(`job_run:${run.id}`, {
39+
export async function dequeueRunExecutionV3(run: JobRun, tx: PrismaClientOrTransaction) {
40+
await executionWorker.dequeue(`job_run:EXECUTE_JOB:${run.id}`, {
41+
tx,
42+
});
43+
44+
await executionWorker.dequeue(`job_run:PREPROCESS:${run.id}`, {
3945
tx,
4046
});
4147
}

apps/webapp/app/models/task.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export function taskWithAttemptsToServerTask(task: TaskWithAttempts): ServerTask
2525
operation: task.operation,
2626
callbackUrl: task.callbackUrl,
2727
forceYield: task.run.forceYieldImmediately,
28+
childExecutionMode: task.childExecutionMode,
2829
};
2930
}
3031

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ import { run as graphileRun, parseCronItems } from "graphile-worker";
1414
import omit from "lodash.omit";
1515
import { z } from "zod";
1616
import { PrismaClient, PrismaClientOrTransaction } from "~/db.server";
17-
import { workerLogger as logger } from "~/services/logger.server";
1817
import { PgListenService } from "~/services/db/pgListen.server";
19-
import { safeJsonParse } from "~/utils/json";
18+
import { workerLogger as logger } from "~/services/logger.server";
2019

2120
export interface MessageCatalogSchema {
2221
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;

apps/webapp/app/routes/api.v1.endpoints.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export async function action({ request }: ActionFunctionArgs) {
6060
return json(endpoint);
6161
} catch (error) {
6262
if (error instanceof Error) {
63-
logger.error("Error creating endpoint", {
63+
logger.debug("Error creating endpoint", {
6464
url: request.url,
6565
error: error.message,
6666
});
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { InvokeJobRequestBodySchema } from "@trigger.dev/core";
4+
import { z } from "zod";
5+
import { PrismaErrorSchema } from "~/db.server";
6+
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { InvokeJobService } from "~/services/jobs/invokeJob.server";
8+
import { logger } from "~/services/logger.server";
9+
10+
const ParamsSchema = z.object({
11+
jobSlug: z.string(),
12+
});
13+
14+
const HeadersSchema = z.object({
15+
"idempotency-key": z.string().optional().nullable(),
16+
"trigger-version": z.string().optional().nullable(),
17+
});
18+
19+
export async function action({ request, params }: ActionFunctionArgs) {
20+
// Ensure this is a POST request
21+
if (request.method.toUpperCase() !== "POST") {
22+
return { status: 405, body: "Method Not Allowed" };
23+
}
24+
25+
// Authenticate the request
26+
const authenticationResult = await authenticateApiRequest(request);
27+
28+
if (!authenticationResult) {
29+
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
30+
}
31+
32+
const parsed = ParamsSchema.safeParse(params);
33+
34+
if (!parsed.success) {
35+
return json({ error: "Invalid or Missing jobSlug" }, { status: 400 });
36+
}
37+
38+
const { jobSlug } = parsed.data;
39+
40+
const headers = HeadersSchema.safeParse(Object.fromEntries(request.headers));
41+
42+
if (!headers.success) {
43+
return json({ error: "Invalid headers" }, { status: 400 });
44+
}
45+
46+
const { "idempotency-key": idempotencyKey, "trigger-version": triggerVersion } = headers.data;
47+
48+
// Now parse the request body
49+
const anyBody = await request.json();
50+
51+
logger.debug("InvokeJobService.call() request body", {
52+
body: anyBody,
53+
jobSlug,
54+
idempotencyKey,
55+
triggerVersion,
56+
});
57+
58+
const body = InvokeJobRequestBodySchema.safeParse(anyBody);
59+
60+
if (!body.success) {
61+
return json({ error: "Invalid request body" }, { status: 400 });
62+
}
63+
64+
const service = new InvokeJobService();
65+
66+
try {
67+
const run = await service.call(
68+
authenticationResult.environment,
69+
jobSlug,
70+
body.data,
71+
idempotencyKey ?? undefined
72+
);
73+
74+
if (!run) {
75+
return json({ error: "Job count not be invoked" }, { status: 500 });
76+
}
77+
78+
return json({ id: run.id });
79+
} catch (error) {
80+
const prismaError = PrismaErrorSchema.safeParse(error);
81+
// Record not found in the database
82+
if (prismaError.success && prismaError.data.code === "P2005") {
83+
return json({ error: "Job not found" }, { status: 404 });
84+
} else {
85+
return json({ error: "Internal Server Error" }, { status: 500 });
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)