Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-needles-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Remove unimplemented batchOptions
1 change: 1 addition & 0 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class DockerTaskOperations implements TaskOperations {
return await execa("docker", [
"exec",
containerName,
"busybox",
"wget",
"-q",
"-O-",
Expand Down
2 changes: 1 addition & 1 deletion apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ class KubernetesTaskOperations implements TaskOperations {
`for i in $(seq ${retries}); do sleep 1; busybox wget -q -O- 127.0.0.1:8000/${type}?cause=${cause} && break; done`,
];

logger.log("getLifecycleCommand()", { exec });
logger.debug("getLifecycleCommand()", { exec });

return exec;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/components/runs/v3/RunFilters.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TrashIcon } from "@heroicons/react/20/solid";
import { XMarkIcon } from "@heroicons/react/20/solid";
import { useNavigate } from "@remix-run/react";
import type { TaskRunStatus as TaskRunStatusType } from "@trigger.dev/database";
import { RuntimeEnvironment, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
Expand Down Expand Up @@ -247,7 +247,7 @@ export function RunsFilters({ possibleEnvironments, possibleTasks }: RunFiltersP

<TimeFrameFilter from={from} to={to} onRangeChanged={handleTimeFrameChange} />

<Button variant="minimal/small" onClick={() => clearFilters()} LeadingIcon={TrashIcon} />
<Button variant="minimal/small" onClick={() => clearFilters()} LeadingIcon={XMarkIcon} />
</div>
);
}
235 changes: 119 additions & 116 deletions apps/webapp/app/v3/services/resumeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import {
TaskRunExecution,
TaskRunExecutionResult,
} from "@trigger.dev/core/v3";
import { $transaction } from "~/db.server";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { socketIo } from "../handleSocketIo.server";
import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server";
import { BaseService } from "./baseService.server";
import { TaskRunAttempt } from "@trigger.dev/database";

export class ResumeAttemptService extends BaseService {
public async call(
Expand All @@ -24,7 +25,7 @@ export class ResumeAttemptService extends BaseService {
},
include: {
taskRun: true,
taskRunDependency: {
dependencies: {
Comment thread
matt-aitken marked this conversation as resolved.
include: {
taskRun: {
include: {
Expand All @@ -40,8 +41,12 @@ export class ResumeAttemptService extends BaseService {
},
},
},
orderBy: {
createdAt: "desc",
},
take: 1,
},
batchTaskRunDependency: {
batchDependencies: {
include: {
items: {
include: {
Expand All @@ -61,6 +66,10 @@ export class ResumeAttemptService extends BaseService {
},
},
},
orderBy: {
createdAt: "desc",
},
take: 1,
},
},
});
Expand All @@ -78,6 +87,8 @@ export class ResumeAttemptService extends BaseService {
return;
Comment thread
matt-aitken marked this conversation as resolved.
}

let completedAttemptIds: string[] = [];

switch (params.type) {
case "WAIT_FOR_DURATION": {
logger.error(
Expand All @@ -93,148 +104,140 @@ export class ResumeAttemptService extends BaseService {
});
break;
}
case "WAIT_FOR_TASK":
case "WAIT_FOR_BATCH": {
let completedAttemptIds: string[] = [];

if (attempt.taskRunDependency) {
const dependentAttempt = attempt.taskRunDependency.taskRun.attempts[0];
case "WAIT_FOR_TASK": {
if (attempt.dependencies.length) {
// We only care about the latest dependency
const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0];

if (!dependentAttempt) {
logger.error("No dependent attempt", { attemptId: attempt.id });
return;
}

completedAttemptIds = [dependentAttempt.id];

await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
taskRunDependency: {
disconnect: true,
},
},
});
} else if (attempt.batchTaskRunDependency) {
const dependentBatchItems = attempt.batchTaskRunDependency.items;
} else {
logger.error("No task dependency", { attemptId: attempt.id });
return;
}
break;
}
case "WAIT_FOR_BATCH": {
if (attempt.batchDependencies) {
// We only care about the latest batch dependency
const dependentBatchItems = attempt.batchDependencies[0].items;

if (!dependentBatchItems) {
logger.error("No dependent batch items", { attemptId: attempt.id });
return;
}

completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);

await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
batchTaskRunDependency: {
disconnect: true,
},
},
});
} else {
logger.error("No dependencies", { attemptId: attempt.id });
return;
}

if (completedAttemptIds.length === 0) {
logger.error("No completed attempt IDs", { attemptId: attempt.id });
logger.error("No batch dependency", { attemptId: attempt.id });
return;
}
break;
}
default: {
break;
}
}

const completions: TaskRunExecutionResult[] = [];
const executions: TaskRunExecution[] = [];
await this.#handleDependencyResume(attempt, completedAttemptIds, tx);
});
}

for (const completedAttemptId of completedAttemptIds) {
const completedAttempt = await tx.taskRunAttempt.findUnique({
where: {
id: completedAttemptId,
taskRun: {
lockedAt: {
not: null,
},
lockedById: {
not: null,
},
},
},
});

if (!completedAttempt) {
logger.error("Completed attempt not found", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
async #handleDependencyResume(
attempt: TaskRunAttempt,
completedAttemptIds: string[],
tx: PrismaClientOrTransaction
) {
if (completedAttemptIds.length === 0) {
logger.error("No completed attempt IDs", { attemptId: attempt.id });
return;
}

const completions: TaskRunExecutionResult[] = [];
const executions: TaskRunExecution[] = [];

for (const completedAttemptId of completedAttemptIds) {
const completedAttempt = await tx.taskRunAttempt.findUnique({
where: {
id: completedAttemptId,
taskRun: {
lockedAt: {
not: null,
},
lockedById: {
not: null,
},
},
},
});

const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
completedAttempt.id
);
if (!completedAttempt) {
logger.error("Completed attempt not found", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

if (!completion) {
logger.error("Failed to get completion payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
completedAttempt.id
);

completions.push(completion);
if (!completion) {
logger.error("Failed to get completion payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
completedAttempt.id
);
completions.push(completion);

if (!executionPayload) {
logger.error("Failed to get execution payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
completedAttempt.id
);

executions.push(executionPayload.execution);
}
if (!executionPayload) {
logger.error("Failed to get execution payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

const updated = await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
executions.push(executionPayload.execution);
}

const updated = await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
status: "EXECUTING",
taskRun: {
update: {
data: {
status: "EXECUTING",
taskRun: {
update: {
data: {
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
},
},
},
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
},
});
},
},
},
});

socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
runId: attempt.taskRunId,
attemptId: attempt.id,
attemptFriendlyId: attempt.friendlyId,
completions,
executions,
});
break;
}
default: {
break;
}
}
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
runId: attempt.taskRunId,
attemptId: attempt.id,
attemptFriendlyId: attempt.friendlyId,
completions,
executions,
});
}
}
2 changes: 1 addition & 1 deletion docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
},
{
"name": "v2",
"url": "https://trigger.dev/docs",
"url": "https://trigger.dev/docs/documentation",
"version": "v3 (Developer Preview)"
},
{
Expand Down
Loading