ROSE  0.9.11.158
ThreadWorkers.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Map.h>
11 #include <Sawyer/Sawyer.h>
12 #include <Sawyer/Stack.h>
13 
14 #include <boost/foreach.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/locks.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/thread.hpp>
19 #include <boost/version.hpp>
20 #include <set>
21 
22 namespace Sawyer {
23 
31 template<class DependencyGraph, class Functor>
33  boost::mutex mutex_; // protects the following members after the constructor
34  DependencyGraph dependencies_; // outstanding dependencies
35  bool hasStarted_; // set when work has started
36  bool hasWaited_; // set when wait() is called
37  Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
38  boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
39  size_t nWorkers_; // number of worker threads allocated
40  boost::thread *workers_; // worker threads
41  size_t nItemsStarted_; // number of work items started
42  size_t nItemsFinished_; // number of work items that have been completed already
43  size_t nWorkersRunning_; // number of workers that are currently busy doing something
44  size_t nWorkersFinished_; // number of worker threads that have returned
45  std::set<size_t> runningTasks_; // tasks (vertex IDs) that are running
46 
47 public:
53  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
54  nWorkersRunning_(0), nWorkersFinished_(0) {}
55 
71  ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
72  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
73  nWorkersRunning_(0), nWorkersFinished_(0) {
74  try {
75  run(dependencies, nWorkers, functor);
76  } catch (const Exception::ContainsCycle&) {
77  delete[] workers_;
78  throw; // destructor won't be called
79  }
80  }
81 
86  wait();
87  delete[] workers_;
88  }
89 
103  void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
104  boost::lock_guard<boost::mutex> lock(mutex_);
105  if (hasStarted_)
106  throw std::runtime_error("work can start only once per object");
107  hasStarted_ = true;
108  dependencies_ = dependencies;
109  if (0 == nWorkers)
110  nWorkers = boost::thread::hardware_concurrency();
111  nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
112  nItemsStarted_ = nWorkersFinished_ = 0;
113  runningTasks_.clear();
114  fillWorkQueueNS();
115  startWorkersNS(functor);
116  }
117 
122  void wait() {
123  boost::unique_lock<boost::mutex> lock(mutex_);
124  if (!hasStarted_ || hasWaited_)
125  return;
126  hasWaited_ = true;
127  lock.unlock();
128 
129  for (size_t i=0; i<nWorkers_; ++i)
130  workers_[i].join();
131 
132  lock.lock();
133  if (dependencies_.nEdges() != 0)
134  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
135  dependencies_.clear();
136  }
137 
143  void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
144  start(dependencies, nWorkers, functor);
145  wait();
146  }
147 
151  bool isFinished() {
152  boost::lock_guard<boost::mutex> lock(mutex_);
153  return !hasStarted_ || nWorkersFinished_ == nWorkers_;
154  }
155 
160  size_t nStarted() {
161  boost::lock_guard<boost::mutex> lock(mutex_);
162  return nItemsStarted_;
163  }
164 
168  size_t nFinished() {
169  boost::lock_guard<boost::mutex> lock(mutex_);
170  return nItemsFinished_;
171  }
172 
177  std::set<size_t> runningTasks() {
178  boost::lock_guard<boost::mutex> lock(mutex_);
179  return runningTasks_;
180  }
181 
186  std::pair<size_t, size_t> nWorkers() {
187  boost::lock_guard<boost::mutex> lock(mutex_);
188  return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
189  }
190 
191 private:
192  // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
193  void fillWorkQueueNS() {
194  ASSERT_require(workQueue_.isEmpty());
195  BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
196  if (vertex.nOutEdges() == 0)
197  workQueue_.push(vertex.id());
198  }
199  }
200 
201  // Start worker threads
202  void startWorkersNS(Functor functor) {
203  workers_ = new boost::thread[nWorkers_];
204  for (size_t i=0; i<nWorkers_; ++i)
205  workers_[i] = boost::thread(startWorker, this, functor);
206  }
207 
208  // Worker threads execute here
209  static void startWorker(ThreadWorkers *self, Functor functor) {
210  self->worker(functor);
211  }
212 
213  void worker(Functor functor) {
214  while (1) {
215  // Get the next item of work
216  boost::unique_lock<boost::mutex> lock(mutex_);
217  while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
218  workInserted_.wait(lock);
219  if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
220  ++nWorkersFinished_;
221  return;
222  }
223  ASSERT_forbid(workQueue_.isEmpty());
224  size_t workItemId = workQueue_.pop();
225  typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
226  ASSERT_require(workVertex->nOutEdges() == 0);
227  typename DependencyGraph::VertexValue workItem = workVertex->value();
228  ++nItemsStarted_;
229 
230  // Do the work
231  ++nWorkersRunning_;
232  runningTasks_.insert(workItemId);
233  lock.unlock();
234  functor(workItemId, workItem);
235  lock.lock();
236  ++nItemsFinished_;
237  --nWorkersRunning_;
238  runningTasks_.erase(workItemId);
239 
240  // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
241  Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
242  BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
243  candidateWorkItems.insert(edge.source()->id(), edge.source());
244  dependencies_.clearInEdges(workVertex);
245  size_t newWorkInserted = 0;
246  BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
247  if (candidate->nOutEdges() == 0) {
248  workQueue_.push(candidate->id());
249  ++newWorkInserted;
250  }
251  }
252 
253  // Notify other workers
254  if (0 == newWorkInserted) {
255  if (workQueue_.isEmpty())
256  workInserted_.notify_all();
257  } else if (1 == newWorkInserted) {
258  // we'll do the new work ourself
259  } else if (2 == newWorkInserted) {
260  workInserted_.notify_one();
261  } else {
262  workInserted_.notify_all();
263  }
264  }
265  }
266 };
267 
289 template<class DependencyGraph, class Functor>
290 void
291 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
292  ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
293 }
294 
295 
296 template<class DependencyGraph, class Functor, class Monitor>
297 void
298 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor,
299  Monitor monitor, boost::chrono::milliseconds period) {
301  workers.start(dependencies, nWorkers, functor);
302  while (!workers.isFinished()) {
303  monitor(dependencies, nWorkers, workers.runningTasks());
304 #if BOOST_VERSION >= 105000
305  boost::this_thread::sleep_for(period);
306 #else
307  // For ROSE's sake, don't make this a compile-time error just yet. [Robb Matzke 2018-04-24]
308  ASSERT_not_reachable("this old version of boost is not supported");
309 #endif
310  }
311  monitor(dependencies, nWorkers, std::set<size_t>());
312  workers.wait();
313 }
317 } // namespace
~ThreadWorkers()
Destructor.
Definition: ThreadWorkers.h:85
std::set< size_t > runningTasks()
Tasks currently running.
Name space for the entire library.
Stack & push(const Value &value)
Push new item onto stack.
Definition: Stack.h:90
bool isFinished()
Test whether all possible work is finished.
Work list with dependencies.
Definition: ThreadWorkers.h:32
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
Definition: Stack.h:99
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
Definition: Stack.h:49
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
Definition: ThreadWorkers.h:52
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
Definition: ThreadWorkers.h:71