package path

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/aler9/gortsplib"

	"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
	"github.com/aler9/rtsp-simple-server/internal/conf"
	"github.com/aler9/rtsp-simple-server/internal/externalcmd"
	"github.com/aler9/rtsp-simple-server/internal/logger"
	"github.com/aler9/rtsp-simple-server/internal/sourcertmp"
	"github.com/aler9/rtsp-simple-server/internal/sourcertsp"
	"github.com/aler9/rtsp-simple-server/internal/stats"
)

// newEmptyTimer returns a timer that has already fired and whose channel has
// already been drained. It is used as a placeholder for "timer not armed":
// selecting on its channel blocks until the timer is replaced.
func newEmptyTimer() *time.Timer {
	t := time.NewTimer(0)
	<-t.C // consume the immediate expiration so the channel is empty
	return t
}

// Parent is implemented by pathman.PathMan.
type Parent interface {
	Log(logger.Level, string, ...interface{})
	OnPathClose(*Path)
	OnPathClientClose(*clientrtsp.Client)
}

// a source can be
// * clientrtsp.Client
// * sourcertsp.Source
// * sourcertmp.Source
// * sourceRedirect
type source interface {
	IsSource()
}

// a sourceExternal can be
// * sourcertsp.Source
// * sourcertmp.Source
type sourceExternal interface {
	IsSource()
	IsSourceExternal()
	Close()
}

// sourceRedirect is the placeholder source assigned to paths whose
// configured source is "redirect".
type sourceRedirect struct{}

// IsSource implements source.
func (*sourceRedirect) IsSource() {}

// clientState is the state of a client with respect to a path.
type clientState int

const (
	clientStatePrePlay   clientState = iota // reader that has set up tracks but is not playing
	clientStatePlay                         // reader that is playing and is registered in the readers map
	clientStatePreRecord                    // publisher that has announced but is not recording
	clientStateRecord                       // publisher that is recording
	clientStatePreRemove                    // removed by the path; entry is kept until the client's remove request arrives
)

// sourceState is the state of the source of a path.
type sourceState int

const (
	sourceStateNotReady        sourceState = iota // no stream is available
	sourceStateWaitingDescribe                    // an on-demand source or command was started and the describe timer is running
	sourceStateReady                              // a stream is available
)

// Path is a path.
type Path struct {
	// values received by New and stored unchanged
	rtspPort        int
	readTimeout     time.Duration
	writeTimeout    time.Duration
	readBufferCount uint64
	confName        string
	conf            *conf.PathConf
	name            string
	wg              *sync.WaitGroup
	stats           *stats.Stats
	parent          Parent

	// state handled by the run loop
	clients                      map[*clientrtsp.Client]clientState
	clientsWg                    sync.WaitGroup              // one unit per entry of clients, released when the client's remove request is served
	describeRequests             []clientrtsp.DescribeReq    // describe requests parked while the source state is sourceStateWaitingDescribe
	setupPlayRequests            []clientrtsp.SetupPlayReq   // setup requests parked while the source state is sourceStateWaitingDescribe
	source                       source
	sourceTrackCount             int
	sourceSdp                    []byte
	readers                      *readersMap
	onDemandCmd                  *externalcmd.Cmd
	describeTimer                *time.Timer
	sourceCloseTimer             *time.Timer
	sourceCloseTimerStarted      bool
	sourceState                  sourceState
	sourceWg                     sync.WaitGroup              // passed to external sources on creation, waited on at shutdown
	runOnDemandCloseTimer        *time.Timer
	runOnDemandCloseTimerStarted bool
	closeTimer                   *time.Timer
	closeTimerStarted            bool

	// in
	sourceSetReady    chan struct{}               // from source
	sourceSetNotReady chan struct{}               // from source
	clientDescribe    chan clientrtsp.DescribeReq // from program
	clientAnnounce    chan clientrtsp.AnnounceReq // from program
	clientSetupPlay   chan clientrtsp.SetupPlayReq // from program
	clientPlay        chan clientrtsp.PlayReq     // from client
	clientRecord      chan clientrtsp.RecordReq   // from client
	clientPause       chan clientrtsp.PauseReq    // from client
	clientRemove      chan clientrtsp.RemoveReq   // from client
	terminate         chan struct{}               // closed by Close
}

