dispatch: use a real lock for mp_dispatch_lock()

This is much simpler, leaves fairness isues etc. to the operating
system, and will work better with threading-related debugging tools.

The "trick" to this is that the lock can be acquired and held only while
the queue is in suspend mode. This way we don't need to make sure the
lock is held outside of mp_dispatch_queue_process, which would be quite
messy to get right, because it would have to be in locked state by
default.
This commit is contained in:
wm4 2014-04-23 20:43:31 +02:00
parent ed7e7e2eb4
commit 29b7260398
1 changed files with 20 additions and 37 deletions

View File

@ -28,11 +28,18 @@ struct mp_dispatch_queue {
pthread_mutex_t lock;
pthread_cond_t cond;
int suspend_requested;
int lock_requested;
bool suspended;
bool locked;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
// This lock grant access to the target thread's state during suspend mode.
// During suspend mode, the target thread is blocked in the function
// mp_dispatch_queue_process(), however this function may be processing
// dispatch queue items. This lock serializes the dispatch queue processing
// and external mp_dispatch_lock() calls.
// Invariant: can be held only while suspended==true, and suspend_requested
// must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
// suspend mode must not be left while the lock is held.
pthread_mutex_t exclusive_lock;
};
struct mp_dispatch_item {
@ -49,9 +56,9 @@ static void queue_dtor(void *p)
assert(!queue->head);
assert(!queue->suspend_requested);
assert(!queue->suspended);
assert(!queue->locked);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads runs callbacks in a target thread.
@ -63,6 +70,7 @@ struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
@ -166,7 +174,7 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
// Wake up thread which called mp_dispatch_suspend().
pthread_cond_broadcast(&queue->cond);
while (queue->head || queue->suspend_requested) {
if (queue->head && !queue->locked) {
if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
@ -174,11 +182,13 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
queue->locked = true;
// At the same time, exclusive_lock must be held to protect the
// thread's user state.
pthread_mutex_unlock(&queue->lock);
pthread_mutex_lock(&queue->exclusive_lock);
item->fn(item->fn_data);
pthread_mutex_unlock(&queue->exclusive_lock);
pthread_mutex_lock(&queue->lock);
queue->locked = false;
if (item->asynchronous) {
talloc_free(item);
} else {
@ -187,8 +197,6 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
pthread_cond_broadcast(&queue->cond);
}
} else {
if (!queue->locked && queue->lock_requested)
pthread_cond_broadcast(&queue->cond);
pthread_cond_wait(&queue->cond, &queue->lock);
}
}
@ -236,38 +244,13 @@ void mp_dispatch_resume(struct mp_dispatch_queue *queue)
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
// TODO: acquiring a lock should probably be serialized with other
// dispatch items to guarantee minimum fairness.
pthread_mutex_lock(&queue->lock);
queue->suspend_requested++;
queue->lock_requested++;
while (1) {
if (queue->suspended && !queue->locked) {
queue->locked = true;
break;
}
if (!queue->suspended) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
if (queue->suspended)
continue;
}
pthread_cond_wait(&queue->cond, &queue->lock);
}
queue->lock_requested--;
pthread_mutex_unlock(&queue->lock);
mp_dispatch_suspend(queue);
pthread_mutex_lock(&queue->exclusive_lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
assert(queue->locked);
assert(queue->suspend_requested > 0);
queue->locked = false;
queue->suspend_requested--;
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
pthread_mutex_unlock(&queue->exclusive_lock);
mp_dispatch_resume(queue);
}