'use strict'; class Queue { constructor(concurrency) { this.paused = false; this.concurrency = concurrency; this.count = 0; this.waiting = []; this.onProcess = null; this.onDone = null; this.onSuccess = null; this.onFailure = null; this.onDrain = null; this.waitTimeout = Infinity; this.processTimeout = Infinity; this.priorityMode = false; this.destination = null; } static channels(concurrency) { return new Queue(concurrency); } wait(msec) { this.waitTimeout = msec; return this; } timeout(msec) { this.processTimeout = msec; return this; } add(task, priority = 0) { if (!this.paused) { const hasChannel = this.count < this.concurrency; if (hasChannel) { this.next(task); return; } } this.waiting.push({ task, start: Date.now(), priority }); if (this.priorityMode) { this.waiting.sort((a, b) => b.priority - a.priority); } } next(task) { this.count++; let timer = null; let finished = false; const { processTimeout, onProcess } = this; const finish = (err, res) => { if (finished) return; finished = true; if (timer) clearTimeout(timer); this.count--; this.finish(err, res); if (!this.paused && this.waiting.length > 0) this.takeNext(); }; if (processTimeout !== Infinity) { const err = new Error('Process timed out'); timer = setTimeout(finish, processTimeout, err, task); } onProcess(task, finish); } takeNext() { const { waiting, waitTimeout } = this; const { task, start } = waiting.shift(); if (waitTimeout !== Infinity) { const delay = Date.now() - start; if (delay > waitTimeout) { const err = new Error('Waiting timed out'); this.finish(err, task); if (waiting.length > 0) { setTimeout(() => { if (!this.paused && waiting.length > 0) this.takeNext(); }, 0); } return; } } const hasChannel = this.count < this.concurrency; if (hasChannel) this.next(task); } finish(err, res) { const { onFailure, onSuccess, onDone, onDrain } = this; if (err) { if (onFailure) onFailure(err, res); } else { if (onSuccess) onSuccess(res); if (this.destination) this.destination.add(res); } if (onDone) onDone(err, res); if (this.count === 0 && onDrain) onDrain(); } process(listener) { this.onProcess = listener; return this; } done(listener) { this.onDone = listener; return this; } success(listener) { this.onSuccess = listener; return this; } failure(listener) { this.onFailure = listener; return this; } drain(listener) { this.onDrain = listener; return this; } pause() { this.paused = true; return this; } resume() { if (this.waiting.length > 0) { const channels = this.concurrency - this.count; for (let i = 0; i < channels; i++) { this.takeNext(); } } this.paused = false; return this; } priority(flag = true) { this.priorityMode = flag; return this; } pipe(destination) { this.destination = destination; return this; } } // Usage const destination = Queue.channels(2) .wait(5000) .process((task, next) => next(null, { ...task, processed: true })) .done((error, task) => { if (error) console.error(error); console.log({ task }); }); const source = Queue.channels(3) .timeout(4000) .process((task, next) => setTimeout(next, task.interval, null, task)) .pipe(destination); for (let i = 0; i < 10; i++) { source.add({ name: `Task${i}`, interval: 1000 }); }