ROSE  0.9.9.109
ThreadWorkers.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Sawyer.h>
11 #include <Sawyer/Stack.h>
12 
13 #include <boost/foreach.hpp>
14 #include <boost/thread/condition_variable.hpp>
15 #include <boost/thread/locks.hpp>
16 #include <boost/thread/mutex.hpp>
17 #include <boost/thread/thread.hpp>
18 #include <set>
19 
20 namespace Sawyer {
21 
29 template<class DependencyGraph, class Functor>
31  boost::mutex mutex_; // protects the following members after the constructor
32  DependencyGraph dependencies_; // outstanding dependencies
33  bool hasStarted_; // set when work has started
34  bool hasWaited_; // set when wait() is called
35  Container::Stack<size_t> workQueue_; // outstanding work identified by vertex ID of dependency graph
36  boost::condition_variable workInserted_; // signaled when work is added to the queue or all work is consumed
37  size_t nWorkers_; // number of worker threads allocated
38  boost::thread *workers_; // worker threads
39  size_t nItemsStarted_; // number of work items started
40  size_t nItemsFinished_; // number of work items that have been completed already
41  size_t nWorkersRunning_; // number of workers that are currently busy doing something
42  size_t nWorkersFinished_; // number of worker threads that have returned
43 
44 public:
50  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
51  nWorkersRunning_(0), nWorkersFinished_(0) {}
52 
68  ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
69  : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
70  nWorkersRunning_(0), nWorkersFinished_(0) {
71  try {
72  run(dependencies, nWorkers, functor);
73  } catch (const Exception::ContainsCycle&) {
74  delete[] workers_;
75  throw; // destructor won't be called
76  }
77  }
78 
83  wait();
84  delete[] workers_;
85  }
86 
100  void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
101  boost::lock_guard<boost::mutex> lock(mutex_);
102  if (hasStarted_)
103  throw std::runtime_error("work can start only once per object");
104  hasStarted_ = true;
105  dependencies_ = dependencies;
106  if (0 == nWorkers)
107  nWorkers = boost::thread::hardware_concurrency();
108  nWorkers_ = std::max((size_t)1, std::min(nWorkers, dependencies.nVertices()));
109  nItemsStarted_ = nWorkersFinished_ = 0;
110  fillWorkQueueNS();
111  startWorkersNS(functor);
112  }
113 
118  void wait() {
119  boost::unique_lock<boost::mutex> lock(mutex_);
120  if (!hasStarted_ || hasWaited_)
121  return;
122  hasWaited_ = true;
123  lock.unlock();
124 
125  for (size_t i=0; i<nWorkers_; ++i)
126  workers_[i].join();
127 
128  lock.lock();
129  dependencies_.clear();
130  }
131 
137  void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
138  start(dependencies, nWorkers, functor);
139  wait();
140  if (dependencies_.nEdges() != 0)
141  throw Exception::ContainsCycle("task dependency graph contains cycle(s)");
142  }
143 
147  bool isFinished() {
148  boost::lock_guard<boost::mutex> lock(mutex_);
149  return !hasStarted_ || nWorkersFinished_ == nWorkers_;
150  }
151 
155  size_t nStarted() {
156  boost::lock_guard<boost::mutex> lock(mutex_);
157  return nItemsStarted_;
158  }
159 
161  size_t nFinished() {
162  boost::lock_guard<boost::mutex> lock(mutex_);
163  return nItemsFinished_;
164  }
165 
170  std::pair<size_t, size_t> nWorkers() {
171  boost::lock_guard<boost::mutex> lock(mutex_);
172  return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
173  }
174 
175 private:
176  // Scan the dependency graph and fill the work queue with vertices that have no dependencies.
177  void fillWorkQueueNS() {
178  ASSERT_require(workQueue_.isEmpty());
179  BOOST_FOREACH (const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
180  if (vertex.nOutEdges() == 0)
181  workQueue_.push(vertex.id());
182  }
183  }
184 
185  // Start worker threads
186  void startWorkersNS(Functor functor) {
187  workers_ = new boost::thread[nWorkers_];
188  for (size_t i=0; i<nWorkers_; ++i)
189  workers_[i] = boost::thread(startWorker, this, functor);
190  }
191 
192  // Worker threads execute here
193  static void startWorker(ThreadWorkers *self, Functor functor) {
194  self->worker(functor);
195  }
196 
197  void worker(Functor functor) {
198  while (1) {
199  // Get the next item of work
200  boost::unique_lock<boost::mutex> lock(mutex_);
201  while (nItemsFinished_ < nItemsStarted_ && workQueue_.isEmpty())
202  workInserted_.wait(lock);
203  if (nItemsFinished_ == nItemsStarted_ && workQueue_.isEmpty()) {
204  ++nWorkersFinished_;
205  return;
206  }
207  ASSERT_forbid(workQueue_.isEmpty());
208  size_t workItemId = workQueue_.pop();
209  typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
210  ASSERT_require(workVertex->nOutEdges() == 0);
211  typename DependencyGraph::VertexValue workItem = workVertex->value();
212  ++nItemsStarted_;
213 
214  // Do the work
215  ++nWorkersRunning_;
216  lock.unlock();
217  functor(workItemId, workItem);
218  lock.lock();
219  ++nItemsFinished_;
220  --nWorkersRunning_;
221 
222  // Look for more work as we remove some dependency edges. Watch out for parallel edges (self edges not possible).
223  std::set<typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
224  BOOST_FOREACH (const typename DependencyGraph::Edge &edge, workVertex->inEdges())
225  candidateWorkItems.insert(edge.source());
226  dependencies_.clearInEdges(workVertex);
227  size_t newWorkInserted = 0;
228  BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems) {
229  if (candidate->nOutEdges() == 0) {
230  workQueue_.push(candidate->id());
231  ++newWorkInserted;
232  }
233  }
234 
235  // Notify other workers
236  if (0 == newWorkInserted) {
237  if (workQueue_.isEmpty())
238  workInserted_.notify_all();
239  } else if (1 == newWorkInserted) {
240  // we'll do the new work ourself
241  } else if (2 == newWorkInserted) {
242  workInserted_.notify_one();
243  } else {
244  workInserted_.notify_all();
245  }
246  }
247  }
248 };
249 
265 template<class DependencyGraph, class Functor>
266 void
267 workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor) {
268  ThreadWorkers<DependencyGraph, Functor>(dependencies, nWorkers, functor);
269 }
270 
271 
272 } // namespace
~ThreadWorkers()
Destructor.
Definition: ThreadWorkers.h:82
Name space for the entire library.
Definition: Access.h:11
Stack & push(const Value &value)
Push new item onto stack.
Definition: Stack.h:89
bool isFinished()
Test whether all possible work is finished.
Work list with dependencies.
Definition: ThreadWorkers.h:30
void wait()
Wait for work to complete.
Value pop()
Pop existing item from stack.
Definition: Stack.h:98
void start(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Start workers and return.
size_t nFinished()
Number of tasks that have completed.
void run(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Synchronously processes tasks.
size_t nStarted()
Number of tasks that have started.
bool isEmpty() const
Determines if the stack is empty.
Definition: Stack.h:48
void workInParallel(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Performs work in parallel.
Error when a cycle is detected.
ThreadWorkers()
Default constructor.
Definition: ThreadWorkers.h:49
std::pair< size_t, size_t > nWorkers()
Number of worker threads.
ThreadWorkers(const DependencyGraph &dependencies, size_t nWorkers, Functor functor)
Constructor that synchronously runs the work.
Definition: ThreadWorkers.h:68