EigenNonBlockingThreadPool.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 /* Modifications Copyright (c) Microsoft. */
11 
12 #include <type_traits>
13 
14 #pragma once
15 #include "onnxruntime_config.h"
16 // build/external/eigen/unsupported/Eigen/CXX11/src/Tensor/TensorEvaluator.h:162:71:
17 // error: ignoring attributes on template argument "Eigen::PacketType<const float, Eigen::DefaultDevice>::type {aka
18 // __vector(4) float}" [-Werror=ignored-attributes]
19 #if defined(__GNUC__)
20 #pragma GCC diagnostic push
21 #pragma GCC diagnostic ignored "-Wunused-parameter"
22 #pragma GCC diagnostic ignored "-Wunused-result"
23 // cmake/external/eigen/unsupported/Eigen/CXX11/../../../Eigen/src/Core/arch/NEON/PacketMath.h:1633:9:
24 // error: ‘void* memcpy(void*, const void*, size_t)’ copying an object of non-trivial type ‘Eigen::internal::Packet4c’
25 // {aka ‘struct Eigen::internal::eigen_packet_wrapper<int, 2>’} from an array of ‘const int8_t’
26 // {aka ‘const signed char’} [-Werror=class-memaccess]
27 #ifdef HAS_CLASS_MEMACCESS
28 #pragma GCC diagnostic ignored "-Wclass-memaccess"
29 #endif
30 // eigen-src/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h:231:56: error: implicit conversion loses integer
31 // precision: 'uint64_t' (aka 'unsigned long long') to 'size_t' (aka 'unsigned long') [-Werror,-Wshorten-64-to-32]
32 // next = wnext == kStackMask ? nullptr : &waiters_[wnext];
33 // ~~~~~~~~ ^~~~~
34 #ifdef HAS_SHORTEN_64_TO_32
35 #pragma GCC diagnostic ignored "-Wshorten-64-to-32"
36 #endif
37 #elif defined(_MSC_VER)
38 #pragma warning(push)
39 #pragma warning(disable : 4127)
40 #pragma warning(disable : 4805)
41 #endif
42 #include <memory>
43 #include "unsupported/Eigen/CXX11/ThreadPool"
44 
45 #if defined(__GNUC__)
46 #pragma GCC diagnostic pop
47 #elif defined(_MSC_VER)
48 #pragma warning(pop)
49 #endif
50 #include "core/common/denormal.h"
52 #include "core/common/spin_pause.h"
55 #include "core/platform/Barrier.h"
56 
57 // ORT thread pool overview
58 // ------------------------
59 //
60 // The ORT thread pool implementation is split into two layers. This
61 // file provides the low-level component. See the accompanying
62 // comments in threadpool.h for the high-level component.
63 //
64 // The code here is derived from the Eigen non-blocking thread pool,
65 // although many parts have been updated over time. The main
66 // abstractions used here are:
67 //
68 // - The thread pool maintains a set of OS threads running
69 // ThreadPoolTempl::WorkerLoop.
70 //
71 // Each thread has its own RunQueue object, holding a queue of tasks
72 // that have been pushed to the thread for execution. The main work
73 // loop is to pop a task from the head of the queue, and to execute
74 // it to completion. If the worker's run queue is empty then it
75 // will spin waiting for work, then attempt to steal tasks from
76 // other threads' queues, and then block in the OS if it cannot find
77 // work.
78 //
79 // This spin-then-block behavior is configured via a flag provided
80 // when creating the thread pool, and by the constant spin_count.
81 //
82 // - Although all tasks are simple void()->void functions,
83 // conceptually there are three different kinds:
84 //
85 // - One-shot tasks submitted externally via the Schedule() method.
86 // These tasks are used to support asynchronous work. These are
87 // used in the parallel executor, but otherwise are not widely
88 // used outside of test harnesses (see threadpool_test.cc for some
89 // examples).
90 //
91 // - Tasks for running a parallel loop.
92 //
93 // The tasks themselves are defined in threadpool.cc, and are
94 // submitted to the run queues via RunInParallel->SummonWorkers.
95 // Each task will loop internally, picking off iterations from the
96 // user's code via atomic-fetch-and-add, until the loop is
97 // complete.
98 //
99 // This two-layer approach lets us separate out the
100 // super-lightweight per-iteration-batch work from the more
101 // costly per-loop work of managing Task objects.
102 //
103 // - Tasks for running a parallel section. This is an extension of
104 // the approach taken for parallel loops. However, the Tasks are
105 // defined in this file, and can pick up iterations from a series
106 // of different parallel loops. The tasks are defined in
107 // RunInParallelSection->SummonWorkers.
108 //
109 // The additional layer of parallel sections is a further way to
110 // amortize costs: the work done creating the tasks can be
111 // performed once, and then exploited over a series of loops.
112 //
113 // There are a few aspects of the modified Eigen thread pool to
114 // highlight:
115 //
116 // - The run queues follow the usual approach of having push/pop
117 // operations on the front/back, and optimizing the PopFront case
118 // for single-threaded use by the thread owning the run queue.
119 // Two points to note here are:
120 //
121 // * We should experiment with simplifying these queues. In ORT, we
122 // use the CAS-based scheduling layer in threadpool.cc for the
123 // fine-grained allocation of individual loop iterations to worker
124 // threads. This means we do not have the form of recursive
125 // sub-division of work that motivates the original design.
126 //
127 // * We support an additional Revoke operation to replace an item in
128 // the middle of a queue with a tombstone. This operation is used
129 // at the end of parallel loops and parallel sections to remove
130 // any tasks that were created but not yet executed. Once
131 // revoked, a thread can rely on the fact that the task will no
132 // longer execute. Revocation helps manage captured state in
133 // parallel loops: the alternatives would be (i) waiting for all
134 // tasks that captured state to reach the head of their queues and
135 // execute, or (ii) use heap-allocated state in tasks, and use a
136 // technique such as reference counting to de-allocate it.
137 //
138 // To support revocation, each thread has a unique "Tag" to
139 // identify the items that it adds to the work queues. A thread
140 // can revoke an item only if it has the thread's own tag.
141 //
142 // - When entering a parallel loop (or parallel section), a thread
143 // maintains a set of "preferred" worker hints, and initially
144 // submits tasks to these workers.
145 // When a task executes, it updates the submitting thread's
146 // preferred workers to reflect the worker that the task ran on.
147 // Hence, if a task is submitted to thread T1's queue, and then
148 // stolen by T2 for execution, then T2 will become preferred.
149 //
150 // This "stickiness" aims to retain locality between successive
151 // loops submitted by the same thread, to maintain the same set of
152 // active threads over time (when the entire pool is not needed),
153 // and to allow concurrent requests to submit work to their own
154 // respective sets of preferred workers.
155 
156 namespace onnxruntime {
157 namespace concurrency {
158 
159 #ifdef _WIN32
160 using CHAR_TYPE = wchar_t;
161 #else
162 using CHAR_TYPE = char;
163 #endif
164 
165 class ThreadPoolParallelSection;
166 class ThreadPoolLoop;
167 
168 enum class StealAttemptKind {
169  TRY_ONE,
170  TRY_ALL,
171 };
172 
173 enum class PushResult {
174  REJECTED,
175  ACCEPTED_IDLE,
176  ACCEPTED_BUSY,
177 };
178 
179 // Align to avoid false sharing with prior fields. If required,
180 // alignment or padding must be added subsequently to avoid false
181 // sharing with later fields. Note that:
182 //
183 // - The __x86_64__ value is twice the line size (64 bytes). This
184 // accounts for 2-line prefetch behavior on some cores.
185 //
186 // - Ideally, ORT_ALIGN_TO_AVOID_FALSE_SHARING is used. However, the
187 // definition of ThreadPoolParallelSection uses naive padding
188 // because C++11 does not support alignment constraints on
189 // allocation or expose stdlib.h aligned_alloc. C++17 introduces
190 // support for aligned allocation which we could use here.
191 
192 #if defined(__x86_64__)
193 #define ORT_FALSE_SHARING_BYTES 128
194 #else
195 #define ORT_FALSE_SHARING_BYTES 64
196 #endif
197 
198 #define ORT_ALIGN_TO_AVOID_FALSE_SHARING alignas(ORT_FALSE_SHARING_BYTES)
199 
200 struct PaddingToAvoidFalseSharing {
201  char padding[ORT_FALSE_SHARING_BYTES];
202 };
203 
204 /* Usage:
205 1. In executor, call Start() before profiling and Stop() to get profiled numbers;
206 2. Inside the thread pool, call LogStart() before a section of interest and LogEnd... after it to log the elapsed time;
207 3. To extend, add more events to enum ThreadPoolEvent before MAX_EVENT, and update GetEventName(...) accordingly;
208 4. Note LogStart must pair with either LogEnd or LogEndAndStart, otherwise ORT_ENFORCE will fail;
209 5. ThreadPoolProfiler is thread-safe.
210 */
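// Editorial example (not part of the original file): a minimal sketch of the
// call sequence described above, using only the ThreadPoolProfiler methods
// declared below. The local names and the event chosen are hypothetical.
//
//   const CHAR_TYPE* name = /* platform-appropriate thread pool name */;
//   ThreadPoolProfiler profiler(/*num_threads=*/4, name);
//   profiler.Start();                          // executor: begin profiling
//   profiler.LogStart();                       // main thread: open a timed span
//   /* ... run the work of interest ... */
//   profiler.LogEnd(ThreadPoolProfiler::RUN);  // main thread: close the span
//   std::string stats = profiler.Stop();       // executor: collect the numbers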
211 #ifdef ORT_MINIMAL_BUILD
212 class ThreadPoolProfiler {
213  public:
214  enum ThreadPoolEvent {
215  DISTRIBUTION = 0,
216  DISTRIBUTION_ENQUEUE,
217  RUN,
218  WAIT,
219  WAIT_REVOKE,
220  MAX_EVENT
221  };
222  ThreadPoolProfiler(int, const CHAR_TYPE*){};
223  ~ThreadPoolProfiler() = default;
225  void Start(){};
226  std::string Stop() { return "not available for minimal build"; }
227  void LogStart(){};
228  void LogEnd(ThreadPoolEvent){};
229  void LogEndAndStart(ThreadPoolEvent){};
230  void LogStartAndCoreAndBlock(std::ptrdiff_t){};
231  void LogCoreAndBlock(std::ptrdiff_t){};
232  void LogThreadId(int){};
233  void LogRun(int){};
234  std::string DumpChildThreadStat() { return {}; }
235 };
236 #else
237 class ThreadPoolProfiler {
238  public:
239  enum ThreadPoolEvent {
240  DISTRIBUTION = 0,
241  DISTRIBUTION_ENQUEUE,
242  RUN,
243  WAIT,
244  WAIT_REVOKE,
245  MAX_EVENT
246  };
247  ThreadPoolProfiler(int num_threads, const CHAR_TYPE* thread_pool_name);
250  using Clock = std::chrono::high_resolution_clock;
251  void Start(); // called by executor to start profiling
252  std::string Stop(); // called by executor to stop profiling and return collected numbers
253  void LogStart(); // called in main thread to record the starting time point
254  void LogEnd(ThreadPoolEvent); // called in main thread to calculate and save the time elapsed from last start point
255  void LogEndAndStart(ThreadPoolEvent); // called in main thread to log elapsed time and begin a new measurement
256  void LogStartAndCoreAndBlock(std::ptrdiff_t block_size);
257  void LogCoreAndBlock(std::ptrdiff_t block_size); // called in main thread to log core and block size for task breakdown
258  void LogThreadId(int thread_idx); // called in child thread to log its id
259  void LogRun(int thread_idx); // called in child thread to log num of run
260  std::string DumpChildThreadStat(); // return all child statistics collected so far
261 
262  private:
263  static const char* GetEventName(ThreadPoolEvent);
264  struct MainThreadStat {
265  uint64_t events_[MAX_EVENT] = {};
266  int32_t core_ = -1;
267  std::vector<std::ptrdiff_t> blocks_; // block size determined by cost model
268  std::vector<onnxruntime::TimePoint> points_;
269  void LogCore();
270  void LogBlockSize(std::ptrdiff_t block_size);
271  void LogStart();
272  void LogEnd(ThreadPoolEvent);
273  void LogEndAndStart(ThreadPoolEvent);
274  std::string Reset();
275  };
276  bool enabled_ = false;
277  MainThreadStat& GetMainThreadStat(); // return thread local stat
278  int num_threads_;
279 #ifdef _MSC_VER
280 #pragma warning(push)
281  // C4324: structure was padded due to alignment specifier
282 #pragma warning(disable : 4324)
283 #endif // _MSC_VER
284  struct ORT_ALIGN_TO_AVOID_FALSE_SHARING ChildThreadStat {
285  std::thread::id thread_id_;
286  uint64_t num_run_ = 0;
287  onnxruntime::TimePoint last_logged_point_ = Clock::now();
288  int32_t core_ = -1; // core that the child thread is running on
289  };
290 #ifdef _MSC_VER
291 #pragma warning(pop)
292 #endif // _MSC_VER
293  std::vector<ChildThreadStat> child_thread_stats_;
294  std::string thread_pool_name_;
295 };
296 #endif
297 
298 // Extended Eigen thread pool interface, avoiding the need to modify
299 // the ThreadPoolInterface.h header from the external Eigen
300 // repository.
301 
302 class ExtendedThreadPoolInterface : public Eigen::ThreadPoolInterface {
303  public:
304  // Start/end a parallel section, within which calls to
305  // RunInParallelSection may be made. Parallel sections are
306  // non-nesting.
307  virtual void StartParallelSection(ThreadPoolParallelSection& ps) = 0;
308  virtual void EndParallelSection(ThreadPoolParallelSection& ps) = 0;
309 
310  // Run fn with up to n degree-of-parallelism enlisting the thread
311  // pool for help. The degree-of-parallelism includes the caller,
312  // and so if n==1 then the function will run directly in the caller.
313  //
314  // The fork-join synchronization is handled in the thread pool, and
315  // so any state captured by fn() is safe from concurrent access once
316  // RunInParallelSection returns.
317  //
318  // The parameter idx provides a loop-local thread ID in the range
319  // [0,k) where k<=n.
320  virtual void RunInParallelSection(ThreadPoolParallelSection& ps,
321  std::function<void(unsigned idx)> fn,
322  unsigned n, std::ptrdiff_t block_size) = 0;
323 
324  // Special case alternative to RunInParallelSection for use without
325  // an existing parallel section. Ideally we would use a single
326  // implementation and a stack-allocated ThreadPoolParallelSection.
327  //
328  // However, on the BM_ThreadPoolParallelFor micro-benchmark I saw
329  // ~20% overhead on the resulting single-loop parallel sections.
330  // There are some additional costs (~5%) for additional invocations
331  // through lambda functions on loop entry. Most significantly, on
332  // loop exit, we incurred ~15% cost by no longer being able to
333  // overlap clean-up of unused Task objects in EndParallelSection
334  // with waiting for loop iterations to complete.
335  //
336  // [ Note that this 20% overhead is more than paid for when we have
337  // two loops execute in series in a parallel section. ]
338  virtual void RunInParallel(std::function<void(unsigned idx)> fn,
339  unsigned n, std::ptrdiff_t block_size) = 0;
340  virtual void StartProfiling() = 0;
341  virtual std::string StopProfiling() = 0;
342 };
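// Editorial example (not part of the original file): a minimal sketch of how a
// caller might drive two loops inside one parallel section through this
// interface. `tp` is a hypothetical ExtendedThreadPoolInterface*, and the
// degree of parallelism and block size are purely illustrative.
//
//   ThreadPoolParallelSection ps;
//   tp->StartParallelSection(ps);
//   tp->RunInParallelSection(ps, [&](unsigned idx) { /* loop 1, shard idx */ }, 4, 1);
//   // All iterations of loop 1 have completed once the call above returns.
//   tp->RunInParallelSection(ps, [&](unsigned idx) { /* loop 2, shard idx */ }, 4, 1);
//   tp->EndParallelSection(ps);  // workers leave the section; ps can be reused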
343 
344 class ThreadPoolParallelSection {
345  public:
346  // State accessed only by the main thread
347  // --------------------------------------
348 
349  // Tasks successfully submitted to the work queues. This sets the
350  // maximum degree of parallelism that the section will support.
351  InlinedVector<std::pair<int, unsigned>> tasks;
352 
353  // Number of tasks revoked (i.e., removed from the queues prior to
354  // execution). We count this at various points, and omit waiting
355  // for them at the end of a loop.
356  unsigned tasks_revoked{0};
357 
358  // Current degree of parallelism, including work in the main thread
359  // and in the dispatcher.
360  unsigned current_dop{0};
361 
362  // State shared between the main thread and worker threads
363  // -------------------------------------------------------
364 
365  // Flag to signal termination of the parallel section
366  std::atomic<bool> active{false};
367 
368  // Count of the number of tasks that completed normally. Other
369  // tasks may be running currently, or may be present in work queues,
370  // or may have been removed from the queues by
371  // RunQueue::RevokeWithTag.
372  PaddingToAvoidFalseSharing padding_1;
373  std::atomic<unsigned> tasks_finished{0};
374  PaddingToAvoidFalseSharing padding_2;
375 
376  // If non-null, the current loop that tasks should be executing. We
377  // need to be careful on access to the contents of current_loop
378  // because it can be stack allocated on the thread entering the
379  // loop:
380  //
381  // - Readers increment workers_in_loop and then read current_loop
382  //
383  // - Writers wishing to deallocate *current_loop must first clear
384  // current_loop and then wait for workers_in_loop==0
385  std::atomic<ThreadPoolLoop*> current_loop{nullptr};
386  std::atomic<unsigned> workers_in_loop{0};
387 
388  // Members to track asynchronous dispatching
389  int dispatch_q_idx = -1; // index of thread that dispatches work to all other threads
390  unsigned dispatch_w_idx = 0; // index of enqueued work
391  std::atomic<bool> dispatch_started{false};
392  std::atomic<bool> dispatch_done{false};
393  std::atomic<bool> work_done{false};
394 };
395 
396 class ThreadPoolLoop {
397  public:
398  ThreadPoolLoop(std::function<void(unsigned)> f, unsigned t) : fn(std::move(f)), threads_needed(t) {
399  }
400 
401  const std::function<void(unsigned)> fn;
402  const unsigned threads_needed;
403 
404  private:
405  ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(ThreadPoolLoop);
406 };
407 
408 template <typename Work, typename Tag, unsigned kSize>
409 class RunQueue {
410  public:
411  RunQueue() : front_(0), back_(0) {
412  // require power-of-two for fast masking
413  assert((kSize & (kSize - 1)) == 0);
414  assert(kSize > 2); // why would you do this?
415  assert(kSize <= (64 << 10)); // leave enough space for counter
416  for (unsigned i = 0; i < kSize; i++) array_[i].state.store(ElemState::kEmpty, std::memory_order_relaxed);
417  }
418 
419  ~RunQueue() {
420  assert(Size() == 0);
421  }
422 
423  // PopFront removes and returns the first element in the queue.
424  // If the queue was empty returns default-constructed Work.
425  Work PopFront() {
426  unsigned front;
427  Elem* e;
428  ElemState s;
429 
430  // Drain revoked items from the front of the queue. CAS to busy to synchronize with
431  // any attempt to take the same item from the back of the queue.
432  do {
433  front = front_.load(std::memory_order_relaxed);
434  e = &array_[(front - 1) & kMask];
435  s = e->state.load(std::memory_order_relaxed);
436  if (s == ElemState::kRevoked &&
437  e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
438  e->state.store(ElemState::kEmpty, std::memory_order_release);
439  front = ((front - 1) & kMask2) | (front & ~kMask2);
440  front_.store(front, std::memory_order_relaxed);
441  }
442  } while (s == ElemState::kRevoked);
443 
444  // Attempt to take next item. State kEmpty shows the queue is empty, kBusy shows
445  // the work is in progress on the item at the front of the queue.
446  if (s != ElemState::kReady ||
447  !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
448  return Work();
449  Work w = std::move(e->w);
450  e->tag = Tag();
451  e->state.store(ElemState::kEmpty, std::memory_order_release);
452  front = ((front - 1) & kMask2) | (front & ~kMask2);
453  front_.store(front, std::memory_order_relaxed);
454  return w;
455  }
456 
457  // PushBack adds w at the end of the queue.
458  // If queue is full returns w, otherwise returns default-constructed Work.
459  Work PushBack(Work w) {
460 #ifdef USE_LOCK_FREE_QUEUE
461  std::lock_guard<OrtSpinLock> mtx(spin_lock_);
462 #else
463  std::lock_guard<OrtMutex> lock(mutex_);
464 #endif
465  unsigned back = back_.load(std::memory_order_relaxed);
466  Elem& e = array_[(back - 1) & kMask];
467  ElemState s = e.state.load(std::memory_order_relaxed);
468  if (s != ElemState::kEmpty ||
469  !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
470  return w;
471  back = ((back - 1) & kMask2) | (back & ~kMask2);
472  back_.store(back, std::memory_order_relaxed);
473  e.w = std::move(w);
474  e.tag = Tag();
475  e.state.store(ElemState::kReady, std::memory_order_release);
476  return Work();
477  }
478 
479  // PushBackWithTag adds w at the end of the queue. The tag value can be used on a
480  // subsequent call to RevokeWithTag to remove the item from the queue in combination
481  // with w_idx. Typically the tag will be a per-thread ID to distinguish work
482  // submitted from different threads.
483  PushResult PushBackWithTag(Work w, Tag tag, unsigned& w_idx) {
484 #ifdef USE_LOCK_FREE_QUEUE
485  std::lock_guard<OrtSpinLock> mtx(spin_lock_);
486 #else
487  std::lock_guard<OrtMutex> lock(mutex_);
488 #endif
489  unsigned back = back_.load(std::memory_order_relaxed);
490  w_idx = (back - 1) & kMask;
491  Elem& e = array_[w_idx];
492  ElemState s = e.state.load(std::memory_order_relaxed);
493  if (s != ElemState::kEmpty ||
494  !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
495  return PushResult::REJECTED; /* Not enqueued */
496  bool was_ready = (((back ^ (front_.load(std::memory_order_relaxed))) & kMask) == 0);
497  back = ((back - 1) & kMask2) | (back & ~kMask2);
498  back_.store(back, std::memory_order_relaxed);
499  e.w = std::move(w);
500  e.tag = tag;
501  e.state.store(ElemState::kReady, std::memory_order_release);
502  return was_ready ? PushResult::ACCEPTED_IDLE : PushResult::ACCEPTED_BUSY; /* Enqueued */
503  }
504 
505 // PopBack removes and returns the last element in the queue.
506  Work PopBack() {
507  if (Empty())
508  return Work();
509 #ifdef USE_LOCK_FREE_QUEUE
510  std::lock_guard<OrtSpinLock> mtx(spin_lock_);
511 #else
512  std::lock_guard<OrtMutex> lock(mutex_);
513 #endif
514  unsigned back;
515  Elem* e;
516  ElemState s;
517 
518  // Drain revoked items from the back of the queue. CAS to busy to synchronize with
519  // any attempt to take the same item from the front of the queue.
520  do {
521  back = back_.load(std::memory_order_relaxed);
522  e = &array_[back & kMask];
523  s = e->state.load(std::memory_order_relaxed);
524  if (s == ElemState::kRevoked &&
525  e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
526  e->state.store(ElemState::kEmpty, std::memory_order_release);
527  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
528  }
529  } while (s == ElemState::kRevoked);
530 
531  if (s != ElemState::kReady ||
532  !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
533  return Work();
534  Work w = std::move(e->w);
535  e->tag = Tag();
536  e->state.store(ElemState::kEmpty, std::memory_order_release);
537  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
538  return w;
539  }
540 
541 // RevokeWithTag removes a work item from the queue. Items are identified positionally,
542 // and so a tag is used to detect whether the same position is occupied by a
543 // different work item at the time of removal. RevokeWithTag lets threads offer work
544  // for parallel execution, and then revoke the offer prior to the work executing (for
545  // instance if the thread itself completes all of the work). Revoking the work
546  // lets the thread deallocate state that might otherwise have been captured by the work item
547  // and accessed by it.
548  //
549  // Return true iff the item is successfully revoked. If the item is not revoked then
550  // the caller must assume that it may still execute, for instance because it
551  // has been pop'd from the queue concurrent with the revocation request.
552 
553  bool RevokeWithTag(Tag tag, unsigned w_idx) {
554  bool revoked = false;
555 #ifdef USE_LOCK_FREE_QUEUE
556  std::lock_guard<OrtSpinLock> mtx(spin_lock_);
557 #else
558  std::lock_guard<OrtMutex> lock(mutex_);
559 #endif
560  Elem& e = array_[w_idx];
561  ElemState s = e.state.load(std::memory_order_relaxed);
562 
563  // We have acquired a lock on the queue, synchronizing with
564  // operations aside from the PopFront fast-path. Synchronize with
565  // that by attempting the same kReady->kBusy transition via CAS.
566 
567  if (s == ElemState::kReady &&
568  e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
569  if (e.tag == tag) {
570  unsigned back = back_.load(std::memory_order_relaxed);
571  unsigned back_idx = back & kMask;
572  if (back_idx != w_idx) {
573  // Item is not at the back of the queue, mark it in-place as revoked
574  e.tag = Tag();
575  e.w = Work();
576  e.state.store(ElemState::kRevoked, std::memory_order_release);
577  revoked = true;
578  } else {
579 // Item being removed is still at the back; shift the back pointer over it,
580  // and bump the version number.
581  e.tag = Tag();
582  e.w = Work();
583  e.state.store(ElemState::kEmpty, std::memory_order_release);
584  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
585  revoked = true;
586  }
587  } else {
588  // Tag mismatch, i.e. work queue slot re-used
589  e.state.store(ElemState::kReady, std::memory_order_release);
590  }
591  }
592  return revoked;
593  }
594 
595  // Size returns current queue size.
596  // Can be called by any thread at any time.
597  unsigned Size() const {
598  return SizeOrNotEmpty<true>();
599  }
600 
601  // Empty tests whether container is empty.
602  // Can be called by any thread at any time.
603  bool Empty() const {
604  return SizeOrNotEmpty<false>() == 0;
605  }
606 
607  private:
608  static const unsigned kMask = kSize - 1;
609  static const unsigned kMask2 = (kSize << 1) - 1;
610 
611  enum class ElemState : uint8_t {
612  kEmpty,
613  kBusy,
614  kReady,
615  kRevoked,
616  };
617 
618  // Updates to an element are bracketed by a std::memory_order_acquire
619  // load from the state, and a std::memory_order_release store. Accesses
620  // to the front/back indices for the work queue use relaxed semantics,
621  // with the state of the elements being authoritative.
622  //
623  // TODO: Revisit whether there is a significant benefit for the current
624  // workloads in the complexity here.
625  struct Elem {
626  std::atomic<ElemState> state;
627  Tag tag;
628  Work w;
629  };
630 
631 #ifdef USE_LOCK_FREE_QUEUE
632  OrtSpinLock spin_lock_;
633 #else
634  OrtMutex mutex_;
635 #endif
636 
637  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
638  // front/back, respectively. The remaining bits contain modification counters
639  // that are incremented on Push operations. This allows us to (1) distinguish
640  // between empty and full conditions (if we would use log(kSize) bits for
641  // position, these conditions would be indistinguishable); (2) obtain
642  // consistent snapshot of front_/back_ for Size operation using the
643  // modification counters.
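 // Editorial worked example (assuming kSize == 1024): the low
 // log2(kSize) + 1 == 11 bits of front_/back_ hold the rolling position
 // (kMask == 0x3FF selects a slot, kMask2 == 0x7FF adds the wrap bit) and
 // the remaining high bits act as the modification counter. The queue is
 // empty when ((front_ ^ back_) & kMask2) == 0, and full when the slot
 // indices match but the wrap bits differ.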
644  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> front_;
645  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> back_;
646  ORT_ALIGN_TO_AVOID_FALSE_SHARING Elem array_[kSize];
647 
648  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
649  // only whether the size is 0 is guaranteed to be correct.
650  // Can be called by any thread at any time.
651  template <bool NeedSizeEstimate>
652  unsigned SizeOrNotEmpty() const {
653  // Emptiness plays critical role in thread pool blocking. So we go to great
654  // effort to not produce false positives (claim non-empty queue as empty).
655  unsigned front = front_.load(std::memory_order_acquire);
656  for (;;) {
657  // Capture a consistent snapshot of front/tail.
658  unsigned back = back_.load(std::memory_order_acquire);
659  unsigned front1 = front_.load(std::memory_order_relaxed);
660  if (front != front1) {
661  front = front1;
662  std::atomic_thread_fence(std::memory_order_acquire);
663  continue;
664  }
665  if (NeedSizeEstimate) {
666  return CalculateSize(front, back);
667  }
668  // This value will be 0 if the queue is empty, and undefined otherwise.
669  unsigned maybe_zero = ((front ^ back) & kMask2);
670  // Queue size estimate must agree with maybe zero check on the queue
671  // empty/non-empty state.
672  eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
673  return maybe_zero;
674  }
675  }
676 
677  EIGEN_ALWAYS_INLINE
678  unsigned CalculateSize(unsigned front, unsigned back) const {
679  int size = (front & kMask2) - (back & kMask2);
680  // Fix overflow.
681  if (size < 0)
682  size += 2 * kSize;
683  // Order of modification in push/pop is crafted to make the queue look
684  // larger than it is during concurrent modifications. E.g. push can
685  // increment size before the corresponding pop has decremented it.
686  // So the computed size can be up to kSize + 1, fix it.
687  if (size > static_cast<int>(kSize))
688  size = kSize;
689  return static_cast<unsigned>(size);
690  }
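 // Editorial note: a concrete instance of the overflow fix above, assuming
 // kSize == 1024 (so kMask2 == 0x7FF): with (front & kMask2) == 5 and
 // (back & kMask2) == 2045, the raw difference is -2040; adding
 // 2 * kSize == 2048 yields the true size of 8.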
691 
692  RunQueue(const RunQueue&) = delete;
693  void operator=(const RunQueue&) = delete;
694 };
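// Editorial example (not part of the original file): a minimal sketch of the
// push/revoke protocol that the pool uses below. The Work type, tag type T,
// queue size, and variable names are illustrative (the pool itself uses the
// Tag type nested in ThreadPoolTempl, defined later in this file).
//
//   RunQueue<std::function<void()>, T, 1024> q;
//   unsigned w_idx;
//   T tag = /* per-thread tag */;
//   if (q.PushBackWithTag([] { /* work */ }, tag, w_idx) != PushResult::REJECTED) {
//     if (q.RevokeWithTag(tag, w_idx)) {
//       // The item is guaranteed not to run; its captured state can be freed.
//     }
//   }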
695 
696 static std::atomic<uint32_t> next_tag{1};
697 
698 template <typename Environment>
699 class ThreadPoolTempl : public ExtendedThreadPoolInterface {
700  private:
701  struct PerThread;
702 
703  static unsigned WorkerLoop(int id, Eigen::ThreadPoolInterface* param) {
704  // unsafe downcast
705  ThreadPoolTempl* this_ptr = (ThreadPoolTempl*)param;
706  this_ptr->WorkerLoop(id);
707  return 0;
708  }
709 
710  ThreadPoolProfiler profiler_;
711 
712  void SignalAllAndWait() {
713  done_ = true;
714 
715  // Now if all threads block without work, they will start exiting.
716 // But note that threads can continue to work arbitrarily long,
717 // block, submit new work, unblock and otherwise live a full life.
718  WakeAllWorkersForExit();
719  // Join threads explicitly (by destroying) to avoid destruction order within
720  // this class.
721  for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
722  }
723 
724  public:
725  void StartProfiling() override {
726  profiler_.Start();
727  }
728 
729  std::string StopProfiling() override {
730  return profiler_.Stop();
731  }
732 
733  struct Tag {
734  constexpr Tag() : v_(0) {
735  }
736 
737  Tag(uint32_t v) : v_(v) {
738  }
739 
740  // Allocate a new tag to use to identify work items from a given
741  // thread in a parallel section. Ideally, threads will have
742  // unique tags, but re-use is not incorrect if the counter wraps
743 // (for instance, if a long-running workload is calling into ORT
744  // from a fresh thread for each request). We must not re-use the
745  // default tag 0 which is used to identify work items added via
746  // Schedule as opposed to requests for help in parallel sections.
747 
748  static Tag GetNext() {
749  Tag t = Tag(next_tag++);
750  if (t.v_ == 0) {
751  t = Tag(next_tag++);
752  }
753  return t;
754  }
755 
756  uint32_t Get() const {
757  return v_;
758  }
759 
760  bool operator==(const Tag& other) const {
761  return v_ == other.v_;
762  }
763 
764  uint32_t v_ = 0;
765  };
766 
767  typedef std::function<void()> Task;
768  typedef RunQueue<Task, Tag, 1024> Queue;
769 
770  ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env,
771  const ThreadOptions& thread_options)
772  : profiler_(num_threads, name),
773  env_(env),
774  num_threads_(num_threads),
775  allow_spinning_(allow_spinning),
776  set_denormal_as_zero_(thread_options.set_denormal_as_zero),
777  worker_data_(num_threads),
778  all_coprimes_(num_threads),
779  blocked_(0),
780  done_(false) {
781  // Calculate coprimes of all numbers [1, num_threads].
782  // Coprimes are used for random walks over all threads in Steal
783  // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
784  // a random starting thread index t and calculate num_threads - 1 subsequent
785  // indices as (t + coprime) % num_threads, we will cover all threads without
786 // repetitions (effectively getting a pseudo-random permutation of thread
787  // indices).
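 // For example (editorial note): with num_threads == 10, the coprimes of 10
 // are {1, 3, 7, 9}; stepping by coprime 3 from a random start t visits
 // t, t+3, t+6, ... (mod 10) and touches every thread index exactly once.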
788  for (auto i = 1u; i <= num_threads_; ++i) {
789  all_coprimes_.emplace_back(i);
790  ComputeCoprimes(i, &all_coprimes_.back());
791  }
792 
793  // Eigen::MaxSizeVector has neither essential exception safety features
794 // such as swap, nor is it movable. So we have to join threads right here
795  // on exception
796  ORT_TRY {
797  worker_data_.resize(num_threads_);
798  for (auto i = 0u; i < num_threads_; i++) {
799  worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
800  }
801  }
802  ORT_CATCH(...) {
803  ORT_HANDLE_EXCEPTION([&]() {
804  SignalAllAndWait();
805  throw;
806  });
807  }
808  }
809 
810  ~ThreadPoolTempl() override {
811  SignalAllAndWait();
812  }
813 
814  // Run fn(). Ordinarily, the function will be added to the thread pool and executed
815  // by a worker thread. If the thread pool rejects the work then fn() will instead
816  // execute synchronously during Schedule(fn). Currently the thread pool will only
817  // reject work if the queue of pending work is full.
818 
819  void Schedule(std::function<void()> fn) override {
820  PerThread* pt = GetPerThread();
821  int q_idx = Rand(&pt->rand) % num_threads_;
822  WorkerData& td = worker_data_[q_idx];
823  Queue& q = td.queue;
824  fn = q.PushBack(std::move(fn));
825  if (!fn) {
826  // The queue accepted the work; ensure that the thread will pick it up
827  td.EnsureAwake();
828  } else {
829  // Run the work directly if the queue rejected the work
830  fn();
831  }
832  }
833 
834  //......................................................................
835  //
836  // Parallel sections
837  // -----------------
838  //
839 
840  // Start a parallel section, using a caller-provided
841  // ThreadPoolParallelSection for maintaining the per-section state.
842  // Starting a parallel section is just book-keeping; threads are
843  // "summoned" to help with the parallel section once it enters
844  // parallel loops. The threads are then retained until the end of the
845  // section, being re-used over subsequent loops.
846 
847  void StartParallelSectionInternal(PerThread& pt,
848  ThreadPoolParallelSection& ps) {
849  assert((!pt.leading_par_section) && "Nested parallelism not supported");
850  assert((!ps.active) && "Starting parallel section, but active already");
851  pt.leading_par_section = true;
852  if (!pt.tag.Get()) {
853  pt.tag = Tag::GetNext();
854  }
855  ps.dispatch_q_idx = -1;
856  ps.dispatch_started = false;
857  ps.dispatch_done = false;
858  ps.work_done = false;
859  ps.tasks_revoked = 0;
860  ps.current_dop = 1;
861  ps.active = true;
862  }
863 
864  void StartParallelSection(ThreadPoolParallelSection& ps) override {
865  PerThread* pt = GetPerThread();
866  StartParallelSectionInternal(*pt, ps);
867  }
868 
869  // End a parallel section, waiting for all worker threads to exit from
870  // section. Hence, on return, the ThreadPoolParallelSection object
871 // can be deallocated.
872  void EndParallelSectionInternal(PerThread& pt,
873  ThreadPoolParallelSection& ps) {
874  assert((pt.leading_par_section) && "Ending parallel section, but none started");
875  assert((ps.active) && "Ending parallel section, but not active");
876  pt.leading_par_section = false;
877 
878  // Notify workers to exit from the section
879  ps.active = false;
880 
881  // First, attempt to revoke the dispatch task. If we succeed then
882  // we know we revoked _something_ pushed for the current loop. That
883  // may be the dispatch task itself, or it may be a task pushed by
884  // the dispatch task. Those cases are distinguished by whether or
885  // not the dispatch task itself has started -- if it has not started
886  // then it cannot have pushed tasks.
887  if (ps.dispatch_q_idx != -1) {
888  Queue& q = worker_data_[ps.dispatch_q_idx].queue;
889  if (q.RevokeWithTag(pt.tag, ps.dispatch_w_idx)) {
890  if (!ps.dispatch_started.load(std::memory_order_acquire)) {
891  // We successfully revoked a task, and saw the dispatch task
892  // not started. Hence we know we revoked the dispatch task.
893  // This should be the common case.
894  ps.dispatch_q_idx = -1;
895  } else {
896  // We successfully revoked a task, but saw the dispatch task
897  // had started. Hence we know we revoked one of the _new_
898  // tasks created by the dispatcher (not the dispatcher
899  // itself). This should be the rare case, but can occur if
900  // one of the tasks created by the dispatcher occupies the
901  // exact same slot in a work queue that the dispatcher used.
902  ps.tasks_revoked++;
903  }
904  }
905  }
906 
907  // Second, if we failed to revoke the dispatch task, wait for it to
908  // finish dispatch work. This avoids new tasks being started
909  // concurrently with us attempting to end the parallel section.
910  if (ps.dispatch_q_idx != -1) {
911  while (!ps.dispatch_done.load(std::memory_order_acquire)) {
912  onnxruntime::concurrency::SpinPause();
913  }
914  }
915 
916 // Now that we know dispatch is finished, we synchronize with the
917  // tasks that were created (if any) for the parallel section. We
918  // revoke tasks still in queues, and then wait for any that are
919  // still running.
920  profiler_.LogStart();
921  unsigned tasks_started = static_cast<unsigned>(ps.tasks.size());
922  while (!ps.tasks.empty()) {
923  const auto& item = ps.tasks.back();
924  Queue& q = worker_data_[item.first].queue;
925  if (q.RevokeWithTag(pt.tag, item.second)) {
926  ps.tasks_revoked++;
927  }
928  ps.tasks.pop_back();
929  }
930  profiler_.LogEndAndStart(ThreadPoolProfiler::WAIT_REVOKE);
931 
932  // Wait for the dispatch task's own work...
933  if (ps.dispatch_q_idx > -1) {
934  while (!ps.work_done.load(std::memory_order_acquire)) {
935  onnxruntime::concurrency::SpinPause();
936  }
937  }
938 
939  // ...and wait for any other tasks not revoked to finish their work
940  auto tasks_to_wait_for = tasks_started - ps.tasks_revoked;
941  while (ps.tasks_finished < tasks_to_wait_for) {
942  onnxruntime::concurrency::SpinPause();
943  }
944 
945  // Clear status to allow the ThreadPoolParallelSection to be
946  // re-used.
947  ps.tasks_finished = 0;
948  }
949 
950  void EndParallelSection(ThreadPoolParallelSection& ps) override {
951  PerThread* pt = GetPerThread();
952  EndParallelSectionInternal(*pt, ps);
953  }
954 
955  //----------------------------------------------------------------------
956  //
957  // Preferred workers
958  // -----------------
959  //
960  // Initialize the set of hints for preferred worker threads we will
961  // use. We do this once, covering the maximum num_threads_ items,
962  // in order to avoid resizing preferred_workers concurrent with
963  // access from worker threads.
964  //
965  // For simplicity we initialize with hints round-robin among the
966  // workers. For simple workloads with 1 main thread this means we
967 // will distribute work across the pool of workers. For workloads
968  // with multiple main threads it attempts to balance the load.
969  //
970  // These hints are just used as a starting point, and are updated by
971  // the worker thread that actually claims an item (e.g., if an item
972  // initially assigned to thread T1 is stolen and executed by T2,
973 // then T2 is recorded as the new preferred worker).
974  //
975  // Note that the hints are held in the _main_ thread that submits
976  // work to the pool. We assume that a thread is primarily
977  // submitting work to just one pool, but allow for the pool to
978  // change over time. Hence we allow the hints vector to grow over
979  // time.
980  //
981  // A note on terminology used in the variable names here:
982  //
983  // dop - degree of parallelism, as seen by the user. For instance
984  // dop=4 means 4 threads in total: 1 main thread that enters the
985  // loop, plus 1 dispatcher thread, plus 2 additional worker
986  // threads.
987  //
988  // par_idx - a thread's index within the loop, in the range [0,dop).
989  //
990  // num_threads_ - the number of worker threads in the thread pool. A
991  // loop with dop=4 will be common on a pool with 3 threads
992  // (given that the main thread will also participate).
993  //
994  // q_idx - a worker queue index, in the range [0,num_threads_).
995  //
996  // preferred_workers - this maps from par_idx values to q_idx. Hence,
997  // with dop=4 the vector will have length 4, and will identify
998  // which of the workers (0,1,2) should run tasks for the loop.
999  // Note that mapping from par_idx values means that only slots
1000  // [1,dop) are actually used in preferred_workers.
1001  //
1002  // Here are three examples, all assuming a machine with 4 h/w threads,
1003  // and ORT configured to use dop=4.
1004  //
1005  // * First, suppose that a single job is running a series of loops.
1006  // Its main thread enters a parallel loop. Initially, let's assume
1007  // its preferred worker array is [_,0,1,2], writing "_" for the
1008 // unused element for the par_idx=0 work that the main thread will
1009  // run.
1010  //
1011  // The main thread schedules the dispatcher task onto worker 0.
1012  //
1013  // The dispatcher task schedules worker tasks onto workers 1 and 2.
1014  //
1015  // The tasks all execute, without any work stealing, on the threads
1016  // they were scheduled on. The preferred worker array remains
1017  // [_,0,1,2].
1018  //
1019  // * Next, assume we have the same job, and for whatever reason the
1020  // preferred workers were initially [_,0,0,0].
1021  //
1022  // The main thread schedules the dispatcher onto worker 0.
1023  //
1024  // This dispatcher task runs on worker 0, and pushes the worker
1025  // tasks back onto worker 0's queue.
1026  //
1027  // Workers 1 and 2 are idle, and steal tasks from worker 0. As the
1028  // tasks run, they update the preferred_workers array to record the
1029  // workers that execute them.
1030  //
1031  // After the loop, the preferred worker array may now be [_,0,2,1]
1032  // or [_,0,1,2], reflecting the fact that the work has got
1033  // re-distributed. The next loop will start out by distributing the
1034  // work to those same workers.
1035  //
1036  // * Finally, let's assume we have two jobs running on two main
1037  // threads, and we are now using DoP=2 in the loops, and have 2
1038  // workers in the thread pool (so the machine is not
1039  // over-subscribed).
1040  //
1041  // Each main thread has its own preferred_workers, and
1042  // let's say initially these are both [_,0].
1043  //
1044  // Here, with DoP=2, each main thread will just dispatch a single
1045  // task immediately (there is no need for asynchrony with only one
1046  // task to generate).
1047  //
1048  // Initially both main threads will submit these tasks to worker 0.
1049  //
1050  // Once worker 1 steals one of these tasks, the task will update its
1051  // preferred worker to be 1.
1052  //
1053  // From that point onwards, the two main threads will dispatch tasks
1054  // to separate workers, avoiding the need for further work stealing.
1055 
1056  void InitializePreferredWorkers(InlinedVector<int>& preferred_workers) {
1057  static std::atomic<unsigned> next_worker{0};
1058 
1059 // preferred_workers[0] isn't supposed to be used, so it is initialized to -1 to:
1060  // a) fault if inappropriately accessed
1061  // b) avoid wasting next_worker value
1062  if (preferred_workers.empty()) {
1063  preferred_workers.push_back(-1);
1064  }
1065 
1066  // preferred_workers maps from a par_idx to a q_idx, hence we
1067  // initialize slots in the range [0,num_threads_]
1068  while (preferred_workers.size() <= num_threads_) {
1069  preferred_workers.push_back(next_worker++ % num_threads_);
1070  }
1071  }
1072 
1073  // Update the preferred worker for par_idx to be the calling thread
1074 
1075  void UpdatePreferredWorker(InlinedVector<int>& preferred_workers,
1076  unsigned par_idx) {
1077  unsigned ran_on_idx = GetPerThread()->thread_id;
1078  assert(ran_on_idx < num_threads_);
1079  assert(par_idx < preferred_workers.size());
1080  preferred_workers[par_idx] = ran_on_idx;
1081  }
1082 
1083  // Schedule [par_idx_start,par_idx_end) across the preferred workers
1084 
1085  void ScheduleOnPreferredWorkers(PerThread& pt,
1086  ThreadPoolParallelSection& ps,
1087  InlinedVector<int>& preferred_workers,
1088  unsigned par_idx_start,
1089  unsigned par_idx_end,
1090  std::function<void(unsigned)> worker_fn) {
1091  for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) {
1092  // Look up hint for par_idx. Note that the hints may have been
1093  // recorded from a prior thread pool with a different number of
1094  // threads, hence we must cap at num_threads_.
1095  assert(par_idx < preferred_workers.size());
1096  unsigned q_idx = preferred_workers[par_idx] % num_threads_;
1097  assert(q_idx < num_threads_);
1098  WorkerData& td = worker_data_[q_idx];
1099  Queue& q = td.queue;
1100  unsigned w_idx;
1101 
1102  // Attempt to enqueue the task
1103  auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() {
1104  // Record the worker thread that actually runs this task.
1105  // This will form the preferred worker for the next loop.
1106  UpdatePreferredWorker(preferred_workers, par_idx);
1107  worker_fn(par_idx);
1108  ps.tasks_finished++;
1109  },
1110  pt.tag, w_idx);
1111 
1112  // Queue accepted the task; wake the thread that owns the queue.
1113  // In addition, if the queue was non-empty, attempt to wake
1114  // another thread (which may then steal the task).
1115  if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) {
1116  ps.tasks.push_back({q_idx, w_idx});
1117  td.EnsureAwake();
1118  if (push_status == PushResult::ACCEPTED_BUSY) {
1119  worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
1120  }
1121  }
1122  }
1123  }
1124 
1125  //......................................................................
1126  //
1127  // Parallel loops
1128  // --------------
1129  //
1130  // Ensure that the ThreadPoolParallelSection has sufficient workers to
1131  // execute a loop with degree of parallelism n. We track the number
1132 // of workers already available to the parallel section, prior to
1133  // submitting tasks to the work queues to make up the total.
1134  //
1135  // Each worker will call in to worker_fn(idx) with a per-worker thread
1136  // ID. Note there are different levels of indirection here:
1137  //
1138  // - In a single-loop parallel section, worker_fn will directly
1139  // execute the threadpool.cc code that implements the parallel loop.
1140  //
1141  // - In a multi-loop parallel section, worker_fn is an intermediate
1142  // function that is long-lived (i.e., that lasts until the end of
1143  // the parallel section, as opposed to just a single loop's
1144  // duration).
1145  //
1146 // For ordinary parallel sections, RunInParallelInternal dispatches
1147  // tasks to a number of workers asynchronously. A worker thread will
1148  // be selected as the dispatcher that distributes tasks. This removes
1149  // the O(n) work off the critical path of starting the first loop
1150  // iteration, helping maintain good performance on very short loops.
1151  //
1152  // See the note on terminology above for the use of variable names
1153  // here.
1154 
1155  void RunInParallelInternal(PerThread& pt,
1156  ThreadPoolParallelSection& ps,
1157  unsigned new_dop,
1158  bool dispatch_async,
1159  std::function<void(unsigned)> worker_fn) {
1160  // Ensure that the vector of preferred workers is sufficient for the
1161  // size of the loop we are entering. We do this before dispatching
1162  // tasks for the loop in order to avoid any races between changes to
1163  // the size of the vector and recording the locations that tasks run
1164  // in as they complete.
1165  assert(new_dop <= (unsigned)(num_threads_ + 1));
1166  auto& preferred_workers = pt.preferred_workers;
1167  InitializePreferredWorkers(preferred_workers);
1168 
1169  // current_dop is the degree of parallelism via any workers already
1170  // participating in the current parallel section. Usually, for
1171  // single-loop parallel sections, current_dop=1.
1172  unsigned current_dop = ps.current_dop;
1173 
1174  if (current_dop < new_dop) {
1175  unsigned extra_needed = new_dop - current_dop;
1176 
1177  // Attempt to summon additional workers asynchronously if we
1178  // need more than one. Otherwise, we fall back to simple
1179  // synchronous scheduling.
1180  if (dispatch_async && extra_needed > 1) {
1181  assert(current_dop == 1);
1182 
1183  // Task for dispatching work asynchronously.
1184  Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() {
1185  // Record that dispatch work has started. This must occur
1186  // prior to scheduling tasks, in order to synchronize with
1187  // EndParallelSectionInternal. [ If EndParallelSection
1188 // revoked a task, and then sees dispatch_started=false, then
1189  // it knows that it revoked the dispatcher. Conversely, if it
1190  // revokes a task, and then sees dispatch_started=true, then
1191  // it knows it revoked a worker task. ]
1192  ps.dispatch_started.store(true, std::memory_order_seq_cst);
1193 
1194  // Schedule tasks par_idx=[current_dop+1,new_dop)
1195  ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop + 1, new_dop, worker_fn);
1196  ps.dispatch_done.store(true, std::memory_order_release);
1197 
1198  // Record the worker thread that actually runs this task.
1199  // This will form the preferred worker for the next loop.
1200  UpdatePreferredWorker(preferred_workers, current_dop);
1201 
1202  // Run dispatcher task's own work, par_idx=current_dop
1203  worker_fn(current_dop);
1204 
1205  // Dispatcher's work complete
1206  ps.work_done.store(true, std::memory_order_release);
1207  };
1208 
1209  profiler_.LogStart();
1210  ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_;
1211  WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx];
1212  Queue& dispatch_que = dispatch_td.queue;
1213 
1214  // assign dispatch task to selected dispatcher
1215  auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx);
1216  // Queue accepted the task; wake the thread that owns the queue.
1217  // In addition, if the queue was non-empty, attempt to wake
1218  // another thread (which may then steal the task).
1219  if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) {
1220  dispatch_td.EnsureAwake();
1221  if (push_status == PushResult::ACCEPTED_BUSY) {
1222  worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
1223  }
1224  } else {
1225  ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task
1226  }
1228  } else {
1229  // Synchronous dispatch
1230  ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop, new_dop, std::move(worker_fn));
1231  }
1232  ps.current_dop = new_dop;
1233  }
1234  }
1235 
1236  // Run a single parallel loop in an existing parallel section. This
1237  // maps directly onto SummonWorkers to create sufficient worker
1238  // threads for the desired degree of parallelism, followed by
1239  // dispatching the loop to those workers.
1240  void RunInParallelSection(ThreadPoolParallelSection& ps,
1241  std::function<void(unsigned idx)> fn,
1242  unsigned n,
1243  std::ptrdiff_t block_size) override {
1244  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
1245  profiler_.LogStartAndCoreAndBlock(block_size);
1246  PerThread* pt = GetPerThread();
1247  assert(pt->leading_par_section && "RunInParallel, but not in parallel section");
1248  assert((n > 1) && "Trivial parallel section; should be avoided by caller");
1249 
1250  // Publish the work to any existing workers in the parallel
1251  // section, and ensure it is visible to any new threads created
1252  // below.
1253  assert((!ps.current_loop) && "RunInParallelSection, but loop already active");
1254  ThreadPoolLoop loop{std::move(fn), n};
1255  ps.current_loop = &loop;
1256 
1257  // Increase the worker count if needed. Each worker will pick up
1258  // loops to execute from the current parallel section.
1259  std::function<void(unsigned)> worker_fn = [&ps](unsigned par_idx) {
1260  while (ps.active) {
1261  if (ps.current_loop.load() == nullptr) {
1262  onnxruntime::concurrency::SpinPause();
1263  } else {
1264  ps.workers_in_loop++;
1265  ThreadPoolLoop* work_item = ps.current_loop;
1266  if (work_item && par_idx < work_item->threads_needed) {
1267  work_item->fn(par_idx);
1268  }
1269  ps.workers_in_loop--;
1270  }
1271  }
1272  };
1273  RunInParallelInternal(*pt, ps, n, false, std::move(worker_fn));
1274  assert(ps.dispatch_q_idx == -1);
1276 
1277  // Run work in the main thread
1278  loop.fn(0);
1280 
1281  // Wait for workers to exit the loop
1282  ps.current_loop = 0;
1283  while (ps.workers_in_loop) {
1284  onnxruntime::concurrency::SpinPause();
1285  }
1286  profiler_.LogEnd(ThreadPoolProfiler::WAIT);
1287  }
1288 
1289  // Run a single parallel loop _without_ a parallel section. This is a
1290  // special case of RunInParallelSection, avoiding code paths for
1291  // handing off multiple loops to the pool of workers.
1292  // For main thread:
1293  // 1. select a dispatcher and do job distribution;
1294  // 2. run fn(0);
1295 // 3. wait for all;
1296  // For dispatcher:
1297  // 1. distribute jobs to all other threads;
1298  // 2. run fn(...) itself.
1299  // For all other threads:
1300  // 1. run fn(...);
1301  void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdiff_t block_size) override {
1302  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
1303  profiler_.LogStartAndCoreAndBlock(block_size);
1304  PerThread* pt = GetPerThread();
1305  ThreadPoolParallelSection ps;
1306  StartParallelSectionInternal(*pt, ps);
1307  RunInParallelInternal(*pt, ps, n, true, fn); // select dispatcher and do job distribution;
1309  fn(0); // run fn(0)
1311  EndParallelSectionInternal(*pt, ps); // wait for all
1312  profiler_.LogEnd(ThreadPoolProfiler::WAIT);
1313  }
1314 
1315  int NumThreads() const final {
1316  return num_threads_;
1317  }
1318 
1319  int CurrentThreadId() const final {
1320  const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
1321  if (pt->pool == this) {
1322  return pt->thread_id;
1323  }
1324  return -1;
1325  }
1326 
1327  void EnableSpinning() {
1328  spin_loop_status_ = SpinLoopStatus::kBusy;
1329  }
1330 
1331  void DisableSpinning() {
1332  spin_loop_status_ = SpinLoopStatus::kIdle;
1333  }
1334 
1335  private:
1336  void ComputeCoprimes(int N, Eigen::MaxSizeVector<unsigned>* coprimes) {
1337  for (int i = 1; i <= N; i++) {
1338  unsigned a = i;
1339  unsigned b = N;
1340  // If GCD(a, b) == 1, then a and b are coprimes.
1341  while (b != 0) {
1342  unsigned tmp = a;
1343  a = b;
1344  b = tmp % b;
1345  }
1346  if (a == 1) {
1347  coprimes->push_back(i);
1348  }
1349  }
1350  }
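 // Editorial note: for example, ComputeCoprimes(10, &coprimes) appends
 // {1, 3, 7, 9}, the values in [1, 10] whose GCD with 10 is 1.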
1351 
1352  typedef typename Environment::EnvThread Thread;
1353  struct WorkerData;
1354 
1355  // PerThread objects are allocated in thread-local storage and
1356  // allocated on the thread's first call to GetPerThread. PerThread
1357  // objects are allocated for all threads that submit work to the
1358  // thread pool, in addition to threads within the pool.
1359  //
1360  // In contrast, the WorkerData objects are allocated only for the
1361  // threads in the pool, and their lifetime is managed along with the
1362  // pool.
1363 
1364 #ifdef _MSC_VER
1365 #pragma warning(push)
1366 // C4324: structure was padded due to alignment specifier
1367 #pragma warning(disable : 4324)
1368 #endif // _MSC_VER
1369 
1370  struct ORT_ALIGN_TO_AVOID_FALSE_SHARING PerThread {
1371  constexpr PerThread() : pool(nullptr) {
1372  }
1373  ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
1374  bool initialized{false}; // Non-trivial initialization ran (e.g. for RNG)
1375  uint64_t rand{0}; // Random generator state.
1376  int thread_id{-1}; // Worker thread index in pool.
1377  Tag tag{}; // Work item tag used to identify this thread.
1378  bool leading_par_section{false}; // Leading a parallel section (used only for asserts)
1379 
1380  // When this thread is entering a parallel section, it will
1381  // initially push work to this set of workers. The aim is to
1382  // retain cache state within the workers, and to reduce the number
1383  // of times that the work-stealing code paths are used for
1384  // rebalancing.
1385  InlinedVector<int> preferred_workers;
1386  };
1387 
1388 #ifdef _MSC_VER
1389 #pragma warning(pop)
1390 #endif // _MSC_VER
1391 
1392  struct WorkerData {
1393  constexpr WorkerData() : thread(), queue() {
1394  }
1395  std::unique_ptr<Thread> thread;
1396  Queue queue;
1397 
1398  // Each thread has a status, available read-only without locking, and protected
1399  // by the mutex field below for updates. The status is used for three
1400  // purposes:
1401  //
1402  // 1. To identify threads that are good candidates to push work to.
1403  // We prefer to push work to threads that are actively spinning (no need
1404  // for an OS wake-up, and no need for current work to finish). After that, we
1405  // prefer to push work to threads that are blocked (no need to wait for the
1406  // current work to finish).
1407  //
1408  // 2. To identify threads that are good candidates to steal work from. We
1409  // prefer to steal work from threads that are active outside the worker loop.
1410  // This avoids "snatching" new work away from a thread that has just been
1411 // given it but has not yet noticed it.
1412  //
1413  // 3. When pushing work to a thread, we use the status read-only to identify
1414  // when we need to wake the thread. This read-only check avoids the
1415  // need for mutex / condvar operations in the case where the thread pool
1416  // remains busy.
1417 
1418  enum class ThreadStatus : uint8_t {
1419  Spinning, // Spinning in the work loop, and other cases (initialization) where
1420  // the thread will soon be in the loop
1421  Active, // Running user code, not waiting for work
1422  Blocking, // In the process of blocking; may no longer notice work pushed to it
1423  Blocked, // Blocked on cv
1424  Waking, // Not yet back in the worker loop, but wake-up notification sent
1425  };
1426 
1427  ThreadStatus GetStatus() const {
1428  return status;
1429  }
1430 
1431  // State transitions, called from other threads
1432 
1433 // We employ a mutex for synchronizing on the Blocked/Waking state (EnsureAwake/SetBlocked)
1434 // so that we can wake up the thread in the event it goes to sleep. Because the thread status
1435 // is an atomic member, the lock is not necessary to update it.
1436 // Thus, we do not take the mutex when we set the Active/Spinning state for the thread.
1437 // While manipulating the status under the mutex, we employ relaxed semantics so the compiler
1438 // is not restricted any further.
1439  void EnsureAwake() {
1440  ThreadStatus seen = GetStatus();
1441  if (seen == ThreadStatus::Blocking ||
1442  seen == ThreadStatus::Blocked) {
1443  std::unique_lock<OrtMutex> lk(mutex);
1444  // The Blocking state exists only transiently inside SetBlocked(), while the
1445  // lock is held. We may observe it at the start of this
1446  // function, but once we have acquired the lock the target thread
1447  // will either be blocked or not.
1448  seen = status.load(std::memory_order_relaxed);
1449  assert(seen != ThreadStatus::Blocking);
1450  if (seen == ThreadStatus::Blocked) {
1451  status.store(ThreadStatus::Waking, std::memory_order_relaxed);
1452  lk.unlock();
1453  cv.notify_one();
1454  }
1455  }
1456  }
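      // A sketch of the scheduling-side pairing (simplified; the actual logic
      // lives in Schedule() and in the parallel-section dispatch code): work is
      // pushed to the worker's queue first, and EnsureAwake() then acts as the
      // read-mostly check that only wakes the worker if it is blocking/blocked:
      //
      //   WorkerData& td = worker_data_[w_idx];
      //   w = td.queue.PushBack(std::move(w));  // empty on success; returned if the queue is full
      //   if (!w) {
      //     td.EnsureAwake();                   // no-op unless the worker is blocking or blocked
      //   }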
1457 
1458  // State transitions, called only from the thread itself
1459  // The lock is only used in the synchronization between EnsureAwake and SetBlocked,
1460  // while the Active vs Spinning states are just used as a hint for work stealing
1461  // (prefer to steal from a thread that is actively running a task, rather than stealing from
1462  // a thread that is spinning and likely to pick up the task itself).
1463  void SetActive() {
1464  status = ThreadStatus::Active;
1465  }
1466 
1467  void SetSpinning() {
1468  status = ThreadStatus::Spinning;
1469  }
1470 
1471  void SetBlocked(std::function<bool()> should_block,
1472  std::function<void()> post_block) {
1473  std::unique_lock<OrtMutex> lk(mutex);
1474  assert(GetStatus() == ThreadStatus::Spinning);
1475  status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
1476  if (should_block()) {
1477  status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
1478  do {
1479  cv.wait(lk);
1480  } while (status.load(std::memory_order_relaxed) == ThreadStatus::Blocked);
1481  post_block();
1482  }
1483  status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
1484  }
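      // Note on the two callbacks: should_block() runs under the lock after the
      // status has been set to Blocking, so it can re-check the queue and the
      // termination conditions before the thread commits to sleeping, while
      // post_block() runs only if the thread actually blocked (WorkerLoop below
      // uses it to decrement blocked_).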
1485 
1486  private:
1487  std::atomic<ThreadStatus> status{ThreadStatus::Spinning};
1488  OrtMutex mutex;
1489  OrtCondVar cv;
1490  };
1491 
1492  Environment& env_;
1493  const unsigned num_threads_;
1494  const bool allow_spinning_;
1495  const bool set_denormal_as_zero_;
1496  Eigen::MaxSizeVector<WorkerData> worker_data_;
1497  Eigen::MaxSizeVector<Eigen::MaxSizeVector<unsigned>> all_coprimes_;
1498  std::atomic<unsigned> blocked_; // Count of blocked workers, used as a termination condition
1499  std::atomic<bool> done_;
1500 
1501  // SpinLoopStatus indicates whether the main worker spinning (inner) loop should exit immediately when there is
1502  // no work available (kIdle) or whether it should follow the configured spin-then-block policy (kBusy).
1503  // This lets the ORT session layer hint to the thread pool that it should stop spinning in between
1504  // requests.
1505  enum class SpinLoopStatus {
1506  kIdle,
1507  kBusy
1508  };
1509 
1510  // By default no spin-control hint is applied: workers follow the normal spin-then-block policy
1511  std::atomic<SpinLoopStatus> spin_loop_status_{SpinLoopStatus::kBusy};
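      // A minimal sketch of applying such a hint (hypothetical method name; the
      // real entry points are defined elsewhere in this class):
      //
      //   void SetSpinHint(bool busy) {
      //     spin_loop_status_.store(busy ? SpinLoopStatus::kBusy : SpinLoopStatus::kIdle,
      //                             std::memory_order_relaxed);
      //   }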
1512 
1513  // Wake any blocked workers so that they can cleanly exit WorkerLoop(). For
1514  // a clean exit, each thread will observe (1) done_ set, indicating that the
1515  // destructor has been called, (2) all threads blocked, and (3) no
1516  // items in the work queues.
1517 
1518  void WakeAllWorkersForExit() {
1519  for (auto& td : worker_data_) {
1520  td.EnsureAwake();
1521  }
1522  }
1523 
1524  // Main worker thread loop.
1525  void WorkerLoop(int thread_id) {
1526  PerThread* pt = GetPerThread();
1527  WorkerData& td = worker_data_[thread_id];
1528  Queue& q = td.queue;
1529  bool should_exit = false;
1530  pt->pool = this;
1531  pt->thread_id = thread_id;
1532 
1533  assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);
1534 
1535  constexpr int log2_spin = 20;
1536  const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
1537  const int steal_count = spin_count / 100;
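      // When spinning is enabled, spin_count = 1 << 20 = 1,048,576 polls and
      // steal_count = spin_count / 100 = 10,485, so the (i + 1) % steal_count
      // test below turns roughly one poll in every ~10,000 into a single steal
      // attempt (about 100 steal attempts over a full spin period) before the
      // thread gives up and blocks.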
1538 
1539  SetDenormalAsZero(set_denormal_as_zero_);
1540  profiler_.LogThreadId(thread_id);
1541 
1542  while (!should_exit) {
1543  Task t = q.PopFront();
1544  if (!t) {
1545  // Spin waiting for work.
1546  for (int i = 0; i < spin_count && !done_; i++) {
1547  if (((i + 1) % steal_count == 0)) {
1548  t = Steal(StealAttemptKind::TRY_ONE);
1549  } else {
1550  t = q.PopFront();
1551  }
1552  if (t) break;
1553 
1554  if (spin_loop_status_.load(std::memory_order_relaxed) == SpinLoopStatus::kIdle) {
1555  break;
1556  }
1557  onnxruntime::concurrency::SpinPause();
1558  }
1559 
1560  // Attempt to block
1561  if (!t) {
1562  td.SetBlocked( // Pre-block test
1563  [&]() -> bool {
1564  bool should_block = true;
1565  // Check whether work was pushed to us while attempting to block. We make
1566  // this test while holding the per-thread status lock, and after setting
1567  // our status to ThreadStatus::Blocking.
1568  //
1569  // This synchronizes with ThreadPool::Schedule which pushes work to the queue
1570  // and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
1571  //
1572  // Main thread: Worker:
1573  // #1 Push work #A Set status blocking
1574  // #2 Read worker status #B Check queue
1575  // #3 Wake if blocking/blocked
1576  //
1577  // If #A is before #2 then the main thread sees the worker blocking/blocked and wakes it
1578  //
1579  // If #A is after #2 then #B will see #1, and we abandon blocking
1580  assert(!t);
1581  t = q.PopFront();
1582  if (t) {
1583  should_block = false;
1584  }
1585 
1586  // No work was pushed to us, so continue attempting to block. The remaining
1587  // test synchronizes with termination requests: if we are
1588  // shutting down and all worker threads are blocked without work, then
1589  // we are done.
1590  if (should_block) {
1591  blocked_++;
1592  if (done_ && blocked_ == num_threads_) {
1593  should_block = false;
1594  // Almost done, but need to re-check queues.
1595  // Consider that all queues are empty and all worker threads are preempted
1596  // right after incrementing blocked_ above. Now a free-standing thread
1597  // submits work and calls destructor (which sets done_). If we don't
1598  // re-check queues, we will exit leaving the work unexecuted.
1599  if (NonEmptyQueueIndex() != -1) {
1600  // Note: we must not pop from queues before we decrement blocked_,
1601  // otherwise the following scenario is possible. Consider that instead
1602  // of checking for emptiness we popped the only element from queues.
1603  // Now other worker threads can start exiting, which is bad if the
1604  // work item submits other work. So we just check emptiness here,
1605  // which ensures that all worker threads exit at the same time.
1606  blocked_--;
1607  } else {
1608  should_exit = true;
1609  }
1610  }
1611  }
1612  return should_block;
1613  },
1614  // Post-block update (executed only if we blocked)
1615  [&]() {
1616  blocked_--;
1617  });
1618  // The thread has just unblocked. If we did not pick up work while
1619  // blocking and are not exiting, then work was either pushed to
1620  // us or pushed to an overloaded queue
1621  if (!t) t = q.PopFront();
1622  if (!t) t = Steal(StealAttemptKind::TRY_ALL);
1623  }
1624  }
1625 
1626  if (t) {
1627  td.SetActive();
1628  t();
1629  profiler_.LogRun(thread_id);
1630  td.SetSpinning();
1631  }
1632  }
1633 
1634  // Whichever thread(s) observe the termination conditions are responsible for waking
1635  // any other threads that have remained blocked.
1636  if (should_exit) {
1637  WakeAllWorkersForExit();
1638  }
1639  }
1640 
1641  // Steal tries to steal work from other worker threads in a
1642  // best-effort manner. We steal only from threads that are running
1643  // in user code (ThreadStatus::Active). The intuition behind this
1644  // is that the thread is busy with other work, and we will avoid
1645  // "snatching" work from a thread which is just about to notice the
1646  // work itself.
1647 
1648  Task Steal(StealAttemptKind steal_kind) {
1649  PerThread* pt = GetPerThread();
1650  unsigned size = num_threads_;
1651  unsigned num_attempts = (steal_kind == StealAttemptKind::TRY_ALL) ? size : 1;
1652  unsigned r = Rand(&pt->rand);
1653  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
1654  unsigned victim = r % size;
1655 
1656  for (unsigned i = 0; i < num_attempts; i++) {
1657  assert(victim < size);
1658  if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) {
1659  Task t = worker_data_[victim].queue.PopBack();
1660  if (t) {
1661  return t;
1662  }
1663  }
1664  victim += inc;
1665  if (victim >= size) {
1666  victim -= size;
1667  }
1668  }
1669 
1670  return Task();
1671  }
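      // Victim selection above starts at a random worker and advances by a stride
      // drawn from all_coprimes_[size - 1]; because the stride is coprime with the
      // pool size, a full TRY_ALL sweep visits each worker exactly once. For
      // example, with size = 6 and inc = 5, starting at victim = 2 the visit order
      // is 2, 1, 0, 5, 4, 3 -- every worker, with no repeats.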
1672 
1673  int NonEmptyQueueIndex() {
1674  PerThread* pt = GetPerThread();
1675  const unsigned size = static_cast<unsigned>(worker_data_.size());
1676  unsigned r = Rand(&pt->rand);
1677  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
1678  unsigned victim = r % size;
1679  for (unsigned i = 0; i < size; i++) {
1680  if (!worker_data_[victim].queue.Empty()) {
1681  return victim;
1682  }
1683  victim += inc;
1684  if (victim >= size) {
1685  victim -= size;
1686  }
1687  }
1688  return -1;
1689  }
1690 
1691  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
1692  return std::hash<std::thread::id>()(std::this_thread::get_id());
1693  }
1694 
1695  static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
1696  static thread_local PerThread per_thread_;
1697  PerThread* pt = &per_thread_;
1698  if (!pt->initialized) {
1699  pt->rand = GlobalThreadIdHash();
1700  pt->initialized = true;
1701  }
1702  return pt;
1703  }
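      // Per-thread state lives in lazily-initialized thread_local storage, so
      // threads not owned by the pool (e.g. a caller contributing to a parallel
      // section) also get a usable PerThread record; the RNG is seeded from a
      // hash of the OS thread id on first use.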
1704 
1705  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
1706  uint64_t current = *state;
1707  // Update the internal state
1708  *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
1709  // Generate the random output (using the PCG-XSH-RS scheme)
1710  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
1711  }
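      // The generator above is a 64-bit linear congruential step combined with
      // the PCG-XSH-RS output function: xorshift the state right by 22 bits,
      // then apply a data-dependent right shift of 22 + (current >> 61) bits
      // (between 22 and 29) before truncating to the returned 32-bit value.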
1712 };
1713 
1714 } // namespace concurrency
1715 
1716 } // namespace onnxruntime