added priority support #784

Open
pengyk wants to merge 1 commit into taskflow:master from pengyk:priorities

@pengyk

@pengyk pengyk commented Apr 20, 2026

Summary

  • Add a priority-aware scheduling pipeline (prioritized_run()) that respects TaskPriority::HIGH, TaskPriority::NORMAL, and TaskPriority::LOW when choosing which ready task to execute next
  • Introduce per-worker priority work-stealing queues (_prio_wsq[3]) and a StagingQueue buffer that batches NORMAL/LOW tasks while pushing HIGH tasks immediately
  • Gate all priority logic behind an atomic _num_prioritized counter so the existing run() path incurs zero overhead
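The gating idea in the last bullet can be sketched as follows. This is a minimal illustration, not the code in this pull request: the class name `GatedScheduler` and its member functions are hypothetical, and only the counter name `_num_prioritized` comes from the summary. Strictly speaking the non-prioritized path still pays for one relaxed atomic load per check.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the executor; only the gating logic is shown.
class GatedScheduler {
 public:
  // Called when a prioritized run starts.
  void begin_prioritized() {
    _num_prioritized.fetch_add(1, std::memory_order_relaxed);
  }

  // Called when a prioritized run finishes.
  void end_prioritized() {
    std::size_t prev = _num_prioritized.fetch_sub(1, std::memory_order_relaxed);
    assert(prev > 0);  // must be paired with begin_prioritized()
    (void)prev;        // silence unused-variable warnings when NDEBUG is set
  }

  // A worker checks this once to decide between the plain path and the
  // priority-aware path; the cost is a single relaxed load.
  bool use_priority_path() const {
    return _num_prioritized.load(std::memory_order_relaxed) != 0;
  }

 private:
  std::atomic<std::size_t> _num_prioritized{0};
};
```

While no prioritized run is in flight, `use_priority_path()` returns false and a worker would stay on the existing code path.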

Design

  • Priority-ordered stealing: _prio_sweep_task() scans all workers and buffers in HIGH → NORMAL → LOW order, guaranteeing that no lower-priority task is taken while a higher-priority task exists anywhere.
    A macro, TF_ENFORCE_PRIORITY_EXPLOIT, enables a cross-worker global sweep during the exploit phase.

  • Continuation cache: _prio_update_cache() keeps the higher-priority successor in the cache and stages the lower one, preserving the existing zero-atomic-op continuation optimization while respecting priorities.

  • Staging buffer: NORMAL and LOW tasks accumulate in a per-worker StagingQueue during task invocation and are flushed to the priority-aware work-stealing queues after each invoke via _prio_flush(). HIGH tasks bypass the buffer entirely.
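The priority-ordered sweep described in the first design bullet can be illustrated with a single-threaded sketch. Everything here is an assumption made for illustration: `WorkerQueues` and `sweep_task` are hypothetical names, tasks are plain `int`s, and `std::deque` stands in for a real lock-free work-stealing queue.

```cpp
#include <array>
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

enum Priority : std::size_t { HIGH = 0, NORMAL = 1, LOW = 2 };

// One worker's three queues, indexed by priority.
// std::deque is a single-threaded stand-in for a real WSQ.
struct WorkerQueues {
  std::array<std::deque<int>, 3> prio;
};

// Scan every worker at priority p before looking at priority p + 1, so a
// lower-priority task is returned only if no higher-priority task exists
// in any worker's queues.
inline std::optional<int> sweep_task(std::vector<WorkerQueues>& workers) {
  for (std::size_t p = HIGH; p <= LOW; ++p) {
    for (auto& w : workers) {
      auto& q = w.prio[p];
      if (!q.empty()) {
        int task = q.front();
        q.pop_front();
        return task;
      }
    }
  }
  return std::nullopt;  // nothing to run anywhere
}
```

With a LOW task on worker 0 and a HIGH task on worker 1, the sweep returns the HIGH task first even though worker 0 is scanned first at each level.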

Some benchmarks

Uniform Independent Tasks with Mixed Priorities (128 equal-cost tasks, round-robin H/N/L)

Mean

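| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 11.1 | 10.6 | 9.52 | -5% | -14% |
| 2 | 9.63 | 12.9 | 13.8 | +34% | +43% |
| 4 | 18.7 | 25.4 | 23.7 | +36% | +27% |
| 8 | 28.3 | 30.2 | 31.6 | +7% | +12% |
| 16 | 26.3 | 33.0 | 35.8 | +25% | +36% |
| 32 | 27.4 | 32.7 | 39.7 | +19% | +45% |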

Median

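| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 10.2 | 9.67 | 8.67 | -5% | -15% |
| 2 | 9.29 | 12.2 | 13.3 | +31% | +43% |
| 4 | 20.7 | 26.7 | 22.2 | +29% | +7% |
| 8 | 30.3 | 30.5 | 31.6 | +1% | +4% |
| 16 | 25.1 | 32.8 | 34.3 | +31% | +37% |
| 32 | 25.7 | 31.5 | 39.0 | +23% | +52% |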

P90

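| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.8 | 13.8 | 11.3 | -7% | -24% |
| 2 | 11.0 | 14.7 | 17.3 | +34% | +57% |
| 4 | 26.4 | 36.2 | 32.9 | +37% | +25% |
| 8 | 38.1 | 37.7 | 39.4 | -1% | +3% |
| 16 | 41.5 | 46.9 | 47.8 | +13% | +15% |
| 32 | 43.3 | 46.7 | 48.7 | +8% | +12% |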

Fan-Out with Skewed Work Costs (300 tasks: 50 expensive HIGH, 100 medium NORMAL, 150 cheap LOW)

Mean

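| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.9 | 17.4 | 16.9 | +17% | +13% |
| 2 | 19.7 | 22.5 | 41.7 | +14% | +112% |
| 4 | 41.4 | 33.6 | 57.2 | -19% | +38% |
| 8 | 68.7 | 48.7 | 52.7 | -29% | -23% |
| 16 | 54.4 | 56.9 | 63.0 | +5% | +16% |
| 32 | 47.7 | 52.3 | 77.9 | +10% | +63% |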

Median

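| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.8 | 17.2 | 16.0 | +16% | +8% |
| 2 | 19.7 | 18.9 | 32.4 | -4% | +64% |
| 4 | 45.4 | 29.4 | 59.0 | -35% | +30% |
| 8 | 71.8 | 48.5 | 51.4 | -32% | -28% |
| 16 | 58.7 | 58.7 | 58.2 | 0% | -1% |
| 32 | 40.1 | 51.8 | 69.1 | +29% | +72% |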

P90

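| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 15.9 | 18.7 | 19.2 | +18% | +21% |
| 2 | 23.5 | 38.3 | 87.8 | +63% | +274% |
| 4 | 58.8 | 50.4 | 73.8 | -14% | +26% |
| 8 | 97.4 | 57.5 | 79.5 | -41% | -18% |
| 16 | 82.4 | 72.5 | 96.7 | -12% | +17% |
| 32 | 80.7 | 75.2 | 115 | -7% | +43% |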

Large Fan-Out with Skewed Work Costs (1000 tasks: 100 expensive HIGH, 300 medium NORMAL, 600 cheap LOW)

Mean

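| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 27.4 | 36.8 | 34.0 | +34% | +24% |
| 2 | 73.3 | 83.7 | 102 | +14% | +39% |
| 4 | 138 | 109 | 95.7 | -21% | -31% |
| 8 | 200 | 126 | 140 | -37% | -30% |
| 16 | 166 | 124 | 167 | -25% | +1% |
| 32 | 127 | 120 | 204 | -6% | +61% |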

Median

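| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 26.3 | 36.3 | 32.7 | +38% | +24% |
| 2 | 68.0 | 78.7 | 99.6 | +16% | +46% |
| 4 | 147 | 111 | 84.0 | -24% | -43% |
| 8 | 210 | 128 | 141 | -39% | -33% |
| 16 | 181 | 124 | 155 | -31% | -14% |
| 32 | 135 | 123 | 195 | -9% | +44% |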

P90

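| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 29.7 | 40.9 | 38.0 | +38% | +28% |
| 2 | 85.7 | 94.5 | 113 | +10% | +32% |
| 4 | 160 | 128 | 162 | -20% | +1% |
| 8 | 240 | 142 | 157 | -41% | -35% |
| 16 | 224 | 156 | 238 | -30% | +6% |
| 32 | 203 | 140 | 278 | -31% | +37% |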

Few Expensive Tasks Among Many Cheap Ones (10 heavy HIGH "whales" + 1000 near-instant LOW "minnows")

Mean

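| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 26.4 | 40.6 | 35.6 | +54% | +35% |
| 2 | 85.0 | 85.2 | 109 | 0% | +28% |
| 4 | 127 | 120 | 100 | -6% | -21% |
| 8 | 231 | 117 | 149 | -49% | -35% |
| 16 | 126 | 132 | 183 | +5% | +45% |
| 32 | 130 | 125 | 215 | -4% | +65% |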

Median

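| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 25.6 | 35.9 | 34.6 | +40% | +35% |
| 2 | 72.5 | 75.5 | 106 | +4% | +46% |
| 4 | 133 | 124 | 91.9 | -7% | -31% |
| 8 | 241 | 123 | 149 | -49% | -38% |
| 16 | 134 | 128 | 168 | -4% | +25% |
| 32 | 144 | 124 | 203 | -14% | +41% |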

P90

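| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 30.8 | 51.4 | 39.2 | +67% | +27% |
| 2 | 157 | 128 | 120 | -18% | -24% |
| 4 | 162 | 149 | 144 | -8% | -11% |
| 8 | 292 | 152 | 159 | -48% | -46% |
| 16 | 190 | 153 | 256 | -19% | +35% |
| 32 | 205 | 161 | 294 | -21% | +43% |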

Layered Pipeline with Dependencies (5 layers x 20 tasks, each layer depends on the previous; HIGH tasks gate downstream work)

Mean

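| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 13.1 | 14.2 | 13.2 | +8% | +1% |
| 2 | 25.9 | 29.8 | 23.9 | +15% | -8% |
| 4 | 40.3 | 41.4 | 46.2 | +3% | +15% |
| 8 | 44.1 | 45.4 | 48.9 | +3% | +11% |
| 16 | 47.3 | 43.8 | 44.6 | -7% | -6% |
| 32 | 44.0 | 45.8 | 50.5 | +4% | +15% |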

Median

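| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 12.8 | 13.5 | 12.8 | +5% | 0% |
| 2 | 18.6 | 21.9 | 24.8 | +18% | +33% |
| 4 | 42.2 | 45.0 | 44.8 | +7% | +6% |
| 8 | 47.5 | 48.3 | 51.5 | +2% | +8% |
| 16 | 46.7 | 38.7 | 36.6 | -17% | -22% |
| 32 | 41.3 | 40.1 | 41.6 | -3% | +1% |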

P90

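| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 16.7 | 18.4 | 14.7 | +10% | -12% |
| 2 | 49.6 | 57.3 | 31.8 | +16% | -36% |
| 4 | 66.7 | 67.4 | 69.8 | +1% | +5% |
| 8 | 60.6 | 68.3 | 68.9 | +13% | +14% |
| 16 | 74.3 | 74.2 | 81.7 | 0% | +10% |
| 32 | 73.8 | 76.8 | 93.4 | +4% | +27% |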

Thread-Matched Scheduling Test (N HIGH tasks + N LOW tasks where N = thread count; HIGH work = N x LOW work, all independent)

Mean

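| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 8.16 | 17.7 | 6.43 | +117% | -21% |
| 2 | 7.95 | 8.67 | 7.54 | +9% | -5% |
| 4 | 10.7 | 8.99 | 10.3 | -16% | -4% |
| 8 | 12.5 | 11.8 | 13.1 | -6% | +5% |
| 16 | 18.8 | 19.9 | 23.0 | +6% | +22% |
| 32 | 51.7 | 23.7 | 33.1 | -54% | -36% |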

Median

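| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 7.75 | 12.0 | 7.04 | +55% | -9% |
| 2 | 7.40 | 8.17 | 7.75 | +10% | +5% |
| 4 | 7.92 | 7.98 | 10.5 | +1% | +33% |
| 8 | 11.6 | 11.1 | 13.3 | -4% | +15% |
| 16 | 18.1 | 18.7 | 23.4 | +3% | +29% |
| 32 | 22.2 | 21.3 | 32.8 | -4% | +48% |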

P90

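| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 12.5 | 21.0 | 8.87 | +68% | -29% |
| 2 | 14.8 | 15.3 | 9.92 | +3% | -33% |
| 4 | 18.4 | 15.3 | 13.3 | -17% | -28% |
| 8 | 21.5 | 19.1 | 19.0 | -11% | -12% |
| 16 | 30.7 | 31.8 | 32.4 | +4% | +6% |
| 32 | 47.3 | 34.3 | 42.5 | -27% | -10% |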
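
The two percentage columns in the tables above appear to be the relative change of each variant against the Run column, rounded to a whole percent. That reading is inferred from the numbers, not stated in the pull request, and the unit of the raw values is not given. A small sketch of the arithmetic (the function name is hypothetical):

```cpp
#include <cassert>
#include <cmath>

// Relative change of `value` against `baseline`, in whole percent.
// Assumed formula: (value - baseline) / baseline * 100, rounded to nearest.
inline long relative_change_percent(double baseline, double value) {
  assert(baseline != 0.0);
  return std::lround((value - baseline) / baseline * 100.0);
}
```

For the 2-thread row of the first Mean table, `relative_change_percent(9.63, 12.9)` gives 34 and `relative_change_percent(9.63, 13.8)` gives 43, matching the listed +34% and +43%. Because the published values are themselves rounded, not every row is guaranteed to reproduce exactly.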

GitHub gist of the benchmarks: https://gist.github.com/pengyk/af6df7e2bbf1ec341473af012b563102

@pengyk pengyk marked this pull request as ready for review April 21, 2026 20:14
@tsung-wei-huang
Member

Hi @pengyk , thank you for this pull request! I have the following questions:

  1. The purpose of the Staging Queue is not very clear to me. Could you please elaborate on it a bit more?
  2. I was thinking each worker only keeps a std::array<wsq_type, 3> _wsqs; where HIGH_PRIORITY=0, MEDIUM_PRIORITY=1, and LOW_PRIORITY=2, so we can simply iterate over the three queues in order. In this case, steal and pop at the executor level will remain pretty much the same, but with an additional loop that visits the higher priorities first. push is easier since we can just insert a node into the corresponding _wsqs[node->_priority].

I think perhaps the most efficient way to support priority is to integrate it natively into the work-stealing queue (WSQ). Supporting priority for the unbounded WSQ might be a bit challenging for now, but I feel it is possible for the bounded WSQ. Basically, we will maintain P priority splits on the ring buffer, where P can be 1 (default), 2, 4, etc. Each split works as a standalone WSQ within a single storage. We want P to be a power of two so we can use efficient bit masking. In this case, the queue can natively support priority scheduling without any additional structure like std::array<wsq_type, 3> _wsqs;. Thoughts?

template <
  typename T,
  size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE,
  size_t LogSplitSize = 0  // default 2^0 = 1, no priority support
>
class BoundedWSQ {
  // ... we will support all priority-based push/pop/steal here.
};
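
To make the power-of-two split idea concrete, here is a minimal sketch of how a slot index could be computed with shifts and masks. The layout (each split owning one contiguous region of the ring buffer) and the name `SplitRingIndex` are assumptions for illustration only; the comment above does not specify the layout.

```cpp
#include <cassert>
#include <cstddef>

// Index arithmetic for a ring buffer of 2^LogSize slots divided into
// 2^LogSplitSize equally sized priority splits (hypothetical layout).
template <std::size_t LogSize, std::size_t LogSplitSize = 0>
struct SplitRingIndex {
  static_assert(LogSplitSize <= LogSize, "more splits than slots");

  static constexpr std::size_t num_splits = std::size_t{1} << LogSplitSize;
  static constexpr std::size_t log_split_capacity = LogSize - LogSplitSize;
  static constexpr std::size_t split_capacity = std::size_t{1} << log_split_capacity;
  static constexpr std::size_t split_mask = split_capacity - 1;

  // Maps (split, monotonically increasing counter) to a physical slot.
  // The mask wraps the counter inside the split; the shift selects the region.
  static std::size_t slot(std::size_t split, std::size_t counter) {
    assert(split < num_splits);
    return (split << log_split_capacity) | (counter & split_mask);
  }
};
```

With `LogSize = 4` and `LogSplitSize = 1` there are two splits of eight slots each, so counter 9 in split 1 lands in slot 9 (region base 8 plus 9 mod 8). With the default `LogSplitSize = 0` the mapping degenerates to the usual single-ring mask.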

@pengyk
Author

pengyk commented May 18, 2026

Hi @tsung-wei-huang thanks for taking a look!

  1. It is just to reduce the chance that, while a worker is pushing its successors, another thread greedily steals a low-priority task. It "hides" the normal- and low-priority tasks and exposes only the high-priority tasks at first, so that high-priority tasks are guaranteed to become visible for stealing first. Imagine a graph like this:
graph TD
    A[Root Node] --> B[Node 1<br/>Low priority]
    A --> C[Node 2<br/>Low priority]
    A --> D[Node 3<br/>Low priority]
    A --> E[Node 4<br/>High priority]
    A --> F[Node 5<br/>High priority]

If we have 3 threads and tasks are made ready in the given order, I think we can guarantee that the high priority tasks will all get executed first.
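
The staging behaviour described above can be sketched in a few lines. This is a single-threaded illustration under assumptions: `StagedWorker`, `schedule`, and `flush` are hypothetical names, tasks are plain `int`s, and `std::deque` stands in for the stealable work-stealing queues.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical per-worker state: three stealable queues plus a private buffer.
struct StagedWorker {
  std::array<std::deque<int>, 3> visible;          // 0 = HIGH, 1 = NORMAL, 2 = LOW
  std::vector<std::pair<std::size_t, int>> staged; // hidden from thieves

  // HIGH tasks become stealable immediately; others wait in the buffer.
  void schedule(std::size_t priority, int task) {
    assert(priority < 3);
    if (priority == 0) {
      visible[0].push_back(task);
    } else {
      staged.emplace_back(priority, task);
    }
  }

  // Called after the current task finishes: expose the buffered tasks.
  void flush() {
    for (const auto& entry : staged) {
      visible[entry.first].push_back(entry.second);
    }
    staged.clear();
  }
};
```

For the five-successor graph above, scheduling the three LOW nodes before the two HIGH nodes leaves only the HIGH nodes stealable until `flush()` runs, which is the ordering effect the buffer is meant to provide.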

I do agree that this might be a little overkill and might degrade performance, so I am open to removing it.

I think the rest is pretty much just a std::array<wsq_type, 3> _wsqs, except that I named it _prio_wsq.

  2. This is a good idea! My one concern is that right now we can execute multiple "topologies" on the same resources. So let's say we have one graph with 1 priority level, another with 2, and another with 8: wouldn't it be hard to keep track of which index is available for pushes? Also, would the semantics for defining the priority be harder to reason about? Right now it is simply 3 hard-coded enums for high/medium/low.

Let me know if that isn't clear, thanks!

@tsung-wei-huang
Member

tsung-wei-huang commented May 22, 2026

@pengyk Thank you for the response! After a careful review, I suggest the following modification to refactor the pull:

  1. Given the inherent randomness of the work-stealing scheduler, I agree with you that the Staging Queue is overkill. Its cost seems to overshadow its benefit.
  2. I suggest we focus on having priority-based scheduling at the worker level first, rather than in the centralized spilling queue. The design of this centralized spilling queue is still being tested and evaluated in terms of its space and runtime tradeoff, and may require some future changes (e.g., NUMA adoption). On the other hand, the worker-level queue design is relatively fixed and stable. So let's focus on bringing priority support to the worker-level queue for now.

With 2, I suggest the following refactoring with a new class BoundedPriorityWSQ that abstracts all the pop/steal/push operations with priority support, so we can minimize the modification of tf::Executor.

// let's define a concept for work-stealing queue
template <typename Q>
concept BoundedWSQLike = requires(Q& q) {
  { q.steal() };
  { q.pop() };
  { q.try_push() } -> std::same_as<bool>;  // argument omitted in this sketch; unbounded wsq instead has push -> void
};

// top wrapper over WSQ - and yes, I agree with you that LogPriority is not a good idea. Let's just template it linearly
template <BoundedWSQLike Q, size_t MaxPriority>
class BoundedPriorityWSQ {

  public:
    // here we provide all BoundedWSQ methods with priority i to pin the operation to the specific _wsqs[i]
    template <typename O>
    bool try_push(size_t priority, O&& item);

    // ... similarly for other methods

    // for methods that don't take a priority, we will, by default, perform the batch operation from i=0 to MaxPriority-1
    // note that i=0 means the highest priority
    template <typename O>
    bool try_push(O&& item);

    // ... similarly for other methods

  private:
    std::array<Q, MaxPriority> _wsqs;
};
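
A compilable, single-threaded sketch of this wrapper shape follows, written in C++17 without concepts so it stands alone. It is an illustration under assumptions, not the proposed implementation: `ToyBoundedQueue` and `PriorityQueueSet` are hypothetical names, `std::deque` replaces the lock-free ring buffer, and `pop`/`steal` returning `std::optional` is a choice made here for simplicity.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

// Toy bounded queue (not thread-safe); stands in for a real bounded WSQ.
template <typename T, std::size_t Capacity>
class ToyBoundedQueue {
 public:
  using value_type = T;
  bool try_push(T item) {
    if (_items.size() >= Capacity) return false;
    _items.push_back(std::move(item));
    return true;
  }
  std::optional<T> pop() {    // owner side: newest item
    if (_items.empty()) return std::nullopt;
    T item = std::move(_items.back());
    _items.pop_back();
    return item;
  }
  std::optional<T> steal() {  // thief side: oldest item
    if (_items.empty()) return std::nullopt;
    T item = std::move(_items.front());
    _items.pop_front();
    return item;
  }
 private:
  std::deque<T> _items;
};

// One queue per priority level; index 0 is the highest priority.
template <typename Q, std::size_t MaxPriority = 3>
class PriorityQueueSet {
 public:
  using value_type = typename Q::value_type;

  // Pin the push to the queue of the given priority.
  bool try_push(std::size_t priority, value_type item) {
    assert(priority < MaxPriority);
    return _wsqs[priority].try_push(std::move(item));
  }
  // Take from the highest-priority non-empty queue.
  std::optional<value_type> pop() {
    for (auto& q : _wsqs) {
      if (auto item = q.pop()) return item;
    }
    return std::nullopt;
  }
  std::optional<value_type> steal() {
    for (auto& q : _wsqs) {
      if (auto item = q.steal()) return item;
    }
    return std::nullopt;
  }
 private:
  std::array<Q, MaxPriority> _wsqs;
};
```

Because the priority loop lives inside the wrapper, code that calls `pop()` or `steal()` does not need to know how many priority levels exist.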

Now, the advantage of this is that we can statically unroll all batch operations (e.g., BoundedPriorityWSQ::try_push(O&& item)) in the correct priority order.

// let's use the static unroll_until https://github.com/taskflow/taskflow/blob/a215b4578338cc27cba5c4772c4f73b61a538d42/taskflow/utility/traits.hpp#L164
template<auto beg, auto end, auto step, typename F>
constexpr bool unroll_until(F&& f) {
  return [&]<auto... Is>(std::index_sequence<Is...>) {
    return (f(beg + Is * step) || ...);
  }(std::make_index_sequence<(end - beg + step - 1) / step>{});
}


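template <BoundedWSQLike Q, size_t MaxPriority>
template <typename O>
bool BoundedPriorityWSQ<Q, MaxPriority>::try_push(O&& item) {
    // the end bound of unroll_until is exclusive, so MaxPriority (not MaxPriority-1) visits every queue
    return unroll_until<0, MaxPriority, 1>([&](size_t i){ return _wsqs[i].try_push(std::forward<O>(item)); });
}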
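
The short-circuit behaviour of the unrolled loop, and the fact that its end bound is exclusive, can be checked with a C++17 re-implementation. This is a sketch for illustration only: `unroll_until_sketch` and `visited_indices` are hypothetical names, and the helper is modelled on the snippet quoted above rather than copied from Taskflow.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Expands f(Beg), f(Beg + Step), ... into one fold over ||, so evaluation
// stops at the first call that returns true.
template <std::size_t Beg, std::size_t Step, typename F, std::size_t... Is>
bool unroll_until_impl(F& f, std::index_sequence<Is...>) {
  return (f(Beg + Is * Step) || ...);
}

// Visits indices in [Beg, End) with the given step; End is exclusive.
template <std::size_t Beg, std::size_t End, std::size_t Step, typename F>
bool unroll_until_sketch(F&& f) {
  return unroll_until_impl<Beg, Step>(
      f, std::make_index_sequence<(End - Beg + Step - 1) / Step>{});
}

// Helper for illustration: which indices are visited if f never succeeds?
template <std::size_t Beg, std::size_t End, std::size_t Step>
std::vector<std::size_t> visited_indices() {
  std::vector<std::size_t> seen;
  unroll_until_sketch<Beg, End, Step>([&](std::size_t i) {
    seen.push_back(i);
    return false;
  });
  return seen;
}
```

With three priority levels, `visited_indices<0, 3, 1>()` yields 0, 1, 2, while `visited_indices<0, 2, 1>()` yields only 0 and 1, so passing `MaxPriority-1` as the end bound would skip the lowest-priority queue.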

In this case, I don't think we even need prioritized_run? Thoughts?

@pengyk
Author

pengyk commented May 26, 2026

Hi @tsung-wei-huang ,

Yeah, that sounds good! I think this is a cleaner implementation. I am wondering if you have any preference for the value of MaxPriority. Right now it is 3; is that OK, or would you want it to be configurable?

@tsung-wei-huang
Member

@pengyk A configurable number is definitely better, but we can default it to 3. Also, regarding setting the priority: instead of having a separate _priority member in tf::Node, we may reuse its _nstate, for example by reserving 2 bits for priority, so we can save some space (see NSTATE in taskflow/core/error.hpp).

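// the default value goes on the class template declaration
template <BoundedWSQLike Q, size_t MaxPriority = 3>
class BoundedPriorityWSQ;

template <BoundedWSQLike Q, size_t MaxPriority>
template <typename O>
bool BoundedPriorityWSQ<Q, MaxPriority>::try_push(O&& item) {
    return unroll_until<0, MaxPriority, 1>([&](size_t i){ return _wsqs[i].try_push(std::forward<O>(item)); });
}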
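
The suggestion to reserve two bits of the node state for the priority can be sketched with plain bit packing. The state type, the bit positions, and the function names below are all assumptions for illustration; which bits of the real _nstate are actually free is not stated in this thread.

```cpp
#include <cassert>
#include <cstdint>

using state_t = std::uint32_t;  // hypothetical state word

// Assumption: the top two bits are unused by other flags.
constexpr unsigned kPriorityShift = 30;
constexpr state_t kPriorityMask = state_t{3} << kPriorityShift;

// Returns `state` with its priority field replaced; other bits are untouched.
inline state_t set_priority(state_t state, unsigned priority) {
  assert(priority < 4);  // two bits hold values 0..3
  return (state & ~kPriorityMask) |
         (static_cast<state_t>(priority) << kPriorityShift);
}

// Extracts the priority field.
inline unsigned get_priority(state_t state) {
  return static_cast<unsigned>((state & kPriorityMask) >> kPriorityShift);
}
```

Two bits cover up to four levels, so a configurable MaxPriority larger than 4 would need a wider field.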

If you have any questions or any part that you think I can jump in or help, please don't hesitate to let me know.

@pengyk
Author

pengyk commented May 26, 2026

@tsung-wei-huang Yeah, that sounds good! I will try to get something out before the end of the week. Thanks for the help! I will let you know if any questions come up.

@pengyk
Author

pengyk commented Jun 1, 2026

Hi @tsung-wei-huang, I think I have made the requested changes. Let me know if anything is unclear, thanks!

@tsung-wei-huang
Member

@pengyk thank you - I will have a look soon and get back to you! Before that, I will make a release first to include several other changes to the profiler, doc examples, and other scheduler-level micro-optimizations. Really appreciate your contributions!
