1
0
mirror of https://github.com/mpv-player/mpv synced 2024-12-30 11:02:10 +00:00
mpv/misc/dispatch.c
wm4 d2bdb72b69 options: add a thread-safe way to notify option updates
So far, we had a thread-safe way to read options, but no option update
notification mechanism. Everything was funneled though the main thread's
central mp_option_change_callback() function. For example, if the
panscan options were changed, the function called vo_control() with
VOCTRL_SET_PANSCAN to manually notify the VO thread of updates. This
worked, but's pretty inconvenient. Most of these problems come from the
fact that MPlayer was written as a single-threaded program.

This commit works towards a more flexible mechanism. It adds an update
callback to m_config_cache (the thing that is already used for
thread-safe access of global options).

This alone would still be rather inconvenient, at least in context of
VOs. Add another mechanism on top of it that uses mp_dispatch_queue, and
takes care of some annoying synchronization issues. We extend
mp_dispatch_queue itself to make this easier and slightly more
efficient.

As a first application, use this to reimplement certain VO scaling and
renderer options. The update_opts() function translates these to the
"old" VOCTRLs, though.

An annoyingly subtle issue is that m_config_cache's destructor now
releases pending notifications, and must be released before the
associated dispatch queue. Otherwise, it could happen that option
updates during e.g. VO destruction queue or run stale entries, which is
not expected.

Rather untested. The singly-linked list code in dispatch.c is probably
buggy, and I bet some aspects about synchronization are not entirely
sane.
2017-08-22 15:50:33 +02:00

393 lines
15 KiB
C

