allow to start/stop recording without disconnecting clients (#2395) (#2434)

This commit is contained in:
Alessandro Ros 2023-09-28 19:39:56 +02:00 committed by GitHub
parent 72e74b6456
commit 442a48363c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 163 additions and 62 deletions

View File

@ -419,6 +419,8 @@ func (pconf PathConf) Clone() *PathConf {
panic(err)
}
dest.Regexp = pconf.Regexp
return &dest
}

View File

@ -299,12 +299,6 @@ func (pa *path) Log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
}
func (pa *path) safeConf() *conf.PathConf {
pa.confMutex.RLock()
defer pa.confMutex.RUnlock()
return pa.conf
}
func (pa *path) run() {
defer close(pa.done)
defer pa.wg.Done()
@ -512,13 +506,22 @@ func (pa *path) doOnDemandPublisherCloseTimer() {
}
func (pa *path) doReloadConf(newConf *conf.PathConf) {
pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
if pa.conf.HasStaticSource() {
go pa.source.(*sourceStatic).reloadConf(newConf)
}
pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
if pa.recordingEnabled() {
if pa.stream != nil && pa.recordAgent == nil {
pa.startRecording()
}
} else if pa.recordAgent != nil {
pa.recordAgent.Close()
pa.recordAgent = nil
}
}
func (pa *path) doSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
@ -778,6 +781,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
}
func (pa *path) safeConf() *conf.PathConf {
pa.confMutex.RLock()
defer pa.confMutex.RUnlock()
return pa.conf
}
func (pa *path) shouldClose() bool {
return pa.conf.Regexp != nil &&
pa.source == nil &&
@ -786,6 +795,10 @@ func (pa *path) shouldClose() bool {
len(pa.readerAddRequestsOnHold) == 0
}
func (pa *path) recordingEnabled() bool {
return pa.record && pa.conf.Record
}
func (pa *path) externalCmdEnv() externalcmd.Environment {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
env := externalcmd.Environment{
@ -886,16 +899,8 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
return err
}
if pa.record && pa.conf.Record {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.recordPath,
time.Duration(pa.recordPartDuration),
time.Duration(pa.recordSegmentDuration),
pa.name,
pa.stream,
pa,
)
if pa.recordingEnabled() {
pa.startRecording()
}
pa.readyTime = time.Now()
@ -962,6 +967,18 @@ func (pa *path) setNotReady() {
}
}
func (pa *path) startRecording() {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.recordPath,
time.Duration(pa.recordPartDuration),
time.Duration(pa.recordSegmentDuration),
pa.name,
pa.stream,
pa,
)
}
func (pa *path) executeRemoveReader(r reader) {
delete(pa.readers, r)
}

View File

@ -14,6 +14,8 @@ import (
func pathConfCanBeUpdated(oldPathConf *conf.PathConf, newPathConf *conf.PathConf) bool {
clone := oldPathConf.Clone()
clone.Record = newPathConf.Record
clone.RPICameraBrightness = newPathConf.RPICameraBrightness
clone.RPICameraContrast = newPathConf.RPICameraContrast
clone.RPICameraSaturation = newPathConf.RPICameraSaturation

View File

@ -19,6 +19,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
rtspurl "github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/datarhei/gosrt"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/rtmp"
@ -409,3 +410,81 @@ func TestPathMaxReaders(t *testing.T) {
}
}
}
func TestPathRecord(t *testing.T) {
dir, err := os.MkdirTemp("", "rtsp-path-record")
require.NoError(t, err)
defer os.RemoveAll(dir)
p, ok := newInstance("api: yes\n" +
"record: yes\n" +
"recordPath: " + filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" +
"paths:\n" +
" all:\n" +
" record: yes\n")
require.Equal(t, true, ok)
defer p.Close()
source := gortsplib.Client{}
err = source.StartRecording(
"rtsp://localhost:8554/mystream",
&description.Session{Medias: []*description.Media{testMediaH264}})
require.NoError(t, err)
defer source.Close()
for i := 0; i < 4; i++ {
err := source.WritePacketRTP(testMediaH264, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 1123 + uint16(i),
Timestamp: 45343 + 90000*uint32(i),
SSRC: 563423,
},
Payload: []byte{5},
})
require.NoError(t, err)
}
time.Sleep(500 * time.Millisecond)
files, err := os.ReadDir(filepath.Join(dir, "mystream"))
require.NoError(t, err)
require.Equal(t, 1, len(files))
hc := &http.Client{Transport: &http.Transport{}}
httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v2/config/paths/edit/all", map[string]interface{}{
"record": false,
}, nil)
time.Sleep(500 * time.Millisecond)
httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v2/config/paths/edit/all", map[string]interface{}{
"record": true,
}, nil)
time.Sleep(500 * time.Millisecond)
for i := 4; i < 8; i++ {
err := source.WritePacketRTP(testMediaH264, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 1123 + uint16(i),
Timestamp: 45343 + 90000*uint32(i),
SSRC: 563423,
},
Payload: []byte{5},
})
require.NoError(t, err)
}
time.Sleep(500 * time.Millisecond)
files, err = os.ReadDir(filepath.Join(dir, "mystream"))
require.NoError(t, err)
require.Equal(t, 2, len(files))
}

View File

@ -832,6 +832,8 @@ func NewAgent(
// Close closes the Agent.
func (r *Agent) Close() {
r.Log(logger.Info, "recording stopped")
r.ctxCancel()
<-r.done
}

View File

@ -2,10 +2,14 @@ package record
import (
"io"
"os"
"path/filepath"
"time"
"github.com/aler9/writerseeker"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/logger"
)
func writePart(f io.Writer, partTracks map[*track]*fmp4.PartTrack) error {
@ -50,6 +54,29 @@ func newPart(
}
func (p *part) close() error {
if p.s.f == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path)
p.s.r.Log(logger.Debug, "opening segment %s", p.s.fpath)
err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
if err != nil {
return err
}
f, err := os.Create(p.s.fpath)
if err != nil {
return err
}
err = writeInit(f, p.s.r.tracks)
if err != nil {
f.Close()
return err
}
p.s.f = f
}
return writePart(p.s.f, p.partTracks)
}

View File

@ -3,7 +3,6 @@ package record
import (
"io"
"os"
"path/filepath"
"time"
"github.com/aler9/writerseeker"
@ -54,31 +53,31 @@ func newSegment(
}
func (s *segment) close() error {
var err error
if s.curPart != nil {
err := s.flush()
if s.f != nil {
s.r.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.f.Close()
if err == nil {
err = err2
}
}
return err
err = s.curPart.close()
}
return nil
if s.f != nil {
s.r.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.f.Close()
if err == nil {
err = err2
}
}
return err
}
func (s *segment) record(track *track, sample *sample) error {
if s.curPart == nil {
s.curPart = newPart(s, sample.dts)
} else if s.curPart.duration() >= s.r.partDuration {
err := s.flush()
err := s.curPart.close()
s.curPart = nil
if err != nil {
s.curPart = nil
return err
}
@ -87,30 +86,3 @@ func (s *segment) record(track *track, sample *sample) error {
return s.curPart.record(track, sample)
}
func (s *segment) flush() error {
if s.f == nil {
s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, s.r.path)
s.r.Log(logger.Debug, "opening segment %s", s.fpath)
err := os.MkdirAll(filepath.Dir(s.fpath), 0o755)
if err != nil {
return err
}
f, err := os.Create(s.fpath)
if err != nil {
return err
}
err = writeInit(f, s.r.tracks)
if err != nil {
f.Close()
return err
}
s.f = f
}
return s.curPart.close()
}