added priority support#784
Conversation
|
Hi @pengyk , thank you for this pull request! I have the following questions:
I think perhaps the most efficient way to support priority is integrating it natively into work-stealing queue (WSQ). Supporting priority for unbounded WSQ might be a bit challenging now but I feel it is possible for bounded WSQ. Basically, we will maintain |
|
Hi @tsung-wei-huang thanks for taking a look!
graph TD
A[Root Node] --> B[Node 1<br/>Low priority]
A --> C[Node 2<br/>Low priority]
A --> D[Node 3<br/>Low priority]
A --> E[Node 4<br/>High priority]
A --> F[Node 5<br/>High priority]
If we have 3 threads and tasks are made ready in the given order, I think we can guarantee that the high priority tasks will all get executed first. I do agree that this might be a little overkill and might degrade performance, so I am open to removing it. I think the rest is pretty much just a
Let me know if that isn't clear, thanks! |
|
@pengyk Thank you for the response! After a careful review, I suggest the following modification to refactor the pull:
With 2, I suggest the following refactoring with a new class // let's define a concept for work-stealing queue
template <typename Q>
concept BoundedWSQLike = requires(Q& q) {
{ q.steal() };
{ q.pop() };
{ q.try_push() } -> bool ; // unbounded wsq instead has push -> void
}
// top wrapper over WSQ - and yes, I agree with you that LogPriority is not a good idea. Let's just template it linearly
template <BoundedWSQLike Q, size_t MaxPriority>
class BoundedPriorityWSQ {
public:
// here we provide all BoundedWSQ methods with priority i to pin the operation to specific _wsq[i]
template <typename O>
bool try_push(size_t priority, O&& item);
... similarly for other methods
// for method that doesn't have priority, we will, by default, perform the batch operation from i=0, to MaxPriority-1
// note that i=0 means the highest priority
template <typename O>
bool try_push(O&& item);
... similarly for other methods
private:
std::array<Q, MaxPriority> _wsqs;
}
Now, the advantage of this is, we can statically unroll all batch operations (e.g., // let's use the static unroll_until https://github.com/taskflow/taskflow/blob/a215b4578338cc27cba5c4772c4f73b61a538d42/taskflow/utility/traits.hpp#L164
template<auto beg, auto end, auto step, typename F>
constexpr bool unroll_until(F&& f) {
return [&]<auto... Is>(std::index_sequence<Is...>) {
return (f(beg + Is * step) || ...);
}(std::make_index_sequence<(end - beg + step - 1) / step>{});
}
template <BoundedWSQLike Q, size_t MaxPriority>
template <typename O>
bool BoundedPriorityWSQ<Q, MaxPriority>::try_push(O&& item) {
return unroll_until<0, MaxPriority-1, 1>([&](size_t i){ return _wsqs[i].push(std::item); });
}In this case, I don't think we even need |
|
Hi @tsung-wei-huang , Yeah that sounds good! I think this is a cleaner implementation. I am wondering if you have any preferences on the max number of MaxPriority. I think right now it is 3, is that ok or would you want it to be a configurable number? |
|
@pengyk configurable number is definitely better. But we can default it to If you have any questions or any part that you think I can jump in or help, please don't hesitate to let me know. |
|
@tsung-wei-huang Yeah that sounds good! I will try to get something out before end of week, thanks for the help! I will let you know if anything question comes up |
|
Hi @tsung-wei-huang I think I have made the requested changes, let me know if anything is unclear thanks! |
|
@pengyk thank you - I will have a look soon and get back to you! Before that, I will make a release first to include several other changes on profiler, doc examples, and other scheduler-level micro optimization. Really appreciate your contributions! |
Summary
TaskPriority::HIGH,TaskPriority::NORMAL, andTaskPriority::LOWwhen choosing which ready task to execute next_prio_wsq[3]) and a StagingQueue buffer that batches NORMAL/LOW tasks while pushing HIGH tasks immediately_num_prioritizedcounter so the existing run() path incurs zero overheadDesign
Priority-ordered stealing:
_prio_sweep_task()scans all workers and buffers in HIGH → NORMAL → LOW order, guaranteeing no lower-prioritask is taken while a higher-priority task exists anywhere.A macro
TF_ENFORCE_PRIORITY_EXPLOITthat enables cross-worker global sweep during the exploit phaseContinuation cache:
_prio_update_cache()keeps the higher-priority successor in the cache and stages the lower one, preserving the existing zero-atomic-op continuation optimization while respecting priorities.Staging buffer — NORMAL and LOW tasks accumulate in a per-worker. StagingQueue during task invocation and are flushed to priority aware work stealing queues after each invoke via
_prio_flush(). HIGH tasks bypass the buffer entirely.Some benchmarks
Uniform Independent Tasks with Mixed Priorities (128 equal-cost tasks, round-robin H/N/L)
Mean
Median
P90
Fan-Out with Skewed Work Costs (300 tasks: 50 expensive HIGH, 100 medium NORMAL, 150 cheap LOW)
Mean
Median
P90
Large Fan-Out with Skewed Work Costs (1000 tasks: 100 expensive HIGH, 300 medium NORMAL, 600 cheap LOW)
Mean
Median
P90
Few Expensive Tasks Among Many Cheap Ones (10 heavy HIGH "whales" + 1000 near-instant LOW "minnows")
Mean
Median
P90
Layered Pipeline with Dependencies (5 layers x 20 tasks, each layer depends on the previous; HIGH tasks gate downstream work)
Mean
Median
P90
Thread-Matched Scheduling Test (N HIGH tasks + N LOW tasks where N = thread count; HIGH work = N x LOW work, all independent)
Mean
Median
P90
Github gist of the benchmarks https://gist.github.com/pengyk/af6df7e2bbf1ec341473af012b563102