Replace mutex-protected UnboundedWSQ overflow buffers with lock-free MPMC ring#770
Replace mutex-protected UnboundedWSQ overflow buffers with lock-free MPMC ring#770Mattbusel wants to merge 1 commit into
Conversation
…-free MPMC ring The executor uses a set of overflow buffers to hold tasks that do not fit in a worker's bounded local queue. Each buffer was a struct containing a std::mutex and an UnboundedWSQ. Every push and steal on an overflow buffer acquired that mutex, serializing all threads that hash to the same slot. Under workloads with high task fanout or large semaphore waiter lists, this becomes a contention bottleneck. This change replaces those buffers with a lock-free bounded MPMC ring queue (MPMCQueue<T>, implemented in wsq.hpp) based on the Vyukov MPMC algorithm. Every enqueue and dequeue is a single CAS on a sequence number with no mutex and no dynamic allocation after construction. Implementation MPMCQueue<T> uses a fixed-size power-of-two slot array. Each slot holds an atomic sequence number and a data pointer. The producer reserves a slot by doing a CAS on _enqueue_pos and writes data followed by a release store on the sequence. The consumer similarly CAS-advances _dequeue_pos, reads data, and recycles the slot with a release store. The capacity must be a power of two, which is enforced in the constructor. Only pointer element types are supported; nullptr is the empty sentinel. In the executor, the Buffer struct is removed. _buffers becomes a vector<unique_ptr<MPMCQueue<Node*>>>. MPMCQueue contains std::atomic fields and is not movable, so unique_ptr is required to allow vector storage. The buffer count remains log2(N) workers, which empirical results show is the sweet spot for the steal hash spread. Each buffer is allocated with capacity bit_ceil(max(2^20, N * 1024)). The 2^20 floor ensures that large bursts from semaphore releases or subflow expansions do not stall in _spill. If a buffer is full, _spill yields and retries rather than growing, which bounds memory use without introducing a second allocation path. The steal paths in _explore_task and _corun_until and the emptiness check in _wait_for_task all use the new interface directly. The _spill and _bulk_spill procedures drop their scoped_lock calls. Testing All 45 test_wsq cases pass (17721330 assertions). All 87 test_work_stealing cases pass (42240 assertions). All 45 test_basics cases pass (7204149 assertions).
|
@Mattbusel thank you for the pull request! There is a reason why unbounded queue is needed here rather than MPMC (indeed, our utility folder has this MPMC implementation). The primary reasons is that it is possible to have many tasks each inserting thousands or millions of tasks (e.g., task-parallel timing analysis OpenTimer). In this case, if the centralized queue is bounded, deadlock will occur - all workers try to push tasks but the queues are all full. Also, the constructor of MPMC is very expensive since it iterate every element to initialize the state - You can clearly see the performance drop down in the current unit tests when switching to MPMC-based solution. |
|
That makes sense. I hadn't accounted for the OpenTimer-scale workloads where tasks fan out into millions of subtasks per worker. A bounded queue deadlocks there regardless of capacity. The constructor cost from sequence initialization is a good point too, especially at the 2^20 floor. |
|
@Mattbusel no problem. Really appreciate your interest and contributions! I will close this issue for now. Some thought: maybe it's more worthy looking into the making of UnboundedWSQ thread-safe so we don't need to protect each with a mutex. |
The executor holds
log2(N)overflow buffers used when a worker's bounded local queue is full. Each buffer was a struct containing astd::mutexand anUnboundedWSQ. Every push in_spill/_bulk_spilland every steal in_explore_taskand_corun_untilacquired that mutex. Under workloads with high task fanout -- large semaphore waiter lists, wide subflows, bulk scheduling after a corun -- multiple workers hash to the same buffer simultaneously and serialize on that lock.The replacement
MPMCQueue<T>(added towsq.hpp) implements the Vyukov bounded MPMC algorithm. Each slot holds an atomic sequence number and a data pointer. The producer reserves a slot by CAS-advancing_enqueue_posand commits data with a release store on the sequence. The consumer CAS-advances_dequeue_pos, reads data, and recycles the slot with a release store. Every enqueue and dequeue is a single CAS with no mutex and no dynamic allocation after construction. Capacity must be a power of two, enforced in the constructor. Only pointer element types are supported;nullptris the empty sentinel.In the executor the
Bufferstruct is removed._buffersbecomesvector<unique_ptr<MPMCQueue<Node*>>>.MPMCQueuecontainsstd::atomicfields and is not movable, sounique_ptris required for vector storage.Buffer count stays at
log2(N), which the existing empirical results favor for steal spread. Each buffer is allocated with capacitybit_ceil(max(2^20, N * 1024)). The floor at 2^20 (1M slots, 8 MB per buffer) ensures large semaphore burst releases do not stall in_spill. If a buffer fills,_spillyields and retries rather than growing dynamically, which keeps memory use bounded without a secondary allocation path.The
_spilland_bulk_spillprocedures drop theirscoped_lockcalls. The steal paths in_explore_taskand_corun_untiland the emptiness check in_wait_for_taskuse the new interface directly.Testing
All 45
test_wsqcases pass (17721330 assertions).All 87
test_work_stealingcases pass (42240 assertions).All 45
test_basicscases pass (7204149 assertions), includingRunAndWait.Complexwhich exercises the corun steal path.