ROSE  0.9.10.69
ThreadWorkers.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
#include <Sawyer/Exception.h>
#include <Sawyer/Graph.h>
#include <Sawyer/Sawyer.h>
#include <Sawyer/Stack.h>

#include <boost/foreach.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/version.hpp>

#include <algorithm>
#include <cstddef>
#include <set>
#include <stdexcept>
#include <utility>
19 
20 namespace Sawyer {
21 
29 template<class DependencyGraph, class Functor>
31  boost::mutex mutex_; // protects the following members after the constructor
32  DependencyGraph dependencies_; // outstanding dependencies
33  bool hasStarted_; // set when work has started
34  bool hasWaited_; // set when wait() is called
35  Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
36  boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
37  size_t nWorkers_; // number of worker threads allocated
38  boost::thread *workers_; // worker threads
39  size_t nItemsStarted_; // number of work items started
40  size_t nItemsFinished_; // number of work items that have been completed already
41  size_t nWorkersRunning_; // number of workers that are currently busy doing something
42  size_t nWorkersFinished_; // number of worker threads that have returned
43  std::set<size_t> runningTasks_; // tasks (vertex IDs) that are running
44 
45 public:
51  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
52  nWorkersRunning_(0), nWorkersFinished_(0) {}
53 
69  ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
70  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
71  nWorkersRunning_(0), nWorkersFinished_(0) {
72  try {
73  run(dependencies, nWorkers, functor);
74  } catch (const Exception::ContainsCycle&) {
75  delete[] workers_;
76  throw; // destructor won't be called
77  }
78  }
79 
84  wait();
85  delete[] workers_;
86  }
87 
101  void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
102  boost::lock_guard<boost::mutex> lock(mutex_);
103  if (hasStarted_)
104  throw std::runtime_error("work can start only once per object");
105  hasStarted_ = true;
106  dependencies_ = dependencies;
107  if (0 == nWorkers)
108  nWorkers = boost::thread::hardware_concurrency();
109  nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
110  nItemsStarted_ = nWorkersFinished_ = 0;
111  runningTasks_.clear();
112  fillWorkQueueNS();
113  startWorkersNS(functor);
114  }
115 
120  void wait() {
121  boost::unique_lock<boost::mutex> lock(mutex_);
122  if (!hasStarted_ || hasWaited_)
123  return;
124  hasWaited_ = true;
125  lock.unlock();
126 
127  for (size_t i=0; i<nWorkers_; ++i)
128  workers_[i].join();
129 
130  lock.lock();
131  if (dependencies_.nEdges() != 0)
132  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
133  dependencies_.clear();
134  }
135 
141  void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
142  start(dependencies, nWorkers, functor);
143  wait();
144  }
145 
149  bool isFinished() {
150  boost::lock_guard<boost::mutex> lock(mutex_);
151  return !hasStarted_ || nWorkersFinished_ == nWorkers_;
152  }
153 
158  size_t nStarted() {
159  boost::lock_guard<boost::mutex> lock(mutex_);
160  return nItemsStarted_;
161  }
162 
166  size_t nFinished() {
167  boost::lock_guard<boost::mutex> lock(mutex_);
168  return nItemsFinished_;
169  }
170 
175  std::set<size_t> runningTasks() {
176  boost::lock_guard<boost::mutex> lock(mutex_);
177  return runningTasks_;
178  }
179 
184  std::pair<size_t, size_t> nWorkers() {
185  boost::lock_guard<boost::mutex> lock(mutex_);
186  return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
187  }
188 
189 private:
190  // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
191  void fillWorkQueueNS() {
192  ASSERT_require(workQueue_.isEmpty());
193  BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
194  if (vertex.nOutEdges() == 0)
195  workQueue_.push(vertex.id());
196  }
197  }
198 
199  // Start worker threads
200  void startWorkersNS(Functor functor) {
201  workers_ = new boost::thread[nWorkers_];
202  for (size_t i=0; i<nWorkers_; ++i)
203  workers_[i] = boost::thread(startWorker, this, functor);
204  }
205 
206  // Worker threads execute here
207  static void startWorker(ThreadWorkers *self, Functor functor) {
208  self->worker(functor);
209  }
210 
211  void worker(Functor functor) {
212  while (1) {
213  // Get the next item of work
214  boost::unique_lock<boost::mutex> lock(mutex_);
215  while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
216  workInserted_.wait(lock);
217  if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
218  ++nWorkersFinished_;
219  return;
220  }
221  ASSERT_forbid(workQueue_.isEmpty());
222  size_t workItemId = workQueue_.pop();
223  typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
224  ASSERT_require(workVertex->nOutEdges() == 0);
225  typename DependencyGraph::VertexValue workItem = workVertex->value();
226  ++nItemsStarted_;
227 
228  // Do the work
229  ++nWorkersRunning_;
230  runningTasks_.insert(workItemId);
231  lock.unlock();
232  functor(workItemId, workItem);
233  lock.lock();
234  ++nItemsFinished_;
235  --nWorkersRunning_;
236  runningTasks_.erase(workItemId);
237 
238  // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
239  std::set<typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
240  BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
241  candidateWorkItems.insert(edge.source());
242  dependencies_.clearInEdges(workVertex);
243  size_t newWorkInserted = 0;
244  BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems) {
245  if (candidate->nOutEdges() == 0) {
246  workQueue_.push(candidate->id());
247  ++newWorkInserted;
248  }
249  }
250 
251  // Notify other workers
252  if (0 == newWorkInserted) {
253  if (workQueue_.isEmpty())
254  workInserted_.notify_all();
255  } else if (1 == newWorkInserted) {
256  // we'll do the new work ourself
257  } else if (2 == newWorkInserted) {
258  workInserted_.notify_one();
259  } else {
260  workInserted_.notify_all();
261  }
262  }
263  }
264 };
265 
287 template<class DependencyGraph, class Functor>
288 void
289 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
290  ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
291 }
292 
293 
294 template<class DependencyGraph, class Functor, class Monitor>
295 void
296 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor,
297  Monitor monitor, boost::chrono::milliseconds period) {
299  workers.start(dependencies, nWorkers, functor);
300  while (!workers.isFinished()) {
301  monitor(dependencies, nWorkers, workers.runningTasks());
302 #if BOOST_VERSION >= 1050000
303  boost::this_thread::sleep_for(period);
304 #else
305  // For ROSE's sake, don't make this a compile-time error just yet. [Robb Matzke 2018-04-24]
306  ASSERT_not_reachable("this old version of boost is not supported");
307 #endif
308  }
309  monitor(dependencies, nWorkers, std::set<size_t>());
310  workers.wait();
311 }
315 } // namespace
~ThreadWorkers()
Destructor.
Definition: ThreadWorkers.h:83
std::set< size_t > runningTasks()
Tasks currently running.
Name space for the entire library.
Definition: Access.h:13
Stack & push(const Value &value)
Push new item onto stack.
Definition: Stack.h:90
bool isFinished()
Test whether all possible work is finished.
Work list with dependencies.
Definition: ThreadWorkers.h:30
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
Definition: Stack.h:99
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
Definition: Stack.h:49
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
Definition: ThreadWorkers.h:50
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
Definition: ThreadWorkers.h:69