// Outstanding dependencies between work items. start() stores a copy of the caller's graph here, and worker()
// removes a vertex's incoming edges once that item has been processed.
34    DependencyGraph dependencies_;                      
 
   // Condition variable that idle workers block on in worker(); signalled after a worker finishes an item.
   38    boost::condition_variable workInserted_;            
 
   // Array of worker threads allocated with new[] in startWorkersNS(); NULL until then.
   // NOTE(review): the matching delete[] is not visible in this listing -- confirm where it is released.
   40    boost::thread *workers_;                            
 
   // Count of work items started so far; reset to zero by start().
   // NOTE(review): the increment is not visible in this listing -- presumably done by worker() when it pops an item.
   41    size_t nItemsStarted_;                              
 
   // Count of work items finished so far.
   // NOTE(review): the increment is not visible in this listing -- presumably done by worker() after the functor returns.
   42    size_t nItemsFinished_;                             
 
   // Reported as the second element of the worker-count pair; presumably the number of workers currently
   // executing the functor (updates are not visible in this listing -- TODO confirm).
   43    size_t nWorkersRunning_;                            
 
   // Compared against nWorkers_ to decide whether all work is done; presumably the number of worker threads
   // that have exited (updates are not visible in this listing -- TODO confirm).
   44    size_t nWorkersFinished_;                           
 
   // IDs of the work items currently being processed: worker() inserts an ID before calling the functor and
   // erases it afterwards.
   45    std::set<size_t> runningTasks_;                     
 
   // Initial state: work neither started nor waited for, zero workers, no thread array allocated, and every
   // item/worker counter at zero. The constructor body is empty.
   53        : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
 
   54          nWorkersRunning_(0), nWorkersFinished_(0) {}
 
 
   // Initial state: work neither started nor waited for, zero workers, no thread array allocated, and every
   // item/worker counter at zero.
   // NOTE(review): this constructor's signature and body are not visible in this listing; only the member
   // initializers are documented here.
   72        : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
 
   73          nWorkersRunning_(0), nWorkersFinished_(0) {
 
 
  // Begin processing the work items described by `dependencies`, using worker threads that each invoke `functor`.
  //
  //   dependencies  graph of work items; a private copy is stored in dependencies_
  //   nWorkers      requested number of worker threads (clamped below)
  //   functor       callable invoked by the workers for each work item
  //
  // Throws std::runtime_error("work can start only once per object").
  // NOTE(review): the condition guarding the throw is not visible in this listing -- presumably `hasStarted_`.
  103    void start(
const DependencyGraph &dependencies, 
size_t nWorkers, Functor functor) {
 
         // mutex_ is held for the remainder of this function, including the launch of the worker threads.
  104        boost::lock_guard<boost::mutex> lock(mutex_);
 
  106            throw std::runtime_error(
"work can start only once per object");
 
  108        dependencies_ = dependencies;
 
         // Fallback to the hardware thread count.
         // NOTE(review): the guarding condition is not visible -- presumably applies when the caller passes 0.
  110            nWorkers = boost::thread::hardware_concurrency();
 
         // Use at least one worker, and otherwise no more workers than there are vertices in the graph.
  111        nWorkers_ = std::max((
size_t)1, std::min(
nWorkers, dependencies.nVertices()));
 
  112        nItemsStarted_ = nWorkersFinished_ = 0;
 
  113        runningTasks_.clear();
 
         // Called with mutex_ already held (the "NS" suffix presumably means "not synchronized" -- TODO confirm).
  115        startWorkersNS(functor);
 
 
  // Fragment of a waiting routine whose header is not visible in this listing.
  // NOTE(review): the statements controlled by the `if` and `for` lines below are missing from the listing;
  // the comments describe only the visible conditions.
  123        boost::unique_lock<boost::mutex> lock(mutex_);
 
         // Special case: work was never started, or has already been waited for.
  124        if (!hasStarted_ || hasWaited_)
 
         // Visits every worker thread (presumably to join it -- loop body not visible).
  129        for (
size_t i=0; i<nWorkers_; ++i)
 
         // Edges still present at this point are treated specially (presumably reported as unsatisfiable
         // dependencies such as a cycle -- the controlled statement is not visible).
  133        if (dependencies_.nEdges() != 0)
 
         // The stored dependency graph is discarded.
  135        dependencies_.clear();
 
 
  // Wait for the workers with a time limit of `relTime`. Returns bool.
  // NOTE(review): the return statements are not visible in this listing -- presumably false when the deadline
  // passes before every worker has been joined, true otherwise.
  142    template<
class Rep, 
class Period>
 
  143    bool tryWaitFor(
const boost::chrono::duration<Rep, Period> &relTime) {
 
         // One absolute deadline is computed up front and shared by all the joins below, so the time limit
         // applies to the whole call rather than to each worker separately.
  144        const boost::chrono::steady_clock::time_point endAt = boost::chrono::steady_clock::now() + relTime;
 
  145        boost::unique_lock<boost::mutex> lock(mutex_);
 
         // Special case: work was never started, or has already been waited for (controlled statement not visible).
  146        if (!hasStarted_ || hasWaited_)
 
  150        for (
size_t i = 0; i < nWorkers_; ++i) {
 
             // A worker that cannot be joined before the deadline takes this branch (body not visible).
  151            if (!workers_[i].try_join_until(endAt))
 
         // Edges still present at this point are treated specially (controlled statement not visible --
         // presumably an error about unsatisfiable dependencies; TODO confirm).
  157        if (dependencies_.nEdges() != 0)
 
  159        dependencies_.clear();
 
 
  // Takes the same arguments as start(): the dependency graph, the requested worker count, and the functor.
  // NOTE(review): the body is not visible in this listing -- presumably starts the work and then waits for it
  // to complete; confirm against the full source.
  168    void run(
const DependencyGraph &dependencies, 
size_t nWorkers, Functor functor) {
 
 
  // Fragment of an accessor whose header is not visible in this listing. Evaluated while holding mutex_.
  177        boost::lock_guard<boost::mutex> lock(mutex_);
 
         // True when work was never started, or when as many workers have finished as were created.
  178        return !hasStarted_ || nWorkersFinished_ == nWorkers_;
 
 
  // Fragment of an accessor whose header is not visible in this listing: reads the started-item counter while
  // holding mutex_.
  186        boost::lock_guard<boost::mutex> lock(mutex_);
 
  187        return nItemsStarted_;
 
 
  // Fragment of an accessor whose header is not visible in this listing: reads the finished-item counter while
  // holding mutex_.
  194        boost::lock_guard<boost::mutex> lock(mutex_);
 
  195        return nItemsFinished_;
 
 
  // Fragment of an accessor whose header is not visible in this listing: returns the set of IDs of work items
  // currently being processed, read while holding mutex_.
  // NOTE(review): the return type is not visible -- if it is returned by value the caller gets a snapshot copy;
  // confirm against the full source.
  203        boost::lock_guard<boost::mutex> lock(mutex_);
 
  204        return runningTasks_;
 
 
  // Fragment of an accessor whose header is not visible in this listing. Evaluated while holding mutex_.
  212        boost::lock_guard<boost::mutex> lock(mutex_);
 
         // first  = workers created minus workers finished
         // second = value of nWorkersRunning_
  213        return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
 
 
  // Seed the (required to be empty) work queue with the ID of every vertex in dependencies_ that has no
  // outgoing edges. worker() asserts the same "no outgoing edges" property on each item it pops.
  // NOTE(review): the "NS" suffix presumably means "not synchronized", i.e. the caller must already hold
  // mutex_ -- no lock is taken in the visible lines; TODO confirm.
  218    void fillWorkQueueNS() {
 
  219        ASSERT_require(workQueue_.
isEmpty());
 
  220        BOOST_FOREACH (
const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
 
  221            if (vertex.nOutEdges() == 0)
 
  222                workQueue_.
push(vertex.id());
 
  // Allocate the array of nWorkers_ thread objects and launch each one running startWorker(this, functor).
  // start() calls this while holding mutex_.
  // NOTE(review): the array is a raw new[] allocation; the matching delete[] is not visible in this listing.
  227    void startWorkersNS(Functor functor) {
 
  228        workers_ = 
new boost::thread[nWorkers_];
 
  229        for (
size_t i=0; i<nWorkers_; ++i)
 
             // Each default-constructed slot is replaced by a newly launched thread.
  230            workers_[i] = boost::thread(startWorker, 
this, functor);
 
  // Static thread entry point: forwards to the member function worker() on the object passed as `self`.
  234    static void startWorker(
ThreadWorkers *self, Functor functor) {
 
  235        self->worker(functor);
 
  // Body executed by each worker thread: take a work item ID from workQueue_, invoke `functor` on it, then
  // release the items that depended on it and wake other workers as appropriate.
  // NOTE(review): several original lines are missing from this listing (the enclosing loop, the statements
  // around the functor call, the counter updates, and parts of the notification branches). Comments below
  // describe only the visible lines; anything about the missing ones is marked as an assumption.
  238    void worker(Functor functor) {
 
  241            boost::unique_lock<boost::mutex> lock(mutex_);
 
             // Sleep while the queue is empty but some started item has not finished yet.
  242            while (nItemsFinished_ < nItemsStarted_ && workQueue_.
isEmpty())
 
  243                workInserted_.wait(lock);
 
             // Queue empty and every started item finished (branch body not visible -- presumably the worker
             // exits here; TODO confirm).
  244            if (nItemsFinished_ == nItemsStarted_ && workQueue_.
isEmpty()) {
 
  248            ASSERT_forbid(workQueue_.
isEmpty());
 
  249            size_t workItemId = workQueue_.
pop();
 
  250            typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
 
             // Only items with no outgoing edges are expected in the queue.
  251            ASSERT_require(workVertex->nOutEdges() == 0);
 
             // Copy of the vertex's value, passed to the functor below.
  252            typename DependencyGraph::VertexValue workItem = workVertex->value();
 
             // The item's ID is in runningTasks_ for exactly the duration of the functor call.
             // NOTE(review): whether mutex_ is released around the call is not visible in this listing
             // (presumably it is unlocked before and relocked after -- TODO confirm).
  257            runningTasks_.insert(workItemId);
 
  259            functor(workItemId, workItem);
 
  263            runningTasks_.erase(workItemId);
 
             // Collect the source vertices of this item's incoming edges, keyed by vertex ID so that a source
             // with several edges to this item is recorded once.
  266            Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
 
  267            BOOST_FOREACH (
const typename DependencyGraph::Edge &edge, workVertex->inEdges())
 
  268                candidateWorkItems.insert(edge.source()->
id(), edge.source());
 
             // Remove the finished item's incoming edges; any candidate left with no outgoing edges is queued.
  269            dependencies_.clearInEdges(workVertex);
 
             // NOTE(review): the increment of this counter is not visible -- presumably once per push below.
  270            size_t newWorkInserted = 0;
 
  271            BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
 
  272                if (candidate->nOutEdges() == 0) {
 
  273                    workQueue_.
push(candidate->id());
 
             // Wake-up policy, chosen by how many items were just queued.
             // No new items: notify_all, under a further condition that is not visible in this listing
             // (presumably "all work finished", so sleeping workers can wake and exit -- TODO confirm).
  279            if (0 == newWorkInserted) {
 
  281                    workInserted_.notify_all();
 
             // One new item: branch body not visible (presumably no notification is needed -- TODO confirm).
  282            } 
else if (1 == newWorkInserted) {
 
             // Two new items: wake one other worker.
  284            } 
else if (2 == newWorkInserted) {
 
  285                workInserted_.notify_one();
 
             // Remaining case (the `else` line itself is not visible): wake every waiting worker.
  287                workInserted_.notify_all();
 
 
  // Function taking a dependency graph, a requested worker count, a functor, a monitor, and a period in
  // milliseconds. The visible part starts the work by forwarding the first three arguments to
  // `workers.start(...)`.
  // NOTE(review): the return type, the declaration of `workers`, and every use of `monitor` and `period` are
  // not visible in this listing -- presumably the monitor is invoked every `period` until the work
  // completes; confirm against the full source.
  323workInParallel(
const DependencyGraph &dependencies, 
size_t nWorkers, Functor functor,
 
  324               Monitor monitor, boost::chrono::milliseconds period) {
 
  326    workers.
start(dependencies, nWorkers, functor);