add read timeout to RTMP sources

This commit is contained in:
aler9 2021-01-31 16:24:58 +01:00
parent da9d6df706
commit f03ff73ef3
2 changed files with 17 additions and 9 deletions

View File

@ -432,6 +432,7 @@ func (pa *Path) startExternalSource() {
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
pa.source = sourcertmp.New(
pa.conf.Source,
pa.readTimeout,
&pa.sourceWg,
pa.stats,
pa)

View File

@ -37,10 +37,11 @@ type Parent interface {
// Source is a RTMP source.
type Source struct {
ur string
wg *sync.WaitGroup
stats *stats.Stats
parent Parent
ur string
readTimeout time.Duration
wg *sync.WaitGroup
stats *stats.Stats
parent Parent
// in
terminate chan struct{}
@ -48,15 +49,17 @@ type Source struct {
// New allocates a Source.
func New(ur string,
readTimeout time.Duration,
wg *sync.WaitGroup,
stats *stats.Stats,
parent Parent) *Source {
s := &Source{
ur: ur,
wg: wg,
stats: stats,
parent: parent,
terminate: make(chan struct{}),
ur: ur,
readTimeout: readTimeout,
wg: wg,
stats: stats,
parent: parent,
terminate: make(chan struct{}),
}
atomic.AddInt64(s.stats.CountSourcesRtmp, +1)
@ -170,6 +173,9 @@ func (s *Source) runInner() bool {
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
// configuration must be completed within readTimeout
nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
confDone := make(chan error)
go func() {
confDone <- func() error {
@ -331,6 +337,7 @@ func (s *Source) runInner() bool {
go func() {
readerDone <- func() error {
for {
nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := conn.ReadPacket()
if err != nil {
return err