Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-engine-single-ttl-path.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Route TTL expiration through the batch TTL path only. Removes the redundant per-run `expireRun` worker job, leaving the batch consumer as the single mechanism that flips runs to `EXPIRED` when their TTL elapses while still queued.
10 changes: 8 additions & 2 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,13 @@ export class RunEngine {
}
} else {
try {
if (taskRun.ttl) {
// The new batch TTL path only expires runs still in the queue
// sorted set (waiting on a concurrency slot). For DEV
// environments where the dev CLI may not be running, fast-pathed
// runs can sit on the worker queue indefinitely and never get
// claimed for expiration. Keep the legacy per-run expireRun job
// armed for DEV so those runs still expire.
if (taskRun.ttl && environment.type === "DEVELOPMENT") {
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
}

Expand All @@ -812,7 +818,7 @@ export class RunEngine {
enableFastPath,
});
} catch (enqueueError) {
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {
this.logger.error("engine.trigger(): failed to enqueue run", {
runId: taskRun.id,
friendlyId: taskRun.friendlyId,
taskIdentifier: taskRun.taskIdentifier,
Expand Down
35 changes: 22 additions & 13 deletions internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,34 @@ export class DelayedRunSystem {
return;
}

// The batch TTL path only expires runs still in the queue sorted set.
// For DEV environments where the dev CLI may not be running, fast-pathed
// runs can sit on the worker queue indefinitely. Keep the legacy per-run
// expireRun job armed for DEV so those runs still expire.
if (run.ttl && run.runtimeEnvironment.type === "DEVELOPMENT") {
const expireAt = parseNaturalLanguageDuration(run.ttl);
if (expireAt) {
await this.$.worker.enqueue({
id: `expireRun:${runId}`,
job: "expireRun",
payload: { runId },
availableAt: expireAt,
});
}
}

// Now we need to enqueue the run into the RunQueue
// Skip the lock in enqueueRun since we already hold it
// Skip the lock in enqueueRun since we already hold it.
// includeTtl: true so the run's TTL is armed from the moment it enters
// the queue (not from taskRun.createdAt). The TTL system tracks runs
// that are queued and have never started — delayed runs are first
// enqueued here, so this is the correct point to arm TTL.
await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
batchId: run.batchId ?? undefined,
skipRunLock: true,
includeTtl: true,
});

const queuedAt = new Date();
Expand Down Expand Up @@ -183,18 +204,6 @@ export class DelayedRunSystem {
},
});

if (run.ttl) {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await this.$.worker.enqueue({
id: `expireRun:${runId}`,
job: "expireRun",
payload: { runId },
availableAt: expireAt,
});
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ export class PendingVersionSystem {
run: updatedRun,
env: backgroundWorker.runtimeEnvironment,
tx,
// PENDING_VERSION re-enqueue is the first time this run is actually
// entering the run queue (the original enqueue was held back waiting
// for a worker version). Arm TTL here so the TTL system can expire it
// if it sits queued waiting on a concurrency slot.
includeTtl: true,
});
});