// New allocates a Path.
func New( rtspPort int, readTimeout time.Duration, writeTimeout time.Duration, readBufferCount uint64, confName string, conf *conf.PathConf, name string, wg *sync.WaitGroup, stats *stats.Stats, parent Parent) *Path { pa := &Path{ rtspPort: rtspPort, readTimeout: readTimeout, writeTimeout: writeTimeout, readBufferCount: readBufferCount, confName: confName, conf: conf, name: name, wg: wg, stats: stats, parent: parent, clients: make(map[*clientrtsp.Client]clientState), readers: newReadersMap(), describeTimer: newEmptyTimer(), sourceCloseTimer: newEmptyTimer(), runOnDemandCloseTimer: newEmptyTimer(), closeTimer: newEmptyTimer(), sourceSetReady: make(chan struct{}), sourceSetNotReady: make(chan struct{}), clientDescribe: make(chan clientrtsp.DescribeReq), clientAnnounce: make(chan clientrtsp.AnnounceReq), clientSetupPlay: make(chan clientrtsp.SetupPlayReq), clientPlay: make(chan clientrtsp.PlayReq), clientRecord: make(chan clientrtsp.RecordReq), clientPause: make(chan clientrtsp.PauseReq), clientRemove: make(chan clientrtsp.RemoveReq), terminate: make(chan struct{}), } pa.wg.Add(1) go pa.run() return pa } // Close closes a path. func (pa *Path) Close() { close(pa.terminate) } // Log is the main logging function. func (pa *Path) Log(level logger.Level, format string, args ...interface{}) { pa.parent.Log(level, "[path "+pa.name+"] "+format, args...) 
} func (pa *Path) run() { defer pa.wg.Done() if pa.conf.Source == "redirect" { pa.source = &sourceRedirect{} } else if pa.hasExternalSource() && !pa.conf.SourceOnDemand { pa.startExternalSource() } var onInitCmd *externalcmd.Cmd if pa.conf.RunOnInit != "" { pa.Log(logger.Info, "on init command started") onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{ Path: pa.name, Port: strconv.FormatInt(int64(pa.rtspPort), 10), }) } outer: for { select { case <-pa.describeTimer.C: for _, req := range pa.describeRequests { req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet } pa.describeRequests = nil for _, req := range pa.setupPlayRequests { req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet } pa.setupPlayRequests = nil // set state after removeClient(), so schedule* works once pa.sourceState = sourceStateNotReady pa.scheduleSourceClose() pa.scheduleRunOnDemandClose() pa.scheduleClose() case <-pa.sourceCloseTimer.C: pa.sourceCloseTimerStarted = false pa.source.(sourceExternal).Close() pa.source = nil pa.scheduleClose() case <-pa.runOnDemandCloseTimer.C: pa.runOnDemandCloseTimerStarted = false pa.Log(logger.Info, "on demand command stopped") pa.onDemandCmd.Close() pa.onDemandCmd = nil pa.scheduleClose() case <-pa.closeTimer.C: pa.exhaustChannels() pa.parent.OnPathClose(pa) <-pa.terminate break outer case <-pa.sourceSetReady: pa.onSourceSetReady() case <-pa.sourceSetNotReady: pa.onSourceSetNotReady() case req := <-pa.clientDescribe: pa.onClientDescribe(req) case req := <-pa.clientSetupPlay: pa.onClientSetupPlay(req) case req := <-pa.clientPlay: pa.onClientPlay(req.Client) close(req.Res) case req := <-pa.clientAnnounce: err := pa.onClientAnnounce(req.Client, req.Tracks) if err != nil { req.Res <- clientrtsp.AnnounceRes{nil, err} //nolint:govet continue } req.Res <- clientrtsp.AnnounceRes{pa, nil} 
//nolint:govet case req := <-pa.clientRecord: pa.onClientRecord(req.Client) close(req.Res) case req := <-pa.clientPause: pa.onClientPause(req.Client) close(req.Res) case req := <-pa.clientRemove: if _, ok := pa.clients[req.Client]; !ok { close(req.Res) continue } if pa.clients[req.Client] != clientStatePreRemove { pa.removeClient(req.Client) } delete(pa.clients, req.Client) pa.clientsWg.Done() close(req.Res) case <-pa.terminate: pa.exhaustChannels() break outer } } pa.describeTimer.Stop() pa.sourceCloseTimer.Stop() pa.runOnDemandCloseTimer.Stop() pa.closeTimer.Stop() if onInitCmd != nil { pa.Log(logger.Info, "on init command stopped") onInitCmd.Close() } if source, ok := pa.source.(sourceExternal); ok { source.Close() } pa.sourceWg.Wait() if pa.onDemandCmd != nil { pa.Log(logger.Info, "on demand command stopped") pa.onDemandCmd.Close() } for _, req := range pa.describeRequests { req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet } for _, req := range pa.setupPlayRequests { req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet } for c, state := range pa.clients { if state != clientStatePreRemove { switch state { case clientStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) pa.readers.remove(c) case clientStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) } pa.parent.OnPathClientClose(c) } } pa.clientsWg.Wait() close(pa.sourceSetReady) close(pa.sourceSetNotReady) close(pa.clientDescribe) close(pa.clientAnnounce) close(pa.clientSetupPlay) close(pa.clientPlay) close(pa.clientRecord) close(pa.clientPause) close(pa.clientRemove) } func (pa *Path) exhaustChannels() { go func() { for { select { case _, ok := <-pa.sourceSetReady: if !ok { return } case _, ok := <-pa.sourceSetNotReady: if !ok { return } case req, ok := <-pa.clientDescribe: if !ok { return } req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientAnnounce: if !ok { return } req.Res <- 
clientrtsp.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientSetupPlay: if !ok { return } req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientPlay: if !ok { return } close(req.Res) case req, ok := <-pa.clientRecord: if !ok { return } close(req.Res) case req, ok := <-pa.clientPause: if !ok { return } close(req.Res) case req, ok := <-pa.clientRemove: if !ok { return } if _, ok := pa.clients[req.Client]; !ok { close(req.Res) continue } pa.clientsWg.Done() close(req.Res) } } }() } func (pa *Path) hasExternalSource() bool { return strings.HasPrefix(pa.conf.Source, "rtsp://") || strings.HasPrefix(pa.conf.Source, "rtsps://") || strings.HasPrefix(pa.conf.Source, "rtmp://") } func (pa *Path) startExternalSource() { if strings.HasPrefix(pa.conf.Source, "rtsp://") || strings.HasPrefix(pa.conf.Source, "rtsps://") { pa.source = sourcertsp.New( pa.conf.Source, pa.conf.SourceProtocolParsed, pa.readTimeout, pa.writeTimeout, pa.readBufferCount, &pa.sourceWg, pa.stats, pa) } else if strings.HasPrefix(pa.conf.Source, "rtmp://") { pa.source = sourcertmp.New( pa.conf.Source, pa.readTimeout, &pa.sourceWg, pa.stats, pa) } } func (pa *Path) hasClients() bool { for _, state := range pa.clients { if state != clientStatePreRemove { return true } } return false } func (pa *Path) hasClientsNotSources() bool { for c, state := range pa.clients { if state != clientStatePreRemove && c != pa.source { return true } } return false } func (pa *Path) addClient(c *clientrtsp.Client, state clientState) { if _, ok := pa.clients[c]; ok { panic("client already added") } pa.clients[c] = state pa.clientsWg.Add(1) } func (pa *Path) removeClient(c *clientrtsp.Client) { state := pa.clients[c] pa.clients[c] = clientStatePreRemove switch state { case clientStatePlay: atomic.AddInt64(pa.stats.CountReaders, -1) pa.readers.remove(c) case clientStateRecord: atomic.AddInt64(pa.stats.CountPublishers, -1) pa.onSourceSetNotReady() } 
if pa.source == c { pa.source = nil // close all clients that are reading or waiting to read for oc, state := range pa.clients { if state != clientStatePreRemove { pa.removeClient(oc) pa.parent.OnPathClientClose(oc) } } } pa.scheduleSourceClose() pa.scheduleRunOnDemandClose() pa.scheduleClose() } func (pa *Path) onSourceSetReady() { if pa.sourceState == sourceStateWaitingDescribe { pa.describeTimer.Stop() pa.describeTimer = newEmptyTimer() } pa.sourceState = sourceStateReady for _, req := range pa.describeRequests { req.Res <- clientrtsp.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet } pa.describeRequests = nil for _, req := range pa.setupPlayRequests { pa.onClientSetupPlayPost(req) } pa.setupPlayRequests = nil pa.scheduleSourceClose() pa.scheduleRunOnDemandClose() pa.scheduleClose() } func (pa *Path) onSourceSetNotReady() { pa.sourceState = sourceStateNotReady // close all clients that are reading or waiting to read for c, state := range pa.clients { if c != pa.source && state != clientStatePreRemove { pa.removeClient(c) pa.parent.OnPathClientClose(c) } } } func (pa *Path) fixedPublisherStart() { if pa.hasExternalSource() { // start on-demand source if pa.source == nil { pa.startExternalSource() if pa.sourceState != sourceStateWaitingDescribe { pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) pa.sourceState = sourceStateWaitingDescribe } } else { // reset timer if pa.sourceCloseTimerStarted { pa.sourceCloseTimer.Stop() pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) } } } if pa.conf.RunOnDemand != "" { // start on-demand command if pa.onDemandCmd == nil { pa.Log(logger.Info, "on demand command started") pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{ Path: pa.name, Port: strconv.FormatInt(int64(pa.rtspPort), 10), }) if pa.sourceState != sourceStateWaitingDescribe { pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) pa.sourceState = 
sourceStateWaitingDescribe } } else { // reset timer if pa.runOnDemandCloseTimerStarted { pa.runOnDemandCloseTimer.Stop() pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) } } } } func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) { if _, ok := pa.clients[req.Client]; ok { req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet return } pa.fixedPublisherStart() pa.scheduleClose() if _, ok := pa.source.(*sourceRedirect); ok { req.Res <- clientrtsp.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet return } switch pa.sourceState { case sourceStateReady: req.Res <- clientrtsp.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet return case sourceStateWaitingDescribe: pa.describeRequests = append(pa.describeRequests, req) return case sourceStateNotReady: if pa.conf.Fallback != "" { req.Res <- clientrtsp.DescribeRes{nil, pa.conf.Fallback, nil} //nolint:govet return } req.Res <- clientrtsp.DescribeRes{nil, "", clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet return } } func (pa *Path) onClientSetupPlayPost(req clientrtsp.SetupPlayReq) { if req.TrackID >= pa.sourceTrackCount { req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet return } if _, ok := pa.clients[req.Client]; !ok { // prevent on-demand source from closing if pa.sourceCloseTimerStarted { pa.sourceCloseTimer = newEmptyTimer() pa.sourceCloseTimerStarted = false } // prevent on-demand command from closing if pa.runOnDemandCloseTimerStarted { pa.runOnDemandCloseTimer = newEmptyTimer() pa.runOnDemandCloseTimerStarted = false } pa.addClient(req.Client, clientStatePrePlay) } req.Res <- clientrtsp.SetupPlayRes{pa, nil} //nolint:govet } func (pa *Path) onClientSetupPlay(req clientrtsp.SetupPlayReq) { pa.fixedPublisherStart() pa.scheduleClose() switch pa.sourceState { case sourceStateReady: pa.onClientSetupPlayPost(req) return case sourceStateWaitingDescribe: pa.setupPlayRequests = 
append(pa.setupPlayRequests, req) return case sourceStateNotReady: req.Res <- clientrtsp.SetupPlayRes{nil, clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet return } } func (pa *Path) onClientPlay(c *clientrtsp.Client) { state, ok := pa.clients[c] if !ok { return } if state != clientStatePrePlay { return } atomic.AddInt64(pa.stats.CountReaders, 1) pa.clients[c] = clientStatePlay pa.readers.add(c) } func (pa *Path) onClientAnnounce(c *clientrtsp.Client, tracks gortsplib.Tracks) error { if _, ok := pa.clients[c]; ok { return fmt.Errorf("already subscribed") } if pa.source != nil || pa.hasExternalSource() { return fmt.Errorf("someone is already publishing to path '%s'", pa.name) } pa.addClient(c, clientStatePreRecord) pa.source = c pa.sourceTrackCount = len(tracks) pa.sourceSdp = tracks.Write() return nil } func (pa *Path) onClientRecord(c *clientrtsp.Client) { state, ok := pa.clients[c] if !ok { return } if state != clientStatePreRecord { return } atomic.AddInt64(pa.stats.CountPublishers, 1) pa.clients[c] = clientStateRecord pa.onSourceSetReady() } func (pa *Path) onClientPause(c *clientrtsp.Client) { state, ok := pa.clients[c] if !ok { return } if state == clientStatePlay { atomic.AddInt64(pa.stats.CountReaders, -1) pa.clients[c] = clientStatePrePlay pa.readers.remove(c) } else if state == clientStateRecord { atomic.AddInt64(pa.stats.CountPublishers, -1) pa.clients[c] = clientStatePreRecord pa.onSourceSetNotReady() } } func (pa *Path) scheduleSourceClose() { if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil { return } if pa.sourceCloseTimerStarted || pa.sourceState == sourceStateWaitingDescribe || pa.hasClients() { return } pa.sourceCloseTimer.Stop() pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) pa.sourceCloseTimerStarted = true } func (pa *Path) scheduleRunOnDemandClose() { if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil { return } if pa.runOnDemandCloseTimerStarted || pa.sourceState == 
sourceStateWaitingDescribe || pa.hasClientsNotSources() { return } pa.runOnDemandCloseTimer.Stop() pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) pa.runOnDemandCloseTimerStarted = true } func (pa *Path) scheduleClose() { if pa.conf.Regexp != nil && !pa.hasClients() && pa.source == nil && pa.sourceState != sourceStateWaitingDescribe && !pa.sourceCloseTimerStarted && !pa.runOnDemandCloseTimerStarted && !pa.closeTimerStarted { pa.closeTimer.Stop() pa.closeTimer = time.NewTimer(0) pa.closeTimerStarted = true } } // ConfName returns the configuration name of this path. func (pa *Path) ConfName() string { return pa.confName } // Conf returns the configuration of this path. func (pa *Path) Conf() *conf.PathConf { return pa.conf } // Name returns the name of this path. func (pa *Path) Name() string { return pa.name } // SourceTrackCount returns the number of tracks of the source this path. func (pa *Path) SourceTrackCount() int { return pa.sourceTrackCount } // OnSourceSetReady is called by a source. func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) { pa.sourceSdp = tracks.Write() pa.sourceTrackCount = len(tracks) pa.sourceSetReady <- struct{}{} } // OnSourceSetNotReady is called by a source. func (pa *Path) OnSourceSetNotReady() { pa.sourceSetNotReady <- struct{}{} } // OnPathManDescribe is called by pathman.PathMan. func (pa *Path) OnPathManDescribe(req clientrtsp.DescribeReq) { pa.clientDescribe <- req } // OnPathManSetupPlay is called by pathman.PathMan. func (pa *Path) OnPathManSetupPlay(req clientrtsp.SetupPlayReq) { pa.clientSetupPlay <- req } // OnPathManAnnounce is called by pathman.PathMan. func (pa *Path) OnPathManAnnounce(req clientrtsp.AnnounceReq) { pa.clientAnnounce <- req } // OnClientRemove is called by clientrtsp.Client. func (pa *Path) OnClientRemove(req clientrtsp.RemoveReq) { pa.clientRemove <- req } // OnClientPlay is called by clientrtsp.Client. 
// The request is handed to the run loop over an unbuffered channel, so the
// call blocks until the loop receives it.
func (pa *Path) OnClientPlay(req clientrtsp.PlayReq) {
	pa.clientPlay <- req
}

// OnClientRecord is called by clientrtsp.Client.
// The request is handed to the run loop over an unbuffered channel, so the
// call blocks until the loop receives it.
func (pa *Path) OnClientRecord(req clientrtsp.RecordReq) {
	pa.clientRecord <- req
}

// OnClientPause is called by clientrtsp.Client.
// The request is handed to the run loop over an unbuffered channel, so the
// call blocks until the loop receives it.
func (pa *Path) OnClientPause(req clientrtsp.PauseReq) {
	pa.clientPause <- req
}

// OnFrame is called by a source or by a clientrtsp.Client.
// The frame is passed to the readers map directly, without going through
// the run loop.
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
	pa.readers.forwardFrame(trackID, streamType, buf)
}