#include <type_traits>

#include "onnxruntime_config.h"

// Silence third-party warnings triggered by the Eigen thread-pool headers.
#if defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-result"
#ifdef HAS_CLASS_MEMACCESS
#pragma GCC diagnostic ignored "-Wclass-memaccess"
#endif
#ifdef HAS_SHORTEN_64_TO_32
#pragma GCC diagnostic ignored "-Wshorten-64-to-32"
#endif
#elif defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4127)
#pragma warning(disable : 4805)
#endif

#include "unsupported/Eigen/CXX11/ThreadPool"

#if defined(__GNUC__)
#pragma GCC diagnostic pop
#elif defined(_MSC_VER)
#pragma warning(pop)
#endif
namespace onnxruntime {
namespace concurrency {

// Pad/align hot data to a cache-line boundary to avoid false sharing.
#if defined(__x86_64__)
#define ORT_FALSE_SHARING_BYTES 128
#else
#define ORT_FALSE_SHARING_BYTES 64
#endif

#define ORT_ALIGN_TO_AVOID_FALSE_SHARING alignas(ORT_FALSE_SHARING_BYTES)
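// Illustrative use of the macro above (a sketch, not part of the original
// header; the struct and field names are hypothetical, and <atomic>/<cstdint>
// are assumed to be available): placing each hot counter on its own cache
// line keeps concurrent writers from invalidating each other's lines.
struct ExampleAlignedCounters {
  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<uint64_t> enqueued{0};
  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<uint64_t> dequeued{0};
};
static_assert(alignof(ExampleAlignedCounters) >= ORT_FALSE_SHARING_BYTES,
              "each member starts on a separate cache line");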
#ifdef ORT_MINIMAL_BUILD
// ...
using Clock = std::chrono::high_resolution_clock;
// ...
void LogRun(int thread_idx);
// ...
// Statistics gathered by the thread-pool profiler for the main thread.
struct MainThreadStat {
  // ...
  std::vector<std::ptrdiff_t> blocks_;
  std::vector<onnxruntime::TimePoint> points_;
  // ...
  void LogBlockSize(std::ptrdiff_t block_size);
  // ...
};
bool enabled_ = false;
MainThreadStat& GetMainThreadStat();
// ...
#pragma warning(push)
// ...
#pragma warning(disable : 4324)  // structure was padded due to alignment specifier
// ...
uint64_t num_run_ = 0;
// ...
std::vector<ChildThreadStat> child_thread_stats_;
// Pure-virtual entry points used to run loops inside a parallel section
// (fragments of the extended thread-pool interface).
virtual void RunInParallelSection(ThreadPoolParallelSection& ps,
                                  std::function<void(unsigned idx)> fn,
                                  unsigned n, std::ptrdiff_t block_size) = 0;
// ...
virtual void RunInParallel(std::function<void(unsigned idx)> fn,
                           unsigned n, std::ptrdiff_t block_size) = 0;
// ...
// ThreadPoolLoop: the per-iteration function executed by workers in the loop.
const std::function<void(unsigned)> fn;
// Bounded work-stealing run queue, parameterized on the work item type, a
// revocation tag, and a fixed power-of-two capacity.
template <typename Work, typename Tag, unsigned kSize>
// ...
assert((kSize & (kSize - 1)) == 0);  // kSize must be a power of two for cheap masking
// ...
assert(kSize <= (64 << 10));         // leave the upper bits of front_/back_ free for counters
for (unsigned i = 0; i < kSize; i++) array_[i].state.store(ElemState::kEmpty, std::memory_order_relaxed);
// Pop-from-front path (queue owner side), fragments.  Slot indices use kMask;
// the stored positions move modulo 2 * kSize via kMask2.
front = front_.load(std::memory_order_relaxed);
e = &array_[(front - 1) & kMask];
s = e->state.load(std::memory_order_relaxed);
// ...
    e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  // ...
  front = ((front - 1) & kMask2) | (front & ~kMask2);
  front_.store(front, std::memory_order_relaxed);
// ...
    !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  // ...
  Work w = std::move(e->w);
  // ...
  front = ((front - 1) & kMask2) | (front & ~kMask2);
  front_.store(front, std::memory_order_relaxed);
// Push-at-back path (fragments), serialized by OrtSpinLock when
// USE_LOCK_FREE_QUEUE is defined and by OrtMutex otherwise.
#ifdef USE_LOCK_FREE_QUEUE
std::lock_guard<OrtSpinLock> mtx(spin_lock_);
#else
std::lock_guard<OrtMutex> lock(mutex_);
#endif
unsigned back = back_.load(std::memory_order_relaxed);
Elem& e = array_[(back - 1) & kMask];
ElemState s = e.state.load(std::memory_order_relaxed);
// ...
    !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  // ...
back = ((back - 1) & kMask2) | (back & ~kMask2);
back_.store(back, std::memory_order_relaxed);
// PushBackWithTag (fragments): like the plain push-at-back path, but records
// the slot index in w_idx and tags the element so the caller can later revoke
// it.  was_ready notes whether front and back referred to the same slot just
// before the push.
#ifdef USE_LOCK_FREE_QUEUE
std::lock_guard<OrtSpinLock> mtx(spin_lock_);
#else
std::lock_guard<OrtMutex> lock(mutex_);
#endif
unsigned back = back_.load(std::memory_order_relaxed);
w_idx = (back - 1) & kMask;
Elem& e = array_[w_idx];
ElemState s = e.state.load(std::memory_order_relaxed);
// ...
    !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  // ...
bool was_ready = (((back ^ (front_.load(std::memory_order_relaxed))) & kMask) == 0);
back = ((back - 1) & kMask2) | (back & ~kMask2);
back_.store(back, std::memory_order_relaxed);
// Pop-from-back path (fragments), used when other threads take work: the
// element is read from slot back & kMask.
#ifdef USE_LOCK_FREE_QUEUE
std::lock_guard<OrtSpinLock> mtx(spin_lock_);
#else
std::lock_guard<OrtMutex> lock(mutex_);
#endif
// ...
back = back_.load(std::memory_order_relaxed);
e = &array_[back & kMask];
s = e->state.load(std::memory_order_relaxed);
// ...
    e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  // ...
  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
// ...
    !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  // ...
  Work w = std::move(e->w);
  // ...
  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
// RevokeWithTag (fragments): a parallel section that pushed an item via
// PushBackWithTag can try to take it back before a worker claims it; the tag
// plus the recorded slot index (w_idx) identify the element.
bool revoked = false;
#ifdef USE_LOCK_FREE_QUEUE
std::lock_guard<OrtSpinLock> mtx(spin_lock_);
#else
std::lock_guard<OrtMutex> lock(mutex_);
#endif
Elem& e = array_[w_idx];
ElemState s = e.state.load(std::memory_order_relaxed);
// ...
    e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  // ...
  unsigned back = back_.load(std::memory_order_relaxed);
  unsigned back_idx = back & kMask;
  if (back_idx != w_idx) {
    // ...
  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
// Size() and Empty() share one implementation, parameterized on whether a real
// size estimate is needed or only an emptiness check.
return SizeOrNotEmpty<true>();
// ...
return SizeOrNotEmpty<false>() == 0;
// ...
static const unsigned kMask = kSize - 1;          // position -> array slot
static const unsigned kMask2 = (kSize << 1) - 1;  // position modulo 2 * kSize (keeps one wrap bit)
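// Worked example of the two masks (illustrative, with a hypothetical kSize of 8,
// so kMask == 7 and kMask2 == 15): the low bits select the array slot, while
// tracking positions modulo 2 * kSize preserves one extra wrap-around bit that
// distinguishes positions which land on the same slot.
static_assert((5u & 7u) == 5u, "position 5 occupies slot 5");
static_assert((9u & 7u) == 1u, "position 9 wraps to slot 1");
static_assert(((9u ^ 1u) & 15u) != 0u, "positions 9 and 1 still differ modulo 16");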
// Element lifecycle states and per-element storage.
enum class ElemState : uint8_t {
// ...
std::atomic<ElemState> state;
// ...
#ifdef USE_LOCK_FREE_QUEUE
OrtSpinLock spin_lock_;
template <bool NeedSizeEstimate>
unsigned SizeOrNotEmpty() const {
  // Only the emptiness indication is guaranteed to be accurate when
  // NeedSizeEstimate is false; callable from any thread.
  unsigned front = front_.load(std::memory_order_acquire);
  // ...
  unsigned back = back_.load(std::memory_order_acquire);
  unsigned front1 = front_.load(std::memory_order_relaxed);
  if (front != front1) {
    // ... (retry until a consistent front/back snapshot is observed)
    std::atomic_thread_fence(std::memory_order_acquire);
  }
  // ...
  if (NeedSizeEstimate) {
    return CalculateSize(front, back);
  }
  // ...
  // maybe_zero is 0 exactly when the queue is empty.
  unsigned maybe_zero = ((front ^ back) & kMask2);
  // ...
  eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
unsigned CalculateSize(unsigned front, unsigned back) const {
  int size = (front & kMask2) - (back & kMask2);
  // ... (a negative difference is wrapped around modulo 2 * kSize)
  // A concurrent push/pop can make the estimate transiently exceed kSize, so clamp it.
  if (size > static_cast<int>(kSize))
    // ...
  return static_cast<unsigned>(size);
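// Worked example (illustrative): with kSize == 8 and kMask2 == 15, a front
// position of 2 and a back position of 13 give (2 & 15) - (13 & 15) == -11,
// which the elided wrap-around correction brings back to -11 + 2 * kSize == 5
// elements; the clamp to kSize above only matters for the transient
// over-estimate that concurrent modifications can produce.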
void operator=(const RunQueue&) = delete;
// ...
// Monotonically increasing source of tags.
static std::atomic<uint32_t> next_tag{1};
// ...
template <typename Environment>
// ...
// Static trampoline handed to the environment's thread factory; it forwards to
// the member WorkerLoop of the pool passed via param.
static unsigned WorkerLoop(int id, Eigen::ThreadPoolInterface* param) {
  // ...
  this_ptr->WorkerLoop(id);
  // ...
void SignalAllAndWait() {
  // ...
  WakeAllWorkersForExit();
  // ...
  for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
// ...
return profiler_.Stop();
// Tag equality (fragment of Tag::operator==) and the pool's basic task type.
return v_ == other.v_;
// ...
typedef std::function<void()> Task;
// ThreadPoolTempl constructor (fragments): member initialization, the coprime
// tables used for work-stealing victim selection, and worker-thread creation.
ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning,
                Environment& env, const ThreadOptions& thread_options)
    : profiler_(num_threads, name),
      // ...
      num_threads_(num_threads),
      allow_spinning_(allow_spinning),
      set_denormal_as_zero_(thread_options.set_denormal_as_zero),
      worker_data_(num_threads),
      all_coprimes_(num_threads),
      // ...
  for (auto i = 1u; i <= num_threads_; ++i) {
    all_coprimes_.emplace_back(i);
    ComputeCoprimes(i, &all_coprimes_.back());
  }
  // ...
  worker_data_.resize(num_threads_);
  for (auto i = 0u; i < num_threads_; i++) {
    worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
  }
// Schedule (fragments): a task submitted from outside the pool is pushed onto a
// pseudo-randomly chosen worker's queue.
PerThread* pt = GetPerThread();
int q_idx = Rand(&pt->rand) % num_threads_;
WorkerData& td = worker_data_[q_idx];
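// Usage sketch (illustrative; `tp` stands for an already-constructed
// ThreadPoolTempl and is not defined in this header): Schedule() takes a
// std::function<void()> and, as above, enqueues it on a pseudo-randomly chosen
// worker's queue, waking that worker if it was idle.
//
//   std::atomic<int> counter{0};
//   tp.Schedule([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });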
// StartParallelSectionInternal (fragments): a thread may lead only one parallel
// section at a time.
assert((!pt.leading_par_section) && "Nested parallelism not supported");
assert((!ps.active) && "Starting parallel section, but active already");
pt.leading_par_section = true;
// ...
PerThread* pt = GetPerThread();
// ...
// EndParallelSectionInternal (fragments).
assert((pt.leading_par_section) && "Ending parallel section, but none started");
assert((ps.active) && "Ending parallel section, but not active");
pt.leading_par_section = false;
// When a section ends, revoke the tasks that were pushed to worker queues but
// never started, then wait for the remaining work to be reported as done.
unsigned tasks_started = static_cast<unsigned>(ps.tasks.size());
while (!ps.tasks.empty()) {
  const auto& item = ps.tasks.back();
  Queue& q = worker_data_[item.first].queue;
  // ...
}
// ...
while (!ps.work_done.load(std::memory_order_acquire)) {
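// Usage sketch (illustrative; `tp`, the loop bodies, and the counts are
// hypothetical): a parallel section is intended to let a sequence of loops
// reuse the same set of workers instead of waking workers for each loop
// separately.
//
//   ThreadPoolParallelSection ps;
//   tp.StartParallelSection(ps);
//   tp.RunInParallelSection(ps, loop_body_a, n_a, block_a);
//   tp.RunInParallelSection(ps, loop_body_b, n_b, block_b);  // reuses the same workers
//   tp.EndParallelSection(ps);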
PerThread* pt = GetPerThread();
// InitializePreferredWorkers (fragments): slot 0 is a -1 placeholder; the other
// slots are assigned round-robin from a process-wide counter.
static std::atomic<unsigned> next_worker{0};
// ...
if (preferred_workers.empty()) {
  preferred_workers.push_back(-1);
}
// ...
while (preferred_workers.size() <= num_threads_) {
  preferred_workers.push_back(next_worker++ % num_threads_);
}
// ...
// UpdatePreferredWorker (fragments): record where iteration par_idx actually ran.
unsigned ran_on_idx = GetPerThread()->thread_id;
assert(ran_on_idx < num_threads_);
assert(par_idx < preferred_workers.size());
preferred_workers[par_idx] = ran_on_idx;
// ScheduleOnPreferredWorkers (fragments): push one tagged task per iteration
// index onto that index's preferred worker's queue, remembering the
// {queue, slot} pair so the task can be revoked when the section ends.
void ScheduleOnPreferredWorkers(PerThread& pt,
                                ThreadPoolParallelSection& ps,
                                InlinedVector<int>& preferred_workers,
                                unsigned par_idx_start,
                                unsigned par_idx_end,
                                std::function<void(unsigned)> worker_fn) {
  for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) {
    // ...
    assert(par_idx < preferred_workers.size());
    unsigned q_idx = preferred_workers[par_idx] % num_threads_;
    assert(q_idx < num_threads_);
    WorkerData& td = worker_data_[q_idx];
    // ...
    auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() {
      // ...
    ps.tasks.push_back({q_idx, w_idx});
    // ...
    worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
// RunInParallelInternal (fragments): raise the degree of parallelism of the
// current section to new_dop.  With dispatch_async, a single "dispatch task" is
// pushed to one worker; that worker fans out the remaining tasks and runs one
// share of the loop itself, keeping the calling thread off the critical path.
void RunInParallelInternal(PerThread& pt,
                           ThreadPoolParallelSection& ps,
                           unsigned new_dop,
                           bool dispatch_async,
                           std::function<void(unsigned)> worker_fn) {
  // ...
  assert(new_dop <= (unsigned)(num_threads_ + 1));
  auto& preferred_workers = pt.preferred_workers;
  // ...
  if (current_dop < new_dop) {
    unsigned extra_needed = new_dop - current_dop;
    // ...
    if (dispatch_async && extra_needed > 1) {
      assert(current_dop == 1);
      // ...
      Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() {
        // ...
        ps.dispatch_started.store(true, std::memory_order_seq_cst);
        // ...
        ps.dispatch_done.store(true, std::memory_order_release);
        // ...
        worker_fn(current_dop);
        // ...
        ps.work_done.store(true, std::memory_order_release);
      };
      // ...
      ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_;
      WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx];
      Queue& dispatch_que = dispatch_td.queue;
      // ...
      auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx);
      // ...
      dispatch_td.EnsureAwake();
      // ...
      worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
      // ...
      // If the queue rejected the dispatch task, record that no dispatcher is running.
      ps.dispatch_q_idx = -1;
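// Note on the three flags visible in the fragments above: the dispatch task
// sets ps.dispatch_started when it begins running, ps.dispatch_done once its
// scheduling work (elided above) is finished, and ps.work_done once its own
// share of the loop is complete; the code that closes the section (see the
// ps.work_done wait earlier) uses these to decide how long it must wait for
// the dispatcher.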
// RunInParallelSection (fragments): run an n-iteration loop inside an already
// started parallel section.
void RunInParallelSection(ThreadPoolParallelSection& ps,
                          std::function<void(unsigned idx)> fn,
                          unsigned n, std::ptrdiff_t block_size) override {
  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
  // ...
  PerThread* pt = GetPerThread();
  assert(pt->leading_par_section && "RunInParallel, but not in parallel section");
  assert((n > 1) && "Trivial parallel section; should be avoided by caller");
  // ...
  assert((!ps.current_loop) && "RunInParallelSection, but loop already active");
  // ...
  std::function<void(unsigned)> worker_fn = [&ps](unsigned par_idx) {
    // ...
    if (work_item && par_idx < work_item->threads_needed) {
      work_item->fn(par_idx);
    }
// RunInParallel (fragments): run a single loop with up to n workers, without an
// enclosing parallel section.
void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdiff_t block_size) override {
  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
  // ...
  PerThread* pt = GetPerThread();
int NumThreads() const final {
  return num_threads_;
}

int CurrentThreadId() const final {
  const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
  if (pt->pool == this) {
    return pt->thread_id;
  }
  // ... (not a thread of this pool)
// For each victim count N, collect the strides in [1, N] that are coprime with
// N; stepping victim indices by such a stride visits every queue exactly once.
void ComputeCoprimes(int N, Eigen::MaxSizeVector<unsigned>* coprimes) {
  for (int i = 1; i <= N; i++) {
    // ... (i is kept only when gcd(i, N) == 1)
    coprimes->push_back(i);
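// Illustrative check (not part of the original header; the helper name is
// hypothetical and assumes <vector> is available): stepping a victim index by a
// stride that is coprime with `size` visits every index exactly once before
// repeating, which is what the stealing loops below rely on.
inline bool ExampleVisitsAllQueuesOnce(unsigned size, unsigned inc) {
  std::vector<bool> seen(size, false);
  unsigned victim = 0;
  for (unsigned i = 0; i < size; i++) {
    if (seen[victim]) return false;
    seen[victim] = true;
    victim += inc;
    if (victim >= size) victim -= size;
  }
  return true;  // e.g. true for (8, 3), false for (8, 2)
}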
typedef typename Environment::EnvThread Thread;
// Per-thread state; the structure is cache-line aligned, so the MSVC C4324
// padding warning is disabled around it.
#pragma warning(push)
// ...
#pragma warning(disable : 4324)  // structure was padded due to alignment specifier
// ...
constexpr PerThread() : pool(nullptr) {
// ...
bool initialized{false};
// ...
bool leading_par_section{false};
// ...
InlinedVector<int> preferred_workers;
// ...
#pragma warning(pop)
// ...
// WorkerData member: the thread object owned by each worker slot.
std::unique_ptr<Thread> thread;
// Worker status values used to coordinate spinning, blocking, and wake-ups.
enum class ThreadStatus : uint8_t {
// ...
ThreadStatus GetStatus() const {
// ...
// EnsureAwake: called after pushing work to this worker's queue.  If the worker
// is blocking or blocked, take its lock, mark it Waking, and signal it so it
// re-checks the queue.
void EnsureAwake() {
  ThreadStatus seen = GetStatus();
  if (seen == ThreadStatus::Blocking ||
      seen == ThreadStatus::Blocked) {
    std::unique_lock<OrtMutex> lk(mutex);
    // ...
    seen = status.load(std::memory_order_relaxed);
    assert(seen != ThreadStatus::Blocking);
    if (seen == ThreadStatus::Blocked) {
      status.store(ThreadStatus::Waking, std::memory_order_relaxed);
      // ...
// ...
// Setter marking the worker as actively running a task:
status = ThreadStatus::Active;
// ...
void SetSpinning() {
  status = ThreadStatus::Spinning;
// SetBlocked: move Spinning -> Blocking, re-check the should_block predicate
// under the lock, and only then sleep as Blocked until a waker flips the
// status; post_block is intended to run after waking, and the status returns
// to Spinning.
void SetBlocked(std::function<bool()> should_block,
                std::function<void()> post_block) {
  std::unique_lock<OrtMutex> lk(mutex);
  assert(GetStatus() == ThreadStatus::Spinning);
  status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
  if (should_block()) {
    status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
    // ... (wait on the condition variable)
    } while (status.load(std::memory_order_relaxed) == ThreadStatus::Blocked);
    // ...
  }
  status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
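// Illustrative stand-alone sketch of the same blocking handshake using plain
// std:: primitives (the names here are hypothetical and not part of this
// header; <mutex> and <condition_variable> are assumed): the sleeper re-checks
// its predicate under the lock before sleeping, and the waker flips the state
// under the same lock before notifying, so a wake-up cannot be lost between
// the check and the wait.
struct ExampleBlocker {
  std::mutex m;
  std::condition_variable cv;
  bool asleep = false;

  template <typename ShouldBlock>
  void Sleep(ShouldBlock should_block) {
    std::unique_lock<std::mutex> lk(m);
    if (!should_block()) return;  // work arrived while acquiring the lock
    asleep = true;
    cv.wait(lk, [this] { return !asleep; });
  }

  void Wake() {
    {
      std::lock_guard<std::mutex> lk(m);
      asleep = false;
    }
    cv.notify_one();
  }
};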
std::atomic<ThreadStatus> status{ThreadStatus::Spinning};
// ...
// ThreadPoolTempl state.
const unsigned num_threads_;
const bool allow_spinning_;
const bool set_denormal_as_zero_;
Eigen::MaxSizeVector<WorkerData> worker_data_;
Eigen::MaxSizeVector<Eigen::MaxSizeVector<unsigned>> all_coprimes_;
std::atomic<unsigned> blocked_;  // number of workers currently blocked
std::atomic<bool> done_;
enum class SpinLoopStatus {
// ...
// Wake every worker so it can observe done_ and exit.
void WakeAllWorkersForExit() {
  for (auto& td : worker_data_) {
// WorkerLoop (fragments): each worker alternates between popping from its own
// queue, spinning while occasionally trying to steal, and blocking when there
// is nothing to do.
PerThread* pt = GetPerThread();
WorkerData& td = worker_data_[thread_id];
// ...
bool should_exit = false;
// ...
assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);
// ...
constexpr int log2_spin = 20;
const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
const int steal_count = spin_count / 100;
// ...
while (!should_exit) {
  Task t = q.PopFront();
  // ...
  for (int i = 0; i < spin_count && !done_; i++) {
    if (((i + 1) % steal_count == 0)) {
      // ...
  // ...
  bool should_block = true;
  // ...
  should_block = false;
  // ...
  if (done_ && blocked_ == num_threads_) {
    should_block = false;
    // ...
  if (NonEmptyQueueIndex() != -1) {
    // ...
  return should_block;
  // ...
  if (!t) t = q.PopFront();
  // ...
profiler_.LogRun(thread_id);
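// Note on the numbers above: with spinning enabled a worker spins for up to
// 2^20 iterations (log2_spin == 20) before considering blocking, attempting a
// steal roughly every spin_count / 100, i.e. about every 10,485 iterations;
// the should_block predicate then re-checks the queues (NonEmptyQueueIndex)
// and the shutdown condition (done_ with every worker blocked) before the
// thread actually sleeps.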
WakeAllWorkersForExit();
// ...
// Work-stealing helper (fragments): start from a random victim and advance by a
// stride coprime with the worker count, taking work from the back of other
// workers' queues.
PerThread* pt = GetPerThread();
unsigned size = num_threads_;
// ...
unsigned r = Rand(&pt->rand);
unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
unsigned victim = r % size;
// ...
for (unsigned i = 0; i < num_attempts; i++) {
  assert(victim < size);
  if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) {
    Task t = worker_data_[victim].queue.PopBack();
    // ...
  }
  // ...
  if (victim >= size) {
// NonEmptyQueueIndex (fragments): the same coprime walk as the stealing loop,
// but it only checks whether some queue still has work.
int NonEmptyQueueIndex() {
  PerThread* pt = GetPerThread();
  const unsigned size = static_cast<unsigned>(worker_data_.size());
  unsigned r = Rand(&pt->rand);
  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
  unsigned victim = r % size;
  for (unsigned i = 0; i < size; i++) {
    if (!worker_data_[victim].queue.Empty()) {
      // ...
    }
    // ...
    if (victim >= size) {
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
  return std::hash<std::thread::id>()(std::this_thread::get_id());
}

// Lazily initialized thread-local state; the per-thread RNG is seeded from the
// thread id hash.
static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
  static thread_local PerThread per_thread_;
  PerThread* pt = &per_thread_;
  if (!pt->initialized) {
    pt->rand = GlobalThreadIdHash();
    pt->initialized = true;
  }
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
  uint64_t current = *state;
  // Update the internal state (64-bit linear congruential step).
  *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
  // Generate the random output (PCG-XSH-RS style mixing of the previous state).
  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
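// Note on the mixing step above: `current >> 61` lies in [0, 7], so the final
// right-shift is between 22 and 29 bits and the cast keeps the low 32 bits of
// the xorshifted value; seeding each thread's state from GlobalThreadIdHash()
// (see GetPerThread above) keeps the generator free of shared state.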