Several fixes and enhancements to abigail::workers

While making abipkgdiff to use the abigail::workers API to do away
with using pthreads directory, it appeared that the abigail::workers
API needs fixes and enhancements.

Fixes
=====

* Don't try to schedule a task if the pointer to the task is nil

* Fix a data race when bringing workers (of a queue) down

* Always try to wake up all waiting threads when bringing down queue
  workers.

* Fix a data race when accessing the queue condition variable

* Fix a data race when notifying listeners about the end of the job
  performed by the task.

Enhancements
============

* Pass the "task done" notifier by reference, to the worker queue.

Without this, the worker queue needs to copy the "task done" notifier
by value.  This implies that user code needs to provide task done
notifier instances that come with potentially complicated copy
constructors.  By passing it by reference and by just re-using the
notifier from the user code, we do away with the need for copying
altogether.  This also fixes some latent copying bugs.

* Add a workers::queue::schedule_tasks() method

This allows user code to schedule a vector of tasks at once.

* make workers::queue::get_completed_tasks() return a non-const vector

This enables user code to sort the completed tasks as they wish.

	* include/abg-workers.h (queue::tasks_type): New typedef.
	(queue::queue): Pass task_done_notify by reference.
	(queue::schedule_tasks): Declare new member function.
	(queue::get_completed_tasks): Return non-const vector.
	* src/abg-workers.cc (queue::priv::default_notify): New data
	member.
	(queue::priv::notify): Make this data member be a reference.
	(queue::priv::priv): Initialize the notify data member to either
	the new default_notify (if no notifier is provided by the
	constructor) or to the notifier provided by the constructor.
	(queue::priv::schedule_task): Do not schedule a nil task.  Update
	comment.
	(queue::priv::schedule_tasks): Add a new member function.
	(queue::priv::do_bring_workers_down): Update comment.  Protect
	access to "bring_workers_down" with tasks_todo_mutex to prevent a
	data race.  Call pthread_cond_broadcast on the queue_cond
	unconditionaly to prevent some worker threads to keep waiting for
	ever. Also, protect the access to the queue_cond by the
	queue_cond_mutex to precent a data race.
	(queue::queue): Pass the notifier by reference. Update comment.
	(queue::schedule_task): Update comment.
	(queue::schedule_tasks): Define new member function.
	(queue::wait_for_workers_to_complete): Update comment.
	(queue::get_completed_tasks): Return a non-const vector. Update
	comment.
	(worker::wait_to_execute_a_task): Update several comments. Make
	the execution of the notification code to be synchronized (on the
	tasks_done_mutex).

Signed-off-by: Dodji Seketeli <dodji@redhat.com>
This commit is contained in:
Dodji Seketeli 2017-02-24 10:58:16 +01:00
parent 5989cbe26c
commit 1d85cc4546
2 changed files with 101 additions and 30 deletions

View File

@ -87,6 +87,9 @@ public:
struct priv;
typedef shared_ptr<priv> priv_sptr;
/// A convenience typedef for a vector of @ref task_sptr
typedef std::vector<task_sptr> tasks_type;
private:
priv_sptr p_;
@ -95,11 +98,12 @@ public:
queue();
queue(unsigned number_of_workers);
queue(unsigned number_of_workers,
const task_done_notify& notifier);
task_done_notify& notifier);
size_t get_size() const;
bool schedule_task(const task_sptr&);
bool schedule_tasks(const tasks_type&);
void wait_for_workers_to_complete();
const std::vector<task_sptr>& get_completed_tasks() const;
tasks_type& get_completed_tasks() const;
~queue();
}; // end class queue

View File

