mirror of
https://github.com/mpv-player/mpv
synced 2025-01-17 12:31:25 +00:00
dispatch: redo locking, and allow reentrant processing
A deadlock bug was reported with the following test program: mpv_handle *mpv = mpv_create(); mpv_set_option_string(mpv, "ytdl", "yes"); mpv_initialize(mpv); mpv_terminate_destroy(mpv); The cause of this is loading the ytdl.lua script, which triggers a certain code path that calls mp_dispatch_queue_process() recursively. It does so to wait until the script is loaded, and we want to keep that. Reentrancy was not supported by mp_dispatch, which leads to the deadlock. Rewrite the locking so that it does. We mainly get rid of the "exclusive_lock" mutex. Instead we use the existing lock/condition variable to wait until we can grab a logical lock. Note that the lock_frame business can be replaced with a simple counter. Instead of checking the lock_frame address, it'd simply increment and store the counter when entering mp_dispatch_queue_process(), and then compare the counter to decide whether or not to wait. But I think the additional error checking done by the lock_frame list is valuable. Fixes #3489.
This commit is contained in:
parent
2619d8eff4
commit
3878a59e2c
177
misc/dispatch.c
177
misc/dispatch.c
@ -28,19 +28,22 @@ struct mp_dispatch_queue {
|
||||
struct mp_dispatch_item *head, *tail;
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t cond;
|
||||
int suspend_requested;
|
||||
bool suspended;
|
||||
void (*wakeup_fn)(void *wakeup_ctx);
|
||||
void *wakeup_ctx;
|
||||
// This lock grant access to the target thread's state during suspend mode.
|
||||
// During suspend mode, the target thread is blocked in the function
|
||||
// mp_dispatch_queue_process(), however this function may be processing
|
||||
// dispatch queue items. This lock serializes the dispatch queue processing
|
||||
// and external mp_dispatch_lock() calls.
|
||||
// Invariant: can be held only while suspended==true, and suspend_requested
|
||||
// must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
|
||||
// suspend mode must not be left while the lock is held.
|
||||
pthread_mutex_t exclusive_lock;
|
||||
// The target thread is blocked by mp_dispatch_queue_process().
|
||||
bool idling;
|
||||
// A mp_dispatch_lock() call is requesting an exclusive lock.
|
||||
bool lock_request;
|
||||
// Used to block out threads calling mp_dispatch_queue_process() while
|
||||
// they're externall locked via mp_dispatch_lock().
|
||||
// We could use a simple counter, but with this we can perform some
|
||||
// minimal debug checks.
|
||||
struct lock_frame *frame;
|
||||
};
|
||||
|
||||
struct lock_frame {
|
||||
struct lock_frame *prev;
|
||||
pthread_t thread;
|
||||
};
|
||||
|
||||
struct mp_dispatch_item {
|
||||
@ -55,11 +58,11 @@ static void queue_dtor(void *p)
|
||||
{
|
||||
struct mp_dispatch_queue *queue = p;
|
||||
assert(!queue->head);
|
||||
assert(!queue->suspend_requested);
|
||||
assert(!queue->suspended);
|
||||
assert(!queue->idling);
|
||||
assert(!queue->lock_request);
|
||||
assert(!queue->frame);
|
||||
pthread_cond_destroy(&queue->cond);
|
||||
pthread_mutex_destroy(&queue->lock);
|
||||
pthread_mutex_destroy(&queue->exclusive_lock);
|
||||
}
|
||||
|
||||
// A dispatch queue lets other threads run callbacks in a target thread.
|
||||
@ -76,7 +79,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
|
||||
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
|
||||
*queue = (struct mp_dispatch_queue){0};
|
||||
talloc_set_destructor(queue, queue_dtor);
|
||||
pthread_mutex_init(&queue->exclusive_lock, NULL);
|
||||
pthread_mutex_init(&queue->lock, NULL);
|
||||
pthread_cond_init(&queue->cond, NULL);
|
||||
return queue;
|
||||
@ -170,7 +172,8 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
|
||||
}
|
||||
|
||||
// Process any outstanding dispatch items in the queue. This also handles
|
||||
// suspending or locking the target thread.
|
||||
// suspending or locking the this thread from another thread via
|
||||
// mp_dispatch_lock().
|
||||
// The timeout specifies the minimum wait time. The actual time spent in this
|
||||
// function can be much higher if the suspending/locking functions are used, or
|
||||
// if executing the dispatch items takes time. On the other hand, this function
|
||||
@ -182,12 +185,33 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
|
||||
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
|
||||
{
|
||||
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
|
||||
struct lock_frame frame = {
|
||||
.thread = pthread_self(),
|
||||
};
|
||||
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
queue->suspended = true;
|
||||
// Wake up thread which called mp_dispatch_suspend().
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
while (queue->head || queue->suspend_requested || wait > 0) {
|
||||
if (queue->head) {
|
||||
frame.prev = queue->frame;
|
||||
queue->frame = &frame;
|
||||
// Logically, the queue is idling if the target thread is blocked in
|
||||
// mp_dispatch_queue_process() doing nothing, so it's not possible to call
|
||||
// it again. (Reentrant calls via callbacks temporarily reset the field.)
|
||||
assert(!queue->idling);
|
||||
queue->idling = true;
|
||||
// Wake up thread which called mp_dispatch_lock().
|
||||
if (queue->lock_request)
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
while (1) {
|
||||
if (queue->lock_request || queue->frame != &frame) {
|
||||
// Block due to something having called mp_dispatch_lock(). This
|
||||
// is either a lock "acquire" (lock_request=true), or a lock in
|
||||
// progress, with the possibility the thread which called
|
||||
// mp_dispatch_lock() is now calling mp_dispatch_queue_process()
|
||||
// (the latter means we must ignore any queue state changes,
|
||||
// until it has been unlocked again).
|
||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||
if (queue->frame == &frame)
|
||||
assert(queue->idling);
|
||||
} else if (queue->head) {
|
||||
struct mp_dispatch_item *item = queue->head;
|
||||
queue->head = item->next;
|
||||
if (!queue->head)
|
||||
@ -195,66 +219,34 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
|
||||
item->next = NULL;
|
||||
// Unlock, because we want to allow other threads to queue items
|
||||
// while the dispatch item is processed.
|
||||
// At the same time, exclusive_lock must be held to protect the
|
||||
// thread's user state.
|
||||
// At the same time, we must prevent other threads from returning
|
||||
// from mp_dispatch_lock(), which is done by idling=false.
|
||||
queue->idling = false;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
pthread_mutex_lock(&queue->exclusive_lock);
|
||||
|
||||
item->fn(item->fn_data);
|
||||
pthread_mutex_unlock(&queue->exclusive_lock);
|
||||
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
assert(!queue->idling);
|
||||
queue->idling = true;
|
||||
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
if (item->asynchronous) {
|
||||
talloc_free(item);
|
||||
} else {
|
||||
item->completed = true;
|
||||
// Wakeup mp_dispatch_run()
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
}
|
||||
} else if (wait > 0) {
|
||||
struct timespec ts = mp_time_us_to_timespec(wait);
|
||||
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
|
||||
} else {
|
||||
if (wait > 0) {
|
||||
struct timespec ts = mp_time_us_to_timespec(wait);
|
||||
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
|
||||
} else {
|
||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||
}
|
||||
break;
|
||||
}
|
||||
wait = 0;
|
||||
}
|
||||
queue->suspended = false;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
// Set the target thread into suspend mode: in this mode, the thread will enter
|
||||
// mp_dispatch_queue_process(), process any outstanding dispatch items, and
|
||||
// wait for new items when done (instead of exiting the process function).
|
||||
// Multiple threads can enter suspend mode at the same time. Suspend mode is
|
||||
// not a synchronization mechanism; it merely makes sure the target thread does
|
||||
// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
|
||||
// can be used for exclusive access.
|
||||
static void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
|
||||
{
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
queue->suspend_requested++;
|
||||
while (!queue->suspended) {
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
if (queue->wakeup_fn)
|
||||
queue->wakeup_fn(queue->wakeup_ctx);
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
if (queue->suspended)
|
||||
break;
|
||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||
}
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
// Undo mp_dispatch_suspend().
|
||||
static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
|
||||
{
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
assert(queue->suspended);
|
||||
assert(queue->suspend_requested > 0);
|
||||
queue->suspend_requested--;
|
||||
if (queue->suspend_requested == 0)
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
queue->idling = false;
|
||||
assert(queue->frame == &frame);
|
||||
queue->frame = frame.prev;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
@ -265,13 +257,52 @@ static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
|
||||
// and the mutex behavior applies to this function only.
|
||||
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
|
||||
{
|
||||
mp_dispatch_suspend(queue);
|
||||
pthread_mutex_lock(&queue->exclusive_lock);
|
||||
struct lock_frame *frame = talloc_zero(NULL, struct lock_frame);
|
||||
frame->thread = pthread_self();
|
||||
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
// First grab the queue lock. Something else could be holding the lock.
|
||||
while (queue->lock_request)
|
||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||
queue->lock_request = true;
|
||||
// And now wait until the target thread gets "trapped" within the
|
||||
// mp_dispatch_queue_process() call, which will mean we get exclusive
|
||||
// access to the target's thread state.
|
||||
while (!queue->idling) {
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
if (queue->wakeup_fn)
|
||||
queue->wakeup_fn(queue->wakeup_ctx);
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
if (queue->idling)
|
||||
break;
|
||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||
}
|
||||
assert(queue->lock_request);
|
||||
// "Lock".
|
||||
frame->prev = queue->frame;
|
||||
queue->frame = frame;
|
||||
// Reset state for recursive mp_dispatch_queue_process() calls.
|
||||
queue->lock_request = false;
|
||||
queue->idling = false;
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
||||
// Undo mp_dispatch_lock().
|
||||
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
|
||||
{
|
||||
pthread_mutex_unlock(&queue->exclusive_lock);
|
||||
mp_dispatch_resume(queue);
|
||||
pthread_mutex_lock(&queue->lock);
|
||||
struct lock_frame *frame = queue->frame;
|
||||
// Must be called atfer a mp_dispatch_lock().
|
||||
assert(frame);
|
||||
assert(pthread_equal(frame->thread, pthread_self()));
|
||||
// "Unlock".
|
||||
queue->frame = frame->prev;
|
||||
talloc_free(frame);
|
||||
// This must have been set to false during locking (except temporarily
|
||||
// during recursive mp_dispatch_queue_process() calls).
|
||||
assert(!queue->idling);
|
||||
queue->idling = true;
|
||||
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
|
||||
pthread_cond_broadcast(&queue->cond);
|
||||
pthread_mutex_unlock(&queue->lock);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user