dispatch: redo locking, and allow reentrant processing

A deadlock bug was reported with the following test program:

	mpv_handle *mpv = mpv_create();
	mpv_set_option_string(mpv, "ytdl", "yes");
	mpv_initialize(mpv);
	mpv_terminate_destroy(mpv);

The cause of this is loading the ytdl.lua script, which triggers a
certain code path that calls mp_dispatch_queue_process() recursively. It
does so to wait until the script is loaded, and we want to keep that.

Reentrancy was not supported by mp_dispatch, which leads to the
deadlock. Rewrite the locking so that it does. We mainly get rid of the
"exclusive_lock" mutex. Instead we use the existing lock/condition
variable to wait until we can grab a logical lock.

Note that the lock_frame business can be replaced with a simple counter.
Instead of checking the lock_frame address, it'd simply increment and
store the counter when entering mp_dispatch_queue_process(), and then
compare the counter to decide whether or not to wait. But I think the
additional error checking done by the lock_frame list is valuable.

Fixes #3489.
This commit is contained in:
wm4 2016-09-04 17:00:21 +02:00
parent 2619d8eff4
commit 3878a59e2c
1 changed files with 104 additions and 73 deletions

View File

@ -28,19 +28,22 @@ struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
int suspend_requested;
bool suspended;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
// This lock grants access to the target thread's state during suspend mode.
// During suspend mode, the target thread is blocked in the function
// mp_dispatch_queue_process(), however this function may be processing
// dispatch queue items. This lock serializes the dispatch queue processing
// and external mp_dispatch_lock() calls.
// Invariant: can be held only while suspended==true, and suspend_requested
// must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
// suspend mode must not be left while the lock is held.
pthread_mutex_t exclusive_lock;
// The target thread is blocked by mp_dispatch_queue_process().
bool idling;
// A mp_dispatch_lock() call is requesting an exclusive lock.
bool lock_request;
// Used to block out threads calling mp_dispatch_queue_process() while
// they're externally locked via mp_dispatch_lock().
// We could use a simple counter, but with this we can perform some
// minimal debug checks.
struct lock_frame *frame;
};
// One entry in the queue's stack of lock frames, linked through queue->frame.
// mp_dispatch_queue_process() pushes a frame that lives on its own stack,
// while mp_dispatch_lock() pushes a heap-allocated frame which
// mp_dispatch_unlock() later pops and frees.
struct lock_frame {
// The frame that was on top of the stack when this one was pushed
// (NULL if the stack was empty).
struct lock_frame *prev;
// Thread that created this frame; mp_dispatch_unlock() asserts that the
// unlocking thread matches it.
pthread_t thread;
};
struct mp_dispatch_item {
@ -55,11 +58,11 @@ static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
assert(!queue->suspend_requested);
assert(!queue->suspended);
assert(!queue->idling);
assert(!queue->lock_request);
assert(!queue->frame);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
@ -76,7 +79,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
@ -170,7 +172,8 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
}
// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking the target thread.
// suspending or locking of this thread from another thread via
// mp_dispatch_lock().
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
@ -182,12 +185,33 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
struct lock_frame frame = {
.thread = pthread_self(),
};
pthread_mutex_lock(&queue->lock);
queue->suspended = true;
// Wake up thread which called mp_dispatch_suspend().
pthread_cond_broadcast(&queue->cond);
while (queue->head || queue->suspend_requested || wait > 0) {
if (queue->head) {
frame.prev = queue->frame;
queue->frame = &frame;
// Logically, the queue is idling if the target thread is blocked in
// mp_dispatch_queue_process() doing nothing, so it's not possible to call
// it again. (Reentrant calls via callbacks temporarily reset the field.)
assert(!queue->idling);
queue->idling = true;
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_request)
pthread_cond_broadcast(&queue->cond);
while (1) {
if (queue->lock_request || queue->frame != &frame) {
// Block due to something having called mp_dispatch_lock(). This
// is either a lock "acquire" (lock_request=true), or a lock in
// progress, with the possibility the thread which called
// mp_dispatch_lock() is now calling mp_dispatch_queue_process()
// (the latter means we must ignore any queue state changes,
// until it has been unlocked again).
pthread_cond_wait(&queue->cond, &queue->lock);
if (queue->frame == &frame)
assert(queue->idling);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
@ -195,66 +219,34 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
// At the same time, exclusive_lock must be held to protect the
// thread's user state.
// At the same time, we must prevent other threads from returning
// from mp_dispatch_lock(), which is done by idling=false.
queue->idling = false;
pthread_mutex_unlock(&queue->lock);
pthread_mutex_lock(&queue->exclusive_lock);
item->fn(item->fn_data);
pthread_mutex_unlock(&queue->exclusive_lock);
pthread_mutex_lock(&queue->lock);
assert(!queue->idling);
queue->idling = true;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
// Wakeup mp_dispatch_run()
pthread_cond_broadcast(&queue->cond);
}
} else if (wait > 0) {
struct timespec ts = mp_time_us_to_timespec(wait);
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
} else {
if (wait > 0) {
struct timespec ts = mp_time_us_to_timespec(wait);
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
} else {
pthread_cond_wait(&queue->cond, &queue->lock);
}
break;
}
wait = 0;
}
queue->suspended = false;
pthread_mutex_unlock(&queue->lock);
}
// Set the target thread into suspend mode: in this mode, the thread will enter
// mp_dispatch_queue_process(), process any outstanding dispatch items, and
// wait for new items when done (instead of exiting the process function).
// Multiple threads can enter suspend mode at the same time. Suspend mode is
// not a synchronization mechanism; it merely makes sure the target thread does
// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
// can be used for exclusive access.
// NOTE(review): this listing is a diff rendered without +/- markers; this
// helper looks like part of the pre-change code (it uses suspend_requested and
// suspended rather than idling/lock_request) - confirm against the real file.
static void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
// Register this request; mp_dispatch_resume() decrements the counter again.
queue->suspend_requested++;
// Wait until the target thread has set the suspended flag.
while (!queue->suspended) {
// Drop the lock around the wakeup callback, so the callback is never
// invoked with queue->lock held.
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
// The flag may have been set while the lock was released; re-check it
// before sleeping, so that the broadcast is not missed.
if (queue->suspended)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
pthread_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_suspend().
static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
assert(queue->suspended);
assert(queue->suspend_requested > 0);
queue->suspend_requested--;
if (queue->suspend_requested == 0)
pthread_cond_broadcast(&queue->cond);
queue->idling = false;
assert(queue->frame == &frame);
queue->frame = frame.prev;
pthread_mutex_unlock(&queue->lock);
}
@ -265,13 +257,52 @@ static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
// NOTE(review): this listing is a diff rendered without +/- markers. The next
// two statements look like the pre-change body (the commit message says the
// exclusive_lock mutex is removed); the statements after them look like the
// replacement. Confirm against the real file before relying on this text.
mp_dispatch_suspend(queue);
pthread_mutex_lock(&queue->exclusive_lock);
// Heap-allocated frame that identifies this lock holder. It is released by
// mp_dispatch_unlock().
struct lock_frame *frame = talloc_zero(NULL, struct lock_frame);
frame->thread = pthread_self();
pthread_mutex_lock(&queue->lock);
// First grab the queue lock. Something else could be holding the lock.
// (Only one caller at a time may have lock_request set; others wait here.)
while (queue->lock_request)
pthread_cond_wait(&queue->cond, &queue->lock);
queue->lock_request = true;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
// access to the target's thread state.
while (!queue->idling) {
// Call the wakeup callback without holding queue->lock, then re-check
// idling before sleeping on the condition variable.
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
if (queue->idling)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
assert(queue->lock_request);
// "Lock".
// Pushing our frame makes queue->frame differ from the frame owned by the
// trapped mp_dispatch_queue_process() call, which keeps that call waiting.
frame->prev = queue->frame;
queue->frame = frame;
// Reset state for recursive mp_dispatch_queue_process() calls.
queue->lock_request = false;
queue->idling = false;
pthread_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
// NOTE(review): this listing is a diff rendered without +/- markers. The next
// two statements look like the pre-change body (the commit message says the
// exclusive_lock mutex is removed); the statements after them look like the
// replacement. Confirm against the real file before relying on this text.
pthread_mutex_unlock(&queue->exclusive_lock);
mp_dispatch_resume(queue);
pthread_mutex_lock(&queue->lock);
// The top-most frame is expected to be the one pushed by the matching
// mp_dispatch_lock() call.
struct lock_frame *frame = queue->frame;
// Must be called after a mp_dispatch_lock().
assert(frame);
// The unlocking thread must be the thread that took the lock.
assert(pthread_equal(frame->thread, pthread_self()));
// "Unlock".
// Pop the frame and release the allocation made in mp_dispatch_lock().
queue->frame = frame->prev;
talloc_free(frame);
// This must have been set to false during locking (except temporarily
// during recursive mp_dispatch_queue_process() calls).
assert(!queue->idling);
queue->idling = true;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}