mirror of
https://github.com/mpv-player/mpv
synced 2025-02-26 18:32:08 +00:00
dispatch: redo locking, and allow reentrant processing
A deadlock bug was reported with the following test program: mpv_handle *mpv = mpv_create(); mpv_set_option_string(mpv, "ytdl", "yes"); mpv_initialize(mpv); mpv_terminate_destroy(mpv); The cause of this is loading the ytdl.lua script, which triggers a certain code path that calls mp_dispatch_queue_process() recursively. It does so to wait until the script is loaded, and we want to keep that. Reentrancy was not supported by mp_dispatch, which leads to the deadlock. Rewrite the locking so that it does. We mainly get rid of the "exclusive_lock" mutex. Instead we use the existing lock/condition variable to wait until we can grab a logical lock. Note that the lock_frame business can be replaced with a simple counter. Instead of checking the lock_frame address, it'd simply increment and store the counter when entering mp_dispatch_queue_process(), and then compare the counter to decide whether or not to wait. But I think the additional error checking done by the lock_frame list is valuable. Fixes #3489.
This commit is contained in:
parent
2619d8eff4
commit
3878a59e2c
171
misc/dispatch.c
171
misc/dispatch.c
@ -28,19 +28,22 @@ struct mp_dispatch_queue {
|
|||||||
struct mp_dispatch_item *head, *tail;
|
struct mp_dispatch_item *head, *tail;
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t lock;
|
||||||
pthread_cond_t cond;
|
pthread_cond_t cond;
|
||||||
int suspend_requested;
|
|
||||||
bool suspended;
|
|
||||||
void (*wakeup_fn)(void *wakeup_ctx);
|
void (*wakeup_fn)(void *wakeup_ctx);
|
||||||
void *wakeup_ctx;
|
void *wakeup_ctx;
|
||||||
// This lock grant access to the target thread's state during suspend mode.
|
// The target thread is blocked by mp_dispatch_queue_process().
|
||||||
// During suspend mode, the target thread is blocked in the function
|
bool idling;
|
||||||
// mp_dispatch_queue_process(), however this function may be processing
|
// A mp_dispatch_lock() call is requesting an exclusive lock.
|
||||||
// dispatch queue items. This lock serializes the dispatch queue processing
|
bool lock_request;
|
||||||
// and external mp_dispatch_lock() calls.
|
// Used to block out threads calling mp_dispatch_queue_process() while
|
||||||
// Invariant: can be held only while suspended==true, and suspend_requested
|
// they're externall locked via mp_dispatch_lock().
|
||||||
// must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
|
// We could use a simple counter, but with this we can perform some
|
||||||
// suspend mode must not be left while the lock is held.
|
// minimal debug checks.
|
||||||
pthread_mutex_t exclusive_lock;
|
struct lock_frame *frame;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct lock_frame {
|
||||||
|
struct lock_frame *prev;
|
||||||
|
pthread_t thread;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct mp_dispatch_item {
|
struct mp_dispatch_item {
|
||||||
@ -55,11 +58,11 @@ static void queue_dtor(void *p)
|
|||||||
{
|
{
|
||||||
struct mp_dispatch_queue *queue = p;
|
struct mp_dispatch_queue *queue = p;
|
||||||
assert(!queue->head);
|
assert(!queue->head);
|
||||||
assert(!queue->suspend_requested);
|
assert(!queue->idling);
|
||||||
assert(!queue->suspended);
|
assert(!queue->lock_request);
|
||||||
|
assert(!queue->frame);
|
||||||
pthread_cond_destroy(&queue->cond);
|
pthread_cond_destroy(&queue->cond);
|
||||||
pthread_mutex_destroy(&queue->lock);
|
pthread_mutex_destroy(&queue->lock);
|
||||||
pthread_mutex_destroy(&queue->exclusive_lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A dispatch queue lets other threads run callbacks in a target thread.
|
// A dispatch queue lets other threads run callbacks in a target thread.
|
||||||
@ -76,7 +79,6 @@ struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
|
|||||||
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
|
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
|
||||||
*queue = (struct mp_dispatch_queue){0};
|
*queue = (struct mp_dispatch_queue){0};
|
||||||
talloc_set_destructor(queue, queue_dtor);
|
talloc_set_destructor(queue, queue_dtor);
|
||||||
pthread_mutex_init(&queue->exclusive_lock, NULL);
|
|
||||||
pthread_mutex_init(&queue->lock, NULL);
|
pthread_mutex_init(&queue->lock, NULL);
|
||||||
pthread_cond_init(&queue->cond, NULL);
|
pthread_cond_init(&queue->cond, NULL);
|
||||||
return queue;
|
return queue;
|
||||||
@ -170,7 +172,8 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process any outstanding dispatch items in the queue. This also handles
|
// Process any outstanding dispatch items in the queue. This also handles
|
||||||
// suspending or locking the target thread.
|
// suspending or locking the this thread from another thread via
|
||||||
|
// mp_dispatch_lock().
|
||||||
// The timeout specifies the minimum wait time. The actual time spent in this
|
// The timeout specifies the minimum wait time. The actual time spent in this
|
||||||
// function can be much higher if the suspending/locking functions are used, or
|
// function can be much higher if the suspending/locking functions are used, or
|
||||||
// if executing the dispatch items takes time. On the other hand, this function
|
// if executing the dispatch items takes time. On the other hand, this function
|
||||||
@ -182,12 +185,33 @@ void mp_dispatch_run(struct mp_dispatch_queue *queue,
|
|||||||
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
|
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
|
||||||
{
|
{
|
||||||
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
|
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
|
||||||
|
struct lock_frame frame = {
|
||||||
|
.thread = pthread_self(),
|
||||||
|
};
|
||||||
|
|
||||||
pthread_mutex_lock(&queue->lock);
|
pthread_mutex_lock(&queue->lock);
|
||||||
queue->suspended = true;
|
frame.prev = queue->frame;
|
||||||
// Wake up thread which called mp_dispatch_suspend().
|
queue->frame = &frame;
|
||||||
|
// Logically, the queue is idling if the target thread is blocked in
|
||||||
|
// mp_dispatch_queue_process() doing nothing, so it's not possible to call
|
||||||
|
// it again. (Reentrant calls via callbacks temporarily reset the field.)
|
||||||
|
assert(!queue->idling);
|
||||||
|
queue->idling = true;
|
||||||
|
// Wake up thread which called mp_dispatch_lock().
|
||||||
|
if (queue->lock_request)
|
||||||
pthread_cond_broadcast(&queue->cond);
|
pthread_cond_broadcast(&queue->cond);
|
||||||
while (queue->head || queue->suspend_requested || wait > 0) {
|
while (1) {
|
||||||
if (queue->head) {
|
if (queue->lock_request || queue->frame != &frame) {
|
||||||
|
// Block due to something having called mp_dispatch_lock(). This
|
||||||
|
// is either a lock "acquire" (lock_request=true), or a lock in
|
||||||
|
// progress, with the possibility the thread which called
|
||||||
|
// mp_dispatch_lock() is now calling mp_dispatch_queue_process()
|
||||||
|
// (the latter means we must ignore any queue state changes,
|
||||||
|
// until it has been unlocked again).
|
||||||
|
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||||
|
if (queue->frame == &frame)
|
||||||
|
assert(queue->idling);
|
||||||
|
} else if (queue->head) {
|
||||||
struct mp_dispatch_item *item = queue->head;
|
struct mp_dispatch_item *item = queue->head;
|
||||||
queue->head = item->next;
|
queue->head = item->next;
|
||||||
if (!queue->head)
|
if (!queue->head)
|
||||||
@ -195,66 +219,34 @@ void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
|
|||||||
item->next = NULL;
|
item->next = NULL;
|
||||||
// Unlock, because we want to allow other threads to queue items
|
// Unlock, because we want to allow other threads to queue items
|
||||||
// while the dispatch item is processed.
|
// while the dispatch item is processed.
|
||||||
// At the same time, exclusive_lock must be held to protect the
|
// At the same time, we must prevent other threads from returning
|
||||||
// thread's user state.
|
// from mp_dispatch_lock(), which is done by idling=false.
|
||||||
|
queue->idling = false;
|
||||||
pthread_mutex_unlock(&queue->lock);
|
pthread_mutex_unlock(&queue->lock);
|
||||||
pthread_mutex_lock(&queue->exclusive_lock);
|
|
||||||
item->fn(item->fn_data);
|
item->fn(item->fn_data);
|
||||||
pthread_mutex_unlock(&queue->exclusive_lock);
|
|
||||||
pthread_mutex_lock(&queue->lock);
|
pthread_mutex_lock(&queue->lock);
|
||||||
|
assert(!queue->idling);
|
||||||
|
queue->idling = true;
|
||||||
|
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
|
||||||
|
pthread_cond_broadcast(&queue->cond);
|
||||||
if (item->asynchronous) {
|
if (item->asynchronous) {
|
||||||
talloc_free(item);
|
talloc_free(item);
|
||||||
} else {
|
} else {
|
||||||
item->completed = true;
|
item->completed = true;
|
||||||
// Wakeup mp_dispatch_run()
|
|
||||||
pthread_cond_broadcast(&queue->cond);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else if (wait > 0) {
|
||||||
if (wait > 0) {
|
|
||||||
struct timespec ts = mp_time_us_to_timespec(wait);
|
struct timespec ts = mp_time_us_to_timespec(wait);
|
||||||
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
|
pthread_cond_timedwait(&queue->cond, &queue->lock, &ts);
|
||||||
} else {
|
} else {
|
||||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
wait = 0;
|
wait = 0;
|
||||||
}
|
}
|
||||||
queue->suspended = false;
|
queue->idling = false;
|
||||||
pthread_mutex_unlock(&queue->lock);
|
assert(queue->frame == &frame);
|
||||||
}
|
queue->frame = frame.prev;
|
||||||
|
|
||||||
// Set the target thread into suspend mode: in this mode, the thread will enter
|
|
||||||
// mp_dispatch_queue_process(), process any outstanding dispatch items, and
|
|
||||||
// wait for new items when done (instead of exiting the process function).
|
|
||||||
// Multiple threads can enter suspend mode at the same time. Suspend mode is
|
|
||||||
// not a synchronization mechanism; it merely makes sure the target thread does
|
|
||||||
// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
|
|
||||||
// can be used for exclusive access.
|
|
||||||
static void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
|
|
||||||
{
|
|
||||||
pthread_mutex_lock(&queue->lock);
|
|
||||||
queue->suspend_requested++;
|
|
||||||
while (!queue->suspended) {
|
|
||||||
pthread_mutex_unlock(&queue->lock);
|
|
||||||
if (queue->wakeup_fn)
|
|
||||||
queue->wakeup_fn(queue->wakeup_ctx);
|
|
||||||
pthread_mutex_lock(&queue->lock);
|
|
||||||
if (queue->suspended)
|
|
||||||
break;
|
|
||||||
pthread_cond_wait(&queue->cond, &queue->lock);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&queue->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Undo mp_dispatch_suspend().
|
|
||||||
static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
|
|
||||||
{
|
|
||||||
pthread_mutex_lock(&queue->lock);
|
|
||||||
assert(queue->suspended);
|
|
||||||
assert(queue->suspend_requested > 0);
|
|
||||||
queue->suspend_requested--;
|
|
||||||
if (queue->suspend_requested == 0)
|
|
||||||
pthread_cond_broadcast(&queue->cond);
|
|
||||||
pthread_mutex_unlock(&queue->lock);
|
pthread_mutex_unlock(&queue->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,13 +257,52 @@ static void mp_dispatch_resume(struct mp_dispatch_queue *queue)
|
|||||||
// and the mutex behavior applies to this function only.
|
// and the mutex behavior applies to this function only.
|
||||||
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
|
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
|
||||||
{
|
{
|
||||||
mp_dispatch_suspend(queue);
|
struct lock_frame *frame = talloc_zero(NULL, struct lock_frame);
|
||||||
pthread_mutex_lock(&queue->exclusive_lock);
|
frame->thread = pthread_self();
|
||||||
|
|
||||||
|
pthread_mutex_lock(&queue->lock);
|
||||||
|
// First grab the queue lock. Something else could be holding the lock.
|
||||||
|
while (queue->lock_request)
|
||||||
|
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||||
|
queue->lock_request = true;
|
||||||
|
// And now wait until the target thread gets "trapped" within the
|
||||||
|
// mp_dispatch_queue_process() call, which will mean we get exclusive
|
||||||
|
// access to the target's thread state.
|
||||||
|
while (!queue->idling) {
|
||||||
|
pthread_mutex_unlock(&queue->lock);
|
||||||
|
if (queue->wakeup_fn)
|
||||||
|
queue->wakeup_fn(queue->wakeup_ctx);
|
||||||
|
pthread_mutex_lock(&queue->lock);
|
||||||
|
if (queue->idling)
|
||||||
|
break;
|
||||||
|
pthread_cond_wait(&queue->cond, &queue->lock);
|
||||||
|
}
|
||||||
|
assert(queue->lock_request);
|
||||||
|
// "Lock".
|
||||||
|
frame->prev = queue->frame;
|
||||||
|
queue->frame = frame;
|
||||||
|
// Reset state for recursive mp_dispatch_queue_process() calls.
|
||||||
|
queue->lock_request = false;
|
||||||
|
queue->idling = false;
|
||||||
|
pthread_mutex_unlock(&queue->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Undo mp_dispatch_lock().
|
// Undo mp_dispatch_lock().
|
||||||
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
|
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
|
||||||
{
|
{
|
||||||
pthread_mutex_unlock(&queue->exclusive_lock);
|
pthread_mutex_lock(&queue->lock);
|
||||||
mp_dispatch_resume(queue);
|
struct lock_frame *frame = queue->frame;
|
||||||
|
// Must be called atfer a mp_dispatch_lock().
|
||||||
|
assert(frame);
|
||||||
|
assert(pthread_equal(frame->thread, pthread_self()));
|
||||||
|
// "Unlock".
|
||||||
|
queue->frame = frame->prev;
|
||||||
|
talloc_free(frame);
|
||||||
|
// This must have been set to false during locking (except temporarily
|
||||||
|
// during recursive mp_dispatch_queue_process() calls).
|
||||||
|
assert(!queue->idling);
|
||||||
|
queue->idling = true;
|
||||||
|
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
|
||||||
|
pthread_cond_broadcast(&queue->cond);
|
||||||
|
pthread_mutex_unlock(&queue->lock);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user