@@ -78,8 +78,6 @@ export class MarQS {
7878 this . keys = options . keysProducer ;
7979 this . queuePriorityStrategy = options . queuePriorityStrategy ;
8080
81- // Spawn options.workers workers to requeue visible messages
82- this . #startRequeuingWorkers( ) ;
8381 this . #startRebalanceWorkers( ) ;
8482 this . #registerCommands( ) ;
8583 }
@@ -790,66 +788,6 @@ export class MarQS {
790788 }
791789 }
792790
793- #startRequeuingWorkers( ) {
794- // Start a new worker to requeue visible messages
795- for ( let i = 0 ; i < this . options . workers ; i ++ ) {
796- const worker = new AsyncWorker ( this . #requeueVisibleMessages. bind ( this ) , 1000 ) ;
797-
798- this . #requeueingWorkers. push ( worker ) ;
799-
800- worker . start ( ) ;
801- }
802- }
803-
804- async #requeueVisibleMessages( ) {
805- // Remove any of the messages from the timeoutQueue that have expired
806- const messages = await this . redis . zrangebyscore (
807- constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
808- 0 ,
809- Date . now ( ) ,
810- "LIMIT" ,
811- 0 ,
812- 10
813- ) ;
814-
815- if ( messages . length === 0 ) {
816- return ;
817- }
818-
819- for ( let i = 0 ; i < messages . length ; i ++ ) {
820- const message = messages [ i ] ;
821-
822- const messageData = await this . redis . get ( this . keys . messageKey ( message ) ) ;
823-
824- if ( ! messageData ) {
825- // The message has been removed for some reason (TTL, etc.), so we should remove it from the timeout queue
826- await this . redis . zrem ( constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE , message ) ;
827-
828- continue ;
829- }
830-
831- const parsedMessage = MessagePayload . safeParse ( JSON . parse ( messageData ) ) ;
832-
833- if ( ! parsedMessage . success ) {
834- await this . redis . zrem ( constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE , message ) ;
835-
836- continue ;
837- }
838-
839- await this . #callNackMessage( {
840- messageKey : this . keys . messageKey ( message ) ,
841- messageQueue : parsedMessage . data . queue ,
842- parentQueue : parsedMessage . data . parentQueue ,
843- concurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( parsedMessage . data . queue ) ,
844- envConcurrencyKey : this . keys . envCurrentConcurrencyKeyFromQueue ( parsedMessage . data . queue ) ,
845- orgConcurrencyKey : this . keys . orgCurrentConcurrencyKeyFromQueue ( parsedMessage . data . queue ) ,
846- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
847- messageId : parsedMessage . data . messageId ,
848- messageScore : parsedMessage . data . timestamp ,
849- } ) ;
850- }
851- }
852-
853791 async #rebalanceParentQueues( ) {
854792 return await new Promise < void > ( ( resolve , reject ) => {
855793 // Scan for sorted sets with the parent queue pattern
0 commit comments