filter: simplify/fix external filter graph usage

There was the following problem: if a filter graph had asynchronous
filters, and the filter graph user did not call mp_filter_run() (and
only accessed the mp_pins), then filtering could stall, because using
mp_pin_out_request_data() only recursively invoked filtering if the
data_requested flag wasn't already set. The latter can happen if a
request was tried earlier but failed, and then an asynchronous filter
actually produced output that would satisfy the request. Obviously, it
has to invoke filtering again to get the requested frame.

Fix this by organizing the code differently, and making sure to invoke
mp_filter_run() on every request (if there's nothing to do, it doesn't
do anything anyway). Simplify it a bit by removing things which are not
needed, like connecting filter graphs with different root filters.
This commit is contained in:
wm4 2018-02-12 19:06:40 +01:00 committed by Kevin Mitchell
parent 0ec0c147ed
commit 251f4e5d77
1 changed files with 37 additions and 34 deletions

View File

@ -107,6 +107,13 @@ struct mp_filter_internal {
bool failed;
};
// Called when new work needs to be done on a pin belonging to the filter:
// - new data was requested
// - new data has been queued
// - or just an connect/disconnect/async notification happened
// This means the process function for this filter has to be called at some
// point in the future to continue filtering.
static void add_pending(struct mp_filter *f)
{
struct filter_runner *r = f->in->runner;
@ -118,42 +125,30 @@ static void add_pending(struct mp_filter *f)
// something naive and dumb does the job too.
f->in->pending = true;
MP_TARRAY_APPEND(r, r->pending, r->num_pending, f);
// Need to tell user that something changed.
if (f == r->root_filter)
r->external_pending = true;
}
// Called when new work needs to be done on a pin belonging to the filter:
// - new data was requested
// - new data has been queued
// - or just an connect/disconnect/async notification happened
// This means the process function for this filter has to be called next.
static void update_filter(struct mp_filter *src, struct mp_filter *f)
// Possibly enter recursive filtering. This is done as convenience for
// "external" filter users only. (Normal filtering does this iteratively via
// mp_filter_run() to avoid filter reentrancy issues and deep call stacks.) If
// the API users uses an external manually connected pin, do recursive filtering
// as a not strictly necessary feature which makes outside I/O with filters
// easier.
static void filter_recursive(struct mp_filter *f)
{
assert(f);
struct filter_runner *r = f->in->runner;
// Make sure the filter knows it has to make progress.
if (src->in->runner != r) {
// Connected to a different graph. The user has to drive those manually,
// and we simplify tell the user via the mp_filter_run() return value.
r->external_pending = true;
} else if (!f->in->pending) {
add_pending(f);
// Never do internal filtering recursively.
if (r->filtering)
return;
if (!r->filtering) {
// Likely the "outer" API user used an external manually connected
// pin, so do recursive filtering (as a not strictly necessary
// feature which makes outside I/O with filters easier).
// Also don't lose the pending state, which the user may or may not
// care about.
// Note that we must avoid calling this from within filtering,
// because that would make the process() functions recursive and
// reentrant (and hard to reason about).
r->external_pending |= mp_filter_run(r->root_filter);
}
// Need to tell user that something changed.
if (f == r->root_filter)
r->external_pending = true;
}
// Also don't lose the pending state, which the user may or may not
// care about.
r->external_pending |= mp_filter_run(r->root_filter);
}
void mp_filter_internal_mark_progress(struct mp_filter *f)
@ -182,6 +177,10 @@ bool mp_filter_run(struct mp_filter *filter)
{
struct filter_runner *r = filter->in->runner;
// (could happen with separate filter graphs calling each other, for now
// ignore this issue as we don't use such a setup anywhere)
assert(!r->filtering);
r->filtering = true;
flush_async_notifications(r);
@ -233,7 +232,8 @@ bool mp_pin_in_write(struct mp_pin *p, struct mp_frame frame)
assert(p->conn->data.type == MP_FRAME_NONE);
p->conn->data = frame;
p->conn->data_requested = false;
update_filter(p->owner, p->conn->manual_connection);
add_pending(p->conn->manual_connection);
filter_recursive(p->conn->manual_connection);
return true;
}
@ -248,9 +248,12 @@ bool mp_pin_out_request_data(struct mp_pin *p)
{
if (mp_pin_out_has_data(p))
return true;
if (p->conn && p->conn->manual_connection && !p->data_requested) {
p->data_requested = true;
update_filter(p->owner, p->conn->manual_connection);
if (p->conn && p->conn->manual_connection) {
if (!p->data_requested) {
p->data_requested = true;
add_pending(p->conn->manual_connection);
}
filter_recursive(p->conn->manual_connection);
}
return mp_pin_out_has_data(p);
}
@ -258,7 +261,7 @@ bool mp_pin_out_request_data(struct mp_pin *p)
void mp_pin_out_request_data_next(struct mp_pin *p)
{
if (mp_pin_out_request_data(p))
update_filter(p->owner, p->conn->manual_connection);
add_pending(p->conn->manual_connection);
}
struct mp_frame mp_pin_out_read(struct mp_pin *p)