UT_ParallelUtil.h
/*
 * PROPRIETARY INFORMATION.  This software is proprietary to
 * Side Effects Software Inc., and is not to be reproduced,
 * transmitted, or disclosed in any way without written permission.
 *
 * NAME:        UT_ParallelUtil.h ( UT Library, C++)
 *
 * COMMENTS:    Simple wrappers on tbb interface
 *
 * RELATION TO THE STL:
 *
 * Use UT_ParallelUtil.h (or if necessary, UT_StdThread) instead
 * of std::thread.
 *
 * Reasoning:
 *
 * Houdini requires tight control over the number of threads as
 * we try to follow the command line -j option.
 * This is important for Houdini to play nicely on farms where
 * we may get a slice of a machine.
 * Some oversubscription is a feature, but too much is not.
 * We use TBB currently to ensure composability of threading -
 * your algorithm does not run in a vacuum but must thread nicely
 * with other algorithms at the same time, so you should never
 * assume you get # CPU threads.
 *
 * We also need careful control of task stealing, which requires
 * setting up thread groups.  We thus must have a centralized
 * location where all threads are created.
 */

#ifndef __UT_ParallelUtil__
#define __UT_ParallelUtil__

#include "UT_API.h"

#include "UT_Array.h"
#include "UT_PerformanceThread.h"
#include "UT_TaskScope.h"
#include "UT_TBBParallelInvoke.h"
#include "UT_Thread.h"
#include "UT_IteratorRange.h"
#include "UT_Optional.h"

#include <tbb/blocked_range.h>
#include <tbb/blocked_range2d.h>
#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/parallel_sort.h>

/// Typedef to denote the "split" constructor of a range
typedef tbb::split UT_Split;

/// Declare prior to use.
template <typename T>
class UT_BlockedRange;

template <typename RowT, typename ColT=RowT>
class UT_BlockedRange2D;

// Default implementation that calls range.size()
template< typename RANGE >
struct UT_EstimatorNumItems
{
    UT_EstimatorNumItems() {}

    size_t operator()(const RANGE& range) const
    {
        return range.size();
    }
};

// Partial specialization for UT_BlockedRange2D<T>
template< typename T >
struct UT_EstimatorNumItems< UT_BlockedRange2D<T> >
{
    UT_EstimatorNumItems() {}

    size_t operator()(const UT_BlockedRange2D<T>& range) const
    {
        return range.rows().size() * range.cols().size();
    }
};

/// This is needed by UT_CoarsenedRange
template <typename RANGE>
inline size_t UTestimatedNumItems(const RANGE& range)
{
    return UT_EstimatorNumItems<RANGE>()(range);
}

/// UT_CoarsenedRange: This should be used only inside
/// UT_ParallelFor and UT_ParallelReduce
/// This class wraps an existing range with a new range.
/// This allows us to use simple_partitioner, rather than
/// auto_partitioner, which has disastrous performance with
/// the default grain size in tbb 4.
template< typename RANGE >
class UT_CoarsenedRange : public RANGE
{
public:
    // Compiler-generated versions are fine:
    //    ~UT_CoarsenedRange();
    //    UT_CoarsenedRange(const UT_CoarsenedRange&);

    // Split into two sub-ranges:
    UT_CoarsenedRange(UT_CoarsenedRange& range, tbb::split spl) :
        RANGE(range, spl),
        myGrainSize(range.myGrainSize)
    {
    }

    // Inherited: bool empty() const

    bool is_divisible() const
    {
        return
            RANGE::is_divisible() &&
            (UTestimatedNumItems(static_cast<const RANGE&>(*this)) > myGrainSize);
    }

private:
    size_t myGrainSize;

    UT_CoarsenedRange(const RANGE& base_range, const size_t grain_size) :
        RANGE(base_range),
        myGrainSize(grain_size)
    {
    }

    template <typename Range, typename Body>
    friend void UTparallelFor(
        const Range &range, const Body &body,
        const int subscribe_ratio, const int min_grain_size,
        const bool force_use_task_scope
    );
    template <typename Range, typename Body>
    friend void UTparallelReduce(
        const Range &range, Body &body,
        const int subscribe_ratio, const int min_grain_size,
        const bool force_use_taskscope
    );
    template <typename Range, typename Body>
    friend void UTparallelDeterministicReduce(
        const Range &range, Body &body, const int grain_size,
        const bool force_use_taskscope
    );
};

/// Helper class for UTparallelFor().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Range, typename Body>
class ut_TaskScopedBody
{
public:
    ut_TaskScopedBody(const Body *body)
        : myBody(body),
          myParentTaskScope(UT_TaskScope::getCurrent())
    {
    }

    ut_TaskScopedBody(const ut_TaskScopedBody &src)
        : myBody(src.myBody),
          myParentTaskScope(src.myParentTaskScope)
    {
    }

    void operator()(const Range &r) const
    {
        UT_TaskScope task_scope(myParentTaskScope);
        (*myBody)(r);
    }

private:
    const Body *myBody;
    const UT_TaskScope *myParentTaskScope;
};

/// Helper class for UTparallelFor().
/// Wraps the thread body allowing non-copyable bodies to be used with
/// UTparallelFor().
template<typename Range, typename Body>
class ut_TaskBody
{
public:
    ut_TaskBody(const Body *body) : myBody(body) {}
    void operator()(const Range &r) const { (*myBody)(r); }

private:
    const Body *myBody;
};

/// Helper class for UTparallelForEachNumber()
/// This wraps the thread body to perform different load balancing based on
/// peeling off tasks using an atomic int to iterate over the range.
/// @c IntType must be an integer type supported by @c SYS_AtomicInt (currently
/// int32 or int64).
template <typename IntType, typename Body>
class ut_ForEachNumberBody
{
public:
    ut_ForEachNumberBody(const Body &body,
            SYS_AtomicInt<IntType> &it, IntType end)
        : myBody(body)
        , myIt(it)
        , myEnd(end)
    {
    }
    void operator()(const UT_BlockedRange<IntType> &range) const
    {
        while (true)
        {
            IntType it = myIt.exchangeAdd(1);
            if (it >= myEnd)
                break;
            myBody(UT_BlockedRange<IntType>(it, it+1));
        }
    }
private:
    const Body &myBody;
    SYS_AtomicInt<IntType> &myIt;
    IntType myEnd;
};

/// Run the @c body function over a range in parallel.
/// UTparallelFor attempts to spread the range out over at most
/// subscribe_ratio * num_processor tasks.
/// The factor subscribe_ratio can be used to help balance the load.
/// UTparallelFor() uses tbb for its implementation.
/// The used grain size is the maximum of min_grain_size and
/// UTestimatedNumItems(range) / (subscribe_ratio * num_processor).
/// If subscribe_ratio == 0, then a grain size of min_grain_size will be used.
/// A range can be split only when UTestimatedNumItems(range) exceeds the
/// grain size and the range is divisible.
///
/// Requirements for the Range functor are:
///   - the requirements of the tbb Range Concept
///   - UT_EstimatorNumItems<Range> must return the estimated number of work
///     items for the range.  When Range::size() is not the correct estimate,
///     then a (partial) specialization of UT_EstimatorNumItems must be
///     provided for the type Range.
///
/// Requirements for the Body function are:
///  - @code Body(const Body &); @endcode @n
///     Copy Constructor
///  - @code Body::~Body(); @endcode @n
///     Destructor
///  - @code void Body::operator()(const Range &range) const; @endcode
///     Function call to perform operation on the range.  Note the operator is
///     @b const.
///
/// The requirements for a Range object are:
///  - @code Range::Range(const Range&); @endcode @n
///     Copy constructor
///  - @code Range::~Range(); @endcode @n
///     Destructor
///  - @code bool Range::is_divisible() const; @endcode @n
///     True if the range can be partitioned into two sub-ranges
///  - @code bool Range::empty() const; @endcode @n
///     True if the range is empty
///  - @code Range::Range(Range &r, UT_Split) const; @endcode @n
///     Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
///
/// Example: @code
///     class Square {
///     public:
///         Square(fpreal *data) : myData(data) {}
///         ~Square();
///         void operator()(const UT_BlockedRange<int64> &range) const
///         {
///             for (int64 i = range.begin(); i != range.end(); ++i)
///                 myData[i] *= myData[i];
///         }
///         fpreal *myData;
///     };
///     ...
///
///     void
///     parallel_square(fpreal *array, int64 length)
///     {
///         UTparallelFor(UT_BlockedRange<int64>(0, length), Square(array));
///     }
/// @endcode
///
/// @see UTparallelReduce(), UT_BlockedRange()

template <typename Range, typename Body>
void UTparallelFor(
    const Range &range, const Body &body,
    const int subscribe_ratio = 2,
    const int min_grain_size = 1,
    const bool force_use_task_scope = true
)
{
    const size_t num_processors( UT_Thread::getNumProcessors() );

    UT_ASSERT( num_processors >= 1 );
    UT_ASSERT( min_grain_size >= 1 );
    UT_ASSERT( subscribe_ratio >= 0 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    // Avoid tbb overhead if entire range needs to be single threaded
    if (num_processors == 1 || est_range_size <= min_grain_size ||
        !UT_Thread::isThreadingEnabled())
    {
        body(range);
        return;
    }

    size_t grain_size(min_grain_size);
    if( subscribe_ratio > 0 )
        grain_size = std::max(
            grain_size,
            est_range_size / (subscribe_ratio * num_processors)
        );

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);

    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        tbb::parallel_for(
            coarsened_range, ut_TaskScopedBody<Range, Body>(&body),
            tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_for(
            coarsened_range, ut_TaskBody<Range, Body>(&body),
            tbb::simple_partitioner());
    }
}

/// Version of UTparallelFor that always creates a task scope to prevent
/// deadlocking of child tasks that might acquire UT_TaskLocks.
template <typename Range, typename Body>
void UTparallelForTaskScope(
    const Range &range, const Body &body,
    const int subscribe_ratio = 2,
    const int min_grain_size = 1
)
{
    UTparallelFor(range, body, subscribe_ratio, min_grain_size, true);
}

/// Version of UTparallelFor that is tuned for the case where the range
/// consists of lightweight items, for example,
/// float additions or matrix-vector multiplications.
template <typename Range, typename Body>
void
UTparallelForLightItems(const Range &range, const Body &body,
        const bool force_use_task_scope = true)
{
    UTparallelFor(range, body, 2, 1024, force_use_task_scope);
}

/// Version of UTparallelFor that is tuned for the case where the range
/// consists of heavy items, for example, defragmenting an entire attribute.
///
/// If possible, UTparallelForEachNumber() is preferred over use of
/// UTparallelForHeavyItems().
///
/// Note, when the range is guaranteed to be small, you might prefer to run
/// <tt>UTparallelFor(range, body, 0, 1)</tt>.  That form of the loop would
/// guarantee that a separate task is started for each iteration of the body.
/// However, that form can cause issues when the range gets large, in that a @b
/// large number of tasks may be created.
///
template <typename Range, typename Body>
SYS_DEPRECATED_REPLACE(16.5, "UTparallelForEachNumber||UTparallelFor(r,b,0,1)")
void
UTparallelForHeavyItems(const Range &range, const Body &body)
{
    // By oversubscribing by 32, small ranges will still be split into
    // individual tasks.  However, large ranges will be chunked, causing fewer
    // tasks, but potentially worse load balancing.
    //
    // Consider using UTparallelForEachNumber() instead.
    UTparallelFor(range, body, 32, 1, /*force_use_task=*/true);
}

/// Version of UTparallelFor tuned for a range that consists of heavy items,
/// for example, defragmenting an entire attribute.
///
/// This approach uses "ideal" load balancing across threads and doesn't rely
/// on the TBB task scheduler for splitting the range.  Instead, it iterates
/// from @c 0 to @c nitems, calling @c body with a UT_BlockedRange<IntType>
/// containing a list of tasks to execute.
///
/// @note The @c IntType must work with @c SYS_AtomicInt (currently int32 or
/// int64).  If you get a boost static assertion, please make sure the @c body
/// range takes the proper integer type.
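///
/// Illustrative usage (a minimal sketch, not part of the original header;
/// processItem() is a hypothetical per-item function):
/// @code
///     void
///     process_all(int64 nitems)
///     {
///         UTparallelForEachNumber(nitems, [](const UT_BlockedRange<int64> &r)
///         {
///             for (int64 i = r.begin(); i != r.end(); ++i)
///                 processItem(i); // hypothetical heavy work per item
///         });
///     }
/// @endcode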
template <typename IntType, typename Body>
void
UTparallelForEachNumber(IntType nitems, const Body &body, const bool force_use_task_scope = true)
{
    const size_t num_processors(UT_Thread::getNumProcessors());

    UT_ASSERT(num_processors >= 1);
    if (nitems == 0)
        return;
    if (num_processors == 1)
    {
        body(UT_BlockedRange<IntType>(0, nitems));
        return;
    }
    if (nitems <= num_processors)
    {
        // When there are a small number of tasks, split into a single task per
        // thread.
        UTparallelFor(UT_BlockedRange<IntType>(0, nitems), body, 0, 1, force_use_task_scope);
        return;
    }

    // Split across number of processors, with each thread using the atomic int
    // to query the next task to be run (similar to UT_ThreadedAlgorithm)
    SYS_AtomicInt<IntType> it(0);
    UTparallelFor(UT_BlockedRange<IntType>(0, num_processors),
        ut_ForEachNumberBody<IntType, Body>(body, it, nitems), 0, 1, force_use_task_scope);
}

/// UTserialForEachNumber can be used as a debugging tool to quickly replace a
/// parallel for with a serial for.
template <typename IntType, typename Body>
void
UTserialForEachNumber(IntType nitems, const Body &body, bool usetaskscope = true)
{
    for (IntType i = 0; i < nitems; ++i)
        body(UT_BlockedRange<IntType>(i, i + 1));
}

/// Version of UTparallelForEachNumber that wraps the body in a UT_TaskScope
/// that makes it safe to use UT_TaskLock objects that are currently locked by
/// the parent scope.
template <typename IntType, typename Body>
void
UTparallelForEachNumberTaskScope(IntType nitems, const Body &body)
{
    UTparallelForEachNumber(nitems, body, /*force_use_task_scope=*/true);
}

/// UTserialFor can be used as a debugging tool to quickly replace a parallel
/// for with a serial for.
template <typename Range, typename Body>
void UTserialFor(const Range &range, const Body &body)
        { body(range); }

/// Helper class for UTparallelInvoke().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Body>
class ut_TaskScopedInvokeBody
{
public:
    ut_TaskScopedInvokeBody(const Body &body)
        : myBody(body),
          myParentTaskScope(UT_TaskScope::getCurrent())
    {
    }

    ut_TaskScopedInvokeBody(const ut_TaskScopedInvokeBody &src)
        : myBody(src.myBody),
          myParentTaskScope(src.myParentTaskScope)
    {
    }

    void operator()() const
    {
        UT_TaskScope task_scope(myParentTaskScope);
        myBody();
    }

private:
    const Body &myBody;
    const UT_TaskScope *myParentTaskScope;
};

/// Takes a functor for passing to UTparallelInvoke, and wraps it in a
/// ut_TaskScopedInvokeBody object so the functor will be invoked wrapped in
/// a UT_TaskScope that makes it safe to use UT_TaskLock objects that are
/// currently locked by the parent scope.
template <typename Body>
const ut_TaskScopedInvokeBody<Body>
UTmakeTaskScopedInvokeBody(const Body &body)
{
    return ut_TaskScopedInvokeBody<Body>(body);
}

/// UTparallelInvoke() executes the given functions in parallel when the
/// parallel flag is true - otherwise it runs them serially.  F1 and F2
/// should be void functors.
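///
/// Illustrative usage (a sketch; initGeometry() and initTextures() are
/// hypothetical void functions):
/// @code
///     UTparallelInvoke(true,
///         [] { initGeometry(); },
///         [] { initTextures(); });
/// @endcode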
template <typename F1, typename F2>
inline void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2)
{
    if (parallel && UT_Thread::isThreadingEnabled())
    {
        tbb::parallel_invoke(UTmakeTaskScopedInvokeBody(std::forward<F1>(f1)),
                             UTmakeTaskScopedInvokeBody(std::forward<F2>(f2)));
    }
    else
    {
        f1();
        f2();
    }
}

template <typename F1, typename F2, typename... Rest>
inline void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2, Rest&&... rest)
{
    if (parallel && UT_Thread::isThreadingEnabled())
    {
        tbb::parallel_invoke(UTmakeTaskScopedInvokeBody(std::forward<F1>(f1)),
                             UTmakeTaskScopedInvokeBody(std::forward<F2>(f2)),
                             UTmakeTaskScopedInvokeBody(std::forward<Rest>(rest))...);
    }
    else
    {
        f1();
        UTparallelInvoke(parallel, f2, std::forward<Rest>(rest)...);
    }
}

template <typename F1>
class UT_ParallelInvokePointers
{
public:
    UT_ParallelInvokePointers(const UT_Array<F1 *> &functions)
        : myFunctions(functions) {}
    void operator()(const tbb::blocked_range<int>& r ) const
    {
        for (int i = r.begin(); i != r.end(); ++i)
            (*myFunctions(i))();
    }
private:
    const UT_Array<F1 *> &myFunctions;
};

/// UTparallelInvoke() executes the array of functions in parallel when the
/// parallel flag is true - otherwise it runs them serially.  F1 should be
/// a void functor.
template <typename F1>
inline void UTparallelInvoke(bool parallel, const UT_Array<F1 *> &funs)
{
    if (parallel && funs.entries() > 1 && UT_Thread::isThreadingEnabled())
    {
        UTparallelFor(tbb::blocked_range<int>(0, funs.entries(), 1),
                      UT_ParallelInvokePointers<F1>(funs),
                      32, 1); // oversubscribe to force forking
    }
    else
    {
        for (int i = 0; i < funs.entries(); i++)
            (*funs(i))();
    }
}

template <typename F1>
class UT_ParallelInvokeFunctors
{
public:
    UT_ParallelInvokeFunctors(const UT_Array<F1> &functions)
        : myFunctions(functions) {}
    void operator()(const tbb::blocked_range<int>& r ) const
    {
        for (int i = r.begin(); i != r.end(); ++i)
            myFunctions(i)();
    }
private:
    const UT_Array<F1> &myFunctions;
};

/// UTparallelInvoke() executes the array of functions in parallel when the
/// parallel flag is true - otherwise it runs them serially.  F1 should be
/// a void functor.
template <typename F1>
inline void UTparallelInvoke(bool parallel, const UT_Array<F1> &funs)
{
    if (parallel && funs.entries() > 1 && UT_Thread::isThreadingEnabled())
    {
        UTparallelFor(tbb::blocked_range<int>(0, funs.entries(), 1),
                      UT_ParallelInvokeFunctors<F1>(funs),
                      32, 1); // oversubscribe to force forking
    }
    else
    {
        for (int i = 0; i < funs.entries(); i++)
            funs(i)();
    }
}

/// Helper class for UTparallelReduce().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Range, typename Body>
class ut_ReduceTaskScopedBody
{
public:
    // Construct from base type pointer, holds a pointer to it.
    ut_ReduceTaskScopedBody(Body *body)
        : myParentTaskScope(UT_TaskScope::getCurrent())
    {
        myBodyPtr = body;
    }

    ut_ReduceTaskScopedBody(ut_ReduceTaskScopedBody &src, UT_Split)
        : myParentTaskScope(src.myParentTaskScope)
        , myBodyPtr(nullptr)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        myBody.emplace(src.body(), UT_Split());
    }

    void operator()(const Range &r)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        body()(r);
    }

    void join(ut_ReduceTaskScopedBody &other)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        body().join(other.body());
    }

    const Body &body() const { return myBodyPtr ? *myBodyPtr : *myBody; }
    Body &body() { return myBodyPtr ? *myBodyPtr : *myBody; }
private:
    UT_Optional<Body> myBody;
    Body *myBodyPtr;
    const UT_TaskScope *myParentTaskScope;
};

/// UTparallelReduce() is a simple wrapper that uses tbb for its implementation.
/// Run the @c body function over a range in parallel.
///
/// WARNING: The @c operator()() and @c join() functions MUST @b NOT initialize
///          data!  @b Both of these functions MUST ONLY accumulate data!  This
///          is because TBB may re-use body objects for multiple ranges.
///          Effectively, operator()() must act as an in-place join operation
///          for data as it comes in.  Initialization must be kept to the
///          constructors of Body.
///
/// Requirements for the Body function are:
///  - @code Body::~Body(); @endcode @n
///     Destructor
///  - @code Body::Body(Body &r, UT_Split) const; @endcode @n
///     The splitting constructor.
///     WARNING: This must be able to run concurrently with calls to
///     @c r.operator()() and @c r.join(), so this should not copy
///     values accumulating in r.
///  - @code void Body::operator()(const Range &range); @endcode
///     Function call to perform operation on the range.  Note the operator is
///     @b not const.
///  - @code void Body::join(const Body &other); @endcode
///     Join the results from another operation with this operation.
///     @b not const.
///
/// The requirements for a Range object are:
///  - @code Range::Range(const Range&); @endcode @n
///     Copy constructor
///  - @code Range::~Range(); @endcode @n
///     Destructor
///  - @code bool Range::is_divisible() const; @endcode @n
///     True if the range can be partitioned into two sub-ranges
///  - @code bool Range::empty() const; @endcode @n
///     True if the range is empty
///  - @code Range::Range(Range &r, UT_Split) const; @endcode @n
///     Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
///
/// Example: @code
///    class Dot {
///    public:
///        Dot(const fpreal *a, const fpreal *b)
///            : myA(a)
///            , myB(b)
///            , mySum(0)
///        {}
///        Dot(Dot &src, UT_Split)
///            : myA(src.myA)
///            , myB(src.myB)
///            , mySum(0)
///        {}
///        void operator()(const UT_BlockedRange<int64> &range)
///        {
///            for (int64 i = range.begin(); i != range.end(); ++i)
///                mySum += myA[i] * myB[i];
///        }
///        void join(const Dot &other)
///        {
///            mySum += other.mySum;
///        }
///        fpreal mySum;
///        const fpreal *myA, *myB;
///    };
///
///    fpreal
///    parallel_dot(const fpreal *a, const fpreal *b, int64 length)
///    {
///        Dot body(a, b);
///        UTparallelReduce(UT_BlockedRange<int64>(0, length), body);
///        return body.mySum;
///    }
/// @endcode
/// @see UTparallelFor(), UT_BlockedRange()
template <typename Range, typename Body>
void UTparallelReduce(
    const Range &range,
    Body &body,
    const int subscribe_ratio = 2,
    const int min_grain_size = 1,
    const bool force_use_task_scope = true
)
{
    const size_t num_processors( UT_Thread::getNumProcessors() );

    UT_ASSERT( num_processors >= 1 );
    UT_ASSERT( min_grain_size >= 1 );
    UT_ASSERT( subscribe_ratio >= 0 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    // Avoid tbb overhead if entire range needs to be single threaded
    if (num_processors == 1 || est_range_size <= min_grain_size ||
        !UT_Thread::isThreadingEnabled())
    {
        body(range);
        return;
    }

    size_t grain_size(min_grain_size);
    if( subscribe_ratio > 0 )
        grain_size = std::max(
            grain_size,
            est_range_size / (subscribe_ratio * num_processors)
        );

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);
    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        ut_ReduceTaskScopedBody<Range, Body> bodywrapper(&body);
        tbb::parallel_reduce(coarsened_range,
                             bodywrapper,
                             tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_reduce(coarsened_range, body, tbb::simple_partitioner());
    }
}

/// This is a simple wrapper for deterministic reduce that uses tbb.  It
/// works in the same manner as UTparallelReduce, with the following
/// differences:
///  - reduction and join order is deterministic (devoid of threading
///    uncertainty);
///  - a fixed grain size must be provided by the caller; grain size is
///    not adjusted based on the available resources (this is required to
///    satisfy determinism).
/// This version should be used when task joining is not associative (such
/// as accumulation of a floating point residual).
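///
/// Example (a sketch of a deterministic float sum; with a fixed grain size
/// the split/join tree, and therefore the roundoff error, is reproducible):
/// @code
///     class Sum {
///     public:
///         Sum(const fpreal32 *data) : myData(data), mySum(0) {}
///         Sum(Sum &src, UT_Split) : myData(src.myData), mySum(0) {}
///         void operator()(const UT_BlockedRange<int64> &r)
///         {
///             for (int64 i = r.begin(); i != r.end(); ++i)
///                 mySum += myData[i];
///         }
///         void join(const Sum &other) { mySum += other.mySum; }
///         const fpreal32 *myData;
///         fpreal32 mySum;
///     };
///     ...
///     Sum body(data);
///     UTparallelDeterministicReduce(
///         UT_BlockedRange<int64>(0, length), body, 1024);
/// @endcode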
template <typename Range, typename Body>
void UTparallelDeterministicReduce(
    const Range &range,
    Body &body,
    const int grain_size,
    const bool force_use_task_scope = true
)
{
    UT_ASSERT( grain_size >= 1 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    UT_ASSERT_MSG(UT_Thread::isThreadingEnabled(),
        "FIXME: There needs to be a way to do identical splits and joins when single-threading,"
        " to avoid having different roundoff error from when multi-threading. "
        " Something using simple_partitioner() might work.");

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);
    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        ut_ReduceTaskScopedBody<Range, Body> bodywrapper(&body);
        tbb::parallel_deterministic_reduce(coarsened_range,
                                           bodywrapper,
                                           tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_deterministic_reduce(coarsened_range, body);
    }
}

/// Version of UTparallelReduce that is tuned for the case where the range
/// consists of lightweight items, for example, finding the min/max in a set of
/// integers.
template <typename Range, typename Body>
void UTparallelReduceLightItems(const Range &range, Body &body)
{
    UTparallelReduce(range, body, 2, 1024);
}

/// Version of UTparallelReduce that is tuned for the case where the range
/// consists of heavy items, for example, computing the bounding box of a list
/// of geometry objects.
template <typename Range, typename Body>
void UTparallelReduceHeavyItems(const Range &range, Body &body)
{
    UTparallelReduce(range, body, 0, 1);
}

/// UTserialReduce can be used as a debugging tool to quickly replace a
/// parallel reduce with a serial for.
template <typename Range, typename Body>
void UTserialReduce(const Range &range, Body &body)
        { body(range); }

/// Cancel the entire current task group context when run within a task
static inline void
UTparallelCancelGroupExecution()
{
    // TODO: In oneTBB, this becomes current_context()->cancel_group_execution()
    tbb::task::self().cancel_group_execution();
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE!  You must explicitly force stability
///          if needed.
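///
/// Illustrative usage (a sketch; sorts an array of floats in descending
/// order):
/// @code
///     UT_Array<fpreal> values;
///     // ... fill values ...
///     UTparallelSort(values.begin(), values.end(),
///                    [](fpreal a, fpreal b) { return a > b; });
/// @endcode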
template <typename RandomAccessIterator, typename Compare>
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end, compare);
    else
        std::sort(begin, end, compare);
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE!  You must explicitly force stability
///          if needed.
template <typename RandomAccessIterator>
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end);
    else
        std::sort(begin, end);
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE!  You must explicitly force stability
///          if needed.
template <typename T>
void UTparallelSort(T *begin, T *end)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end);
    else
        std::sort(begin, end);
}

// Forward declaration of parallel_stable_sort; implementation at end of file.
namespace pss
{
template<typename RandomAccessIterator, typename Compare>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe,
                           Compare comp );

//! Wrapper for sorting with default comparator.
template<class RandomAccessIterator>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe )
{
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    parallel_stable_sort( xs, xe, std::less<T>() );
}
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
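///
/// Illustrative usage (a sketch; Item is a hypothetical struct, and the
/// stable sort preserves the input order of elements with equal keys):
/// @code
///     UT_Array<Item> items;
///     // ... fill items ...
///     UTparallelStableSort(items.begin(), items.end(),
///         [](const Item &a, const Item &b) { return a.myKey < b.myKey; });
/// @endcode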
template <typename RandomAccessIterator, typename Compare>
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end,
                          const Compare &compare)
{
    pss::parallel_stable_sort(begin, end, compare);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename RandomAccessIterator>
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end)
{
    pss::parallel_stable_sort(begin, end);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T>
void UTparallelStableSort(T *begin, T *end)
{
    pss::parallel_stable_sort(begin, end);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T, typename Compare>
void UTparallelStableSort(T *begin, T *end, const Compare &compare)
{
    pss::parallel_stable_sort(begin, end, compare);
}


/// UTparallelStableSort() is a stable parallel merge sort.
/// This form works with UT_Array and other containers with begin/end members.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T>
void
UTparallelStableSort(T &a)
{
    pss::parallel_stable_sort(a.begin(), a.end());
}


/// UTparallelStableSort() is a stable parallel merge sort.
/// This form works with UT_Array and other containers with begin/end members.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T, typename Compare>
void
UTparallelStableSort(T &a, const Compare &compare)
{
    pss::parallel_stable_sort(a.begin(), a.end(), compare);
}

/// UT_BlockedRange() is a simple wrapper using tbb for its implementation
/// This meets the requirements for a Range object, which are:
///  - @code Range::Range(const Range&); @endcode @n
///     Copy constructor
///  - @code Range::~Range(); @endcode @n
///     Destructor
///  - @code bool Range::is_divisible() const; @endcode @n
///     True if the range can be partitioned into two sub-ranges
///  - @code bool Range::empty() const; @endcode @n
///     True if the range is empty
///  - @code Range::Range(Range &r, UT_Split) const; @endcode @n
///     Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
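///
/// Illustrative usage of the items() wrapper defined below (a sketch;
/// doWork() is a hypothetical function):
/// @code
///     UTparallelFor(UT_BlockedRange<exint>(0, n),
///         [](const UT_BlockedRange<exint> &r)
///         {
///             for (exint i : r.items())
///                 doWork(i); // hypothetical
///         });
/// @endcode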
template <typename T>
class UT_BlockedRange : public tbb::blocked_range<T>
{
public:
    // TBB 2018 U3 no longer supports default blocked_range constructors
    UT_BlockedRange() = delete;

    UT_BlockedRange(T begin_value, T end_value, size_t grainsize=1)
        : tbb::blocked_range<T>(begin_value, end_value, grainsize)
    {}
    UT_BlockedRange(UT_BlockedRange &R, UT_Split split)
        : tbb::blocked_range<T>(R, split)
    {}


    // Because the VALUE of a blocked range may be a simple
    // type like int, the range-based for will fail to do a
    // dereference on it.  This iterator-like wrapper will
    // allow * to work.
    class ValueWrapper
    {
    public:
        SYS_FORCE_INLINE
        explicit ValueWrapper(const T &it)
            : myCurrent(it)
        {}

        SYS_FORCE_INLINE
        T operator*() { return myCurrent; }

        SYS_FORCE_INLINE
        bool operator==(const ValueWrapper &cmp) const
        { return (myCurrent == cmp.myCurrent); }
        SYS_FORCE_INLINE
        bool operator!=(const ValueWrapper &cmp) const
        { return !(*this == cmp); }

        SYS_FORCE_INLINE
        ValueWrapper &operator++()
        {
            ++myCurrent;
            return *this;
        }
    private:
        T myCurrent;
    };

    // Allows for:
    // for (T value : range.items())
    auto items() const
    {
        return UT_IteratorRange<ValueWrapper>(ValueWrapper(this->begin()), ValueWrapper(this->end()));
    }

};

/// UT_BlockedRange2D() is a simple wrapper using tbb for its implementation
/// This meets the requirements for a Range object, which are:
///  - @code Range::Range(const Range&); @endcode @n
///     Copy constructor
///  - @code Range::~Range(); @endcode @n
///     Destructor
///  - @code bool Range::is_divisible() const; @endcode @n
///     True if the range can be partitioned into two sub-ranges
///  - @code bool Range::empty() const; @endcode @n
///     True if the range is empty
///  - @code Range::Range(Range &r, UT_Split) const; @endcode @n
///     Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
template <typename RowT, typename ColT>
class UT_BlockedRange2D : public tbb::blocked_range2d<RowT, ColT>
{
public:
    // TBB 2018 U3 no longer supports default blocked_range constructors
    UT_BlockedRange2D() = delete;

    /// NB: The arguments are in a different order than tbb
    UT_BlockedRange2D(RowT row_begin, RowT row_end,
            ColT col_begin, ColT col_end,
            size_t row_grainsize=1, size_t col_grainsize=1)
        : tbb::blocked_range2d<RowT, ColT>(row_begin, row_end, row_grainsize,
                                           col_begin, col_end, col_grainsize)
    {}
    UT_BlockedRange2D(UT_BlockedRange2D &R, UT_Split split)
        : tbb::blocked_range2d<RowT, ColT>(R, split)
    {}
};

/// Performs a prefix sum across all the entries of the array.
/// Ie,
///     for (int i = 1; i < array.entries(); i++)
///         array(i) = OP(array(i-1), array(i));
/// tbb has this as tbb::parallel_scan but does not guarantee determinism.
/// Note determinism is based on grain size, so that must be fixed.
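///
/// Illustrative usage (a sketch; turns per-item counts into inclusive
/// running totals, in place):
/// @code
///     UT_Array<exint> counts;
///     // ... fill counts ...
///     UTparallelDeterministicPrefixSumInPlace(counts, exint(0),
///         [](exint a, exint b) { return a + b; });
///     // counts(i) now holds the sum of the first i+1 original entries.
/// @endcode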
template <typename Op, typename T>
void
UTparallelDeterministicPrefixSumInPlace(
    UT_Array<T> &array,
    const T identity,
    const Op &op,
    const int grain_size = 1024,
    const bool force_use_task_scope = true
)
{
    // Check serial.  We need to have enough grains to make
    // this worthwhile.
    if (array.entries() < grain_size * 10)
    {
        T total = identity;
        for (exint i = 0, n = array.entries(); i < n; i++)
        {
            total = op(total, array(i));
            array(i) = total;
        }
        return;
    }

    // We could use the actual destination array to store the block
    // totals with some cleverness...  For example, perhaps a stride &
    // offset so we could still recurse on prefix summing those totals?
    UT_Array<T> blocktotals;
    exint nblocks = (array.entries() + grain_size-1) / grain_size;
    blocktotals.setSizeNoInit(nblocks);

    // Scan for total for each block & compute the prefix sum
    // within the block
    UTparallelForEachNumber(nblocks, [&](const UT_BlockedRange<exint> &r)
    {
        for (exint block = r.begin(); block < r.end(); block++)
        {
            exint start = block * grain_size;
            exint end = SYSmin((block+1)*grain_size, array.entries());
            T total = identity;
            for (exint i = start; i < end; i++)
            {
                total = op(total, array(i));
                array(i) = total;
            }
            // TODO: False sharing here?
            blocktotals(block) = total;
        }
    }, force_use_task_scope);

    // Prefix sum our block totals.
    UTparallelDeterministicPrefixSumInPlace(blocktotals,
        identity, op,
        grain_size, force_use_task_scope);

    // Apply them back...
    UTparallelForEachNumber(nblocks, [&](const UT_BlockedRange<exint> &r)
    {
        for (exint block = r.begin(); block < r.end(); block++)
        {
            exint start = block * grain_size;
            exint end = SYSmin((block+1)*grain_size, array.entries());
            if (block > 0)
            {
                T total = blocktotals(block-1);
                for (exint i = start; i < end; i++)
                {
                    array(i) = op(total, array(i));
                }
            }
        }
    }, force_use_task_scope);
}


/// @{
/// Wrapper around TBB's task isolation.  In versions of TBB that don't support
/// isolate, this uses a task arena.
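///
/// Illustrative usage (a sketch; inner_body is a hypothetical body functor;
/// isolation keeps the calling thread from picking up unrelated outer tasks
/// while it waits on this inner loop):
/// @code
///     UTisolate([&]() {
///         UTparallelFor(UT_BlockedRange<exint>(0, n), inner_body);
///     });
/// @endcode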
#if TBB_VERSION_MAJOR >= 2018
template <typename F> static inline void
UTisolate(F &f) { tbb::this_task_arena::isolate(f); }

template <typename F> static inline void
UTisolate(const F &f) { tbb::this_task_arena::isolate(f); }
#else
template <typename F> static inline void
UTisolate(F &f)
{
    tbb::task_arena __nested;
    __nested.execute(f);
}
template <typename F> static inline void
UTisolate(const F &f)
{
    tbb::task_arena __nested;
    __nested.execute(f);
}
#endif
/// @}

// The code below is originally from:
// https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp
// and is covered by the following copyright:
/*
    Copyright (C) 2014 Intel Corporation
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the
      distribution.
    * Neither the name of Intel Corporation nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
    OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
    AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
    WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/
#include <utility>
#include <iterator>
#include <algorithm>

namespace pss {

namespace internal {

//! Destroy sequence [xs,xe)
template<class RandomAccessIterator>
void serial_destroy( RandomAccessIterator zs, RandomAccessIterator ze ) {
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    while( zs!=ze ) {
        --ze;
        (*ze).~T();
    }
}

//! Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,(xe-xs)+(ye-ys)), using std::move
template<class RandomAccessIterator1, class RandomAccessIterator2, class RandomAccessIterator3, class Compare>
void serial_move_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp ) {
    if( xs!=xe ) {
        if( ys!=ye )
        {
            for(;;)
            {
                if( comp(*ys,*xs) ) {
                    *zs = std::move(*ys);
                    ++zs;
                    if( ++ys==ye ) break;
                } else {
                    *zs = std::move(*xs);
                    ++zs;
                    if( ++xs==xe ) goto movey;
                }
            }
        }
        ys = xs;
        ye = xe;
    }
movey:
    std::move( ys, ye, zs );
}

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void stable_sort_base_case( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp) {
    std::stable_sort( xs, xe, comp );
    if( inplace!=2 ) {
        RandomAccessIterator2 ze = zs + (xe-xs);
        typedef typename std::iterator_traits<RandomAccessIterator2>::value_type T;
        if( inplace )
            // Initialize the temporary buffer
            for( ; zs<ze; ++zs )
                new(&*zs) T;
        else
            // Initialize the temporary buffer and move keys to it.
            for( ; zs<ze; ++xs, ++zs )
                new(&*zs) T(std::move(*xs));
    }
}

//! Raw memory buffer with automatic cleanup.
class raw_buffer {
    void* ptr;
public:
    //! Try to obtain buffer of given size.
    raw_buffer( size_t bytes ) : ptr( operator new(bytes,std::nothrow) ) {}
    //! True if buffer was successfully obtained, zero otherwise.
    operator bool() const {return ptr;}
    //! Return pointer to buffer, or NULL if buffer could not be obtained.
    void* get() const {return ptr;}
    //! Destroy buffer
    ~raw_buffer() {operator delete(ptr);}
};

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
                     RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp );

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
struct parallel_merge_invoke
{
    RandomAccessIterator1 _xs, _xe;
    RandomAccessIterator2 _ys, _ye;
    RandomAccessIterator3 _zs;
    bool _destroy;
    Compare _comp;
    parallel_merge_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye,
                           RandomAccessIterator3 zs, bool destroy, Compare comp):
        _xs(xs), _xe(xe), _ys(ys), _ye(ye), _zs(zs), _destroy(destroy), _comp(comp) {}

    void operator()() const
    { parallel_merge( _xs, _xe, _ys, _ye, _zs, _destroy, _comp ); }

};

// Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,zs+(xe-xs)+(ye-ys))
template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
                     RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp ) {
    const size_t MERGE_CUT_OFF = 2000;
    if( (xe-xs) + (ye-ys) <= MERGE_CUT_OFF ) {
        serial_move_merge( xs, xe, ys, ye, zs, comp );
        if( destroy ) {
            serial_destroy( xs, xe );
            serial_destroy( ys, ye );
        }
    } else {
        RandomAccessIterator1 xm;
        RandomAccessIterator2 ym;
        if( xe-xs < ye-ys ) {
            ym = ys+(ye-ys)/2;
            xm = std::upper_bound(xs,xe,*ym,comp);
        } else {
            xm = xs+(xe-xs)/2;
            ym = std::lower_bound(ys,ye,*xm,comp);
        }
        RandomAccessIterator3 zm = zs + ((xm-xs) + (ym-ys));
        tbb::parallel_invoke( parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xs, xm, ys, ym, zs, destroy, comp ),
                              parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xm, xe, ym, ye, zm, destroy, comp ) );
    }
}

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp );

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
struct parallel_stable_sort_aux_invoke
{
    RandomAccessIterator1 _xs, _xe;
    RandomAccessIterator2 _zs;
    bool _inplace;
    Compare _comp;
    parallel_stable_sort_aux_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp ):
        _xs(xs), _xe(xe), _zs(zs), _inplace(inplace), _comp(comp) {}

    void operator()() const
    { parallel_stable_sort_aux( _xs, _xe, _zs, _inplace, _comp ); }

};

// Sorts [xs,xe), where zs[0:xe-xs) is temporary buffer supplied by caller.
// Result is in [xs,xe) if inplace==true, otherwise in [zs,zs+(xe-xs))
template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp ) {
    const size_t SORT_CUT_OFF = 500;
    if( xe-xs<=SORT_CUT_OFF ) {
        stable_sort_base_case(xs, xe, zs, inplace, comp);
    } else {
        RandomAccessIterator1 xm = xs + (xe-xs)/2;
        RandomAccessIterator2 zm = zs + (xm-xs);
        RandomAccessIterator2 ze = zs + (xe-xs);
        tbb::parallel_invoke( parallel_stable_sort_aux_invoke<RandomAccessIterator1, RandomAccessIterator2, Compare>( xs, xm, zs, !inplace, comp ),
                              parallel_stable_sort_aux_invoke<RandomAccessIterator1, RandomAccessIterator2, Compare>( xm, xe, zm, !inplace, comp ) );
        if( inplace )
            parallel_merge( zs, zm, zm, ze, xs, inplace==2, comp );
        else
            parallel_merge( xs, xm, xm, xe, zs, false, comp );
    }
}
} // namespace internal

template<typename RandomAccessIterator, typename Compare>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe, Compare comp ) {
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    internal::raw_buffer z = internal::raw_buffer( sizeof(T)*(xe-xs) );
    if( z && UT_Thread::isThreadingEnabled() )
        internal::parallel_stable_sort_aux( xs, xe, (T*)z.get(), 2, comp );
    else
        // Not enough memory available - fall back on serial sort
        std::stable_sort( xs, xe, comp );
}

} // namespace pss


#endif