command: add a way to abort asynchronous commands

Many asynchronous commands are potentially long running operations, such
as loading something from network or running a foreign process.
Obviously it shouldn't just be possible for them to freeze the player if
they don't terminate as expected. Also, there will be situations where
you want to explicitly stop some of those operations explicitly. So add
an infrastructure for this.

Commands have to support this explicitly. The next commit uses this to
actually add support to a command.
This commit is contained in:
wm4 2018-05-12 18:46:37 +02:00
parent ce1f5e78c2
commit e4fb23ed7d
12 changed files with 169 additions and 9 deletions

View File

@ -36,6 +36,7 @@ API changes
1.102 - redo handling of async commands
- add mpv_event_command and make it possible to return values from
commands issued with mpv_command_async() or mpv_command_node_async()
- add mpv_abort_async_command()
1.101 - add MPV_RENDER_PARAM_ADVANCED_CONTROL and related API
- add MPV_RENDER_PARAM_NEXT_FRAME_INFO and related symbols
- add MPV_RENDER_PARAM_BLOCK_FOR_TARGET_TIME

View File

@ -54,6 +54,10 @@ struct mp_cmd_def {
// unlocked, you have no synchronized access to mpctx, but you can do long
// running operations without blocking playback or input handling).
bool spawn_thread;
// If this is set, mp_cmd_ctx.abort is set. Set this if handler() can do
// asynchronous abort of the command, and explicitly uses mp_cmd_ctx.abort.
// (Not setting it when it's not needed can save resources.)
bool can_abort;
};
enum mp_cmd_flags {

View File

@ -1013,6 +1013,38 @@ int mpv_command_async(mpv_handle *ctx, uint64_t reply_userdata,
int mpv_command_node_async(mpv_handle *ctx, uint64_t reply_userdata,
mpv_node *args);
/**
* Signal to all async requests with the matching ID to abort. This affects
* the following API calls:
*
* mpv_command_async
* mpv_command_node_async
*
* All of these functions take a reply_userdata parameter. This API function
* tells all requests with the matching reply_userdata value to try to return
* as soon as possible. If there are multiple requests with matching ID, it
* aborts all of them.
*
* This API function is mostly asynchronous itself. It will not wait until the
* command is aborted. Instead, the command will terminate as usual, but with
* some work not done. How this is signaled depends on the specific command (for
* example, the "subprocess" command will indicate it by "killed_by_us" set to
* true in the result). How long it takes also depends on the situation. The
* aborting process is completely asynchronous.
*
* Not all commands may support this functionality. In this case, this function
* will have no effect. The same is true if the request using the passed
* reply_userdata has already terminated, has not been started yet, or was
* never in use at all.
*
* You have to be careful of race conditions: the time during which the abort
* request will be effective is _after_ e.g. mpv_command_async() has returned,
* and before the command has signaled completion with MPV_EVENT_COMMAND_REPLY.
*
* @param reply_userdata ID of the request to be aborted (see above)
*/
void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata);
/**
* Set a property to a given value. Properties are essentially variables which
* can be queried or set at runtime. For example, writing to the pause property

View File

@ -1,3 +1,4 @@
mpv_abort_async_command
mpv_client_api_version
mpv_client_name
mpv_command

View File

@ -1063,9 +1063,14 @@ static int run_client_command(mpv_handle *ctx, struct mp_cmd *cmd, mpv_node *res
lock_core(ctx);
if (async) {
run_command(ctx->mpctx, req.cmd, NULL, NULL);
run_command(ctx->mpctx, cmd, NULL, NULL, NULL);
} else {
run_command(ctx->mpctx, req.cmd, cmd_complete, &req);
struct mp_abort_entry *abort = NULL;
if (cmd->def->can_abort) {
abort = talloc_zero(NULL, struct mp_abort_entry);
abort->client = ctx;
}
run_command(ctx->mpctx, cmd, abort, cmd_complete, &req);
}
unlock_core(ctx);
@ -1129,9 +1134,17 @@ static void async_cmd_fn(void *data)
ta_xset_parent(cmd, NULL);
req->cmd = NULL;
struct mp_abort_entry *abort = NULL;
if (cmd->def->can_abort) {
abort = talloc_zero(NULL, struct mp_abort_entry);
abort->client = req->reply_ctx;
abort->client_work_type = MPV_EVENT_COMMAND_REPLY;
abort->client_work_id = req->userdata;
}
// This will synchronously or asynchronously call cmd_complete (depending
// on the command).
run_command(req->mpctx, cmd, async_cmd_complete, req);
run_command(req->mpctx, cmd, abort, async_cmd_complete, req);
}
static int run_async_cmd(mpv_handle *ctx, uint64_t ud, struct mp_cmd *cmd)
@ -1163,6 +1176,23 @@ int mpv_command_node_async(mpv_handle *ctx, uint64_t ud, mpv_node *args)
return run_async_cmd(ctx, ud, mp_input_parse_cmd_node(ctx->log, args));
}
void mpv_abort_async_command(mpv_handle *ctx, uint64_t reply_userdata)
{
struct MPContext *mpctx = ctx->mpctx;
pthread_mutex_lock(&mpctx->abort_lock);
for (int n = 0; n < mpctx->num_abort_list; n++) {
struct mp_abort_entry *abort = mpctx->abort_list[n];
if (abort->client == ctx &&
abort->client_work_type == MPV_EVENT_COMMAND_REPLY &&
abort->client_work_id == reply_userdata)
{
mp_abort_trigger_locked(mpctx, abort);
}
}
pthread_mutex_unlock(&mpctx->abort_lock);
}
static int translate_property_error(int errc)
{
switch (errc) {

View File

@ -4865,7 +4865,7 @@ static void continue_cmd_list(struct cmd_list_ctx *list)
if (sub->flags & MP_ASYNC_CMD) {
// We run it "detached" (fire & forget)
run_command(list->mpctx, sub, NULL, NULL);
run_command(list->mpctx, sub, NULL, NULL, NULL);
} else {
// Run the next command once this one completes.
@ -4873,7 +4873,7 @@ static void continue_cmd_list(struct cmd_list_ctx *list)
list->current_valid = true;
list->current = pthread_self();
run_command(list->mpctx, sub, on_cmd_list_sub_completion, list);
run_command(list->mpctx, sub, NULL, on_cmd_list_sub_completion, list);
list->current_valid = false;
@ -4920,8 +4920,9 @@ void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd)
mpv_free_node_contents(&cmd->result);
if (cmd->on_completion)
cmd->on_completion(cmd);
if (cmd->abort)
mp_abort_remove(cmd->mpctx, cmd->abort);
mpv_free_node_contents(&cmd->result);
talloc_free(cmd->cmd);
talloc_free(cmd);
}
@ -4949,27 +4950,40 @@ static void run_command_on_worker_thread(void *p)
// function returns (the caller is supposed to be able to handle both cases). In
// both cases, the callback will be called while the core is locked (i.e. you
// can access the core freely).
// If abort is non-NULL, then the caller creates the abort object. It must have
// been allocated with talloc. run_command() will register/unregister/destroy
// it. Must not be set if cmd->def->can_abort==false.
// on_completion_priv is copied to mp_cmd_ctx.on_completion_priv and can be
// accessed from the completion callback.
// The completion callback is invoked exactly once. If it's NULL, it's ignored.
// Ownership of cmd goes to the caller.
void run_command(struct MPContext *mpctx, struct mp_cmd *cmd,
struct mp_abort_entry *abort,
void (*on_completion)(struct mp_cmd_ctx *cmd),
void *on_completion_priv)
{
struct mp_cmd_ctx *ctx = talloc(NULL, struct mp_cmd_ctx);
*ctx = (struct mp_cmd_ctx){
.mpctx = mpctx,
.cmd = cmd,
.cmd = talloc_steal(ctx, cmd),
.args = cmd->args,
.num_args = cmd->nargs,
.priv = cmd->def->priv,
.abort = talloc_steal(ctx, abort),
.success = true,
.completed = true,
.on_completion = on_completion,
.on_completion_priv = on_completion_priv,
};
if (!ctx->abort && cmd->def->can_abort)
ctx->abort = talloc_zero(ctx, struct mp_abort_entry);
assert(cmd->def->can_abort == !!ctx->abort);
if (ctx->abort)
mp_abort_add(mpctx, ctx->abort);
struct MPOpts *opts = mpctx->opts;
ctx->on_osd = cmd->flags & MP_ON_OSD_FLAGS;
bool auto_osd = ctx->on_osd == MP_ON_OSD_AUTO;

View File

@ -45,6 +45,8 @@ struct mp_cmd_ctx {
bool bar_osd; // OSD bar requested
bool seek_msg_osd; // same as above, but for seek commands
bool seek_bar_osd;
// If mp_cmd_def.can_abort is set, this will be set.
struct mp_abort_entry *abort;
// Return values (to be set by command implementation, read by the
// completion callback).
bool success; // true by default
@ -64,6 +66,7 @@ struct mp_cmd_ctx {
};
void run_command(struct MPContext *mpctx, struct mp_cmd *cmd,
struct mp_abort_entry *abort,
void (*on_completion)(struct mp_cmd_ctx *cmd),
void *on_completion_priv);
void mp_cmd_ctx_complete(struct mp_cmd_ctx *cmd);

View File

@ -442,6 +442,8 @@ typedef struct MPContext {
// --- The following fields are protected by abort_lock
struct mp_cancel *demuxer_cancel; // cancel handle for MPContext.demuxer
struct mp_abort_entry **abort_list;
int num_abort_list;
// --- Owned by MPContext
pthread_t open_thread;
@ -459,6 +461,20 @@ typedef struct MPContext {
int open_res_error;
} MPContext;
// Contains information about an asynchronous work item, how it can be aborted,
// and when. All fields are protected by MPContext.abort_lock.
struct mp_abort_entry {
// General conditions.
bool coupled_to_playback; // trigger when playback is terminated
// Actual trigger to abort the work.
struct mp_cancel *cancel;
// For client API.
struct mpv_handle *client; // non-NULL if done by a client API user
int client_work_type; // client API type, e.h. MPV_EVENT_COMMAND_REPLY
uint64_t client_work_id; // client API user reply_userdata value
// (only valid if client_work_type set)
};
// audio.c
void reset_audio_state(struct MPContext *mpctx);
void reinit_audio_chain(struct MPContext *mpctx);
@ -488,6 +504,12 @@ struct playlist_entry *mp_check_playlist_resume(struct MPContext *mpctx,
// loadfile.c
void mp_abort_playback_async(struct MPContext *mpctx);
void mp_abort_add(struct MPContext *mpctx, struct mp_abort_entry *abort);
void mp_abort_remove(struct MPContext *mpctx, struct mp_abort_entry *abort);
void mp_abort_recheck_locked(struct MPContext *mpctx,
struct mp_abort_entry *abort);
void mp_abort_trigger_locked(struct MPContext *mpctx,
struct mp_abort_entry *abort);
void uninit_player(struct MPContext *mpctx, unsigned int mask);
int mp_add_external_file(struct MPContext *mpctx, char *filename,
enum stream_type filter);

View File

@ -67,11 +67,62 @@ void mp_abort_playback_async(struct MPContext *mpctx)
mp_cancel_trigger(mpctx->playback_abort);
pthread_mutex_lock(&mpctx->abort_lock);
if (mpctx->demuxer_cancel)
mp_cancel_trigger(mpctx->demuxer_cancel);
for (int n = 0; n < mpctx->num_abort_list; n++) {
struct mp_abort_entry *abort = mpctx->abort_list[n];
if (abort->coupled_to_playback)
mp_abort_trigger_locked(mpctx, abort);
}
pthread_mutex_unlock(&mpctx->abort_lock);
}
// Add it to the global list, and allocate required data structures.
void mp_abort_add(struct MPContext *mpctx, struct mp_abort_entry *abort)
{
pthread_mutex_lock(&mpctx->abort_lock);
assert(!abort->cancel);
abort->cancel = mp_cancel_new(NULL);
MP_TARRAY_APPEND(NULL, mpctx->abort_list, mpctx->num_abort_list, abort);
mp_abort_recheck_locked(mpctx, abort);
pthread_mutex_unlock(&mpctx->abort_lock);
}
// Remove Add it to the global list, and free/clear required data structures.
// Does not deallocate the abort value itself.
void mp_abort_remove(struct MPContext *mpctx, struct mp_abort_entry *abort)
{
pthread_mutex_lock(&mpctx->abort_lock);
for (int n = 0; n < mpctx->num_abort_list; n++) {
if (mpctx->abort_list[n] == abort) {
MP_TARRAY_REMOVE_AT(mpctx->abort_list, mpctx->num_abort_list, n);
TA_FREEP(&abort->cancel);
abort = NULL; // it's not free'd, just clear for the assert below
break;
}
}
assert(!abort); // should have been in the list
pthread_mutex_unlock(&mpctx->abort_lock);
}
// Verify whether the abort needs to be signaled after changing certain fields
// in abort.
void mp_abort_recheck_locked(struct MPContext *mpctx,
struct mp_abort_entry *abort)
{
if (abort->coupled_to_playback && mp_cancel_test(mpctx->playback_abort))
mp_abort_trigger_locked(mpctx, abort);
}
void mp_abort_trigger_locked(struct MPContext *mpctx,
struct mp_abort_entry *abort)
{
mp_cancel_trigger(abort->cancel);
}
static void uninit_demuxer(struct MPContext *mpctx)
{
for (int r = 0; r < NUM_PTRACKS; r++) {

View File

@ -189,6 +189,8 @@ void mp_destroy(struct MPContext *mpctx)
uninit_libav(mpctx->global);
mp_msg_uninit(mpctx->global);
assert(!mpctx->num_abort_list);
talloc_free(mpctx->abort_list);
pthread_mutex_destroy(&mpctx->abort_lock);
talloc_free(mpctx);
}

View File

@ -110,7 +110,7 @@ void mp_process_input(struct MPContext *mpctx)
mp_cmd_t *cmd = mp_input_read_cmd(mpctx->input);
if (!cmd)
break;
run_command(mpctx, cmd, NULL, NULL);
run_command(mpctx, cmd, NULL, NULL, NULL);
}
mp_set_timeout(mpctx, mp_input_get_delay(mpctx->input));
}

View File

@ -519,7 +519,7 @@ void screenshot_flip(struct MPContext *mpctx)
struct mp_waiter wait = MP_WAITER_INITIALIZER;
void *a[] = {mpctx, &wait};
run_command(mpctx, mp_cmd_clone(ctx->each_frame), screenshot_fin, a);
run_command(mpctx, mp_cmd_clone(ctx->each_frame), NULL, screenshot_fin, a);
// Block (in a reentrant way) until he screenshot was written. Otherwise,
// we could pile up screenshot requests forever.