diff --git a/demux/demux.c b/demux/demux.c index f0512e5859..e8f9f0a1ea 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -223,8 +223,6 @@ struct demux_internal { int num_ranges; size_t total_bytes; // total sum of packet data buffered - size_t fw_bytes; // sum of forward packet data in current_range - // Range from which decoder is reading, and to which demuxer is appending. // This is never NULL. This is always ranges[num_ranges - 1]. struct demux_cached_range *current_range; @@ -278,6 +276,8 @@ struct demux_queue { struct demux_packet *next_prune_target; // cached value for faster pruning + uint64_t tail_cum_pos; // cumulative size including tail packet + bool correct_dts; // packet DTS is strictly monotonically increasing bool correct_pos; // packet pos is strictly monotonically increasing int64_t last_pos; // for determining correct_pos @@ -333,7 +333,6 @@ struct demux_stream { double last_br_ts; // timestamp of last packet bitrate was calculated size_t last_br_bytes; // summed packet sizes since last bitrate calculation double bitrate; - size_t fw_bytes; // total bytes of packets in buffer (forward) struct demux_packet *reader_head; // points at current decoder position bool skip_to_keyframe; bool attached_picture_added; @@ -392,12 +391,18 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds); static bool queue_seek(struct demux_internal *in, double seek_pts, int flags, bool clear_back_state); +static uint64_t get_foward_buffered_bytes(struct demux_stream *ds) +{ + if (!ds->reader_head) + return 0; + return ds->queue->tail_cum_pos - ds->reader_head->cum_pos; +} + #if 0 // very expensive check for redundant cached queue state static void check_queue_consistency(struct demux_internal *in) { - size_t total_bytes = 0; - size_t total_fw_bytes = 0; + uint64_t total_bytes = 0; assert(in->current_range && in->num_ranges > 0); assert(in->current_range == in->ranges[in->num_ranges - 1]); @@ -418,6 +423,7 @@ static void check_queue_consistency(struct demux_internal *in) bool kf_found = false; bool npt_found = false; int next_index = 0; + uint64_t queue_total_bytes = 0; for (struct demux_packet *dp = queue->head; dp; dp = dp->next) { is_forward |= dp == queue->ds->reader_head; kf_found |= dp == queue->keyframe_latest; @@ -425,6 +431,7 @@ static void check_queue_consistency(struct demux_internal *in) size_t bytes = demux_packet_estimate_total_size(dp); total_bytes += bytes; + queue_total_bytes += bytes; if (is_forward) { fw_bytes += bytes; assert(range == in->current_range); @@ -442,20 +449,24 @@ static void check_queue_consistency(struct demux_internal *in) assert(!queue->tail); assert(next_index == queue->num_index); + uint64_t queue_total_bytes2 = 0; + if (queue->head) + queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos; + + assert(queue_total_bytes == queue_total_bytes2); + // If the queue is currently used... if (queue->ds->queue == queue) { // ...reader_head and others must be in the queue. assert(is_forward == !!queue->ds->reader_head); assert(kf_found == !!queue->keyframe_latest); + uint64_t fw_bytes2 = get_foward_buffered_bytes(queue->ds); + assert(fw_bytes == fw_bytes2); } assert(npt_found == !!queue->next_prune_target); - total_fw_bytes += fw_bytes; - - if (range == in->current_range) { - assert(queue->ds->fw_bytes == fw_bytes); - } else { + if (range != in->current_range) { assert(fw_bytes == 0); } @@ -470,7 +481,6 @@ static void check_queue_consistency(struct demux_internal *in) } assert(in->total_bytes == total_bytes); - assert(in->fw_bytes == total_fw_bytes); } #endif @@ -524,15 +534,6 @@ static void mp_packet_tags_make_writable(struct mp_packet_tags **tags) *tags = new; } -static void recompute_buffers(struct demux_stream *ds) -{ - ds->fw_bytes = 0; - - for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) { - ds->fw_bytes += demux_packet_estimate_total_size(dp); - } -} - // (this doesn't do most required things for a switch, like updating ds->queue) static void set_current_range(struct demux_internal *in, struct demux_cached_range *range) @@ -625,7 +626,7 @@ broken: range->seek_start = range->seek_end = MP_NOPTS_VALUE; } -// Remove queue->head from the queue. Does not update in->fw_bytes. +// Remove queue->head from the queue. static void remove_head_packet(struct demux_queue *queue) { struct demux_packet *dp = queue->head; @@ -719,9 +720,7 @@ static void free_empty_cached_ranges(struct demux_internal *in) static void ds_clear_reader_queue_state(struct demux_stream *ds) { - ds->in->fw_bytes -= ds->fw_bytes; ds->reader_head = NULL; - ds->fw_bytes = 0; ds->eof = false; ds->need_wakeup = true; } @@ -1615,8 +1614,6 @@ static void attempt_range_joining(struct demux_internal *in) // Actually join the ranges. Now that we think it will work, mutate the // data associated with the current range. - in->fw_bytes = 0; - for (int n = 0; n < in->num_streams; n++) { struct demux_queue *q1 = in->current_range->streams[n]; struct demux_queue *q2 = next->streams[n]; @@ -1659,8 +1656,13 @@ static void attempt_range_joining(struct demux_internal *in) ds->reader_head = join_point; ds->skip_to_keyframe = false; - recompute_buffers(ds); - in->fw_bytes += ds->fw_bytes; + // Make the cum_pos values in all q2 packets continuous. + for (struct demux_packet *dp = join_point; dp; dp = dp->next) { + uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos; + uint64_t size = next_pos - dp->cum_pos; + dp->cum_pos = q1->tail_cum_pos; + q1->tail_cum_pos += size; + } // For moving demuxer position. ds->refreshing = ds->selected; @@ -1803,10 +1805,8 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) size_t bytes = demux_packet_estimate_total_size(dp); in->total_bytes += bytes; - if (ds->reader_head) { - ds->fw_bytes += bytes; - in->fw_bytes += bytes; - } + dp->cum_pos = queue->tail_cum_pos; + queue->tail_cum_pos += bytes; if (queue->tail) { // next packet in stream @@ -1834,9 +1834,10 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp) ds->base_ts = queue->last_ts; const char *num_pkts = queue->head == queue->tail ? "1" : ">1"; + uint64_t fw_bytes = get_foward_buffered_bytes(ds); MP_TRACE(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" " "[num=%s size=%zd]\n", stream_type_name(stream->type), - dp->len, dp->pts, dp->dts, dp->pos, num_pkts, ds->fw_bytes); + dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes); adjust_seek_range_on_packet(ds, dp); @@ -1916,6 +1917,7 @@ static bool read_packet(struct demux_internal *in) // the minimum, or if a stream explicitly needs new packets. Also includes // safe-guards against packet queue overflow. bool read_more = false, prefetch_more = false, refresh_more = false; + uint64_t total_fw_bytes = 0; for (int n = 0; n < in->num_streams; n++) { struct demux_stream *ds = in->streams[n]->ds; if (ds->eager) { @@ -1929,11 +1931,12 @@ static bool read_packet(struct demux_internal *in) ds->queue->last_ts >= ds->base_ts && !in->back_demuxing) prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs; + total_fw_bytes += get_foward_buffered_bytes(ds); } MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n", - in->fw_bytes, read_more, prefetch_more, refresh_more); - if (in->fw_bytes >= in->max_bytes) { + (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more); + if (total_fw_bytes >= in->max_bytes) { // if we hit the limit just by prefetching, simply stop prefetching if (!read_more) return false; @@ -1947,9 +1950,10 @@ static bool read_packet(struct demux_internal *in) for (struct demux_packet *dp = ds->reader_head; dp; dp = dp->next) num_pkts++; + uint64_t fw_bytes = get_foward_buffered_bytes(ds); MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s%s\n", stream_type_name(ds->type), n, - num_pkts, ds->fw_bytes, + num_pkts, (size_t)fw_bytes, ds->eager ? "" : " (lazy)", ds->refreshing ? " (refreshing)" : ""); } @@ -2020,7 +2024,15 @@ static void prune_old_packets(struct demux_internal *in) // prune the oldest packet runs, as long as the total cache amount is too // big. size_t max_bytes = in->seekable_cache ? in->max_bytes_bw : 0; - while (in->total_bytes - in->fw_bytes > max_bytes) { + while (1) { + uint64_t fw_bytes = 0; + for (int n = 0; n < in->num_streams; n++) { + struct demux_stream *ds = in->streams[n]->ds; + fw_bytes += get_foward_buffered_bytes(ds); + } + if (in->total_bytes - fw_bytes <= max_bytes) + break; + // (Start from least recently used range.) struct demux_cached_range *range = in->ranges[0]; double earliest_ts = MP_NOPTS_VALUE; @@ -2197,11 +2209,6 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds) ds->reader_head = pkt->next; - // Update cached packet queue state. - size_t bytes = demux_packet_estimate_total_size(pkt); - ds->fw_bytes -= bytes; - ds->in->fw_bytes -= bytes; - ds->last_ret_pos = pkt->pos; ds->last_ret_dts = pkt->dts; @@ -2945,7 +2952,6 @@ static void clear_reader_state(struct demux_internal *in, in->d_user->filepos = -1; // implicitly synchronized in->blocked = false; in->need_back_seek = false; - assert(in->fw_bytes == 0); } // clear the packet queues @@ -3138,9 +3144,6 @@ static void execute_cache_seek(struct demux_internal *in, if (ds->reader_head) ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts); - recompute_buffers(ds); - in->fw_bytes += ds->fw_bytes; - MP_VERBOSE(in, "seeking stream %d (%s) to ", n, stream_type_name(ds->type)); @@ -3626,7 +3629,6 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state * .ts_end = MP_NOPTS_VALUE, .ts_duration = -1, .total_bytes = in->total_bytes, - .fw_bytes = in->fw_bytes, .seeking = in->seeking_in_progress, .low_level_seeks = in->low_level_seeks, .ts_last = in->demux_ts, @@ -3641,6 +3643,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state * r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts); any_packets |= !!ds->reader_head; } + r->fw_bytes += get_foward_buffered_bytes(ds); } r->idle = (in->idle && !r->underrun) || r->eof; r->underrun &= !r->idle && in->threading; diff --git a/demux/packet.h b/demux/packet.h index 9e28ed52d0..6822bce2c1 100644 --- a/demux/packet.h +++ b/demux/packet.h @@ -48,6 +48,7 @@ typedef struct demux_packet { // private struct demux_packet *next; struct AVPacket *avpacket; // keep the buffer allocation and sidedata + uint64_t cum_pos; // demux.c internal: cumulative size until _start_ of pkt double kf_seek_pts; // demux.c internal: seek pts for keyframe range struct mp_packet_tags *metadata; // timed metadata (demux.c internal) } demux_packet_t;