From 59e78cb028ab95822c782a1b7a2dd833fade3c50 Mon Sep 17 00:00:00 2001
From: wm4 <wm4@nowhere>
Date: Fri, 7 Sep 2018 15:12:24 +0200
Subject: [PATCH] demux: return packets directly from demuxer instead of using
 sh_stream

Preparation for other potential changes to separate demuxer cache/thread
and actual demuxers.

Most things are untested, but it seems to work somewhat.
---
 demux/demux.c          | 19 +++++++++------
 demux/demux.h          |  7 ++++--
 demux/demux_lavf.c     | 21 +++++++++--------
 demux/demux_mf.c       | 15 ++++++------
 demux/demux_mkv.c      | 52 ++++++++++++++++++++++++++++++------------
 demux/demux_raw.c      | 16 +++++++------
 demux/demux_timeline.c | 17 +++++++-------
 demux/packet.h         |  2 +-
 8 files changed, 93 insertions(+), 56 deletions(-)

diff --git a/demux/demux.c b/demux/demux.c
index b3ab812892..f3e72552f9 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -354,6 +354,7 @@ struct mp_packet_tags {
 static void demuxer_sort_chapters(demuxer_t *demuxer);
 static void *demux_thread(void *pctx);
 static void update_cache(struct demux_internal *in);
+static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
 
 #if 0
 // very expensive check for redundant cached queue state
@@ -1149,9 +1150,9 @@ void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp)
     dp->keyframe = true;
     dp->pts = MP_ADD_PTS(dp->pts, -in->ts_offset);
     dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset);
+    add_packet_locked(sh, dp);
     pthread_mutex_unlock(&in->lock);
 
-    demux_add_packet(sh, dp);
 }
 
 // Add the keyframe to the end of the index. Not all packets are actually added.
@@ -1397,15 +1398,14 @@ static void adjust_seek_range_on_packet(struct demux_stream *ds,
         attempt_range_joining(ds->in);
 }
 
-void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
+static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
 {
     struct demux_stream *ds = stream ? stream->ds : NULL;
-    if (!dp || !dp->len || !ds || demux_cancel_test(ds->in->d_thread)) {
+    if (!dp->len || demux_cancel_test(ds->in->d_thread)) {
         talloc_free(dp);
         return;
     }
     struct demux_internal *in = ds->in;
-    pthread_mutex_lock(&in->lock);
 
     in->initial_state = false;
 
@@ -1541,7 +1541,6 @@ void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
     }
 
     wakeup_ds(ds);
-    pthread_mutex_unlock(&in->lock);
 }
 
 // Returns true if there was "progress" (lock was released temporarily).
@@ -1613,14 +1612,20 @@ static bool read_packet(struct demux_internal *in)
     pthread_mutex_unlock(&in->lock);
 
     struct demuxer *demux = in->d_thread;
+    struct demux_packet *pkt = NULL;
 
     bool eof = true;
-    if (demux->desc->fill_buffer && !demux_cancel_test(demux))
-        eof = demux->desc->fill_buffer(demux) <= 0;
+    if (demux->desc->read_packet && !demux_cancel_test(demux))
+        eof = !demux->desc->read_packet(demux, &pkt);
     update_cache(in);
 
     pthread_mutex_lock(&in->lock);
 
