HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
threadpool.h
Go to the documentation of this file.
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 /* Modifications Copyright (c) Microsoft. */
17 
18 #pragma once
19 #include <string>
20 #include <vector>
21 #include <functional>
22 #include <memory>
23 #include "core/common/common.h"
24 #include "core/platform/env.h"
25 
26 #include <functional>
27 #include <memory>
28 
29 // ORT thread pool overview
30 // ------------------------
31 //
32 // The ORT thread pool implementation is split into two layers. This
33 // file provides the high-level component. See the accompanying
34 // comments in EigenNonBlockingThreadPool.h for the low-level
35 // component.
36 //
37 // threadpool.h defines the user-facing functions for use in
38 // operators. The main abstraction are parallel loops
39 // (ThreadPool::TryParallelFor*), although we also support scheduling
40 // of asynchronous tasks (ThreadPool::Schedule), and the construction
41 // of multi-loop parallel sections (ThreadPool::ParallelSection).
42 //
43 // This high level API is accessed via static methods on the
44 // ThreadPool class. These methods map the operations onto one of
45 // three low-level implementations: (#1) direct execution of the
46 // operations if there is no thread pool configured, (#2) execution of
47 // the operations using the modified Eigen threadpool, (#3) execution
48 // of the operations using OpenMP. Option #1 enables execution in
49 // simple settings without needing threads. Option #2 is the
50 // preferred approach for use in settings with parallelism.
51 //
52 // The high-level part of the thread pool is responsible for:
53 //
54 // - Exposing the desired degree of parallelism to user code, and to
55 // libraries such as MLAS. This lets the libraries tailor the
56 // extent to which they parallelize work.
57 //
58 // - Handling trivial cases (such as directly running parallel loops
59 // with only a single iteration, or with no iterations at all).
60 //
61 // - Deciding how to divide work efficiently between the threads
62 // available.
63 //
64 // The ThreadPool::TryParallelFor methods do this based on cost
65 // estimates supplied by the caller, and are designed to support
66 // loops with small amounts of work per iteration. The loop body is
67 // supplied as a function taking a [start,end) range of iterations
68 // to execute (avoiding the need for per-iteration std::function
69 // calls, or a reliance upon inlining to avoid those calls).
70 //
71 // ThreadPool::TrySimpleParallelFor uses a simpler single-iteration
72 // API based on the assumption that the caller has divided work to
73 // an appropriate granularity.
74 //
75 // - When used with the Eigen-based thread pool, the implementation of
76 // all of the loops maps down onto
77 // ThreadPool::ParallelForFixedBlockSizeScheduling. This method
78 // takes the degree of parallelism (d_of_p) and work distribution
79 // block size (from the cost-based heuristics), and creates a set of
80 // tasks in the underlying thread pool (via
81 // ThreadPool::RunInParallel).
82 //
83 // These tasks then run a loop which picks off batches of iterations
84 // from the user's code. The distribution of these batches is
85 // handled dynamically via LoopCounter::ClaimIterations. This
86 // dynamic balancing behavior helps make performance robust to any
87 // variability in the execution time across iterations, and to
88 // situations such as multiple loops running concurrently on the
89 // same thread pool.
90 //
91 // - When running a series of loops inside a parallel section, the
92 // LoopCounter also helps obtain affinity between these loops (i.e.,
93 // iteration X of one loop will tend to run on the same thread that
94 // ran iteration X of prior loops). This locality helps improve hit
95 // rates in per-core caches across the series of short loops used in
96 // operators like GRU.
97 //
98 // There are some known areas for exploration here:
99 //
100 // - The cost-based heuristics were developed prior to recent changes
101 // to the thread pool. The heuristics seem to work well, but we
102 // should revisit the tuning periodically.
103 //
104 // - Can we unify the APIs for the different kinds of parallel loop?
105 //
106 // In particular, we may be able to replace the current use of
107 // TryBatchParallelFor with appropriate costs for each call site,
108 // and then use TryParallelFor. This would allow for more dynamic
109 // re-balancing of work between threads than the current
110 // ThreadPool::PartitionWork function provides.
111 //
112 // - Given the extensive modifications to original Eigen code, should
113 // we separate that out as a new class and remove the dependence on
114 // other Eigen components.
115 
116 // This file use PIMPL to avoid having eigen headers here
namespace Eigen {
// Forward declarations only: the Eigen headers are deliberately not
// included here (see the PIMPL comment above) so that this public
// header does not pull in Eigen for every consumer.
class Allocator;
class ThreadPoolInterface;
}  // namespace Eigen
121 
122 namespace onnxruntime {
123 
// Per-iteration cost estimate supplied to TryParallelFor. The cost model
// combines memory traffic with compute so the scheduler can decide how
// finely to shard a loop.
struct TensorOpCost {
  double bytes_loaded;
  double bytes_stored;
  // Restored member: the aggregate initialization
  // `TensorOpCost{0, 0, static_cast<double>(cost_per_unit)}` used by
  // ThreadPool::TryParallelFor requires this third field (it was dropped
  // by the documentation extractor).
  double compute_cycles;
};
129 
130 namespace concurrency {
131 
// Forward declarations for the low-level implementation types. The
// definitions live in EigenNonBlockingThreadPool.h / the .cc file and
// are intentionally kept out of this header (PIMPL).
template <typename Environment>
class ThreadPoolTempl;

class ExtendedThreadPoolInterface;
class LoopCounter;
class ThreadPoolParallelSection;
138 
139 class ThreadPool {
140  public:
141 #ifdef _WIN32
142  using NAME_CHAR_TYPE = wchar_t;
143 #else
144  using NAME_CHAR_TYPE = char;
145 #endif
146  // Constructs a pool for running with with "degree_of_parallelism" threads with
147  // specified "name". env->StartThread() is used to create individual threads
148  // with the given ThreadOptions. If "low_latency_hint" is true the thread pool
149  // implementation may use it as a hint that lower latency is preferred at the
150  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
151  // wait. Conversely, if the threadpool is used to schedule high-latency
152  // operations like I/O the hint should be set to false.
153  //
154  // REQUIRES: degree_of_parallelism > 0
155  ThreadPool(Env* env,
156  const ThreadOptions& thread_options,
157  const NAME_CHAR_TYPE* name,
158  int degree_of_parallelism,
159  bool low_latency_hint,
160  bool force_hybrid = false);
161 
162  // Waits until all scheduled work has finished and then destroy the
163  // set of threads.
164  ~ThreadPool();
165 
166  // Start and end a multi-loop parallel section. Parallel loops can
167  // be executed directly (without using this API), but entering a
168  // parallel section allows the runtime system to amortize loop
169  // entry/exit costs over multiple loops, and allows it to promote
170  // affinity between corresponding iterations of different loops.
171  //
172  // Multi-loop sections would typically be used in cases where a
173  // series of loops executes without much code in between them, and
174  // where it is impractical to refactor code into a single loop. For
175  // instance:
176  //
177  // {
178  // onnxruntime::concurrency::ThreadPoool::ParallelSection ps(tp);
179  // for (int x = 0; x < seq_len; x++) {
180  // TrySimpleParallelFor(tp, 16, [&]() { ... });
181  // }
182  // }
183  //
184  // The parallel section is entered via the constructor of
185  // ThreadPool::ParallelSection, and exited via the destructor.
186  // Currently, thread-local state is used to track whether or not the
187  // current thread is inside a parallel section. In contrast to
188  // handling parallel section objects explicitly in user code, this
189  // approach allows code such as MLAS to operate with/without the use
190  // of parallel sections.
191  //
192  // Parallel sections are only implemented with the Eigen threadpool.
193  // They have no effect when using OpenMP.
194  //
195  // Parallel sections may not be nested, and may not be used inside
196  // parallel loops.
197 
199  public:
200  explicit ParallelSection(ThreadPool* tp);
202 
203  private:
204  friend class ThreadPool;
205 
206  // Owning reference for the underlying ThreadPoolParallelSection
207  // which implements the thread management. We use an explicit
208  // deleter here so that the definition of
209  // ThreadPoolParallelSection does not need to be available at this
210  // point to avoid a dependence on the Eigen headers.
211  ThreadPoolParallelSection* ps_{nullptr};
212  ThreadPool* tp_;
214  };
215 
216  // The below API allows to disable spinning
217  // This is used to support real-time scenarios where
218  // spinning between relatively infrequent requests
219  // contributes to high CPU usage while not processing anything.
220  void EnableSpinning();
221 
222  void DisableSpinning();
223 
224  // Schedules fn() for execution in the pool of threads. The function may run
225  // synchronously if it cannot be enqueued. This will occur if the thread pool's
226  // degree-of-parallelism is 1, but it may also occur for implementation-dependent
227  // reasons such as if queues used for buffering work are full.
228  static void Schedule(ThreadPool* tp,
229  std::function<void()> fn) {
230  if (tp) {
231  tp->Schedule(fn);
232  } else {
233  fn();
234  }
235  }
236 
237  // ParallelFor shards the "total" units of work assuming each unit of work
238  // having roughly "cost_per_unit" cost, in cycles. Each unit of work is
239  // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
240  // and the total cost of each shard is roughly the same.
241  //
242  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
243  // if not CPU-bound) to complete a unit of work. Overestimating creates too
244  // many shards and CPU time will be dominated by per-shard overhead, such as
245  // Context creation. Underestimating may not fully make use of the specified
246  // parallelism, and may also cause inefficiencies due to load balancing
247  // issues and stragglers.
248 
249  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, double cost_per_unit,
250  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn) {
251  TryParallelFor(tp, total, TensorOpCost{0, 0, static_cast<double>(cost_per_unit)}, fn);
252  }
253 
254  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
255  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);
256 
257  // Directly schedule the 'total' tasks to the underlying threadpool, without
258  // cutting them by halves
259 
260  inline static void TrySimpleParallelFor(ThreadPool* tp, std::ptrdiff_t total,
261  const std::function<void(std::ptrdiff_t)>& fn) {
262  if (tp != nullptr) {
263  tp->SimpleParallelFor(total, fn);
264  } else {
265  for (std::ptrdiff_t i = 0; i < total; ++i) {
266  // In many cases, fn can be inlined here.
267  fn(i);
268  }
269  }
270  }
271 
272  /**
273  * Tries to call the given function in parallel, with calls split into (num_batches) batches.
274  *\param num_batches If it is zero, it will be replaced to the value of DegreeOfParallelism().
275  *\param fn A std::function or STL style functor with signature of "void f(std::ptrdiff_t);"
276  * Pitfall: Caller should cap `num_batches` to a reasonable value based on the cost of `fn` and the value of `total`.
277  *For example, if fn is as simple as: int sum=0; fn = [&](int i){sum +=i;} and `total` is 100, then num_batches should
278  *be just 1.
279  *
280  * ```
281  **/
282  template <typename F>
283  inline static void TryBatchParallelFor(ThreadPool* tp, std::ptrdiff_t total, F&& fn, std::ptrdiff_t num_batches) {
284  if (tp == nullptr) {
285  for (std::ptrdiff_t i = 0; i < total; ++i) {
286  // In many cases, fn can be inlined here.
287  fn(i);
288  }
289  return;
290  }
291  if (total <= 0)
292  return;
293 
294  if (total == 1) {
295  fn(0);
296  return;
297  }
298 
299  if (num_batches <= 0) {
300  num_batches = std::min<std::ptrdiff_t>(total, DegreeOfParallelism(tp));
301  }
302 
303  if (num_batches <= 1) {
304  for (int i = 0; i < total; i++) {
305  fn(i);
306  }
307  return;
308  }
309 
310  tp->SimpleParallelFor(num_batches, [&](std::ptrdiff_t batch_index) {
311  auto work = PartitionWork(batch_index, num_batches, total);
312  for (std::ptrdiff_t i = work.start; i < work.end; i++) {
313  fn(i);
314  }
315  });
316  }
317 
318  struct WorkInfo {
319  std::ptrdiff_t start{0};
320  std::ptrdiff_t end{0};
321  };
322 
323  /** Calculate the start and end offsets for a batch.
324  @remarks Based on MlasPartitionWork
325  */
326  constexpr static WorkInfo PartitionWork(std::ptrdiff_t batch_idx, std::ptrdiff_t num_batches, std::ptrdiff_t total_work) {
327  const std::ptrdiff_t work_per_batch = total_work / num_batches;
328  const std::ptrdiff_t work_per_batch_extra = total_work % num_batches;
329 
330  WorkInfo info;
331  if (batch_idx < work_per_batch_extra) {
332  info.start = (work_per_batch + 1) * batch_idx;
333  info.end = info.start + work_per_batch + 1;
334  } else {
335  info.start = work_per_batch * batch_idx + work_per_batch_extra;
336  info.end = info.start + work_per_batch;
337  }
338 
339  return info;
340  }
341 
342  //......................................................................
343  //
344  // The following static methods take into account whether OpenMP is
345  // enabled/disabled, and if the thread pool pointer is nullptr
346  // during sequential execution.
347 
348  // Provide a hint to the caller for whether or not to parallelize
349  // work. This lets a caller switch to a sequential version of an
350  // algorithm rather than using calls via the ParallelFor functions.
351 
352  static bool ShouldParallelize(const ThreadPool* tp);
353 
354  // Return the degree of parallelism that code should assume when using the thread pool.
355  // It decouples the degree of parallelism for use with the thread pool from
356  // the implementation choice of whether this matches the number of threads created in
357  // the pool.
358  //
359  // Currently, a loop with degree-of-parallelism N is supported by a pool of N-1 threads
360  // working in combination with the thread initiating the loop.
361  static int DegreeOfParallelism(const ThreadPool* tp);
362 
364 
365  // StartProfiling and StopProfiling are not to be consumed as public-facing API
366  static void StartProfiling(concurrency::ThreadPool* tp);
367  static std::string StopProfiling(concurrency::ThreadPool* tp);
368 
369  private:
370  friend class LoopCounter;
371 
372  // Returns the number of threads created in the pool. This may be different from the
373  // value returned by DegreeOfParallelism to code using the pool.
374  int NumThreads() const;
375 
376  // Returns current thread id between 0 and NumThreads() - 1, if called from a
377  // thread in the pool. Returns -1 otherwise.
378  int CurrentThreadId() const;
379 
380  // Run fn with up to n degree-of-parallelism enlisting the thread pool for
381  // help. The degree-of-parallelism includes the caller, and so if n==1
382  // then the function will run directly in the caller. The fork-join
383  // synchronization is handled in the thread pool, and so any state captured
384  // by fn() is safe from concurrent access once RunWithHelp returns.
385  void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdiff_t block_size);
386 
387  // Divides the work represented by the range [0, total) into k shards.
388  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
389  // Each shard may be executed on a different thread in parallel, depending on
390  // the number of threads available in the pool.
391  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
392  // Requires 0 < block_size <= total.
393  void ParallelForFixedBlockSizeScheduling(std::ptrdiff_t total, std::ptrdiff_t block_size,
394  const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn);
395 
396  // Return whether or not the calling thread should run a loop of
397  // num_iterations divided in chunks of block_size in parallel. If not,
398  // the caller should run the loop sequentially.
399  bool ShouldParallelizeLoop(const std::ptrdiff_t num_iterations,
400  const std::ptrdiff_t block_size = 1) const;
401 
402  // Internal (non-static) parallel loop methods. Unlike the public static methods,
403  // these will not handle the cases of OpenMP builds. or builds without a threadpool.
404  void ParallelFor(std::ptrdiff_t total, double cost_per_unit,
405  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);
406 
407  void ParallelFor(std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
408  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t)>& fn);
409 
410  void SimpleParallelFor(std::ptrdiff_t total, const std::function<void(std::ptrdiff_t)>& fn);
411 
412  void Schedule(std::function<void()> fn);
413 
414  void StartProfiling();
415 
417 
418  ThreadOptions thread_options_;
419 
420  // If a thread pool is created with degree_of_parallelism != 1 then an underlying
421  // EigenThreadPool is used to create OS threads and handle work distribution to them.
422  // If degree_of_parallelism == 1 then underlying_threadpool_ is left as nullptr
423  // and parallel work is run directly by the caller.
424  ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr;
425 
426  // If used, underlying_threadpool_ is instantiated and owned by the ThreadPool.
427  std::unique_ptr<ThreadPoolTempl<Env> > extended_eigen_threadpool_;
428 
429  // Force the thread pool to run in hybrid mode on a normal cpu.
430  bool force_hybrid_ = false;
431 };
432 
433 } // namespace concurrency
434 } // namespace onnxruntime
GLint first
Definition: glcorearb.h:405
GLuint start
Definition: glcorearb.h:475
GLsizei const GLchar *const * string
Definition: glcorearb.h:814
static void TrySimpleParallelFor(ThreadPool *tp, std::ptrdiff_t total, const std::function< void(std::ptrdiff_t)> &fn)
Definition: threadpool.h:260
static void Schedule(ThreadPool *tp, std::function< void()> fn)
Definition: threadpool.h:228
static void TryBatchParallelFor(ThreadPool *tp, std::ptrdiff_t total, F &&fn, std::ptrdiff_t num_batches)
Definition: threadpool.h:283
ThreadPool(Env *env, const ThreadOptions &thread_options, const NAME_CHAR_TYPE *name, int degree_of_parallelism, bool low_latency_hint, bool force_hybrid=false)
static bool ShouldParallelize(const ThreadPool *tp)
static void TryParallelFor(ThreadPool *tp, std::ptrdiff_t total, double cost_per_unit, const std::function< void(std::ptrdiff_t first, std::ptrdiff_t last)> &fn)
Definition: threadpool.h:249
GLdouble n
Definition: glcorearb.h:2008
GLuint GLuint end
Definition: glcorearb.h:475
#define ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(TypeName)
Definition: common.h:219
GLuint const GLchar * name
Definition: glcorearb.h:786
static int DegreeOfParallelism(const ThreadPool *tp)
static std::string StopProfiling(concurrency::ThreadPool *tp)
__hostdev__ uint64_t last(uint32_t i) const
Definition: NanoVDB.h:5976
static void StartProfiling(concurrency::ThreadPool *tp)
static constexpr WorkInfo PartitionWork(std::ptrdiff_t batch_idx, std::ptrdiff_t num_batches, std::ptrdiff_t total_work)
Definition: threadpool.h:326