thread.h
1 // Copyright 2008-present Contributors to the OpenImageIO project.
2 // SPDX-License-Identifier: BSD-3-Clause
3 // https://github.com/OpenImageIO/oiio
4 
5 // clang-format off
6 
7 /////////////////////////////////////////////////////////////////////////
8 /// @file thread.h
9 ///
10 /// @brief Wrappers and utilities for multithreading.
11 /////////////////////////////////////////////////////////////////////////
12 
13 
14 #pragma once
15 
16 #include <algorithm>
17 #include <atomic>
18 #include <chrono>
19 #include <functional>
20 #include <future>
21 #include <iostream>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26 
27 #include <OpenImageIO/atomic.h>
28 #include <OpenImageIO/dassert.h>
29 #include <OpenImageIO/export.h>
30 #include <OpenImageIO/oiioversion.h>
31 #include <OpenImageIO/platform.h>
32 
33 
34 
35 // OIIO_THREAD_ALLOW_DCLP, if set to 0, prevents us from using a dodgy
36 // "double checked lock pattern" (DCLP). We are very careful to construct
37 // it safely and correctly, and these uses improve thread performance for
38 // us. But it confuses Thread Sanitizer, so this switch allows you to turn
39 // it off. Also set to 0 if you don't believe that we are correct in
40 // allowing this construct on all platforms.
41 #ifndef OIIO_THREAD_ALLOW_DCLP
42 # define OIIO_THREAD_ALLOW_DCLP 1
43 #endif
44 
45 
46 
47 // Some helpful links:
48 //
49 // Descriptions of the "new" gcc atomic intrinsics:
50 // https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
51 // Old gcc atomic intrinsics:
52 // https://gcc.gnu.org/onlinedocs/gcc-4.4.2/gcc/Atomic-Builtins.html
53 // C++11 and beyond std::atomic:
54 // http://en.cppreference.com/w/cpp/atomic
55 
56 
57 
58 OIIO_NAMESPACE_BEGIN
59 
60 /// Null mutex that can be substituted for a real one to test how much
61 /// overhead is associated with a particular mutex.
62 class null_mutex {
63 public:
64  null_mutex() noexcept {}
65  ~null_mutex() noexcept {}
66  void lock() noexcept {}
67  void unlock() noexcept {}
68  void lock_shared() noexcept {}
69  void unlock_shared() noexcept {}
70  bool try_lock() noexcept { return true; }
71 };
72 
73 /// Null lock that can be substituted for a real one to test how much
74 /// overhead is associated with a particular lock.
75 template<typename T> class null_lock {
76 public:
77  null_lock(T& /*m*/) noexcept {}
78 };
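// Example (hypothetical sketch, not part of the original header): a routine
// templated on its mutex and scoped-lock types can be instantiated with
// null_mutex/null_lock to estimate how much of its runtime is pure locking
// overhead. All names below are illustrative assumptions.
template<class MutexT, template<class> class LockT>
inline void
example_guarded_increment(MutexT& mtx, long long& counter)
{
    LockT<MutexT> lock(mtx);   // a no-op when LockT is null_lock
    ++counter;
}
// Timing example_guarded_increment<std::mutex, std::lock_guard> against
// example_guarded_increment<null_mutex, null_lock> isolates the lock cost.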
79 
80 
81 
82 using std::mutex;
83 using std::recursive_mutex;
84 using std::thread;
85 typedef std::lock_guard<mutex> lock_guard;
86 typedef std::lock_guard<recursive_mutex> recursive_lock_guard;
87 
88 
89 
90 /// Yield the processor for the rest of the timeslice.
91 /// DEPRECATED(2.4): Use std::this_thread::yield() instead.
92 inline void
93 yield() noexcept
94 {
95  std::this_thread::yield();
96 }
97 
98 
99 
100 // Slight pause
101 inline void
102 pause(int delay) noexcept
103 {
104 #if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
105  for (int i = 0; i < delay; ++i)
106  __asm__ __volatile__("pause;");
107 
108 #elif defined(__GNUC__) && (defined(__arm__) || defined(__s390__))
109  for (int i = 0; i < delay; ++i)
110  __asm__ __volatile__("NOP;");
111 
112 #elif defined(_MSC_VER)
113  for (int i = 0; i < delay; ++i) {
114 # if defined(_WIN64)
115  YieldProcessor();
116 # else
117  _asm pause
118 # endif /* _WIN64 */
119  }
120 
121 #else
122  // No pause on this platform, just punt
123  for (int i = 0; i < delay; ++i)
124  ;
125 #endif
126 }
127 
128 
129 
130 // Helper class to deliver ever longer pauses until we yield our timeslice.
131 class atomic_backoff {
132 public:
133  atomic_backoff(int pausemax = 16) noexcept
134  : m_count(1)
135  , m_pausemax(pausemax)
136  {
137  }
138 
139  void operator()() noexcept
140  {
141  if (m_count <= m_pausemax) {
142  pause(m_count);
143  m_count *= 2;
144  } else {
145  std::this_thread::yield();
146  }
147  }
148 
149 private:
150  int m_count;
151  int m_pausemax;
152 };
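// Example (hypothetical sketch): spin-wait on a flag with progressively
// longer pauses, yielding the timeslice once the pauses max out. The flag
// and function name are illustrative assumptions, not part of this file.
inline void
example_wait_until_ready(const std::atomic<bool>& ready)
{
    atomic_backoff backoff;
    while (!ready.load(std::memory_order_acquire))
        backoff();   // pause 1, 2, 4, ... iterations, then yield
}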
153 
154 
155 
156 
157 /// A spin_mutex is semantically equivalent to a regular mutex, except
158 /// for the following:
159 /// - A spin_mutex is just 1 byte, whereas a regular mutex is quite
160 /// large (44 bytes for pthread).
161 /// - A spin_mutex is extremely fast to lock and unlock, whereas a regular
162 /// mutex is surprisingly expensive just to acquire a lock.
163 /// - A spin_mutex takes CPU while it waits, so this can be very
164 /// wasteful compared to a regular mutex that blocks (gives up its
165 /// CPU slices until it acquires the lock).
166 ///
167 /// The bottom line is that mutex is the usual choice, but in cases where
168 /// you need to acquire locks very frequently, but only need to hold the
169 /// lock for a very short period of time, you may save runtime by using
170 /// a spin_mutex, even though it's non-blocking.
171 ///
172 /// N.B. A spin_mutex is only the size of a bool. To avoid "false
173 /// sharing", be careful not to put two spin_mutex objects on the same
174 /// cache line (within 128 bytes of each other), or the two mutexes may
175 /// effectively (and wastefully) lock against each other.
176 ///
177 class spin_mutex {
178 public:
179  spin_mutex(void) noexcept {}
180  ~spin_mutex(void) noexcept {}
181 
182  /// Copy constructor -- initialize to unlocked.
183  ///
184  spin_mutex(const spin_mutex&) noexcept {}
185 
186  /// Assignment does not do anything, since lockedness should not
187  /// transfer.
188  const spin_mutex& operator=(const spin_mutex&) noexcept { return *this; }
189 
190  /// Acquire the lock, spin until we have it.
191  ///
192  void lock() noexcept
193  {
194  // To avoid spinning too tightly, we use the atomic_backoff to
195  // provide increasingly longer pauses, and if the lock is under
196  // lots of contention, eventually yield the timeslice.
197  atomic_backoff backoff;
198 
199  // Try to get ownership of the lock. Through experimentation, we
200  // found that OIIO_UNLIKELY makes this just a bit faster on gcc
201  // x86/x86_64 systems.
202  while (!OIIO_UNLIKELY(try_lock())) {
203 #if OIIO_THREAD_ALLOW_DCLP
204  // The full try_lock() involves a test_and_set, which
205  // writes memory, and that will lock the bus. But a normal
206  // read of m_locked will let us spin until the value
207  // changes, without locking the bus. So it's faster to
208  // check in this manner until the mutex appears to be free.
209  // HOWEVER... Thread Sanitizer thinks this is an instance of
210  // an unsafe "double checked lock pattern" (DCLP) and flags it
211  // as an error. I think it's a false positive, because the
212  // outer loop is still an atomic check, the inner non-atomic
213  // loop only serves to delay, and can't lead to a true data
214  // race. But we provide this build-time switch to, at least,
215  // give a way to use tsan for other checks.
216  do {
217  backoff();
218  } while (*(volatile bool*)&m_locked);
219 #else
220  backoff();
221 #endif
222  }
223  }
224 
225  /// Release the lock that we hold.
226  ///
227  void unlock() noexcept
228  {
229  // Fastest way to do it is with a clear with "release" semantics
230  m_locked.clear(std::memory_order_release);
231  }
232 
233  /// Try to acquire the lock. Return true if we have it, false if
234  /// somebody else is holding the lock.
235  bool try_lock() noexcept
236  {
237  return !m_locked.test_and_set(std::memory_order_acquire);
238  }
239 
240  /// Helper class: scoped lock for a spin_mutex -- grabs the lock upon
241  /// construction, releases the lock when it exits scope.
242  class lock_guard {
243  public:
244  lock_guard(spin_mutex& fm) noexcept
245  : m_fm(fm)
246  {
247  m_fm.lock();
248  }
249  ~lock_guard() noexcept { m_fm.unlock(); }
250 
251  private:
252  lock_guard() = delete;
253  lock_guard(const lock_guard& other) = delete;
254  lock_guard& operator=(const lock_guard& other) = delete;
255  spin_mutex& m_fm;
256  };
257 
258 private:
259  std::atomic_flag m_locked = ATOMIC_FLAG_INIT; // initialize to unlocked
260 };
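// Example (hypothetical sketch): a spin_mutex is a good fit when the
// critical section is only a few instructions long. The container and
// function names are illustrative assumptions.
inline void
example_append(spin_mutex& mtx, std::vector<int>& items, int value)
{
    spin_mutex::lock_guard lock(mtx);   // spins briefly if contended
    items.push_back(value);
}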
261 
262 
263 typedef spin_mutex::lock_guard spin_lock;
264 
265 
266 
267 #if 0
268 
269 // OLD CODE vvvvvvvv
270 
271 
272 /// Spinning reader/writer mutex. This is just like spin_mutex, except
273 /// that there are separate locking mechanisms for "writers" (exclusive
274 /// holders of the lock, presumably because they are modifying whatever
275 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
276 /// tasks that may access the protectee simultaneously).
277 class spin_rw_mutex {
278 public:
279  /// Default constructor -- initialize to unlocked.
280  ///
281  spin_rw_mutex (void) { m_readers = 0; }
282 
283  ~spin_rw_mutex (void) { }
284 
285  /// Copy constructor -- initialize to unlocked.
286  ///
287  spin_rw_mutex (const spin_rw_mutex &) { m_readers = 0; }
288 
289  /// Assignment does not do anything, since lockedness should not
290  /// transfer.
291  const spin_rw_mutex& operator= (const spin_rw_mutex&) { return *this; }
292 
293  /// Acquire the reader lock.
294  ///
295  void read_lock () {
296  // Spin until there are no writers active
297  m_locked.lock();
298  // Register ourself as a reader
299  ++m_readers;
300  // Release the lock, to let other readers work
301  m_locked.unlock();
302  }
303 
304  /// Release the reader lock.
305  ///
306  void read_unlock () {
307  --m_readers; // it's atomic, no need to lock to release
308  }
309 
310  /// Acquire the writer lock.
311  ///
312  void write_lock () {
313  // Make sure no new readers (or writers) can start
314  m_locked.lock();
315  // Spin until the last reader is done, at which point we will be
316  // the sole owners and nobody else (reader or writer) can acquire
317  // the resource until we release it.
318 #if OIIO_THREAD_ALLOW_DCLP
319  while (*(volatile int *)&m_readers > 0)
320  ;
321 #else
322  while (m_readers > 0)
323  ;
324 #endif
325  }
326 
327  /// Release the writer lock.
328  ///
329  void write_unlock () {
330  // Let other readers or writers get the lock
331  m_locked.unlock ();
332  }
333 
334  /// Acquire an exclusive ("writer") lock.
335  void lock () { write_lock(); }
336 
337  /// Release an exclusive ("writer") lock.
338  void unlock () { write_unlock(); }
339 
340  /// Acquire a shared ("reader") lock.
341  void lock_shared () { read_lock(); }
342 
343  /// Release a shared ("reader") lock.
344  void unlock_shared () { read_unlock(); }
345 
346  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
347  /// read lock upon construction, releases the lock when it exits scope.
348  class read_lock_guard {
349  public:
350  read_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.read_lock(); }
351  ~read_lock_guard () { m_fm.read_unlock(); }
352  private:
353  read_lock_guard(); // Do not implement
354  read_lock_guard(const read_lock_guard& other); // Do not implement
355  read_lock_guard& operator = (const read_lock_guard& other); // Do not implement
356  spin_rw_mutex & m_fm;
357  };
358 
359  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
360  /// write lock upon construction, releases the lock when it exits scope.
361  class write_lock_guard {
362  public:
363  write_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.write_lock(); }
364  ~write_lock_guard () { m_fm.write_unlock(); }
365  private:
366  write_lock_guard(); // Do not implement
367  write_lock_guard(const write_lock_guard& other); // Do not implement
368  write_lock_guard& operator = (const write_lock_guard& other); // Do not implement
369  spin_rw_mutex & m_fm;
370  };
371 
372 private:
373  OIIO_CACHE_ALIGN
374  spin_mutex m_locked; // write lock
375  char pad1_[OIIO_CACHE_LINE_SIZE-sizeof(spin_mutex)];
376  OIIO_CACHE_ALIGN
377  atomic_int m_readers; // number of readers
378  char pad2_[OIIO_CACHE_LINE_SIZE-sizeof(atomic_int)];
379 };
380 
381 
382 #else
383 
384 // vvv New spin rw lock Oct 2017
385 
386 /// Spinning reader/writer mutex. This is just like spin_mutex, except
387 /// that there are separate locking mechanisms for "writers" (exclusive
388 /// holders of the lock, presumably because they are modifying whatever
389 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
390 /// tasks that may access the protectee simultaneously).
391 class spin_rw_mutex {
392 public:
393  /// Default constructor -- initialize to unlocked.
394  ///
395  spin_rw_mutex() noexcept {}
396 
397  ~spin_rw_mutex() noexcept {}
398 
399  // Do not allow copy or assignment.
400  spin_rw_mutex(const spin_rw_mutex&) = delete;
401  const spin_rw_mutex& operator=(const spin_rw_mutex&) = delete;
402 
403  /// Acquire the reader lock.
404  ///
405  void read_lock() noexcept
406  {
407  // first increase the readers, and if it turned out nobody was
408  // writing, we're done. This means that acquiring a read when nobody
409  // is writing is a single atomic operation.
410  int oldval = m_bits.fetch_add(1, std::memory_order_acquire);
411  if (!(oldval & WRITER))
412  return;
413  // Oops, we incremented readers but somebody was writing. Backtrack
414  // by subtracting, and do things the hard way.
415  int expected = (--m_bits) & NOTWRITER;
416 
417  // Do compare-and-exchange until we can increase the number of
418  // readers by one and have no writers.
419  if (m_bits.compare_exchange_weak(expected, expected + 1,
420  std::memory_order_acquire))
421  return;
422  atomic_backoff backoff;
423  do {
424  backoff();
425  expected = m_bits.load() & NOTWRITER;
426  } while (!m_bits.compare_exchange_weak(expected, expected + 1,
427  std::memory_order_acquire));
428  }
429 
430  /// Release the reader lock.
431  ///
432  void read_unlock() noexcept
433  {
434  // Atomically reduce the number of readers. It's at least 1,
435  // and the WRITER bit should definitely not be set, so this just
436  // boils down to an atomic decrement of m_bits.
437  m_bits.fetch_sub(1, std::memory_order_release);
438  }
439 
440  /// Acquire the writer lock.
441  ///
442  void write_lock() noexcept
443  {
444  // Do compare-and-exchange until we have just ourselves as writer
445  int expected = 0;
446  if (m_bits.compare_exchange_weak(expected, WRITER,
447  std::memory_order_acquire))
448  return;
449  atomic_backoff backoff;
450  do {
451  backoff();
452  expected = 0;
453  } while (!m_bits.compare_exchange_weak(expected, WRITER,
454  std::memory_order_acquire));
455  }
456 
457  /// Release the writer lock.
458  ///
459  void write_unlock() noexcept
460  {
461  // Remove the writer bit
462  m_bits.fetch_sub(WRITER, std::memory_order_release);
463  }
464 
465  /// lock() is a synonym for exclusive (write) lock.
466  void lock() { write_lock(); }
467 
468  /// unlock() is a synonym for exclusive (write) unlock.
469  void unlock() { write_unlock(); }
470 
471  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
472  /// read lock upon construction, releases the lock when it exits scope.
473  class read_lock_guard {
474  public:
475  read_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.read_lock(); }
476  ~read_lock_guard () noexcept { m_fm.read_unlock(); }
477  private:
478  read_lock_guard(const read_lock_guard& other) = delete;
479  read_lock_guard& operator = (const read_lock_guard& other) = delete;
480  spin_rw_mutex & m_fm;
481  };
482 
483  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
484  /// write lock upon construction, releases the lock when it exits scope.
485  class write_lock_guard {
486  public:
487  write_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.write_lock(); }
488  ~write_lock_guard () noexcept { m_fm.write_unlock(); }
489  private:
490  write_lock_guard(const write_lock_guard& other) = delete;
491  write_lock_guard& operator = (const write_lock_guard& other) = delete;
492  spin_rw_mutex & m_fm;
493  };
494 
495 private:
496  // Use one word to hold the reader count, with a high bit indicating
497  // that it's locked for writing. This will only work if we have
498  // fewer than 2^30 simultaneous readers. I think that should hold
499  // us for some time.
500  enum { WRITER = 1<<30, NOTWRITER = WRITER-1 };
501  std::atomic<int> m_bits { 0 };
502 };
503 
504 #endif
505 
506 
507 typedef spin_rw_mutex::read_lock_guard spin_rw_read_lock;
508 typedef spin_rw_mutex::write_lock_guard spin_rw_write_lock;
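// Example (hypothetical sketch): many readers may hold a spin_rw_mutex
// concurrently while a writer holds it exclusively. The table and function
// names are illustrative assumptions.
inline int
example_read_slot(spin_rw_mutex& mtx, const std::vector<int>& table, size_t i)
{
    spin_rw_mutex::read_lock_guard lock(mtx);   // shared (reader) lock
    return i < table.size() ? table[i] : 0;
}

inline void
example_write_slot(spin_rw_mutex& mtx, std::vector<int>& table, size_t i, int v)
{
    spin_rw_mutex::write_lock_guard lock(mtx);  // exclusive (writer) lock
    if (i < table.size())
        table[i] = v;
}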
509 
510 
511 
512 /// Mutex pool. Sometimes, we have lots of objects that need to be
513 /// individually locked for thread safety, but two separate objects don't
514 /// need to lock against each other. If there are many more objects than
515 /// threads, it's wasteful for each object to contain its own mutex. So a
516 /// solution is to make a mutex_pool -- a collection of several mutexes.
517 /// Each object uses a hash to choose a consistent mutex for itself, one
518 /// that is unlikely to be locked simultaneously by a different object.
519 /// Semantically, it looks rather like an associative array of mutexes. We
520 /// also ensure that the mutexes are all on different cache lines, to ensure
521 /// that they don't exhibit false sharing. Try to choose Bins larger than
522 /// the expected number of threads that will be simultaneously locking
523 /// mutexes.
524 template<class Mutex, class Key, class Hash, size_t Bins = 16>
525 class mutex_pool {
526 public:
527  mutex_pool() noexcept {}
528  Mutex& operator[](const Key& key) noexcept { return m_mutex[m_hash(key) % Bins].m; }
529 
530 private:
531  // Helper type -- force cache line alignment. This should make an array
532  // of these also have padding so that each individual mutex is aligned
533  // to its own cache line, thus eliminating any "false sharing."
534  struct AlignedMutex {
535  OIIO_CACHE_ALIGN Mutex m;
536  };
537 
538  AlignedMutex m_mutex[Bins];
539  Hash m_hash;
540 };
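// Example (hypothetical sketch): one mutex per hash bin guards many slots
// without giving each slot its own mutex. The pool alias, slot vector, and
// function name are illustrative assumptions.
using example_pool_t = mutex_pool<spin_mutex, int, std::hash<int>>;

inline void
example_bump(example_pool_t& pool, std::vector<int>& slots, int index)
{
    // Slots that hash to the same bin share a mutex; unrelated slots
    // usually do not contend.
    spin_mutex::lock_guard lock(pool[index]);
    ++slots[index];   // assumes 0 <= index < slots.size()
}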
541 
542 
543 
544 /// Simple thread group class: lets you spawn a group of new threads,
545 /// then wait for them to all complete.
546 class thread_group {
547 public:
548  thread_group() {}
549  ~thread_group() { join_all(); }
550 
551  void add_thread(thread* t)
552  {
553  if (t) {
554  lock_guard lock(m_mutex);
555  m_threads.emplace_back(t);
556  }
557  }
558 
559  template<typename FUNC, typename... Args>
560  thread* create_thread(FUNC func, Args&&... args)
561  {
562  thread* t = new thread(func, std::forward<Args>(args)...);
563  add_thread(t);
564  return t;
565  }
566 
567  void join_all()
568  {
569  lock_guard lock(m_mutex);
570  for (auto& t : m_threads)
571  if (t->joinable())
572  t->join();
573  }
574 
575  size_t size() const
576  {
577  lock_guard lock(m_mutex);
578  return m_threads.size();
579  }
580 
581 private:
582  mutable mutex m_mutex;
583  std::vector<std::unique_ptr<thread>> m_threads;
584 };
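// Example (hypothetical sketch): spawn several workers and wait for all of
// them to finish. The worker body and count are illustrative assumptions.
inline void
example_run_group(int nworkers)
{
    thread_group group;
    for (int i = 0; i < nworkers; ++i)
        group.create_thread([](int index) { (void)index; /* per-thread work */ }, i);
    group.join_all();   // block until every spawned thread has finished
}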
585 
586 
587 
588 /// thread_pool is a persistent set of threads watching a queue to which
589 /// tasks can be submitted.
590 ///
591 /// Call default_thread_pool() to retrieve a pointer to a single shared
592 /// thread_pool that will be initialized the first time it's needed, running
593 /// a number of threads corresponding to the number of cores on the machine.
594 ///
595 /// It's possible to create other pools, but it's not something that's
596 /// recommended unless you really know what you're doing and are careful
597 /// that the sum of threads across all pools doesn't cause you to be highly
598 /// over-threaded. An example of when this might be useful is if you want
599 /// one pool of 4 threads to handle I/O without interference from a separate
600 /// pool of 4 (other) threads handling computation.
601 ///
602 /// Submitting an asynchronous task to the queue follows the following
603 /// pattern:
604 ///
605 /// /* func that takes a thread ID followed possibly by more args */
606 /// result_t my_func (int thread_id, Arg1 arg1, ...) { }
607 /// pool->push (my_func, arg1, ...);
608 ///
609 /// If you just want to "fire and forget", then:
610 ///
611 /// pool->push (func, ...args...);
612 ///
613 /// But if you need a result, or simply need to know when the task has
614 /// completed, note that the push() method will return a future<result_t>
615 /// that you can check, like this:
616 ///
617 /// std::future<result_t> f = pool->push (my_task);
618 ///
619 /// And then you can
620 ///
621 /// find out if it's done: f.wait_for(0ms) == std::future_status::ready
622 /// wait for it to get done: f.wait();
623 /// get result (waiting if necessary): result_t r = f.get();
624 ///
625 /// A common idiom is to fire a bunch of sub-tasks at the queue, and then
626 /// wait for them to all complete. We provide a helper class, task_set,
627 /// to make this easy:
628 ///
629 /// task_set tasks (pool);
630 /// for (int i = 0; i < n_subtasks; ++i)
631 /// tasks.push (pool->push (myfunc));
632 /// tasks.wait ();
633 ///
634 /// Note that the tasks.wait() is optional -- it will be called
635 /// automatically when the task_set exits its scope.
636 ///
637 /// The task function's first argument, the thread_id, is the thread number
638 /// for the pool, or -1 if it's being executed by a non-pool thread (this
639 /// can happen in cases where the whole pool is occupied and the calling
640 /// thread contributes to running the work load).
641 ///
642 /// Thread pool. Have fun, be safe.
643 ///
644 class OIIO_UTIL_API thread_pool {
645 public:
646  /// Initialize the pool. This implicitly calls resize() to set the
647  /// number of worker threads, defaulting to a number of workers that is
648  /// one less than the number of hardware cores.
649  thread_pool(int nthreads = -1);
650  ~thread_pool();
651 
652  /// How many threads are in the pool?
653  int size() const;
654 
655  /// Sets the number of worker threads in the pool. If the pool size is
656  /// 0, any tasks added to the pool will be executed immediately by the
657  /// calling thread. Requesting nthreads < 0 will cause it to resize to
658  /// the number of hardware cores minus one (one less, to account for the
659  /// fact that the calling thread will also contribute). BEWARE! Resizing
660  /// the queue should not be done while jobs are running.
661  void resize(int nthreads = -1);
662 
663  /// Return the number of currently idle threads in the queue. Zero
664  /// means the queue is fully engaged.
665  int idle() const;
666 
667  /// Run the user's function that accepts a single argument, the int id of
668  /// the running thread. The returned value is a templatized std::future, where
669  /// the user can get the result and rethrow any exceptions. If the queue
670  /// has no worker threads, the task will be run immediately by the
671  /// calling thread.
672  template<typename F> auto push(F&& f) -> std::future<decltype(f(0))>
673  {
674  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
675  std::forward<F>(f));
676  if (size() < 1) {
677  (*pck)(-1); // No worker threads, run it with the calling thread
678  } else {
679  auto _f = new std::function<void(int id)>(
680  [pck](int id) { (*pck)(id); });
681  push_queue_and_notify(_f);
682  }
683  return pck->get_future();
684  }
685 
686  /// Run the user's function that accepts an arbitrary set of arguments
687  /// (also passed). The returned value is templatized std::future, where
688  /// the user can get the result and rethrow any exceptions. If the queue
689  /// has no worker threads, the task will be run immediately by the
690  /// calling thread.
691  template<typename F, typename... Rest>
692  auto push (F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
693  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
694  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
695  );
696  if (size() < 1) {
697  (*pck)(-1); // No worker threads, run it with the calling thread
698  } else {
699  auto _f = new std::function<void(int id)>([pck](int id) {
700  (*pck)(id);
701  });
702  push_queue_and_notify (_f);
703  }
704  return pck->get_future();
705  }
706 
707  /// If there are any tasks on the queue, pull one off and run it (on
708  /// this calling thread) and return true. Otherwise (there are no
709  /// pending jobs), return false immediately. This utility is what makes
710  /// it possible for non-pool threads to also run tasks from the queue
711  /// when they would ordinarily be idle. The thread id of the caller
712  /// should be passed.
713  bool run_one_task(std::thread::id id);
714 
715  /// Return true if the calling thread is part of the thread pool. This
716  /// can be used to limit a pool thread from unadvisedly adding its own
717  /// subtasks to clog up the pool.
718  /// DEPRECATED(2.1) -- use is_worker() instead.
719  bool this_thread_is_in_pool() const;
720 
721  /// Register a thread (not already in the thread pool itself) as working
722  /// on tasks in the pool. This is used to avoid recursion.
723  void register_worker(std::thread::id id);
724  /// De-register a thread, saying it is no longer in the process of
725  /// taking work from the thread pool.
726  void deregister_worker(std::thread::id id);
727  /// Is the thread in the pool or currently engaged in taking tasks from
728  /// the pool?
729  bool is_worker(std::thread::id id) const;
730  bool is_worker() const { return is_worker(std::this_thread::get_id()); }
731  // Non-const versions: DEPRECATED(2.1)
732  bool is_worker(std::thread::id id);
733 
734  /// How many jobs are waiting to run? (Use with caution! Can be out of
735  /// date by the time you look at it.)
736  size_t jobs_in_queue() const;
737 
738  /// Is the pool very busy? Meaning that there are significantly more
739  /// tasks in the queue waiting to run than there are threads in the
740  /// pool. It may be wise for a caller to check this before submitting
741  /// tasks -- if the queue is very busy, it's probably more expedient to
742  /// execute the code directly rather than add it to an oversubscribed
743  /// queue.
744  bool very_busy() const;
745 
746 private:
747  // Disallow copy construction and assignment
748  thread_pool(const thread_pool&) = delete;
749  thread_pool(thread_pool&&) = delete;
750  thread_pool& operator=(const thread_pool&) = delete;
751  thread_pool& operator=(thread_pool&&) = delete;
752 
753  // PIMPL pattern hides all the guts far away from the public API
754  class Impl;
755  std::unique_ptr<Impl> m_impl;
756 
757  // Utility function that helps us hide the implementation
758  void push_queue_and_notify(std::function<void(int id)>* f);
759 };
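// Example (hypothetical sketch): submit one task to a pool and retrieve its
// result through the returned future. The summing task is an illustrative
// assumption.
inline int
example_sum_on_pool(thread_pool& pool, const std::vector<int>& values)
{
    std::future<int> f = pool.push([&values](int /*thread_id*/) {
        int sum = 0;
        for (int v : values)
            sum += v;
        return sum;
    });
    return f.get();   // blocks until the task has run, then yields its result
}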
760 
761 
762 
763 /// Return a pointer to the "default" shared thread pool. In almost all
764 /// ordinary circumstances, you should use this exclusively to get a
765 /// single shared thread pool, since creating multiple thread pools
766 /// could result in hilariously over-threading your application.
768 
769 
770 
771 /// task_set is a group of future<void>'s from a thread_queue that you can
772 /// add to, and when you either call wait() or just leave the task_set's
773 /// scope, it will wait for all the tasks in the set to be done before
774 /// proceeding.
775 ///
776 /// A typical idiom for using this is:
777 ///
778 /// void myfunc (int id) { ... do something ... }
779 ///
780 /// thread_pool* pool (default_thread_pool());
781 /// {
782 /// task_set tasks (pool);
783 /// // Launch a bunch of tasks into the thread pool
784 /// for (int i = 0; i < ntasks; ++i)
785 /// tasks.push (pool->push (myfunc));
786 /// // The following brace, by ending the scope of 'tasks', will
787 /// // wait for all those queue tasks to finish.
788 /// }
789 ///
790 class OIIO_UTIL_API task_set {
791 public:
792  task_set(thread_pool* pool = nullptr)
793  : m_pool(pool ? pool : default_thread_pool())
794  , m_submitter_thread(std::this_thread::get_id())
795  {
796  }
797  ~task_set() { wait(); }
798 
799  task_set(const task_set&) = delete;
800  const task_set& operator=(const task_set&) = delete;
801 
802  // Return the thread id of the thread that set up this task_set and
803  // submitted its tasks to the thread pool.
804  std::thread::id submitter() const { return m_submitter_thread; }
805 
806  // Save a future (presumably returned by a threadpool::push() as part
807  // of this task set.
808  void push(std::future<void>&& f)
809  {
810  OIIO_DASSERT(
811  std::this_thread::get_id() == submitter()
812  && "All tasks in a tast_set should be added by the same thread");
813  m_futures.emplace_back(std::move(f));
814  }
815 
816  // Wait for the given taskindex (0..n-1, where n is the number of tasks
817  // submitted as part of this task_set). If block == true, fully block
818  // while waiting for that task to finish. If block is false, then busy
819  // wait, and opportunistically run queue tasks yourself while you are
820  // waiting for the task to finish.
821  void wait_for_task(size_t taskindex, bool block = false);
822 
823  // Wait for all tasks in the set to finish. If block == true, fully
824  // block while waiting for the pool threads to all finish. If block is
825  // false, then busy wait, and opportunistically run queue tasks yourself
826  // while you are waiting for other tasks to finish.
827  void wait(bool block = false);
828 
829  // Debugging sanity check, called after wait(), to ensure that all the
830  // tasks were completed.
831  void check_done()
832  {
833  const std::chrono::milliseconds wait_time(0);
834  for (auto&& f : m_futures)
835  OIIO_ASSERT(f.wait_for(wait_time) == std::future_status::ready);
836  }
837 
838 private:
839  thread_pool* m_pool;
840  std::thread::id m_submitter_thread;
841  std::vector<std::future<void>> m_futures;
842 };
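// Example (hypothetical sketch): launch several void subtasks and wait for
// all of them before returning. The chunk-processing lambda is an
// illustrative assumption.
inline void
example_process_chunks(thread_pool* pool, int nchunks)
{
    task_set tasks(pool);
    for (int i = 0; i < nchunks; ++i)
        tasks.push(pool->push([i](int /*thread_id*/) {
            (void)i;   // placeholder for real work on chunk i
        }));
    tasks.wait();   // also happens automatically when `tasks` leaves scope
}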
843 
844 
845 OIIO_NAMESPACE_END