+    if (pkt) {
+        assert(pkt->stream >= 0 && pkt->stream < in->num_streams);
+        add_packet_locked(in->streams[pkt->stream], pkt);
+    }
+
     if (!in->seeking) {
         if (eof) {
             for (int n = 0; n < in->num_streams; n++) {
diff --git a/demux/demux.h b/demux/demux.h
index 2fb3fbfb73..3ccdbe92f0 100644
--- a/demux/demux.h
+++ b/demux/demux.h
@@ -110,7 +110,11 @@ typedef struct demuxer_desc {
     // Return 0 on success, otherwise -1
     int (*open)(struct demuxer *demuxer, enum demux_check check);
     // The following functions are all optional
-    int (*fill_buffer)(struct demuxer *demuxer); // 0 on EOF, otherwise 1
+    // Try to read a packet. Return false on EOF. If true is returned, the
+    // demuxer may set *pkt to a new packet (the reference goes to the caller).
+    // If *pkt is NULL (the value when this function is called), the call
+    // will be repeated.
+    bool (*read_packet)(struct demuxer *demuxer, struct demux_packet **pkt);
     void (*close)(struct demuxer *demuxer);
     void (*seek)(struct demuxer *demuxer, double rel_seek_secs, int flags);
     int (*control)(struct demuxer *demuxer, int cmd, void *arg);
@@ -253,7 +257,6 @@ struct demux_free_async_state *demux_free_async(struct demuxer *demuxer);
 void demux_free_async_force(struct demux_free_async_state *state);
 bool demux_free_async_finish(struct demux_free_async_state *state);
 
-void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp);
 void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp);
 
 struct demux_packet *demux_read_packet(struct sh_stream *sh);
diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c
index 3bb690643c..e2289295df 100644
--- a/demux/demux_lavf.c
+++ b/demux/demux_lavf.c
@@ -1039,7 +1039,8 @@ static int demux_open_lavf(demuxer_t *demuxer, enum demux_check check)
     return 0;
 }
 
-static int demux_lavf_fill_buffer(demuxer_t *demux)
+static bool demux_lavf_read_packet(struct demuxer *demux,
+                                   struct demux_packet **mp_pkt)
 {
     lavf_priv_t *priv = demux->priv;
 
@@ -1049,11 +1050,11 @@ static int demux_lavf_fill_buffer(demuxer_t *demux)
     if (r < 0) {
         av_packet_unref(pkt);
         if (r == AVERROR(EAGAIN))
-            return 1;
+            return true;
         if (r == AVERROR_EOF)
-            return 0;
+            return false;
         MP_WARN(demux, "error reading packet.\n");
-        return -1;
+        return false;
     }
 
     add_new_streams(demux);
@@ -1065,13 +1066,13 @@ static int demux_lavf_fill_buffer(demuxer_t *demux)
 
     if (!demux_stream_is_selected(stream)) {
         av_packet_unref(pkt);
-        return 1; // don't signal EOF if skipping a packet
+        return true; // don't signal EOF if skipping a packet
     }
 
     struct demux_packet *dp = new_demux_packet_from_avpacket(pkt);
     if (!dp) {
         av_packet_unref(pkt);
-        return 1;
+        return true;
     }
 
     if (pkt->pts != AV_NOPTS_VALUE)
@@ -1090,8 +1091,10 @@ static int demux_lavf_fill_buffer(demuxer_t *demux)
     if (priv->format_hack.clear_filepos)
         dp->pos = -1;
 
-    demux_add_packet(stream, dp);
-    return 1;
+    dp->stream = stream->index;
+
+    *mp_pkt = dp;
+    return true;
 }
 
 static void demux_seek_lavf(demuxer_t *demuxer, double seek_pts, int flags)
@@ -1194,7 +1197,7 @@ static void demux_close_lavf(demuxer_t *demuxer)
 const demuxer_desc_t demuxer_desc_lavf = {
     .name = "lavf",
     .desc = "libavformat",
-    .fill_buffer = demux_lavf_fill_buffer,
+    .read_packet = demux_lavf_read_packet,
     .open = demux_open_lavf,
     .close = demux_close_lavf,
     .seek = demux_seek_lavf,
diff --git a/demux/demux_mf.c b/demux/demux_mf.c
index c4995a66c5..7da07c793a 100644
--- a/demux/demux_mf.c
+++ b/demux/demux_mf.c
@@ -173,14 +173,12 @@ static void demux_seek_mf(demuxer_t *demuxer, double seek_pts, int flags)
     mf->curr_frame = newpos;
 }
 
-// return value:
-//     0 = EOF or no stream found
-//     1 = successfully read a packet
-static int demux_mf_fill_buffer(demuxer_t *demuxer)
+static bool demux_mf_read_packet(struct demuxer *demuxer,
+                                 struct demux_packet **pkt)
 {
     mf_t *mf = demuxer->priv;
     if (mf->curr_frame >= mf->nr_of_files)
-        return 0;
+        return false;
 
     struct stream *entry_stream = NULL;
     if (mf->streams)
@@ -201,7 +199,8 @@ static int demux_mf_fill_buffer(demuxer_t *demuxer)
                 memcpy(dp->buffer, data.start, data.len);
                 dp->pts = mf->curr_frame / mf->sh->codec->fps;
                 dp->keyframe = true;
-                demux_add_packet(mf->sh, dp);
+                dp->stream = mf->sh->index;
+                *pkt = dp;
             }
         }
         talloc_free(data.start);
