playback: add /list endpoint to list available timespans (#2978)

This commit is contained in:
Alessandro Ros 2024-02-03 15:54:07 +01:00 committed by GitHub
parent 4553fc267c
commit e6bf095a05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 538 additions and 102 deletions

View File

@ -56,7 +56,7 @@ And can be recorded with:
* Streams are automatically converted from a protocol to another
* Serve multiple streams at once in separate paths
* Record streams to disk
* Playback recordings
* Playback recorded streams
* Authenticate users; use internal or external authentication
* Redirect readers to other RTSP servers (load balancing)
* Query and control the server through the API
@ -116,7 +116,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Record streams to disk](#record-streams-to-disk)
* [Playback recordings](#playback-recordings)
* [Playback recorded streams](#playback-recorded-streams)
* [Forward streams to other servers](#forward-streams-to-other-servers)
* [Proxy requests to other servers](#proxy-requests-to-other-servers)
* [On-demand publishing](#on-demand-publishing)
@ -1191,16 +1191,37 @@ To upload recordings to a remote location, you can use _MediaMTX_ together with
If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`.
### Playback recordings
### Playback recorded streams
Recordings can be served to users through a dedicated HTTP server, that can be enabled inside the configuration:
Existing recordings can be served to users through a dedicated HTTP server, that can be enabled inside the configuration:
```yml
playback: yes
playbackAddress: :9996
```
The server can be queried for recordings by using the URL:
The server provides an endpoint to list recorded timespans:
```
http://localhost:9996/list?path=[mypath]
```
Where [mypath] is the name of a path. The server will return a list of timespans in JSON format:
```json
[
{
"start": "2006-01-02T15:04:05Z07:00",
"duration": "60.0"
},
{
"start": "2006-01-02T15:07:05Z07:00",
"duration": "32.33"
}
]
```
The server provides an endpoint for downloading recordings:
```
http://localhost:9996/get?path=[mypath]&start=[start_date]&duration=[duration]&format=[format]
@ -1213,9 +1234,7 @@ Where:
* [duration] is the maximum duration of the recording in Golang format (example: 20s, 20h)
* [format] must be fmp4
All parameters must be [url-encoded](https://www.urlencoder.org/).
For instance:
All parameters must be [url-encoded](https://www.urlencoder.org/). For instance:
```
http://localhost:9996/get?path=stream2&start=2024-01-14T16%3A33%3A17%2B00%3A00&duration=200s&format=fmp4
@ -1225,7 +1244,7 @@ The resulting stream is natively compatible with any browser, therefore its URL
```html
<video controls>
<source src="http://localhost:9996/get?path=stream2&start=2024-01-14T16%3A33%3A17%2B00%3A00&duration=200s&format=fmp4" type="video/mp4" />
<source src="http://localhost:9996/get?path=[mypath]&start=[start_date]&duration=[duration]&format=[format]" type="video/mp4" />
</video>
```

View File

@ -40,6 +40,8 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
return nil, err
}
// find ftyp
if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return nil, fmt.Errorf("ftyp box not found")
}
@ -51,6 +53,8 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
return nil, err
}
// find moov
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
@ -67,6 +71,8 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
return nil, err
}
// return ftyp and moov
buf = make([]byte, ftypSize+moovSize)
_, err = io.ReadFull(r, buf)
@ -77,7 +83,7 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
return buf, nil
}
func seekAndMuxParts(
func fmp4SeekAndMuxParts(
r io.ReadSeeker,
init []byte,
minTime time.Duration,
@ -243,7 +249,7 @@ func seekAndMuxParts(
return durationMp4ToGo(elapsed, 90000), nil
}
func muxParts(
func fmp4MuxParts(
r io.ReadSeeker,
startTime time.Duration,
maxTime time.Duration,
@ -381,7 +387,7 @@ func fmp4SeekAndMux(
return 0, err
}
elapsed, err := seekAndMuxParts(f, init, minTime, maxTime, w)
elapsed, err := fmp4SeekAndMuxParts(f, init, minTime, maxTime, w)
if err != nil {
return 0, err
}
@ -401,10 +407,222 @@ func fmp4Mux(
}
defer f.Close()
elapsed, err := muxParts(f, startTime, maxTime, w)
return fmp4MuxParts(f, startTime, maxTime, w)
}
func fmp4Duration(fpath string) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
// find and skip ftyp
buf := make([]byte, 8)
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
return elapsed, nil
if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return 0, fmt.Errorf("ftyp box not found")
}
ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return 0, err
}
// find and skip moov
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return 0, fmt.Errorf("moov box not found")
}
moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(moovSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// find last valid moof and mdat
lastMoofPos := int64(-1)
for {
moofPos, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
_, err = io.ReadFull(f, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'f'}) {
return 0, fmt.Errorf("moof box not found")
}
moofSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(moofSize)-8, io.SeekCurrent)
if err != nil {
break
}
_, err = io.ReadFull(f, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
return 0, fmt.Errorf("mdat box not found")
}
mdatSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(mdatSize)-8, io.SeekCurrent)
if err != nil {
break
}
lastMoofPos = moofPos
}
if lastMoofPos < 0 {
return 0, fmt.Errorf("no moof boxes found")
}
// open last moof
_, err = f.Seek(lastMoofPos+8, io.SeekStart)
if err != nil {
return 0, err
}
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
// skip mfhd
if !bytes.Equal(buf[4:], []byte{'m', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("mfhd box not found")
}
_, err = f.Seek(8, io.SeekCurrent)
if err != nil {
return 0, err
}
maxElapsed := uint64(0)
// foreach traf
for {
_, err := io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) {
if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
break
}
return 0, fmt.Errorf("traf box not found")
}
// skip tfhd
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("tfhd box not found")
}
tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(tfhdSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// parse tfdt
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'd', 't'}) {
return 0, fmt.Errorf("tfdt box not found")
}
tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 := make([]byte, tfdtSize-8)
_, err = io.ReadFull(f, buf2)
if err != nil {
return 0, err
}
var tfdt mp4.Tfdt
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfdt, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid tfdt box: %w", err)
}
elapsed := tfdt.BaseMediaDecodeTimeV1
// parse trun
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'u', 'n'}) {
return 0, fmt.Errorf("trun box not found")
}
trunSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 = make([]byte, trunSize-8)
_, err = io.ReadFull(f, buf2)
if err != nil {
return 0, err
}
var trun mp4.Trun
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &trun, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid trun box: %w", err)
}
for _, entry := range trun.Entries {
elapsed += uint64(entry.SampleDuration)
}
if elapsed > maxElapsed {
maxElapsed = elapsed
}
}
return durationMp4ToGo(maxElapsed, 90000), nil
}

View File

@ -0,0 +1,169 @@
package playback
import (
"fmt"
"io/fs"
"path/filepath"
"sort"
"strings"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/record"
)
type segment struct {
fpath string
start time.Time
duration time.Duration
}
func findSegments(
pathConf *conf.Path,
pathName string,
start time.Time,
duration time.Duration,
) ([]*segment, error) {
if !pathConf.Playback {
return nil, fmt.Errorf("playback is disabled on path '%s'", pathName)
}
recordPath := record.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)
// we have to convert to absolute paths
// otherwise, recordPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)
commonPath := record.CommonPath(recordPath)
end := start.Add(duration)
var segments []*segment
err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
var pa record.Path
ok := pa.Decode(recordPath, fpath)
// gather all segments that starts before the end of the playback
if ok && !end.Before(time.Time(pa)) {
segments = append(segments, &segment{
fpath: fpath,
start: time.Time(pa),
})
}
}
return nil
})
if err != nil {
return nil, err
}
if segments == nil {
return nil, errNoSegmentsFound
}
sort.Slice(segments, func(i, j int) bool {
return segments[i].start.Before(segments[j].start)
})
// find the segment that may contain the start of the playback and remove all previous ones
found := false
for i := 0; i < len(segments)-1; i++ {
if !start.Before(segments[i].start) && start.Before(segments[i+1].start) {
segments = segments[i:]
found = true
break
}
}
// otherwise, keep the last segment only and check if it may contain the start of the playback
if !found {
segments = segments[len(segments)-1:]
if segments[len(segments)-1].start.After(start) {
return nil, errNoSegmentsFound
}
}
return segments, nil
}
func findAllSegments(
pathConf *conf.Path,
pathName string,
) ([]*segment, error) {
if !pathConf.Playback {
return nil, fmt.Errorf("playback is disabled on path '%s'", pathName)
}
recordPath := record.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)
// we have to convert to absolute paths
// otherwise, recordPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)
commonPath := record.CommonPath(recordPath)
var segments []*segment
err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
var pa record.Path
ok := pa.Decode(recordPath, fpath)
if ok {
segments = append(segments, &segment{
fpath: fpath,
start: time.Time(pa),
})
}
}
return nil
})
if err != nil {
return nil, err
}
if segments == nil {
return nil, errNoSegmentsFound
}
sort.Slice(segments, func(i, j int) bool {
return segments[i].start.Before(segments[j].start)
})
return segments, nil
}
func canBeConcatenated(seg1, seg2 *segment) bool {
end1 := seg1.start.Add(seg1.duration)
return !seg2.start.Before(end1.Add(-concatenationTolerance)) && !seg2.start.After(end1.Add(concatenationTolerance))
}
func mergeConcatenatedSegments(in []*segment) []*segment {
var out []*segment
for _, seg := range in {
if len(out) != 0 && canBeConcatenated(out[len(out)-1], seg) {
start := out[len(out)-1].start
end := seg.start.Add(seg.duration)
out[len(out)-1].duration = end.Sub(start)
} else {
out = append(out, seg)
}
}
return out
}

View File

@ -4,19 +4,14 @@ package playback
import (
"errors"
"fmt"
"io/fs"
"net"
"net/http"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpserv"
"github.com/bluenviron/mediamtx/internal/record"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
"github.com/gin-gonic/gin"
)
@ -27,6 +22,11 @@ const (
var errNoSegmentsFound = errors.New("no recording segments found for the given timestamp")
type listEntry struct {
Start time.Time `json:"start"`
Duration float64 `json:"duration"`
}
type writerWrapper struct {
ctx *gin.Context
written bool
@ -41,86 +41,6 @@ func (w *writerWrapper) Write(p []byte) (int, error) {
return w.ctx.Writer.Write(p)
}
type segment struct {
fpath string
start time.Time
}
func findSegments(
pathConf *conf.Path,
pathName string,
start time.Time,
duration time.Duration,
) ([]segment, error) {
if !pathConf.Playback {
return nil, fmt.Errorf("playback is disabled on path '%s'", pathName)
}
recordPath := record.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)
// we have to convert to absolute paths
// otherwise, recordPath and fpath inside Walk() won't have common elements
recordPath, _ = filepath.Abs(recordPath)
commonPath := record.CommonPath(recordPath)
end := start.Add(duration)
var segments []segment
// gather all segments that starts before the end of the playback
err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
var pa record.Path
ok := pa.Decode(recordPath, fpath)
if ok && !end.Before(time.Time(pa)) {
segments = append(segments, segment{
fpath: fpath,
start: time.Time(pa),
})
}
}
return nil
})
if err != nil {
return nil, err
}
if segments == nil {
return nil, errNoSegmentsFound
}
sort.Slice(segments, func(i, j int) bool {
return segments[i].start.Before(segments[j].start)
})
// find the segment that may contain the start of the playback and remove all previous ones
found := false
for i := 0; i < len(segments)-1; i++ {
if !start.Before(segments[i].start) && start.Before(segments[i+1].start) {
segments = segments[i:]
found = true
break
}
}
// otherwise, keep the last segment only and check whether it may contain the start of the playback
if !found {
segments = segments[len(segments)-1:]
if segments[len(segments)-1].start.After(start) {
return nil, errNoSegmentsFound
}
}
return segments, nil
}
// Server is the playback server.
type Server struct {
Address string
@ -139,6 +59,7 @@ func (p *Server) Initialize() error {
group := router.Group("/")
group.GET("/list", p.onList)
group.GET("/get", p.onGet)
network, address := restrictnetwork.Restrict("tcp", p.Address)
@ -196,6 +117,52 @@ func (p *Server) safeFindPathConf(name string) (*conf.Path, error) {
return pathConf, err
}
func (p *Server) onList(ctx *gin.Context) {
pathName := ctx.Query("path")
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
segments, err := findAllSegments(pathConf, pathName)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
if pathConf.RecordFormat != conf.RecordFormatFMP4 {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4"))
return
}
for _, seg := range segments {
d, err := fmp4Duration(seg.fpath)
if err != nil {
p.writeError(ctx, http.StatusInternalServerError, err)
return
}
seg.duration = d
}
segments = mergeConcatenatedSegments(segments)
out := make([]listEntry, len(segments))
for i, seg := range segments {
out[i] = listEntry{
Start: seg.start,
Duration: seg.duration.Seconds(),
}
}
ctx.JSON(http.StatusOK, out)
}
func (p *Server) onGet(ctx *gin.Context) {
pathName := ctx.Query("path")
@ -264,7 +231,7 @@ func (p *Server) onGet(ctx *gin.Context) {
return
}
// something has been already written: abort and write to logs only
// something has already been written: abort and write logs only
p.Log(logger.Error, err.Error())
return
}
@ -274,7 +241,7 @@ func (p *Server) onGet(ctx *gin.Context) {
overallElapsed := elapsed
for _, seg := range segments[1:] {
// there's a gap between segments; stop serving the recording.
// there's a gap between segments, stop serving the recording
if seg.start.Before(start.Add(-concatenationTolerance)) || seg.start.After(start.Add(concatenationTolerance)) {
return
}

View File

@ -1,6 +1,7 @@
package playback
import (
"encoding/json"
"io"
"net/http"
"net/url"
@ -134,7 +135,7 @@ func writeSegment2(t *testing.T, fpath string) {
require.NoError(t, err)
}
func TestServer(t *testing.T) {
func TestServerGet(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
@ -226,3 +227,65 @@ func TestServer(t *testing.T) {
},
}, parts)
}
func TestServerList(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-000000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-000000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-000000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Playback: true,
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
Parent: &nilLogger{},
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
v := url.Values{}
v.Set("path", "mypath")
u := &url.URL{
Scheme: "http",
Host: "localhost:9996",
Path: "/list",
RawQuery: v.Encode(),
}
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
require.NoError(t, err)
require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(64),
"start": time.Date(2008, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339),
},
map[string]interface{}{
"duration": float64(2),
"start": time.Date(2009, 11, 0o7, 11, 23, 2, 0, time.Local).Format(time.RFC3339),
},
}, out)
}