From a253c72dbb0fedf3b83f2f20f1f311c40551a6b1 Mon Sep 17 00:00:00 2001 From: wm4 Date: Thu, 17 May 2018 21:20:26 +0200 Subject: [PATCH] thread_tools: unify mp_cancel POSIX/win32 paths, add features The OS specifics are merged because the resulting ifdeffery is not much worse than the old ifdeffery, but the logic that is now shared is becoming more complex. Create all objects lazily. The intention is to make mp_cancel instances cheaper. POSIX pipes and win32 Events are pretty heavy weight, and are only needed in special situations. Add a mechanism to "chain" mp_cancel instances. Needed by the later commits for whatever reasons. Untested on win32. --- misc/thread_tools.c | 212 +++++++++++++++++++++++++++++++------------- misc/thread_tools.h | 17 ++++ 2 files changed, 165 insertions(+), 64 deletions(-) diff --git a/misc/thread_tools.c b/misc/thread_tools.c index b11ffd07e4..ecf6bc2381 100644 --- a/misc/thread_tools.c +++ b/misc/thread_tools.c @@ -26,8 +26,10 @@ #endif #include "common/common.h" +#include "misc/linked_list.h" #include "osdep/atomic.h" #include "osdep/io.h" +#include "osdep/timer.h" #include "thread_tools.h" @@ -74,45 +76,118 @@ bool mp_waiter_poll(struct mp_waiter *waiter) return r; } -#ifndef __MINGW32__ struct mp_cancel { + pthread_mutex_t lock; + pthread_cond_t wakeup; + + // Semaphore state and "mirrors". atomic_bool triggered; + void (*cb)(void *ctx); + void *cb_ctx; int wakeup_pipe[2]; + void *win32_event; // actually HANDLE + + // Slave list. These are automatically notified as well. + struct { + struct mp_cancel *head, *tail; + } slaves; + + // For slaves. Synchronization is managed by parent.lock! + struct mp_cancel *parent; + struct { + struct mp_cancel *next, *prev; + } siblings; }; static void cancel_destroy(void *p) { struct mp_cancel *c = p; + + assert(!c->slaves.head); // API user error + + // We can access c->parent without synchronization, because: + // - since c is being destroyed, nobody can explicitly remove it as slave + // at the same time + // - c->parent needs to stay valid as long as the slave exists + if (c->parent) + mp_cancel_remove_slave(c->parent, c); + if (c->wakeup_pipe[0] >= 0) { close(c->wakeup_pipe[0]); close(c->wakeup_pipe[1]); } + +#ifdef __MINGW32__ + if (c->win32_event) + CloseHandle(c->win32_event); +#endif + + pthread_mutex_destroy(&c->lock); + pthread_cond_destroy(&c->wakeup); } struct mp_cancel *mp_cancel_new(void *talloc_ctx) { struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); talloc_set_destructor(c, cancel_destroy); - *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)}; - mp_make_wakeup_pipe(c->wakeup_pipe); + *c = (struct mp_cancel){ + .triggered = ATOMIC_VAR_INIT(false), + .wakeup_pipe = {-1, -1}, + }; + pthread_mutex_init(&c->lock, NULL); + pthread_cond_init(&c->wakeup, NULL); return c; } +static void trigger_locked(struct mp_cancel *c) +{ + atomic_store(&c->triggered, true); + + pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered + + if (c->cb) + c->cb(c->cb_ctx); + + for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next) + mp_cancel_trigger(sub); + + if (c->wakeup_pipe[1] >= 0) + (void)write(c->wakeup_pipe[1], &(char){0}, 1); + +#ifdef __MINGW32__ + if (c->win32_event) + SetEvent(c->win32_event); +#endif +} + void mp_cancel_trigger(struct mp_cancel *c) { - atomic_store(&c->triggered, true); - (void)write(c->wakeup_pipe[1], &(char){0}, 1); + pthread_mutex_lock(&c->lock); + trigger_locked(c); + pthread_mutex_unlock(&c->lock); } void mp_cancel_reset(struct mp_cancel *c) { + pthread_mutex_lock(&c->lock); + atomic_store(&c->triggered, false); - // Flush it fully. - while (1) { - int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256); - if (r <= 0 && !(r < 0 && errno == EINTR)) - break; + + if (c->wakeup_pipe[0] >= 0) { + // Flush it fully. + while (1) { + int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256); + if (r <= 0 && !(r < 0 && errno == EINTR)) + break; + } } + +#ifdef __MINGW32__ + if (c->win32_event) + ResetEvent(c->win32_event); +#endif + + pthread_mutex_unlock(&c->lock); } bool mp_cancel_test(struct mp_cancel *c) @@ -122,69 +197,78 @@ bool mp_cancel_test(struct mp_cancel *c) bool mp_cancel_wait(struct mp_cancel *c, double timeout) { - struct pollfd fd = { .fd = c->wakeup_pipe[0], .events = POLLIN }; - poll(&fd, 1, timeout * 1000); - return fd.revents & POLLIN; + struct timespec ts = mp_rel_time_to_timespec(timeout); + pthread_mutex_lock(&c->lock); + while (!mp_cancel_test(c)) { + if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts)) + break; + } + pthread_mutex_unlock(&c->lock); + + return mp_cancel_test(c); +} + +// If a new notification mechanism was added, and the mp_cancel state was +// already triggered, make sure the newly added mechanism is also triggered. +static void retrigger_locked(struct mp_cancel *c) +{ + if (mp_cancel_test(c)) + trigger_locked(c); +} + +void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx) +{ + pthread_mutex_lock(&c->lock); + c->cb = cb; + c->cb_ctx = ctx; + retrigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave) +{ + pthread_mutex_lock(&c->lock); + assert(!slave->parent); + slave->parent = c; + LL_APPEND(siblings, &c->slaves, slave); + retrigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave) +{ + pthread_mutex_lock(&c->lock); + if (slave->parent) { + assert(slave->parent == c); + slave->parent = NULL; + LL_REMOVE(siblings, &c->slaves, slave); + } + pthread_mutex_unlock(&c->lock); } int mp_cancel_get_fd(struct mp_cancel *c) { + pthread_mutex_lock(&c->lock); + if (c->wakeup_pipe[0] < 0) { + mp_make_wakeup_pipe(c->wakeup_pipe); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); + + return c->wakeup_pipe[0]; } -#else - -struct mp_cancel { - atomic_bool triggered; - HANDLE event; -}; - -static void cancel_destroy(void *p) -{ - struct mp_cancel *c = p; - CloseHandle(c->event); -} - -struct mp_cancel *mp_cancel_new(void *talloc_ctx) -{ - struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); - talloc_set_destructor(c, cancel_destroy); - *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)}; - c->event = CreateEventW(NULL, TRUE, FALSE, NULL); - return c; -} - -void mp_cancel_trigger(struct mp_cancel *c) -{ - atomic_store(&c->triggered, true); - SetEvent(c->event); -} - -void mp_cancel_reset(struct mp_cancel *c) -{ - atomic_store(&c->triggered, false); - ResetEvent(c->event); -} - -bool mp_cancel_test(struct mp_cancel *c) -{ - return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false; -} - -bool mp_cancel_wait(struct mp_cancel *c, double timeout) -{ - return WaitForSingleObject(c->event, timeout < 0 ? INFINITE : timeout * 1000) - == WAIT_OBJECT_0; -} - +#ifdef __MINGW32__ void *mp_cancel_get_event(struct mp_cancel *c) { - return c->event; -} + pthread_mutex_lock(&c->lock); + if (!c->win32_event) { + c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); -int mp_cancel_get_fd(struct mp_cancel *c) -{ - return -1; + return c->win32_event; } - #endif diff --git a/misc/thread_tools.h b/misc/thread_tools.h index a734ac85b0..2198181e6c 100644 --- a/misc/thread_tools.h +++ b/misc/thread_tools.h @@ -59,6 +59,23 @@ bool mp_cancel_wait(struct mp_cancel *c, double timeout); // Restore original state. (Allows reusing a mp_cancel.) void mp_cancel_reset(struct mp_cancel *c); +// Add a callback to invoke when mp_cancel gets triggered. If it's already +// triggered, call it from mp_cancel_add_cb() directly. May be called multiple +// times even if the trigger state changes; not called if it resets. In all +// cases, this may be called with internal locks held (either in mp_cancel, or +// other locks held by whoever calls mp_cancel_trigger()). +// There is only one callback. Create a slave mp_cancel to get a private one. +void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx); + +// If c gets triggered, automatically trigger slave. Trying to add a slave more +// than once or to multiple parents is undefined behavior. +// The parent mp_cancel must remain valid until the slave is manually removed +// or destroyed. Destroying a mp_cancel that still has slaves is an error. +void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave); + +// Undo mp_cancel_add_slave(). Ignores never added slaves for easier cleanup. +void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave); + // win32 "Event" HANDLE that indicates the current mp_cancel state. void *mp_cancel_get_event(struct mp_cancel *c);