ROSE  0.11.2.0
WorkList.h
1 // WARNING: Changes to this file must be contributed back to Sawyer or else they will
2 // be clobbered by the next update from Sawyer. The Sawyer repository is at
3 // https://github.com/matzke1/sawyer.
4 
5 
6 
7 
8 #ifndef Sawyer_WorkList_H
9 #define Sawyer_WorkList_H
10 
11 #include <Sawyer/Synchronization.h>
12 #include <deque>
13 
14 namespace Sawyer {
15 
22 template<class T>
23 class WorkFifo {
24  mutable SAWYER_THREAD_TRAITS::Mutex mutex_; // protects the following data members
25  std::deque<T> items_; // ordered list of work to be completed
26 
27 public:
29  typedef T Item;
30 
32  void insert(const Item &item) {
33  SAWYER_THREAD_TRAITS::LockGuard lock(mutex_);
34  items_.push_back(item);
35  }
36 
38  bool isEmpty() const {
39  SAWYER_THREAD_TRAITS::LockGuard lock(mutex_);
40  return items_.empty();
41  }
42 
51  Item next() {
52  SAWYER_THREAD_TRAITS::LockGuard lock(mutex_);
53  ASSERT_forbid(items_.empty());
54  Item item = items_.front();
55  items_.pop_front();
56  return item;
57  }
58 };
59 
/** Process work from a work list in parallel.
 *
 *  Repeatedly removes an item from @p workList with its @c next method and invokes the functor as
 *  <code>f(item, workList)</code>, until @c isEmpty reports true and no invocation is still in progress. The work list
 *  is handed to the functor so that processing one item can queue additional items.
 *
 *  When @c SAWYER_MULTI_THREADED is true, every item is processed in its own detached @c boost::thread that receives
 *  its own copy of @p f, and at most @p maxWorkers such threads are active at a time. A @p maxWorkers of zero means
 *  "use <code>boost::thread::hardware_concurrency()</code>"; if that also reports zero (the number of hardware threads
 *  is unknown) a single worker is used. Because worker threads call back into @p workList concurrently with this
 *  function, the work list's @c isEmpty, @c next and insertion operations must be safe to call from multiple threads.
 *
 *  When @c SAWYER_MULTI_THREADED is false, items are processed one after another in the calling thread and
 *  @p maxWorkers is ignored.
 *
 *  @param workList   Source of work; must provide @c Item, @c isEmpty and @c next.
 *  @param maxWorkers Upper bound on concurrently running workers, or zero to choose automatically.
 *  @param f          Callable invoked as <code>f(item, workList)</code> for each item. */
template<class WorkItems, class Functor>
void processWorkList(WorkItems &workList, size_t maxWorkers, Functor f) {
#if SAWYER_MULTI_THREADED
    boost::mutex mutex;                                 // protects nActiveWorkers
    boost::condition_variable cond;                     // notified each time a worker finishes
    size_t nActiveWorkers = 0;                          // workers started but not yet finished

    if (0 == maxWorkers) {
        maxWorkers = boost::thread::hardware_concurrency();
        // hardware_concurrency() returns zero when the information is not available; run with one worker in that
        // case rather than failing the assertion below.
        if (0 == maxWorkers)
            maxWorkers = 1;
    }
    ASSERT_require(maxWorkers > 0);

    while (1) {
        // Block only when there is work that cannot be started because every worker slot is taken. When the list is
        // empty but workers are still running, this loop does not block: each iteration releases and reacquires the
        // mutex, which is how items queued by a running worker get noticed before that worker finishes.
        boost::unique_lock<boost::mutex> lock(mutex);
        while (!workList.isEmpty() && nActiveWorkers >= maxWorkers)
            cond.wait(lock);

        // Start as much work as possible
        while (!workList.isEmpty() && nActiveWorkers < maxWorkers) {
            // Lambdas are assumed to be unavailable, so a local class supplies the thread entry point.
            struct Worker {
                static void doWork(Functor f, const typename WorkItems::Item &item, WorkItems *workList,
                                   boost::mutex *mutex, boost::condition_variable *cond, size_t *nActiveWorkers) {
                    // The user's functor runs without holding the bookkeeping mutex.
                    f(item, *workList);

                    // Give the worker slot back and wake the dispatching thread.
                    boost::lock_guard<boost::mutex> lock(*mutex);
                    --*nActiveWorkers;
                    cond->notify_one();
                }
            };

            boost::thread thrd(Worker::doWork, f, workList.next(), &workList, &mutex, &cond, &nActiveWorkers);
            thrd.detach();
            ++nActiveWorkers;                           // safe: the mutex is still held by this thread
        }

        // Return if there's nothing to do: no queued items and no worker that could still queue more.
        if (workList.isEmpty() && 0 == nActiveWorkers)
            break;
    }
#else
    // Single-threaded build: drain the list in the calling thread; maxWorkers is not used.
    while (!workList.isEmpty())
        f(workList.next(), workList);
#endif
}
112 
113 } // namespace
114 #endif
First-in-first-out work queue.
Definition: WorkList.h:23
void insert(const Item &item)
Insert one unit of work into the queue.
Definition: WorkList.h:32
void processWorkList(WorkItems &workList, size_t maxWorkers, Functor f)
Process work from a work list in parallel.
Definition: WorkList.h:69
T Item
The type that represents one unit of work.
Definition: WorkList.h:29
Item next()
Remove and return the next item of work.
Definition: WorkList.h:51
Name space for the entire library.
bool isEmpty() const
Test whether the queue is empty.
Definition: WorkList.h:38