From 514036d41a161cc3d1ae1d057e5e5cd739bc9cb8 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 15 Jan 2024 12:08:14 +0100 Subject: [PATCH] treat different RTSP formats as different tracks in logs and API (#2907) --- internal/core/path.go | 2 +- internal/defs/source.go | 47 +++++++++++++++++---------- internal/record/format_fmp4.go | 48 ++++++++++++++-------------- internal/record/format_fmp4_track.go | 10 ------ internal/record/format_mpegts.go | 32 +++++++++---------- internal/servers/hls/muxer.go | 38 ++++++++-------------- internal/servers/rtmp/conn.go | 2 +- internal/servers/srt/conn.go | 2 +- internal/servers/webrtc/session.go | 2 +- internal/stream/stream.go | 15 ++++----- 10 files changed, 94 insertions(+), 104 deletions(-) diff --git a/internal/core/path.go b/internal/core/path.go index e6655028..e090e9fa 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -630,7 +630,7 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { if pa.stream == nil { return []string{} } - return defs.MediasDescription(pa.stream.Desc().Medias) + return defs.MediasToCodecs(pa.stream.Desc().Medias) }(), BytesReceived: func() uint64 { if pa.stream == nil { diff --git a/internal/defs/source.go b/internal/defs/source.go index 0a0bcab0..e1c3171c 100644 --- a/internal/defs/source.go +++ b/internal/defs/source.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -19,32 +20,44 @@ type Source interface { APISourceDescribe() APIPathSourceOrReader } -func mediaDescription(media *description.Media) string { - ret := make([]string, len(media.Formats)) - for i, forma := range media.Formats { +// FormatsToCodecs returns the name of codecs of given formats. +func FormatsToCodecs(formats []format.Format) []string { + ret := make([]string, len(formats)) + for i, forma := range formats { ret[i] = forma.Codec() } - return strings.Join(ret, "/") -} - -// MediasDescription returns the description of medias. -func MediasDescription(medias []*description.Media) []string { - ret := make([]string, len(medias)) - for i, media := range medias { - ret[i] = mediaDescription(media) - } return ret } -// MediasInfo returns the description of medias. -func MediasInfo(medias []*description.Media) string { +// FormatsInfo returns a description of formats. +func FormatsInfo(formats []format.Format) string { return fmt.Sprintf("%d %s (%s)", - len(medias), + len(formats), func() string { - if len(medias) == 1 { + if len(formats) == 1 { return "track" } return "tracks" }(), - strings.Join(MediasDescription(medias), ", ")) + strings.Join(FormatsToCodecs(formats), ", ")) +} + +// MediasToCodecs returns the name of codecs of given formats. +func MediasToCodecs(medias []*description.Media) []string { + var formats []format.Format + for _, media := range medias { + formats = append(formats, media.Formats...) + } + + return FormatsToCodecs(formats) +} + +// MediasInfo returns a description of medias. +func MediasInfo(medias []*description.Media) string { + var formats []format.Format + for _, media := range medias { + formats = append(formats, media.Formats...) + } + + return FormatsInfo(formats) } diff --git a/internal/record/format_fmp4.go b/internal/record/format_fmp4.go index d08b94cf..9fa564c5 100644 --- a/internal/record/format_fmp4.go +++ b/internal/record/format_fmp4.go @@ -19,6 +19,7 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/vp9" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" + "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -108,8 +109,9 @@ type formatFMP4 struct { func (f *formatFMP4) initialize() { nextID := 1 + var formats []rtspformat.Format - addTrack := func(codec fmp4.Codec) *formatFMP4Track { + addTrack := func(format rtspformat.Format, codec fmp4.Codec) *formatFMP4Track { initTrack := &fmp4.InitTrack{ TimeScale: 90000, Codec: codec, @@ -117,9 +119,13 @@ func (f *formatFMP4) initialize() { initTrack.ID = nextID nextID++ - track := newFormatFMP4Track(f, initTrack) - f.tracks = append(f.tracks, track) + track := &formatFMP4Track{ + f: f, + initTrack: initTrack, + } + f.tracks = append(f.tracks, track) + formats = append(formats, format) return track } @@ -142,7 +148,7 @@ func (f *formatFMP4) initialize() { 8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64, }, } - track := addTrack(codec) + track := addTrack(forma, codec) firstReceived := false @@ -199,7 +205,7 @@ func (f *formatFMP4) initialize() { ChromaSubsampling: 1, ColorRange: false, } - track := addTrack(codec) + track := addTrack(forma, codec) firstReceived := false @@ -296,7 +302,7 @@ func (f *formatFMP4) initialize() { SPS: sps, PPS: pps, } - track := addTrack(codec) + track := addTrack(forma, codec) var dtsExtractor *h265.DTSExtractor @@ -381,7 +387,7 @@ func (f *formatFMP4) initialize() { SPS: sps, PPS: pps, } - track := addTrack(codec) + track := addTrack(forma, codec) var dtsExtractor *h264.DTSExtractor @@ -456,7 +462,7 @@ func (f *formatFMP4) initialize() { codec := &fmp4.CodecMPEG4Video{ Config: config, } - track := addTrack(codec) + track := addTrack(forma, codec) firstReceived := false var lastPTS time.Duration @@ -508,7 +514,7 @@ func (f *formatFMP4) initialize() { 0x14, 0x4a, 0x00, 0x01, 0x00, 0x00, }, } - track := addTrack(codec) + track := addTrack(forma, codec) firstReceived := false var lastPTS time.Duration @@ -557,7 +563,7 @@ func (f *formatFMP4) initialize() { Width: 800, Height: 600, } - track := addTrack(codec) + track := addTrack(forma, codec) parsed := false @@ -595,7 +601,7 @@ func (f *formatFMP4) initialize() { return 1 }(), } - track := addTrack(codec) + track := addTrack(forma, codec) f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.Opus) @@ -626,7 +632,7 @@ func (f *formatFMP4) initialize() { codec := &fmp4.CodecMPEG4Audio{ Config: *forma.GetConfig(), } - track := addTrack(codec) + track := addTrack(forma, codec) sampleRate := time.Duration(forma.ClockRate()) @@ -659,7 +665,7 @@ func (f *formatFMP4) initialize() { SampleRate: 32000, ChannelCount: 2, } - track := addTrack(codec) + track := addTrack(forma, codec) parsed := false @@ -713,7 +719,7 @@ func (f *formatFMP4) initialize() { LfeOn: true, BitRateCode: 7, } - track := addTrack(codec) + track := addTrack(forma, codec) parsed := false @@ -778,7 +784,7 @@ func (f *formatFMP4) initialize() { SampleRate: forma.SampleRate, ChannelCount: forma.ChannelCount, } - track := addTrack(codec) + track := addTrack(forma, codec) f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.G711) @@ -808,7 +814,7 @@ func (f *formatFMP4) initialize() { SampleRate: forma.SampleRate, ChannelCount: forma.ChannelCount, } - track := addTrack(codec) + track := addTrack(forma, codec) f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.LPCM) @@ -827,14 +833,8 @@ func (f *formatFMP4) initialize() { } } - f.a.agent.Log(logger.Info, "recording %d %s", - len(f.tracks), - func() string { - if len(f.tracks) == 1 { - return "track" - } - return "tracks" - }()) + f.a.agent.Log(logger.Info, "recording %s", + defs.FormatsInfo(formats)) } func (f *formatFMP4) close() { diff --git a/internal/record/format_fmp4_track.go b/internal/record/format_fmp4_track.go index e49b891d..5c0c0759 100644 --- a/internal/record/format_fmp4_track.go +++ b/internal/record/format_fmp4_track.go @@ -11,16 +11,6 @@ type formatFMP4Track struct { nextSample *sample } -func newFormatFMP4Track( - f *formatFMP4, - initTrack *fmp4.InitTrack, -) *formatFMP4Track { - return &formatFMP4Track{ - f: f, - initTrack: initTrack, - } -} - func (t *formatFMP4Track) record(sample *sample) error { // wait the first video sample before setting hasVideo if t.initTrack.Codec.IsVideo() { diff --git a/internal/record/format_mpegts.go b/internal/record/format_mpegts.go index 06f19802..c9154f21 100644 --- a/internal/record/format_mpegts.go +++ b/internal/record/format_mpegts.go @@ -14,6 +14,7 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) @@ -50,12 +51,15 @@ type formatMPEGTS struct { func (f *formatMPEGTS) initialize() { var tracks []*mpegts.Track + var formats []rtspformat.Format - addTrack := func(codec mpegts.Codec) *mpegts.Track { + addTrack := func(format rtspformat.Format, codec mpegts.Codec) *mpegts.Track { track := &mpegts.Track{ Codec: codec, } + tracks = append(tracks, track) + formats = append(formats, format) return track } @@ -63,7 +67,7 @@ func (f *formatMPEGTS) initialize() { for _, forma := range media.Formats { switch forma := forma.(type) { case *rtspformat.H265: - track := addTrack(&mpegts.CodecH265{}) + track := addTrack(forma, &mpegts.CodecH265{}) var dtsExtractor *h265.DTSExtractor @@ -91,7 +95,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.H264: - track := addTrack(&mpegts.CodecH264{}) + track := addTrack(forma, &mpegts.CodecH264{}) var dtsExtractor *h264.DTSExtractor @@ -119,7 +123,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.MPEG4Video: - track := addTrack(&mpegts.CodecMPEG4Video{}) + track := addTrack(forma, &mpegts.CodecMPEG4Video{}) firstReceived := false var lastPTS time.Duration @@ -149,7 +153,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.MPEG1Video: - track := addTrack(&mpegts.CodecMPEG1Video{}) + track := addTrack(forma, &mpegts.CodecMPEG1Video{}) firstReceived := false var lastPTS time.Duration @@ -179,7 +183,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.Opus: - track := addTrack(&mpegts.CodecOpus{ + track := addTrack(forma, &mpegts.CodecOpus{ ChannelCount: func() int { if forma.IsStereo { return 2 @@ -203,7 +207,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.MPEG4Audio: - track := addTrack(&mpegts.CodecMPEG4Audio{ + track := addTrack(forma, &mpegts.CodecMPEG4Audio{ Config: *forma.GetConfig(), }) @@ -222,7 +226,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.MPEG1Audio: - track := addTrack(&mpegts.CodecMPEG1Audio{}) + track := addTrack(forma, &mpegts.CodecMPEG1Audio{}) f.a.agent.Stream.AddReader(f.a.writer, media, forma, func(u unit.Unit) error { tunit := u.(*unit.MPEG1Audio) @@ -239,7 +243,7 @@ func (f *formatMPEGTS) initialize() { }) case *rtspformat.AC3: - track := addTrack(&mpegts.CodecAC3{}) + track := addTrack(forma, &mpegts.CodecAC3{}) sampleRate := time.Duration(forma.SampleRate) @@ -269,14 +273,8 @@ func (f *formatMPEGTS) initialize() { f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize) f.mw = mpegts.NewWriter(f.bw, tracks) - f.a.agent.Log(logger.Info, "recording %d %s", - len(tracks), - func() string { - if len(tracks) == 1 { - return "track" - } - return "tracks" - }()) + f.a.agent.Log(logger.Info, "recording %s", + defs.FormatsInfo(formats)) } func (f *formatMPEGTS) close() { diff --git a/internal/servers/hls/muxer.go b/internal/servers/hls/muxer.go index 522035e2..2f7ec443 100644 --- a/internal/servers/hls/muxer.go +++ b/internal/servers/hls/muxer.go @@ -13,7 +13,6 @@ import ( "github.com/bluenviron/gohlslib" "github.com/bluenviron/gohlslib/pkg/codecs" - "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/gin-gonic/gin" @@ -232,19 +231,10 @@ func (m *muxer) runInner(innerCtx context.Context, innerReady chan struct{}) err defer res.Stream.RemoveReader(m.writer) - var medias []*description.Media + videoTrack := m.createVideoTrack(res.Stream) + audioTrack := m.createAudioTrack(res.Stream) - videoMedia, videoTrack := m.createVideoTrack(res.Stream) - if videoMedia != nil { - medias = append(medias, videoMedia) - } - - audioMedia, audioTrack := m.createAudioTrack(res.Stream) - if audioMedia != nil { - medias = append(medias, audioMedia) - } - - if medias == nil { + if videoTrack == nil && audioTrack == nil { return fmt.Errorf( "the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG-4 Audio") } @@ -276,7 +266,7 @@ func (m *muxer) runInner(innerCtx context.Context, innerReady chan struct{}) err innerReady <- struct{}{} m.Log(logger.Info, "is converting into HLS, %s", - defs.MediasInfo(medias)) + defs.FormatsInfo(res.Stream.FormatsForReader(m.writer))) m.writer.Start() @@ -304,7 +294,7 @@ func (m *muxer) runInner(innerCtx context.Context, innerReady chan struct{}) err } } -func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *gohlslib.Track) { +func (m *muxer) createVideoTrack(stream *stream.Stream) *gohlslib.Track { var videoFormatAV1 *format.AV1 videoMedia := stream.Desc().FindFormat(&videoFormatAV1) @@ -324,7 +314,7 @@ func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *go return nil }) - return videoMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.AV1{}, } } @@ -348,7 +338,7 @@ func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *go return nil }) - return videoMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.VP9{}, } } @@ -374,7 +364,7 @@ func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *go vps, sps, pps := videoFormatH265.SafeParams() - return videoMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.H265{ VPS: vps, SPS: sps, @@ -404,7 +394,7 @@ func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *go sps, pps := videoFormatH264.SafeParams() - return videoMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.H264{ SPS: sps, PPS: pps, @@ -412,10 +402,10 @@ func (m *muxer) createVideoTrack(stream *stream.Stream) (*description.Media, *go } } - return nil, nil + return nil } -func (m *muxer) createAudioTrack(stream *stream.Stream) (*description.Media, *gohlslib.Track) { +func (m *muxer) createAudioTrack(stream *stream.Stream) *gohlslib.Track { var audioFormatOpus *format.Opus audioMedia := stream.Desc().FindFormat(&audioFormatOpus) @@ -434,7 +424,7 @@ func (m *muxer) createAudioTrack(stream *stream.Stream) (*description.Media, *go return nil }) - return audioMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.Opus{ ChannelCount: func() int { if audioFormatOpus.IsStereo { @@ -468,14 +458,14 @@ func (m *muxer) createAudioTrack(stream *stream.Stream) (*description.Media, *go return nil }) - return audioMedia, &gohlslib.Track{ + return &gohlslib.Track{ Codec: &codecs.MPEG4Audio{ Config: *audioFormatMPEG4Audio.GetConfig(), }, } } - return nil, nil + return nil } func (m *muxer) handleRequest(ctx *gin.Context) { diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index ca8aafcf..bf36e2d2 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -218,7 +218,7 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error { } c.Log(logger.Info, "is reading from path '%s', %s", - res.Path.Name(), defs.MediasInfo(res.Stream.MediasForReader(writer))) + res.Path.Name(), defs.FormatsInfo(res.Stream.FormatsForReader(writer))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: c, diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 67bce4cb..e78d616a 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -337,7 +337,7 @@ func (c *conn) runRead(req srtNewConnReq, pathName string, user string, pass str } c.Log(logger.Info, "is reading from path '%s', %s", - res.Path.Name(), defs.MediasInfo(res.Stream.MediasForReader(writer))) + res.Path.Name(), defs.FormatsInfo(res.Stream.FormatsForReader(writer))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: c, diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 8b480942..33e7c3d2 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -597,7 +597,7 @@ func (s *session) runRead() (int, error) { } s.Log(logger.Info, "is reading from path '%s', %s", - res.Path.Name(), defs.MediasInfo(res.Stream.MediasForReader(writer))) + res.Path.Name(), defs.FormatsInfo(res.Stream.FormatsForReader(writer))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: s, diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 27429039..a95f1ae9 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -136,23 +136,22 @@ func (s *Stream) RemoveReader(r *asyncwriter.Writer) { } } -// MediasForReader returns all medias that a reader is reading. -func (s *Stream) MediasForReader(r *asyncwriter.Writer) []*description.Media { +// FormatsForReader returns all formats that a reader is reading. +func (s *Stream) FormatsForReader(r *asyncwriter.Writer) []format.Format { s.mutex.Lock() defer s.mutex.Unlock() - var medias []*description.Media + var formats []format.Format - for media, sm := range s.smedias { - for _, sf := range sm.formats { + for _, sm := range s.smedias { + for forma, sf := range sm.formats { if _, ok := sf.readers[r]; ok { - medias = append(medias, media) - break + formats = append(formats, forma) } } } - return medias + return formats } // WriteUnit writes a Unit.