playback: improve /list performance (#3663) (#4102)

Segments are now parsed in parallel.
This commit is contained in:
Alessandro Ros 2025-01-03 13:59:49 +01:00 committed by GitHub
parent ac0ddc9e8a
commit 21b5031d6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 98 additions and 54 deletions

View File

@ -66,12 +66,12 @@ func seekAndMux(
segmentStartOffset := start.Sub(segments[0].Start)
segmentMaxElapsed, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
segmentDuration, err := segmentFMP4SeekAndMuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil {
return err
}
segmentEnd = start.Add(segmentMaxElapsed)
segmentEnd = start.Add(segmentDuration)
for _, seg := range segments[1:] {
f, err = os.Open(seg.Fpath)
@ -92,13 +92,13 @@ func seekAndMux(
segmentStartOffset := seg.Start.Sub(start)
var segmentMaxElapsed time.Duration
segmentMaxElapsed, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
var segmentDuration time.Duration
segmentDuration, err = segmentFMP4MuxParts(f, segmentStartOffset, duration, firstInit, m)
if err != nil {
return err
}
segmentEnd = start.Add(segmentMaxElapsed)
segmentEnd = start.Add(segmentDuration)
}
err = m.flush()

View File

@ -22,66 +22,110 @@ func (d listEntryDuration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).Seconds())
}
type parsedSegment struct {
start time.Time
init *fmp4.Init
duration time.Duration
}
func parseSegment(seg *recordstore.Segment) (*parsedSegment, error) {
f, err := os.Open(seg.Fpath)
if err != nil {
return nil, err
}
defer f.Close()
init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return nil, err
}
// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return nil, err
}
}
return &parsedSegment{
start: seg.Start,
init: init,
duration: duration,
}, nil
}
func parseSegments(segments []*recordstore.Segment) ([]*parsedSegment, error) {
parsed := make([]*parsedSegment, len(segments))
ch := make(chan error)
// process segments in parallel.
// parallel random access should improve performance in most cases.
// ref: https://pkolaczk.github.io/disk-parallelism/
for i, seg := range segments {
go func(i int, seg *recordstore.Segment) {
var err error
parsed[i], err = parseSegment(seg)
ch <- err
}(i, seg)
}
var err error
for range segments {
err2 := <-ch
if err2 != nil {
err = err2
}
}
return parsed, err
}
type listEntry struct {
Start time.Time `json:"start"`
Duration listEntryDuration `json:"duration"`
URL string `json:"url"`
}
func readDurationAndConcatenate(
func concatenateSegments(parsed []*parsedSegment) []listEntry {
out := []listEntry{}
var prevInit *fmp4.Init
for _, parsed := range parsed {
if len(out) != 0 && segmentFMP4CanBeConcatenated(
prevInit,
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
parsed.init,
parsed.start) {
prevStart := out[len(out)-1].Start
curEnd := parsed.start.Add(parsed.duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: parsed.start,
Duration: listEntryDuration(parsed.duration),
})
}
prevInit = parsed.init
}
return out
}
func parseAndConcatenate(
recordFormat conf.RecordFormat,
segments []*recordstore.Segment,
) ([]listEntry, error) {
if recordFormat == conf.RecordFormatFMP4 {
out := []listEntry{}
var prevInit *fmp4.Init
for _, seg := range segments {
err := func() error {
f, err := os.Open(seg.Fpath)
if err != nil {
return err
}
defer f.Close()
init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return err
}
// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return err
}
}
if len(out) != 0 && segmentFMP4CanBeConcatenated(
prevInit,
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
init,
seg.Start) {
prevStart := out[len(out)-1].Start
curEnd := seg.Start.Add(duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: seg.Start,
Duration: listEntryDuration(duration),
})
}
prevInit = init
return nil
}()
if err != nil {
return nil, err
}
parsed, err := parseSegments(segments)
if err != nil {
return nil, err
}
out := concatenateSegments(parsed)
return out, nil
}
@ -135,7 +179,7 @@ func (s *Server) onList(ctx *gin.Context) {
return
}
entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments)
entries, err := parseAndConcatenate(pathConf.RecordFormat, segments)
if err != nil {
s.writeError(ctx, http.StatusInternalServerError, err)
return