@ -155,8 +155,14 @@ struct queue::priv
std::vector<task_sptr> tasks_done;
// This functor is invoked to notify the user of this queue that a
// task has been completed and has been added to the done tasks
// vector.
task_done_notify notify;
// vector. We call it a notifier. This notifier is the default
// notifier of the work queue; the one that is used when the user
// has specified no notifier. It basically does nothing.
task_done_notify default_notify;
// This is a reference to the the notifier that is actually used in
// the queue. It's either the one specified by the user or the
// default one.
task_done_notify& notify;
// A vector of the worker threads.
std::vector<worker> workers;
@ -167,7 +173,8 @@ struct queue::priv
queue_cond_mutex(),
queue_cond(),
tasks_todo_mutex(),
tasks_done_mutex()
tasks_done_mutex(),
notify(default_notify)
{create_workers();}
/// A constructor of @ref queue::priv.
@ -180,7 +187,8 @@ struct queue::priv
queue_cond_mutex(),
queue_cond(),
tasks_todo_mutex(),
tasks_done_mutex()
tasks_done_mutex(),
notify(default_notify)
{create_workers();}
/// A constructor of @ref queue::priv.
@ -191,7 +199,7 @@ struct queue::priv
/// @param task_done_notify a functor object that is invoked by the
/// worker thread which has performed the task, right after it's
/// added that task to the vector of the done tasks.
priv(size_t nb_workers, const task_done_notify& n)
priv(size_t nb_workers, task_done_notify& n)
: bring_workers_down(),
num_workers(nb_workers),
queue_cond_mutex(),
@ -223,11 +231,15 @@ struct queue::priv
/// performing the task. When it's done with the task, it goes back
/// to be suspended, waiting for a new task to be scheduled.
///
/// @param t the task to schedule.
/// @param t the task to schedule. Note that a nil task won't be
/// scheduled. If the queue is empty, the task @p t won't be
/// scheduled either.
///
/// @return true iff the task @p t was successfully scheduled.
bool
schedule_task(const task_sptr& t)
{
if (workers.empty())
if (workers.empty() || !t)
return false;
pthread_mutex_lock(&tasks_todo_mutex);
@ -240,18 +252,35 @@ struct queue::priv
return true;
}
/// Signal all the threads (of the pool) which are suspended, so
/// that they wakes up. If there is no task to perform, they just
/// end their execution. If there are tasks to perform, they finish
/// them and then end their execution.
/// Submit a vector of task to the queue of tasks to be performed.
///
/// This wakes up threads of the pool which immediatly start
/// performing the tasks. When they are done with the task, they go
/// back to be suspended, waiting for new tasks to be scheduled.
///
/// @param tasks the tasks to schedule.
bool
schedule_tasks(const tasks_type& tasks)
{
bool is_ok= true;
for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t)
is_ok &= schedule_task(*t);
return is_ok;
}
/// Signal all the threads (of the pool) which are suspended and
/// waiting to perform a task, so that they wake up and end up their
/// execution. If there is no task to perform, they just end their
/// execution. If there are tasks to perform, they finish them and
/// then end their execution.
///
/// This function then joins all the tasks of the pool, waiting for
/// them to finish, and then it returns. In other words, this
/// function suspends the thread of the caller, waiting for the
/// worker threads to finish their tasks, and end their execution.
///
/// If the user wants to work with the thread pool again, she'll
/// need to create them again, using the member function
/// If the user code wants to work with the thread pool again,
/// she'll need to create them again, using the member function
/// create_workers().
void
do_bring_workers_down()
@ -259,13 +288,17 @@ struct queue::priv
if (workers.empty())
return;
bring_workers_down = true;
pthread_mutex_lock(&tasks_todo_mutex);
if (tasks_todo.empty())
assert(pthread_cond_broadcast(&queue_cond) == 0);
bring_workers_down = true;
pthread_mutex_unlock(&tasks_todo_mutex);
// Acquire the mutex that protects the queue condition variable
// (queue_cond) and wake up all the workers that are sleeping on
// the condition.
pthread_mutex_lock(&queue_cond_mutex);
assert(pthread_cond_broadcast(&queue_cond) == 0);
pthread_mutex_unlock(&queue_cond_mutex);
for (std::vector<worker>::const_iterator i = workers.begin();
i != workers.end();
++i)
@ -301,12 +334,15 @@ queue::queue(unsigned number_of_workers)
/// @param number_of_workers the number of worker threads to have in
/// the pool.
///
/// @param the notifier to invoker when a task is performed. Users
/// should create a type that inherit this @ref task_done_notify class
/// and overload its virtual task_done_notify::operator() operator
/// function.
/// @param the notifier to invoke when a task is done doing its job.
/// Users should create a type that inherit this @ref task_done_notify
/// class and overload its virtual task_done_notify::operator()
/// operator function. Note that the code of that
/// task_done_notify::operator() is assured to run in *sequence*, with
/// respect to the code of other task_done_notify::operator() from
/// other tasks.
queue::queue(unsigned number_of_workers,
const task_done_notify& notifier)
task_done_notify& notifier)
: p_(new priv(number_of_workers, notifier))
{}
@ -324,27 +360,43 @@ queue::get_size() const
/// performing the task. When it's done with the task, it goes back
/// to be suspended, waiting for a new task to be scheduled.
///
/// @param t the task to schedule.
/// @param t the task to schedule. Note that if the queue is empty or
/// if the task is nil, the task is not scheduled.
///
/// @return true iff the task was successfully scheduled.
bool
queue::schedule_task(const task_sptr& t)
{return p_->schedule_task(t);}
/// Submit a vector of tasks to the queue of tasks to be performed.
///
/// This wakes up one or more threads from the pool which immediatly
/// start performing the tasks. When the threads are done with the
/// tasks, they goes back to be suspended, waiting for a new task to
/// be scheduled.
///
/// @param tasks the tasks to schedule.
bool
queue::schedule_tasks(const tasks_type& tasks)
{return p_->schedule_tasks(tasks);}
/// Suspends the current thread until all worker threads finish
/// performing the tasks they are executing.
///
/// If the worker threads were suspended waiting for a new task to
/// perform, they are woken up and their execution ends.
///
/// The execution of the current thread is resume when all the threads
/// of the pool have finished their execution and are terminated.
/// The execution of the current thread is resumed when all the
/// threads of the pool have finished their execution and are
/// terminated.
void
queue::wait_for_workers_to_complete()
{p_->do_bring_workers_down();}
/// Getter of the vector of tasks that got performed.
///
/// @retun the vector of tasks that got performed.
const std::vector<task_sptr>&
/// @return the vector of tasks that got performed.
std::vector<task_sptr>&
queue::get_completed_tasks() const
{return p_->tasks_done;}
@ -382,6 +434,8 @@ worker::wait_to_execute_a_task(queue::priv* p)
do
{
// If there is no more tasks to perform and the queue is not to
// be brought down then wait (sleep) for new tasks to come up.
pthread_mutex_lock(&p->queue_cond_mutex);
while (!more_tasks && !p->bring_workers_down)
{
@ -393,6 +447,8 @@ worker::wait_to_execute_a_task(queue::priv* p)
}
pthread_mutex_unlock(&p->queue_cond_mutex);
// We were woken up. So maybe there are tasks to perform? If
// so, get a task from the queue ...
task_sptr t;
pthread_mutex_lock(&p->tasks_todo_mutex);
if (!p->tasks_todo.empty())
@ -402,13 +458,24 @@ worker::wait_to_execute_a_task(queue::priv* p)
}
pthread_mutex_unlock(&p->tasks_todo_mutex);
// If we've got a task to perform then perform it and when it's
// done then add to the set of tasks that are done.
if (t)
{
t->perform();
// Add the task to the vector of tasks that are done and
// notify listeners about the fact that the task is done.
//
// Note that this (including the notification) is not
// happening in parallel. So the code performed by the
// notifier during the notification is running sequentially,
// not in parallel with any other task that was just done
// and that is notifying its listeners.
pthread_mutex_lock(&p->tasks_done_mutex);
p->tasks_done.push_back(t);
pthread_mutex_unlock(&p->tasks_done_mutex);
p->notify(t);
pthread_mutex_unlock(&p->tasks_done_mutex);
}
pthread_mutex_lock(&p->tasks_todo_mutex);