Replace mutex-protected UnboundedWSQ overflow buffers with lock-free MPMC ring #770

Closed
Mattbusel wants to merge 1 commit into taskflow:master from Mattbusel:perf/mpmc-overflow-buffers

@Mattbusel

Contributor

The executor holds log2(N) overflow buffers used when a worker's bounded local queue is full. Each buffer was a struct containing a std::mutex and an UnboundedWSQ. Every push in _spill / _bulk_spill and every steal in _explore_task and _corun_until acquired that mutex. Under workloads with high task fanout -- large semaphore waiter lists, wide subflows, bulk scheduling after a corun -- multiple workers hash to the same buffer simultaneously and serialize on that lock.

The replacement

MPMCQueue<T> (added to wsq.hpp) implements the Vyukov bounded MPMC algorithm. Each slot holds an atomic sequence number and a data pointer. The producer reserves a slot by CAS-advancing _enqueue_pos and commits data with a release store on the sequence. The consumer CAS-advances _dequeue_pos, reads data, and recycles the slot with a release store. Every enqueue and dequeue needs one successful CAS (retried only when another thread wins the race for the same position), with no mutex and no dynamic allocation after construction. Capacity must be a power of two, enforced in the constructor. Only pointer element types are supported; nullptr is the empty sentinel.
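The algorithm described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the class name SketchMPMCQueue, the method names try_enqueue and try_dequeue, and the 64-byte alignment are assumptions chosen for the example.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <type_traits>

// Hypothetical sketch of a Vyukov-style bounded MPMC ring (not the PR's MPMCQueue<T>).
template <typename T>
class SketchMPMCQueue {
  static_assert(std::is_pointer_v<T>, "only pointer element types are supported");

 public:
  explicit SketchMPMCQueue(std::size_t capacity)
    : _mask(capacity - 1), _slots(std::make_unique<Slot[]>(capacity)) {
    // Capacity must be a power of two (and at least 2 for this algorithm).
    assert(capacity >= 2 && (capacity & (capacity - 1)) == 0);
    // Slot i starts with sequence i, meaning "free for the producer at position i".
    for (std::size_t i = 0; i < capacity; ++i) {
      _slots[i].seq.store(i, std::memory_order_relaxed);
    }
  }

  // Returns false when the ring is full.
  bool try_enqueue(T item) {
    std::size_t pos = _enqueue_pos.load(std::memory_order_relaxed);
    for (;;) {
      Slot& slot = _slots[pos & _mask];
      std::size_t seq = slot.seq.load(std::memory_order_acquire);
      auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);
      if (diff == 0) {
        // Slot is free for this position: try to reserve it.
        if (_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
          slot.data = item;
          slot.seq.store(pos + 1, std::memory_order_release);  // publish to consumers
          return true;
        }
        // CAS failure reloaded pos; retry with the new value.
      } else if (diff < 0) {
        return false;  // slot not yet recycled by a consumer: ring is full
      } else {
        pos = _enqueue_pos.load(std::memory_order_relaxed);  // another producer got ahead
      }
    }
  }

  // Returns nullptr (the empty sentinel) when the ring is empty.
  T try_dequeue() {
    std::size_t pos = _dequeue_pos.load(std::memory_order_relaxed);
    for (;;) {
      Slot& slot = _slots[pos & _mask];
      std::size_t seq = slot.seq.load(std::memory_order_acquire);
      auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos + 1);
      if (diff == 0) {
        if (_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
          T item = slot.data;
          // Recycle the slot for the producer one full lap later.
          slot.seq.store(pos + _mask + 1, std::memory_order_release);
          return item;
        }
      } else if (diff < 0) {
        return nullptr;  // producer has not published this slot yet: ring is empty
      } else {
        pos = _dequeue_pos.load(std::memory_order_relaxed);  // another consumer got ahead
      }
    }
  }

 private:
  struct Slot {
    std::atomic<std::size_t> seq;
    T data;
  };

  std::size_t _mask;
  std::unique_ptr<Slot[]> _slots;
  alignas(64) std::atomic<std::size_t> _enqueue_pos{0};
  alignas(64) std::atomic<std::size_t> _dequeue_pos{0};
};
```

Note that the constructor walks every slot to seed its sequence number, so construction cost grows linearly with capacity.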

In the executor the Buffer struct is removed. _buffers becomes vector<unique_ptr<MPMCQueue<Node*>>>. MPMCQueue contains std::atomic fields and is not movable, so unique_ptr is required for vector storage.
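A small sketch of why the indirection is needed, assuming a stand-in type RingLike (hypothetical, not from the PR) that has std::atomic members and a capacity argument like the real queue. Vector operations that may reallocate, such as push_back, need a movable element type, so the queue objects are held through unique_ptr instead.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for a queue with atomic members.
struct RingLike {
  explicit RingLike(std::size_t cap) : capacity(cap) {}
  std::size_t capacity;
  std::atomic<std::size_t> head{0};
  std::atomic<std::size_t> tail{0};
};

// std::atomic is neither copyable nor movable, and neither is a type containing one.
static_assert(!std::is_copy_constructible_v<RingLike>, "atomics block copying");
static_assert(!std::is_move_constructible_v<RingLike>, "atomics block moving");

// unique_ptr<RingLike> is movable, so the vector can grow and relocate the pointers
// while each RingLike stays at a fixed heap address.
std::vector<std::unique_ptr<RingLike>> make_buffers(std::size_t count, std::size_t cap) {
  std::vector<std::unique_ptr<RingLike>> buffers;
  buffers.reserve(count);
  for (std::size_t i = 0; i < count; ++i) {
    buffers.push_back(std::make_unique<RingLike>(cap));
  }
  return buffers;
}
```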

Buffer count stays at log2(N), which the existing empirical results favor for steal spread. Each buffer is allocated with capacity bit_ceil(max(2^20, N * 1024)). The floor at 2^20 (1M slots; 8 MB of pointer storage per buffer on a 64-bit target, not counting the per-slot sequence numbers) ensures large semaphore burst releases do not stall in _spill. If a buffer fills, _spill yields and retries rather than growing dynamically, which keeps memory use bounded without a secondary allocation path.
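The capacity rule can be worked through with a short sketch. The helper names ceil_pow2 and overflow_capacity are hypothetical; ceil_pow2 is a portable stand-in for bit_ceil (C++20 offers std::bit_ceil in <bit> with the same rounding behavior for these inputs).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical portable stand-in for bit_ceil: smallest power of two >= x.
constexpr std::size_t ceil_pow2(std::size_t x) {
  std::size_t p = 1;
  while (p < x) {
    p <<= 1;
  }
  return p;
}

// Hypothetical helper mirroring bit_ceil(max(2^20, N * 1024)) for N workers.
constexpr std::size_t overflow_capacity(std::size_t num_workers) {
  constexpr std::size_t floor_slots = std::size_t{1} << 20;
  return ceil_pow2(std::max(floor_slots, num_workers * 1024));
}

// With 8 workers, 8 * 1024 is far below the floor, so the floor wins.
static_assert(overflow_capacity(8) == (std::size_t{1} << 20), "floor applies");
// With 1025 workers, 1025 * 1024 exceeds 2^20 and rounds up to 2^21.
static_assert(overflow_capacity(1025) == (std::size_t{1} << 21), "rounds up");
```

Under this rule the floor dominates until N exceeds 1024 workers, so on typical machines every buffer has exactly 2^20 slots.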

The _spill and _bulk_spill procedures drop their scoped_lock calls. The steal paths in _explore_task and _corun_until and the emptiness check in _wait_for_task use the new interface directly.

Testing

All 45 test_wsq cases pass (17721330 assertions).
All 87 test_work_stealing cases pass (42240 assertions).
All 45 test_basics cases pass (7204149 assertions), including RunAndWait.Complex which exercises the corun steal path.

…-free MPMC ring

The executor uses a set of overflow buffers to hold tasks that do not fit
in a worker's bounded local queue. Each buffer was a struct containing a
std::mutex and an UnboundedWSQ. Every push and steal on an overflow buffer
acquired that mutex, serializing all threads that hash to the same slot.
Under workloads with high task fanout or large semaphore waiter lists, this
becomes a contention bottleneck.

This change replaces those buffers with a lock-free bounded MPMC ring queue
(MPMCQueue<T>, implemented in wsq.hpp) based on the Vyukov MPMC algorithm.
Every enqueue and dequeue needs one successful CAS on a position counter,
with no mutex and no dynamic allocation after construction.

Implementation

MPMCQueue<T> uses a fixed-size power-of-two slot array. Each slot holds an
atomic sequence number and a data pointer. The producer reserves a slot by
doing a CAS on _enqueue_pos and writes data followed by a release store on
the sequence. The consumer similarly CAS-advances _dequeue_pos, reads data,
and recycles the slot with a release store. The capacity must be a power of
two, which is enforced in the constructor. Only pointer element types are
supported; nullptr is the empty sentinel.

In the executor, the Buffer struct is removed. _buffers becomes a
vector<unique_ptr<MPMCQueue<Node*>>>. MPMCQueue contains std::atomic fields
and is not movable, so unique_ptr is required to allow vector storage.

The buffer count remains log2(N) for N workers, which empirical results show is
the sweet spot for the steal hash spread. Each buffer is allocated with
capacity bit_ceil(max(2^20, N * 1024)). The 2^20 floor ensures that large
bursts from semaphore releases or subflow expansions do not stall in _spill.
If a buffer is full, _spill yields and retries rather than growing, which
bounds memory use without introducing a second allocation path.

The steal paths in _explore_task and _corun_until and the emptiness check
in _wait_for_task all use the new interface directly. The _spill and
_bulk_spill procedures drop their scoped_lock calls.

Testing

All 45 test_wsq cases pass (17721330 assertions).
All 87 test_work_stealing cases pass (42240 assertions).
All 45 test_basics cases pass (7204149 assertions).
@tsung-wei-huang

tsung-wei-huang commented Mar 14, 2026

Member

@Mattbusel thank you for the pull request! There is a reason why an unbounded queue is needed here rather than MPMC (indeed, our utility folder already has an MPMC implementation). The primary reason is that it is possible to have many tasks each inserting thousands or millions of tasks (e.g., task-parallel timing analysis in OpenTimer). In this case, if the centralized queue is bounded, deadlock will occur: all workers try to push tasks but the queues are all full.
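The failure mode can be illustrated with a small single-threaded sketch. FixedQueue and spill_with_retry are hypothetical names for this example, and the retry loop is capped at max_attempts only so the stall is observable; a yield-and-retry loop without a cap would spin forever once the queue is full and every worker is busy pushing rather than stealing.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <thread>

// Hypothetical bounded queue for illustration only (not thread-safe).
template <typename T>
class FixedQueue {
 public:
  explicit FixedQueue(std::size_t capacity) : _capacity(capacity) {}

  bool try_enqueue(T item) {
    if (_items.size() == _capacity) {
      return false;  // full
    }
    _items.push_back(item);
    return true;
  }

  T try_dequeue() {
    if (_items.empty()) {
      return nullptr;  // empty sentinel
    }
    T item = _items.front();
    _items.pop_front();
    return item;
  }

 private:
  std::size_t _capacity;
  std::deque<T> _items;
};

// Yield-and-retry spill, capped so that a permanent stall shows up as `false`.
template <typename Queue, typename T>
bool spill_with_retry(Queue& queue, T item, std::size_t max_attempts) {
  for (std::size_t attempt = 0; attempt < max_attempts; ++attempt) {
    if (queue.try_enqueue(item)) {
      return true;
    }
    std::this_thread::yield();  // give a consumer the chance to drain a slot
  }
  return false;  // nobody drained the queue: the push cannot make progress
}
```

Progress resumes only when some thread dequeues. If all workers are inside the spill loop at once, no such thread exists, regardless of how large the fixed capacity is.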

Also, the constructor of MPMC is very expensive since it iterates over every element to initialize the state. You can clearly see the performance drop in the current unit tests when switching to the MPMC-based solution.

@Mattbusel

Contributor Author

That makes sense. I hadn't accounted for the OpenTimer-scale workloads where tasks fan out into millions of subtasks per worker. A bounded queue deadlocks there regardless of capacity. The constructor cost from sequence initialization is a good point too, especially at the 2^20 floor.

@tsung-wei-huang

Member

@Mattbusel no problem. Really appreciate your interest and contributions! I will close this issue for now.

One thought: it may be more worthwhile to look into making UnboundedWSQ thread-safe, so that we don't need to protect each one with a mutex.
