filter: add functions to suspend filtering temporarily

Filtering is integrated into an event loop, which is something the
filter API user provides. To make interacting with the event loop
easier, and in particular to avoid filtering to block event handling,
add functions the event loop code can suspend filtering.

While we cannot actually suspend a single filter, it's pretty easy to
suspend the filter graph run loop itself, which is responsible for
selecting which filter to run next.

This commit shouldn't change behavior at all, but the functions will be
used in later commits.
This commit is contained in:
wm4 2020-03-05 21:18:15 +01:00
parent 670610bc1d
commit 8a1bd15216
2 changed files with 66 additions and 1 deletions

View File

@ -1,8 +1,11 @@
#include <math.h>
#include <pthread.h>
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
#include "osdep/atomic.h"
#include "osdep/timer.h"
#include "video/hwdec.h"
#include "filter.h"
@ -65,6 +68,9 @@ struct filter_runner {
struct mp_filter *root_filter;
double max_run_time;
atomic_bool interrupt_flag;
// If we're currently running the filter graph (for avoiding recursion).
bool filtering;
@ -177,6 +183,10 @@ bool mp_filter_run(struct mp_filter *filter)
{
struct filter_runner *r = filter->in->runner;
int64_t end_time = 0;
if (isfinite(r->max_run_time))
end_time = mp_add_timeout(mp_time_us(), MPMAX(r->max_run_time, 0));
// (could happen with separate filter graphs calling each other, for now
// ignore this issue as we don't use such a setup anywhere)
assert(!r->filtering);
@ -187,13 +197,30 @@ bool mp_filter_run(struct mp_filter *filter)
// to queue a wakeup again later. So do not call this in the loop.
flush_async_notifications(r);
while (r->num_pending) {
while (1) {
if (atomic_exchange_explicit(&r->interrupt_flag, false,
memory_order_acq_rel))
{
pthread_mutex_lock(&r->async_lock);
if (!r->async_wakeup_sent && r->wakeup_cb)
r->wakeup_cb(r->wakeup_ctx);
r->async_wakeup_sent = true;
pthread_mutex_unlock(&r->async_lock);
break;
}
if (!r->num_pending)
break;
struct mp_filter *next = r->pending[r->num_pending - 1];
r->num_pending -= 1;
next->in->pending = false;
if (next->in->info->process)
next->in->info->process(next);
if (end_time && mp_time_us() >= end_time)
mp_filter_graph_interrupt(r->root_filter);
}
r->filtering = false;
@ -644,6 +671,18 @@ void mp_filter_mark_async_progress(struct mp_filter *f)
filter_wakeup(f, true);
}
void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds)
{
struct filter_runner *r = f->in->runner;
r->max_run_time = seconds;
}
void mp_filter_graph_interrupt(struct mp_filter *f)
{
struct filter_runner *r = f->in->runner;
atomic_store(&r->interrupt_flag, true);
}
void mp_filter_free_children(struct mp_filter *f)
{
while(f->in->num_children)
@ -717,6 +756,7 @@ struct mp_filter *mp_filter_create_with_params(struct mp_filter_params *params)
*f->in->runner = (struct filter_runner){
.global = params->global,
.root_filter = f,
.max_run_time = INFINITY,
};
pthread_mutex_init(&f->in->runner->async_lock, NULL);
}

View File

@ -409,8 +409,31 @@ struct AVBufferRef *mp_filter_load_hwdec_device(struct mp_filter *f, int avtype)
// Perform filtering. This runs until the filter graph is blocked (due to
// missing external input or unread output). It returns whether any outside
// pins have changed state.
// Note: this always operates on the filter graph associated with f, f itself
// is not treated differently from any other filters in the graph.
bool mp_filter_run(struct mp_filter *f);
// Set the maximum time mp_filter_run() should block. If the maximum time
// expires, the effect is the same as calling mp_filter_graph_interrupt() while
// the function is running. See that function for further details.
// The default is seconds==INFINITY. Values <=0 make it return after 1 iteration.
void mp_filter_graph_set_max_run_time(struct mp_filter *f, double seconds);
// Interrupt mp_filter_run() asynchronously. This does not stop filtering in a
// destructive way, but merely suspends it. In practice, this will make
// mp_filter_run() return after the current filter's process() function has
// finished. Filtering can be resumed with subsequent mp_filter_run() calls.
// When mp_filter_run() is interrupted, it will trigger the filter graph wakeup
// callback, which in turn ensures that the user will call mp_filter_run() again.
// If it is called if not in mp_filter_run(), the next mp_filter_run() call is
// interrupted and no filtering is done for that call.
// Calling this too often will starve filtering.
// This does not call the graph wakeup callback directly, which will avoid
// potential reentrancy issues. (But mp_filter_run() will call it in reaction to
// it, as described above.)
// Explicitly thread-safe.
void mp_filter_graph_interrupt(struct mp_filter *f);
// Create a root dummy filter with no inputs or outputs. This fulfills the
// following functions:
// - passing it as parent filter to top-level filters
@ -428,6 +451,8 @@ struct mp_filter *mp_filter_create_root(struct mpv_global *global);
// user's thread to call mp_filter_run() again.
// The wakeup callback must not recursively call into any filter APIs, or do
// blocking waits on the filter API (deadlocks will happen).
// A wakeup callback should always set a "wakeup" flag, that is reset only when
// mp_filter_run() is going to be called again with no wait time.
void mp_filter_root_set_wakeup_cb(struct mp_filter *root,
void (*wakeup_cb)(void *ctx), void *ctx);