while (condition)
{
    //... stuff
    threadpool.join_all();
    //...
}
This doesn't make sense, because you can only join threads once. Once joined, they are gone. You don't want to be starting new threads all the time (use a thread pool + task queue¹).
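To illustrate the "join only once" point, here is a minimal sketch. It uses a plain std::thread rather than the boost::thread_group from the snippet above, and the helper name joinable_after_join is made up for illustration:

```cpp
#include <thread>

// Hypothetical helper: reports whether a thread handle can still be
// joined after it has already been joined once.
bool joinable_after_join() {
    std::thread t([] { /* some work */ });
    t.join();            // blocks until the thread function has returned
    return t.joinable(); // false: the handle no longer refers to a thread
}
```

With std::thread, calling join() again on such a handle throws std::system_error, so a loop that joins on every iteration would need to create fresh threads each time.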
Since you don't want to actually stop the threads, you probably don't want to destruct the work. If you insist, a shared_ptr<work> or optional<work> works nicely (just my_work.reset() it).
¹ Update suggestion:
UPDATE
A simple extension to "SOLUTION #2" would make it possible to wait for all tasks to have been completed, without joining the workers/destroying the pool:
void drain() {
    unique_lock<mutex> lk(mx);
    namespace phx = boost::phoenix;
    cv.wait(lk, phx::empty(phx::ref(_queue)));
}
Note that for reliable operation, one needs to signal the condition variable on de-queue as well:
cv.notify_all(); // in order to signal drain
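For readers who prefer not to use Boost.Phoenix, the same idea can be sketched with the standard library and lambda predicates. This is only an illustration under assumptions: job_queue, push and pop are hypothetical names, not part of the thread pool discussed here.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical minimal queue, standard library only.
class job_queue {
    std::mutex mx;
    std::condition_variable cv;
    std::deque<std::function<void()>> jobs;

public:
    void push(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lk(mx);
            jobs.push_back(std::move(job));
        }
        // notify_all: consumers and drainers wait on the same condition
        // variable, so notify_one could wake a drainer instead of a consumer
        cv.notify_all();
    }

    // Blocks until a job is available, then removes and returns it.
    std::function<void()> pop() {
        std::unique_lock<std::mutex> lk(mx);
        cv.wait(lk, [this] { return !jobs.empty(); });
        auto job = std::move(jobs.front());
        jobs.pop_front();
        cv.notify_all(); // signal drain(): the queue may now be empty
        return job;
    }

    // Blocks until the queue is empty (not until the jobs have finished).
    void drain() {
        std::unique_lock<std::mutex> lk(mx);
        cv.wait(lk, [this] { return jobs.empty(); });
    }
};
```

Without the notification in pop(), a thread blocked in drain() would have nothing to wake it when the last job is taken off the queue.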
CAVEATS
It's an interface inviting race conditions: the queue could accept jobs from many threads, so once drain() returns, another thread could already have posted a new task.
This signals when the queue is empty, not when the tasks have completed. The queue cannot know about that; if you need it, use a barrier or signal a condition from within the task (the_work in this example). The mechanism for queuing/scheduling is not relevant there.
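A minimal sketch of "signal a condition from within the task" could look like the following. The names completion_latch, task_done and run_and_wait are hypothetical, and plain std::thread objects stand in for the pool, since the scheduling mechanism does not matter for this part:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counts outstanding tasks; each task reports
// its own completion.
class completion_latch {
    std::mutex mx;
    std::condition_variable cv;
    int pending;

public:
    explicit completion_latch(int count) : pending(count) {}

    // Called by a task as its last step.
    void task_done() {
        std::lock_guard<std::mutex> lk(mx);
        if (--pending == 0)
            cv.notify_all();
    }

    // Blocks until every task has called task_done().
    void wait() {
        std::unique_lock<std::mutex> lk(mx);
        cv.wait(lk, [this] { return pending == 0; });
    }
};

// Starts `count` tasks and returns how many had finished their work
// at the moment wait() returned.
int run_and_wait(int count) {
    completion_latch latch(count);
    std::atomic<int> finished(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < count; ++i)
        threads.emplace_back([&] {
            ++finished;        // the actual work would go here
            latch.task_done(); // signalled from within the task
        });
    latch.wait();
    int seen = finished.load();
    for (auto& t : threads)
        t.join();
    return seen;
}
```

Unlike drain(), wait() here only returns once every task has reported completion, so it answers "are the tasks done?" rather than "is the queue empty?".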
DEMO
#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <iostream>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
  private:
    mutex mx;
    condition_variable cv;
    typedef function<void()> job_t;
    std::deque<job_t> _queue;
    thread_group pool;
    boost::atomic_bool shutdown;

    // each worker runs jobs until dequeue() reports shutdown
    static void worker_thread(thread_pool& q)
    {
        while (auto job = q.dequeue())
            (*job)();
    }

  public:
    thread_pool() : shutdown(false) {
        for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
            pool.create_thread(bind(worker_thread, ref(*this)));
    }

    void enqueue(job_t job)
    {
        lock_guard<mutex> lk(mx);
        _queue.push_back(std::move(job));
        // notify_all: notify_one could wake a thread blocked in drain()
        // instead of a worker, leaving the job unserviced
        cv.notify_all();
    }

    void drain() {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;
        cv.wait(lk, phx::empty(phx::ref(_queue)));
    }

    optional<job_t> dequeue()
    {
        unique_lock<mutex> lk(mx);
        namespace phx = boost::phoenix;
        cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
        if (_queue.empty())
            return none; // shutting down and nothing left to run
        auto job = std::move(_queue.front());
        _queue.pop_front();
        cv.notify_all(); // in order to signal drain
        return std::move(job);
    }

    ~thread_pool()
    {
        shutdown = true;
        {
            lock_guard<mutex> lk(mx);
            cv.notify_all();
        }
        pool.join_all();
    }
};
void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";
    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::milliseconds(2));
    std::cout << "worker " << id << " done\n";
}
int main()
{
    thread_pool pool; // uses 1 thread per core
    for (int round = 0; round < 20; ++round) {
        for (int i = 0; i < 10; ++i)
            pool.enqueue(bind(the_work, i));
        pool.drain(); // make the queue empty, leave the threads
        std::cout << "Queue empty\n";
    }
    // destructing pool joins the worker threads
}