From 20e478b8bd519d52399a738456aa16918d17dc71 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 1 Nov 2020 16:51:12 +0100 Subject: [PATCH] new parameters: sourceOnDemandStartTimeout, sourceOnDemandCloseAfter, runOnDemandStartTimeout, runOnDemandCloseAfter (#62) --- README.md | 2 +- conf/pathconf.go | 75 +++--- externalcmd/externalcmd.go | 4 +- main_test.go | 40 +++- path/path.go | 479 ++++++++++++++++++++++--------------- rtsp-simple-server.yml | 32 ++- sourcertmp/source.go | 103 ++------ sourcertsp/source.go | 98 ++------ 8 files changed, 431 insertions(+), 402 deletions(-) diff --git a/README.md b/README.md index a01db6bf..ec10ea74 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Parameters in maps can be overridden by using underscores, in the following way: RTSP_PATHS_TEST_SOURCE=rtsp://myurl ./rtsp-simple-server ``` -The configuration can be changed dinamically when the server is running (hot reloading) by editing the configuration file: changes are detected and applied without disconnecting existing clients, if possible. +The configuration can be changed dinamically when the server is running (hot reloading) by editing the configuration file, Changes are detected and applied without disconnecting existing clients, whenever is possible. ### RTSP proxy mode diff --git a/conf/pathconf.go b/conf/pathconf.go index b35175b0..3bd8acca 100644 --- a/conf/pathconf.go +++ b/conf/pathconf.go @@ -6,6 +6,7 @@ import ( "net/url" "regexp" "strings" + "time" "github.com/aler9/gortsplib" ) @@ -15,28 +16,32 @@ var reUserPass = regexp.MustCompile("^[a-zA-Z0-9!\\$\\(\\)\\*\\+\\.;<=>\\[\\]\\^ const userPassSupportedChars = "A-Z,0-9,!,$,(,),*,+,.,;,<,=,>,[,],^,_,-,{,}" type PathConf struct { - Regexp *regexp.Regexp `yaml:"-" json:"-"` - Source string `yaml:"source"` - SourceProtocol string `yaml:"sourceProtocol"` - SourceProtocolParsed gortsplib.StreamProtocol `yaml:"-" json:"-"` - SourceOnDemand bool `yaml:"sourceOnDemand"` - SourceRedirect string `yaml:"sourceRedirect"` - RunOnInit string `yaml:"runOnInit"` - RunOnInitRestart bool `yaml:"runOnInitRestart"` - RunOnDemand string `yaml:"runOnDemand"` - RunOnDemandRestart bool `yaml:"runOnDemandRestart"` - RunOnPublish string `yaml:"runOnPublish"` - RunOnPublishRestart bool `yaml:"runOnPublishRestart"` - RunOnRead string `yaml:"runOnRead"` - RunOnReadRestart bool `yaml:"runOnReadRestart"` - PublishUser string `yaml:"publishUser"` - PublishPass string `yaml:"publishPass"` - PublishIps []string `yaml:"publishIps"` - PublishIpsParsed []interface{} `yaml:"-" json:"-"` - ReadUser string `yaml:"readUser"` - ReadPass string `yaml:"readPass"` - ReadIps []string `yaml:"readIps"` - ReadIpsParsed []interface{} `yaml:"-" json:"-"` + Regexp *regexp.Regexp `yaml:"-" json:"-"` + Source string `yaml:"source"` + SourceProtocol string `yaml:"sourceProtocol"` + SourceProtocolParsed gortsplib.StreamProtocol `yaml:"-" json:"-"` + SourceOnDemand bool `yaml:"sourceOnDemand"` + SourceOnDemandStartTimeout time.Duration `yaml:"sourceOnDemandStartTimeout"` + SourceOnDemandCloseAfter time.Duration `yaml:"sourceOnDemandCloseAfter"` + SourceRedirect string `yaml:"sourceRedirect"` + RunOnInit string `yaml:"runOnInit"` + RunOnInitRestart bool `yaml:"runOnInitRestart"` + RunOnDemand string `yaml:"runOnDemand"` + RunOnDemandRestart bool `yaml:"runOnDemandRestart"` + RunOnDemandStartTimeout time.Duration `yaml:"runOnDemandStartTimeout"` + RunOnDemandCloseAfter time.Duration `yaml:"runOnDemandCloseAfter"` + RunOnPublish string `yaml:"runOnPublish"` + RunOnPublishRestart bool `yaml:"runOnPublishRestart"` + RunOnRead string `yaml:"runOnRead"` + RunOnReadRestart bool `yaml:"runOnReadRestart"` + PublishUser string `yaml:"publishUser"` + PublishPass string `yaml:"publishPass"` + PublishIps []string `yaml:"publishIps"` + PublishIpsParsed []interface{} `yaml:"-" json:"-"` + ReadUser string `yaml:"readUser"` + ReadPass string `yaml:"readPass"` + ReadIps []string `yaml:"readIps"` + ReadIpsParsed []interface{} `yaml:"-" json:"-"` } func (pconf *PathConf) fillAndCheck(name string) error { @@ -64,7 +69,9 @@ func (pconf *PathConf) fillAndCheck(name string) error { pconf.Source = "record" } - if strings.HasPrefix(pconf.Source, "rtsp://") { + if pconf.Source == "record" { + + } else if strings.HasPrefix(pconf.Source, "rtsp://") { if pconf.Regexp != nil { return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source; use another path") } @@ -117,8 +124,6 @@ func (pconf *PathConf) fillAndCheck(name string) error { } } - } else if pconf.Source == "record" { - } else if pconf.Source == "redirect" { if pconf.SourceRedirect == "" { return fmt.Errorf("source redirect must be filled") @@ -134,7 +139,15 @@ func (pconf *PathConf) fillAndCheck(name string) error { } } else { - return fmt.Errorf("unsupported source: '%s'", pconf.Source) + return fmt.Errorf("invalid source: '%s'", pconf.Source) + } + + if pconf.SourceOnDemandStartTimeout == 0 { + pconf.SourceOnDemandStartTimeout = 10 * time.Second + } + + if pconf.SourceOnDemandCloseAfter == 0 { + pconf.SourceOnDemandCloseAfter = 10 * time.Second } if pconf.PublishUser != "" { @@ -188,10 +201,18 @@ func (pconf *PathConf) fillAndCheck(name string) error { pconf.ReadIps = nil } - if pconf.Regexp != nil && pconf.RunOnInit != "" { + if pconf.RunOnInit != "" && pconf.Regexp != nil { return fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path") } + if pconf.RunOnDemandStartTimeout == 0 { + pconf.RunOnDemandStartTimeout = 10 * time.Second + } + + if pconf.RunOnDemandCloseAfter == 0 { + pconf.RunOnDemandCloseAfter = 10 * time.Second + } + return nil } diff --git a/externalcmd/externalcmd.go b/externalcmd/externalcmd.go index 6ab55553..558809d4 100644 --- a/externalcmd/externalcmd.go +++ b/externalcmd/externalcmd.go @@ -9,7 +9,7 @@ import ( ) const ( - restartPause = 5 * time.Second + retryPause = 5 * time.Second ) type ExternalCmd struct { @@ -57,7 +57,7 @@ func (e *ExternalCmd) run() { return false } - t := time.NewTimer(restartPause) + t := time.NewTimer(retryPause) defer t.Stop() select { diff --git a/main_test.go b/main_test.go index a4967b12..cffad20c 100644 --- a/main_test.go +++ b/main_test.go @@ -175,26 +175,38 @@ func TestEnvironment(t *testing.T) { pa, ok := p.conf.Paths["test2"] require.Equal(t, true, ok) require.Equal(t, &conf.PathConf{ - Source: "record", + Source: "record", + SourceOnDemandStartTimeout: 10 * time.Second, + SourceOnDemandCloseAfter: 10 * time.Second, + RunOnDemandStartTimeout: 10 * time.Second, + RunOnDemandCloseAfter: 10 * time.Second, }, pa) pa, ok = p.conf.Paths["~^.*$"] require.Equal(t, true, ok) require.Equal(t, &conf.PathConf{ - Regexp: regexp.MustCompile("^.*$"), - Source: "record", - SourceProtocol: "udp", - ReadUser: "testuser", - ReadPass: "testpass", + Regexp: regexp.MustCompile("^.*$"), + Source: "record", + SourceProtocol: "udp", + SourceOnDemandStartTimeout: 10 * time.Second, + SourceOnDemandCloseAfter: 10 * time.Second, + ReadUser: "testuser", + ReadPass: "testpass", + RunOnDemandStartTimeout: 10 * time.Second, + RunOnDemandCloseAfter: 10 * time.Second, }, pa) pa, ok = p.conf.Paths["cam1"] require.Equal(t, true, ok) require.Equal(t, &conf.PathConf{ - Source: "rtsp://testing", - SourceProtocol: "tcp", - SourceProtocolParsed: gortsplib.StreamProtocolTCP, - SourceOnDemand: true, + Source: "rtsp://testing", + SourceProtocol: "tcp", + SourceProtocolParsed: gortsplib.StreamProtocolTCP, + SourceOnDemand: true, + SourceOnDemandStartTimeout: 10 * time.Second, + SourceOnDemandCloseAfter: 10 * time.Second, + RunOnDemandStartTimeout: 10 * time.Second, + RunOnDemandCloseAfter: 10 * time.Second, }, pa) } @@ -209,8 +221,12 @@ func TestEnvironmentNoFile(t *testing.T) { pa, ok := p.conf.Paths["cam1"] require.Equal(t, true, ok) require.Equal(t, &conf.PathConf{ - Source: "rtsp://testing", - SourceProtocol: "udp", + Source: "rtsp://testing", + SourceProtocol: "udp", + SourceOnDemandStartTimeout: 10 * time.Second, + SourceOnDemandCloseAfter: 10 * time.Second, + RunOnDemandStartTimeout: 10 * time.Second, + RunOnDemandCloseAfter: 10 * time.Second, }, pa) } diff --git a/path/path.go b/path/path.go index 50258bfd..6c5736df 100644 --- a/path/path.go +++ b/path/path.go @@ -18,12 +18,11 @@ import ( "github.com/aler9/rtsp-simple-server/stats" ) -const ( - pathCheckPeriod = 5 * time.Second - describeTimeout = 5 * time.Second - sourceStopAfterDescribePeriod = 10 * time.Second - onDemandCmdStopAfterDescribePeriod = 10 * time.Second -) +func newEmptyTimer() *time.Timer { + t := time.NewTimer(0) + <-t.C + return t +} type Parent interface { Log(string, ...interface{}) @@ -45,9 +44,8 @@ type source interface { // * sourcertmp.Source type sourceExternal interface { IsSource() + IsSourceExternal() Close() - IsRunning() bool - SetRunning(bool) } type sourceRedirect struct{} @@ -118,6 +116,14 @@ const ( clientStatePreRemove ) +type sourceState int + +const ( + sourceStateNotReady sourceState = iota + sourceStateWaitingDescribe + sourceStateReady +) + type Path struct { readTimeout time.Duration writeTimeout time.Duration @@ -128,17 +134,23 @@ type Path struct { stats *stats.Stats parent Parent - clients map[*client.Client]clientState - clientsWg sync.WaitGroup - source source - sourceReady bool - sourceTrackCount int - sourceSdp []byte - lastDescribeReq time.Time - lastDescribeActivation time.Time - readers *readersMap - onInitCmd *externalcmd.ExternalCmd - onDemandCmd *externalcmd.ExternalCmd + clients map[*client.Client]clientState + clientsWg sync.WaitGroup + source source + sourceTrackCount int + sourceSdp []byte + readers *readersMap + onInitCmd *externalcmd.ExternalCmd + onDemandCmd *externalcmd.ExternalCmd + describeTimer *time.Timer + sourceCloseTimer *time.Timer + sourceCloseTimerStarted bool + sourceState sourceState + sourceWg sync.WaitGroup + runOnDemandCloseTimer *time.Timer + runOnDemandCloseTimerStarted bool + closeTimer *time.Timer + closeTimerStarted bool // in sourceSetReady chan struct{} // from source @@ -163,25 +175,29 @@ func New( parent Parent) *Path { pa := &Path{ - readTimeout: readTimeout, - writeTimeout: writeTimeout, - confName: confName, - conf: conf, - name: name, - wg: wg, - stats: stats, - parent: parent, - clients: make(map[*client.Client]clientState), - readers: newReadersMap(), - sourceSetReady: make(chan struct{}), - sourceSetNotReady: make(chan struct{}), - clientDescribe: make(chan ClientDescribeReq), - clientAnnounce: make(chan ClientAnnounceReq), - clientSetupPlay: make(chan ClientSetupPlayReq), - clientPlay: make(chan clientPlayReq), - clientRecord: make(chan clientRecordReq), - clientRemove: make(chan clientRemoveReq), - terminate: make(chan struct{}), + readTimeout: readTimeout, + writeTimeout: writeTimeout, + confName: confName, + conf: conf, + name: name, + wg: wg, + stats: stats, + parent: parent, + clients: make(map[*client.Client]clientState), + readers: newReadersMap(), + describeTimer: newEmptyTimer(), + sourceCloseTimer: newEmptyTimer(), + runOnDemandCloseTimer: newEmptyTimer(), + closeTimer: newEmptyTimer(), + sourceSetReady: make(chan struct{}), + sourceSetNotReady: make(chan struct{}), + clientDescribe: make(chan ClientDescribeReq), + clientAnnounce: make(chan ClientAnnounceReq), + clientSetupPlay: make(chan ClientSetupPlayReq), + clientPlay: make(chan clientPlayReq), + clientRecord: make(chan clientRecordReq), + clientRemove: make(chan clientRemoveReq), + terminate: make(chan struct{}), } pa.wg.Add(1) @@ -200,48 +216,58 @@ func (pa *Path) Log(format string, args ...interface{}) { func (pa *Path) run() { defer pa.wg.Done() - if strings.HasPrefix(pa.conf.Source, "rtsp://") { - state := !pa.conf.SourceOnDemand - if state { - pa.Log("starting source") - } - - pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed, - pa.readTimeout, pa.writeTimeout, state, pa.stats, pa) - - } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { - state := !pa.conf.SourceOnDemand - if state { - pa.Log("starting source") - } - - pa.source = sourcertmp.New(pa.conf.Source, state, pa.stats, pa) - - } else if pa.conf.Source == "redirect" { + if pa.conf.Source == "redirect" { pa.source = &sourceRedirect{} + + } else if pa.hasExternalSource() && !pa.conf.SourceOnDemand { + pa.startExternalSource() } if pa.conf.RunOnInit != "" { - pa.Log("starting on init command") + pa.Log("on init command started") pa.onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, pa.name) } - tickerCheck := time.NewTicker(pathCheckPeriod) - defer tickerCheck.Stop() - outer: for { select { - case <-tickerCheck.C: - ok := pa.onCheck() - if !ok { - pa.exhaustChannels() - pa.parent.OnPathClose(pa) - <-pa.terminate - break outer + case <-pa.describeTimer.C: + for c, state := range pa.clients { + if state == clientStateWaitingDescribe { + pa.removeClient(c) + c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)) + } } + // set state after removeClient(), so schedule* works once + pa.sourceState = sourceStateNotReady + + pa.scheduleSourceClose() + pa.scheduleRunOnDemandClose() + pa.scheduleClose() + + case <-pa.sourceCloseTimer.C: + pa.sourceCloseTimerStarted = false + pa.source.(sourceExternal).Close() + pa.source = nil + + pa.scheduleClose() + + case <-pa.runOnDemandCloseTimer.C: + pa.runOnDemandCloseTimerStarted = false + pa.Log("on demand command stopped") + pa.onDemandCmd.Close() + pa.onDemandCmd = nil + + pa.scheduleClose() + + case <-pa.closeTimer.C: + pa.exhaustChannels() + pa.parent.OnPathClose(pa) + <-pa.terminate + break outer + case <-pa.sourceSetReady: pa.onSourceSetReady() @@ -290,7 +316,7 @@ outer: } if pa.clients[req.client] != clientStatePreRemove { - pa.onClientPreRemove(req.client) + pa.removeClient(req.client) } delete(pa.clients, req.client) @@ -304,20 +330,23 @@ outer: } } + pa.describeTimer.Stop() + pa.sourceCloseTimer.Stop() + pa.runOnDemandCloseTimer.Stop() + pa.closeTimer.Stop() + if pa.onInitCmd != nil { - pa.Log("stopping on init command (closing)") + pa.Log("on init command stopped") pa.onInitCmd.Close() } if source, ok := pa.source.(sourceExternal); ok { - if source.IsRunning() { - pa.Log("stopping on demand source (closing)") - } source.Close() } + pa.sourceWg.Wait() if pa.onDemandCmd != nil { - pa.Log("stopping on demand command (closing)") + pa.Log("on demand command stopped") pa.onDemandCmd.Close() } @@ -331,7 +360,6 @@ outer: case clientStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) } - pa.parent.OnPathClientClose(c) } } @@ -409,6 +437,21 @@ func (pa *Path) exhaustChannels() { }() } +func (pa *Path) hasExternalSource() bool { + return strings.HasPrefix(pa.conf.Source, "rtsp://") || + strings.HasPrefix(pa.conf.Source, "rtmp://") +} + +func (pa *Path) startExternalSource() { + if strings.HasPrefix(pa.conf.Source, "rtsp://") { + pa.source = sourcertsp.New(pa.conf.Source, pa.conf.SourceProtocolParsed, + pa.readTimeout, pa.writeTimeout, &pa.sourceWg, pa.stats, pa) + + } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { + pa.source = sourcertmp.New(pa.conf.Source, &pa.sourceWg, pa.stats, pa) + } +} + func (pa *Path) hasClients() bool { for _, state := range pa.clients { if state != clientStatePreRemove { @@ -418,16 +461,7 @@ func (pa *Path) hasClients() bool { return false } -func (pa *Path) hasClientsWaitingDescribe() bool { - for _, state := range pa.clients { - if state == clientStateWaitingDescribe { - return true - } - } - return false -} - -func (pa *Path) hasClientReadersOrWaitingDescribe() bool { +func (pa *Path) hasClientsNotSources() bool { for c, state := range pa.clients { if state != clientStatePreRemove && c != pa.source { return true @@ -436,130 +470,149 @@ func (pa *Path) hasClientReadersOrWaitingDescribe() bool { return false } -func (pa *Path) onCheck() bool { - // reply to DESCRIBE requests if they are in timeout - if pa.hasClientsWaitingDescribe() && - time.Since(pa.lastDescribeActivation) >= describeTimeout { - for c, state := range pa.clients { - if state != clientStatePreRemove && state == clientStateWaitingDescribe { - pa.clients[c] = clientStatePreRemove - c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)) +func (pa *Path) addClient(c *client.Client, state clientState) { + if _, ok := pa.clients[c]; ok { + panic("client already added") + } + + pa.clients[c] = state + pa.clientsWg.Add(1) +} + +func (pa *Path) removeClient(c *client.Client) { + state := pa.clients[c] + pa.clients[c] = clientStatePreRemove + + switch state { + case clientStatePlay: + atomic.AddInt64(pa.stats.CountReaders, -1) + pa.readers.remove(c) + + case clientStateRecord: + atomic.AddInt64(pa.stats.CountPublishers, -1) + pa.onSourceSetNotReady() + } + + if pa.source == c { + pa.source = nil + + // close all clients that are reading or waiting to read + for oc, state := range pa.clients { + if state != clientStatePreRemove && state != clientStateWaitingDescribe { + pa.removeClient(oc) + pa.parent.OnPathClientClose(oc) } } } - // stop on demand source if needed - if source, ok := pa.source.(sourceExternal); ok { - if pa.conf.SourceOnDemand && - source.IsRunning() && - !pa.hasClients() && - time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod { - pa.Log("stopping on demand source (not requested anymore)") - source.SetRunning(false) - } - } - - // stop on demand command if needed - if pa.onDemandCmd != nil && - !pa.hasClientReadersOrWaitingDescribe() && - time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribePeriod { - pa.Log("stopping on demand command (not requested anymore)") - pa.onDemandCmd.Close() - pa.onDemandCmd = nil - } - - // remove path if is regexp, has no source, has no on-demand command and has no clients - if pa.conf.Regexp != nil && - pa.source == nil && - pa.onDemandCmd == nil && - !pa.hasClients() { - return false - } - - return true + pa.scheduleSourceClose() + pa.scheduleRunOnDemandClose() + pa.scheduleClose() } func (pa *Path) onSourceSetReady() { - pa.sourceReady = true + if pa.sourceState == sourceStateWaitingDescribe { + pa.describeTimer.Stop() + pa.describeTimer = newEmptyTimer() + } + + pa.sourceState = sourceStateReady // reply to all clients that are waiting for a description for c, state := range pa.clients { if state == clientStateWaitingDescribe { - pa.clients[c] = clientStatePreRemove + pa.removeClient(c) c.OnPathDescribeData(pa.sourceSdp, "", nil) } } + + pa.scheduleSourceClose() + pa.scheduleRunOnDemandClose() + pa.scheduleClose() } func (pa *Path) onSourceSetNotReady() { - pa.sourceReady = false + pa.sourceState = sourceStateNotReady // close all clients that are reading or waiting to read for c, state := range pa.clients { - if state != clientStatePreRemove && state != clientStateWaitingDescribe && c != pa.source { - pa.onClientPreRemove(c) + if state == clientStateWaitingDescribe { + panic("not possible") + } + if c != pa.source && state != clientStatePreRemove { + pa.removeClient(c) pa.parent.OnPathClientClose(c) } } } func (pa *Path) onClientDescribe(c *client.Client) { - pa.lastDescribeReq = time.Now() + // prevent on-demand source from closing + if pa.sourceCloseTimerStarted { + pa.sourceCloseTimer = newEmptyTimer() + pa.sourceCloseTimerStarted = false + } - // source not found - if pa.source == nil { - // on demand command is available: put the client on hold - if pa.conf.RunOnDemand != "" { - if pa.onDemandCmd == nil { // start if needed - pa.Log("starting on demand command") - pa.lastDescribeActivation = time.Now() - pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, - pa.conf.RunOnDemandRestart, pa.name) + // prevent on-demand command from closing + if pa.runOnDemandCloseTimerStarted { + pa.runOnDemandCloseTimer = newEmptyTimer() + pa.runOnDemandCloseTimerStarted = false + } + + // start on-demand source + if pa.hasExternalSource() { + if pa.source == nil { + pa.startExternalSource() + + if pa.sourceState != sourceStateWaitingDescribe { + pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) + pa.sourceState = sourceStateWaitingDescribe } - - pa.clients[c] = clientStateWaitingDescribe - pa.clientsWg.Add(1) - - // no on-demand: reply with 404 - } else { - pa.clients[c] = clientStatePreRemove - pa.clientsWg.Add(1) - - c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name)) } + } - // source found and is redirect - } else if _, ok := pa.source.(*sourceRedirect); ok { - pa.clients[c] = clientStatePreRemove - pa.clientsWg.Add(1) + // start on-demand command + if pa.conf.RunOnDemand != "" { + if pa.onDemandCmd == nil { + pa.Log("on demand command started") + pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, + pa.conf.RunOnDemandRestart, pa.name) + if pa.sourceState != sourceStateWaitingDescribe { + pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) + pa.sourceState = sourceStateWaitingDescribe + } + } + } + + if _, ok := pa.source.(*sourceRedirect); ok { + pa.addClient(c, clientStatePreRemove) + pa.removeClient(c) c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil) + return + } - // source was found but is not ready: put the client on hold - } else if !pa.sourceReady { - // start source if needed - if source, ok := pa.source.(sourceExternal); ok { - if !source.IsRunning() { - pa.Log("starting on demand source") - pa.lastDescribeActivation = time.Now() - source.SetRunning(true) - } - } - - pa.clients[c] = clientStateWaitingDescribe - pa.clientsWg.Add(1) - - // source was found and is ready - } else { - pa.clients[c] = clientStatePreRemove - pa.clientsWg.Add(1) - + switch pa.sourceState { + case sourceStateReady: + pa.addClient(c, clientStatePreRemove) + pa.removeClient(c) c.OnPathDescribeData(pa.sourceSdp, "", nil) + return + + case sourceStateWaitingDescribe: + pa.addClient(c, clientStateWaitingDescribe) + return + + case sourceStateNotReady: + pa.addClient(c, clientStatePreRemove) + pa.removeClient(c) + c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name)) + return } } func (pa *Path) onClientSetupPlay(c *client.Client, trackId int) error { - if !pa.sourceReady { + if pa.sourceState != sourceStateReady { return fmt.Errorf("no one is publishing to path '%s'", pa.name) } @@ -568,8 +621,19 @@ func (pa *Path) onClientSetupPlay(c *client.Client, trackId int) error { } if _, ok := pa.clients[c]; !ok { - pa.clients[c] = clientStatePrePlay - pa.clientsWg.Add(1) + // prevent on-demand source from closing + if pa.sourceCloseTimerStarted { + pa.sourceCloseTimer = newEmptyTimer() + pa.sourceCloseTimerStarted = false + } + + // prevent on-demand command from closing + if pa.runOnDemandCloseTimerStarted { + pa.runOnDemandCloseTimer = newEmptyTimer() + pa.runOnDemandCloseTimerStarted = false + } + + pa.addClient(c, clientStatePrePlay) } return nil @@ -595,12 +659,11 @@ func (pa *Path) onClientAnnounce(c *client.Client, tracks gortsplib.Tracks) erro return fmt.Errorf("already subscribed") } - if pa.source != nil { + if pa.source != nil || pa.hasExternalSource() { return fmt.Errorf("someone is already publishing to path '%s'", pa.name) } - pa.clients[c] = clientStatePreRecord - pa.clientsWg.Add(1) + pa.addClient(c, clientStatePreRecord) pa.source = c pa.sourceTrackCount = len(tracks) @@ -624,40 +687,58 @@ func (pa *Path) onClientRecord(c *client.Client) { pa.onSourceSetReady() } -func (pa *Path) onClientPreRemove(c *client.Client) { - state := pa.clients[c] - pa.clients[c] = clientStatePreRemove - - switch state { - case clientStatePlay: - atomic.AddInt64(pa.stats.CountReaders, -1) - pa.readers.remove(c) - - case clientStateRecord: - atomic.AddInt64(pa.stats.CountPublishers, -1) - pa.onSourceSetNotReady() +func (pa *Path) scheduleSourceClose() { + if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil { + return } - if pa.source == c { - pa.source = nil - - // close all clients that are reading or waiting to read - for oc, state := range pa.clients { - if state != clientStatePreRemove && state != clientStateWaitingDescribe && oc != pa.source { - pa.onClientPreRemove(oc) - pa.parent.OnPathClientClose(oc) - } - } + if pa.sourceCloseTimerStarted || + pa.sourceState == sourceStateWaitingDescribe || + pa.hasClients() { + return } + + pa.sourceCloseTimer.Stop() + pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) + pa.sourceCloseTimerStarted = true } -func (pa *Path) OnSourceReady(tracks gortsplib.Tracks) { +func (pa *Path) scheduleRunOnDemandClose() { + if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil { + return + } + + if pa.runOnDemandCloseTimerStarted || + pa.sourceState == sourceStateWaitingDescribe || + pa.hasClientsNotSources() { + return + } + + pa.runOnDemandCloseTimer.Stop() + pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) + pa.runOnDemandCloseTimerStarted = true +} + +func (pa *Path) scheduleClose() { + if pa.closeTimerStarted || + pa.conf.Regexp == nil || + pa.hasClients() || + pa.source != nil { + return + } + + pa.closeTimer.Stop() + pa.closeTimer = time.NewTimer(0) + pa.closeTimerStarted = true +} + +func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) { pa.sourceSdp = tracks.Write() pa.sourceTrackCount = len(tracks) pa.sourceSetReady <- struct{}{} } -func (pa *Path) OnSourceNotReady() { +func (pa *Path) OnSourceSetNotReady() { pa.sourceSetNotReady <- struct{}{} } diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 97940a58..55fbfea9 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -22,14 +22,14 @@ metrics: no # enable pprof on port 9999 to monitor performances. pprof: no -# destinations of log messages; available options are "stdout", "file" and "syslog". +# destinations of log messages; available values are "stdout", "file" and "syslog". logDestinations: [stdout] -# if "file" is in logDestinations, this is the file that will receive the logs. +# if "file" is in logDestinations, this is the file which will receive the logs. logFile: rtsp-simple-server.log # command to run when a client connects to the server. # this is terminated with SIGINT when a client disconnects from the server. -# the restart option allows to restart the command if it exits suddely. +# the restart parameter allows to restart the command if it exits suddenly. runOnConnect: runOnConnectRestart: no @@ -54,6 +54,12 @@ paths: # if the source is an RTSP or RTMP url, it will be pulled only when at least # one reader is connected, saving bandwidth. sourceOnDemand: no + # if sourceOnDemand is "yes", readers will be put on hold until the source is + # ready or until this amount of time has passed. + sourceOnDemandStartTimeout: 10s + # if sourceOnDemand is "yes", the source will be closed when there are no + # readers connected and this amount of time has passed. + sourceOnDemandCloseAfter: 10s # if the source is "redirect", this is the RTSP url which clients will be # redirected to. @@ -73,32 +79,38 @@ paths: # ips or networks (x.x.x.x/24) allowed to read. readIps: [] - # command to run when this path is loaded by the program. - # this can be used, for example, to publish a stream and keep it always opened. + # command to run when this path is initialized. + # this can be used to publish a stream and keep it always opened. # this is terminated with SIGINT when the program closes. # the path name is available in the RTSP_SERVER_PATH variable. - # the restart option allows to restart the command if it exits suddely. + # the restart parameter allows to restart the command if it exits suddenly. runOnInit: runOnInitRestart: no # command to run when this path is requested. - # this can be used, for example, to publish a stream on demand. + # this can be used to publish a stream on demand. # this is terminated with SIGINT when the path is not requested anymore. # the path name is available in the RTSP_SERVER_PATH variable. - # the restart option allows to restart the command if it exits suddely. + # the restart parameter allows to restart the command if it exits suddenly. runOnDemand: runOnDemandRestart: no + # readers will be put on hold until the runOnDemand command starts publishing + # or until this amount of time has passed. + runOnDemandStartTimeout: 10s + # the runOnDemand command will be closed when there are no + # readers connected and this amount of time has passed. + runOnDemandCloseAfter: 10s # command to run when a client starts publishing. # this is terminated with SIGINT when a client stops publishing. # the path name is available in the RTSP_SERVER_PATH variable. - # the restart option allows to restart the command if it exits suddely. + # the restart parameter allows to restart the command if it exits suddenly. runOnPublish: runOnPublishRestart: no # command to run when a clients starts reading. # this is terminated with SIGINT when a client stops reading. # the path name is available in the RTSP_SERVER_PATH variable. - # the restart option allows to restart the command if it exits suddely. + # the restart parameter allows to restart the command if it exits suddenly. runOnRead: runOnReadRestart: no diff --git a/sourcertmp/source.go b/sourcertmp/source.go index 278754c5..147d5faa 100644 --- a/sourcertmp/source.go +++ b/sourcertmp/source.go @@ -3,6 +3,7 @@ package sourcertmp import ( "fmt" "net" + "sync" "sync/atomic" "time" @@ -17,126 +18,74 @@ import ( ) const ( - retryInterval = 5 * time.Second + retryPause = 5 * time.Second ) type Parent interface { Log(string, ...interface{}) - OnSourceReady(gortsplib.Tracks) - OnSourceNotReady() + OnSourceSetReady(gortsplib.Tracks) + OnSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } type Source struct { ur string state bool + wg *sync.WaitGroup stats *stats.Stats parent Parent - innerState bool - // in - innerTerminate chan struct{} - innerDone chan struct{} - stateChange chan bool - terminate chan struct{} - - // out - done chan struct{} + terminate chan struct{} } func New(ur string, - state bool, + wg *sync.WaitGroup, stats *stats.Stats, parent Parent) *Source { s := &Source{ - ur: ur, - state: state, - stats: stats, - parent: parent, - stateChange: make(chan bool), - terminate: make(chan struct{}), - done: make(chan struct{}), + ur: ur, + wg: wg, + stats: stats, + parent: parent, + terminate: make(chan struct{}), } atomic.AddInt64(s.stats.CountSourcesRtmp, +1) + s.parent.Log("rtmp source started") + s.wg.Add(1) go s.run() - s.SetRunning(s.state) return s } func (s *Source) Close() { + atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) + s.parent.Log("rtmp source stopped") close(s.terminate) - <-s.done } func (s *Source) IsSource() {} -func (s *Source) IsRunning() bool { - return s.state -} - -func (s *Source) SetRunning(state bool) { - s.state = state - s.stateChange <- s.state -} +func (s *Source) IsSourceExternal() {} func (s *Source) run() { - defer close(s.done) - -outer: - for { - select { - case state := <-s.stateChange: - if state { - if !s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtmpRunning, +1) - s.innerState = true - s.innerTerminate = make(chan struct{}) - s.innerDone = make(chan struct{}) - go s.runInner() - } - } else { - if s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) - close(s.innerTerminate) - <-s.innerDone - s.innerState = false - } - } - - case <-s.terminate: - break outer - } - } - - if s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) - close(s.innerTerminate) - <-s.innerDone - } - - close(s.stateChange) -} - -func (s *Source) runInner() { - defer close(s.innerDone) + defer s.wg.Done() for { ok := func() bool { - ok := s.runInnerInner() + ok := s.runInner() if !ok { return false } - t := time.NewTimer(retryInterval) + t := time.NewTimer(retryPause) defer t.Stop() select { case <-t.C: return true - case <-s.innerTerminate: + case <-s.terminate: return false } }() @@ -146,7 +95,7 @@ func (s *Source) runInner() { } } -func (s *Source) runInnerInner() bool { +func (s *Source) runInner() bool { s.parent.Log("connecting to rtmp source") var conn *rtmp.Conn @@ -159,7 +108,7 @@ func (s *Source) runInnerInner() bool { }() select { - case <-s.innerTerminate: + case <-s.terminate: return false case <-dialDone: } @@ -271,8 +220,8 @@ func (s *Source) runInnerInner() bool { return true } - s.parent.OnSourceReady(tracks) s.parent.Log("rtmp source ready") + s.parent.OnSourceSetReady(tracks) readDone := make(chan error) go func() { @@ -336,7 +285,7 @@ func (s *Source) runInnerInner() bool { outer: for { select { - case <-s.innerTerminate: + case <-s.terminate: nconn.Close() <-readDone ret = false @@ -350,7 +299,7 @@ outer: } } - s.parent.OnSourceNotReady() + s.parent.OnSourceSetNotReady() return ret } diff --git a/sourcertsp/source.go b/sourcertsp/source.go index 7d8f5e47..2310f1cb 100644 --- a/sourcertsp/source.go +++ b/sourcertsp/source.go @@ -12,13 +12,13 @@ import ( ) const ( - retryInterval = 5 * time.Second + retryPause = 5 * time.Second ) type Parent interface { Log(string, ...interface{}) - OnSourceReady(gortsplib.Tracks) - OnSourceNotReady() + OnSourceSetReady(gortsplib.Tracks) + OnSourceSetNotReady() OnFrame(int, gortsplib.StreamType, []byte) } @@ -27,17 +27,12 @@ type Source struct { proto gortsplib.StreamProtocol readTimeout time.Duration writeTimeout time.Duration - state bool + wg *sync.WaitGroup stats *stats.Stats parent Parent - innerState bool - // in - innerTerminate chan struct{} - innerDone chan struct{} - stateChange chan bool - terminate chan struct{} + terminate chan struct{} // out done chan struct{} @@ -47,7 +42,7 @@ func New(ur string, proto gortsplib.StreamProtocol, readTimeout time.Duration, writeTimeout time.Duration, - state bool, + wg *sync.WaitGroup, stats *stats.Stats, parent Parent) *Source { s := &Source{ @@ -55,92 +50,47 @@ func New(ur string, proto: proto, readTimeout: readTimeout, writeTimeout: writeTimeout, - state: state, + wg: wg, stats: stats, parent: parent, - stateChange: make(chan bool), terminate: make(chan struct{}), - done: make(chan struct{}), } atomic.AddInt64(s.stats.CountSourcesRtsp, +1) + s.parent.Log("rtsp source started") + s.wg.Add(1) go s.run() - s.SetRunning(s.state) return s } func (s *Source) Close() { + atomic.AddInt64(s.stats.CountSourcesRtsp, -1) + s.parent.Log("rtsp source stopped") close(s.terminate) - <-s.done } func (s *Source) IsSource() {} -func (s *Source) IsRunning() bool { - return s.state -} - -func (s *Source) SetRunning(state bool) { - s.state = state - s.stateChange <- s.state -} +func (s *Source) IsSourceExternal() {} func (s *Source) run() { - defer close(s.done) - -outer: - for { - select { - case state := <-s.stateChange: - if state { - if !s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtspRunning, +1) - s.innerState = true - s.innerTerminate = make(chan struct{}) - s.innerDone = make(chan struct{}) - go s.runInner() - } - } else { - if s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1) - close(s.innerTerminate) - <-s.innerDone - s.innerState = false - } - } - - case <-s.terminate: - break outer - } - } - - if s.innerState { - atomic.AddInt64(s.stats.CountSourcesRtspRunning, -1) - close(s.innerTerminate) - <-s.innerDone - } - - close(s.stateChange) -} - -func (s *Source) runInner() { - defer close(s.innerDone) + defer s.wg.Done() for { ok := func() bool { - ok := s.runInnerInner() + ok := s.runInner() if !ok { return false } - t := time.NewTimer(retryInterval) + t := time.NewTimer(retryPause) defer t.Stop() select { case <-t.C: return true - case <-s.innerTerminate: + case <-s.terminate: return false } }() @@ -150,7 +100,7 @@ func (s *Source) runInner() { } } -func (s *Source) runInnerInner() bool { +func (s *Source) runInner() bool { s.parent.Log("connecting to rtsp source") u, _ := url.Parse(s.ur) @@ -169,7 +119,7 @@ func (s *Source) runInnerInner() bool { }() select { - case <-s.innerTerminate: + case <-s.terminate: return false case <-dialDone: } @@ -217,8 +167,8 @@ func (s *Source) runUDP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib return true } - s.parent.OnSourceReady(tracks) s.parent.Log("rtsp source ready") + s.parent.OnSourceSetReady(tracks) var wg sync.WaitGroup @@ -266,7 +216,7 @@ func (s *Source) runUDP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib outer: for { select { - case <-s.innerTerminate: + case <-s.terminate: conn.Close() <-tcpConnDone ret = false @@ -282,7 +232,7 @@ outer: wg.Wait() - s.parent.OnSourceNotReady() + s.parent.OnSourceSetNotReady() return ret } @@ -304,8 +254,8 @@ func (s *Source) runTCP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib return true } - s.parent.OnSourceReady(tracks) s.parent.Log("rtsp source ready") + s.parent.OnSourceSetReady(tracks) tcpConnDone := make(chan error) go func() { @@ -325,7 +275,7 @@ func (s *Source) runTCP(u *url.URL, conn *gortsplib.ConnClient, tracks gortsplib outer: for { select { - case <-s.innerTerminate: + case <-s.terminate: conn.Close() <-tcpConnDone ret = false @@ -339,7 +289,7 @@ outer: } } - s.parent.OnSourceNotReady() + s.parent.OnSourceSetNotReady() return ret }