Skip to content

Commit 393af1b

Browse files
committed
Add ability to enable v2 marqs requeuing (and remove deprecated marqs visibility requeuing)
1 parent 6a91fb8 commit 393af1b

2 files changed

Lines changed: 1 addition & 63 deletions

File tree

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ export class MarQS {
7878
this.keys = options.keysProducer;
7979
this.queuePriorityStrategy = options.queuePriorityStrategy;
8080

81-
// Spawn options.workers workers to requeue visible messages
82-
this.#startRequeuingWorkers();
8381
this.#startRebalanceWorkers();
8482
this.#registerCommands();
8583
}
@@ -790,66 +788,6 @@ export class MarQS {
790788
}
791789
}
792790

793-
#startRequeuingWorkers() {
794-
// Start a new worker to requeue visible messages
795-
for (let i = 0; i < this.options.workers; i++) {
796-
const worker = new AsyncWorker(this.#requeueVisibleMessages.bind(this), 1000);
797-
798-
this.#requeueingWorkers.push(worker);
799-
800-
worker.start();
801-
}
802-
}
803-
804-
async #requeueVisibleMessages() {
805-
// Remove any of the messages from the timeoutQueue that have expired
806-
const messages = await this.redis.zrangebyscore(
807-
constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE,
808-
0,
809-
Date.now(),
810-
"LIMIT",
811-
0,
812-
10
813-
);
814-
815-
if (messages.length === 0) {
816-
return;
817-
}
818-
819-
for (let i = 0; i < messages.length; i++) {
820-
const message = messages[i];
821-
822-
const messageData = await this.redis.get(this.keys.messageKey(message));
823-
824-
if (!messageData) {
825-
// The message has been removed for some reason (TTL, etc.), so we should remove it from the timeout queue
826-
await this.redis.zrem(constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, message);
827-
828-
continue;
829-
}
830-
831-
const parsedMessage = MessagePayload.safeParse(JSON.parse(messageData));
832-
833-
if (!parsedMessage.success) {
834-
await this.redis.zrem(constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, message);
835-
836-
continue;
837-
}
838-
839-
await this.#callNackMessage({
840-
messageKey: this.keys.messageKey(message),
841-
messageQueue: parsedMessage.data.queue,
842-
parentQueue: parsedMessage.data.parentQueue,
843-
concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(parsedMessage.data.queue),
844-
envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(parsedMessage.data.queue),
845-
orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(parsedMessage.data.queue),
846-
visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE,
847-
messageId: parsedMessage.data.messageId,
848-
messageScore: parsedMessage.data.timestamp,
849-
});
850-
}
851-
}
852-
853791
async #rebalanceParentQueues() {
854792
return await new Promise<void>((resolve, reject) => {
855793
// Scan for sorted sets with the parent queue pattern

apps/webapp/app/v3/marqs/v2.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ function getMarQSClient() {
8282
defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, // this is so we aren't limited by the environment concurrency
8383
defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT,
8484
visibilityTimeoutInMs: env.V2_MARQS_VISIBILITY_TIMEOUT_MS, // 15 minutes
85-
enableRebalancing: !env.MARQS_DISABLE_REBALANCING,
85+
enableRebalancing: env.V2_MARQS_CONSUMER_POOL_ENABLED === "1",
8686
});
8787
}
8888

0 commit comments

Comments
 (0)