ROSE 0.11.145.147
ParallelSort.h
1// Parallel sorting using multiple threads. See ParallelSort::quicksort() near the end of this file.
2#ifndef ROSE_ParallelSort_H
3#define ROSE_ParallelSort_H
4#include <RoseFirst.h>
5
6#include <boost/thread.hpp>
7#include <list>
8#include <vector>
9
10namespace Rose {
11
22namespace ParallelSort {
23
// Implementation details. Not part of the public interface, but reusable by any parallel sort algorithm.
25namespace Private {
26
// One unit of work: a half-open range [begin, end) of values that still has to be sorted.
template<class RandomAccessIterator>
struct Work {
    RandomAccessIterator begin;                         // first value of the range (inclusive)
    RandomAccessIterator end;                           // one past the last value of the range (exclusive)

    // Stores the two iterators that delimit the range.
    Work(RandomAccessIterator begin, RandomAccessIterator end)
        : begin(begin), end(end) {}
};
33
34// Information about a sorting job. A job is generated when the user requests a sort, and each job may have multiple threads.
35template<class RandomAccessIterator, class Compare>
36struct Job {
37 Compare compare; // functor to compare two values, like for std::sort()
38 boost::condition_variable worklistInsertion; // signaled when something is added to the worklist
39 static const int multiThreshold = 10000; // size at which to start multi-threading
40 boost::mutex mutex; // protects all of the following data members
41 std::list<Work<RandomAccessIterator> > worklist; // non-overlapping stuff that needs to be sorted
42 size_t npending; // number of workers that are processing jobs that aren't in worklist
43 Job(Compare compare): compare(compare), npending(0) {}
44};
45
// Splits [begin, end) around the value that 'pivot' refers to, in the spirit of std::partition(). On return the pivot
// value sits at the returned iterator; every value before it compares less than the pivot value and every value after
// it does not (i.e., is greater than or equal to it). Neither side is left in any particular order.
//
// Requires a non-empty range and begin <= pivot < end (checked with assert).
template<class RandomAccessIterator, class Compare>
RandomAccessIterator partition(RandomAccessIterator begin, RandomAccessIterator end,
                               RandomAccessIterator pivot, Compare compare) {
    assert(begin < end);
    assert(pivot >= begin && pivot < end);

    // Park the pivot value in the last slot so the scan below never moves it.
    RandomAccessIterator last = end - 1;
    std::swap(*pivot, *last);

    // 'boundary' is the first position not yet known to hold a value less than the pivot value.
    RandomAccessIterator boundary = begin;
    for (RandomAccessIterator scan = begin; scan < last; ++scan) {
        if (compare(*scan, *last)) {
            std::swap(*scan, *boundary);
            ++boundary;
        }
    }

    // Drop the pivot value between the two parts.
    std::swap(*last, *boundary);
    return boundary;
}
63
64// Add work to the work list
65template<class RandomAccessIterator, class Compare>
66void addWork(Job<RandomAccessIterator, Compare> &job, const Work<RandomAccessIterator> &work) {
67 boost::lock_guard<boost::mutex> lock(job.mutex);
68 job.worklist.push_back(work);
69 job.worklistInsertion.notify_one();
70}
71
72// Sorts one unit of work, adding additional items to the worklist if necessary.
73template<class RandomAccessIterator, class Compare>
74void quicksort(Job<RandomAccessIterator, Compare> &job, Work<RandomAccessIterator> work) {
75 while (work.end - work.begin > 1) {
76 if (work.end - work.begin < job.multiThreshold) {
77 std::sort(work.begin, work.end, job.compare);
78 return;
79 } else {
80 RandomAccessIterator pivot = work.begin + (work.end - work.begin) / 2; // assuming fairly even distribution
81 pivot = partition(work.begin, work.end, pivot, job.compare);
82 addWork(job, Work<RandomAccessIterator>(pivot+1, work.end));
83 work.end = pivot;
84 }
85 }
86}
87
88// A worker thread.
89template<class RandomAccessIterator, class Compare>
90struct Worker {
92 size_t id; // only for debugging
93 Worker(Job<RandomAccessIterator, Compare> &job, size_t id): job(job), id(id) {}
94 void operator()() {
95 boost::unique_lock<boost::mutex> lock(job.mutex);
96 while (true) {
97 // Get the next unit of work. If no workers are working and the worklist is empty then we're all done.
98 while (job.worklist.empty() && job.npending>0)
99 job.worklistInsertion.wait(lock);
100 if (job.worklist.empty())
101 return;
102 Work<RandomAccessIterator> work = job.worklist.front();
103 job.worklist.pop_front();
104 ++job.npending;
105
106 // Sort that unit of work
107 lock.unlock();
108 quicksort(job, work);
109 lock.lock();
110
111 // Indicate that the work is completed
112 assert(job.npending>0);
113 if (0==--job.npending && job.worklist.empty())
114 job.worklistInsertion.notify_all();
115 }
116 }
117};
118
119} // namespace
120
121
140template<class RandomAccessIterator, class Compare>
141void quicksort(RandomAccessIterator begin, RandomAccessIterator end, size_t nThreads, Compare compare) {
142 if (begin < end) {
143 assert(begin < end);
144 using namespace Private;
145
146 Job<RandomAccessIterator, Compare> job(compare);
147 addWork(job, Work<RandomAccessIterator>(begin, end));
148
149 // Start worker threads (we can't assume containers with move semantics, so use an array)
150 size_t nworkers = std::max(nThreads, (size_t)1) - 1;
151 boost::thread *workers = new boost::thread[nworkers];
152 for (size_t i=0; i<nworkers; ++i)
153 workers[i] = boost::thread(Worker<RandomAccessIterator, Compare>(job, i+1));
154
155 // Participate in the work ourselves (we might be the only thread!)
156 Worker<RandomAccessIterator, Compare>(job, 0)();
157
158 // Wait for all the threads to finish
159 for (size_t i=0; i<nworkers; ++i)
160 workers[i].join();
161 }
162}
163
164} // namespace
165} // namespace
166
167#endif
void quicksort(RandomAccessIterator begin, RandomAccessIterator end, size_t nThreads, Compare compare)
Sort values in parallel.
The ROSE library.