Skip to content

Commit ad7d627

Browse files
committed
fix(run-engine): gate counter INCR on ZADD/SADD and 24h TTL the counter
1 parent 732a848 commit ad7d627

2 files changed

Lines changed: 152 additions & 22 deletions

File tree

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2577,7 +2577,8 @@ export class RunQueue {
25772577
messageQueue,
25782578
JSON.stringify(message),
25792579
String(messageScore),
2580-
ckWildcardName
2580+
ckWildcardName,
2581+
this.options.redis.keyPrefix ?? ""
25812582
);
25822583
} else {
25832584
await this.redis.nackMessage(
@@ -3461,7 +3462,9 @@ end
34613462
-- Slow path: normal enqueue
34623463
redis.call('SET', messageKey, messageData)
34633464
3464-
-- Lazy-init lengthCounter from existing ckIndex variants (once per base queue, ever).
3465+
-- Lazy-init lengthCounter from existing ckIndex variants (once per base queue per 24h).
3466+
-- The 24h TTL means the counter periodically re-anchors to truth, bounding any drift
3467+
-- that accumulated during rolling-deploy overlap windows.
34653468
-- Run BEFORE the ZADD so we capture pre-state; the subsequent INCR accounts for the new message.
34663469
-- The counter tracks ONLY CK-variant messages — the read path adds ZCARD(base) separately,
34673470
-- so the base zset is intentionally excluded here.
@@ -3471,12 +3474,16 @@ if redis.call('EXISTS', lengthCounterKey) == 0 then
34713474
for _, v in ipairs(variants) do
34723475
total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
34733476
end
3474-
redis.call('SET', lengthCounterKey, total)
3477+
redis.call('SET', lengthCounterKey, total, 'EX', 86400)
34753478
end
34763479
3477-
redis.call('ZADD', queueKey, messageScore, messageId)
3480+
-- INCR is gated on ZADD returning 1 (new entry). A duplicate enqueue (same messageId
3481+
-- already in the variant zset) returns 0 and must not bump the counter.
3482+
local added = redis.call('ZADD', queueKey, messageScore, messageId)
34783483
redis.call('ZADD', envQueueKey, messageScore, messageId)
3479-
redis.call('INCR', lengthCounterKey)
3484+
if added == 1 then
3485+
redis.call('INCR', lengthCounterKey)
3486+
end
34803487
34813488
-- Rebalance CK index
34823489
local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3571,23 +3578,24 @@ end
35713578
-- Slow path: normal enqueue
35723579
redis.call('SET', messageKey, messageData)
35733580
3574-
-- Lazy-init lengthCounter from existing ckIndex variants (once per base queue, ever).
3575-
-- Run BEFORE the ZADD so we capture pre-state; the subsequent INCR accounts for the new message.
3576-
-- The counter tracks ONLY CK-variant messages — the read path adds ZCARD(base) separately,
3577-
-- so the base zset is intentionally excluded here.
3581+
-- Lazy-init lengthCounter from existing ckIndex variants (once per base queue per 24h).
3582+
-- See enqueueMessageCkTracked for the TTL rationale.
35783583
if redis.call('EXISTS', lengthCounterKey) == 0 then
35793584
local total = 0
35803585
local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
35813586
for _, v in ipairs(variants) do
35823587
total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
35833588
end
3584-
redis.call('SET', lengthCounterKey, total)
3589+
redis.call('SET', lengthCounterKey, total, 'EX', 86400)
35853590
end
35863591
3587-
redis.call('ZADD', queueKey, messageScore, messageId)
3592+
-- INCR is gated on ZADD returning 1 (new entry).
3593+
local added = redis.call('ZADD', queueKey, messageScore, messageId)
35883594
redis.call('ZADD', envQueueKey, messageScore, messageId)
35893595
redis.call('ZADD', ttlQueueKey, ttlScore, ttlMember)
3590-
redis.call('INCR', lengthCounterKey)
3596+
if added == 1 then
3597+
redis.call('INCR', lengthCounterKey)
3598+
end
35913599
35923600
-- Rebalance CK index
35933601
local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3692,7 +3700,7 @@ for i, member in ipairs(expiredMembers) do
36923700
redis.call('SREM', envDequeuedKey, runId)
36933701
36943702
-- Rebalance CK index if this is a CK queue
3695-
local ckMatch = string.match(rawQueueKey, "^(.+):ck:.+$")
3703+
local ckMatch = string.match(rawQueueKey, "(.-):ck:")
36963704
if ckMatch then
36973705
local ckIndexKey = keyPrefix .. ckMatch .. ":ckIndex"
36983706
local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3797,7 +3805,7 @@ for i, member in ipairs(expiredMembers) do
37973805
redis.call('SREM', envDequeuedKey, runId)
37983806
37993807
-- Rebalance CK index AND update counters if this is a CK queue
3800-
local ckMatch = string.match(rawQueueKey, "^(.+):ck:.+$")
3808+
local ckMatch = string.match(rawQueueKey, "(.-):ck:")
38013809
if ckMatch then
38023810
local lengthCounterKey = keyPrefix .. ckMatch .. ":lengthCounter"
38033811
local runningCounterKey = keyPrefix .. ckMatch .. ":runningCounter"
@@ -4325,11 +4333,17 @@ local queue = messageData.queue
43254333
local queueCurrentDequeuedKey = keyPrefix .. queue .. ':currentDequeued'
43264334
local envCurrentDequeuedKey = keyPrefix .. string.match(queue, "(.+):queue:") .. ':currentDequeued'
43274335
4328-
-- If CK, bump the runningCounter for the base queue (lazy init from ckIndex).
4336+
-- SADD first so we know if this dequeue is new (returns 1) or a duplicate (returns 0).
4337+
-- INCR runningCounter is gated on the new-membership result so re-dequeues don't inflate it.
4338+
local addedDeq = redis.call('SADD', queueCurrentDequeuedKey, messageData.runId)
4339+
redis.call('SADD', envCurrentDequeuedKey, messageData.runId)
4340+
4341+
-- If CK + new addition, bump the runningCounter for the base queue (lazy init from ckIndex).
43294342
-- The counter tracks ONLY CK-variant currentDequeued — the read path adds the base
4330-
-- SCARD separately, so we exclude the base currentDequeued here.
4343+
-- SCARD separately, so we exclude the base currentDequeued here. 24h TTL on init
4344+
-- bounds any drift from rolling-deploy v1/v2 overlap.
43314345
local baseQueue = string.match(queue, "(.-):ck:")
4332-
if baseQueue then
4346+
if baseQueue and addedDeq == 1 then
43334347
local runningCounterKey = keyPrefix .. baseQueue .. ':runningCounter'
43344348
if redis.call('EXISTS', runningCounterKey) == 0 then
43354349
local ckIndexKey = keyPrefix .. baseQueue .. ':ckIndex'
@@ -4338,14 +4352,14 @@ if baseQueue then
43384352
for _, v in ipairs(variants) do
43394353
total = total + tonumber(redis.call('SCARD', keyPrefix .. v .. ':currentDequeued') or '0')
43404354
end
4341-
redis.call('SET', runningCounterKey, total)
4355+
-- Subtract 1 because we already SADD'd our own runId above, which the per-variant
4356+
-- SCARDs now reflect. Without this, the lazy-init seed double-counts the new member
4357+
-- relative to what the subsequent INCR will add.
4358+
redis.call('SET', runningCounterKey, math.max(0, total - 1), 'EX', 86400)
43424359
end
43434360
redis.call('INCR', runningCounterKey)
43444361
end
43454362
4346-
redis.call('SADD', queueCurrentDequeuedKey, messageData.runId)
4347-
redis.call('SADD', envCurrentDequeuedKey, messageData.runId)
4348-
43494363
return message
43504364
`,
43514365
});
@@ -4749,6 +4763,8 @@ local messageQueueName = ARGV[2]
47494763
local messageData = ARGV[3]
47504764
local messageScore = tonumber(ARGV[4])
47514765
local ckWildcardName = ARGV[5]
4766+
-- keyPrefix for prepending to variant names stored as values in ckIndex (lazy-init only)
4767+
local keyPrefix = ARGV[6]
47524768
47534769
local function decrFloored(key)
47544770
if tonumber(redis.call('GET', key) or '0') > 0 then
@@ -4766,6 +4782,19 @@ if removedFromDequeued == 1 then
47664782
decrFloored(runningCounterKey)
47674783
end
47684784
4785+
-- Lazy-init lengthCounter if missing (e.g. expired via 24h TTL). nack re-queues a
4786+
-- message, which means lengthCounter must be present before we INCR. Without this,
4787+
-- a nack after counter expiry would create the counter at 1 and stay drifted until
4788+
-- next reset.
4789+
if redis.call('EXISTS', lengthCounterKey) == 0 then
4790+
local total = 0
4791+
local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
4792+
for _, v in ipairs(variants) do
4793+
total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
4794+
end
4795+
redis.call('SET', lengthCounterKey, total, 'EX', 86400)
4796+
end
4797+
47694798
-- ZADD back to the variant zset. INCR lengthCounter only if it's a new entry.
47704799
local added = redis.call('ZADD', messageQueueKey, messageScore, messageId)
47714800
redis.call('ZADD', envQueueKey, messageScore, messageId)
@@ -5511,6 +5540,7 @@ declare module "@internal/redis" {
55115540
messageData: string,
55125541
messageScore: string,
55135542
ckWildcardName: string,
5543+
keyPrefix: string,
55145544
callback?: Callback<void>
55155545
): Result<void, Context>;
55165546

internal-packages/run-engine/src/run-queue/tests/ckCounters.test.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { redisTest } from "@internal/testcontainers";
22
import { trace } from "@internal/tracing";
33
import { Logger } from "@trigger.dev/core/logger";
44
import { describe } from "vitest";
5-
import { setTimeout } from "node:timers/promises";
65
import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js";
76
import { RunQueue } from "../index.js";
87
import { RunQueueFullKeyProducer } from "../keyProducer.js";
@@ -327,4 +326,105 @@ describe("CK base-queue counters", () => {
327326
}
328327
}
329328
);
329+
330+
redisTest(
331+
"lengthCounter has 24h TTL after lazy-init",
332+
async ({ redisContainer }) => {
333+
const queue = createQueue(redisContainer);
334+
try {
335+
await queue.enqueueMessage({
336+
env: authenticatedEnvDev,
337+
message: makeMessage({ runId: "r1", concurrencyKey: "ck-a" }),
338+
workerQueue: authenticatedEnvDev.id,
339+
skipDequeueProcessing: true,
340+
});
341+
342+
const counterKey = testOptions.keys.queueLengthCounterKey(
343+
authenticatedEnvDev,
344+
"task/my-task"
345+
);
346+
const ttl = await queue.redis.ttl(counterKey);
347+
// Expect roughly 86400; allow slack for test scheduling.
348+
expect(ttl).toBeGreaterThan(86000);
349+
expect(ttl).toBeLessThanOrEqual(86400);
350+
} finally {
351+
await queue.quit();
352+
}
353+
}
354+
);
355+
356+
redisTest(
357+
"duplicate CK enqueue (same runId) does not inflate lengthCounter",
358+
async ({ redisContainer }) => {
359+
const queue = createQueue(redisContainer);
360+
try {
361+
const msg = makeMessage({ runId: "r1", concurrencyKey: "ck-a" });
362+
363+
// First enqueue: counter goes 0 -> 1
364+
await queue.enqueueMessage({
365+
env: authenticatedEnvDev,
366+
message: msg,
367+
workerQueue: authenticatedEnvDev.id,
368+
skipDequeueProcessing: true,
369+
});
370+
371+
// Same runId again: ZADD returns 0 (already in zset), counter must stay at 1
372+
await queue.enqueueMessage({
373+
env: authenticatedEnvDev,
374+
message: msg,
375+
workerQueue: authenticatedEnvDev.id,
376+
skipDequeueProcessing: true,
377+
});
378+
379+
expect(await queue.lengthOfQueue(authenticatedEnvDev, msg.queue)).toBe(1);
380+
} finally {
381+
await queue.quit();
382+
}
383+
}
384+
);
385+
386+
redisTest(
387+
"nack lazy-inits lengthCounter when it expired",
388+
async ({ redisContainer }) => {
389+
const queue = createQueue(redisContainer);
390+
try {
391+
const msg = makeMessage({ runId: "r1", concurrencyKey: "ck-a" });
392+
// Seed three messages on the CK variant so the lazy-init has a non-trivial floor.
393+
for (let i = 0; i < 3; i++) {
394+
await queue.enqueueMessage({
395+
env: authenticatedEnvDev,
396+
message: makeMessage({ runId: `seed-${i}`, concurrencyKey: "ck-a" }),
397+
workerQueue: authenticatedEnvDev.id,
398+
skipDequeueProcessing: true,
399+
});
400+
}
401+
402+
// Simulate counter expiry (the 24h TTL kicked in).
403+
const counterKey = testOptions.keys.queueLengthCounterKey(
404+
authenticatedEnvDev,
405+
"task/my-task"
406+
);
407+
await queue.redis.del(counterKey);
408+
expect(await queue.redis.exists(counterKey)).toBe(0);
409+
410+
// Dequeue one to currentConcurrency so we have something to nack back.
411+
const shard = testOptions.keys.masterQueueShardForEnvironment(msg.environmentId, 2);
412+
await queue.testDequeueFromMasterQueue(shard, msg.environmentId, 1);
413+
414+
// Nack a CK message. nackMessageCkTracked should lazy-init the counter
415+
// (find 2 already in zset + 1 we're re-queuing) rather than starting from 1.
416+
await queue.nackMessage({
417+
orgId: msg.orgId,
418+
messageId: "seed-0",
419+
skipDequeueProcessing: true,
420+
});
421+
422+
// 3 originals: 1 was dequeued and then re-queued by the nack, so the counter should reflect all 3.
423+
const observed = await queue.lengthOfQueue(authenticatedEnvDev, msg.queue);
424+
expect(observed).toBe(3);
425+
} finally {
426+
await queue.quit();
427+
}
428+
}
429+
);
330430
});

0 commit comments

Comments
 (0)