@@ -211,7 +210,7 @@ static int demux_mf_fill_buffer(demuxer_t *demuxer)
         free_stream(stream);
 
     mf->curr_frame++;
-    return 1;
+    return true;
 }
 
 // map file extension/type to a codec name
@@ -350,7 +349,7 @@ static void demux_close_mf(demuxer_t *demuxer)
 const demuxer_desc_t demuxer_desc_mf = {
     .name = "mf",
     .desc = "image files (mf)",
-    .fill_buffer = demux_mf_fill_buffer,
+    .read_packet = demux_mf_read_packet,
     .open = demux_open_mf,
     .close = demux_close_mf,
     .seek = demux_seek_mf,
diff --git a/demux/demux_mkv.c b/demux/demux_mkv.c
index bad5af991b..ce4008ca4c 100644
--- a/demux/demux_mkv.c
+++ b/demux/demux_mkv.c
@@ -210,6 +210,10 @@ typedef struct mkv_demuxer {
     // temporary data, and not normally larger than 0 or 1 elements.
     struct block_info *blocks;
     int num_blocks;
+
+    // Packets to return.
+    struct demux_packet **packets;
+    int num_packets;
 } mkv_demuxer_t;
 
 #define OPT_BASE_STRUCT struct demux_mkv_opts
@@ -256,6 +260,17 @@ static void probe_first_timestamp(struct demuxer *demuxer);
 static int read_next_block_into_queue(demuxer_t *demuxer);
 static void free_block(struct block_info *block);
 
+static void add_packet(struct demuxer *demuxer, struct sh_stream *stream,
+                       struct demux_packet *pkt)
+{
+    mkv_demuxer_t *mkv_d = demuxer->priv;
+    if (!pkt)
+        return;
+
+    pkt->stream = stream->index;
+    MP_TARRAY_APPEND(mkv_d, mkv_d->packets, mkv_d->num_packets, pkt);
+}
+
 #define AAC_SYNC_EXTENSION_TYPE 0x02b7
 static int aac_get_sample_rate_index(uint32_t sample_rate)
 {
@@ -2311,7 +2326,7 @@ static bool handle_realaudio(demuxer_t *demuxer, mkv_track_t *track,
                 track->audio_timestamp[x * apk_usize / w];
             dp->pos = orig->pos + x;
             dp->keyframe = !x;   // Mark first packet as keyframe
-            demux_add_packet(track->stream, dp);
+            add_packet(demuxer, track->stream, dp);
         }
     }
 
@@ -2336,6 +2351,10 @@ static void mkv_seek_reset(demuxer_t *demuxer)
         free_block(&mkv_d->blocks[n]);
     mkv_d->num_blocks = 0;
 
+    for (int n = 0; n < mkv_d->num_packets; n++)
+        talloc_free(mkv_d->packets[n]);
+    mkv_d->num_packets = 0;
+
     mkv_d->skip_to_timecode = INT64_MIN;
 }
 
@@ -2439,7 +2458,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
             if (new) {
                 demux_packet_copy_attribs(new, dp);
                 talloc_free(dp);
-                demux_add_packet(stream, new);
+                add_packet(demuxer, stream, new);
                 return;
             }
         }
