ROSE  0.9.10.91
ThreadWorkers.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Sawyer.h>
11 #include <Sawyer/Stack.h>
12 
13 #include <boost/foreach.hpp>
14 #include <boost/version.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/locks.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/thread.hpp>
19 #include <set>
20 
21 namespace Sawyer {
22 
30 template<class DependencyGraph, class Functor>
32  boost::mutex mutex_; // protects the following members after the constructor
33  DependencyGraph dependencies_; // outstanding dependencies
34  bool hasStarted_; // set when work has started
35  bool hasWaited_; // set when wait() is called
36  Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
37  boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
38  size_t nWorkers_; // number of worker threads allocated
39  boost::thread *workers_; // worker threads
40  size_t nItemsStarted_; // number of work items started
41  size_t nItemsFinished_; // number of work items that have been completed already
42  size_t nWorkersRunning_; // number of workers that are currently busy doing something
43  size_t nWorkersFinished_; // number of worker threads that have returned
44  std::set<size_t> runningTasks_; // tasks (vertex IDs) that are running
45 
46 public:
52  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
53  nWorkersRunning_(0), nWorkersFinished_(0) {}
54 
70  ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
71  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
72  nWorkersRunning_(0), nWorkersFinished_(0) {
73  try {
74  run(dependencies, nWorkers, functor);
75  } catch (const Exception::ContainsCycle&) {
76  delete[] workers_;
77  throw; // destructor won't be called
78  }
79  }
80 
85  wait();
86  delete[] workers_;
87  }
88 
102  void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
103  boost::lock_guard<boost::mutex> lock(mutex_);
104  if (hasStarted_)
105  throw std::runtime_error("work can start only once per object");
106  hasStarted_ = true;
107  dependencies_ = dependencies;
108  if (0 == nWorkers)
109  nWorkers = boost::thread::hardware_concurrency();
110  nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
111  nItemsStarted_ = nWorkersFinished_ = 0;
112  runningTasks_.clear();
113  fillWorkQueueNS();
114  startWorkersNS(functor);
115  }
116 
121  void wait() {
122  boost::unique_lock<boost::mutex> lock(mutex_);
123  if (!hasStarted_ || hasWaited_)
124  return;
125  hasWaited_ = true;
126  lock.unlock();
127 
128  for (size_t i=0; i<nWorkers_; ++i)
129  workers_[i].join();
130 
131  lock.lock();
132  if (dependencies_.nEdges() != 0)
133  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
134  dependencies_.clear();
135  }
136 
142  void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
143  start(dependencies, nWorkers, functor);
144  wait();
145  }
146 
150  bool isFinished() {
151  boost::lock_guard<boost::mutex> lock(mutex_);
152  return !hasStarted_ || nWorkersFinished_ == nWorkers_;
153  }
154 
159  size_t nStarted() {
160  boost::lock_guard<boost::mutex> lock(mutex_);
161  return nItemsStarted_;
162  }
163 
167  size_t nFinished() {
168  boost::lock_guard<boost::mutex> lock(mutex_);
169  return nItemsFinished_;
170  }
171 
176  std::set<size_t> runningTasks() {
177  boost::lock_guard<boost::mutex> lock(mutex_);
178  return runningTasks_;
179  }
180 
185  std::pair<size_t, size_t> nWorkers() {
186  boost::lock_guard<boost::mutex> lock(mutex_);
187  return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
188  }
189 
190 private:
191  // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
192  void fillWorkQueueNS() {
193  ASSERT_require(workQueue_.isEmpty());
194  BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
195  if (vertex.nOutEdges() == 0)
196  workQueue_.push(vertex.id());
197  }
198  }
199 
200  // Start worker threads
201  void startWorkersNS(Functor functor) {
202  workers_ = new boost::thread[nWorkers_];
203  for (size_t i=0; i<nWorkers_; ++i)
204  workers_[i] = boost::thread(startWorker, this, functor);
205  }
206 
207  // Worker threads execute here
208  static void startWorker(ThreadWorkers *self, Functor functor) {
209  self->worker(functor);
210  }
211 
212  void worker(Functor functor) {
213  while (1) {
214  // Get the next item of work
215  boost::unique_lock<boost::mutex> lock(mutex_);
216  while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
217  workInserted_.wait(lock);
218  if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
219  ++nWorkersFinished_;
220  return;
221  }
222  ASSERT_forbid(workQueue_.isEmpty());
223  size_t workItemId = workQueue_.pop();
224  typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
225  ASSERT_require(workVertex->nOutEdges() == 0);
226  typename DependencyGraph::VertexValue workItem = workVertex->value();
227  ++nItemsStarted_;
228 
229  // Do the work
230  ++nWorkersRunning_;
231  runningTasks_.insert(workItemId);
232  lock.unlock();
233  functor(workItemId, workItem);
234  lock.lock();
235  ++nItemsFinished_;
236  --nWorkersRunning_;
237  runningTasks_.erase(workItemId);
238 
239  // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
240  std::set<typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
241  BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
242  candidateWorkItems.insert(edge.source());
243  dependencies_.clearInEdges(workVertex);
244  size_t newWorkInserted = 0;
245  BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems) {
246  if (candidate->nOutEdges() == 0) {
247  workQueue_.push(candidate->id());
248  ++newWorkInserted;
249  }
250  }
251 
252  // Notify other workers
253  if (0 == newWorkInserted) {
254  if (workQueue_.isEmpty())
255  workInserted_.notify_all();
256  } else if (1 == newWorkInserted) {
257  // we'll do the new work ourself
258  } else if (2 == newWorkInserted) {
259  workInserted_.notify_one();
260  } else {
261  workInserted_.notify_all();
262  }
263  }
264  }
265 };
266 
288 template<class DependencyGraph, class Functor>
289 void
290 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
291  ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
292 }
293 
294 
295 template<class DependencyGraph, class Functor, class Monitor>
296 void
297 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor,
298  Monitor monitor, boost::chrono::milliseconds period) {
300  workers.start(dependencies, nWorkers, functor);
301  while (!workers.isFinished()) {
302  monitor(dependencies, nWorkers, workers.runningTasks());
303 #if BOOST_VERSION >= 105000
304  boost::this_thread::sleep_for(period);
305 #else
306  // For ROSE's sake, don't make this a compile-time error just yet. [Robb Matzke 2018-04-24]
307  ASSERT_not_reachable("this old version of boost is not supported");
308 #endif
309  }
310  monitor(dependencies, nWorkers, std::set<size_t>());
311  workers.wait();
312 }
316 } // namespace
~ThreadWorkers()
Destructor.
Definition: ThreadWorkers.h:84
std::set< size_t > runningTasks()
Tasks currently running.
Name space for the entire library.
Definition: Access.h:13
Stack & push(const Value &value)
Push new item onto stack.
Definition: Stack.h:90
bool isFinished()
Test whether all possible work is finished.
Work list with dependencies.
Definition: ThreadWorkers.h:31
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
Definition: Stack.h:99
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
Definition: Stack.h:49
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
Definition: ThreadWorkers.h:51
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
Definition: ThreadWorkers.h:70