/*
* This file is part of mpv.
*
* mpv is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* mpv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with mpv. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdbool.h>
#include <assert.h>
#include "common/common.h"
#include "osdep/threads.h"
#include "osdep/timer.h"
#include "dispatch.h"
struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
// Make mp_dispatch_queue_process() exit if it's idle.
bool interrupted;
// The target thread is blocked by mp_dispatch_queue_process(). Note that
// mp_dispatch_lock() can set this from true to false to keep the thread
// blocked (this stops if from processing other dispatch items, and from
// other threads to return from mp_dispatch_lock(), making it an exclusive
// lock).
bool idling;
// A mp_dispatch_lock() call is requesting an exclusive lock.
bool lock_request;
// Used to block out threads calling mp_dispatch_queue_process() while
// they're externall locked via mp_dispatch_lock().
// We could use a simple counter (increment it instead of adding a frame,
// also increment it when locking), but with this we can perform some
// minimal debug checks.
struct lock_frame *frame;
};
struct lock_frame {
struct lock_frame *prev;
pthread_t thread;
pthread_t locked_thread;
bool locked;
};
struct mp_dispatch_item {
mp_dispatch_fn fn;
void *fn_data;
bool asynchronous;
bool mergeable;
bool completed;
struct mp_dispatch_item *next;
};
static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
assert(!queue->idling);
assert(!queue->lock_request);
assert(!queue->frame);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
}
// A dispatch queue lets other threads run callbacks in a target thread.
// The target thread is the thread which calls mp_dispatch_queue_process().
// Free the dispatch queue with talloc_free(). At the time of destruction,
// the queue must be empty. The easiest way to guarantee this is to
// terminate all potential senders, then call mp_dispatch_run() with a
// function that e.g. makes the target thread exit, then pthread_join() the
// target thread, and finally destroy the queue. Another way is calling
// mp_dispatch_queue_process() after terminating all potential senders, and
// then destroying the queue.
struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent)
{
struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
}
// Set a custom function that should be called to guarantee that the target
// thread wakes up. This is intended for use with code that needs to block
// on non-pthread primitives, such as e.g. select(). In the case of select(),
// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
// to unblock the select(). The wakeup_fn is called from the dispatch queue
// when there are new dispatch items, and the target thread should then enter
// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is
// called under no lock, so you might have to do synchronization yourself.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
void (*wakeup_fn)(void *wakeup_ctx),
void *wakeup_ctx)
{
queue->wakeup_fn = wakeup_fn;
queue->wakeup_ctx = wakeup_ctx;
}
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
struct mp_dispatch_item *item)
{
pthread_mutex_lock(&queue->lock);
if (item->mergeable) {
for (struct mp_dispatch_item *cur = queue->head; cur; cur = cur->next) {
if (cur->mergeable && cur->fn == item->fn &&
cur->fn_data == item->fn_data)
{
talloc_free(item);
pthread_mutex_unlock(&queue->lock);
return;
}
}
}
if (queue->tail) {
queue->tail->next = item;
} else {
queue->head = item;
}
queue->tail = item;
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
pthread_cond_broadcast(&queue->cond);
// No wakeup callback -> assume mp_dispatch_queue_process() needs to be
// interrupted instead.
if (!queue->wakeup_fn)
queue->interrupted = true;
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
}
// Enqueue a callback to run it on the target thread asynchronously. The target
// thread will run fn(fn_data) as soon as it enter mp_dispatch_queue_process.
// Note that mp_dispatch_enqueue() will usually return long before that happens.
// It's up to the user to signal completion of the callback. It's also up to
// the user to guarantee that the context fn_data has correct lifetime, i.e.
// lives until the callback is run, and is freed after that.
void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
*item = (struct mp_dispatch_item){
.fn = fn,
.fn_data = fn_data,
.asynchronous = true,
};
mp_dispatch_append(queue, item);
}
// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
// after the fn callback has been run. (The callback could trivially do that
// itself, but it makes it easier to implement synchronous and asynchronous
// requests with the same callback implementation.)
void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
*item = (struct mp_dispatch_item){
.fn = fn,
.fn_data = talloc_steal(item, fn_data),
.asynchronous = true,
};
mp_dispatch_append(queue, item);
}
// Like mp_dispatch_enqueue(), but
void mp_dispatch_enqueue_notify(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
*item = (struct mp_dispatch_item){
.fn = fn,
.fn_data = fn_data,
.mergeable = true,
.asynchronous = true,
};
mp_dispatch_append(queue, item);
}
// Remove already queued item. Only items enqueued with the following functions
// can be canceled:
// - mp_dispatch_enqueue()
// - mp_dispatch_enqueue_notify()
// Items which were enqueued, and which are currently executing, can not be
// canceled anymore. This function is mostly for being called from the same
// context as mp_dispatch_queue_process(), where the "currently executing" case
// can be excluded.
void mp_dispatch_cancel_fn(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
pthread_mutex_lock(&queue->lock);
struct mp_dispatch_item **pcur = &queue->head;
queue->tail = NULL;
while (*pcur) {
struct mp_dispatch_item *cur = *pcur;
if (cur->fn == fn && cur->fn_data == fn_data) {
*pcur = cur->next;
talloc_free(cur);
} else {
queue->tail = cur;
pcur = &cur->next;
}
}
pthread_mutex_unlock(&queue->lock);
}
// Run fn(fn_data) on the target thread synchronously. This function enqueues
// the callback and waits until the target thread is done doing this.
// This is redundant to calling the function inside mp_dispatch_[un]lock(),
// but can be helpful with code that relies on TLS (such as OpenGL).
void mp_dispatch_run(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item item = {
.fn = fn,
.fn_data = fn_data,
};
mp_dispatch_append(queue, &item);
pthread_mutex_lock(&queue->lock);
while (!item.completed)
pthread_cond_wait(&queue->cond, &queue->lock);
pthread_mutex_unlock(&queue->lock);
}
// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking the this thread from another thread via
// mp_dispatch_lock().
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
// can return much earlier than the timeout due to sporadic wakeups.
// Note that this will strictly return only after:
// - timeout has passed,
// - all queue items were processed,
// - the possibly acquired lock has been released
// It's possible to cancel the timeout by calling mp_dispatch_interrupt().
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0;
struct lock_frame frame = {
.thread = pthread_self(),
};
pthread_mutex_lock(&queue->lock);
frame.prev = queue->frame;
queue->frame = &frame;
// Logically, the queue is idling if the target thread is blocked in
// mp_dispatch_queue_process() doing nothing, so it's not possible to call
// it again. (Reentrant calls via callbacks temporarily reset the field.)
assert(!queue->idling);
queue->idling = true;
// Wake up thread which called mp_dispatch_lock().
if (queue->lock_request)
pthread_cond_broadcast(&queue->cond);
while (1) {
if (queue->lock_request || queue->frame != &frame || frame.locked) {
// Block due to something having called mp_dispatch_lock(). This
// is either a lock "acquire" (lock_request=true), or a lock in
// progress, with the possibility the thread which called
// mp_dispatch_lock() is now calling mp_dispatch_queue_process()
// (the latter means we must ignore any queue state changes,
// until it has been unlocked again).
pthread_cond_wait(&queue->cond, &queue->lock);
if (queue->frame == &frame && !frame.locked)
assert(queue->idling);
} else if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
queue->tail = NULL;
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
// At the same time, we must prevent other threads from returning
// from mp_dispatch_lock(), which is done by idling=false.
queue->idling = false;
pthread_mutex_unlock(&queue->lock);
item->fn(item->fn_data);
pthread_mutex_lock(&queue->lock);
assert(!queue->idling);
queue->idling = true;
// Wakeup mp_dispatch_run(), also mp_dispatch_lock().
pthread_cond_broadcast(&queue->cond);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
}
} else if (wait > 0 && !queue->interrupted) {
struct timespec ts = mp_time_us_to_timespec(wait);
if (pthread_cond_timedwait(&queue->cond, &queue->lock, &ts))
wait = 0;
} else {
break;
}
}
queue->idling = false;
assert(!frame.locked);
assert(queue->frame == &frame);
queue->frame = frame.prev;
queue->interrupted = false;
pthread_mutex_unlock(&queue->lock);
}
// If the queue is inside of mp_dispatch_queue_process(), make it return as
// soon as all work items have been run, without waiting for the timeout. This
// does not make it return early if it's blocked by a mp_dispatch_lock().
// If mp_dispatch_queue_process() is called in a reentrant way (including the
// case where another thread calls mp_dispatch_lock() and then
// mp_dispatch_queue_process()), this affects only the "topmost" invocation.
void mp_dispatch_interrupt(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
queue->interrupted = true;
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}
// Grant exclusive access to the target thread's state. While this is active,
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
// First grab the queue lock. Something else could be holding the lock.
while (queue->lock_request)
pthread_cond_wait(&queue->cond, &queue->lock);
queue->lock_request = true;
// And now wait until the target thread gets "trapped" within the
// mp_dispatch_queue_process() call, which will mean we get exclusive
// access to the target's thread state.
while (!queue->idling) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
if (queue->idling)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
assert(queue->lock_request);
assert(queue->frame); // must be set if idling
assert(!queue->frame->locked); // no recursive locking on the same level
// "Lock".
queue->frame->locked = true;
queue->frame->locked_thread = pthread_self();
// Reset state for recursive mp_dispatch_queue_process() calls.
queue->lock_request = false;
queue->idling = false;
pthread_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
// Must be called atfer a mp_dispatch_lock().
assert(queue->frame);
assert(queue->frame->locked);
assert(pthread_equal(queue->frame->locked_thread, pthread_self()));
// "Unlock".
queue->frame->locked = false;
// This must have been set to false during locking (except temporarily
// during recursive mp_dispatch_queue_process() calls).
assert(!queue->idling);
queue->idling = true;
// Wakeup mp_dispatch_queue_process(), and maybe other mp_dispatch_lock()s.
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}