ROSE  0.11.28.0
ThreadWorkers.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Map.h>
11 #include <Sawyer/Sawyer.h>
12 #include <Sawyer/Stack.h>
13 
14 #include <boost/foreach.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/locks.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/thread.hpp>
19 #include <boost/version.hpp>
20 #include <set>
21 
22 namespace Sawyer {
23 
31 template<class DependencyGraph, class Functor>
33  boost::mutex mutex_; // protects the following members after the constructor
34  DependencyGraph dependencies_; // outstanding dependencies
35  bool hasStarted_; // set when work has started
36  bool hasWaited_; // set when wait() is called
37  Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
38  boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
39  size_t nWorkers_; // number of worker threads allocated
40  boost::thread *workers_; // worker threads
41  size_t nItemsStarted_; // number of work items started
42  size_t nItemsFinished_; // number of work items that have been completed already
43  size_t nWorkersRunning_; // number of workers that are currently busy doing something
44  size_t nWorkersFinished_; // number of worker threads that have returned
45  std::set<size_t> runningTasks_; // tasks (vertex IDs) that are running
46 
47 public:
53  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
54  nWorkersRunning_(0), nWorkersFinished_(0) {}
55 
71  ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
72  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
73  nWorkersRunning_(0), nWorkersFinished_(0) {
74  try {
75  run(dependencies, nWorkers, functor);
76  } catch (const Exception::ContainsCycle&) {
77  delete[] workers_;
78  throw; // destructor won't be called
79  }
80  }
81 
86  wait();
87  delete[] workers_;
88  }
89 
103  void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
104  boost::lock_guard<boost::mutex> lock(mutex_);
105  if (hasStarted_)
106  throw std::runtime_error("work can start only once per object");
107  hasStarted_ = true;
108  dependencies_ = dependencies;
109  if (0 == nWorkers)
110  nWorkers = boost::thread::hardware_concurrency();
111  nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
112  nItemsStarted_ = nWorkersFinished_ = 0;
113  runningTasks_.clear();
114  fillWorkQueueNS();
115  startWorkersNS(functor);
116  }
117 
122  void wait() {
123  boost::unique_lock<boost::mutex> lock(mutex_);
124  if (!hasStarted_ || hasWaited_)
125  return;
126  hasWaited_ = true;
127  lock.unlock();
128 
129  for (size_t i=0; i<nWorkers_; ++i)
130  workers_[i].join();
131 
132  lock.lock();
133  if (dependencies_.nEdges() != 0)
134  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
135  dependencies_.clear();
136  }
137 
142  template<class Rep, class Period>
143  bool tryWaitFor(const boost::chrono::duration<Rep, Period> &relTime) {
144  const boost::chrono::steady_clock::time_point endAt = boost::chrono::steady_clock::now() + relTime;
145  boost::unique_lock<boost::mutex> lock(mutex_);
146  if (!hasStarted_ || hasWaited_)
147  return true;
148  lock.unlock();
149 
150  for (size_t i = 0; i < nWorkers_; ++i) {
151  if (!workers_[i].try_join_until(endAt))
152  return false;
153  }
154 
155  lock.lock();
156  hasWaited_ = true;
157  if (dependencies_.nEdges() != 0)
158  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
159  dependencies_.clear();
160  return true;
161  }
162 
168  void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
169  start(dependencies, nWorkers, functor);
170  wait();
171  }
172 
176  bool isFinished() {
177  boost::lock_guard<boost::mutex> lock(mutex_);
178  return !hasStarted_ || nWorkersFinished_ == nWorkers_;
179  }
180 
185  size_t nStarted() {
186  boost::lock_guard<boost::mutex> lock(mutex_);
187  return nItemsStarted_;
188  }
189 
193  size_t nFinished() {
194  boost::lock_guard<boost::mutex> lock(mutex_);
195  return nItemsFinished_;
196  }
197 
202  std::set<size_t> runningTasks() {
203  boost::lock_guard<boost::mutex> lock(mutex_);
204  return runningTasks_;
205  }
206 
211  std::pair<size_t, size_t> nWorkers() {
212  boost::lock_guard<boost::mutex> lock(mutex_);
213  return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
214  }
215 
216 private:
217  // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
218  void fillWorkQueueNS() {
219  ASSERT_require(workQueue_.isEmpty());
220  BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
221  if (vertex.nOutEdges() == 0)
222  workQueue_.push(vertex.id());
223  }
224  }
225 
226  // Start worker threads
227  void startWorkersNS(Functor functor) {
228  workers_ = new boost::thread[nWorkers_];
229  for (size_t i=0; i<nWorkers_; ++i)
230  workers_[i] = boost::thread(startWorker, this, functor);
231  }
232 
233  // Worker threads execute here
234  static void startWorker(ThreadWorkers *self, Functor functor) {
235  self->worker(functor);
236  }
237 
238  void worker(Functor functor) {
239  while (1) {
240  // Get the next item of work
241  boost::unique_lock<boost::mutex> lock(mutex_);
242  while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
243  workInserted_.wait(lock);
244  if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
245  ++nWorkersFinished_;
246  return;
247  }
248  ASSERT_forbid(workQueue_.isEmpty());
249  size_t workItemId = workQueue_.pop();
250  typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
251  ASSERT_require(workVertex->nOutEdges() == 0);
252  typename DependencyGraph::VertexValue workItem = workVertex->value();
253  ++nItemsStarted_;
254 
255  // Do the work
256  ++nWorkersRunning_;
257  runningTasks_.insert(workItemId);
258  lock.unlock();
259  functor(workItemId, workItem);
260  lock.lock();
261  ++nItemsFinished_;
262  --nWorkersRunning_;
263  runningTasks_.erase(workItemId);
264 
265  // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
266  Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
267  BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
268  candidateWorkItems.insert(edge.source()->id(), edge.source());
269  dependencies_.clearInEdges(workVertex);
270  size_t newWorkInserted = 0;
271  BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
272  if (candidate->nOutEdges() == 0) {
273  workQueue_.push(candidate->id());
274  ++newWorkInserted;
275  }
276  }
277 
278  // Notify other workers
279  if (0 == newWorkInserted) {
280  if (workQueue_.isEmpty())
281  workInserted_.notify_all();
282  } else if (1 == newWorkInserted) {
283  // we'll do the new work ourself
284  } else if (2 == newWorkInserted) {
285  workInserted_.notify_one();
286  } else {
287  workInserted_.notify_all();
288  }
289  }
290  }
291 };
292 
314 template<class DependencyGraph, class Functor>
315 void
316 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
317  ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
318 }
319 
320 
321 template<class DependencyGraph, class Functor, class Monitor>
322 void
323 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor,
324  Monitor monitor, boost::chrono::milliseconds period) {
326  workers.start(dependencies, nWorkers, functor);
327  while (!workers.tryWaitFor(period))
328  monitor(dependencies, workers.nFinished(), workers.runningTasks());
329 }
333 } // namespace
~ThreadWorkers()
Destructor.
Definition: ThreadWorkers.h:85
std::set< size_t > runningTasks()
Tasks currently running.
bool tryWaitFor(const boost::chrono::duration< Rep, Period > &relTime)
Wait for work to complete.
Sawyer
Namespace for the entire library.
Stack & push(const Value &value)
Push new item onto stack.
Definition: Stack.h:90
bool isFinished()
Test whether all possible work is finished.
Sawyer::ThreadWorkers
Work list with dependencies.
Definition: ThreadWorkers.h:32
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
Definition: Stack.h:99
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
Definition: Stack.h:49
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Sawyer::Exception::ContainsCycle
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
Definition: ThreadWorkers.h:52
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
Definition: ThreadWorkers.h:71