mediamtx/path.go

294 lines
7.0 KiB
Go
Raw Normal View History

package main
import (
2020-07-30 11:31:18 +00:00
"fmt"
2020-10-03 19:10:41 +00:00
"strings"
2020-09-19 21:52:06 +00:00
"sync/atomic"
"time"
2020-10-13 22:50:08 +00:00
"github.com/aler9/rtsp-simple-server/conf"
"github.com/aler9/rtsp-simple-server/externalcmd"
)
2020-08-30 13:27:03 +00:00
const (
	// describeTimeout is the minimum time that must have elapsed since the
	// last describe activation before onCheck replies with an error to
	// clients that are still waiting for a description.
	describeTimeout = 5 * time.Second

	// sourceStopAfterDescribePeriod is the minimum time that must have
	// elapsed since the last DESCRIBE request before onCheck stops an
	// on-demand rtsp/rtmp source that has no clients.
	sourceStopAfterDescribePeriod = 10 * time.Second

	// onDemandCmdStopAfterDescribePeriod is the minimum time that must have
	// elapsed since the last DESCRIBE request before onCheck stops the
	// on-demand command of a path that has no readers.
	onDemandCmdStopAfterDescribePeriod = 10 * time.Second
)
2020-10-13 18:00:40 +00:00
// source is a marker interface for whatever feeds a path with a stream.
// The implementations are a client, a sourceRtsp or a sourceRtmp.
type source interface {
	// isSource carries no behavior; it only tags a type as a valid source.
	isSource()
}
type path struct {
2020-08-30 13:27:03 +00:00
p *program
name string
2020-10-13 22:50:08 +00:00
conf *conf.PathConf
2020-10-13 18:00:40 +00:00
source source
sourceReady bool
sourceTrackCount int
sourceSdp []byte
2020-08-30 13:27:03 +00:00
lastDescribeReq time.Time
lastDescribeActivation time.Time
onInitCmd *externalcmd.ExternalCmd
onDemandCmd *externalcmd.ExternalCmd
}
2020-10-13 22:50:08 +00:00
func newPath(p *program, name string, conf *conf.PathConf) *path {
2020-07-30 11:31:18 +00:00
pa := &path{
2020-09-19 15:13:45 +00:00
p: p,
name: name,
conf: conf,
}
2020-07-30 11:31:18 +00:00
2020-10-03 19:10:41 +00:00
if strings.HasPrefix(conf.Source, "rtsp://") {
s := newSourceRtsp(p, pa)
2020-10-13 18:00:40 +00:00
pa.source = s
2020-10-03 19:10:41 +00:00
} else if strings.HasPrefix(conf.Source, "rtmp://") {
s := newSourceRtmp(p, pa)
2020-10-13 18:00:40 +00:00
pa.source = s
2020-08-30 12:15:00 +00:00
}
2020-07-30 15:30:50 +00:00
return pa
}
// log forwards a message to the program logger, prepending a tag that
// identifies this path by name.
func (pa *path) log(format string, args ...interface{}) {
	tag := "[path " + pa.name + "] "
	pa.p.log(tag+format, args...)
}
func (pa *path) onInit() {
2020-10-13 18:00:40 +00:00
if source, ok := pa.source.(*sourceRtsp); ok {
2020-10-03 19:10:41 +00:00
go source.run(source.state)
2020-10-13 18:00:40 +00:00
} else if source, ok := pa.source.(*sourceRtmp); ok {
2020-10-03 19:10:41 +00:00
go source.run(source.state)
2020-08-30 12:15:00 +00:00
}
2020-09-19 15:13:45 +00:00
if pa.conf.RunOnInit != "" {
pa.log("starting on init command")
var err error
pa.onInitCmd, err = externalcmd.New(pa.conf.RunOnInit, pa.name)
if err != nil {
pa.log("ERR: %s", err)
}
}
}
func (pa *path) onClose() {
2020-10-13 18:00:40 +00:00
if source, ok := pa.source.(*sourceRtsp); ok {
2020-10-03 19:10:41 +00:00
close(source.terminate)
<-source.done
2020-10-13 18:00:40 +00:00
} else if source, ok := pa.source.(*sourceRtmp); ok {
2020-10-03 19:10:41 +00:00
close(source.terminate)
<-source.done
2020-08-30 12:15:00 +00:00
}
if pa.onInitCmd != nil {
pa.log("stopping on init command (closing)")
pa.onInitCmd.Close()
}
if pa.onDemandCmd != nil {
pa.log("stopping on demand command (closing)")
pa.onDemandCmd.Close()
}
for c := range pa.p.clients {
if c.path == pa {
if c.state == clientStateWaitDescription {
c.path = nil
c.state = clientStateInitial
c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
} else {
c.close()
}
}
}
}
2020-08-30 13:27:03 +00:00
func (pa *path) hasClients() bool {
for c := range pa.p.clients {
if c.path == pa {
2020-08-30 13:27:03 +00:00
return true
}
}
return false
}
func (pa *path) hasClientsWaitingDescribe() bool {
for c := range pa.p.clients {
if c.state == clientStateWaitDescription && c.path == pa {
2020-08-30 13:27:03 +00:00
return true
}
}
return false
}
func (pa *path) hasClientReaders() bool {
for c := range pa.p.clients {
2020-10-13 18:00:40 +00:00
if c.path == pa && c != pa.source {
2020-08-30 13:27:03 +00:00
return true
}
2020-08-30 13:27:03 +00:00
}
return false
}
2020-07-30 11:31:18 +00:00
2020-08-30 13:27:03 +00:00
func (pa *path) onCheck() {
2020-07-30 11:31:18 +00:00
// reply to DESCRIBE requests if they are in timeout
2020-08-30 13:27:03 +00:00
if pa.hasClientsWaitingDescribe() &&
time.Since(pa.lastDescribeActivation) >= describeTimeout {
2020-07-30 11:31:18 +00:00
for c := range pa.p.clients {
if c.state == clientStateWaitDescription &&
c.path == pa {
c.path = nil
2020-07-30 11:31:18 +00:00
c.state = clientStateInitial
c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
2020-07-30 11:31:18 +00:00
}
}
2020-08-30 13:27:03 +00:00
}
2020-07-30 11:31:18 +00:00
2020-10-03 19:10:41 +00:00
// stop on demand rtsp source if needed
2020-10-13 18:00:40 +00:00
if source, ok := pa.source.(*sourceRtsp); ok {
2020-10-03 19:10:41 +00:00
if pa.conf.SourceOnDemand &&
source.state == sourceRtspStateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod {
pa.log("stopping on demand rtsp source (not requested anymore)")
atomic.AddInt64(pa.p.countSourcesRtspRunning, -1)
source.state = sourceRtspStateStopped
source.setState <- source.state
}
// stop on demand rtmp source if needed
2020-10-13 18:00:40 +00:00
} else if source, ok := pa.source.(*sourceRtmp); ok {
2020-10-03 19:10:41 +00:00
if pa.conf.SourceOnDemand &&
source.state == sourceRtmpStateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribePeriod {
pa.log("stopping on demand rtmp source (not requested anymore)")
atomic.AddInt64(pa.p.countSourcesRtmpRunning, -1)
source.state = sourceRtmpStateStopped
source.setState <- source.state
}
2020-07-30 11:31:18 +00:00
}
2020-08-30 13:27:03 +00:00
// stop on demand command if needed
if pa.onDemandCmd != nil &&
!pa.hasClientReaders() &&
2020-10-03 19:10:41 +00:00
time.Since(pa.lastDescribeReq) >= onDemandCmdStopAfterDescribePeriod {
pa.log("stopping on demand command (not requested anymore)")
pa.onDemandCmd.Close()
2020-08-30 13:27:03 +00:00
pa.onDemandCmd = nil
}
2020-09-19 15:13:45 +00:00
// remove regular expression paths
2020-10-13 22:50:08 +00:00
if pa.conf.Regexp != nil &&
2020-10-13 18:00:40 +00:00
pa.source == nil &&
2020-08-30 13:27:03 +00:00
!pa.hasClients() {
pa.onClose()
2020-08-30 13:27:03 +00:00
delete(pa.p.paths, pa.name)
}
}
2020-10-13 18:00:40 +00:00
func (pa *path) onSourceRemove() {
pa.source = nil
// close all clients that are reading or waiting for reading
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
2020-10-13 18:00:40 +00:00
c != pa.source {
c.close()
}
}
2020-08-30 13:27:03 +00:00
}
2020-10-13 18:00:40 +00:00
func (pa *path) onSourceSetReady() {
pa.sourceReady = true
2020-08-30 13:27:03 +00:00
// reply to all clients that are waiting for a description
for c := range pa.p.clients {
if c.state == clientStateWaitDescription &&
c.path == pa {
c.path = nil
2020-08-30 13:27:03 +00:00
c.state = clientStateInitial
2020-10-13 18:00:40 +00:00
c.describe <- describeRes{pa.sourceSdp, nil}
2020-07-30 11:31:18 +00:00
}
2020-08-30 13:27:03 +00:00
}
}
2020-07-30 11:31:18 +00:00
2020-10-13 18:00:40 +00:00
func (pa *path) onSourceSetNotReady() {
pa.sourceReady = false
2020-08-30 13:27:03 +00:00
// close all clients that are reading or waiting for reading
2020-08-30 13:27:03 +00:00
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
2020-10-13 18:00:40 +00:00
c != pa.source {
c.close()
2020-07-30 11:31:18 +00:00
}
}
}
2020-08-30 13:27:03 +00:00
func (pa *path) onDescribe(client *client) {
pa.lastDescribeReq = time.Now()
2020-07-30 11:31:18 +00:00
// publisher not found
2020-10-13 18:00:40 +00:00
if pa.source == nil {
2020-07-31 15:49:48 +00:00
// on demand command is available: put the client on hold
2020-09-19 15:13:45 +00:00
if pa.conf.RunOnDemand != "" {
2020-08-30 13:27:03 +00:00
if pa.onDemandCmd == nil { // start if needed
pa.log("starting on demand command")
2020-08-30 13:27:03 +00:00
pa.lastDescribeActivation = time.Now()
var err error
pa.onDemandCmd, err = externalcmd.New(pa.conf.RunOnDemand, pa.name)
2020-07-30 11:31:18 +00:00
if err != nil {
pa.log("ERR: %s", err)
2020-07-30 11:31:18 +00:00
}
}
client.path = pa
client.state = clientStateWaitDescription
2020-08-30 13:27:03 +00:00
// no on-demand: reply with 404
} else {
client.describe <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)}
2020-08-30 13:27:03 +00:00
}
2020-07-30 11:31:18 +00:00
2020-08-30 13:27:03 +00:00
// publisher was found but is not ready: put the client on hold
2020-10-13 18:00:40 +00:00
} else if !pa.sourceReady {
2020-10-03 19:10:41 +00:00
// start rtsp source if needed
2020-10-13 18:00:40 +00:00
if source, ok := pa.source.(*sourceRtsp); ok {
2020-10-03 19:10:41 +00:00
if source.state == sourceRtspStateStopped {
pa.log("starting on demand rtsp source")
pa.lastDescribeActivation = time.Now()
atomic.AddInt64(pa.p.countSourcesRtspRunning, +1)
source.state = sourceRtspStateRunning
source.setState <- source.state
}
// start rtmp source if needed
2020-10-13 18:00:40 +00:00
} else if source, ok := pa.source.(*sourceRtmp); ok {
2020-10-03 19:10:41 +00:00
if source.state == sourceRtmpStateStopped {
pa.log("starting on demand rtmp source")
pa.lastDescribeActivation = time.Now()
atomic.AddInt64(pa.p.countSourcesRtmpRunning, +1)
source.state = sourceRtmpStateRunning
source.setState <- source.state
}
}
client.path = pa
client.state = clientStateWaitDescription
2020-08-30 13:27:03 +00:00
// publisher was found and is ready
} else {
2020-10-13 18:00:40 +00:00
client.describe <- describeRes{pa.sourceSdp, nil}
}
}