diff --git a/internal/playback/muxer.go b/internal/playback/muxer.go index 2582980b..a000b758 100644 --- a/internal/playback/muxer.go +++ b/internal/playback/muxer.go @@ -1,7 +1,9 @@ package playback +import "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + type muxer interface { - writeInit(init []byte) + writeInit(init *fmp4.Init) setTrack(trackID int) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error writeFinalDTS(dts int64) diff --git a/internal/playback/muxer_fmp4.go b/internal/playback/muxer_fmp4.go index ae1c7c0b..5e8685a2 100644 --- a/internal/playback/muxer_fmp4.go +++ b/internal/playback/muxer_fmp4.go @@ -8,13 +8,16 @@ import ( "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" ) -var partSize = durationGoToMp4(1*time.Second, fmp4Timescale) +const ( + partSize = 1 * time.Second +) type muxerFMP4Track struct { - id int - firstDTS int64 - lastDTS int64 - samples []*fmp4.PartSample + id int + timeScale uint32 + firstDTS int64 + lastDTS int64 + samples []*fmp4.PartSample } func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track { @@ -29,26 +32,29 @@ func findTrack(tracks []*muxerFMP4Track, id int) *muxerFMP4Track { type muxerFMP4 struct { w io.Writer - init []byte + init *fmp4.Init nextSequenceNumber uint32 tracks []*muxerFMP4Track curTrack *muxerFMP4Track outBuf seekablebuffer.Buffer } -func (w *muxerFMP4) writeInit(init []byte) { +func (w *muxerFMP4) writeInit(init *fmp4.Init) { w.init = init + + w.tracks = make([]*muxerFMP4Track, len(init.Tracks)) + + for i, track := range init.Tracks { + w.tracks[i] = &muxerFMP4Track{ + id: track.ID, + timeScale: track.TimeScale, + firstDTS: -1, + } + } } func (w *muxerFMP4) setTrack(trackID int) { w.curTrack = findTrack(w.tracks, trackID) - if w.curTrack == nil { - w.curTrack = &muxerFMP4Track{ - id: trackID, - firstDTS: -1, - } - w.tracks = append(w.tracks, w.curTrack) - } } func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool, payload []byte) error { @@ -75,7 +81,9 @@ func (w *muxerFMP4) writeSample(dts int64, ptsOffset int32, isNonSyncSample bool }) w.curTrack.lastDTS = dts - if (w.curTrack.lastDTS - w.curTrack.firstDTS) > int64(partSize) { + partSizeMP4 := durationGoToMp4(partSize, w.curTrack.timeScale) + + if (w.curTrack.lastDTS - w.curTrack.firstDTS) > partSizeMP4 { err := w.innerFlush(false) if err != nil { return err @@ -141,11 +149,18 @@ func (w *muxerFMP4) innerFlush(final bool) error { w.nextSequenceNumber++ if w.init != nil { - _, err := w.w.Write(w.init) + err := w.init.Marshal(&w.outBuf) if err != nil { return err } + + _, err = w.w.Write(w.outBuf.Bytes()) + if err != nil { + return err + } + w.init = nil + w.outBuf.Reset() } err := part.Marshal(&w.outBuf) diff --git a/internal/playback/on_get.go b/internal/playback/on_get.go index 6611c1be..5f41dbf0 100644 --- a/internal/playback/on_get.go +++ b/internal/playback/on_get.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/gin-gonic/gin" @@ -48,7 +49,7 @@ func seekAndMux( m muxer, ) error { if recordFormat == conf.RecordFormatFMP4 { - var firstInit []byte + var firstInit *fmp4.Init var segmentEnd time.Time err := func() error { @@ -67,7 +68,7 @@ func seekAndMux( segmentStartOffset := start.Sub(segments[0].Start) - segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, m) + segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m) if err != nil { return err } @@ -99,7 +100,7 @@ func seekAndMux( segmentStartOffset := seg.Start.Sub(start) - segmentMaxElapsed, err := segmentFMP4WriteParts(f, segmentStartOffset, duration, m) + segmentMaxElapsed, err := segmentFMP4WriteParts(f, segmentStartOffset, duration, firstInit, m) if err != nil { return err } diff --git a/internal/playback/on_list.go b/internal/playback/on_list.go index 41320250..6c1cb0e4 100644 --- a/internal/playback/on_list.go +++ b/internal/playback/on_list.go @@ -9,6 +9,7 @@ import ( "os" "time" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediamtx/internal/conf" "github.com/gin-gonic/gin" ) @@ -27,7 +28,7 @@ type listEntry struct { func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*Segment) ([]listEntry, error) { if recordFormat == conf.RecordFormatFMP4 { out := []listEntry{} - var prevInit []byte + var prevInit *fmp4.Init for _, seg := range segments { err := func() error { @@ -47,7 +48,7 @@ func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*S return err } - maxDuration, err := segmentFMP4ReadMaxDuration(f) + maxDuration, err := segmentFMP4ReadMaxDuration(f, init) if err != nil { return err } diff --git a/internal/playback/segment_fmp4.go b/internal/playback/segment_fmp4.go index c6b76f7d..d565d831 100644 --- a/internal/playback/segment_fmp4.go +++ b/internal/playback/segment_fmp4.go @@ -5,45 +5,55 @@ import ( "errors" "fmt" "io" + "reflect" "time" "github.com/abema/go-mp4" + "github.com/bluenviron/mediacommon/pkg/formats/fmp4" ) const ( sampleFlagIsNonSyncSample = 1 << 16 concatenationTolerance = 500 * time.Millisecond - fmp4Timescale = 90000 ) -func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { - timeScale64 := uint64(timeScale) +var errTerminated = errors.New("terminated") + +func durationGoToMp4(v time.Duration, timeScale uint32) int64 { + timeScale64 := int64(timeScale) secs := v / time.Second dec := v % time.Second - return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) + return int64(secs)*timeScale64 + int64(dec)*timeScale64/int64(time.Second) } -func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { - timeScale64 := uint64(timeScale) +func durationMp4ToGo(v int64, timeScale uint32) time.Duration { + timeScale64 := int64(timeScale) secs := v / timeScale64 dec := v % timeScale64 return time.Duration(secs)*time.Second + time.Duration(dec)*time.Second/time.Duration(timeScale64) } -var errTerminated = errors.New("terminated") +func findInitTrack(tracks []*fmp4.InitTrack, id int) *fmp4.InitTrack { + for _, track := range tracks { + if track.ID == id { + return track + } + } + return nil +} func segmentFMP4CanBeConcatenated( - prevInit []byte, + prevInit *fmp4.Init, prevEnd time.Time, - curInit []byte, + curInit *fmp4.Init, curStart time.Time, ) bool { - return bytes.Equal(prevInit, curInit) && + return reflect.DeepEqual(prevInit, curInit) && !curStart.Before(prevEnd.Add(-concatenationTolerance)) && !curStart.After(prevEnd.Add(concatenationTolerance)) } -func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) { +func segmentFMP4ReadInit(r io.ReadSeeker) (*fmp4.Init, error) { buf := make([]byte, 8) _, err := io.ReadFull(r, buf) if err != nil { @@ -81,8 +91,6 @@ func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) { return nil, err } - // return ftyp and moov - buf = make([]byte, ftypSize+moovSize) _, err = io.ReadFull(r, buf) @@ -90,10 +98,19 @@ func segmentFMP4ReadInit(r io.ReadSeeker) ([]byte, error) { return nil, err } - return buf, nil + var init fmp4.Init + err = init.Unmarshal(bytes.NewReader(buf)) + if err != nil { + return nil, err + } + + return &init, nil } -func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { +func segmentFMP4ReadMaxDuration( + r io.ReadSeeker, + init *fmp4.Init, +) (time.Duration, error) { // find and skip ftyp buf := make([]byte, 8) @@ -203,7 +220,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { return 0, err } - maxElapsed := uint64(0) + var maxElapsed time.Duration // foreach traf @@ -220,7 +237,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { return 0, fmt.Errorf("traf box not found") } - // skip tfhd + // parse tfhd _, err = io.ReadFull(r, buf) if err != nil { @@ -233,11 +250,24 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - _, err = r.Seek(int64(tfhdSize)-8, io.SeekCurrent) + buf2 := make([]byte, tfhdSize-8) + + _, err = io.ReadFull(r, buf2) if err != nil { return 0, err } + var tfhd mp4.Tfhd + _, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfhd, mp4.Context{}) + if err != nil { + return 0, fmt.Errorf("invalid tfhd box: %w", err) + } + + track := findInitTrack(init.Tracks, int(tfhd.TrackID)) + if track == nil { + return 0, fmt.Errorf("invalid track ID: %v", tfhd.TrackID) + } + // parse tfdt _, err = io.ReadFull(r, buf) @@ -251,7 +281,7 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - buf2 := make([]byte, tfdtSize-8) + buf2 = make([]byte, tfdtSize-8) _, err = io.ReadFull(r, buf2) if err != nil { @@ -290,33 +320,37 @@ func segmentFMP4ReadMaxDuration(r io.ReadSeeker) (time.Duration, error) { return 0, fmt.Errorf("invalid trun box: %w", err) } - elapsed := tfdt.BaseMediaDecodeTimeV1 + elapsed := int64(tfdt.BaseMediaDecodeTimeV1) for _, entry := range trun.Entries { - elapsed += uint64(entry.SampleDuration) + elapsed += int64(entry.SampleDuration) } - if elapsed > maxElapsed { - maxElapsed = elapsed + elapsedGo := durationMp4ToGo(elapsed, track.TimeScale) + + if elapsedGo > maxElapsed { + maxElapsed = elapsedGo } } - return durationMp4ToGo(maxElapsed, fmp4Timescale), nil + return maxElapsed, nil } func segmentFMP4SeekAndMuxParts( r io.ReadSeeker, segmentStartOffset time.Duration, duration time.Duration, + init *fmp4.Init, m muxer, ) (time.Duration, error) { - segmentStartOffsetMP4 := durationGoToMp4(segmentStartOffset, fmp4Timescale) - durationMP4 := durationGoToMp4(duration, fmp4Timescale) + var segmentStartOffsetMP4 int64 + var durationMP4 int64 moofOffset := uint64(0) var tfhd *mp4.Tfhd var tfdt *mp4.Tfdt atLeastOnePartWritten := false - maxMuxerDTS := int64(0) + var timeScale uint32 + var maxMuxerDTS time.Duration breakAtNextMdat := false _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { @@ -342,7 +376,15 @@ func segmentFMP4SeekAndMuxParts( } tfdt = box.(*mp4.Tfdt) + track := findInitTrack(init.Tracks, int(tfhd.TrackID)) + if track == nil { + return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID) + } + m.setTrack(int(tfhd.TrackID)) + timeScale = track.TimeScale + segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale) + durationMP4 = durationGoToMp4(duration, track.TimeScale) case "trun": box, _, err := h.ReadPayload() @@ -358,11 +400,11 @@ func segmentFMP4SeekAndMuxParts( return nil, err } - muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) - int64(segmentStartOffsetMP4) + muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) - segmentStartOffsetMP4 atLeastOneSampleWritten := false for _, e := range trun.Entries { - if muxerDTS >= int64(durationMP4) { + if muxerDTS >= durationMP4 { breakAtNextMdat = true break } @@ -395,8 +437,10 @@ func segmentFMP4SeekAndMuxParts( m.writeFinalDTS(muxerDTS) } - if muxerDTS > maxMuxerDTS { - maxMuxerDTS = muxerDTS + muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale) + + if muxerDTSGo > maxMuxerDTS { + maxMuxerDTS = muxerDTSGo } case "mdat": @@ -414,21 +458,23 @@ func segmentFMP4SeekAndMuxParts( return 0, errNoSegmentsFound } - return durationMp4ToGo(uint64(maxMuxerDTS), fmp4Timescale), nil + return maxMuxerDTS, nil } func segmentFMP4WriteParts( r io.ReadSeeker, segmentStartOffset time.Duration, duration time.Duration, + init *fmp4.Init, m muxer, ) (time.Duration, error) { - segmentStartOffsetMP4 := durationGoToMp4(segmentStartOffset, fmp4Timescale) - durationMP4 := durationGoToMp4(duration, fmp4Timescale) + var segmentStartOffsetMP4 int64 + var durationMP4 int64 moofOffset := uint64(0) var tfhd *mp4.Tfhd var tfdt *mp4.Tfdt - maxMuxerDTS := int64(0) + var timeScale uint32 + var maxMuxerDTS time.Duration breakAtNextMdat := false _, err := mp4.ReadBoxStructure(r, func(h *mp4.ReadHandle) (interface{}, error) { @@ -454,7 +500,15 @@ func segmentFMP4WriteParts( } tfdt = box.(*mp4.Tfdt) + track := findInitTrack(init.Tracks, int(tfhd.TrackID)) + if track == nil { + return nil, fmt.Errorf("invalid track ID: %v", tfhd.TrackID) + } + m.setTrack(int(tfhd.TrackID)) + timeScale = track.TimeScale + segmentStartOffsetMP4 = durationGoToMp4(segmentStartOffset, track.TimeScale) + durationMP4 = durationGoToMp4(duration, track.TimeScale) case "trun": box, _, err := h.ReadPayload() @@ -470,11 +524,11 @@ func segmentFMP4WriteParts( return nil, err } - muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + int64(segmentStartOffsetMP4) + muxerDTS := int64(tfdt.BaseMediaDecodeTimeV1) + segmentStartOffsetMP4 atLeastOneSampleWritten := false for _, e := range trun.Entries { - if muxerDTS >= int64(durationMP4) { + if muxerDTS >= durationMP4 { breakAtNextMdat = true break } @@ -503,8 +557,10 @@ func segmentFMP4WriteParts( m.writeFinalDTS(muxerDTS) } - if muxerDTS > maxMuxerDTS { - maxMuxerDTS = muxerDTS + muxerDTSGo := durationMp4ToGo(muxerDTS, timeScale) + + if muxerDTSGo > maxMuxerDTS { + maxMuxerDTS = muxerDTSGo } case "mdat": @@ -518,5 +574,5 @@ func segmentFMP4WriteParts( return 0, err } - return durationMp4ToGo(uint64(maxMuxerDTS), fmp4Timescale), nil + return maxMuxerDTS, nil }