Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions taskflow/core/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ struct NSTATE {
constexpr static underlying_type RETAIN_SUBFLOW = 0x40000000;
constexpr static underlying_type JOINED_SUBFLOW = 0x80000000;

// mask to isolate state bits - non-state bits store # weak dependents
constexpr static underlying_type STRONG_DEPENDENCIES_MASK = 0x0FFFFFFF;
// priority encoding (2 bits): 00=UNSET, 01=HIGH, 10=NORMAL, 11=LOW
constexpr static underlying_type PRIORITY_SHIFT = 26;
constexpr static underlying_type PRIORITY_MASK = 0x0C000000;

// mask to isolate the non-state bits (the low 26 bits), which store # strong dependents
constexpr static underlying_type STRONG_DEPENDENCIES_MASK = 0x03FFFFFF;
};

using nstate_t = NSTATE::underlying_type;
Expand Down
74 changes: 43 additions & 31 deletions taskflow/core/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ requires (!std::same_as<std::decay_t<I>, AsyncTask>)
struct Buffer {
std::mutex mutex;
UnboundedWSQ<Node*> queue;
};
};

std::vector<Worker> _workers;
std::vector<Buffer> _buffers;
Expand Down Expand Up @@ -1366,7 +1366,7 @@ inline void Executor::_spawn(size_t N, std::shared_ptr<WorkerInterface> wif) {

// Function: _explore_task
inline bool Executor::_explore_task(Worker& w, Node*& t) {

// Fast path: if no topologies are live, all queues are guaranteed empty
// by the executor's invariant (num_topologies reaches zero only after all
// nodes have been scheduled and their queues flushed). Skip the entire
Expand All @@ -1376,45 +1376,45 @@ inline bool Executor::_explore_task(Worker& w, Node*& t) {
if(_num_topologies.load(std::memory_order_relaxed) == 0) {
return true;
}

const size_t MAX_VICTIM = num_queues(); // guaranteed >= 2 by constructor
const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

// local aliases for steal protocol sentinels — these are properties of the
// steal protocol, not of any specific queue type
size_t num_steals = 0;
size_t vtm = w._sticky_victim;

while(true) {

t = (vtm < _workers.size())
? _workers[vtm]._wsq.steal()
: _buffers[vtm - _workers.size()].queue.steal();

if(t) {
w._sticky_victim = vtm;
break;
}

// EMPTY: pick a new victim excluding self since our own queue is likely empty.
// map [0, MAX_VICTIM-1) over [0, MAX_VICTIM) \ {w._id} — always safe since MAX_VICTIM >= 2.
vtm = w._rdgen() % (MAX_VICTIM - 1);
if(vtm >= w._id) vtm++;

if(++num_steals > MAX_STEALS) {
std::this_thread::yield();
if(num_steals > 150 + MAX_STEALS) {
break;
}
}

if(w._done.test(std::memory_order_relaxed)) {
return false;
}
}

return true;
}
}

/*
// Function: _explore_task
Expand Down Expand Up @@ -1487,7 +1487,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
if(_explore_task(w, t) == false) {
return false;
}

// Go exploit the task if we successfully steal one.
if(t) {
return true;
Expand All @@ -1510,7 +1510,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
_notifier.commit_wait(w._id);
goto explore_task;
}

// Condition #1: buffers should be empty
for(size_t b=0; b<_buffers.size(); ++b) {
if(!_buffers[b].queue.empty()) {
Expand All @@ -1519,7 +1519,7 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
goto explore_task;
}
}

// Condition #2: worker queues should be empty
// Note: We need to use index-based looping to avoid a data race with _spawn,
// which initializes other workers' data structures at the same time.
Expand All @@ -1532,13 +1532,13 @@ inline bool Executor::_wait_for_task(Worker& w, Node*& t) {
goto explore_task;
}
}

// Condition #3: worker should be alive
if(w._done.test(std::memory_order_relaxed)) {
_notifier.cancel_wait(w._id);
return false;
}

// Now I really need to relinquish myself to others.
_notifier.commit_wait(w._id);
goto explore_task;
Expand Down Expand Up @@ -1650,13 +1650,19 @@ void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) {
return;
}

// NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).
// This is because when a node v is inserted into the queue, v can run and finish
// immediately. If v is the last node in the graph, it will tear down the parent task vector
// which cause the last ++first to fail. This problem is specific to MSVC which has a stricter
// iterator implementation in std::vector than GCC/Clang.
if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {
_bulk_spill(first, num_nodes - n);
auto [n, uniform] = worker._wsq.try_bulk_push(first, num_nodes);
if(n != num_nodes) {
if(uniform) {
for(size_t i = n; i < num_nodes; ++i) {
_spill(first[i]);
}
} else {
for(size_t i = 0; i < num_nodes; ++i) {
if(!worker._wsq.try_push(first[i])) {
_spill(first[i]);
}
}
}
}
_notifier.notify_n(num_nodes);

Expand Down Expand Up @@ -1687,9 +1693,15 @@ inline void Executor::_bulk_schedule(I first, size_t num_nodes) {
// Function: _update_cache
// Offers `node` to the cache slot `cache`.
//   - If the slot is empty, `node` is stored in it.
//   - Otherwise the task with the smaller _priority_queue() value keeps (or
//     takes) the slot and the other one is handed to _schedule. On a tie the
//     currently cached task stays and `node` is scheduled.
// Exactly one of the two tasks is scheduled per call, and the task left in the
// slot is never the one that was just scheduled.
TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) {
  // Empty slot: nothing to evict, nothing to schedule.
  if(cache == nullptr) {
    cache = node;
    return;
  }

  if(node->_priority_queue() < cache->_priority_queue()) {
    // The incoming node is more urgent: evict the cached task and keep the node.
    _schedule(worker, cache);
    cache = node;
  }
  else {
    // The cached task is at least as urgent: leave it in place.
    _schedule(worker, node);
  }
}

// Function: _bulk_update_cache
Expand All @@ -1707,7 +1719,7 @@ TF_FORCE_INLINE void Executor::_bulk_update_cache(
}
cache = node;
}

// Procedure: _invoke
inline void Executor::_invoke(Worker& worker, Node* node) {

Expand Down Expand Up @@ -1743,7 +1755,7 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
return;
}
}

invoke_task:

SmallVector<int> conds;
Expand Down Expand Up @@ -2438,7 +2450,7 @@ inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* par
auto node = *first;
node->_topology = tpg;
node->_parent = parent;
node->_nstate = NSTATE::NONE;
node->_nstate &= NSTATE::PRIORITY_MASK;
node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);
node->_set_up_join_counter();
node->_exception_ptr = nullptr;
Expand Down Expand Up @@ -2482,7 +2494,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*&
tpg = f._topologies.front().get();

lock.unlock();

// Soon after we carry out the promise, the associated taskflow may get destroyed
// from the user side, and we should never touch it again.
fetched_tpg->_carry_out_promise();
Expand All @@ -2501,7 +2513,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*&
f._topologies.pop();

lock.unlock();

// Soon after we carry out the promise, the associated taskflow may get destroyed
// from the user side, and we should never touch it again.
fetched_tpg->_carry_out_promise();
Expand Down
47 changes: 42 additions & 5 deletions taskflow/core/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,30 @@ class Graph {
Node* _emplace_back(ArgsT&&...);
};

// ----------------------------------------------------------------------------
// TaskPriority
// ----------------------------------------------------------------------------

/**
@enum TaskPriority

@brief enumeration of all task priority levels

A priority level determines the order in which a task is picked up by a worker
when tasks are ready to execute. Tasks with higher priority (lower numerical value)
are executed before tasks with lower priority (higher numerical value).
*/
enum class TaskPriority : unsigned {
  /** @brief the highest task priority level (most urgent) */
  HIGH = 0,
  /** @brief the normal task priority level */
  NORMAL = 1,
  /** @brief the lowest task priority level (least urgent) */
  LOW = 2,
};

/** @brief total number of task priority levels */
inline constexpr unsigned NUM_TASK_PRIORITIES =
  static_cast<unsigned>(TaskPriority::LOW) + 1;

// ----------------------------------------------------------------------------
// TaskParams
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -238,24 +262,37 @@ class NodeBase {
friend class TaskGroup;
friend class Algorithm;

public:

// Returns the raw contents of the priority field of _nstate: the bits selected
// by NSTATE::PRIORITY_MASK, shifted down by NSTATE::PRIORITY_SHIFT so the
// result is a small index.
size_t _priority_queue() const {
  const auto priority_bits = _nstate & NSTATE::PRIORITY_MASK;
  return static_cast<size_t>(priority_bits >> NSTATE::PRIORITY_SHIFT);
}

protected:

nstate_t _nstate {NSTATE::NONE};
std::atomic<estate_t> _estate {ESTATE::NONE};

NodeBase* _parent {nullptr};
std::atomic<size_t> _join_counter {0};

std::exception_ptr _exception_ptr {nullptr};

NodeBase() = default;

// Builds a node base from explicit initial values for its state word, its
// atomic execution state, its parent link and its join counter.
// _exception_ptr is not listed and keeps its in-class default.
// Each member is listed exactly once: naming the same member twice in a
// mem-initializer list is ill-formed.
NodeBase(nstate_t nstate, estate_t estate, NodeBase* parent, size_t join_counter) :
  _nstate {nstate},
  _estate {estate},
  _parent {parent},
  _join_counter {join_counter} {
}

// Stores priority `p` in the priority field of _nstate (bits 26-27, selected by
// NSTATE::PRIORITY_MASK). The field holds the enumerator value plus one
// (HIGH=1, NORMAL=2, LOW=3), reserving 00 for UNSET (no priority explicitly
// assigned). All bits outside the priority field are preserved.
void _set_priority(TaskPriority p) {
  // An enumerator value above LOW would encode to more than two bits and
  // overwrite bits above the priority field, so treat any such value as LOW.
  const unsigned requested = static_cast<unsigned>(p);
  const unsigned lowest    = static_cast<unsigned>(TaskPriority::LOW);
  const nstate_t level     = static_cast<nstate_t>(requested > lowest ? lowest : requested);

  const nstate_t encoded = (level + 1) << NSTATE::PRIORITY_SHIFT;

  // Clear the previous priority bits, then merge in the new encoding.
  _nstate = (_nstate & ~NSTATE::PRIORITY_MASK) | encoded;
}

void _rethrow_exception() {
if(_exception_ptr) {
Expand Down Expand Up @@ -300,7 +337,7 @@ class Topology : public NodeBase {

std::function<bool()> _predicate;
std::function<void()> _on_finish;

void _carry_out_promise();
};

Expand Down Expand Up @@ -513,7 +550,7 @@ class Node : public NodeBase {
std::string _name;

void* _data {nullptr};

Topology* _topology {nullptr};

size_t _num_successors {0};
Expand Down
53 changes: 53 additions & 0 deletions taskflow/core/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,43 @@ class Task {

*/
Task& data(void* data);

/**
@brief assigns a priority level to the task

@param p the priority level (tf::TaskPriority::HIGH, tf::TaskPriority::NORMAL, or tf::TaskPriority::LOW)
@return @c *this

A task with a higher priority (lower numerical value) will be executed
before tasks with lower priority (higher numerical value) when they are
both ready to run.

@code{.cpp}
tf::Taskflow taskflow;
auto [A, B, C] = taskflow.emplace(
[](){},
[](){},
[](){}
);
A.priority(tf::TaskPriority::HIGH);
B.priority(tf::TaskPriority::NORMAL);
C.priority(tf::TaskPriority::LOW);
@endcode
*/
Task& priority(TaskPriority p);

/**
@brief queries the priority level of the task

@return the priority level of the task; a task whose priority has never been
        assigned reports tf::TaskPriority::HIGH

@code{.cpp}
tf::Taskflow taskflow;
tf::Task task = taskflow.emplace([](){});
task.priority(tf::TaskPriority::HIGH);
assert(task.priority() == tf::TaskPriority::HIGH);
@endcode
*/
TaskPriority priority() const;

/**
@brief resets the task handle to null
Expand Down Expand Up @@ -1522,6 +1559,22 @@ inline Task& Task::data(void* data) {
return *this;
}

// Function: priority
// Translates the raw priority field of the underlying node back into a
// TaskPriority. The field stores 1=HIGH, 2=NORMAL, 3=LOW; 0 means that no
// priority was assigned and is reported as HIGH.
inline TaskPriority Task::priority() const {
  switch(_node->_priority_queue()) {
    case 2:
      return TaskPriority::NORMAL;
    case 3:
      return TaskPriority::LOW;
    default:
      // covers 1 (HIGH) and 0 (UNSET, reported as HIGH)
      return TaskPriority::HIGH;
  }
}

// Function: priority
// Forwards the requested priority level `p` to the underlying node via
// _set_priority and returns this task handle so that calls can be chained.
inline Task& Task::priority(TaskPriority p) {
_node->_set_priority(p);
return *this;
}

// ----------------------------------------------------------------------------
// global ostream
// ----------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions taskflow/core/taskflow.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,14 @@ inline void Taskflow::_dump(
os << ind << 'p' << node << "[label=\"";
if(node->_name.empty()) os << 'p' << node;
else os << node->_name;

switch (node->_priority_queue()) {
case 0: break; // UNSET — no priority label
case 1: os << "\\npriority: HIGH"; break;
case 2: os << "\\npriority: NORMAL"; break;
case 3: os << "\\npriority: LOW"; break;
default: break;
}
os << "\" ";

// shape of the node
Expand Down
Loading