From 442a48363cd4a4881a141a16c76b30b1847c98df Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Thu, 28 Sep 2023 19:39:56 +0200 Subject: [PATCH] allow to start/stop recording without disconnecting clients (#2395) (#2434) --- internal/conf/path.go | 2 + internal/core/path.go | 55 +++++++++++++++--------- internal/core/path_manager.go | 2 + internal/core/path_test.go | 79 +++++++++++++++++++++++++++++++++++ internal/record/agent.go | 2 + internal/record/part.go | 27 ++++++++++++ internal/record/segment.go | 58 +++++++------------------ 7 files changed, 163 insertions(+), 62 deletions(-) diff --git a/internal/conf/path.go b/internal/conf/path.go index b258ea0f..03dfd458 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -419,6 +419,8 @@ func (pconf PathConf) Clone() *PathConf { panic(err) } + dest.Regexp = pconf.Regexp + return &dest } diff --git a/internal/core/path.go b/internal/core/path.go index 016e981b..38926138 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -299,12 +299,6 @@ func (pa *path) Log(level logger.Level, format string, args ...interface{}) { pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) } -func (pa *path) safeConf() *conf.PathConf { - pa.confMutex.RLock() - defer pa.confMutex.RUnlock() - return pa.conf -} - func (pa *path) run() { defer close(pa.done) defer pa.wg.Done() @@ -512,13 +506,22 @@ func (pa *path) doOnDemandPublisherCloseTimer() { } func (pa *path) doReloadConf(newConf *conf.PathConf) { + pa.confMutex.Lock() + pa.conf = newConf + pa.confMutex.Unlock() + if pa.conf.HasStaticSource() { go pa.source.(*sourceStatic).reloadConf(newConf) } - pa.confMutex.Lock() - pa.conf = newConf - pa.confMutex.Unlock() + if pa.recordingEnabled() { + if pa.stream != nil && pa.recordAgent == nil { + pa.startRecording() + } + } else if pa.recordAgent != nil { + pa.recordAgent.Close() + pa.recordAgent = nil + } } func (pa *path) doSourceStaticSetReady(req pathSourceStaticSetReadyReq) { @@ -778,6 +781,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { } } +func (pa *path) safeConf() *conf.PathConf { + pa.confMutex.RLock() + defer pa.confMutex.RUnlock() + return pa.conf +} + func (pa *path) shouldClose() bool { return pa.conf.Regexp != nil && pa.source == nil && @@ -786,6 +795,10 @@ func (pa *path) shouldClose() bool { len(pa.readerAddRequestsOnHold) == 0 } +func (pa *path) recordingEnabled() bool { + return pa.record && pa.conf.Record +} + func (pa *path) externalCmdEnv() externalcmd.Environment { _, port, _ := net.SplitHostPort(pa.rtspAddress) env := externalcmd.Environment{ @@ -886,16 +899,8 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error return err } - if pa.record && pa.conf.Record { - pa.recordAgent = record.NewAgent( - pa.writeQueueSize, - pa.recordPath, - time.Duration(pa.recordPartDuration), - time.Duration(pa.recordSegmentDuration), - pa.name, - pa.stream, - pa, - ) + if pa.recordingEnabled() { + pa.startRecording() } pa.readyTime = time.Now() @@ -962,6 +967,18 @@ func (pa *path) setNotReady() { } } +func (pa *path) startRecording() { + pa.recordAgent = record.NewAgent( + pa.writeQueueSize, + pa.recordPath, + time.Duration(pa.recordPartDuration), + time.Duration(pa.recordSegmentDuration), + pa.name, + pa.stream, + pa, + ) +} + func (pa *path) executeRemoveReader(r reader) { delete(pa.readers, r) } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index c7eb87bf..5ae81adb 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -14,6 +14,8 @@ import ( func pathConfCanBeUpdated(oldPathConf *conf.PathConf, newPathConf *conf.PathConf) bool { clone := oldPathConf.Clone() + clone.Record = newPathConf.Record + clone.RPICameraBrightness = newPathConf.RPICameraBrightness clone.RPICameraContrast = newPathConf.RPICameraContrast clone.RPICameraSaturation = newPathConf.RPICameraSaturation diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 8faba69b..60e5fb53 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -19,6 +19,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/sdp" rtspurl "github.com/bluenviron/gortsplib/v4/pkg/url" "github.com/datarhei/gosrt" + "github.com/pion/rtp" "github.com/stretchr/testify/require" "github.com/bluenviron/mediamtx/internal/rtmp" @@ -409,3 +410,81 @@ func TestPathMaxReaders(t *testing.T) { } } } + +func TestPathRecord(t *testing.T) { + dir, err := os.MkdirTemp("", "rtsp-path-record") + require.NoError(t, err) + defer os.RemoveAll(dir) + + p, ok := newInstance("api: yes\n" + + "record: yes\n" + + "recordPath: " + filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" + + "paths:\n" + + " all:\n" + + " record: yes\n") + require.Equal(t, true, ok) + defer p.Close() + + source := gortsplib.Client{} + err = source.StartRecording( + "rtsp://localhost:8554/mystream", + &description.Session{Medias: []*description.Media{testMediaH264}}) + require.NoError(t, err) + defer source.Close() + + for i := 0; i < 4; i++ { + err := source.WritePacketRTP(testMediaH264, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 1123 + uint16(i), + Timestamp: 45343 + 90000*uint32(i), + SSRC: 563423, + }, + Payload: []byte{5}, + }) + require.NoError(t, err) + } + + time.Sleep(500 * time.Millisecond) + + files, err := os.ReadDir(filepath.Join(dir, "mystream")) + require.NoError(t, err) + require.Equal(t, 1, len(files)) + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v2/config/paths/edit/all", map[string]interface{}{ + "record": false, + }, nil) + + time.Sleep(500 * time.Millisecond) + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v2/config/paths/edit/all", map[string]interface{}{ + "record": true, + }, nil) + + time.Sleep(500 * time.Millisecond) + + for i := 4; i < 8; i++ { + err := source.WritePacketRTP(testMediaH264, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 1123 + uint16(i), + Timestamp: 45343 + 90000*uint32(i), + SSRC: 563423, + }, + Payload: []byte{5}, + }) + require.NoError(t, err) + } + + time.Sleep(500 * time.Millisecond) + + files, err = os.ReadDir(filepath.Join(dir, "mystream")) + require.NoError(t, err) + require.Equal(t, 2, len(files)) +} diff --git a/internal/record/agent.go b/internal/record/agent.go index 93ac5156..6dc49e5c 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -832,6 +832,8 @@ func NewAgent( // Close closes the Agent. func (r *Agent) Close() { + r.Log(logger.Info, "recording stopped") + r.ctxCancel() <-r.done } diff --git a/internal/record/part.go b/internal/record/part.go index e8d69c6e..a9dd6ec5 100644 --- a/internal/record/part.go +++ b/internal/record/part.go @@ -2,10 +2,14 @@ package record import ( "io" + "os" + "path/filepath" "time" "github.com/aler9/writerseeker" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + + "github.com/bluenviron/mediamtx/internal/logger" ) func writePart(f io.Writer, partTracks map[*track]*fmp4.PartTrack) error { @@ -50,6 +54,29 @@ func newPart( } func (p *part) close() error { + if p.s.f == nil { + p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path) + p.s.r.Log(logger.Debug, "opening segment %s", p.s.fpath) + + err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755) + if err != nil { + return err + } + + f, err := os.Create(p.s.fpath) + if err != nil { + return err + } + + err = writeInit(f, p.s.r.tracks) + if err != nil { + f.Close() + return err + } + + p.s.f = f + } + return writePart(p.s.f, p.partTracks) } diff --git a/internal/record/segment.go b/internal/record/segment.go index dfb958a4..aeae3ad4 100644 --- a/internal/record/segment.go +++ b/internal/record/segment.go @@ -3,7 +3,6 @@ package record import ( "io" "os" - "path/filepath" "time" "github.com/aler9/writerseeker" @@ -54,31 +53,31 @@ func newSegment( } func (s *segment) close() error { + var err error + if s.curPart != nil { - err := s.flush() - - if s.f != nil { - s.r.Log(logger.Debug, "closing segment %s", s.fpath) - - err2 := s.f.Close() - if err == nil { - err = err2 - } - } - - return err + err = s.curPart.close() } - return nil + if s.f != nil { + s.r.Log(logger.Debug, "closing segment %s", s.fpath) + err2 := s.f.Close() + if err == nil { + err = err2 + } + } + + return err } func (s *segment) record(track *track, sample *sample) error { if s.curPart == nil { s.curPart = newPart(s, sample.dts) } else if s.curPart.duration() >= s.r.partDuration { - err := s.flush() + err := s.curPart.close() + s.curPart = nil + if err != nil { - s.curPart = nil return err } @@ -87,30 +86,3 @@ func (s *segment) record(track *track, sample *sample) error { return s.curPart.record(track, sample) } - -func (s *segment) flush() error { - if s.f == nil { - s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, s.r.path) - s.r.Log(logger.Debug, "opening segment %s", s.fpath) - - err := os.MkdirAll(filepath.Dir(s.fpath), 0o755) - if err != nil { - return err - } - - f, err := os.Create(s.fpath) - if err != nil { - return err - } - - err = writeInit(f, s.r.tracks) - if err != nil { - f.Close() - return err - } - - s.f = f - } - - return s.curPart.close() -}