client API: rewrite property observation (again)

I intend to rewrite this code approximately every 2 months.

Last time, I did this in commit d66eb93e5d (and 065c307e8e and
b2006eeb74). It was intended to remove the roundabout synchronous
thread "ping pong" when observing properties. At first, the original
async. code was replaced with some nice mostly synchronous code. But
then an async. code path had to be added for vo_libmpv, and finally the
sync. code was dropped because it broke in other obscure cases (like the
Objective-C Cocoa backend).

Try again. This time, update properties entirely on the main thread.
Updates get batched out on every playloop iteration. (At first I wanted
it to make it every time the player goes to sleep, but that might starve
API clients if the playloop get saturated.) One nice thing is that
clients only get woken up once all changed events have been sent, which
might reduce overhead.

While this sounds simple, it's not. The main problem is that reading
properties must not block the client API, i.e. no client API locks can
be held while reading the property. Maybe eventually we can avoid this
requirement, but currently it's just a fact. This means we have to
iterate over all clients and then over all properties (of each client),
all while releasing all locks when updating a property. Solve this by
rechecking on each iteration whether the list changed, and if so,
aborting the iteration and redo it "next time".

High risk change, expect bugs such as crashes and missing property
updates.
This commit is contained in:
wm4 2019-12-19 11:11:51 +01:00
parent f719b63840
commit 2c5cb2c53f
3 changed files with 205 additions and 147 deletions

View File

