limit logging of decode errors (#2253)

This commit is contained in:
Alessandro Ros 2023-08-26 23:34:39 +02:00 committed by GitHub
parent 34dc84de90
commit 30b7245bb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 111 additions and 100 deletions

View File

@ -2,7 +2,6 @@ package core
import (
"fmt"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
@ -15,11 +14,8 @@ const (
)
type asyncWriter struct {
parent logger.Writer
buffer *ringbuffer.RingBuffer
prevWarnPrinted time.Time
prevWarnPrintedMutex sync.Mutex
writeErrLogger logger.Writer
buffer *ringbuffer.RingBuffer
// out
err chan error
@ -32,9 +28,9 @@ func newAsyncWriter(
buffer, _ := ringbuffer.New(uint64(queueSize))
return &asyncWriter{
parent: parent,
buffer: buffer,
err: make(chan error),
writeErrLogger: newLimitedLogger(parent),
buffer: buffer,
err: make(chan error),
}
}
@ -72,12 +68,6 @@ func (w *asyncWriter) runInner() error {
func (w *asyncWriter) push(cb func() error) {
ok := w.buffer.Push(cb)
if !ok {
now := time.Now()
w.prevWarnPrintedMutex.Lock()
if now.Sub(w.prevWarnPrinted) >= minIntervalBetweenWarnings {
w.prevWarnPrinted = now
w.parent.Log(logger.Warn, "write queue is full")
}
w.prevWarnPrintedMutex.Unlock()
w.writeErrLogger.Log(logger.Warn, "write queue is full")
}
}

View File

@ -48,6 +48,8 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
}()
decodeErrLogger := newLimitedLogger(s)
var c *gohlslib.Client
c = &gohlslib.Client{
URI: cnf.Source,
@ -66,7 +68,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
s.Log(logger.Debug, "downloading segment %v", u)
},
OnDecodeError: func(err error) {
s.Log(logger.Warn, err.Error())
decodeErrLogger.Log(logger.Warn, err.Error())
},
OnTracks: func(tracks []*gohlslib.Track) error {
var medias []*description.Media

View File

@ -0,0 +1,30 @@
package core
import (
"sync"
"time"
"github.com/bluenviron/mediamtx/internal/logger"
)
type limitedLogger struct {
w logger.Writer
mutex sync.Mutex
lastPrinted time.Time
}
func newLimitedLogger(w logger.Writer) *limitedLogger {
return &limitedLogger{
w: w,
}
}
func (l *limitedLogger) Log(level logger.Level, format string, args ...interface{}) {
now := time.Now()
l.mutex.Lock()
if now.Sub(l.lastPrinted) >= minIntervalBetweenWarnings {
l.lastPrinted = now
l.w.Log(level, format, args...)
}
l.mutex.Unlock()
}

View File

@ -595,7 +595,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
desc,
allocateEncoder,
pa.bytesReceived,
pa.source,
newLimitedLogger(pa.source),
)
if err != nil {
return err

View File

@ -40,17 +40,17 @@ type rtspSession struct {
pathManager rtspSessionPathManager
parent rtspSessionParent
uuid uuid.UUID
created time.Time
path *path
stream *stream.Stream
onReadCmd *externalcmd.Cmd // read
mutex sync.Mutex
state gortsplib.ServerSessionState
transport *gortsplib.Transport
pathName string
prevWarnPrinted time.Time
prevWarnPrintedMutex sync.Mutex
uuid uuid.UUID
created time.Time
path *path
stream *stream.Stream
onReadCmd *externalcmd.Cmd // read
mutex sync.Mutex
state gortsplib.ServerSessionState
transport *gortsplib.Transport
pathName string
decodeErrLogger logger.Writer
writeErrLogger logger.Writer
}
func newRTSPSession(
@ -74,6 +74,9 @@ func newRTSPSession(
created: time.Now(),
}
s.decodeErrLogger = newLimitedLogger(s)
s.writeErrLogger = newLimitedLogger(s)
s.Log(logger.Info, "created by %v", s.author.NetConn().RemoteAddr())
return s
@ -402,23 +405,17 @@ func (s *rtspSession) apiSourceDescribe() pathAPISourceOrReader {
// onPacketLost is called by rtspServer.
func (s *rtspSession) onPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx) {
s.Log(logger.Warn, ctx.Error.Error())
s.decodeErrLogger.Log(logger.Warn, ctx.Error.Error())
}
// onDecodeError is called by rtspServer.
func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) {
s.Log(logger.Warn, ctx.Error.Error())
s.decodeErrLogger.Log(logger.Warn, ctx.Error.Error())
}
// onStreamWriteError is called by rtspServer.
func (s *rtspSession) onStreamWriteError(ctx *gortsplib.ServerHandlerOnStreamWriteErrorCtx) {
now := time.Now()
s.prevWarnPrintedMutex.Lock()
if now.Sub(s.prevWarnPrinted) >= minIntervalBetweenWarnings {
s.prevWarnPrinted = now
s.Log(logger.Warn, ctx.Error.Error())
}
s.prevWarnPrintedMutex.Unlock()
s.writeErrLogger.Log(logger.Warn, ctx.Error.Error())
}
func (s *rtspSession) apiItem() *apiRTSPSession {

View File

@ -94,6 +94,8 @@ func (s *rtspSource) Log(level logger.Level, format string, args ...interface{})
func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error {
s.Log(logger.Debug, "connecting")
decodeErrLogger := newLimitedLogger(s)
c := &gortsplib.Client{
Transport: cnf.SourceProtocol.Transport,
TLSConfig: tlsConfigForFingerprint(cnf.SourceFingerprint),
@ -111,10 +113,10 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
s.Log(logger.Warn, err.Error())
},
OnPacketLost: func(err error) {
s.Log(logger.Warn, err.Error())
decodeErrLogger.Log(logger.Warn, err.Error())
},
OnDecodeError: func(err error) {
s.Log(logger.Warn, err.Error())
decodeErrLogger.Log(logger.Warn, err.Error())
},
}

View File

@ -234,6 +234,12 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
return err
}
decodeErrLogger := newLimitedLogger(c)
r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error())
})
var medias []*description.Media //nolint:prealloc
var stream *stream.Stream

View File

@ -91,6 +91,12 @@ func (s *srtSource) runReader(sconn srt.Conn) error {
return err
}
decodeErrLogger := newLimitedLogger(s)
r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error())
})
var medias []*description.Media //nolint:prealloc
var stream *stream.Stream

