Skip to content

Commit 7cce7a7

Browse files
committed
add tthread
1 parent 874588f commit 7cce7a7

File tree

6 files changed

+1829
-78
lines changed

6 files changed

+1829
-78
lines changed

RcppParallel.Rproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ AlwaysSaveHistory: No
66

77
EnableCodeIndexing: Yes
88
UseSpacesForTab: Yes
9-
NumSpacesForTab: 3
9+
NumSpacesForTab: 2
1010
Encoding: UTF-8
1111

1212
RnwWeave: Sweave

inst/examples/parallel-tbb.cpp

Lines changed: 3 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -54,83 +54,9 @@ double vectorSum(NumericVector x) {
5454
*
5555
*/
5656

57-
// [[Rcpp::depends(TBB)]]
58-
#include <tbb/tbb.h>
59-
60-
struct IWorker {
61-
virtual ~IWorker() {}
62-
virtual void operator()(std::size_t begin, std::size_t end) = 0;
63-
};
64-
65-
struct TBBWorker {
66-
67-
explicit TBBWorker(IWorker& worker) : worker_(worker) {}
68-
69-
void operator()(const tbb::blocked_range<size_t>& r) const {
70-
worker_(r.begin(), r.end());
71-
}
72-
73-
private:
74-
IWorker& worker_;
75-
};
76-
77-
inline void parallelFor(std::size_t begin, std::size_t end, IWorker& worker) {
78-
79-
TBBWorker tbbWorker(worker);
80-
tbb::parallel_for(tbb::blocked_range<size_t>(begin, end), tbbWorker);
81-
}
82-
83-
template <typename T>
84-
struct Reduce : public IWorker {
85-
virtual void split(const T& source) = 0;
86-
virtual void join(const T& rhs) = 0;
87-
};
88-
89-
template <typename T>
90-
struct TBBReduce {
91-
92-
explicit TBBReduce(Reduce<T>& reduce)
93-
: pReduce_(static_cast<T*>(&reduce)), wasSplit_(false)
94-
{
95-
}
96-
97-
virtual ~TBBReduce() {
98-
try
99-
{
100-
if (wasSplit_)
101-
delete pReduce_;
102-
}
103-
catch(...)
104-
{
105-
}
106-
}
107-
108-
void operator()(const tbb::blocked_range<size_t>& r) {
109-
pReduce_->operator()(r.begin(), r.end());
110-
}
111-
112-
TBBReduce(TBBReduce& reduce, tbb::split)
113-
: pReduce_(new T()), wasSplit_(true)
114-
{
115-
pReduce_->split(*reduce.pReduce_);
116-
}
117-
118-
void join(const TBBReduce& reduce) {
119-
pReduce_->join(*reduce.pReduce_);
120-
}
121-
122-
private:
123-
T* pReduce_;
124-
bool wasSplit_;
125-
};
126-
127-
template <typename T>
128-
inline void parallelReduce(std::size_t begin, std::size_t end, Reduce<T>& worker) {
129-
130-
TBBReduce<T> tbbReduce(worker);
131-
tbb::parallel_reduce(tbb::blocked_range<size_t>(begin, end), tbbReduce);
132-
}
133-
57+
// [[Rcpp::depends(RcppParallel)]]
58+
#include <RcppParallel.h>
59+
using namespace RcppParallel;
13460

13561
struct SquareRoot : public IWorker
13662
{

inst/include/RcppParallel.h

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
2+
#ifndef __RCPP_PARALLEL__
3+
#define __RCPP_PARALLEL__
4+
5+
// determine whether to use TBB
6+
#ifndef RCPP_PARALLEL_USE_TBB
7+
#ifdef _WIN32
8+
#define RCPP_PARALLEL_USE_TBB 0
9+
#else
10+
#define RCPP_PARALLEL_USE_TBB 1
11+
#endif
12+
#endif
13+
14+
namespace RcppParallel {
15+
16+
////// Shared Types ///////////////////////////////////////////////////
17+
18+
// Code which can be executed within a worker thread. We implement
19+
// dynamic dispatch using vtables so we can have a stable type to
20+
// cast the void* to within the worker thread.
21+
22+
struct IWorker {
23+
virtual ~IWorker() {}
24+
virtual void operator()(std::size_t begin, std::size_t end) = 0;
25+
};
26+
27+
template <typename T>
28+
struct Reduce : public IWorker {
29+
virtual void split(const T& source) = 0;
30+
virtual void join(const T& rhs) = 0;
31+
};
32+
33+
} // namespace RcppParallel
34+
35+
////// TinyThreads Implementation /////////////////////////////////////
36+
37+
// tinythread library
38+
#include <tthread/tinythread.h>
39+
40+
#include <vector>
41+
42+
namespace RcppParallel {
43+
44+
namespace {
45+
46+
// Class which represents a range of indexes to perform work on
47+
// (worker functions are passed this range so they know which
48+
// elements are safe to read/write to)
49+
class IndexRange {
50+
public:
51+
52+
// Initizlize with a begin and (exclusive) end index
53+
IndexRange(std::size_t begin, std::size_t end)
54+
: begin_(begin), end_(end)
55+
{
56+
}
57+
58+
// Access begin() and end()
59+
std::size_t begin() const { return begin_; }
60+
std::size_t end() const { return end_; }
61+
62+
private:
63+
std::size_t begin_;
64+
std::size_t end_;
65+
};
66+
67+
68+
// Because tinythread allows us to pass only a plain C function
69+
// we need to pass our worker and range within a struct that we
70+
// can cast to/from void*
71+
struct Work {
72+
Work(IndexRange range, IWorker& worker)
73+
: range(range), worker(worker)
74+
{
75+
}
76+
IndexRange range;
77+
IWorker& worker;
78+
};
79+
80+
// Thread which performs work (then deletes the work object
81+
// when it's done)
82+
extern "C" inline void workerThread(void* data) {
83+
try
84+
{
85+
Work* pWork = static_cast<Work*>(data);
86+
pWork->worker(pWork->range.begin(), pWork->range.end());
87+
delete pWork;
88+
}
89+
catch(...)
90+
{
91+
}
92+
}
93+
94+
// Function to calculate the ranges for a given input
95+
std::vector<IndexRange> splitInputRange(const IndexRange& range) {
96+
97+
// max threads is based on hardware concurrency
98+
std::size_t threads = tthread::thread::hardware_concurrency();
99+
100+
// determine the chunk size
101+
std::size_t length = range.end() - range.begin();
102+
std::size_t chunkSize = length / threads;
103+
104+
// allocate ranges
105+
std::vector<IndexRange> ranges;
106+
std::size_t nextIndex = range.begin();
107+
for (std::size_t i = 0; i<threads; i++) {
108+
std::size_t begin = nextIndex;
109+
std::size_t end = std::min(begin + chunkSize, range.end());
110+
ranges.push_back(IndexRange(begin, end));
111+
nextIndex = end;
112+
}
113+
114+
// return ranges
115+
return ranges;
116+
}
117+
118+
} // anonymous namespace
119+
120+
// Execute the IWorker over the IndexRange in parallel
121+
inline void ttParallelFor(std::size_t begin, std::size_t end, IWorker& worker) {
122+
123+
using namespace tthread;
124+
125+
// split the work
126+
std::vector<IndexRange> ranges = splitInputRange(IndexRange(begin, end));
127+
128+
// create threads
129+
std::vector<thread*> threads;
130+
for (std::size_t i = 0; i<ranges.size(); ++i) {
131+
threads.push_back(new thread(workerThread, new Work(ranges[i], worker)));
132+
}
133+
134+
// join and delete them
135+
for (std::size_t i = 0; i<threads.size(); ++i) {
136+
threads[i]->join();
137+
delete threads[i];
138+
}
139+
}
140+
141+
// Execute the IWorker over the range in parallel then join results
142+
template <typename T>
143+
inline void ttParallelReduce(std::size_t begin, std::size_t end, Reduce<T>& worker) {
144+
145+
using namespace tthread;
146+
147+
// split the work
148+
std::vector<IndexRange> ranges = splitInputRange(IndexRange(begin, end));
149+
150+
// create threads (split for each thread and track the allocated workers)
151+
std::vector<thread*> threads;
152+
std::vector<Reduce<T>*> workers;
153+
for (std::size_t i = 0; i<ranges.size(); ++i) {
154+
T* pWorker = new T();
155+
pWorker->split(static_cast<T&>(worker));
156+
workers.push_back(pWorker);
157+
threads.push_back(new thread(workerThread, new Work(ranges[i], *pWorker)));
158+
}
159+
160+
// wait for each thread, join it's results, then delete the worker & thread
161+
for (std::size_t i = 0; i<threads.size(); ++i) {
162+
163+
// wait for thread
164+
threads[i]->join();
165+
166+
// join the results
167+
worker.join(static_cast<T&>(*workers[i]));
168+
169+
// delete the worker (which we split above) and the thread
170+
delete workers[i];
171+
delete threads[i];
172+
}
173+
}
174+
175+
} // namespace RcppParallel
176+
177+
////// TBB Implementation /////////////////////////////////////////////
178+
179+
#if RCPP_PARALLEL_USE_TBB
180+
181+
#include <tbb/tbb.h>
182+
183+
namespace RcppParallel {
184+
185+
struct TBBWorker
186+
{
187+
explicit TBBWorker(IWorker& worker) : worker_(worker) {}
188+
189+
void operator()(const tbb::blocked_range<size_t>& r) const {
190+
worker_(r.begin(), r.end());
191+
}
192+
193+
private:
194+
IWorker& worker_;
195+
};
196+
197+
inline void tbbParallelFor(std::size_t begin, std::size_t end, IWorker& worker) {
198+
TBBWorker tbbWorker(worker);
199+
tbb::parallel_for(tbb::blocked_range<size_t>(begin, end), tbbWorker);
200+
}
201+
202+
template <typename T>
203+
struct TBBReduce {
204+
205+
explicit TBBReduce(Reduce<T>& reduce)
206+
: pReduce_(static_cast<T*>(&reduce)), wasSplit_(false)
207+
{
208+
}
209+
210+
virtual ~TBBReduce() {
211+
try
212+
{
213+
if (wasSplit_)
214+
delete pReduce_;
215+
}
216+
catch(...)
217+
{
218+
}
219+
}
220+
221+
void operator()(const tbb::blocked_range<size_t>& r) {
222+
pReduce_->operator()(r.begin(), r.end());
223+
}
224+
225+
TBBReduce(TBBReduce& reduce, tbb::split)
226+
: pReduce_(new T()), wasSplit_(true)
227+
{
228+
pReduce_->split(*reduce.pReduce_);
229+
}
230+
231+
void join(const TBBReduce& reduce) {
232+
pReduce_->join(*reduce.pReduce_);
233+
}
234+
235+
private:
236+
T* pReduce_;
237+
bool wasSplit_;
238+
};
239+
240+
template <typename T>
241+
inline void tbbParallelReduce(std::size_t begin, std::size_t end, Reduce<T>& worker) {
242+
243+
TBBReduce<T> tbbReduce(worker);
244+
tbb::parallel_reduce(tbb::blocked_range<size_t>(begin, end), tbbReduce);
245+
}
246+
247+
} // namespace RcppParallel
248+
249+
#endif
250+
251+
////// Dispatch to Implementation /////////////////////////////////////
252+
253+
namespace RcppParallel {
254+
255+
inline void parallelFor(std::size_t begin, std::size_t end, IWorker& worker) {
256+
#if RCPP_PARALLEL_USE_TBB
257+
tbbParallelFor(begin, end, worker);
258+
#else
259+
ttParallelFor(begin, end, worker);
260+
#endif
261+
}
262+
263+
template <typename T>
264+
inline void parallelReduce(std::size_t begin, std::size_t end, Reduce<T>& worker) {
265+
#if RCPP_PARALLEL_USE_TBB
266+
tbbParallelReduce(begin, end, worker);
267+
#else
268+
ttParallelReduce(begin, end, worker);
269+
#endif
270+
}
271+
272+
} // namespace RcppParallel
273+
274+
#endif // __RCPP_PARALLEL__

0 commit comments

Comments
 (0)