mirror of
https://github.com/bluenviron/mediamtx
synced 2025-01-03 05:22:30 +00:00
perf: chunk and process playback segments concurrently
This commit is contained in:
parent
1dfb5ba4b9
commit
d39ea20193
@ -8,6 +8,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -27,6 +28,67 @@ type listEntry struct {
|
||||
Start time.Time `json:"start"`
|
||||
Duration listEntryDuration `json:"duration"`
|
||||
URL string `json:"url"`
|
||||
init *fmp4.Init
|
||||
}
|
||||
|
||||
func computeDurationAndConcatenateFMP4(segments []*recordstore.Segment) ([]listEntry, error) {
|
||||
out := []listEntry{}
|
||||
var prevInit *fmp4.Init
|
||||
|
||||
for _, seg := range segments {
|
||||
err := func() error {
|
||||
f, err := os.Open(seg.Fpath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
init, err := segmentFMP4ReadInit(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = f.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxDuration, err := segmentFMP4ReadMaxDuration(f, init)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out) != 0 && segmentFMP4CanBeConcatenated(
|
||||
prevInit,
|
||||
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
|
||||
init,
|
||||
seg.Start) {
|
||||
prevStart := out[len(out)-1].Start
|
||||
curEnd := seg.Start.Add(maxDuration)
|
||||
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
|
||||
} else {
|
||||
out = append(out, listEntry{
|
||||
Start: seg.Start,
|
||||
Duration: listEntryDuration(maxDuration),
|
||||
init: init,
|
||||
})
|
||||
}
|
||||
|
||||
prevInit = init
|
||||
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type concatEntryRes struct {
|
||||
entry []listEntry
|
||||
err error
|
||||
}
|
||||
|
||||
func computeDurationAndConcatenate(
|
||||
@ -34,54 +96,53 @@ func computeDurationAndConcatenate(
|
||||
segments []*recordstore.Segment,
|
||||
) ([]listEntry, error) {
|
||||
if recordFormat == conf.RecordFormatFMP4 {
|
||||
const numWorkers = 4
|
||||
chunkSize := (len(segments) + numWorkers - 1) / numWorkers
|
||||
ch := make(chan (*concatEntryRes), numWorkers)
|
||||
defer close(ch)
|
||||
|
||||
numChunks := 0
|
||||
for i := 0; i < len(segments); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(segments) {
|
||||
end = len(segments)
|
||||
}
|
||||
|
||||
numChunks++
|
||||
go func(segments []*recordstore.Segment) {
|
||||
entry, err := computeDurationAndConcatenateFMP4(segments)
|
||||
ch <- &concatEntryRes{entry: entry, err: err}
|
||||
}(segments[i:end])
|
||||
}
|
||||
|
||||
entries := []listEntry{}
|
||||
for i := 0; i < numChunks; i++ {
|
||||
res := <-ch
|
||||
if res.err != nil {
|
||||
return nil, res.err
|
||||
}
|
||||
entries = append(entries, res.entry...)
|
||||
}
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Start.Before(entries[j].Start)
|
||||
})
|
||||
|
||||
out := []listEntry{}
|
||||
var prevInit *fmp4.Init
|
||||
|
||||
for _, seg := range segments {
|
||||
err := func() error {
|
||||
f, err := os.Open(seg.Fpath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
init, err := segmentFMP4ReadInit(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = f.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxDuration, err := segmentFMP4ReadMaxDuration(f, init)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out) != 0 && segmentFMP4CanBeConcatenated(
|
||||
prevInit,
|
||||
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
|
||||
init,
|
||||
seg.Start) {
|
||||
prevStart := out[len(out)-1].Start
|
||||
curEnd := seg.Start.Add(maxDuration)
|
||||
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
|
||||
} else {
|
||||
out = append(out, listEntry{
|
||||
Start: seg.Start,
|
||||
Duration: listEntryDuration(maxDuration),
|
||||
})
|
||||
}
|
||||
|
||||
prevInit = init
|
||||
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, entry := range entries {
|
||||
if len(out) != 0 && segmentFMP4CanBeConcatenated(
|
||||
prevInit,
|
||||
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
|
||||
entry.init,
|
||||
entry.Start) {
|
||||
prevStart := out[len(out)-1].Start
|
||||
curEnd := entry.Start.Add(time.Duration(entry.Duration))
|
||||
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
|
||||
} else {
|
||||
out = append(out, entry)
|
||||
}
|
||||
|
||||
prevInit = entry.init
|
||||
}
|
||||
|
||||
return out, nil
|
||||
|
Loading…
Reference in New Issue
Block a user