audio/out/push: add mechanism for event-based waiting

Until now, we've always calculated a timeout based on a heuristic when
to refill the audio buffers. Allow AOs to do it completely event-based
by providing wait and wakeup callbacks.

This also shuffles around the heuristic used for other AOs, and there is
a minor possibility that behavior slightly changes in real-world cases.
But in general it should be much more robust now.

ao_pulse.c now makes use of event-based waiting. It already did before,
but the code for time-based waiting was also involved. This commit also
removes one awkward artifact of the PulseAudio API out of the generic
code: the callback asking for more data can be reentrant, and thus
requires a separate lock for waiting (or a recursive mutex).
This commit is contained in:
wm4 2014-05-29 23:56:48 +02:00
parent 35aba9675d
commit 5929dc458f
3 changed files with 145 additions and 78 deletions

View File

@ -24,6 +24,7 @@
#include <stdbool.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#include <pulse/pulseaudio.h>
@ -55,6 +56,11 @@ struct priv {
bool broken_pause;
int retval;
// for wakeup handling
pthread_mutex_t wakeup_lock;
pthread_cond_t wakeup;
int wakeup_status;
char *cfg_host;
char *cfg_sink;
int cfg_buffer;
@ -90,14 +96,38 @@ static void stream_state_cb(pa_stream *s, void *userdata)
}
}
static void wakeup(struct ao *ao)
{
struct priv *priv = ao->priv;
pthread_mutex_lock(&priv->wakeup_lock);
priv->wakeup_status = 1;
pthread_cond_signal(&priv->wakeup);
pthread_mutex_unlock(&priv->wakeup_lock);
}
static void stream_request_cb(pa_stream *s, size_t length, void *userdata)
{
struct ao *ao = userdata;
struct priv *priv = ao->priv;
ao_need_data(ao);
wakeup(ao);
pa_threaded_mainloop_signal(priv->mainloop, 0);
}
static int wait(struct ao *ao, pthread_mutex_t *lock)
{
struct priv *priv = ao->priv;
// We don't use this mutex, because pulse like to call stream_request_cb
// while we have the central mutex held.
pthread_mutex_unlock(lock);
pthread_mutex_lock(&priv->wakeup_lock);
while (!priv->wakeup_status)
pthread_cond_wait(&priv->wakeup, &priv->wakeup_lock);
priv->wakeup_status = 0;
pthread_mutex_unlock(&priv->wakeup_lock);
pthread_mutex_lock(lock);
return 0;
}
static void stream_latency_update_cb(pa_stream *s, void *userdata)
{
struct ao *ao = userdata;
@ -237,6 +267,9 @@ static void uninit(struct ao *ao)
pa_threaded_mainloop_free(priv->mainloop);
priv->mainloop = NULL;
}
pthread_cond_destroy(&priv->wakeup);
pthread_mutex_destroy(&priv->wakeup_lock);
}
static int init(struct ao *ao)
@ -249,6 +282,9 @@ static int init(struct ao *ao)
char *sink = priv->cfg_sink && priv->cfg_sink[0] ? priv->cfg_sink : NULL;
const char *version = pa_get_library_version();
pthread_mutex_init(&priv->wakeup_lock, NULL);
pthread_cond_init(&priv->wakeup, NULL);
ao->per_application_mixer = true;
priv->broken_pause = false;
@ -626,6 +662,8 @@ const struct ao_driver audio_out_pulse = {
.pause = pause,
.resume = resume,
.drain = drain,
.wait = wait,
.wakeup = wakeup,
.priv_size = sizeof(struct priv),
.priv_defaults = &(const struct priv) {
.cfg_buffer = 250,

View File

@ -20,6 +20,7 @@
#define MP_AO_INTERNAL_H_
#include <stdbool.h>
#include <pthread.h>
#include "audio/chmap.h"
#include "audio/chmap_sel.h"
@ -88,6 +89,8 @@ extern const struct ao_driver ao_api_pull;
* Optional:
* control
* drain
* wait
* wakeup
* b) ->play must be NULL. ->resume must be provided, and should make the
* audio API start calling the audio callback. Your audio callback should
* in turn call ao_read_data() to get audio data. Most functions are
@ -131,6 +134,20 @@ struct ao_driver {
float (*get_delay)(struct ao *ao);
// push based: block until all queued audio is played (optional)
void (*drain)(struct ao *ao);
// Wait until the audio buffer needs to be refilled. The lock is the
// internal mutex usually protecting the internal AO state (and used to
// protect driver calls), and must be temporarily unlocked while waiting.
// ->wakeup will be called (with lock held) if the wait should be canceled.
// Returns 0 on success, -1 on error.
// Optional; if this is not provided, generic code using audio timing is
// used to estimate when the AO needs to be refilled.
// Warning: it's only called if the feed thread truly needs to know when
// the audio thread takes data again. Often, it will just copy
// the complete soft-buffer to the AO, and then wait for the
// decoder instead. Don't do necessary work in this callback.
int (*wait)(struct ao *ao, pthread_mutex_t *lock);
// In combination with wait(). Lock may or may not be held.
void (*wakeup)(struct ao *ao);
// For option parsing (see vo.h)
int priv_size;
@ -141,7 +158,6 @@ struct ao_driver {
// These functions can be called by AOs.
int ao_play_silence(struct ao *ao, int samples);
void ao_need_data(struct ao *ao);
void ao_wait_drain(struct ao *ao);
int ao_read_data(struct ao *ao, void **data, int samples, int64_t out_time_us);

View File

@ -40,9 +40,6 @@
struct ao_push_state {
pthread_t thread;
pthread_mutex_t lock;
// uses a separate lock to avoid lock order issues with ao_need_data()
pthread_mutex_t wakeup_lock;
pthread_cond_t wakeup;
// --- protected by lock
@ -50,23 +47,25 @@ struct ao_push_state {
struct mp_audio_buffer *buffer;
bool terminate;
bool playing;
bool buffers_full;
bool avoid_ao_wait;
bool need_wakeup;
bool requested_data;
bool paused;
// Whether the current buffer contains the complete audio.
bool final_chunk;
double expected_end_time;
// -- protected by wakeup_lock
bool need_wakeup;
};
// lock must be held
static void wakeup_playthread(struct ao *ao)
{
struct ao_push_state *p = ao->api_priv;
pthread_mutex_lock(&p->wakeup_lock);
if (ao->driver->wakeup)
ao->driver->wakeup(ao);
p->need_wakeup = true;
pthread_cond_signal(&p->wakeup);
pthread_mutex_unlock(&p->wakeup_lock);
}
static int control(struct ao *ao, enum aocontrol cmd, void *arg)
@ -107,7 +106,7 @@ static void reset(struct ao *ao)
if (ao->driver->reset)
ao->driver->reset(ao);
mp_audio_buffer_clear(p->buffer);
p->playing = false;
p->paused = false;
wakeup_playthread(ao);
pthread_mutex_unlock(&p->lock);
}
@ -118,7 +117,7 @@ static void pause(struct ao *ao)
pthread_mutex_lock(&p->lock);
if (ao->driver->pause)
ao->driver->pause(ao);
p->playing = false;
p->paused = true;
wakeup_playthread(ao);
pthread_mutex_unlock(&p->lock);
}
@ -129,7 +128,7 @@ static void resume(struct ao *ao)
pthread_mutex_lock(&p->lock);
if (ao->driver->resume)
ao->driver->resume(ao);
p->playing = true; // tentatively
p->paused = false;
p->expected_end_time = 0;
wakeup_playthread(ao);
pthread_mutex_unlock(&p->lock);
@ -186,6 +185,10 @@ static int play(struct ao *ao, void **data, int samples, int flags)
int write_samples = mp_audio_buffer_get_write_available(p->buffer);
write_samples = MPMIN(write_samples, samples);
if (write_samples < samples)
flags = flags & ~AOPLAY_FINAL_CHUNK;
bool is_final = flags & AOPLAY_FINAL_CHUNK;
struct mp_audio audio;
mp_audio_buffer_get_format(p->buffer, &audio);
for (int n = 0; n < ao->num_planes; n++)
@ -193,92 +196,115 @@ static int play(struct ao *ao, void **data, int samples, int flags)
audio.samples = write_samples;
mp_audio_buffer_append(p->buffer, &audio);
p->final_chunk = !!(flags & AOPLAY_FINAL_CHUNK);
p->playing = true;
p->expected_end_time = 0;
bool got_data = write_samples > 0 || p->paused || p->final_chunk != is_final;
wakeup_playthread(ao);
p->expected_end_time = 0;
p->final_chunk = is_final;
p->paused = false;
// If we don't have new data, the decoder thread basically promises it
// will send new data as soon as it's available.
if (got_data) {
p->requested_data = false;
wakeup_playthread(ao);
}
pthread_mutex_unlock(&p->lock);
return write_samples;
}
// called locked
static int ao_play_data(struct ao *ao)
static void ao_play_data(struct ao *ao)
{
struct ao_push_state *p = ao->api_priv;
struct mp_audio data;
mp_audio_buffer_peek(p->buffer, &data);
int max = data.samples;
int space = ao->driver->get_space ? ao->driver->get_space(ao) : INT_MAX;
int space = ao->driver->get_space(ao);
space = MPMAX(space, 0);
if (data.samples > space)
data.samples = space;
if (data.samples <= 0)
return 0;
MP_STATS(ao, "start ao fill");
int flags = 0;
if (p->final_chunk && data.samples == max)
flags |= AOPLAY_FINAL_CHUNK;
int r = ao->driver->play(ao, data.planes, data.samples, flags);
MP_STATS(ao, "start ao fill");
int r = 0;
if (data.samples)
r = ao->driver->play(ao, data.planes, data.samples, flags);
MP_STATS(ao, "end ao fill");
if (r > data.samples) {
MP_WARN(ao, "Audio device returned non-sense value.\n");
r = data.samples;
}
if (r > 0)
mp_audio_buffer_skip(p->buffer, r);
mp_audio_buffer_skip(p->buffer, MPMAX(r, 0));
if (p->final_chunk && mp_audio_buffer_samples(p->buffer) == 0) {
p->playing = false;
p->expected_end_time = mp_time_sec() + AO_EOF_DELAY + 0.25; // + margin
if (ao->driver->get_delay)
p->expected_end_time += ao->driver->get_delay(ao);
}
MP_STATS(ao, "end ao fill");
return r;
// In both cases, we have to account for space!=0, but the AO not accepting
// any new data (due to rounding to period boundaries).
p->buffers_full = max >= space && r <= 0;
p->avoid_ao_wait = (max == 0 && space > 0) || p->paused;
// Probably can't copy the rest of the buffer due to period alignment.
if (!p->final_chunk && r <= 0 && space >= max && data.samples > 0)
p->avoid_ao_wait = true;
}
// Estimate when the AO needs data again.
static double ao_estimate_timeout(struct ao *ao)
{
struct ao_push_state *p = ao->api_priv;
double timeout = 0;
if (p->buffers_full && ao->driver->get_delay) {
timeout = ao->driver->get_delay(ao) - 0.050;
// Keep extra safety margin if the buffers are large
if (timeout > 0.100)
timeout = MPMAX(timeout - 0.200, 0.100);
}
return MPMAX(timeout, ao->device_buffer * 0.75 / ao->samplerate);
}
static void *playthread(void *arg)
{
struct ao *ao = arg;
struct ao_push_state *p = ao->api_priv;
while (1) {
pthread_mutex_lock(&p->lock);
if (p->terminate) {
pthread_mutex_unlock(&p->lock);
return NULL;
}
double timeout = 2.0;
if (p->playing) {
int r = ao_play_data(ao);
// The device buffers are not necessarily full, but writing to the
// AO buffer will wake up this thread anyway.
bool buffers_full = r <= 0;
// We have to estimate when the AO needs data again.
if (buffers_full && ao->driver->get_delay) {
float buffered_audio = ao->driver->get_delay(ao);
timeout = buffered_audio - 0.050;
// Keep extra safety margin if the buffers are large
if (timeout > 0.100)
timeout = MPMAX(timeout - 0.200, 0.100);
} else {
timeout = 0;
}
// Half of the buffer played -> wakeup playback thread to get more.
double min_wait = ao->device_buffer / (double)ao->samplerate;
if (timeout <= min_wait / 2 + 0.001 && unlocked_get_space(ao) > 0)
pthread_mutex_lock(&p->lock);
while (!p->terminate) {
ao_play_data(ao);
// Request new data from decoder if buffer goes below "full".
// Allow a small margin of missing data for AOs that use timeouts.
double margin = ao->driver->wait ? 0 : ao->device_buffer / 8;
if (!p->buffers_full && unlocked_get_space(ao) > margin) {
if (!p->requested_data)
mp_input_wakeup(ao->input_ctx);
// Avoid wasting CPU - this assumes ao_play_data() usually fills the
// audio buffer as far as possible, so even if the device buffer
// is not full, we can only wait for the core.
timeout = MPMAX(timeout, min_wait * 0.75);
p->requested_data = true;
}
if (!p->need_wakeup) {
MP_STATS(ao, "start audio wait");
if (p->avoid_ao_wait) {
// Avoid busy waiting, because the audio API will still report
// that it needs new data, even if we're not ready yet, or if
// get_space() decides that the amount of audio buffered in the
// device is enough, and p->buffer can be empty.
// The most important part is that the decoder is woken up, so
// that the decoder will wake up us in turn.
MP_TRACE(ao, "buffer inactive.\n");
mp_input_wakeup(ao->input_ctx);
pthread_cond_wait(&p->wakeup, &p->lock);
} else {
if (!ao->driver->wait || ao->driver->wait(ao, &p->lock) < 0) {
// Fallback to guessing.
double timeout = ao_estimate_timeout(ao);
mpthread_cond_timedwait_rel(&p->wakeup, &p->lock, timeout);
}
}
MP_STATS(ao, "end audio wait");
}
pthread_mutex_unlock(&p->lock);
MP_STATS(ao, "start audio wait");
pthread_mutex_lock(&p->wakeup_lock);
if (!p->need_wakeup)
mpthread_cond_timedwait_rel(&p->wakeup, &p->wakeup_lock, timeout);
p->need_wakeup = false;
pthread_mutex_unlock(&p->wakeup_lock);
MP_STATS(ao, "end audio wait");
}
pthread_mutex_unlock(&p->lock);
return NULL;
}
@ -297,7 +323,6 @@ static void uninit(struct ao *ao)
pthread_cond_destroy(&p->wakeup);
pthread_mutex_destroy(&p->lock);
pthread_mutex_destroy(&p->wakeup_lock);
}
static int init(struct ao *ao)
@ -305,7 +330,6 @@ static int init(struct ao *ao)
struct ao_push_state *p = ao->api_priv;
pthread_mutex_init(&p->lock, NULL);
pthread_mutex_init(&p->wakeup_lock, NULL);
pthread_cond_init(&p->wakeup, NULL);
p->buffer = mp_audio_buffer_create(ao);
@ -348,14 +372,3 @@ int ao_play_silence(struct ao *ao, int samples)
talloc_free(p);
return r;
}
// Notify the core that new data should be sent to the AO. Normally, the core
// uses a heuristic based on ao_delay() when to refill the buffers, but this
// can be used to reduce wait times. Can be called from any thread.
void ao_need_data(struct ao *ao)
{
assert(ao->api == &ao_api_push);
// wakeup the play thread at least once
wakeup_playthread(ao);
}