@@ -2454,7 +2473,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
             memcpy(new->buffer + 8, dp->buffer, dp->len);
             demux_packet_copy_attribs(new, dp);
             talloc_free(dp);
-            demux_add_packet(stream, new);
+            add_packet(demuxer, stream, new);
             return;
         }
     }
@@ -2468,7 +2487,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
     }
 
     if (!track->parse || !track->av_parser || !track->av_parser_codec) {
-        demux_add_packet(stream, dp);
+        add_packet(demuxer, stream, dp);
         return;
     }
 
@@ -2502,13 +2521,13 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
                 new->dts = track->av_parser->dts == AV_NOPTS_VALUE
                          ? MP_NOPTS_VALUE : track->av_parser->dts / tb;
             }
-            demux_add_packet(stream, new);
+            add_packet(demuxer, stream, new);
         }
         pts = dts = AV_NOPTS_VALUE;
     }
 
     if (dp->len) {
-        demux_add_packet(stream, dp);
+        add_packet(demuxer, stream, dp);
     } else {
         talloc_free(dp);
     }
@@ -2896,19 +2915,26 @@ static int read_next_block(demuxer_t *demuxer, struct block_info *block)
     return 1;
 }
 
-static int demux_mkv_fill_buffer(demuxer_t *demuxer)
+static bool demux_mkv_read_packet(struct demuxer *demuxer,
+                                  struct demux_packet **pkt)
 {
+    struct mkv_demuxer *mkv_d = demuxer->priv;
+
     for (;;) {
+        if (mkv_d->num_packets) {
+            *pkt = mkv_d->packets[0];
+            MP_TARRAY_REMOVE_AT(mkv_d->packets, mkv_d->num_packets, 0);
+            return true;
+        }
+
         int res;
         struct block_info block;
         res = read_next_block(demuxer, &block);
         if (res < 0)
-            return 0;
+            return false;
         if (res > 0) {
-            res = handle_block(demuxer, &block);
+            handle_block(demuxer, &block);
             free_block(&block);
-            if (res > 0)
-                return 1;
         }
     }
 }
@@ -3129,8 +3155,6 @@ static void demux_mkv_seek(demuxer_t *demuxer, double seek_pts, int flags)
     mkv_d->v_skip_to_keyframe = st_active[STREAM_VIDEO];
     mkv_d->a_skip_to_keyframe = st_active[STREAM_AUDIO];
     mkv_d->a_skip_preroll = mkv_d->a_skip_to_keyframe;
-
-    demux_mkv_fill_buffer(demuxer);
 }
 
 static void probe_last_timestamp(struct demuxer *demuxer, int64_t start_pos)
@@ -3237,7 +3261,7 @@ const demuxer_desc_t demuxer_desc_matroska = {
     .name = "mkv",
     .desc = "Matroska",
     .open = demux_mkv_open,
-    .fill_buffer = demux_mkv_fill_buffer,
+    .read_packet = demux_mkv_read_packet,
     .close = mkv_free,
     .seek = demux_mkv_seek,
     .load_timeline = build_ordered_chapter_timeline,
diff --git a/demux/demux_raw.c b/demux/demux_raw.c
index 32473bf9bc..3239b509bd 100644
--- a/demux/demux_raw.c
+++ b/demux/demux_raw.c
@@ -267,17 +267,17 @@ static int demux_rawvideo_open(demuxer_t *demuxer, enum demux_check check)
     return generic_open(demuxer);
 }
 
