@@ -2577,7 +2577,8 @@ export class RunQueue {
25772577 messageQueue ,
25782578 JSON . stringify ( message ) ,
25792579 String ( messageScore ) ,
2580- ckWildcardName
2580+ ckWildcardName ,
2581+ this . options . redis . keyPrefix ?? ""
25812582 ) ;
25822583 } else {
25832584 await this . redis . nackMessage (
@@ -3461,7 +3462,9 @@ end
34613462-- Slow path: normal enqueue
34623463redis.call('SET', messageKey, messageData)
34633464
3464- -- Lazy-init lengthCounter from existing ckIndex variants (once per base queue, ever).
3465+ -- Lazy-init lengthCounter from existing ckIndex variants (once per base queue per 24h).
3466+ -- The 24h TTL means the counter periodically re-anchors to truth, bounding any drift
3467+ -- that accumulated during rolling-deploy overlap windows.
34653468-- Run BEFORE the ZADD so we capture pre-state; the subsequent INCR accounts for the new message.
34663469-- The counter tracks ONLY CK-variant messages — the read path adds ZCARD(base) separately,
34673470-- so the base zset is intentionally excluded here.
@@ -3471,12 +3474,16 @@ if redis.call('EXISTS', lengthCounterKey) == 0 then
34713474 for _, v in ipairs(variants) do
34723475 total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
34733476 end
3474- redis.call('SET', lengthCounterKey, total)
3477+ redis.call('SET', lengthCounterKey, total, 'EX', 86400 )
34753478end
34763479
3477- redis.call('ZADD', queueKey, messageScore, messageId)
3480+ -- INCR is gated on ZADD returning 1 (new entry). A duplicate enqueue (same messageId
3481+ -- already in the variant zset) returns 0 and must not bump the counter.
3482+ local added = redis.call('ZADD', queueKey, messageScore, messageId)
34783483redis.call('ZADD', envQueueKey, messageScore, messageId)
3479- redis.call('INCR', lengthCounterKey)
3484+ if added == 1 then
3485+ redis.call('INCR', lengthCounterKey)
3486+ end
34803487
34813488-- Rebalance CK index
34823489local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3571,23 +3578,24 @@ end
35713578-- Slow path: normal enqueue
35723579redis.call('SET', messageKey, messageData)
35733580
3574- -- Lazy-init lengthCounter from existing ckIndex variants (once per base queue, ever).
3575- -- Run BEFORE the ZADD so we capture pre-state; the subsequent INCR accounts for the new message.
3576- -- The counter tracks ONLY CK-variant messages — the read path adds ZCARD(base) separately,
3577- -- so the base zset is intentionally excluded here.
3581+ -- Lazy-init lengthCounter from existing ckIndex variants (once per base queue per 24h).
3582+ -- See enqueueMessageCkTracked for the TTL rationale.
35783583if redis.call('EXISTS', lengthCounterKey) == 0 then
35793584 local total = 0
35803585 local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
35813586 for _, v in ipairs(variants) do
35823587 total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
35833588 end
3584- redis.call('SET', lengthCounterKey, total)
3589+ redis.call('SET', lengthCounterKey, total, 'EX', 86400 )
35853590end
35863591
3587- redis.call('ZADD', queueKey, messageScore, messageId)
3592+ -- INCR is gated on ZADD returning 1 (new entry).
3593+ local added = redis.call('ZADD', queueKey, messageScore, messageId)
35883594redis.call('ZADD', envQueueKey, messageScore, messageId)
35893595redis.call('ZADD', ttlQueueKey, ttlScore, ttlMember)
3590- redis.call('INCR', lengthCounterKey)
3596+ if added == 1 then
3597+ redis.call('INCR', lengthCounterKey)
3598+ end
35913599
35923600-- Rebalance CK index
35933601local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3692,7 +3700,7 @@ for i, member in ipairs(expiredMembers) do
36923700 redis.call('SREM', envDequeuedKey, runId)
36933701
36943702 -- Rebalance CK index if this is a CK queue
3695- local ckMatch = string.match(rawQueueKey, "^(.+ ):ck:.+$ ")
3703+ local ckMatch = string.match(rawQueueKey, "(.- ):ck:")
36963704 if ckMatch then
36973705 local ckIndexKey = keyPrefix .. ckMatch .. ":ckIndex"
36983706 local earliest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
@@ -3797,7 +3805,7 @@ for i, member in ipairs(expiredMembers) do
37973805 redis.call('SREM', envDequeuedKey, runId)
37983806
37993807 -- Rebalance CK index AND update counters if this is a CK queue
3800- local ckMatch = string.match(rawQueueKey, "^(.+ ):ck:.+$ ")
3808+ local ckMatch = string.match(rawQueueKey, "(.- ):ck:")
38013809 if ckMatch then
38023810 local lengthCounterKey = keyPrefix .. ckMatch .. ":lengthCounter"
38033811 local runningCounterKey = keyPrefix .. ckMatch .. ":runningCounter"
@@ -4325,11 +4333,17 @@ local queue = messageData.queue
43254333local queueCurrentDequeuedKey = keyPrefix .. queue .. ':currentDequeued'
43264334local envCurrentDequeuedKey = keyPrefix .. string.match(queue, "(.+):queue:") .. ':currentDequeued'
43274335
4328- -- If CK, bump the runningCounter for the base queue (lazy init from ckIndex).
4336+ -- SADD first so we know if this dequeue is new (return 1) or a duplicate (return 0).
4337+ -- INCR runningCounter is gated on the new-membership result so re-dequeues don't inflate.
4338+ local addedDeq = redis.call('SADD', queueCurrentDequeuedKey, messageData.runId)
4339+ redis.call('SADD', envCurrentDequeuedKey, messageData.runId)
4340+
4341+ -- If CK + new addition, bump the runningCounter for the base queue (lazy init from ckIndex).
43294342-- The counter tracks ONLY CK-variant currentDequeued — the read path adds the base
4330- -- SCARD separately, so we exclude the base currentDequeued here.
4343+ -- SCARD separately, so we exclude the base currentDequeued here. 24h TTL on init
4344+ -- bounds any drift from rolling-deploy v1/v2 overlap.
43314345local baseQueue = string.match(queue, "(.-):ck:")
4332- if baseQueue then
4346+ if baseQueue and addedDeq == 1 then
43334347 local runningCounterKey = keyPrefix .. baseQueue .. ':runningCounter'
43344348 if redis.call('EXISTS', runningCounterKey) == 0 then
43354349 local ckIndexKey = keyPrefix .. baseQueue .. ':ckIndex'
@@ -4338,14 +4352,14 @@ if baseQueue then
43384352 for _, v in ipairs(variants) do
43394353 total = total + tonumber(redis.call('SCARD', keyPrefix .. v .. ':currentDequeued') or '0')
43404354 end
4341- redis.call('SET', runningCounterKey, total)
4355+ -- Subtract 1 because we already SADD'd our own runId above, which the per-variant
4356+ -- SCARDs now reflect. Without this, the lazy-init seed double-counts the new member
4357+ -- relative to what the subsequent INCR will add.
4358+ redis.call('SET', runningCounterKey, math.max(0, total - 1), 'EX', 86400)
43424359 end
43434360 redis.call('INCR', runningCounterKey)
43444361end
43454362
4346- redis.call('SADD', queueCurrentDequeuedKey, messageData.runId)
4347- redis.call('SADD', envCurrentDequeuedKey, messageData.runId)
4348-
43494363return message
43504364 ` ,
43514365 } ) ;
@@ -4749,6 +4763,8 @@ local messageQueueName = ARGV[2]
47494763local messageData = ARGV[3]
47504764local messageScore = tonumber(ARGV[4])
47514765local ckWildcardName = ARGV[5]
4766+ -- keyPrefix for prepending to variant names stored as values in ckIndex (lazy-init only)
4767+ local keyPrefix = ARGV[6]
47524768
47534769local function decrFloored(key)
47544770 if tonumber(redis.call('GET', key) or '0') > 0 then
@@ -4766,6 +4782,19 @@ if removedFromDequeued == 1 then
47664782 decrFloored(runningCounterKey)
47674783end
47684784
4785+ -- Lazy-init lengthCounter if missing (e.g. expired via 24h TTL). nack re-queues a
4786+ -- message, which means lengthCounter must be present before we INCR. Without this,
4787+ -- a nack after counter expiry would create the counter at 1 and stay drifted until
4788+ -- next reset.
4789+ if redis.call('EXISTS', lengthCounterKey) == 0 then
4790+ local total = 0
4791+ local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
4792+ for _, v in ipairs(variants) do
4793+ total = total + tonumber(redis.call('ZCARD', keyPrefix .. v) or '0')
4794+ end
4795+ redis.call('SET', lengthCounterKey, total, 'EX', 86400)
4796+ end
4797+
47694798-- ZADD back to the variant zset. INCR lengthCounter only if it's a new entry.
47704799local added = redis.call('ZADD', messageQueueKey, messageScore, messageId)
47714800redis.call('ZADD', envQueueKey, messageScore, messageId)
@@ -5511,6 +5540,7 @@ declare module "@internal/redis" {
55115540 messageData : string ,
55125541 messageScore : string ,
55135542 ckWildcardName : string ,
5543+ keyPrefix : string ,
55145544 callback ?: Callback < void >
55155545 ) : Result < void , Context > ;
55165546
0 commit comments