1
0
mirror of https://github.com/mpv-player/mpv synced 2025-01-11 09:29:29 +00:00
mpv/misc/dispatch.c
wm4 80ff94131b dispatch: implement timeout
Note that this mechanism is similarly "unreliable" as for example
pthread_cond_timedwait(). Trying to target the exact wait time will just
make it more complex.

The main use case for this is for threads which either use the dispatch
centrally and want mp_dispatch_queue_process to do a blocking wait for
new work, or threads which have to implement timers. For the former,
anything is fine, as long as they don't have to do active waiting for
new works. For the former, callers are better off recalculating their
deadline after every call.
2014-04-23 21:16:52 +02:00

262 lines
9.9 KiB
C

/*
* This file is part of mpv.
*
* mpv is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* mpv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with mpv. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdbool.h>
#include <assert.h>
#include "common/common.h"
#include "osdep/threads.h"
#include "dispatch.h"
struct mp_dispatch_queue {
struct mp_dispatch_item *head, *tail;
pthread_mutex_t lock;
pthread_cond_t cond;
int suspend_requested;
bool suspended;
void (*wakeup_fn)(void *wakeup_ctx);
void *wakeup_ctx;
// This lock grant access to the target thread's state during suspend mode.
// During suspend mode, the target thread is blocked in the function
// mp_dispatch_queue_process(), however this function may be processing
// dispatch queue items. This lock serializes the dispatch queue processing
// and external mp_dispatch_lock() calls.
// Invariant: can be held only while suspended==true, and suspend_requested
// must be >0 (unless mp_dispatch_queue_process() locks it). In particular,
// suspend mode must not be left while the lock is held.
pthread_mutex_t exclusive_lock;
};
struct mp_dispatch_item {
mp_dispatch_fn fn;
void *fn_data;
bool asynchronous;
bool completed;
struct mp_dispatch_item *next;
};
static void queue_dtor(void *p)
{
struct mp_dispatch_queue *queue = p;
assert(!queue->head);
assert(!queue->suspend_requested);
assert(!queue->suspended);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->lock);
pthread_mutex_destroy(&queue->exclusive_lock);
}
// A dispatch queue lets other threads runs callbacks in a target thread.
// The target thread is the thread which created the queue and which calls
// mp_dispatch_queue_process().
// Free the dispatch queue with talloc_free(). (It must be empty.)
struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent)
{
struct mp_dispatch_queue *queue = talloc_ptrtype(talloc_parent, queue);
*queue = (struct mp_dispatch_queue){0};
talloc_set_destructor(queue, queue_dtor);
pthread_mutex_init(&queue->exclusive_lock, NULL);
pthread_mutex_init(&queue->lock, NULL);
pthread_cond_init(&queue->cond, NULL);
return queue;
}
// Set a custom function that should be called to guarantee that the target
// thread wakes up. This is intended for use with code that needs to block
// on non-pthread primitives, such as e.g. select(). In the case of select(),
// the wakeup_fn could for example write a byte into a "wakeup" pipe in order
// to unblock the select(). The wakeup_fn is called from the dispatch queue
// when there are new dispatch items, and the target thread should then enter
// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is
// called under no lock, so you might have to do synchronization yourself.
void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue,
void (*wakeup_fn)(void *wakeup_ctx),
void *wakeup_ctx)
{
queue->wakeup_fn = wakeup_fn;
queue->wakeup_ctx = wakeup_ctx;
}
static void mp_dispatch_append(struct mp_dispatch_queue *queue,
struct mp_dispatch_item *item)
{
pthread_mutex_lock(&queue->lock);
if (queue->tail) {
queue->tail->next = item;
} else {
queue->head = item;
}
queue->tail = item;
// Wake up the main thread; note that other threads might wait on this
// condition for reasons, so broadcast the condition.
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
}
// Let the dispatch item process asynchronously. item->fn will be run in the
// target thread's context, but note that mp_dispatch_enqueue() will usually
// return long before that happens. It's up to the user to signal completion
// of the callback. It's also up to the user to guarantee that the context
// (fn_data) has correct lifetime, i.e. lives until the callback is run, and
// is freed after that.
void mp_dispatch_enqueue(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
*item = (struct mp_dispatch_item){
.fn = fn,
.fn_data = fn_data,
.asynchronous = true,
};
mp_dispatch_append(queue, item);
}
// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data)
// after the fn callback has been run. (The callback could trivially do that
// itself, but it makes it easier to implement synchronous and asynchronous
// requests with the same callback implementation.)
void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item *item = talloc_ptrtype(NULL, item);
*item = (struct mp_dispatch_item){
.fn = fn,
.fn_data = talloc_steal(item, fn_data),
.asynchronous = true,
};
mp_dispatch_append(queue, item);
}
// Run the dispatch item synchronously. item->fn will be run in the target
// thread's context, and this function will wait until it's done.
void mp_dispatch_run(struct mp_dispatch_queue *queue,
mp_dispatch_fn fn, void *fn_data)
{
struct mp_dispatch_item item = {
.fn = fn,
.fn_data = fn_data,
};
mp_dispatch_append(queue, &item);
pthread_mutex_lock(&queue->lock);
while (!item.completed)
pthread_cond_wait(&queue->cond, &queue->lock);
pthread_mutex_unlock(&queue->lock);
}
// Process any outstanding dispatch items in the queue. This also handles
// suspending or locking the target thread.
// The timeout specifies the minimum wait time. The actual time spent in this
// function can be much higher if the suspending/locking functions are used, or
// if executing the dispatch items takes time. On the other hand, this function
// can return much earlier than the timeout due to sporadic wakeups.
void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout)
{
pthread_mutex_lock(&queue->lock);
queue->suspended = true;
// Wake up thread which called mp_dispatch_suspend().
pthread_cond_broadcast(&queue->cond);
while (queue->head || queue->suspend_requested || timeout > 0) {
if (queue->head) {
struct mp_dispatch_item *item = queue->head;
queue->head = item->next;
if (!queue->head)
queue->tail = NULL;
item->next = NULL;
// Unlock, because we want to allow other threads to queue items
// while the dispatch item is processed.
// At the same time, exclusive_lock must be held to protect the
// thread's user state.
pthread_mutex_unlock(&queue->lock);
pthread_mutex_lock(&queue->exclusive_lock);
item->fn(item->fn_data);
pthread_mutex_unlock(&queue->exclusive_lock);
pthread_mutex_lock(&queue->lock);
if (item->asynchronous) {
talloc_free(item);
} else {
item->completed = true;
// Wakeup mp_dispatch_run()
pthread_cond_broadcast(&queue->cond);
}
} else {
if (timeout > 0) {
mpthread_cond_timed_wait(&queue->cond, &queue->lock, timeout);
} else {
pthread_cond_wait(&queue->cond, &queue->lock);
}
}
timeout = 0;
}
queue->suspended = false;
pthread_mutex_unlock(&queue->lock);
}
// Set the target thread into suspend mode: in this mode, the thread will enter
// mp_dispatch_queue_process(), process any outstanding dispatch items, and
// wait for new items when done (instead of exiting the process function).
// Multiple threads can enter suspend mode at the same time. Suspend mode is
// not a synchronization mechanism; it merely makes sure the target thread does
// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock()
// can be used for exclusive access.
void mp_dispatch_suspend(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
queue->suspend_requested++;
while (!queue->suspended) {
pthread_mutex_unlock(&queue->lock);
if (queue->wakeup_fn)
queue->wakeup_fn(queue->wakeup_ctx);
pthread_mutex_lock(&queue->lock);
if (queue->suspended)
break;
pthread_cond_wait(&queue->cond, &queue->lock);
}
pthread_mutex_unlock(&queue->lock);
}
// Undo mp_dispatch_suspend().
void mp_dispatch_resume(struct mp_dispatch_queue *queue)
{
pthread_mutex_lock(&queue->lock);
assert(queue->suspend_requested > 0);
queue->suspend_requested--;
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->lock);
}
// Grant exclusive access to the target thread's state. While this is active,
// no other thread can return from mp_dispatch_lock() (i.e. it behaves like
// a pthread mutex), and no other thread can get dispatch items completed.
// Other threads can still queue asynchronous dispatch items without waiting,
// and the mutex behavior applies to this function only.
void mp_dispatch_lock(struct mp_dispatch_queue *queue)
{
mp_dispatch_suspend(queue);
pthread_mutex_lock(&queue->exclusive_lock);
}
// Undo mp_dispatch_lock().
void mp_dispatch_unlock(struct mp_dispatch_queue *queue)
{
pthread_mutex_unlock(&queue->exclusive_lock);
mp_dispatch_resume(queue);
}