hls source: download first playlist once

This commit is contained in:
aler9 2022-12-27 12:00:00 +00:00
parent 0c6385f0e8
commit 241c81dc56
2 changed files with 100 additions and 82 deletions

View File

@ -36,7 +36,7 @@ func findSegmentWithInvPosition(segments []*gm3u8.MediaSegment, pos int) *gm3u8.
func findSegmentWithID(seqNo uint64, segments []*gm3u8.MediaSegment, id uint64) (*gm3u8.MediaSegment, int) { func findSegmentWithID(seqNo uint64, segments []*gm3u8.MediaSegment, id uint64) (*gm3u8.MediaSegment, int) {
index := int(int64(id) - int64(seqNo)) index := int(int64(id) - int64(seqNo))
if (index) >= len(segments) { if index >= len(segments) {
return nil, 0 return nil, 0
} }
@ -139,13 +139,23 @@ func (d *clientDownloaderStream) run(ctx context.Context) error {
d.rp.add(proc) d.rp.add(proc)
} }
err := d.fillSegmentQueue(ctx, initialPlaylist, segmentQueue)
if err != nil {
return err
}
for { for {
ok := segmentQueue.waitUntilSizeIsBelow(ctx, 1) ok := segmentQueue.waitUntilSizeIsBelow(ctx, 1)
if !ok { if !ok {
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
err := d.fillSegmentQueue(ctx, segmentQueue) pl, err := d.downloadPlaylist(ctx)
if err != nil {
return err
}
err = d.fillSegmentQueue(ctx, pl, segmentQueue)
if err != nil { if err != nil {
return err return err
} }
@ -204,12 +214,9 @@ func (d *clientDownloaderStream) downloadSegment(ctx context.Context,
return byts, nil return byts, nil
} }
func (d *clientDownloaderStream) fillSegmentQueue(ctx context.Context, segmentQueue *clientSegmentQueue) error { func (d *clientDownloaderStream) fillSegmentQueue(ctx context.Context,
pl, err := d.downloadPlaylist(ctx) pl *m3u8.MediaPlaylist, segmentQueue *clientSegmentQueue,
if err != nil { ) error {
return err
}
pl.Segments = pl.Segments[:segmentsLen(pl.Segments)] pl.Segments = pl.Segments[:segmentsLen(pl.Segments)]
var seg *gm3u8.MediaSegment var seg *gm3u8.MediaSegment

View File

@ -93,82 +93,53 @@ func writeTempFile(byts []byte) (string, error) {
return tmpf.Name(), nil return tmpf.Name(), nil
} }
func mpegtsSegment(w io.Writer) {
mux := astits.NewMuxer(context.Background(), w)
mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
mux.SetPCRPID(256)
mux.WriteTables()
enc, _ := h264.AnnexBMarshal([][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
{5}, // IDR
})
mux.WriteData(&astits.MuxerData{
PID: 256,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent,
PTS: &astits.ClockReference{Base: 90000}, // +1 sec
DTS: &astits.ClockReference{Base: 0x1FFFFFFFF - 90000 + 1}, // -1 sec
},
StreamID: 224, // = video
},
Data: enc,
},
})
}
type testHLSServer struct { type testHLSServer struct {
s *http.Server s *http.Server
} }
func newTestHLSServer(ca string) (*testHLSServer, error) { func newTestHLSServer(router http.Handler, isTLS bool) (*testHLSServer, error) {
ln, err := net.Listen("tcp", "localhost:5780") ln, err := net.Listen("tcp", "localhost:5780")
if err != nil { if err != nil {
return nil, err return nil, err
} }
ts := &testHLSServer{} s := &testHLSServer{
s: &http.Server{Handler: router},
gin.SetMode(gin.ReleaseMode)
router := gin.New()
segment := "segment.ts"
if ca == "segment with query" {
segment = "segment.ts?key=val"
} }
router.GET("/stream.m3u8", func(ctx *gin.Context) { if isTLS {
cnt := `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:2,
` + segment + `
#EXT-X-ENDLIST
`
ctx.Writer.Header().Set("Content-Type", `application/x-mpegURL`)
io.Copy(ctx.Writer, bytes.NewReader([]byte(cnt)))
})
router.GET("/segment.ts", func(ctx *gin.Context) {
if ca == "segment with query" && ctx.Query("key") != "val" {
return
}
ctx.Writer.Header().Set("Content-Type", `video/MP2T`)
mux := astits.NewMuxer(context.Background(), ctx.Writer)
mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
mux.SetPCRPID(256)
mux.WriteTables()
enc, _ := h264.AnnexBMarshal([][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
{5}, // IDR
})
mux.WriteData(&astits.MuxerData{
PID: 256,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent,
PTS: &astits.ClockReference{Base: 90000}, // +1 sec
DTS: &astits.ClockReference{Base: 0x1FFFFFFFF - 90000 + 1}, // -1 sec
},
StreamID: 224, // = video
},
Data: enc,
},
})
})
ts.s = &http.Server{Handler: router}
if ca == "tls" {
go func() { go func() {
serverCertFpath, err := writeTempFile(serverCert) serverCertFpath, err := writeTempFile(serverCert)
if err != nil { if err != nil {
@ -182,17 +153,17 @@ func newTestHLSServer(ca string) (*testHLSServer, error) {
} }
defer os.Remove(serverKeyFpath) defer os.Remove(serverKeyFpath)
ts.s.ServeTLS(ln, serverCertFpath, serverKeyFpath) s.s.ServeTLS(ln, serverCertFpath, serverKeyFpath)
}() }()
} else { } else {
go ts.s.Serve(ln) go s.s.Serve(ln)
} }
return ts, nil return s, nil
} }
func (ts *testHLSServer) close() { func (s *testHLSServer) close() {
ts.s.Shutdown(context.Background()) s.s.Shutdown(context.Background())
} }
func TestClient(t *testing.T) { func TestClient(t *testing.T) {
@ -202,9 +173,44 @@ func TestClient(t *testing.T) {
"segment with query", "segment with query",
} { } {
t.Run(ca, func(t *testing.T) { t.Run(ca, func(t *testing.T) {
ts, err := newTestHLSServer(ca) gin.SetMode(gin.ReleaseMode)
router := gin.New()
segment := "segment.ts"
if ca == "segment with query" {
segment = "segment.ts?key=val"
}
sent := false
router.GET("/stream.m3u8", func(ctx *gin.Context) {
if sent {
return
}
sent = true
ctx.Writer.Header().Set("Content-Type", `application/x-mpegURL`)
io.Copy(ctx.Writer, bytes.NewReader([]byte(`#EXTM3U
#EXT-X-VERSION:3
#EXT-X-ALLOW-CACHE:NO
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:2,
`+segment+`
#EXT-X-ENDLIST
`)))
})
router.GET("/segment.ts", func(ctx *gin.Context) {
if ca == "segment with query" {
require.Equal(t, "val", ctx.Query("key"))
}
ctx.Writer.Header().Set("Content-Type", `video/MP2T`)
mpegtsSegment(ctx.Writer)
})
s, err := newTestHLSServer(router, ca == "tls")
require.NoError(t, err) require.NoError(t, err)
defer ts.close() defer s.close()
packetRecv := make(chan struct{}) packetRecv := make(chan struct{})
@ -216,7 +222,12 @@ func TestClient(t *testing.T) {
c, err := NewClient( c, err := NewClient(
prefix+"://localhost:5780/stream.m3u8", prefix+"://localhost:5780/stream.m3u8",
"33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739", "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
func(*format.H264, *format.MPEG4Audio) error { func(videoTrack *format.H264, audioTrack *format.MPEG4Audio) error {
require.Equal(t, &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}, videoTrack)
require.Equal(t, (*format.MPEG4Audio)(nil), audioTrack)
return nil return nil
}, },
func(pts time.Duration, nalus [][]byte) { func(pts time.Duration, nalus [][]byte) {
@ -237,7 +248,7 @@ func TestClient(t *testing.T) {
<-packetRecv <-packetRecv
c.Close() c.Close()
c.Wait() <-c.Wait()
}) })
} }
} }