-static int raw_fill_buffer(demuxer_t *demuxer)
+static bool raw_read_packet(struct demuxer *demuxer, struct demux_packet **pkt)
 {
     struct priv *p = demuxer->priv;
 
     if (demuxer->stream->eof)
-        return 0;
+        return false;
 
     struct demux_packet *dp = new_demux_packet(p->frame_size * p->read_frames);
     if (!dp) {
         MP_ERR(demuxer, "Can't read packet.\n");
-        return 1;
+        return true;
     }
 
     dp->pos = stream_tell(demuxer->stream);
@@ -285,9 +285,11 @@ static int raw_fill_buffer(demuxer_t *demuxer)
 
     int len = stream_read(demuxer->stream, dp->buffer, dp->len);
     demux_packet_shorten(dp, len);
-    demux_add_packet(p->sh, dp);
 
-    return 1;
+    dp->stream = p->sh->index;
+    *pkt = dp;
+
+    return true;
 }
 
 static void raw_seek(demuxer_t *demuxer, double seek_pts, int flags)
@@ -310,7 +312,7 @@ const demuxer_desc_t demuxer_desc_rawaudio = {
     .name = "rawaudio",
     .desc = "Uncompressed audio",
     .open = demux_rawaudio_open,
-    .fill_buffer = raw_fill_buffer,
+    .read_packet = raw_read_packet,
     .seek = raw_seek,
 };
 
@@ -318,6 +320,6 @@ const demuxer_desc_t demuxer_desc_rawvideo = {
     .name = "rawvideo",
     .desc = "Uncompressed video",
     .open = demux_rawvideo_open,
-    .fill_buffer = raw_fill_buffer,
+    .read_packet = raw_read_packet,
     .seek = raw_seek,
 };
diff --git a/demux/demux_timeline.c b/demux/demux_timeline.c
index 1eb73956c3..c34619a6d2 100644
--- a/demux/demux_timeline.c
+++ b/demux/demux_timeline.c
@@ -222,7 +222,7 @@ static void d_seek(struct demuxer *demuxer, double seek_pts, int flags)
     switch_segment(demuxer, new, pts, flags, false);
 }
 
-static int d_fill_buffer(struct demuxer *demuxer)
+static bool d_read_packet(struct demuxer *demuxer, struct demux_packet **out_pkt)
 {
     struct priv *p = demuxer->priv;
 
@@ -231,7 +231,7 @@ static int d_fill_buffer(struct demuxer *demuxer)
 
     struct segment *seg = p->current;
     if (!seg || !seg->d)
-        return 0;
+        return false;
 
     struct demux_packet *pkt = demux_read_any_packet(seg->d);
     if (!pkt || pkt->pts >= seg->end)
@@ -267,9 +267,9 @@ static int d_fill_buffer(struct demuxer *demuxer)
             }
         }
         if (!next)
-            return 0;
+            return false;
         switch_segment(demuxer, next, next->start, 0, true);
-        return 1; // reader will retry
+        return true; // reader will retry
     }
 
     if (pkt->stream < 0 || pkt->stream > seg->num_stream_map)
@@ -308,12 +308,13 @@ static int d_fill_buffer(struct demuxer *demuxer)
         }
     }
 
-    demux_add_packet(vs->sh, pkt);
-    return 1;
+    pkt->stream = vs->sh->index;
+    *out_pkt = pkt;
+    return true;
 
 drop:
     talloc_free(pkt);
-    return 1;
+    return true;
 }
 
 static void print_timeline(struct demuxer *demuxer)
@@ -447,7 +448,7 @@ static int d_control(struct demuxer *demuxer, int cmd, void *arg)
 const demuxer_desc_t demuxer_desc_timeline = {
     .name = "timeline",
     .desc = "timeline segments",
-    .fill_buffer = d_fill_buffer,
+    .read_packet = d_read_packet,
     .open = d_open,
     .close = d_close,
     .seek = d_seek,
diff --git a/demux/packet.h b/demux/packet.h
index 4d34b3d766..7c5b04720f 100644
--- a/demux/packet.h
+++ b/demux/packet.h
@@ -33,7 +33,7 @@ typedef struct demux_packet {
     bool keyframe;
 
     int64_t pos;        // position in source file byte stream
-    int stream;         // source stream index
+    int stream;         // source stream index (typically sh_stream.index)
 
     // segmentation (ordered chapters, EDL)
     bool segmented;