Expand Down
45 changes: 37 additions & 8 deletions internal-packages/run-engine/src/engine/tests/delays.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ describe("RunEngine delays", () => {
},
queue: {
redis: redisOptions,
ttlSystem: {
pollIntervalMs: 100,
batchSize: 10,
batchMaxWaitMs: 100,
},
},
runLock: {
redis: redisOptions,
Expand Down Expand Up @@ -230,7 +235,21 @@ describe("RunEngine delays", () => {
taskIdentifier
);

// TTL only expires runs still queued waiting on a concurrency slot.
// Once the delay elapses, the run gets enqueued; saturate env concurrency
// so it stays queued so the new TTL path can expire it.
await engine.runQueue.updateEnvConcurrencyLimits({
...authenticatedEnvironment,
maximumConcurrencyLimit: 0,
});

const enqueuedAfterDelayTimes: number[] = [];
engine.eventBus.on("runEnqueuedAfterDelay", () => {
enqueuedAfterDelayTimes.push(Date.now());
});

//trigger the run
const triggerTime = Date.now();
const run = await engine.trigger(
{
number: 1,
Expand All @@ -247,7 +266,7 @@ describe("RunEngine delays", () => {
queue: "task/test-task",
isTest: false,
tags: [],
delayUntil: new Date(Date.now() + 1000),
delayUntil: new Date(triggerTime + 1000),
ttl: "2s",
},
prisma
Expand All @@ -259,7 +278,7 @@ describe("RunEngine delays", () => {
expect(executionData.snapshot.executionStatus).toBe("DELAYED");
expect(run.status).toBe("DELAYED");

//wait for 1 seconds
//wait so the delay elapses and the run is enqueued
await setTimeout(2_500);

//should now be queued
Expand All @@ -273,19 +292,29 @@ describe("RunEngine delays", () => {

expect(run2.status).toBe("PENDING");

//wait for 3 seconds
await setTimeout(3_000);
// TTL is armed at queue-enter time (not from triggerTime). With a 2s TTL
// and a 1s delay, the run becomes eligible to expire ~3s after trigger.
// Confirm the TTL was not armed against triggerTime (i.e. didn't already
// fire while still DELAYED), and that the run only expires after the
// queue-enter timestamp + ttl has elapsed.
expect(enqueuedAfterDelayTimes.length).toBe(1);
const enqueuedAt = enqueuedAfterDelayTimes[0]!;
expect(enqueuedAt - triggerTime).toBeGreaterThanOrEqual(1000);

//should now be expired
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData3);
expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
//wait so the TTL fires (counted from when the run was enqueued)
await setTimeout(3_000);

// Status comes from the DB; the batch TTL path does not create
// execution snapshots, so getRunExecutionData may still show QUEUED.
const run3 = await prisma.taskRun.findFirstOrThrow({
where: { id: run.id },
});

expect(run3.status).toBe("EXPIRED");
assertNonNullable(run3.expiredAt);
// The expiry must happen after enqueue + ttl, not after trigger + ttl.
// Allow a small tolerance for poll interval + batch wait.
expect(run3.expiredAt.getTime()).toBeGreaterThanOrEqual(enqueuedAt + 2_000);
} finally {
await engine.quit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ describe("RunEngine lazy waitpoint creation", () => {
ttlSystem: {
pollIntervalMs: 100,
batchSize: 10,
batchMaxWaitMs: 100,
},
},
runLock: {
Expand All @@ -434,6 +435,12 @@ describe("RunEngine lazy waitpoint creation", () => {

await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

// TTL only expires runs still queued waiting on a concurrency slot.
await engine.runQueue.updateEnvConcurrencyLimits({
...authenticatedEnvironment,
maximumConcurrencyLimit: 0,
});

// Trigger a standalone run with TTL (no waitpoint)
const run = await engine.trigger(
{
Expand Down Expand Up @@ -467,11 +474,15 @@ describe("RunEngine lazy waitpoint creation", () => {
// Wait for TTL to expire
await setTimeout(1_500);

// Verify run expired successfully (no throw)
const executionData = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData);
expect(executionData.run.status).toBe("EXPIRED");
expect(executionData.snapshot.executionStatus).toBe("FINISHED");
// Verify run expired successfully (no throw).
// The batch TTL path does not create execution snapshots, so check
// the status directly from the database rather than via
// getRunExecutionData.
const expiredRun = await prisma.taskRun.findUnique({
where: { id: run.id },
select: { status: true },
});
expect(expiredRun?.status).toBe("EXPIRED");
} finally {
await engine.quit();
}
Expand Down
116 changes: 116 additions & 0 deletions internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,120 @@ describe("RunEngine pending version", () => {
}
}
);

containerTest(
"PENDING_VERSION re-enqueue arms TTL on the queued message",
async ({ prisma, redisOptions }) => {
// When a run enters PENDING_VERSION (background worker doesn't yet have
// the task), the first enqueue happens but the message is dequeued and
// its TTL set entry is dropped while the run waits for a matching worker.
// Once a worker arrives, pendingVersionSystem re-enqueues. That
// re-enqueue is the first time the run is actually queued for a worker,
// so TTL must be armed at that point — not held over from the original
// enqueue.
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
processWorkerQueueDebounceMs: 50,
masterQueueConsumersDisabled: true,
ttlSystem: {
pollIntervalMs: 100,
batchSize: 10,
batchMaxWaitMs: 100,
},
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

// Trigger a run with TTL — no background worker exists yet for this
// task, so it will end up in PENDING_VERSION.
const run = await engine.trigger(
{
number: 1,
friendlyId: "run_pvttl1",
environment: authenticatedEnvironment,
taskIdentifier,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "tpv1",
spanId: "spv1",
queue: "task/test-task",
isTest: false,
tags: [],
ttl: "10m",
},
prisma
);

// A worker arrives that doesn't have this task — flushes the run to
// PENDING_VERSION.
await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]);

await setTimeout(500);

// The consumer attempt is what flushes the run to PENDING_VERSION —
// dequeue finds no matching task version and returns nothing.
const dequeuedEmpty = await engine.dequeueFromWorkerQueue({
consumerId: "test_pv",
workerQueue: "main",
});
expect(dequeuedEmpty.length).toBe(0);

const executionDataAfter = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionDataAfter);
expect(executionDataAfter.run.status).toBe("PENDING_VERSION");

// Now a worker arrives WITH the task — pendingVersionSystem
// re-enqueues the run.
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

await setTimeout(1000);

const executionDataReenqueued = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionDataReenqueued);
expect(executionDataReenqueued.run.status).toBe("PENDING");

// The re-enqueued message must carry ttlExpiresAt so the TTL set
// tracks it for expiration.
const message = await engine.runQueue.readMessage(
authenticatedEnvironment.organization.id,
run.id
);
assertNonNullable(message);
expect(message.ttlExpiresAt).toBeDefined();
expect(typeof message.ttlExpiresAt).toBe("number");
} finally {
await engine.quit();
}
}
);
});
Loading
Loading