View File

@ -140,6 +140,12 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
return err
}
decodeErrLogger := newLimitedLogger(s)
r.OnDecodeError(func(err error) {
decodeErrLogger.Log(logger.Warn, err.Error())
})
var medias []*description.Media //nolint:prealloc
var stream *stream.Stream

View File

@ -1,4 +1,4 @@
package formatprocessor
package formatprocessor //nolint:dupl
import (
"fmt"
@ -8,14 +8,12 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
type formatProcessorAV1 struct {
udpMaxPayloadSize int
format *format.AV1
log logger.Writer
encoder *rtpav1.Encoder
decoder *rtpav1.Decoder
@ -25,12 +23,10 @@ func newAV1(
udpMaxPayloadSize int,
forma *format.AV1,
generateRTPPackets bool,
log logger.Writer,
) (*formatProcessorAV1, error) {
t := &formatProcessorAV1{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
log: log,
}
if generateRTPPackets {

View File

@ -7,7 +7,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -19,7 +18,6 @@ func newGeneric(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorGeneric, error) {
if generateRTPPackets {
return nil, fmt.Errorf("we don't know how to generate RTP packets of format %+v", forma)

View File

@ -18,7 +18,7 @@ func TestGenericRemovePadding(t *testing.T) {
err := forma.Init()
require.NoError(t, err)
p, err := New(1472, forma, false, nil)
p, err := New(1472, forma, false)
require.NoError(t, err)
pkt := &rtp.Packet{

View File

@ -9,7 +9,6 @@ import (
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -73,7 +72,6 @@ func rtpH264ExtractSPSPPS(pkt *rtp.Packet) ([]byte, []byte) {
type formatProcessorH264 struct {
udpMaxPayloadSize int
format *format.H264
log logger.Writer
encoder *rtph264.Encoder
decoder *rtph264.Decoder
@ -83,12 +81,10 @@ func newH264(
udpMaxPayloadSize int,
forma *format.H264,
generateRTPPackets bool,
log logger.Writer,
) (*formatProcessorH264, error) {
t := &formatProcessorH264{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
log: log,
}
if generateRTPPackets {

View File

@ -18,7 +18,7 @@ func TestH264DynamicParams(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(1472, forma, false, nil)
p, err := New(1472, forma, false)
require.NoError(t, err)
enc, err := forma.CreateEncoder()
@ -88,7 +88,7 @@ func TestH264OversizedPackets(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(1472, forma, false, nil)
p, err := New(1472, forma, false)
require.NoError(t, err)
var out []*rtp.Packet
@ -191,7 +191,7 @@ func TestH264EmptyPacket(t *testing.T) {
PacketizationMode: 1,
}
p, err := New(1472, forma, true, nil)
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &unit.H264{

View File

@ -9,7 +9,6 @@ import (
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -80,7 +79,6 @@ func rtpH265ExtractVPSSPSPPS(pkt *rtp.Packet) ([]byte, []byte, []byte) {
type formatProcessorH265 struct {
udpMaxPayloadSize int
format *format.H265
log logger.Writer
encoder *rtph265.Encoder
decoder *rtph265.Decoder
@ -90,12 +88,10 @@ func newH265(
udpMaxPayloadSize int,
forma *format.H265,
generateRTPPackets bool,
log logger.Writer,
) (*formatProcessorH265, error) {
t := &formatProcessorH265{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
log: log,
}
if generateRTPPackets {

View File

@ -17,7 +17,7 @@ func TestH265DynamicParams(t *testing.T) {
PayloadTyp: 96,
}
p, err := New(1472, forma, false, nil)
p, err := New(1472, forma, false)
require.NoError(t, err)
enc, err := forma.CreateEncoder()
@ -99,7 +99,7 @@ func TestH265OversizedPackets(t *testing.T) {
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18},
}
p, err := New(1472, forma, false, nil)
p, err := New(1472, forma, false)
require.NoError(t, err)
var out []*rtp.Packet
@ -189,7 +189,7 @@ func TestH265EmptyPacket(t *testing.T) {
PayloadTyp: 96,
}
p, err := New(1472, forma, true, nil)
p, err := New(1472, forma, true)
require.NoError(t, err)
unit := &unit.H265{

View File

@ -1,4 +1,4 @@
package formatprocessor
package formatprocessor //nolint:dupl
import (
"fmt"
@ -8,7 +8,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,7 +22,6 @@ func newMPEG1Audio(
udpMaxPayloadSize int,
forma *format.MPEG1Audio,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorMPEG1Audio, error) {
t := &formatProcessorMPEG1Audio{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -8,7 +8,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,7 +22,6 @@ func newMPEG4AudioGeneric(
udpMaxPayloadSize int,
forma *format.MPEG4Audio,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorMPEG4AudioGeneric, error) {
t := &formatProcessorMPEG4AudioGeneric{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -8,7 +8,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audiolatm"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,7 +22,6 @@ func newMPEG4AudioLATM(
udpMaxPayloadSize int,
forma *format.MPEG4AudioLATM,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorMPEG4AudioLATM, error) {
t := &formatProcessorMPEG4AudioLATM{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -9,7 +9,6 @@ import (
"github.com/bluenviron/mediacommon/pkg/codecs/opus"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -24,7 +23,6 @@ func newOpus(
udpMaxPayloadSize int,
forma *format.Opus,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorOpus, error) {
t := &formatProcessorOpus{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -7,7 +7,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -45,37 +44,36 @@ func New(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
log logger.Writer,
) (Processor, error) {
switch forma := forma.(type) {
case *format.H264:
return newH264(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newH264(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.H265:
return newH265(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newH265(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.VP8:
return newVP8(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newVP8(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.VP9:
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newVP9(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.AV1:
return newAV1(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newAV1(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.MPEG1Audio:
return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.MPEG4AudioGeneric:
return newMPEG4AudioGeneric(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newMPEG4AudioGeneric(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.MPEG4AudioLATM:
return newMPEG4AudioLATM(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newMPEG4AudioLATM(udpMaxPayloadSize, forma, generateRTPPackets)
case *format.Opus:
return newOpus(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newOpus(udpMaxPayloadSize, forma, generateRTPPackets)
default:
return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets, log)
return newGeneric(udpMaxPayloadSize, forma, generateRTPPackets)
}
}

View File

@ -8,7 +8,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,7 +22,6 @@ func newVP8(
udpMaxPayloadSize int,
forma *format.VP8,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorVP8, error) {
t := &formatProcessorVP8{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -8,7 +8,6 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,7 +22,6 @@ func newVP9(
udpMaxPayloadSize int,
forma *format.VP9,
generateRTPPackets bool,
_ logger.Writer,
) (*formatProcessorVP9, error) {
t := &formatProcessorVP9{
udpMaxPayloadSize: udpMaxPayloadSize,

View File

@ -32,7 +32,7 @@ func New(
desc *description.Session,
generateRTPPackets bool,
bytesReceived *uint64,
source logger.Writer,
decodeErrLogger logger.Writer,
) (*Stream, error) {
s := &Stream{
bytesReceived: bytesReceived,
@ -43,7 +43,7 @@ func New(
for _, media := range desc.Medias {
var err error
s.smedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, source)
s.smedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger)
if err != nil {
return nil, err
}

View File

@ -14,26 +14,26 @@ import (
)
type streamFormat struct {
source logger.Writer
proc formatprocessor.Processor
nonRTSPReaders map[interface{}]func(unit.Unit)
decodeErrLogger logger.Writer
proc formatprocessor.Processor
nonRTSPReaders map[interface{}]func(unit.Unit)
}
func newStreamFormat(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
source logger.Writer,
decodeErrLogger logger.Writer,
) (*streamFormat, error) {
proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets, source)
proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets)
if err != nil {
return nil, err
}
sf := &streamFormat{
source: source,
proc: proc,
nonRTSPReaders: make(map[interface{}]func(unit.Unit)),
decodeErrLogger: decodeErrLogger,
proc: proc,
nonRTSPReaders: make(map[interface{}]func(unit.Unit)),
}
return sf, nil
@ -52,7 +52,7 @@ func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, data unit.
err := sf.proc.Process(data, hasNonRTSPReaders)
if err != nil {
sf.source.Log(logger.Warn, err.Error())
sf.decodeErrLogger.Log(logger.Warn, err.Error())
return
}

View File

@ -14,7 +14,7 @@ type streamMedia struct {
func newStreamMedia(udpMaxPayloadSize int,
medi *description.Media,
generateRTPPackets bool,
source logger.Writer,
decodeErrLogger logger.Writer,
) (*streamMedia, error) {
sm := &streamMedia{
formats: make(map[format.Format]*streamFormat),
@ -22,7 +22,7 @@ func newStreamMedia(udpMaxPayloadSize int,
for _, forma := range medi.Formats {
var err error
sm.formats[forma], err = newStreamFormat(udpMaxPayloadSize, forma, generateRTPPackets, source)
sm.formats[forma], err = newStreamFormat(udpMaxPayloadSize, forma, generateRTPPackets, decodeErrLogger)
if err != nil {
return nil, err
}