@ -73,6 +73,10 @@ struct mp_client_api {
bool shutting_down; // do not allow new clients
bool have_terminator; // a client took over the role of destroying the core
bool terminate_core_thread; // make libmpv core thread exit
// This is incremented whenever the clients[] array above changes. This is
// used to safely unlock mp_client_api.lock while iterating the list of
// clients.
uint64_t clients_list_change_ts;
struct mp_custom_protocol *custom_protocols;
int num_custom_protocols;
@ -82,6 +86,7 @@ struct mp_client_api {
};
struct observe_property {
// -- immutable
struct mpv_handle *owner;
char *name;
int id; // ==mp_get_property_id(name)
@ -89,15 +94,14 @@ struct observe_property {
int64_t reply_id;
mpv_format format;
const struct m_option *type;
bool changed; // property change should be signaled to user
bool dead; // property unobserved while retrieving value
// -- protected by owner->lock
size_t refcount;
uint64_t change_ts; // logical timestamp incremented on each change
uint64_t value_ts; // logical timestamp for value contents
bool value_valid;
union m_option_value value;
uint64_t async_change_ts; // logical timestamp incremented on each change
uint64_t async_value_ts; // logical timestamp for async_value contents
bool async_updating; // if true, updating async_value_ts to change_ts
bool async_value_valid;
union m_option_value async_value;
uint64_t value_ret_ts; // logical timestamp of value returned to user
union m_option_value value_ret;
};
struct mpv_handle {
@ -110,6 +114,7 @@ struct mpv_handle {
// -- not thread-safe
struct mpv_event *cur_event;
struct mpv_event_property cur_property_event;
struct observe_property *cur_property;
pthread_mutex_t lock;
@ -134,11 +139,19 @@ struct mpv_handle {
int reserved_events; // number of entries reserved for replies
size_t async_counter; // pending other async events
bool choked; // recovering from queue overflow
bool destroying; // pending destruction; no API accesses allowed
struct observe_property **properties;
int num_properties;
int lowest_changed; // attempt at making change processing incremental
bool has_pending_properties; // (maybe) new property events (producer side)
bool new_property_events; // new property events (consumer side)
int cur_property_index; // round-robin for property events (consumer side)
uint64_t property_event_masks; // or-ed together event masks of all properties
// This is incremented whenever the properties[] array above changes. This
// is used to safely unlock mpv_handle.lock while reading a property. If
// the counter didn't change between unlock and relock, then it will assume
// the array did not change.
uint64_t properties_change_ts;
bool fuzzy_initialized; // see scripting.c wait_loaded()
bool is_weak; // can not keep core alive on its own
@ -150,6 +163,18 @@ static bool gen_log_message_event(struct mpv_handle *ctx);
static bool gen_property_change_event(struct mpv_handle *ctx);
static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask);
// Must be called with prop->owner->lock held.
static void prop_unref(struct observe_property *prop)
{
if (!prop)
return;
assert(prop->refcount > 0);
prop->refcount -= 1;
if (!prop->refcount)
talloc_free(prop);
}
void mp_clients_init(struct MPContext *mpctx)
{
mpctx->clients = talloc_ptrtype(NULL, mpctx->clients);
@ -256,6 +281,7 @@ struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name
snprintf(client->name, sizeof(client->name), "%s", nname);
clients->clients_list_change_ts += 1;
MP_TARRAY_APPEND(clients, clients->clients, clients->num_clients, client);
if (clients->num_clients == 1 && !clients->mpctx->is_cli)
@ -410,6 +436,20 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
if (terminate)
mpv_command(ctx, (const char*[]){"quit", NULL});
pthread_mutex_lock(&ctx->lock);
ctx->destroying = true;
for (int n = 0; n < ctx->num_properties; n++)
prop_unref(ctx->properties[n]);
ctx->num_properties = 0;
ctx->properties_change_ts += 1;
prop_unref(ctx->cur_property);
ctx->cur_property = NULL;
pthread_mutex_unlock(&ctx->lock);
abort_async(mpctx, ctx, 0, 0);
// reserved_events equals the number of asynchronous requests that weren't
@ -424,6 +464,7 @@ static void mp_destroy_client(mpv_handle *ctx, bool terminate)
for (int n = 0; n < clients->num_clients; n++) {
if (clients->clients[n] == ctx) {
clients->clients_list_change_ts += 1;
MP_TARRAY_REMOVE_AT(clients->clients, clients->num_clients, n);
while (ctx->num_events) {
talloc_free(ctx->events[ctx->first_event].data);
@ -1415,11 +1456,11 @@ static void property_free(void *p)
{
struct observe_property *prop = p;
assert(!prop->async_updating);
assert(prop->refcount == 0);
if (prop->type) {
m_option_free(prop->type, &prop->value);
m_option_free(prop->type, &prop->async_value);
m_option_free(prop->type, &prop->value_ret);
}
}
@ -1434,6 +1475,7 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata,
return MPV_ERROR_PROPERTY_FORMAT;
pthread_mutex_lock(&ctx->lock);
assert(!ctx->destroying);
struct observe_property *prop = talloc_ptrtype(ctx, prop);
talloc_set_destructor(prop, property_free);
*prop = (struct observe_property){
@ -1444,35 +1486,33 @@ int mpv_observe_property(mpv_handle *ctx, uint64_t userdata,
.reply_id = userdata,
.format = format,
.type = type,
.changed = true,
.async_change_ts = 1,
.change_ts = 1, // force initial event
.refcount = 1,
};
ctx->properties_change_ts += 1;
MP_TARRAY_APPEND(ctx, ctx->properties, ctx->num_properties, prop);
ctx->property_event_masks |= prop->event_mask;
ctx->lowest_changed = 0;
ctx->new_property_events = true;
ctx->cur_property_index = 0;
ctx->has_pending_properties = true;
pthread_mutex_unlock(&ctx->lock);
mp_wakeup_core(ctx->mpctx);
return 0;
}
static void mark_property_changed(struct mpv_handle *client, int index)
{
struct observe_property *prop = client->properties[index];
prop->changed = true;
prop->async_change_ts += 1;
client->lowest_changed = MPMIN(client->lowest_changed, index);
}
int mpv_unobserve_property(mpv_handle *ctx, uint64_t userdata)
{
pthread_mutex_lock(&ctx->lock);
int count = 0;
for (int n = 0; n < ctx->num_properties; n++) {
for (int n = ctx->num_properties - 1; n >= 0; n--) {
struct observe_property *prop = ctx->properties[n];
// Perform actual removal of the property lazily to avoid creating
// dangling pointers and such.
if (prop->reply_id == userdata && !prop->dead) {
mark_property_changed(ctx, n);
prop->dead = true;
if (prop->reply_id == userdata) {
prop_unref(prop);
ctx->properties_change_ts += 1;
MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n);
ctx->cur_property_index = 0;
count++;
}
}
@ -1485,6 +1525,7 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
{
struct mp_client_api *clients = mpctx->clients;
int id = mp_get_property_id(mpctx, name);
bool any_pending = false;
pthread_mutex_lock(&clients->lock);
@ -1492,15 +1533,23 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
struct mpv_handle *client = clients->clients[n];
pthread_mutex_lock(&client->lock);
for (int i = 0; i < client->num_properties; i++) {
if (client->properties[i]->id == id)
mark_property_changed(client, i);
if (client->properties[i]->id == id) {
client->properties[i]->change_ts += 1;
client->has_pending_properties = true;
any_pending = true;
}
}
if (client->lowest_changed < client->num_properties)
wakeup_client(client);
pthread_mutex_unlock(&client->lock);
}
pthread_mutex_unlock(&clients->lock);
// If we're inside mp_dispatch_queue_process(), this will cause the playloop
// to be re-run (to get mp_client_send_property_changes() called). If we're
// inside the normal playloop, this does nothing, but the latter function
// will be called at the end of the playloop anyway.
if (any_pending)
mp_dispatch_adjust_timeout(mpctx->dispatch, 0);
}
// Mark properties as changed in reaction to specific events.
@ -1508,94 +1557,122 @@ void mp_client_property_change(struct MPContext *mpctx, const char *name)
static void notify_property_events(struct mpv_handle *ctx, uint64_t event_mask)
{
for (int i = 0; i < ctx->num_properties; i++) {
if (ctx->properties[i]->event_mask & event_mask)
mark_property_changed(ctx, i);
if (ctx->properties[i]->event_mask & event_mask) {
ctx->properties[i]->change_ts += 1;
ctx->has_pending_properties = true;
}
}
if (ctx->lowest_changed < ctx->num_properties)
// Note: assume this function is called from the playloop only (at least
// if called with events that trigger property changes).
}
// Call with ctx->lock held (only). May temporarily drop the lock.
static void send_client_property_changes(struct mpv_handle *ctx)
{
uint64_t cur_ts = ctx->properties_change_ts;
ctx->has_pending_properties = false;
for (int n = 0; n < ctx->num_properties; n++) {
struct observe_property *prop = ctx->properties[n];
if (prop->value_ts == prop->change_ts)
continue;
bool changed = false;
if (prop->format) {
const struct m_option *type = prop->type;
union m_option_value val = {0};
struct getproperty_request req = {
.mpctx = ctx->mpctx,
.name = prop->name,
.format = prop->format,
.data = &val,
};
// Temporarily unlock and read the property. The very important
// thing is that property getters can do whatever they want, _and_
// that they may wait on the client API user thread (if vo_libmpv
// or similar things are involved).
prop->refcount += 1; // keep prop alive (esp. prop->name)
ctx->async_counter += 1; // keep ctx alive
pthread_mutex_unlock(&ctx->lock);
getproperty_fn(&req);
pthread_mutex_lock(&ctx->lock);
ctx->async_counter -= 1;
prop_unref(prop);
// Set of observed properties was changed or something similar
// => start over, retry next time.
if (cur_ts != ctx->properties_change_ts || ctx->destroying) {
m_option_free(type, &val);
mp_wakeup_core(ctx->mpctx);
ctx->has_pending_properties = true;
break;
}
assert(prop->refcount > 0);
bool val_valid = req.status >= 0;
changed = prop->value_valid != val_valid;
if (prop->value_valid && val_valid)
changed = !equal_mpv_value(&prop->value, &val, prop->format);
if (prop->value_ts == 0)
changed = true; // initial event
prop->value_valid = val_valid;
if (changed && val_valid) {
// move val to prop->value
m_option_free(type, &prop->value);
memcpy(&prop->value, &val, type->type->size);
memset(&val, 0, type->type->size);
}
m_option_free(prop->type, &val);
} else {
changed = true;
}
if (changed) {
ctx->new_property_events = true;
} else if (prop->value_ret_ts == prop->value_ts) {
prop->value_ret_ts = prop->change_ts; // no change => no event
}
prop->value_ts = prop->change_ts;
}
if (ctx->destroying || ctx->new_property_events)
wakeup_client(ctx);
}
static void update_prop_async(void *p)
void mp_client_send_property_changes(struct MPContext *mpctx)
{
struct observe_property *prop = p;
struct mpv_handle *ctx = prop->owner;
struct mp_client_api *clients = mpctx->clients;
union m_option_value val = {0};
bool val_valid = false;
uint64_t value_ts;
pthread_mutex_lock(&clients->lock);
uint64_t cur_ts = clients->clients_list_change_ts;
pthread_mutex_lock(&ctx->lock);
value_ts = prop->async_change_ts;
assert(prop->async_updating);
pthread_mutex_unlock(&ctx->lock);
for (int n = 0; n < clients->num_clients; n++) {
struct mpv_handle *ctx = clients->clients[n];
struct getproperty_request req = {
.mpctx = ctx->mpctx,
.name = prop->name,
.format = prop->format,
.data = &val,
};
getproperty_fn(&req);
val_valid = req.status >= 0;
pthread_mutex_lock(&ctx->lock);
assert(prop->async_updating);
// Move to prop->async_value
m_option_free(prop->type, &prop->async_value);
memcpy(&prop->async_value, &val, prop->type->type->size);
prop->async_value_valid = val_valid;
prop->async_value_ts = value_ts;
prop->async_updating = false;
// Cause it to re-check the property.
prop->changed = true;
ctx->lowest_changed = 0;
ctx->async_counter -= 1;
wakeup_client(ctx);
pthread_mutex_unlock(&ctx->lock);
}
static bool update_prop(struct mpv_handle *ctx, struct observe_property *prop)
{
if (!prop->type)
return true;
if (prop->async_change_ts > prop->async_value_ts) {
if (!prop->async_updating) {
prop->async_updating = true;
ctx->async_counter += 1;
mp_dispatch_enqueue(ctx->mpctx->dispatch, update_prop_async, prop);
pthread_mutex_lock(&ctx->lock);
if (!ctx->has_pending_properties) {
pthread_mutex_unlock(&ctx->lock);
continue;
}
return false; // re-update later when the changed value comes in
}
union m_option_value val = {0};
bool val_valid = prop->async_value_valid;
m_option_copy(prop->type, &val, &prop->async_value);
bool changed = prop->value_valid != val_valid;
if (prop->value_valid && val_valid)
changed = !equal_mpv_value(&prop->value, &val, prop->format);
if (changed) {
prop->value_valid = val_valid;
if (val_valid) {
// move val to prop->value
m_option_free(prop->type, &prop->value);
memcpy(&prop->value, &val, prop->type->type->size);
val_valid = false;
// Keep ctx->lock locked (unlock order does not matter).
pthread_mutex_unlock(&clients->lock);
send_client_property_changes(ctx);
pthread_mutex_unlock(&ctx->lock);
pthread_mutex_lock(&clients->lock);
if (cur_ts != clients->clients_list_change_ts) {
// List changed; need to start over. Do it in the next iteration.
mp_wakeup_core(mpctx);
break;
}
}
if (val_valid)
m_option_free(prop->type, &val);
return changed;
pthread_mutex_unlock(&clients->lock);
}
// Set ctx->cur_event to a generated property change event, if there is any
@ -1605,62 +1682,40 @@ static bool gen_property_change_event(struct mpv_handle *ctx)
if (!ctx->mpctx->initialized)
return false;
*ctx->cur_event = (struct mpv_event){
.event_id = MPV_EVENT_NONE,
};
bool need_gc = false;
int start = ctx->lowest_changed;
ctx->lowest_changed = ctx->num_properties;
for (int n = start; n < ctx->num_properties; n++) {
struct observe_property *prop = ctx->properties[n];
if (prop->changed && n < ctx->lowest_changed)
ctx->lowest_changed = n;
bool updated = false;
if (prop->changed && !prop->dead) {
prop->changed = false;
updated = update_prop(ctx, prop);
while (1) {
if (ctx->cur_property_index >= ctx->num_properties) {
if (!ctx->new_property_events || !ctx->num_properties)
break;
ctx->new_property_events = false;
ctx->cur_property_index = 0;
}
if (prop->dead) {
need_gc = true;
} else if (updated) {
struct observe_property *prop = ctx->properties[ctx->cur_property_index++];
if (prop->value_ret_ts != prop->value_ts) {
prop->value_ret_ts = prop->value_ts;
prop_unref(ctx->cur_property);
ctx->cur_property = prop;
prop->refcount += 1;
if (prop->value_valid)
m_option_copy(prop->type, &prop->value_ret, &prop->value);
ctx->cur_property_event = (struct mpv_event_property){
.name = prop->name,
.format = prop->value_valid ? prop->format : 0,
.data = prop->value_valid ? &prop->value : NULL,
.data = prop->value_valid ? &prop->value_ret : NULL,
};
*ctx->cur_event = (struct mpv_event){
.event_id = MPV_EVENT_PROPERTY_CHANGE,
.reply_userdata = prop->reply_id,
.data = &ctx->cur_property_event,
};
break;
return true;
}
}
if (need_gc) {
// Remove entries which have the .dead flag set. The point of doing this
// here is to ensure that this does not conflict with update_prop(),
// and that a previously returned mpv_event struct pointing to removed
// property entries does not result in dangling pointers.
ctx->property_event_masks = 0;
ctx->lowest_changed = 0;
for (int n = ctx->num_properties - 1; n >= 0; n--) {
struct observe_property *prop = ctx->properties[n];
if (prop->dead) {
if (!prop->async_updating) {
MP_TARRAY_REMOVE_AT(ctx->properties, ctx->num_properties, n);
talloc_free(prop);
}
} else {
ctx->property_event_masks |= prop->event_mask;
}
}
}
return !!ctx->cur_event->event_id;
return false;
}
int mpv_hook_add(mpv_handle *ctx, uint64_t reply_userdata,

View File

@ -30,6 +30,7 @@ int mp_client_send_event(struct MPContext *mpctx, const char *client_name,
int mp_client_send_event_dup(struct MPContext *mpctx, const char *client_name,
int event, void *data);
void mp_client_property_change(struct MPContext *mpctx, const char *name);
void mp_client_send_property_changes(struct MPContext *mpctx);
struct mpv_handle *mp_new_client(struct mp_client_api *clients, const char *name);
void mp_client_set_weak(struct mpv_handle *ctx);

View File

@ -54,6 +54,8 @@
// mp_wait_events() was called.
void mp_wait_events(struct MPContext *mpctx)
{
mp_client_send_property_changes(mpctx);
bool sleeping = mpctx->sleeptime > 0;
if (sleeping)
MP_STATS(mpctx, "start sleep");