From e6bbcadb76d02486dbc9badfbb444bebef9c401e Mon Sep 17 00:00:00 2001 From: ypeng216 Date: Sun, 31 May 2026 23:42:44 -0400 Subject: [PATCH] add priorities --- taskflow/core/error.hpp | 8 +- taskflow/core/executor.hpp | 74 ++- taskflow/core/graph.hpp | 47 +- taskflow/core/task.hpp | 53 ++ taskflow/core/taskflow.hpp | 8 + taskflow/core/worker.hpp | 6 +- taskflow/core/wsq.hpp | 96 +++ unittests/CMakeLists.txt | 1 + unittests/test_graph_priorities.cpp | 989 ++++++++++++++++++++++++++++ 9 files changed, 1241 insertions(+), 41 deletions(-) create mode 100644 unittests/test_graph_priorities.cpp diff --git a/taskflow/core/error.hpp b/taskflow/core/error.hpp index ba1d35150..ea01af9d4 100644 --- a/taskflow/core/error.hpp +++ b/taskflow/core/error.hpp @@ -21,8 +21,12 @@ struct NSTATE { constexpr static underlying_type RETAIN_SUBFLOW = 0x40000000; constexpr static underlying_type JOINED_SUBFLOW = 0x80000000; - // mask to isolate state bits - non-state bits store # weak dependents - constexpr static underlying_type STRONG_DEPENDENCIES_MASK = 0x0FFFFFFF; + // priority encoding (2 bits): 00=UNSET, 01=HIGH, 10=NORMAL, 11=LOW + constexpr static underlying_type PRIORITY_SHIFT = 26; + constexpr static underlying_type PRIORITY_MASK = 0x0C000000; + + // mask to isolate state bits - non-state bits store # strong dependents + constexpr static underlying_type STRONG_DEPENDENCIES_MASK = 0x03FFFFFF; }; using nstate_t = NSTATE::underlying_type; diff --git a/taskflow/core/executor.hpp b/taskflow/core/executor.hpp index 38f3d28d0..732e80d7e 100644 --- a/taskflow/core/executor.hpp +++ b/taskflow/core/executor.hpp @@ -1104,7 +1104,7 @@ requires (!std::same_as, AsyncTask>) struct Buffer { std::mutex mutex; UnboundedWSQ queue; - }; + }; std::vector _workers; std::vector _buffers; @@ -1366,7 +1366,7 @@ inline void Executor::_spawn(size_t N, std::shared_ptr wif) { // Function: _explore_task inline bool Executor::_explore_task(Worker& w, Node*& t) { - + // Fast path: if no topologies are live, all 
queues are guaranteed empty // by the executor's invariant (num_topologies reaches zero only after all // nodes have been scheduled and their queues flushed). Skip the entire @@ -1376,45 +1376,45 @@ inline bool Executor::_explore_task(Worker& w, Node*& t) { if(_num_topologies.load(std::memory_order_relaxed) == 0) { return true; } - + const size_t MAX_VICTIM = num_queues(); // guaranteed >= 2 by constructor const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1); - + // local aliases for steal protocol sentinels — these are properties of the // steal protocol, not of any specific queue type size_t num_steals = 0; size_t vtm = w._sticky_victim; - + while(true) { - + t = (vtm < _workers.size()) ? _workers[vtm]._wsq.steal() : _buffers[vtm - _workers.size()].queue.steal(); - + if(t) { w._sticky_victim = vtm; break; } - + // EMPTY: pick a new victim excluding self since our own queue is likely empty. // map [0, MAX_VICTIM-1) over [0, MAX_VICTIM) \ {w._id} — always safe since MAX_VICTIM >= 2. vtm = w._rdgen() % (MAX_VICTIM - 1); if(vtm >= w._id) vtm++; - + if(++num_steals > MAX_STEALS) { std::this_thread::yield(); if(num_steals > 150 + MAX_STEALS) { break; } } - + if(w._done.test(std::memory_order_relaxed)) { return false; } } - + return true; -} +} /* // Function: _explore_task @@ -1487,7 +1487,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) { if(_explore_task(w, t) == false) { return false; } - + // Go exploit the task if we successfully steal one. 
if(t) { return true; @@ -1510,7 +1510,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) { _notifier.commit_wait(w._id); goto explore_task; } - + // Condition #1: buffers should be empty for(size_t b=0; b<_buffers.size(); ++b) { if(!_buffers[b].queue.empty()) { @@ -1519,7 +1519,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) { goto explore_task; } } - + // Condition #2: worker queues should be empty // Note: We need to use index-based looping to avoid data race with _spawn // which initializes other worker data structure at the same time. @@ -1532,13 +1532,13 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) { goto explore_task; } } - + // Condition #3: worker should be alive if(w._done.test(std::memory_order_relaxed)) { _notifier.cancel_wait(w._id); return false; } - + // Now I really need to relinquish myself to others. _notifier.commit_wait(w._id); goto explore_task; @@ -1650,13 +1650,19 @@ void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) { return; } - // NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)). - // This is because when a node v is inserted into the queue, v can run and finish - // immediately. If v is the last node in the graph, it will tear down the parent task vector - // which cause the last ++first to fail. This problem is specific to MSVC which has a stricter - // iterator implementation in std::vector than GCC/Clang. 
- if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) { - _bulk_spill(first, num_nodes - n); + auto [n, uniform] = worker._wsq.try_bulk_push(first, num_nodes); + if(n != num_nodes) { + if(uniform) { + for(size_t i = n; i < num_nodes; ++i) { + _spill(first[i]); + } + } else { + for(size_t i = 0; i < num_nodes; ++i) { + if(!worker._wsq.try_push(first[i])) { + _spill(first[i]); + } + } + } } _notifier.notify_n(num_nodes); @@ -1687,9 +1693,15 @@ inline void Executor::_bulk_schedule(I first, size_t num_nodes) { // Function: _update_cache TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) { if(cache) { - _schedule(worker, cache); + if(node->_priority_queue() < cache->_priority_queue()) { + _schedule(worker, cache); + cache = node; + } else { + _schedule(worker, node); + } + } else { + cache = node; } - cache = node; } // Function: _bulk_update_cache @@ -1707,7 +1719,7 @@ TF_FORCE_INLINE void Executor::_bulk_update_cache( } cache = node; } - + // Procedure: _invoke inline void Executor::_invoke(Worker& worker, Node* node) { @@ -1743,7 +1755,7 @@ inline void Executor::_invoke(Worker& worker, Node* node) { return; } } - + invoke_task: SmallVector conds; @@ -2438,7 +2450,7 @@ inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* par auto node = *first; node->_topology = tpg; node->_parent = parent; - node->_nstate = NSTATE::NONE; + node->_nstate &= NSTATE::PRIORITY_MASK; node->_estate.store(ESTATE::NONE, std::memory_order_relaxed); node->_set_up_join_counter(); node->_exception_ptr = nullptr; @@ -2482,7 +2494,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& tpg = f._topologies.front().get(); lock.unlock(); - + // Soon after we carry out the promise, the associate taskflow may got destroyed // from the user side, and we should never tough it again. 
fetched_tpg->_carry_out_promise(); @@ -2501,7 +2513,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& f._topologies.pop(); lock.unlock(); - + // Soon after we carry out the promise, the associate taskflow may got destroyed // from the user side, and we should never tough it again. fetched_tpg->_carry_out_promise(); diff --git a/taskflow/core/graph.hpp b/taskflow/core/graph.hpp index e8c811b60..2ad5280ac 100644 --- a/taskflow/core/graph.hpp +++ b/taskflow/core/graph.hpp @@ -132,6 +132,30 @@ class Graph { Node* _emplace_back(ArgsT&&...); }; +// ---------------------------------------------------------------------------- +// TaskPriority +// ---------------------------------------------------------------------------- + +/** +@enum TaskPriority + +@brief enumeration of all task priority levels + +A priority level determines the order in which a task is picked up by a worker +when tasks are ready to execute. Tasks with higher priority (lower numerical value) +are executed before tasks with lower priority (higher numerical value). 
+*/ +enum class TaskPriority : unsigned { + /** @brief the highest task priority level (most urgent) */ + HIGH = 0, + /** @brief the normal task priority level */ + NORMAL = 1, + /** @brief the lowest task priority level (least urgent) */ + LOW = 2, +}; + +/** @brief total number of task priority levels */ + // ---------------------------------------------------------------------------- // TaskParams // ---------------------------------------------------------------------------- @@ -238,24 +262,37 @@ class NodeBase { friend class TaskGroup; friend class Algorithm; + public: + + size_t _priority_queue() const { + return static_cast((_nstate & NSTATE::PRIORITY_MASK) >> NSTATE::PRIORITY_SHIFT); + } + protected: - + nstate_t _nstate {NSTATE::NONE}; std::atomic _estate {ESTATE::NONE}; NodeBase* _parent {nullptr}; std::atomic _join_counter {0}; - + std::exception_ptr _exception_ptr {nullptr}; NodeBase() = default; NodeBase(nstate_t nstate, estate_t estate, NodeBase* parent, size_t join_counter) : - _nstate {nstate}, + _nstate {nstate}, _estate {estate}, _parent {parent}, _join_counter {join_counter} { } + + // encode TaskPriority (HIGH=0,NORMAL=1,LOW=2) as +1 into bits 26-27 of _nstate, + // reserving 00 for UNSET (no priority explicitly assigned) + void _set_priority(TaskPriority p) { + nstate_t encoded = (static_cast(p) + 1) << NSTATE::PRIORITY_SHIFT; + _nstate = (_nstate & ~NSTATE::PRIORITY_MASK) | encoded; + } void _rethrow_exception() { if(_exception_ptr) { @@ -300,7 +337,7 @@ class Topology : public NodeBase { std::function _predicate; std::function _on_finish; - + void _carry_out_promise(); }; @@ -513,7 +550,7 @@ class Node : public NodeBase { std::string _name; void* _data {nullptr}; - + Topology* _topology {nullptr}; size_t _num_successors {0}; diff --git a/taskflow/core/task.hpp b/taskflow/core/task.hpp index af6745bb8..6a97036bb 100644 --- a/taskflow/core/task.hpp +++ b/taskflow/core/task.hpp @@ -998,6 +998,43 @@ class Task { */ Task& data(void* data); + + /** 
+ @brief assigns a priority level to the task + + @param p the priority level (tf::TaskPriority::HIGH, tf::TaskPriority::NORMAL, or tf::TaskPriority::LOW) + @return @c *this + + A task with a higher priority (lower numerical value) will be executed + before tasks with lower priority (higher numerical value) when they are + both ready to run. + + @code{.cpp} + tf::Taskflow taskflow; + auto [A, B, C] = taskflow.emplace( + [](){}, + [](){}, + [](){} + ); + A.priority(tf::TaskPriority::HIGH); + B.priority(tf::TaskPriority::NORMAL); + C.priority(tf::TaskPriority::LOW); + @endcode + */ + Task& priority(TaskPriority p); + + /** + @brief queries the priority level of the task + + @return the priority level of the task + + @code{.cpp} + tf::Task task = taskflow.emplace([](){}); + task.priority(tf::TaskPriority::HIGH); + assert(task.priority() == tf::TaskPriority::HIGH); + @endcode + */ + TaskPriority priority() const; /** @brief resets the task handle to null @@ -1522,6 +1559,22 @@ inline Task& Task::data(void* data) { return *this; } +// Function: priority +inline TaskPriority Task::priority() const { + auto bits = _node->_priority_queue(); + // 0=UNSET→HIGH, 1=HIGH, 2=NORMAL, 3=LOW + constexpr TaskPriority map[] = { + TaskPriority::HIGH, TaskPriority::HIGH, TaskPriority::NORMAL, TaskPriority::LOW + }; + return map[bits]; +} + +// Function: priority +inline Task& Task::priority(TaskPriority p) { + _node->_set_priority(p); + return *this; +} + // ---------------------------------------------------------------------------- // global ostream // ---------------------------------------------------------------------------- diff --git a/taskflow/core/taskflow.hpp b/taskflow/core/taskflow.hpp index 019533977..d71819424 100644 --- a/taskflow/core/taskflow.hpp +++ b/taskflow/core/taskflow.hpp @@ -487,6 +487,14 @@ inline void Taskflow::_dump( os << ind << 'p' << node << "[label=\""; if(node->_name.empty()) os << 'p' << node; else os << node->_name; + + switch 
(node->_priority_queue()) { + case 0: break; // UNSET — no priority label + case 1: os << "\\npriority: HIGH"; break; + case 2: os << "\\npriority: NORMAL"; break; + case 3: os << "\\npriority: LOW"; break; + default: break; + } os << "\" "; // shape of the node diff --git a/taskflow/core/worker.hpp b/taskflow/core/worker.hpp index 1002c4b74..48be829e2 100644 --- a/taskflow/core/worker.hpp +++ b/taskflow/core/worker.hpp @@ -58,7 +58,7 @@ class Worker { friend class Runtime; friend class WorkerView; - using wsq_type = BoundedWSQ; + using wsq_type = BoundedPriorityWSQ, 3>; public: @@ -159,12 +159,12 @@ inline size_t WorkerView::id() const { // Function: queue_size inline size_t WorkerView::queue_size() const { - return _worker._wsq.size(); + return _worker.queue_size(); } // Function: queue_capacity inline size_t WorkerView::queue_capacity() const { - return static_cast(_worker._wsq.capacity()); + return _worker.queue_capacity(); } // ---------------------------------------------------------------------------- diff --git a/taskflow/core/wsq.hpp b/taskflow/core/wsq.hpp index 473ef9abf..769700c96 100644 --- a/taskflow/core/wsq.hpp +++ b/taskflow/core/wsq.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include "../utility/macros.hpp" @@ -880,5 +881,100 @@ constexpr size_t BoundedWSQ::capacity() const { return BufferSize; } +template +concept BoundedWSQLike = requires(Q& q, typename Q::value_type v) { + { q.steal() }; + { q.pop() }; + { q.try_push(v) } -> std::same_as; +}; + +template +class BoundedPriorityWSQ { + + static_assert(MaxPriority >= 1); + + public: + + using value_type = typename Q::value_type; + + /** + @brief tries to insert an item into the queue determined by the item's + priority (UNSET/HIGH -> queue 0, NORMAL -> queue 1, LOW -> queue 2) + */ + template + bool try_push(O&& item) { + constexpr size_t queue_map[] = {0, 0, 1, 2}; + return _wsqs[queue_map[item->_priority_queue()]].try_push(std::forward(item)); + } + + struct BulkPushResult { + size_t 
count; + bool uniform; + }; + + /** + tries to insert a batch of N items (N >= 1; first[0] is read unconditionally), routing each by its priority + + Fast path: if all items map to the same queue, delegates to the + underlying queue's bulk push (single atomic bottom update). + Returns {count, true} where items [0, count) are pushed contiguously. + + Mixed priorities: returns {0, false} without pushing anything — the + caller should fall back to per-item try_push with its own overflow logic. + */ + template + BulkPushResult try_bulk_push(I first, size_t N) { + constexpr size_t queue_map[] = {0, 0, 1, 2}; + size_t q0 = queue_map[first[0]->_priority_queue()]; + for(size_t i = 1; i < N; ++i) { + if(queue_map[first[i]->_priority_queue()] != q0) { + return {0, false}; + } + } + return {_wsqs[q0].try_bulk_push(first, N), true}; + } + + value_type pop() { + value_type result = Q::empty_value(); + unroll_until([&](auto i) { + result = _wsqs[i].pop(); + return result != Q::empty_value(); + }); + return result; + } + + value_type steal() { + value_type result = Q::empty_value(); + unroll_until([&](auto i) { + result = _wsqs[i].steal(); + return result != Q::empty_value(); + }); + return result; + } + + bool empty() const noexcept { + for(size_t i = 0; i < MaxPriority; ++i) { + if(!_wsqs[i].empty()) return false; + } + return true; + } + + size_t size() const noexcept { + size_t n = 0; + for(size_t i = 0; i < MaxPriority; ++i) { + n += _wsqs[i].size(); + } + return n; + } + + constexpr size_t capacity() const { + return _wsqs[0].capacity() * MaxPriority; + } + + private: + + std::array _wsqs; +}; + } // end of namespace tf ----------------------------------------------------- diff --git a/unittests/CMakeLists.txt b/unittests/CMakeLists.txt index 5d23aaf68..ecf2067d2 100644 --- a/unittests/CMakeLists.txt +++ b/unittests/CMakeLists.txt @@ -37,6 +37,7 @@ list(APPEND TF_UNITTESTS test_data_pipelines test_fill test_generate + test_graph_priorities ) # we only do exception tests if sanitizer is not enabled diff --git
a/unittests/test_graph_priorities.cpp b/unittests/test_graph_priorities.cpp new file mode 100644 index 000000000..534612a31 --- /dev/null +++ b/unittests/test_graph_priorities.cpp @@ -0,0 +1,989 @@ +#include +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN + +#include +#include +#include +#include +#include +#include + + +// ============================================================================ +// Test: Linear chain with priorities +// A(HIGH) -> B(LOW) -> C(HIGH) -> D(LOW) +// Each task must execute in dependency order regardless of priority. +// ============================================================================ +TEST_CASE("GraphPriority.LinearChain" * doctest::timeout(300)) { + tf::Executor executor(2); + tf::Taskflow taskflow; + + std::vector execution_order; + + auto A = taskflow.emplace([&]() { + execution_order.push_back(0); + }).name("A"); + + auto B = taskflow.emplace([&]() { + execution_order.push_back(1); + }).name("B"); + + auto C = taskflow.emplace([&]() { + execution_order.push_back(2); + }).name("C"); + + auto D = taskflow.emplace([&]() { + execution_order.push_back(3); + }).name("D"); + + A.precede(B); + B.precede(C); + C.precede(D); + + A.priority(tf::TaskPriority::HIGH); + B.priority(tf::TaskPriority::LOW); + C.priority(tf::TaskPriority::HIGH); + D.priority(tf::TaskPriority::LOW); + + executor.run(taskflow).wait(); + + REQUIRE(execution_order.size() == 4); + REQUIRE(execution_order[0] == 0); + REQUIRE(execution_order[1] == 1); + REQUIRE(execution_order[2] == 2); + REQUIRE(execution_order[3] == 3); +} + +// ============================================================================ +// Test: Diamond pattern with priorities +// A(HIGH) +// / \ +// B(LOW) C(HIGH) +// \ / +// D(NORMAL) +// ============================================================================ +TEST_CASE("GraphPriority.Diamond" * doctest::timeout(300)) { + tf::Executor executor(2); + tf::Taskflow taskflow; + + std::atomic counter{0}; + std::atomic b_order{-1}, 
c_order{-1}; + + auto A = taskflow.emplace([&]() { + counter++; + }).name("A"); + + auto B = taskflow.emplace([&]() { + b_order.store(counter.fetch_add(1)); + }).name("B"); + + auto C = taskflow.emplace([&]() { + c_order.store(counter.fetch_add(1)); + }).name("C"); + + auto D = taskflow.emplace([&]() { + counter++; + }).name("D"); + + A.precede(B, C); + B.precede(D); + C.precede(D); + + A.priority(tf::TaskPriority::HIGH); + B.priority(tf::TaskPriority::LOW); + C.priority(tf::TaskPriority::HIGH); + D.priority(tf::TaskPriority::NORMAL); + + executor.run(taskflow).wait(); + + REQUIRE(counter.load() == 4); + // C (HIGH) is expected to run before B (LOW). NOTE(review): b_order/c_order come from independent fetch_add calls, so if B and C run on different workers this ordering is not strictly guaranteed — confirm the test is not flaky + REQUIRE(c_order.load() < b_order.load()); + REQUIRE(b_order.load() >= 0); + REQUIRE(c_order.load() >= 0); +} + +// ============================================================================ +// Test: Diamond pattern with single thread — strict ordering guaranteed +// A(HIGH) +// / \ +// B(HIGH) C(LOW) +// \ / +// D(NORMAL) +// With one worker, B and C cannot run in parallel, so B (HIGH) must +// execute before C (LOW).
+// ============================================================================ +TEST_CASE("GraphPriority.DiamondSingleThread" * doctest::timeout(300)) { + tf::Executor executor(1); + tf::Taskflow taskflow; + + std::vector order; + + auto A = taskflow.emplace([&]() { + order.push_back("A"); + }).name("A"); + + auto B = taskflow.emplace([&]() { + order.push_back("B"); + }).name("B"); + + auto C = taskflow.emplace([&]() { + order.push_back("C"); + }).name("C"); + + auto D = taskflow.emplace([&]() { + order.push_back("D"); + }).name("D"); + + A.precede(B, C); + B.precede(D); + C.precede(D); + + A.priority(tf::TaskPriority::HIGH); + B.priority(tf::TaskPriority::HIGH); + C.priority(tf::TaskPriority::LOW); + D.priority(tf::TaskPriority::NORMAL); + + executor.run(taskflow).wait(); + + REQUIRE(order.size() == 4); + REQUIRE(order[0] == "A"); + + // Find positions of B and C + auto pos_B = std::find(order.begin(), order.end(), "B") - order.begin(); + auto pos_C = std::find(order.begin(), order.end(), "C") - order.begin(); + + // With a single worker, B (HIGH) must run before C (LOW) + REQUIRE(pos_B < pos_C); + + REQUIRE(order[3] == "D"); +} + +// ============================================================================ +// Test: Multiple independent diamonds, test that everything executes +// ============================================================================ +TEST_CASE("GraphPriority.MultipleDiamonds" * doctest::timeout(300)) { + tf::Executor executor(4); + tf::Taskflow taskflow; + + const int NUM_DIAMONDS = 5; + std::atomic counter{0}; + + for(int d = 0; d < NUM_DIAMONDS; d++) { + auto top = taskflow.emplace([&]() { counter++; }) + .priority(tf::TaskPriority::HIGH); + auto left = taskflow.emplace([&]() { counter++; }) + .priority(tf::TaskPriority::HIGH); + auto right = taskflow.emplace([&]() { counter++; }) + .priority(tf::TaskPriority::LOW); + auto bottom = taskflow.emplace([&]() { counter++; }) + .priority(tf::TaskPriority::NORMAL); + + top.precede(left, 
right); + left.precede(bottom); + right.precede(bottom); + } + + executor.run(taskflow).wait(); + + REQUIRE(counter.load() == NUM_DIAMONDS * 4); +} + +// ============================================================================ +// Test: Complex DAG with mixed priorities +// +// A(H) --> B(L) --> D(H) --> F(N) +// \ ^ +// --> C(H) ------/ +// |\ +// | --> E(H) --> G(L) +// H(L) +// ============================================================================ +TEST_CASE("GraphPriority.ComplexDAG" * doctest::timeout(300)) { + tf::Executor executor(2); + tf::Taskflow taskflow; + + // 8 tasks: A=0, B=1, C=2, D=3, E=4, F=5, G=6, H=7 + // Each task records its completion position via atomic counter. + std::atomic order{0}; + std::array positions{}; // positions[task_id] = execution position + + auto make_task = [&](int id) { + return [&, id]() { + positions[id] = order.fetch_add(1, std::memory_order_acq_rel); + }; + }; + + auto A = taskflow.emplace(make_task(0)).priority(tf::TaskPriority::HIGH).name("A"); + auto B = taskflow.emplace(make_task(1)).priority(tf::TaskPriority::LOW).name("B"); + auto C = taskflow.emplace(make_task(2)).priority(tf::TaskPriority::HIGH).name("C"); + auto D = taskflow.emplace(make_task(3)).priority(tf::TaskPriority::HIGH).name("D"); + auto E = taskflow.emplace(make_task(4)).priority(tf::TaskPriority::HIGH).name("E"); + auto F = taskflow.emplace(make_task(5)).priority(tf::TaskPriority::NORMAL).name("F"); + auto G = taskflow.emplace(make_task(6)).priority(tf::TaskPriority::LOW).name("G"); + auto H = taskflow.emplace(make_task(7)).priority(tf::TaskPriority::LOW).name("H"); + + A.precede(B, C); + B.precede(D); + C.precede(D, E, H); + D.precede(F); + E.precede(G); + + executor.run(taskflow).wait(); + + REQUIRE(order.load() == 8); + + // Verify dependency ordering using recorded positions + // pos(X) is positions[X's id] + REQUIRE(positions[0] < positions[1]); // A < B + REQUIRE(positions[0] < positions[2]); // A < C + REQUIRE(positions[1] < 
positions[3]); // B < D + REQUIRE(positions[2] < positions[3]); // C < D + REQUIRE(positions[2] < positions[4]); // C < E + REQUIRE(positions[2] < positions[7]); // C < H + REQUIRE(positions[3] < positions[5]); // D < F + REQUIRE(positions[4] < positions[6]); // E < G +} + +// ============================================================================ +// Test: Queue ordering verification +// With single thread, tasks should execute in priority order when all +// are ready simultaneously. +// ============================================================================ +TEST_CASE("GraphPriority.QueueOrdering" * doctest::timeout(300)) { + tf::Executor executor(1); + tf::Taskflow taskflow; + + std::vector execution_order; + + auto start = taskflow.emplace([](){}); + + // Create tasks with known priorities, all depending on start + auto low1 = taskflow.emplace([&]() { + execution_order.emplace_back(0); + }).priority(tf::TaskPriority::LOW); + + auto high1 = taskflow.emplace([&]() { + execution_order.emplace_back(1); + }).priority(tf::TaskPriority::HIGH); + + auto normal1 = taskflow.emplace([&]() { + execution_order.emplace_back(2); + }).priority(tf::TaskPriority::NORMAL); + + auto high2 = taskflow.emplace([&]() { + execution_order.emplace_back(3); + }).priority(tf::TaskPriority::HIGH); + + auto low2 = taskflow.emplace([&]() { + execution_order.emplace_back(4); + }).priority(tf::TaskPriority::LOW); + + start.precede(low1, high1, normal1, high2, low2); + + executor.run(taskflow).wait(); + + REQUIRE(execution_order.size() == 5); + + // With single worker, HIGH tasks should come first + std::vector high_tasks, normal_tasks, low_tasks; + for(int id : execution_order) { + if(id == 1 || id == 3) high_tasks.push_back(id); + else if(id == 2) normal_tasks.push_back(id); + else low_tasks.push_back(id); + } + + // Verify every HIGH task ran before the NORMAL task (single worker = deterministic); the position of the LOW tasks is not asserted below + if(!high_tasks.empty() && !normal_tasks.empty()) { + // Find positions + size_t last_high = 0,
first_normal = execution_order.size(); + for(size_t i = 0; i < execution_order.size(); i++) { + if(execution_order[i] == 1 || execution_order[i] == 3) last_high = i; + if((execution_order[i] == 2) && i < first_normal) first_normal = i; + } + REQUIRE(last_high < first_normal); + } +} + +// ============================================================================ +// Test: Wide graph with many tasks at each priority level +// ============================================================================ +TEST_CASE("GraphPriority.WideGraph" * doctest::timeout(300)) { + tf::Executor executor(1); + tf::Taskflow taskflow; + + const int TASKS_PER_PRIORITY = 100; + const int TOTAL_TASKS = TASKS_PER_PRIORITY * 3; + std::atomic high_count{0}, normal_count{0}, low_count{0}; + std::atomic order{0}; + std::array positions{}; // positions[pos] = priority value + + auto start = taskflow.emplace([](){}); + auto end = taskflow.emplace([](){}); + + for(int i = 0; i < TASKS_PER_PRIORITY; i++) { + auto h = taskflow.emplace([&]() { + high_count++; + int pos = order.fetch_add(1); + positions[pos] = static_cast(tf::TaskPriority::HIGH); + }).priority(tf::TaskPriority::HIGH); + auto n = taskflow.emplace([&]() { + normal_count++; + int pos = order.fetch_add(1); + positions[pos] = static_cast(tf::TaskPriority::NORMAL); + }).priority(tf::TaskPriority::NORMAL); + auto l = taskflow.emplace([&]() { + low_count++; + int pos = order.fetch_add(1); + positions[pos] = static_cast(tf::TaskPriority::LOW); + }).priority(tf::TaskPriority::LOW); + start.precede(h, n, l); + h.precede(end); + n.precede(end); + l.precede(end); + } + + executor.run(taskflow).wait(); + + REQUIRE(high_count.load() == TASKS_PER_PRIORITY); + REQUIRE(normal_count.load() == TASKS_PER_PRIORITY); + REQUIRE(low_count.load() == TASKS_PER_PRIORITY); + REQUIRE(order.load() == TOTAL_TASKS); + + // Compute average execution index per priority level. + // Lower average index = executed earlier on average. 
+ double high_sum = 0, normal_sum = 0, low_sum = 0; + for(int i = 0; i < TOTAL_TASKS; i++) { + if (positions[i] == static_cast(tf::TaskPriority::HIGH)) high_sum += i; + else if(positions[i] == static_cast(tf::TaskPriority::NORMAL)) normal_sum += i; + else low_sum += i; + } + + double avg_high = high_sum / TASKS_PER_PRIORITY; + double avg_normal = normal_sum / TASKS_PER_PRIORITY; + double avg_low = low_sum / TASKS_PER_PRIORITY; + + REQUIRE(avg_high < avg_normal); + REQUIRE(avg_normal < avg_low); +} + +// ============================================================================ +// Test: Layered graph - tasks in layers, each layer depends on the previous +// ============================================================================ +TEST_CASE("GraphPriority.LayeredGraph" * doctest::timeout(300)) { + tf::Executor executor(1); + tf::Taskflow taskflow; + + const int NUM_LAYERS = 3; + const int TASKS_PER_LAYER = 60; + std::atomic total_count{0}; + + // Per-layer atomic counter assigns positions via fetch_add — strict + // arrival order with zero lock contention, giving the most faithful + // recording of task completion order. + struct LayerLog { + std::atomic counter{0}; + std::array priorities{}; // filled by fetch_add position + }; + std::vector layer_logs(NUM_LAYERS); + std::vector> layers(NUM_LAYERS); + + for(int layer = 0; layer < NUM_LAYERS; layer++) { + for(int i = 0; i < TASKS_PER_LAYER; i++) { + tf::TaskPriority prio; + if(i % 3 == 0) prio = tf::TaskPriority::HIGH; + else if(i % 3 == 1) prio = tf::TaskPriority::NORMAL; + else prio = tf::TaskPriority::LOW; + + auto t = taskflow.emplace([&, layer, prio]() { + // Busy-wait so task duration >> steal overhead, ensuring workers + // distribute properly across priority levels before any one + // worker can lap the others. 
+        volatile int x = 0;
+        for(int j = 0; j < 1000000; j++) x += j;
+        total_count++;
+        int pos = layer_logs[layer].counter.fetch_add(1, std::memory_order_acq_rel);
+        layer_logs[layer].priorities[pos] = static_cast<int>(prio);
+      }).priority(prio);
+      layers[layer].push_back(t);
+
+      if(layer > 0) {
+        for(auto& prev : layers[layer-1]) {
+          prev.precede(t);
+        }
+      }
+    }
+  }
+
+  executor.run(taskflow).wait();
+
+  REQUIRE(total_count.load() == NUM_LAYERS * TASKS_PER_LAYER);
+
+  // For each layer all tasks become ready simultaneously (previous layer done),
+  // so within each layer HIGH should execute before NORMAL before LOW on average.
+  for(int layer = 0; layer < NUM_LAYERS; layer++) {
+    REQUIRE(layer_logs[layer].counter.load() == TASKS_PER_LAYER);
+
+    // Sum the completion positions per priority class to compare their means.
+    double high_sum = 0, normal_sum = 0, low_sum = 0;
+    int high_n = 0, normal_n = 0, low_n = 0;
+    for(int i = 0; i < TASKS_PER_LAYER; i++) {
+      int p = layer_logs[layer].priorities[i];
+      if     (p == static_cast<int>(tf::TaskPriority::HIGH))   { high_sum += i;   high_n++; }
+      else if(p == static_cast<int>(tf::TaskPriority::NORMAL)) { normal_sum += i; normal_n++; }
+      else                                                     { low_sum += i;    low_n++; }
+    }
+
+    double avg_high   = high_sum / high_n;
+    double avg_normal = normal_sum / normal_n;
+    double avg_low    = low_sum / low_n;
+
+    REQUIRE(avg_high < avg_normal);
+    REQUIRE(avg_normal < avg_low);
+  }
+}
+
+
+// ============================================================================
+// Test: Subflow with priorities
+//
+// A single-worker executor runs a subflow with three children (HIGH, LOW,
+// HIGH); each child records the position at which it executed.
+// ============================================================================
+TEST_CASE("GraphPriority.Subflow" * doctest::timeout(300)) {
+  tf::Executor executor(1);
+  tf::Taskflow taskflow;
+
+  std::atomic<int> counter{0};
+  std::atomic<int> order{0};
+  int s1_pos{-1}, s2_pos{-1}, s3_pos{-1};
+
+  auto A = taskflow.emplace([&](tf::Subflow& sf) {
+    sf.emplace([&]() {
+      counter++;
+      s1_pos = order.fetch_add(1);
+    }).priority(tf::TaskPriority::HIGH);
+    sf.emplace([&]() {
+      counter++;
+      s2_pos = order.fetch_add(1);
+    }).priority(tf::TaskPriority::LOW);
+    sf.emplace([&]() {
+      counter++;
+      s3_pos = order.fetch_add(1);
+    }).priority(tf::TaskPriority::HIGH);
+  }).priority(tf::TaskPriority::HIGH);
+
+  auto B = taskflow.emplace([&]() {
+    counter++;
+  }).priority(tf::TaskPriority::LOW);
+
+  A.precede(B);
+
+  executor.run(taskflow).wait();
+
+  // Three subflow children plus B.
+  REQUIRE(counter.load() == 4);
+
+  // s1(HIGH) and s3(HIGH) should both execute before s2(LOW)
+  REQUIRE(s1_pos < s2_pos);
+  REQUIRE(s3_pos < s2_pos);
+}
+
+// ============================================================================
+// Test: Multiple runs of same taskflow
+//
+// Ten prioritized tasks behind one start task, run five times; only the
+// total execution count is checked.
+// ============================================================================
+TEST_CASE("GraphPriority.MultipleRuns" * doctest::timeout(300)) {
+  tf::Executor executor(2);
+  tf::Taskflow taskflow;
+
+  std::atomic<int> counter{0};
+
+  auto start = taskflow.emplace([](){});
+  for(int i = 0; i < 10; i++) {
+    auto t = taskflow.emplace([&]() { counter++; })
+                     .priority(static_cast<tf::TaskPriority>(i % 3));
+    start.precede(t);
+  }
+
+  for(int run = 0; run < 5; run++) {
+    executor.run(taskflow).wait();
+  }
+
+  // 10 tasks x 5 runs
+  REQUIRE(counter.load() == 50);
+}
+
+// ============================================================================
+// Test: Dynamic priority change between runs
+//
+// Two sibling tasks swap priorities between two runs on a single worker; the
+// recorded execution order is expected to follow the current priorities.
+// ============================================================================
+TEST_CASE("GraphPriority.DynamicChange" * doctest::timeout(300)) {
+  tf::Executor executor(1);
+  tf::Taskflow taskflow;
+
+  std::vector<int> order;
+
+  auto start = taskflow.emplace([](){});
+
+  auto t0 = taskflow.emplace([&]() { order.emplace_back(0); });
+  auto t1 = taskflow.emplace([&]() { order.emplace_back(1); });
+
+  start.precede(t0, t1);
+
+  // First run: t0=HIGH, t1=LOW
+  t0.priority(tf::TaskPriority::HIGH);
+  t1.priority(tf::TaskPriority::LOW);
+  executor.run(taskflow).wait();
+
+  REQUIRE(order.size() == 2);
+  REQUIRE(order[0] == 0);  // HIGH first
+  REQUIRE(order[1] == 1);
+
+  order.clear();
+
+  // Second run: t0=LOW, t1=HIGH
+  t0.priority(tf::TaskPriority::LOW);
+  t1.priority(tf::TaskPriority::HIGH);
+  executor.run(taskflow).wait();
+
+  REQUIRE(order.size() == 2);
+  REQUIRE(order[0] == 1);  // HIGH first (t1 now)
+  REQUIRE(order[1] == 0);
+}
+
+// ============================================================================
+// Test: All same priority
+//
+// N NORMAL tasks between a start and an end task; only completion is checked.
+// ============================================================================
+TEST_CASE("GraphPriority.AllSamePriority" * doctest::timeout(300)) {
+  tf::Executor executor(4);
+  tf::Taskflow taskflow;
+
+  std::atomic<int> counter{0};
+  const int N = 50;
+
+  auto start = taskflow.emplace([](){});
+  auto end   = taskflow.emplace([](){});
+
+  for(int i = 0; i < N; i++) {
+    auto t = taskflow.emplace([&]() { counter++; })
+                     .priority(tf::TaskPriority::NORMAL);
+    start.precede(t);
+    t.precede(end);
+  }
+
+  executor.run(taskflow).wait();
+
+  REQUIRE(counter.load() == N);
+}
+
+// ============================================================================
+// Test: Dynamic fan-out (subflow creates many prioritized children)
+// ============================================================================
+TEST_CASE("GraphPriority.DynamicFanout" * doctest::timeout(300)) {
+  tf::Executor executor(4);
+  tf::Taskflow taskflow;
+
+  const int FAN_WIDTH = 200;
+  std::atomic<int> high_count{0}, low_count{0};
+
+  // Atomic counter assigns completion positions with strict arrival order.
+  std::atomic<int> order{0};
+  std::array<int, FAN_WIDTH> positions{};  // priority value at each position
+
+  taskflow.emplace([&](tf::Subflow& sf) {
+    // Even indices become HIGH children, odd indices LOW children.
+    for(int i = 0; i < FAN_WIDTH; i++) {
+      if(i % 2 == 0) {
+        sf.emplace([&]() {
+          volatile int x = 0;
+          for(int j = 0; j < 10000; j++) x += j;
+          high_count++;
+          int pos = order.fetch_add(1, std::memory_order_acq_rel);
+          positions[pos] = static_cast<int>(tf::TaskPriority::HIGH);
+        }).priority(tf::TaskPriority::HIGH);
+      } else {
+        sf.emplace([&]() {
+          volatile int x = 0;
+          for(int j = 0; j < 10000; j++) x += j;
+          low_count++;
+          int pos = order.fetch_add(1, std::memory_order_acq_rel);
+          positions[pos] = static_cast<int>(tf::TaskPriority::LOW);
+        }).priority(tf::TaskPriority::LOW);
+      }
+    }
+  });
+
+  executor.run(taskflow).wait();
+
+  REQUIRE(high_count.load() == FAN_WIDTH / 2);
+  REQUIRE(low_count.load() == FAN_WIDTH / 2);
+
+  // Verify HIGH tasks execute before LOW on average.
+  double high_sum = 0, low_sum = 0;
+  for(int i = 0; i < FAN_WIDTH; i++) {
+    if(positions[i] == static_cast<int>(tf::TaskPriority::HIGH)) high_sum += i;
+    else low_sum += i;
+  }
+
+  double avg_high = high_sum / (FAN_WIDTH / 2);
+  double avg_low  = low_sum / (FAN_WIDTH / 2);
+
+  REQUIRE(avg_high < avg_low);
+  REQUIRE(avg_low - avg_high >= 60);
+}
+
+// ============================================================================
+// Test: Cascading Burst — producers spawn mixed-priority work.
+//
+//   trigger → [prod0, prod1, prod2, prod3] → [L2 tasks] → sink
+//
+// Each producer releases a burst of HIGH, NORMAL, and LOW tasks. Tests that
+// the scheduler correctly prioritizes HIGH over NORMAL over LOW when work
+// arrives in staggered bursts from multiple producers.
+// ============================================================================
+TEST_CASE("GraphPriority.CascadingBurst" * doctest::timeout(300)) {
+  const int NUM_WORKERS = 4;
+  const int NUM_PRODUCERS = 4;
+  const int TASKS_PER_PRODUCER = 30;  // 10 HIGH + 10 NORMAL + 10 LOW each
+  const int NUM_L2_TASKS = NUM_PRODUCERS * TASKS_PER_PRODUCER;
+
+  tf::Executor executor(NUM_WORKERS);
+  tf::Taskflow taskflow;
+
+  // `order` hands out completion positions; `positions[k]` stores the priority
+  // (as int) of the task that finished k-th.
+  std::atomic<int> order{0};
+  std::array<int, NUM_L2_TASKS> positions{};
+
+  auto trigger = taskflow.emplace([](){});
+  auto sink    = taskflow.emplace([](){});
+
+  std::vector<tf::Task> producers;
+  for(int p = 0; p < NUM_PRODUCERS; p++) {
+    auto prod = taskflow.emplace([&]() {
+      volatile int x = 0;
+      for(int i = 0; i < 1000; i++) x += i;
+    }).priority(tf::TaskPriority::NORMAL);
+    trigger.precede(prod);
+    producers.push_back(prod);
+  }
+
+  for(int p = 0; p < NUM_PRODUCERS; p++) {
+    for(int i = 0; i < TASKS_PER_PRODUCER; i++) {
+      // Cycle HIGH, NORMAL, LOW across each producer's successors.
+      tf::TaskPriority prio;
+      if(i % 3 == 0)      prio = tf::TaskPriority::HIGH;
+      else if(i % 3 == 1) prio = tf::TaskPriority::NORMAL;
+      else                prio = tf::TaskPriority::LOW;
+
+      auto t = taskflow.emplace([&, prio]() {
+        volatile int x = 0;
+        for(int j = 0; j < 10000; j++) x += j;
+        int pos = order.fetch_add(1, std::memory_order_acq_rel);
+        positions[pos] = static_cast<int>(prio);
+      }).priority(prio);
+
+      producers[p].precede(t);
+      t.precede(sink);
+    }
+  }
+
+  executor.run(taskflow).wait();
+
+  REQUIRE(order.load() == NUM_L2_TASKS);
+
+  // Verify HIGH < NORMAL < LOW on average.
+  double high_sum = 0, normal_sum = 0, low_sum = 0;
+  int high_n = 0, normal_n = 0, low_n = 0;
+  for(int i = 0; i < NUM_L2_TASKS; i++) {
+    if     (positions[i] == static_cast<int>(tf::TaskPriority::HIGH))   { high_sum += i;   high_n++; }
+    else if(positions[i] == static_cast<int>(tf::TaskPriority::NORMAL)) { normal_sum += i; normal_n++; }
+    else                                                                { low_sum += i;    low_n++; }
+  }
+
+  // Guard the divisions below against an empty priority class.
+  REQUIRE(high_n > 0);
+  REQUIRE(normal_n > 0);
+  REQUIRE(low_n > 0);
+  REQUIRE(high_n + normal_n + low_n == NUM_L2_TASKS);
+
+  double avg_high   = high_sum / high_n;
+  double avg_normal = normal_sum / normal_n;
+  double avg_low    = low_sum / low_n;
+
+  REQUIRE(avg_high < avg_normal);
+  REQUIRE(avg_normal < avg_low);
+  REQUIRE(avg_normal - avg_high >= 8);
+  REQUIRE(avg_low - avg_normal >= 8);
+}
+
+// ============================================================================
+// Test: Continuation Cache Race
+//
+//   start → [hub0..hub15] → each hub fans out to HIGH, NORMAL, LOW → end
+//
+// When a hub completes, _prio_update_cache processes its 3 successors.
+// The cache should pick HIGH for inline execution and schedule NORMAL/LOW.
+// With many hubs completing across workers, this tests that the cache
+// correctly picks the highest priority under concurrent pressure.
+// ============================================================================
+TEST_CASE("GraphPriority.ContinuationCacheRace" * doctest::timeout(300)) {
+  const int NUM_HUBS = 16;
+  const int NUM_WORKERS = 4;
+  const int NUM_TASKS = NUM_HUBS * 3;  // 3 tasks per hub (HIGH, NORMAL, LOW)
+
+  tf::Executor executor(NUM_WORKERS);
+  tf::Taskflow taskflow;
+
+  // `order` hands out completion positions; `positions[k]` stores the priority
+  // (as int) of the task that finished k-th.
+  std::atomic<int> order{0};
+  std::array<int, NUM_TASKS> positions{};
+
+  auto start = taskflow.emplace([](){});
+  auto end   = taskflow.emplace([](){});
+
+  for(int h = 0; h < NUM_HUBS; h++) {
+    auto hub = taskflow.emplace([](){}).priority(tf::TaskPriority::NORMAL);
+    start.precede(hub);
+
+    auto high = taskflow.emplace([&]() {
+      volatile int x = 0;
+      for(int j = 0; j < 10000; j++) x += j;
+      int pos = order.fetch_add(1, std::memory_order_acq_rel);
+      positions[pos] = static_cast<int>(tf::TaskPriority::HIGH);
+    }).priority(tf::TaskPriority::HIGH);
+
+    auto normal = taskflow.emplace([&]() {
+      volatile int x = 0;
+      for(int j = 0; j < 10000; j++) x += j;
+      int pos = order.fetch_add(1, std::memory_order_acq_rel);
+      positions[pos] = static_cast<int>(tf::TaskPriority::NORMAL);
+    }).priority(tf::TaskPriority::NORMAL);
+
+    auto low = taskflow.emplace([&]() {
+      volatile int x = 0;
+      for(int j = 0; j < 10000; j++) x += j;
+      int pos = order.fetch_add(1, std::memory_order_acq_rel);
+      positions[pos] = static_cast<int>(tf::TaskPriority::LOW);
+    }).priority(tf::TaskPriority::LOW);
+
+    hub.precede(high, normal, low);
+    high.precede(end);
+    normal.precede(end);
+    low.precede(end);
+  }
+
+  executor.run(taskflow).wait();
+
+  REQUIRE(order.load() == NUM_TASKS);
+
+  // Verify HIGH < NORMAL < LOW on average.
+  double high_sum = 0, normal_sum = 0, low_sum = 0;
+  int high_n = 0, normal_n = 0, low_n = 0;
+  for(int i = 0; i < NUM_TASKS; i++) {
+    if     (positions[i] == static_cast<int>(tf::TaskPriority::HIGH))   { high_sum += i;   high_n++; }
+    else if(positions[i] == static_cast<int>(tf::TaskPriority::NORMAL)) { normal_sum += i; normal_n++; }
+    else                                                                { low_sum += i;    low_n++; }
+  }
+
+  REQUIRE(high_n == NUM_HUBS);
+  REQUIRE(normal_n == NUM_HUBS);
+  REQUIRE(low_n == NUM_HUBS);
+
+  double avg_high   = high_sum / high_n;
+  double avg_normal = normal_sum / normal_n;
+  double avg_low    = low_sum / low_n;
+
+  REQUIRE(avg_high < avg_normal);
+  REQUIRE(avg_normal < avg_low);
+}
+
+// ============================================================================
+// Test: Concurrent prioritized topologies
+//
+// Two independent taskflows (10 HIGH + 10 LOW tasks each) are submitted
+// back-to-back to one executor; only completion counts are checked.
+// ============================================================================
+TEST_CASE("GraphPriority.ConcurrentPrioritizedRuns" * doctest::timeout(300)) {
+  tf::Executor executor(4);
+
+  std::atomic<int> counter_a{0};
+  std::atomic<int> counter_b{0};
+
+  tf::Taskflow tf_a;
+  tf::Taskflow tf_b;
+
+  auto a_start = tf_a.emplace([](){});
+  for(int i = 0; i < 20; i++) {
+    auto task = tf_a.emplace([&]() {
+      counter_a++;
+    }).priority(i < 10 ? tf::TaskPriority::HIGH : tf::TaskPriority::LOW);
+    a_start.precede(task);
+  }
+
+  auto b_start = tf_b.emplace([](){});
+  for(int i = 0; i < 20; i++) {
+    auto task = tf_b.emplace([&]() {
+      counter_b++;
+    }).priority(i < 10 ? tf::TaskPriority::HIGH : tf::TaskPriority::LOW);
+    b_start.precede(task);
+  }
+
+  auto f1 = executor.run(tf_a);
+  auto f2 = executor.run(tf_b);
+
+  f1.wait();
+  f2.wait();
+
+  REQUIRE(counter_a == 20);
+  REQUIRE(counter_b == 20);
+}
+
+// ============================================================================
+// Test: Mixed run and regular run on the same executor
+//
+// Submits a prioritized taskflow and a regular taskflow concurrently to the
+// same executor. Verifies:
+//   1. All tasks from both taskflows complete.
+//   2. The prioritized taskflow still respects priority ordering (HIGH before
+//      LOW on average) even while sharing workers with the regular run.
+//   3. Neither run interferes with the other's correctness.
+//
+// NOTE(review): both submissions below appear as plain `executor.run(...)`;
+// if the prioritized submission originally used a different call form
+// (e.g. a template argument), it is not visible here — TODO confirm.
+// ============================================================================
+TEST_CASE("GraphPriority.MixedPrioritizedAndRegularRun" * doctest::timeout(300)) {
+
+  tf::Executor executor(4);
+
+  // --- Prioritized taskflow: 200 tasks (100 HIGH, 100 LOW) behind a gate ---
+  tf::Taskflow prio_tf;
+  const int PRIO_TASKS = 200;
+
+  std::atomic<int> prio_order{0};
+  std::array<int, PRIO_TASKS> prio_positions{};
+
+  auto prio_start = prio_tf.emplace([](){});
+  auto prio_end   = prio_tf.emplace([](){});
+
+  for(int i = 0; i < PRIO_TASKS; i++) {
+    tf::TaskPriority p = (i < PRIO_TASKS / 2) ? tf::TaskPriority::HIGH
+                                              : tf::TaskPriority::LOW;
+    auto t = prio_tf.emplace([&, p]() {
+      // Small busy-wait to spread work across workers
+      volatile int x = 0;
+      for(int j = 0; j < 5000; j++) x += j;
+      int pos = prio_order.fetch_add(1, std::memory_order_acq_rel);
+      prio_positions[pos] = static_cast<int>(p);
+    }).priority(p);
+    prio_start.precede(t);
+    t.precede(prio_end);
+  }
+
+  // --- Regular taskflow: 200 tasks, no priorities ---
+  tf::Taskflow regular_tf;
+  const int REG_TASKS = 200;
+  std::atomic<int> reg_counter{0};
+
+  auto reg_start = regular_tf.emplace([](){});
+  auto reg_end   = regular_tf.emplace([](){});
+
+  for(int i = 0; i < REG_TASKS; i++) {
+    auto t = regular_tf.emplace([&]() {
+      volatile int x = 0;
+      for(int j = 0; j < 5000; j++) x += j;
+      reg_counter++;
+    });
+    reg_start.precede(t);
+    t.precede(reg_end);
+  }
+
+  // Submit both concurrently to the same executor
+  auto f_prio = executor.run(prio_tf);
+  auto f_reg  = executor.run(regular_tf);
+
+  f_prio.wait();
+  f_reg.wait();
+
+  // 1. All tasks completed
+  REQUIRE(prio_order.load() == PRIO_TASKS);
+  REQUIRE(reg_counter.load() == REG_TASKS);
+
+  // 2. Priority ordering: HIGH tasks execute before LOW on average
+  double high_sum = 0, low_sum = 0;
+  int high_n = 0, low_n = 0;
+  for(int i = 0; i < PRIO_TASKS; i++) {
+    if(prio_positions[i] == static_cast<int>(tf::TaskPriority::HIGH)) {
+      high_sum += i; high_n++;
+    } else {
+      low_sum += i; low_n++;
+    }
+  }
+
+  REQUIRE(high_n == PRIO_TASKS / 2);
+  REQUIRE(low_n == PRIO_TASKS / 2);
+
+  double avg_high = high_sum / high_n;
+  double avg_low  = low_sum / low_n;
+
+  REQUIRE(avg_high < avg_low);
+  REQUIRE(avg_low - avg_high >= 20);  // With 4 workers, should be a significant gap
+}
+
+// ============================================================================
+// Test: Sequential mixed runs — alternating prioritized and regular runs
+//
+// Runs the same taskflow alternately with run and run, verifying
+// that both paths produce correct results and that switching between them
+// does not leave stale priority state in the executor.
+//
+// NOTE(review): the two branches of the loop below are textually identical
+// (`executor.run(taskflow).wait()`); whatever distinguished the prioritized
+// call from the regular one is not visible here — TODO confirm.
+// ============================================================================
+TEST_CASE("GraphPriority.AlternatingPrioritizedAndRegularRuns" * doctest::timeout(300)) {
+
+  tf::Executor executor(2);
+  tf::Taskflow taskflow;
+
+  std::atomic<int> counter{0};
+
+  auto start = taskflow.emplace([](){});
+  for(int i = 0; i < 20; i++) {
+    auto t = taskflow.emplace([&]() { counter++; })
+                     .priority(i < 10 ? tf::TaskPriority::HIGH : tf::TaskPriority::LOW);
+    start.precede(t);
+  }
+
+  // Alternate between prioritized and regular runs
+  for(int round = 0; round < 6; round++) {
+    if(round % 2 == 0) {
+      executor.run(taskflow).wait();
+    } else {
+      executor.run(taskflow).wait();
+    }
+  }
+
+  // 20 tasks x 6 runs = 120
+  REQUIRE(counter.load() == 120);
+}
+
+// ============================================================================
+// Test: Multiple concurrent mixed runs — several prioritized and regular
+//       taskflows submitted at once, verifying all complete correctly.
+// ============================================================================
+TEST_CASE("GraphPriority.ManyConcurrentMixedRuns" * doctest::timeout(300)) {
+
+  tf::Executor executor(4);
+
+  const int NUM_FLOWS = 6;       // 3 prioritized + 3 regular
+  const int TASKS_PER_FLOW = 50;
+
+  // One completion counter and one taskflow per submitted flow.
+  std::array<std::atomic<int>, NUM_FLOWS> counters{};
+  std::array<tf::Taskflow, NUM_FLOWS> flows;
+
+  for(int f = 0; f < NUM_FLOWS; f++) {
+    auto gate = flows[f].emplace([](){});
+    for(int i = 0; i < TASKS_PER_FLOW; i++) {
+      // `f` is captured by value so each task increments its own flow's counter.
+      auto t = flows[f].emplace([&counters, f]() {
+        counters[f]++;
+      }).priority(static_cast<tf::TaskPriority>(i % 3));
+      gate.precede(t);
+    }
+  }
+
+  // Submit: even indices via run, odd via run
+  // NOTE(review): both branches are textually identical `executor.run(...)`
+  // calls; whatever distinguished the prioritized submission from the regular
+  // one is not visible here — TODO confirm.
+  std::array<tf::Future<void>, NUM_FLOWS> futures;
+  for(int f = 0; f < NUM_FLOWS; f++) {
+    if(f % 2 == 0) {
+      futures[f] = executor.run(flows[f]);
+    } else {
+      futures[f] = executor.run(flows[f]);
+    }
+  }
+
+  for(auto& fut : futures) {
+    fut.wait();
+  }
+
+  for(int f = 0; f < NUM_FLOWS; f++) {
+    REQUIRE(counters[f].load() == TASKS_PER_FLOW);
+  }
+}