demux: refactor cache range init/deinit

Remove the duplicated creation of the first range. Explicitly destroy
ranges, including the last one on final deinit.

It looks like this also fixes a leak of removed range structs. The leak
went unnoticed because the structs are so small, and because they were
freed on final deinit anyway, having the demuxer as their talloc parent.

This improves upon the previous commit too (that change should have
been part of it, I guess). Sub-demuxers (demux_timeline only) now
automatically don't use the cache, as intended by the previous
commit. The cache is "initialized" (or disabled) last in the recursive
call chain, which is messy, but this sub-demuxer stuff FUCKING SUCKS, as
mentioned in the previous commit message. This would be no problem if
the caching layer and the actual demuxer implementations were separate.

Most of this change has no immediate purpose. It might make
(de-)initialization in further cache experiments simpler.
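
As an aside on the talloc mechanics: below is a minimal sketch, not part
of the commit, of the leak pattern described above, written against plain
libtalloc (mpv's ta/ wrappers behave the same way for this purpose); all
names in it are invented for illustration.

    // build: cc leak_sketch.c -ltalloc
    #include <talloc.h>

    struct range { int dummy; };

    int main(void)
    {
        void *demuxer = talloc_new(NULL);   // stand-in for the demuxer context

        // Old pattern: the range struct has the demuxer as talloc parent. If
        // it is only removed from the ranges[] array and never talloc_free()d,
        // it lingers until the parent dies: a slow, hard-to-notice leak.
        struct range *removed = talloc(demuxer, struct range);
        (void)removed;                      // "removed" from the array, not freed
        talloc_free(demuxer);               // final deinit frees it implicitly

        // New pattern: allocate with a NULL parent, so removing a range must
        // be paired with an explicit talloc_free(), as the patched
        // free_empty_cached_ranges() does.
        struct range *r = talloc(NULL, struct range);
        talloc_free(r);
        return 0;
    }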
wm4 2019-06-08 20:32:15 +02:00
parent e8147843fc
commit fae31f39c7
3 changed files with 51 additions and 59 deletions

demux/demux.c

@@ -231,7 +231,8 @@ struct demux_internal {
     size_t total_bytes; // total sum of packet data buffered
     // Range from which decoder is reading, and to which demuxer is appending.
-    // This is never NULL. This is always ranges[num_ranges - 1].
+    // This is normally never NULL. This is always ranges[num_ranges - 1].
+    // This can be NULL during initialization or deinitialization.
     struct demux_cached_range *current_range;
     double highest_av_pts; // highest non-subtitle PTS seen - for duration
@@ -406,6 +407,7 @@ struct mp_packet_tags {
     struct mp_tags *sh; // per sh_stream tags (e.g. OGG)
 };
+static void switch_to_fresh_cache_range(struct demux_internal *in);
 static void demuxer_sort_chapters(demuxer_t *demuxer);
 static void *demux_thread(void *pctx);
 static void update_cache(struct demux_internal *in);
@@ -746,17 +748,26 @@ static void clear_cached_range(struct demux_internal *in,
 // ranges.
 static void free_empty_cached_ranges(struct demux_internal *in)
 {
-    assert(in->current_range && in->num_ranges > 0);
-    assert(in->current_range == in->ranges[in->num_ranges - 1]);
     while (1) {
         struct demux_cached_range *worst = NULL;
-        for (int n = in->num_ranges - 2; n >= 0; n--) {
+        int end = in->num_ranges - 1;
+        // (Not set during early init or late destruction.)
+        if (in->current_range) {
+            assert(in->current_range && in->num_ranges > 0);
+            assert(in->current_range == in->ranges[in->num_ranges - 1]);
+            end -= 1;
+        }
+        for (int n = end; n >= 0; n--) {
             struct demux_cached_range *range = in->ranges[n];
             if (range->seek_start == MP_NOPTS_VALUE || !in->seekable_cache) {
                 clear_cached_range(in, range);
                 MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
+                for (int i = 0; i < range->num_streams; i++)
+                    talloc_free(range->streams[i]);
+                talloc_free(range);
             } else {
                 if (!worst || (range->seek_end - range->seek_start <
                                worst->seek_end - worst->seek_start))
@@ -764,7 +775,7 @@ static void free_empty_cached_ranges(struct demux_internal *in)
             }
         }
-        if (in->num_ranges <= MAX_SEEK_RANGES)
+        if (in->num_ranges <= MAX_SEEK_RANGES || !worst)
             break;
         clear_cached_range(in, worst);
@@ -902,7 +913,7 @@ static void add_missing_streams(struct demux_internal *in,
     for (int n = range->num_streams; n < in->num_streams; n++) {
         struct demux_stream *ds = in->streams[n]->ds;
-        struct demux_queue *queue = talloc_ptrtype(range, queue);
+        struct demux_queue *queue = talloc_ptrtype(NULL, queue);
         *queue = (struct demux_queue){
             .ds = ds,
             .range = range,
@@ -978,10 +989,12 @@ static void demux_add_sh_stream_locked(struct demux_internal *in,
     MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
     assert(in->streams[sh->index] == sh);
-    for (int n = 0; n < in->num_ranges; n++)
-        add_missing_streams(in, in->ranges[n]);
+    if (in->current_range) {
+        for (int n = 0; n < in->num_ranges; n++)
+            add_missing_streams(in, in->ranges[n]);
-    sh->ds->queue = in->current_range->streams[sh->ds->index];
+        sh->ds->queue = in->current_range->streams[sh->ds->index];
+    }
     update_stream_selection_state(in, sh->ds);
@@ -1076,6 +1089,7 @@ int demux_get_num_stream(struct demuxer *demuxer)
     return r;
 }
+// It's UB to call anything but demux_dealloc() on the demuxer after this.
 static void demux_shutdown(struct demux_internal *in)
 {
     struct demuxer *demuxer = in->d_user;
@@ -1093,6 +1107,8 @@ static void demux_shutdown(struct demux_internal *in)
     demux_flush(demuxer);
     assert(in->total_bytes == 0);
+    in->current_range = NULL;
+    free_empty_cached_ranges(in);
     if (in->owns_stream)
         free_stream(demuxer->stream);
     demuxer->stream = NULL;
@@ -2874,13 +2890,6 @@ static struct demuxer *open_given_type(struct mpv_global *global,
     pthread_mutex_init(&in->lock, NULL);
     pthread_cond_init(&in->wakeup, NULL);
-    in->current_range = talloc_ptrtype(in, in->current_range);
-    *in->current_range = (struct demux_cached_range){
-        .seek_start = MP_NOPTS_VALUE,
-        .seek_end = MP_NOPTS_VALUE,
-    };
-    MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, in->current_range);
     *in->d_thread = *demuxer;
     in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
@@ -2942,6 +2951,8 @@ static struct demuxer *open_given_type(struct mpv_global *global,
         in->min_secs = 0;
         in->max_bytes = 1;
     }
+    switch_to_fresh_cache_range(in);
     demuxer = sub ? sub : demuxer;
     // Let this demuxer free demuxer->stream. Timeline sub-demuxers can
     // share a stream, and in these cases the demux_timeline instance
@@ -3081,23 +3092,27 @@ static void switch_current_range(struct demux_internal *in,
     set_current_range(in, range);
-    // Remove packets which can't be used when seeking back to the range.
-    for (int n = 0; n < in->num_streams; n++) {
-        struct demux_queue *queue = old->streams[n];
+    if (old) {
+        // Remove packets which can't be used when seeking back to the range.
+        for (int n = 0; n < in->num_streams; n++) {
+            struct demux_queue *queue = old->streams[n];
-        // Remove all packets from head up until including next_prune_target.
-        while (queue->next_prune_target)
-            remove_head_packet(queue);
-    }
+            // Remove all packets from head up until including next_prune_target.
+            while (queue->next_prune_target)
+                remove_head_packet(queue);
+        }
-    // Exclude weird corner cases that break resuming.
-    for (int n = 0; n < in->num_streams; n++) {
-        struct demux_stream *ds = in->streams[n]->ds;
-        // This is needed to resume or join the range at all.
-        if (ds->selected && !(ds->global_correct_dts || ds->global_correct_pos)) {
-            MP_VERBOSE(in, "discarding unseekable range due to stream %d\n", n);
-            clear_cached_range(in, old);
-            break;
+        // Exclude weird corner cases that break resuming.
+        for (int n = 0; n < in->num_streams; n++) {
+            struct demux_stream *ds = in->streams[n]->ds;
+            // This is needed to resume or join the range at all.
+            if (ds->selected && !(ds->global_correct_dts ||
+                                  ds->global_correct_pos))
+            {
+                MP_VERBOSE(in, "discarding unseekable range due to stream %d\n", n);
+                clear_cached_range(in, old);
+                break;
+            }
         }
     }
@@ -3300,12 +3315,12 @@ static void execute_cache_seek(struct demux_internal *in,
 // demuxer cache is disabled, merely reset the current range to a blank state.
 static void switch_to_fresh_cache_range(struct demux_internal *in)
 {
-    if (!in->seekable_cache) {
+    if (!in->seekable_cache && in->current_range) {
         clear_cached_range(in, in->current_range);
         return;
     }
-    struct demux_cached_range *range = talloc_ptrtype(in, range);
+    struct demux_cached_range *range = talloc_ptrtype(NULL, range);
     *range = (struct demux_cached_range){
         .seek_start = MP_NOPTS_VALUE,
         .seek_end = MP_NOPTS_VALUE,
@@ -3595,24 +3610,6 @@ int demuxer_add_chapter(demuxer_t *demuxer, char *name,
     return demuxer->num_chapters - 1;
 }
-void demux_disable_cache(demuxer_t *demuxer)
-{
-    struct demux_internal *in = demuxer->in;
-    assert(demuxer == in->d_user);
-    pthread_mutex_lock(&in->lock);
-    if (in->seekable_cache) {
-        MP_VERBOSE(demuxer, "disabling persistent packet cache\n");
-        in->seekable_cache = false;
-        // Get rid of potential buffered ranges floating around.
-        free_empty_cached_ranges(in);
-        // Get rid of potential old packets in the current range.
-        prune_old_packets(in);
-    }
-    pthread_mutex_unlock(&in->lock);
-}
 // Disallow reading any packets and make readers think there is no new data
 // yet, until a seek is issued.
 void demux_block_reading(struct demuxer *demuxer, bool block)
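
Taken together, the demux.c changes above amount to a simpler range
lifecycle: open_given_type() no longer builds the first range inline,
switch_to_fresh_cache_range() runs once at the end of init (after any
sub-demuxer recursion), and demux_shutdown() clears current_range and
then destroys every range explicitly. The toy model below, not literal
mpv source and with all names invented, condenses that flow against
plain libtalloc:

    // build: cc lifecycle_sketch.c -ltalloc
    #include <talloc.h>

    struct range { int dummy; };

    struct internal {                  // toy stand-in for demux_internal
        struct range *ranges[4];
        int num_ranges;
        struct range *current_range;   // may now be NULL during init/deinit
    };

    // Toy counterpart of switch_to_fresh_cache_range(): the single place
    // that creates a range, NULL-parented so destruction must be explicit.
    static void fresh_range(struct internal *in)
    {
        struct range *r = talloc(NULL, struct range);
        in->ranges[in->num_ranges++] = r;
        in->current_range = r;
    }

    // Toy counterpart of the new shutdown path: current_range is cleared
    // first, then every range, including the last one, is freed explicitly.
    static void shutdown_ranges(struct internal *in)
    {
        in->current_range = NULL;
        for (int n = in->num_ranges - 1; n >= 0; n--)
            talloc_free(in->ranges[n]);
        in->num_ranges = 0;
    }

    int main(void)
    {
        struct internal in = {0};
        fresh_range(&in);     // done once, late in open_given_type()
        shutdown_ranges(&in); // done from demux_shutdown()
        return 0;
    }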

demux/demux.h

@@ -291,7 +291,6 @@ void demux_close_stream(struct demuxer *demuxer);
 void demux_metadata_changed(demuxer_t *demuxer);
 void demux_update(demuxer_t *demuxer);
-void demux_disable_cache(demuxer_t *demuxer);
 bool demux_is_network_cached(demuxer_t *demuxer);
 void demux_report_unbuffered_read_bytes(struct demuxer *demuxer, int64_t new);

demux/demux_timeline.c

@@ -217,10 +217,8 @@ static void reopen_lazy_segments(struct demuxer *demuxer,
                                  demuxer->cancel, demuxer->global);
     if (!src->current->d && !demux_cancel_test(demuxer))
         MP_ERR(demuxer, "failed to load segment\n");
-    if (src->current->d) {
-        demux_disable_cache(src->current->d);
+    if (src->current->d)
         update_slave_stats(demuxer, src->current->d);
-    }
     associate_streams(demuxer, src, src->current);
 }
@@ -593,10 +591,8 @@ static bool add_tl(struct demuxer *demuxer, struct timeline_par *tl)
         // demux_timeline already does caching, doing it for the sub-demuxers
         // would be pointless and wasteful.
-        if (part->source) {
-            demux_disable_cache(part->source);
+        if (part->source)
             demuxer->is_network |= part->source->is_network;
-        }
         struct segment *seg = talloc_ptrtype(src, seg);
         *seg = (struct segment){
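
With demux_disable_cache() removed from the API, demux_timeline no longer
opts its sub-demuxers out of caching by hand; per the commit message, the
cache is now initialized (or left disabled) last in open_given_type(), so
sub-demuxers never acquire a seekable cache in the first place, and the
two hunks above reduce to plain update_slave_stats() and is_network
bookkeeping.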