ROSE 0.11.145.147
ThreadWorkers.h
1// WARNING: Changes to this file must be contributed back to Sawyer or else they will
2// be clobbered by the next update from Sawyer. The Sawyer repository is at
3// https://gitlab.com/charger7534/sawyer.git.
4
5
6
7
8#include <Sawyer/Exception.h>
9#include <Sawyer/Graph.h>
10#include <Sawyer/Map.h>
11#include <Sawyer/Sawyer.h>
12#include <Sawyer/Stack.h>
13
14#include <boost/foreach.hpp>
15#include <boost/thread/condition_variable.hpp>
16#include <boost/thread/locks.hpp>
17#include <boost/thread/mutex.hpp>
18#include <boost/thread/thread.hpp>
19#include <boost/version.hpp>
20#include <set>
21
22namespace Sawyer {
23
31template<class DependencyGraph, class Functor>
33 boost::mutex mutex_; // protects the following members after the constructor
34 DependencyGraph dependencies_; // outstanding dependencies
35 bool hasStarted_; // set when work has started
36 bool hasWaited_; // set when wait() is called
37 Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
38 boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
39 size_t nWorkers_; // number of worker threads allocated
40 boost::thread *workers_; // worker threads
41 size_t nItemsStarted_; // number of work items started
42 size_t nItemsFinished_; // number of work items that have been completed already
43 size_t nWorkersRunning_; // number of workers that are currently busy doing something
44 size_t nWorkersFinished_; // number of worker threads that have returned
45 std::set<size_t> runningTasks_; // tasks (vertex IDs) that are running
46
47public:
53 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
54 nWorkersRunning_(0), nWorkersFinished_(0) {}
55
71 ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
72 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
73 nWorkersRunning_(0), nWorkersFinished_(0) {
74 try {
75 run(dependencies, nWorkers, functor);
76 } catch (const Exception::ContainsCycle&) {
77 delete[] workers_;
78 throw; // destructor won't be called
79 }
80 }
81
86 wait();
87 delete[] workers_;
88 }
89
103 void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
104 boost::lock_guard<boost::mutex> lock(mutex_);
105 if (hasStarted_)
106 throw std::runtime_error("work can start only once per object");
107 hasStarted_ = true;
108 dependencies_ = dependencies;
109 if (0 == nWorkers)
110 nWorkers = boost::thread::hardware_concurrency();
111 nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
112 nItemsStarted_ = nWorkersFinished_ = 0;
113 runningTasks_.clear();
114 fillWorkQueueNS();
115 startWorkersNS(functor);
116 }
117
122 void wait() {
123 boost::unique_lock<boost::mutex> lock(mutex_);
124 if (!hasStarted_ || hasWaited_)
125 return;
126 hasWaited_ = true;
127 lock.unlock();
128
129 for (size_t i=0; i<nWorkers_; ++i)
130 workers_[i].join();
131
132 lock.lock();
133 if (dependencies_.nEdges() != 0)
134 throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
135 dependencies_.clear();
136 }
137
142 template<class Rep, class Period>
143 bool tryWaitFor(const boost::chrono::duration<Rep, Period> &relTime) {
144 const boost::chrono::steady_clock::time_point endAt = boost::chrono::steady_clock::now() + relTime;
145 boost::unique_lock<boost::mutex> lock(mutex_);
146 if (!hasStarted_ || hasWaited_)
147 return true;
148 lock.unlock();
149
150 for (size_t i = 0; i < nWorkers_; ++i) {
151 if (!workers_[i].try_join_until(endAt))
152 return false;
153 }
154
155 lock.lock();
156 hasWaited_ = true;
157 if (dependencies_.nEdges() != 0)
158 throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
159 dependencies_.clear();
160 return true;
161 }
162
168 void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
169 start(dependencies, nWorkers, functor);
170 wait();
171 }
172
176 bool isFinished() {
177 boost::lock_guard<boost::mutex> lock(mutex_);
178 return !hasStarted_ || nWorkersFinished_ == nWorkers_;
179 }
180
185 size_t nStarted() {
186 boost::lock_guard<boost::mutex> lock(mutex_);
187 return nItemsStarted_;
188 }
189
193 size_t nFinished() {
194 boost::lock_guard<boost::mutex> lock(mutex_);
195 return nItemsFinished_;
196 }
197
202 std::set<size_t> runningTasks() {
203 boost::lock_guard<boost::mutex> lock(mutex_);
204 return runningTasks_;
205 }
206
211 std::pair<size_t, size_t> nWorkers() {
212 boost::lock_guard<boost::mutex> lock(mutex_);
213 return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
214 }
215
216private:
217 // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
218 void fillWorkQueueNS() {
219 ASSERT_require(workQueue_.isEmpty());
220 BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
221 if (vertex.nOutEdges() == 0)
222 workQueue_.push(vertex.id());
223 }
224 }
225
226 // Start worker threads
227 void startWorkersNS(Functor functor) {
228 workers_ = new boost::thread[nWorkers_];
229 for (size_t i=0; i<nWorkers_; ++i)
230 workers_[i] = boost::thread(startWorker, this, functor);
231 }
232
233 // Worker threads execute here
234 static void startWorker(ThreadWorkers *self, Functor functor) {
235 self->worker(functor);
236 }
237
238 void worker(Functor functor) {
239 while (1) {
240 // Get the next item of work
241 boost::unique_lock<boost::mutex> lock(mutex_);
242 while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
243 workInserted_.wait(lock);
244 if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
245 ++nWorkersFinished_;
246 return;
247 }
248 ASSERT_forbid(workQueue_.isEmpty());
249 size_t workItemId = workQueue_.pop();
250 typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
251 ASSERT_require(workVertex->nOutEdges() == 0);
252 typename DependencyGraph::VertexValue workItem = workVertex->value();
253 ++nItemsStarted_;
254
255 // Do the work
256 ++nWorkersRunning_;
257 runningTasks_.insert(workItemId);
258 lock.unlock();
259 functor(workItemId, workItem);
260 lock.lock();
261 ++nItemsFinished_;
262 --nWorkersRunning_;
263 runningTasks_.erase(workItemId);
264
265 // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
266 Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
267 BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
268 candidateWorkItems.insert(edge.source()->id(), edge.source());
269 dependencies_.clearInEdges(workVertex);
270 size_t newWorkInserted = 0;
271 BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
272 if (candidate->nOutEdges() == 0) {
273 workQueue_.push(candidate->id());
274 ++newWorkInserted;
275 }
276 }
277
278 // Notify other workers
279 if (0 == newWorkInserted) {
280 if (workQueue_.isEmpty())
281 workInserted_.notify_all();
282 } else if (1 == newWorkInserted) {
283 // we'll do the new work ourself
284 } else if (2 == newWorkInserted) {
285 workInserted_.notify_one();
286 } else {
287 workInserted_.notify_all();
288 }
289 }
290 }
291};
292
314template<class DependencyGraph, class Functor>
315void
316workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
317 ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
318}
319
320
321template<class DependencyGraph, class Functor, class Monitor>
322void
323workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor,
324 Monitor monitor, boost::chrono::milliseconds period) {
326 workers.start(dependencies, nWorkers, functor);
327 while (!workers.tryWaitFor(period))
328 monitor(dependencies, workers.nFinished(), workers.runningTasks());
329}
333} // namespace
Stack-based container.
Definition Stack.h:22
Value pop()
Pop existing item from stack.
Definition Stack.h:98
Stack & push(const Value &value)
Push new item onto stack.
Definition Stack.h:89
bool isEmpty() const
Determines if the stack is empty.
Definition Stack.h:48
Error when a cycle is detected.
Work list with dependencies.
bool tryWaitFor(const boost::chrono::duration< Rep, Period > &relTime)
Wait for work to complete.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nFinished()
Number of tasks that have completed.
std::set< size_t > runningTasks()
Tasks currently running.
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
size_t nStarted()
Number of tasks that have started.
bool isFinished()
Test whether all possible work is finished.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
ThreadWorkers()
Default constructor.
~ThreadWorkers()
Destructor.
void wait()
Wait for work to complete.
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
Sawyer support library.
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.