32 #ifndef __UT_ParallelUtil__
33 #define __UT_ParallelUtil__
45 #include <tbb/blocked_range.h>
46 #include <tbb/blocked_range2d.h>
47 #include <tbb/task_arena.h>
48 #include <tbb/parallel_for.h>
49 #include <tbb/parallel_reduce.h>
50 #include <tbb/parallel_sort.h>
59 template <
typename RowT,
typename ColT=RowT>
63 template<
typename RANGE >
75 template<
typename T >
82 return range.rows().size() * range.cols().size();
87 template <
typename RANGE>
99 template<
typename RANGE >
110 myGrainSize(range.myGrainSize)
119 RANGE::is_divisible() &&
128 myGrainSize(grain_size)
132 template <
typename Range,
typename Body>
134 const Range &
range,
const Body &body,
135 const int subscribe_ratio,
const int min_grain_size,
136 const bool force_use_task_scope
138 template <
typename Range,
typename Body>
140 const Range &
range, Body &body,
141 const int subscribe_ratio,
const int min_grain_size,
142 const bool force_use_taskscope
144 template <
typename Range,
typename Body>
146 const Range &
range, Body &body,
const int grain_size,
147 const bool force_use_taskscope
155 template<
typename Range,
typename Body>
166 : myBody(src.myBody),
167 myParentTaskScope(src.myParentTaskScope)
185 template<
typename Range,
typename Body>
201 template <
typename IntType,
typename Body>
291 template <
typename Range,
typename Body>
293 const Range &
range,
const Body &body,
294 const int subscribe_ratio = 2,
295 const int min_grain_size = 1,
296 const bool force_use_task_scope =
true
308 if (est_range_size == 0)
312 if (num_processors == 1 || est_range_size <= min_grain_size ||
319 size_t grain_size(min_grain_size);
320 if( subscribe_ratio > 0 )
323 est_range_size / (subscribe_ratio * num_processors)
332 tbb::simple_partitioner());
338 tbb::simple_partitioner());
344 template <
typename Range,
typename Body>
346 const Range &
range,
const Body &body,
347 const int subscribe_ratio = 2,
348 const int min_grain_size = 1
351 UTparallelFor(range, body, subscribe_ratio, min_grain_size,
true);
357 template <
typename Range,
typename Body>
360 const bool force_use_task_scope =
true)
377 template <
typename Range,
typename Body>
401 template <
typename IntType,
typename Body>
410 if (num_processors == 1)
415 if (nitems <= num_processors)
432 template <
typename IntType,
typename Body>
436 for (IntType i = 0; i < nitems; ++i)
443 template <
typename IntType,
typename Body>
452 template <
typename Range,
typename Body>
460 template<
typename Body>
471 : myBody(src.myBody),
472 myParentTaskScope(src.myParentTaskScope)
491 template <
typename Body>
501 template <
typename F1,
typename F2>
516 template <
typename F1,
typename F2,
typename... Rest>
532 template <
typename F1>
537 : myFunctions(functions) {}
540 for (
int i = r.begin(); i != r.end(); ++i)
550 template <
typename F1>
561 for (
int i = 0; i < funs.
entries(); i++)
566 template <
typename F1>
571 : myFunctions(functions) {}
574 for (
int i = r.begin(); i != r.end(); ++i)
584 template <
typename F1>
595 for (
int i = 0; i < funs.
entries(); i++)
604 template<
typename Range,
typename Body>
616 : myParentTaskScope(src.myParentTaskScope)
635 const Body &
body()
const {
return myBodyPtr ? *myBodyPtr : *myBody; }
636 Body &
body() {
return myBodyPtr ? *myBodyPtr : *myBody; }
715 template <
typename Range,
typename Body>
719 const int subscribe_ratio = 2,
720 const int min_grain_size = 1,
721 const bool force_use_task_scope =
true
733 if (est_range_size == 0)
737 if (num_processors == 1 || est_range_size <= min_grain_size ||
744 size_t grain_size(min_grain_size);
745 if( subscribe_ratio > 0 )
748 est_range_size / (subscribe_ratio * num_processors)
755 tbb::parallel_reduce(coarsened_range,
757 tbb::simple_partitioner());
761 tbb::parallel_reduce(coarsened_range, body, tbb::simple_partitioner());
775 template <
typename Range,
typename Body>
779 const int grain_size,
780 const bool force_use_task_scope =
true
788 if (est_range_size == 0)
792 "FIXME: There needs to be a way to do identical splits and joins when single-threading,"
793 " to avoid having different roundoff error from when multi-threading. "
794 " Something using simple_partitioner() might work.");
800 tbb::parallel_deterministic_reduce(coarsened_range,
802 tbb::simple_partitioner());
806 tbb::parallel_deterministic_reduce(coarsened_range, body);
813 template <
typename Range,
typename Body>
822 template <
typename Range,
typename Body>
830 template <
typename Range,
typename Body>
836 UTparallelCancelGroupExecution()
839 tbb::task::self().cancel_group_execution();
846 template <
typename RandomAccessIterator,
typename Compare>
850 tbb::parallel_sort(begin, end, compare);
859 template <
typename RandomAccessIterator>
863 tbb::parallel_sort(begin, end);
872 template <
typename T>
876 tbb::parallel_sort(begin, end);
884 template<
typename RandomAccessIterator,
typename Compare>
889 template<
class RandomAccessIterator>
903 template <
typename RandomAccessIterator,
typename Compare>
916 template <
typename RandomAccessIterator>
928 template <
typename T>
940 template <
typename T,
typename Compare>
954 template <
typename T>
969 template <
typename T,
typename Compare>
988 template <
typename T>
996 : tbb::blocked_range<
T>(begin_value, end_value, grainsize)
999 : tbb::blocked_range<
T>(R, split)
1020 {
return (myCurrent == cmp.myCurrent); }
1023 {
return !(*
this ==
cmp); }
1056 template <
typename RowT,
typename ColT>
1065 ColT col_begin, ColT col_end,
1066 size_t row_grainsize=1,
size_t col_grainsize=1)
1067 : tbb::blocked_range2d<RowT, ColT>(row_begin, row_end, row_grainsize,
1068 col_begin, col_end, col_grainsize)
1071 : tbb::blocked_range2d<RowT, ColT>(R, split)
1081 template <
typename Op,
typename T>
1087 const int grain_size = 1024,
1088 const bool force_use_task_scope =
true
1093 if (array.
entries() < grain_size * 10)
1098 total = op(total, array(i));
1108 exint nblocks = (array.
entries() + grain_size-1) / grain_size;
1115 for (
exint block = r.begin(); block < r.end(); block++)
1120 for (
exint i = start; i <
end; i++)
1122 total = op(total, array(i));
1126 blocktotals(block) = total;
1128 }, force_use_task_scope);
1133 grain_size, force_use_task_scope);
1138 for (
exint block = r.begin(); block < r.end(); block++)
1144 T total = blocktotals(block-1);
1145 for (
exint i = start; i <
end; i++)
1147 array(i) = op(total, array(i));
1151 }, force_use_task_scope);
1158 #if TBB_VERSION_MAJOR >= 2018
1159 template <
typename F>
static inline void
1160 UTisolate(F &
f) { tbb::this_task_arena::isolate(f); }
1162 template <
typename F>
static inline void
1163 UTisolate(
const F &
f) { tbb::this_task_arena::isolate(f); }
1165 template <
typename F>
static inline void
1168 tbb::task_arena __nested;
1169 __nested.execute(f);
1171 template <
typename F>
static inline void
1172 UTisolate(
const F &f)
1174 tbb::task_arena __nested;
1175 __nested.execute(f);
1216 #include <algorithm>
1220 namespace internal {
1223 template<
class RandomAccessIterator>
1233 template<
class RandomAccessIterator1,
class RandomAccessIterator2,
class RandomAccessIterator3,
class Compare>
1234 void serial_move_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp ) {
1240 if( comp(*ys,*xs) ) {
1241 *zs = std::move(*ys);
1243 if( ++ys==ye )
break;
1245 *zs = std::move(*xs);
1247 if( ++xs==xe )
goto movey;
1255 std::move( ys, ye, zs );
1258 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1259 void stable_sort_base_case( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp) {
1260 std::stable_sort( xs, xe, comp );
1262 RandomAccessIterator2 ze = zs + (xe-xs);
1266 for( ; zs<ze; ++zs )
1270 for( ; zs<ze; ++xs, ++zs )
1271 new(&*zs) T(std::move(*xs));
1282 operator bool()
const {
return ptr;}
1284 void*
get()
const {
return ptr;}
1289 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1290 void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
1291 RandomAccessIterator2 ye, RandomAccessIterator3 zs,
bool destroy, Compare comp );
1293 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1301 parallel_merge_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye,
1302 RandomAccessIterator3 zs,
bool destroy, Compare comp):
1310 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1311 void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
1312 RandomAccessIterator2 ye, RandomAccessIterator3 zs,
bool destroy, Compare comp ) {
1313 const size_t MERGE_CUT_OFF = 2000;
1314 if( (xe-xs) + (ye-ys) <= MERGE_CUT_OFF ) {
1321 RandomAccessIterator1 xm;
1322 RandomAccessIterator2 ym;
1323 if( xe-xs < ye-ys ) {
1325 xm = std::upper_bound(xs,xe,*ym,comp);
1328 ym = std::lower_bound(ys,ye,*xm,comp);
1330 RandomAccessIterator3 zm = zs + ((xm-xs) + (ym-ys));
1331 tbb::parallel_invoke(
parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xs, xm, ys, ym, zs, destroy, comp ),
1336 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1337 void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp );
1339 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1355 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1356 void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp ) {
1357 const size_t SORT_CUT_OFF = 500;
1358 if( xe-xs<=SORT_CUT_OFF ) {
1361 RandomAccessIterator1 xm = xs + (xe-xs)/2;
1362 RandomAccessIterator2 zm = zs + (xm-xs);
1363 RandomAccessIterator2 ze = zs + (xe-xs);
1374 template<
typename RandomAccessIterator,
typename Compare>
1382 std::stable_sort( xs, xe, comp );
ut_TaskScopedInvokeBody(const Body &body)
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
UT_BlockedRange2D()=delete
SYS_FORCE_INLINE bool operator==(const ValueWrapper &cmp) const
UT_BlockedRange(T begin_value, T end_value, size_t grainsize=1)
void parallel_for(int64_t start, int64_t end, std::function< void(int64_t index)> &&task, parallel_options opt=parallel_options(0, Split_Y, 1))
SYS_FORCE_INLINE ValueWrapper & operator++()
void UTparallelFor(const Range &range, const Body &body, const int subscribe_ratio=2, const int min_grain_size=1, const bool force_use_task_scope=true)
void UTparallelDeterministicReduce(const Range &range, Body &body, const int grain_size, const bool force_use_task_scope=true)
size_t operator()(const RANGE &range) const
void UTparallelForTaskScope(const Range &range, const Body &body, const int subscribe_ratio=2, const int min_grain_size=1)
SYS_FORCE_INLINE bool operator!=(const ValueWrapper &cmp) const
friend void UTparallelDeterministicReduce(const Range &range, Body &body, const int grain_size, const bool force_use_taskscope)
void operator()(const Range &r)
void UTparallelForEachNumber(IntType nitems, const Body &body, const bool force_use_task_scope=true)
void UTparallelDeterministicPrefixSumInPlace(UT_Array< T > &array, const T identity, const Op &op, const int grain_size=1024, const bool force_use_task_scope=true)
void setSizeNoInit(exint newsize)
void UTserialReduce(const Range &range, Body &body)
ut_ReduceTaskScopedBody(Body *body)
RandomAccessIterator1 _xe
GLdouble GLdouble GLdouble z
RandomAccessIterator1 _xe
GLboolean GLboolean GLboolean GLboolean a
void serial_destroy(RandomAccessIterator zs, RandomAccessIterator ze)
Destroy sequence [xs,xe)
void UTparallelForLightItems(const Range &range, const Body &body, const bool force_use_task_scope=true)
void UTserialForEachNumber(IntType nitems, const Body &body, bool usetaskscope=true)
RandomAccessIterator2 _ye
void parallel_stable_sort_aux(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
static bool isThreadingEnabled()
ut_ReduceTaskScopedBody(ut_ReduceTaskScopedBody &src, UT_Split)
std::optional< T > UT_Optional
size_t UTestimatedNumItems(const RANGE &range)
This is needed by UT_CoarsenedRange.
IMATH_HOSTDEVICE constexpr int cmp(T a, T b) IMATH_NOEXCEPT
size_t operator()(const UT_BlockedRange2D< T > &range) const
#define UT_ASSERT_MSG(ZZ,...)
#define SYS_DEPRECATED_REPLACE(__V__, __R__)
void join(ut_ReduceTaskScopedBody &other)
RandomAccessIterator2 _ys
UT_ParallelInvokeFunctors(const UT_Array< F1 > &functions)
RandomAccessIterator2 _zs
CompareResults OIIO_API compare(const ImageBuf &A, const ImageBuf &B, float failthresh, float warnthresh, ROI roi={}, int nthreads=0)
Raw memory buffer with automatic cleanup.
void parallel_merge(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp)
ut_TaskBody(const Body *body)
ut_TaskScopedInvokeBody(const ut_TaskScopedInvokeBody &src)
~raw_buffer()
Destroy buffer.
void operator()(const UT_BlockedRange< IntType > &range) const
ut_TaskScopedBody(const ut_TaskScopedBody &src)
const Body & body() const
static int getNumProcessors()
void UTparallelReduceHeavyItems(const Range &range, Body &body)
UT_BlockedRange2D(RowT row_begin, RowT row_end, ColT col_begin, ColT col_end, size_t row_grainsize=1, size_t col_grainsize=1)
NB: The arguments are in a different order than tbb.
SYS_FORCE_INLINE T operator*()
RandomAccessIterator1 _xs
tbb::split UT_Split
Typedef to denote the "split" constructor of a range.
void operator()(const tbb::blocked_range< int > &r) const
friend void UTparallelFor(const Range &range, const Body &body, const int subscribe_ratio, const int min_grain_size, const bool force_use_task_scope)
UT_BlockedRange(UT_BlockedRange &R, UT_Split split)
void operator()(const Range &r) const
void operator()(const tbb::blocked_range< int > &r) const
ut_TaskScopedBody(const Body *body)
exint entries() const
Alias of size(). size() is preferred.
void operator()(const Range &r) const
UT_ParallelInvokePointers(const UT_Array< F1 * > &functions)
void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2)
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
RandomAccessIterator3 _zs
raw_buffer(size_t bytes)
Try to obtain buffer of given size.
void parallel_stable_sort(RandomAccessIterator xs, RandomAccessIterator xe, Compare comp)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
void * get() const
Return pointer to buffer, or NULL if buffer could not be obtained.
parallel_stable_sort_aux_invoke(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
UT_BlockedRange2D(UT_BlockedRange2D &R, UT_Split split)
void serial_move_merge(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp)
Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,(xe-xs)+(ye-ys)), using std::move...
void UTparallelForHeavyItems(const Range &range, const Body &body)
RandomAccessIterator1 _xs
void OIIO_UTIL_API split(string_view str, std::vector< string_view > &result, string_view sep=string_view(), int maxsplit=-1)
ut_ForEachNumberBody(const Body &body, SYS_AtomicInt< IntType > &it, IntType end)
void stable_sort_base_case(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
void UTparallelForEachNumberTaskScope(IntType nitems, const Body &body)
UT_CoarsenedRange(UT_CoarsenedRange &range, tbb::split spl)
GA_API const UT_StringHolder rest
void sort(I begin, I end, const Pred &pred)
const ut_TaskScopedInvokeBody< Body > UTmakeTaskScopedInvokeBody(const Body &body)
SYS_FORCE_INLINE ValueWrapper(const T &it)
void UTparallelReduce(const Range &range, Body &body, const int subscribe_ratio=2, const int min_grain_size=1, const bool force_use_task_scope=true)
friend void UTparallelReduce(const Range &range, Body &body, const int subscribe_ratio, const int min_grain_size, const bool force_use_taskscope)
void UTserialFor(const Range &range, const Body &body)
bool is_divisible() const
void UTparallelReduceLightItems(const Range &range, Body &body)
parallel_merge_invoke(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp)
PcpNodeRef_ChildrenIterator begin(const PcpNodeRef::child_const_range &r)
Support for range-based for loops for PcpNodeRef children ranges.