mediamtx/internal/core/path.go

965 lines
23 KiB
Go
Raw Normal View History

package core
2020-10-19 20:17:48 +00:00
import (
2021-05-10 19:32:59 +00:00
"context"
2020-10-19 20:17:48 +00:00
"fmt"
"net"
2020-10-19 20:17:48 +00:00
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
2021-02-09 21:33:50 +00:00
"github.com/aler9/gortsplib/pkg/base"
2020-10-19 20:17:48 +00:00
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
2020-10-19 20:17:48 +00:00
)
func newEmptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}
2020-10-19 20:17:48 +00:00
type pathErrNoOnePublishing struct {
PathName string
}
// Error implements the error interface.
func (e pathErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
}
type pathErrAuthNotCritical struct {
*base.Response
}
// Error implements the error interface.
func (pathErrAuthNotCritical) Error() string {
return "non-critical authentication error"
}
type pathErrAuthCritical struct {
Message string
Response *base.Response
}
// Error implements the error interface.
func (pathErrAuthCritical) Error() string {
return "critical authentication error"
}
type pathParent interface {
Log(logger.Level, string, ...interface{})
OnPathSourceReady(*path)
OnPathClose(*path)
2020-10-19 20:17:48 +00:00
}
type pathRTSPSession interface {
IsRTSPSession()
}
2020-10-27 23:29:53 +00:00
type sourceRedirect struct{}
// OnSourceAPIDescribe implements source.
func (*sourceRedirect) OnSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`
}{"redirect"}
}
2020-10-27 23:29:53 +00:00
type pathReaderState int
2020-10-19 20:17:48 +00:00
const (
pathReaderStatePrePlay pathReaderState = iota
pathReaderStatePlay
2020-10-19 20:17:48 +00:00
)
2021-08-03 20:40:47 +00:00
type pathOnDemandState int
const (
2021-08-03 20:40:47 +00:00
pathOnDemandStateInitial pathOnDemandState = iota
pathOnDemandStateWaitingReady
pathOnDemandStateReady
pathOnDemandStateClosing
)
type pathSourceStaticSetReadyRes struct {
Stream *stream
Err error
}
type pathSourceStaticSetReadyReq struct {
Source sourceStatic
Tracks gortsplib.Tracks
Res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
2021-08-03 20:40:47 +00:00
Source sourceStatic
Res chan struct{}
}
type pathReaderRemoveReq struct {
Author reader
Res chan struct{}
}
type pathPublisherRemoveReq struct {
Author publisher
Res chan struct{}
}
type pathDescribeRes struct {
2021-08-01 15:22:28 +00:00
Path *path
Stream *stream
Redirect string
Err error
}
type pathDescribeReq struct {
PathName string
URL *base.URL
IP net.IP
ValidateCredentials func(pathUser conf.Credential, pathPass conf.Credential) error
Res chan pathDescribeRes
}
type pathReaderSetupPlayRes struct {
Path *path
Stream *stream
Err error
}
type pathReaderSetupPlayReq struct {
Author reader
PathName string
IP net.IP
ValidateCredentials func(pathUser conf.Credential, pathPass conf.Credential) error
Res chan pathReaderSetupPlayRes
}
type pathPublisherAnnounceRes struct {
Path *path
Err error
}
type pathPublisherAnnounceReq struct {
Author publisher
PathName string
IP net.IP
ValidateCredentials func(pathUser conf.Credential, pathPass conf.Credential) error
Res chan pathPublisherAnnounceRes
}
type pathReaderPlayReq struct {
Author reader
Res chan struct{}
}
type pathPublisherRecordRes struct {
Stream *stream
Err error
}
type pathPublisherRecordReq struct {
Author publisher
Tracks gortsplib.Tracks
Res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
Author reader
Res chan struct{}
}
type pathPublisherPauseReq struct {
Author publisher
Res chan struct{}
}
type path struct {
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
readBufferSize int
2021-01-10 11:55:53 +00:00
confName string
conf *conf.PathConf
name string
wg *sync.WaitGroup
stats *stats
parent pathParent
2020-10-19 20:17:48 +00:00
2021-08-03 20:40:47 +00:00
ctx context.Context
ctxCancel func()
source source
sourceReady bool
sourceStaticWg sync.WaitGroup
readers map[reader]pathReaderState
describeRequests []pathDescribeReq
setupPlayRequests []pathReaderSetupPlayReq
stream *stream
2021-08-03 20:40:47 +00:00
onDemandCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
onDemandReadyTimer *time.Timer
onDemandCloseTimer *time.Timer
onDemandState pathOnDemandState
2020-10-19 20:17:48 +00:00
// in
sourceStaticSetReady chan pathSourceStaticSetReadyReq
sourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
describe chan pathDescribeReq
publisherRemove chan pathPublisherRemoveReq
publisherAnnounce chan pathPublisherAnnounceReq
publisherRecord chan pathPublisherRecordReq
publisherPause chan pathPublisherPauseReq
readerRemove chan pathReaderRemoveReq
readerSetupPlay chan pathReaderSetupPlayReq
readerPlay chan pathReaderPlayReq
readerPause chan pathReaderPauseReq
apiPathsList chan apiPathsListReq2
}
func newPath(
parentCtx context.Context,
rtspAddress string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
readBufferSize int,
confName string,
2020-10-19 20:17:48 +00:00
conf *conf.PathConf,
name string,
wg *sync.WaitGroup,
stats *stats,
parent pathParent) *path {
ctx, ctxCancel := context.WithCancel(parentCtx)
2021-05-10 19:32:59 +00:00
pa := &path{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
readBufferSize: readBufferSize,
confName: confName,
conf: conf,
name: name,
wg: wg,
stats: stats,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
readers: make(map[reader]pathReaderState),
2021-08-03 20:40:47 +00:00
onDemandReadyTimer: newEmptyTimer(),
onDemandCloseTimer: newEmptyTimer(),
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
describe: make(chan pathDescribeReq),
publisherRemove: make(chan pathPublisherRemoveReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
publisherRecord: make(chan pathPublisherRecordReq),
publisherPause: make(chan pathPublisherPauseReq),
readerRemove: make(chan pathReaderRemoveReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
readerPlay: make(chan pathReaderPlayReq),
readerPause: make(chan pathReaderPauseReq),
apiPathsList: make(chan apiPathsListReq2),
2020-10-19 20:17:48 +00:00
}
pa.Log(logger.Info, "created")
2020-10-19 20:17:48 +00:00
pa.wg.Add(1)
go pa.run()
2021-08-01 14:58:46 +00:00
2020-10-19 20:17:48 +00:00
return pa
}
func (pa *path) Close() {
2021-05-10 19:32:59 +00:00
pa.ctxCancel()
pa.Log(logger.Info, "destroyed")
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// Log is the main logging function.
func (pa *path) Log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
2020-10-19 20:17:48 +00:00
}
2021-07-31 16:32:00 +00:00
// ConfName returns the configuration name of this path.
func (pa *path) ConfName() string {
return pa.confName
}
// Conf returns the configuration of this path.
func (pa *path) Conf() *conf.PathConf {
return pa.conf
}
// Name returns the name of this path.
func (pa *path) Name() string {
return pa.name
}
func (pa *path) run() {
2020-10-19 20:17:48 +00:00
defer pa.wg.Done()
if pa.conf.Source == "redirect" {
2020-10-27 23:29:53 +00:00
pa.source = &sourceRedirect{}
2021-08-03 20:40:47 +00:00
} else if !pa.conf.SourceOnDemand && pa.hasStaticSource() {
pa.staticSourceCreate()
2020-10-19 20:17:48 +00:00
}
2020-11-21 12:35:33 +00:00
var onInitCmd *externalcmd.Cmd
2020-10-19 20:17:48 +00:00
if pa.conf.RunOnInit != "" {
pa.Log(logger.Info, "runOnInit command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress)
onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{
2020-11-01 16:33:06 +00:00
Path: pa.name,
Port: port,
2020-11-01 16:33:06 +00:00
})
2020-10-19 20:17:48 +00:00
}
outer:
for {
select {
2021-08-03 20:40:47 +00:00
case <-pa.onDemandReadyTimer.C:
for _, req := range pa.describeRequests {
2021-08-03 20:40:47 +00:00
req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
2021-08-03 20:40:47 +00:00
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
2020-10-19 20:17:48 +00:00
}
pa.setupPlayRequests = nil
2020-10-19 20:17:48 +00:00
2021-08-03 20:40:47 +00:00
pa.onDemandCloseSource()
2021-08-03 20:40:47 +00:00
if pa.conf.Regexp != nil {
break outer
}
2021-08-03 20:40:47 +00:00
case <-pa.onDemandCloseTimer.C:
pa.onDemandCloseSource()
2021-08-03 20:40:47 +00:00
if pa.conf.Regexp != nil {
break outer
}
case req := <-pa.sourceStaticSetReady:
if req.Source == pa.source {
pa.sourceSetReady(req.Tracks)
req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream}
} else {
req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
}
2020-10-19 20:17:48 +00:00
case req := <-pa.sourceStaticSetNotReady:
2021-08-03 20:40:47 +00:00
if req.Source == pa.source {
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
2021-08-03 20:40:47 +00:00
}
close(req.Res)
2020-10-19 20:17:48 +00:00
2021-08-03 20:40:47 +00:00
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.describe:
2021-08-11 10:45:53 +00:00
pa.handleDescribe(req)
2020-10-19 20:17:48 +00:00
case req := <-pa.publisherRemove:
2021-08-11 10:45:53 +00:00
pa.handlePublisherRemove(req)
2020-10-19 20:17:48 +00:00
2021-08-03 20:40:47 +00:00
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.publisherAnnounce:
2021-08-11 10:45:53 +00:00
pa.handlePublisherAnnounce(req)
2021-03-10 14:06:45 +00:00
case req := <-pa.publisherRecord:
2021-08-11 10:45:53 +00:00
pa.handlePublisherRecord(req)
2020-10-19 20:17:48 +00:00
case req := <-pa.publisherPause:
2021-08-11 10:45:53 +00:00
pa.handlePublisherPause(req)
2020-10-19 20:17:48 +00:00
2021-08-03 20:40:47 +00:00
if pa.source == nil && pa.conf.Regexp != nil {
break outer
}
case req := <-pa.readerRemove:
2021-08-11 10:45:53 +00:00
pa.handleReaderRemove(req)
case req := <-pa.readerSetupPlay:
2021-08-11 10:45:53 +00:00
pa.handleReaderSetupPlay(req)
case req := <-pa.readerPlay:
2021-08-11 10:45:53 +00:00
pa.handleReaderPlay(req)
case req := <-pa.readerPause:
2021-08-11 10:45:53 +00:00
pa.handleReaderPause(req)
2020-10-19 20:17:48 +00:00
case req := <-pa.apiPathsList:
2021-08-11 10:45:53 +00:00
pa.handleAPIPathsList(req)
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2020-10-19 20:17:48 +00:00
break outer
}
}
2021-05-10 19:32:59 +00:00
pa.ctxCancel()
2021-08-03 20:40:47 +00:00
pa.onDemandReadyTimer.Stop()
pa.onDemandCloseTimer.Stop()
if onInitCmd != nil {
onInitCmd.Close()
pa.Log(logger.Info, "runOnInit command stopped")
}
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
}
for rp, state := range pa.readers {
if state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
}
rp.Close()
}
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.Log(logger.Info, "runOnDemand command stopped")
}
if pa.stream != nil {
pa.stream.close()
}
if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok {
source.Close()
pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok {
2021-08-03 20:40:47 +00:00
if pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
}
source.Close()
}
}
2020-10-19 20:17:48 +00:00
2021-05-10 19:32:59 +00:00
pa.parent.OnPathClose(pa)
2020-10-19 20:17:48 +00:00
}
func (pa *path) hasStaticSource() bool {
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
2020-12-14 22:32:24 +00:00
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
2021-09-05 15:35:04 +00:00
strings.HasPrefix(pa.conf.Source, "rtmp://") ||
strings.HasPrefix(pa.conf.Source, "http://") ||
strings.HasPrefix(pa.conf.Source, "https://")
}
2021-08-03 20:40:47 +00:00
func (pa *path) isOnDemand() bool {
return (pa.hasStaticSource() && pa.conf.SourceOnDemand) || pa.conf.RunOnDemand != ""
}
func (pa *path) onDemandStartSource() {
pa.onDemandReadyTimer.Stop()
if pa.hasStaticSource() {
pa.staticSourceCreate()
pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
2021-08-03 20:40:47 +00:00
} else {
pa.Log(logger.Info, "runOnDemand command started")
2021-08-03 20:40:47 +00:00
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
pa.onDemandReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
2021-08-03 20:40:47 +00:00
}
pa.onDemandState = pathOnDemandStateWaitingReady
}
func (pa *path) onDemandScheduleClose() {
pa.onDemandCloseTimer.Stop()
if pa.hasStaticSource() {
pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
2021-08-03 20:40:47 +00:00
} else {
pa.onDemandCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
2021-08-03 20:40:47 +00:00
}
pa.onDemandState = pathOnDemandStateClosing
}
func (pa *path) onDemandCloseSource() {
if pa.onDemandState == pathOnDemandStateClosing {
pa.onDemandCloseTimer.Stop()
pa.onDemandCloseTimer = newEmptyTimer()
}
// set state before doPublisherRemove()
pa.onDemandState = pathOnDemandStateInitial
if pa.hasStaticSource() {
if pa.sourceReady {
pa.sourceSetNotReady()
}
pa.source.(sourceStatic).Close()
pa.source = nil
2021-08-03 20:40:47 +00:00
} else {
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.Log(logger.Info, "runOnDemand command stopped")
}
2021-08-03 20:40:47 +00:00
if pa.source != nil {
pa.source.(publisher).Close()
pa.doPublisherRemove()
}
}
}
func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
2021-08-03 20:40:47 +00:00
pa.sourceReady = true
pa.stream = newStream(tracks)
2021-08-03 20:40:47 +00:00
if pa.isOnDemand() {
pa.onDemandReadyTimer.Stop()
pa.onDemandReadyTimer = newEmptyTimer()
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
2021-08-11 10:45:53 +00:00
pa.handleReaderSetupPlayPost(req)
2021-08-03 20:40:47 +00:00
}
pa.setupPlayRequests = nil
if len(pa.readers) > 0 {
pa.onDemandState = pathOnDemandStateReady
} else {
pa.onDemandScheduleClose()
}
}
pa.parent.OnPathSourceReady(pa)
}
func (pa *path) sourceSetNotReady() {
if pa.onPublishCmd != nil {
pa.onPublishCmd.Close()
pa.onPublishCmd = nil
pa.Log(logger.Info, "runOnPublish command stopped")
2021-08-03 20:40:47 +00:00
}
for r := range pa.readers {
pa.doReaderRemove(r)
r.Close()
}
pa.sourceReady = false
pa.stream.close()
pa.stream = nil
2021-08-03 20:40:47 +00:00
}
func (pa *path) staticSourceCreate() {
2021-09-05 15:35:04 +00:00
switch {
case strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://"):
pa.source = newRTSPSource(
2021-05-11 15:20:32 +00:00
pa.ctx,
2021-01-10 11:55:53 +00:00
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
2021-01-10 11:55:53 +00:00
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
pa.readBufferSize,
&pa.sourceStaticWg,
2021-01-10 11:55:53 +00:00
pa)
2021-09-05 15:35:04 +00:00
case strings.HasPrefix(pa.conf.Source, "rtmp://"):
pa.source = newRTMPSource(
2021-05-11 15:20:32 +00:00
pa.ctx,
2021-01-10 11:55:53 +00:00
pa.conf.Source,
2021-01-31 15:24:58 +00:00
pa.readTimeout,
pa.writeTimeout,
&pa.sourceStaticWg,
2021-01-10 11:55:53 +00:00
pa)
2021-09-05 15:35:04 +00:00
case strings.HasPrefix(pa.conf.Source, "http://") ||
strings.HasPrefix(pa.conf.Source, "https://"):
pa.source = newHLSSource(
pa.ctx,
pa.conf.Source,
&pa.sourceStaticWg,
pa)
}
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
func (pa *path) doReaderRemove(r reader) {
state := pa.readers[r]
if state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.stream.readerRemove(r)
2020-10-19 20:17:48 +00:00
}
delete(pa.readers, r)
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
func (pa *path) doPublisherRemove() {
if pa.sourceReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
}
pa.source = nil
for r := range pa.readers {
2021-08-03 20:40:47 +00:00
pa.doReaderRemove(r)
r.Close()
}
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- pathDescribeRes{
Redirect: pa.conf.SourceRedirect,
}
return
}
2021-08-03 20:40:47 +00:00
if pa.sourceReady {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
return
2021-08-03 20:40:47 +00:00
}
2021-08-03 20:40:47 +00:00
if pa.isOnDemand() {
if pa.onDemandState == pathOnDemandStateInitial {
pa.onDemandStartSource()
}
pa.describeRequests = append(pa.describeRequests, req)
return
2021-08-03 20:40:47 +00:00
}
2021-08-03 20:40:47 +00:00
if pa.conf.Fallback != "" {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: pa.conf.Fallback,
2021-02-09 21:33:50 +00:00
}
2021-08-03 20:40:47 +00:00
return ur.String()
}
return pa.conf.Fallback
}()
req.Res <- pathDescribeRes{Redirect: fallbackURL}
return
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
2021-08-01 14:37:37 +00:00
if pa.source == req.Author {
2021-08-03 20:40:47 +00:00
pa.doPublisherRemove()
2021-08-01 14:37:37 +00:00
}
close(req.Res)
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
2021-08-01 14:37:37 +00:00
if pa.source != nil {
if pa.hasStaticSource() {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("path '%s' is assigned to a static source", pa.name)}
return
}
if pa.conf.DisablePublisherOverride {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("another publisher is already publishing to path '%s'", pa.name)}
return
}
pa.Log(logger.Info, "closing existing publisher")
2021-08-03 20:40:47 +00:00
pa.source.(publisher).Close()
pa.doPublisherRemove()
2021-08-01 14:37:37 +00:00
}
pa.source = req.Author
2021-08-03 20:40:47 +00:00
2021-08-01 14:37:37 +00:00
req.Res <- pathPublisherAnnounceRes{Path: pa}
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
2021-08-01 14:37:37 +00:00
if pa.source != req.Author {
req.Res <- pathPublisherRecordRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
}
atomic.AddInt64(pa.stats.CountPublishers, 1)
req.Author.OnPublisherAccepted(len(req.Tracks))
2021-08-01 14:37:37 +00:00
pa.sourceSetReady(req.Tracks)
2021-08-01 14:37:37 +00:00
if pa.conf.RunOnPublish != "" {
pa.Log(logger.Info, "runOnPublish command started")
2021-08-01 14:37:37 +00:00
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onPublishCmd = externalcmd.New(pa.conf.RunOnPublish, pa.conf.RunOnPublishRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
}
req.Res <- pathPublisherRecordRes{Stream: pa.stream}
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
2021-08-03 20:40:47 +00:00
if req.Author == pa.source && pa.sourceReady {
2021-08-01 14:37:37 +00:00
atomic.AddInt64(pa.stats.CountPublishers, -1)
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
2021-08-01 14:37:37 +00:00
}
close(req.Res)
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
2021-08-01 14:37:37 +00:00
if _, ok := pa.readers[req.Author]; ok {
2021-08-03 20:40:47 +00:00
pa.doReaderRemove(req.Author)
2021-08-01 14:37:37 +00:00
}
close(req.Res)
2021-08-03 20:40:47 +00:00
if pa.isOnDemand() &&
len(pa.readers) == 0 &&
pa.onDemandState == pathOnDemandStateReady {
pa.onDemandScheduleClose()
}
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
2021-08-03 20:40:47 +00:00
if pa.sourceReady {
2021-08-11 10:45:53 +00:00
pa.handleReaderSetupPlayPost(req)
2021-03-10 14:06:45 +00:00
return
2021-08-03 20:40:47 +00:00
}
2021-03-10 14:06:45 +00:00
2021-08-03 20:40:47 +00:00
if pa.isOnDemand() {
if pa.onDemandState == pathOnDemandStateInitial {
pa.onDemandStartSource()
}
2021-03-10 14:06:45 +00:00
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
2021-03-10 14:06:45 +00:00
}
2020-10-19 20:17:48 +00:00
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
2021-08-03 20:40:47 +00:00
pa.readers[req.Author] = pathReaderStatePrePlay
2021-08-03 20:40:47 +00:00
if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing {
pa.onDemandState = pathOnDemandStateReady
pa.onDemandCloseTimer.Stop()
pa.onDemandCloseTimer = newEmptyTimer()
}
req.Res <- pathReaderSetupPlayRes{
Path: pa,
Stream: pa.stream,
}
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1)
pa.readers[req.Author] = pathReaderStatePlay
pa.stream.readerAdd(req.Author)
req.Author.OnReaderAccepted()
close(req.Res)
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderPause(req pathReaderPauseReq) {
if state, ok := pa.readers[req.Author]; ok && state == pathReaderStatePlay {
2020-11-07 21:47:10 +00:00
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers[req.Author] = pathReaderStatePrePlay
pa.stream.readerRemove(req.Author)
}
close(req.Res)
}
2020-11-07 21:47:10 +00:00
2021-08-11 10:45:53 +00:00
func (pa *path) handleAPIPathsList(req apiPathsListReq2) {
req.Data.Items[pa.name] = apiPathsItem{
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.OnSourceAPIDescribe()
}(),
SourceReady: pa.sourceReady,
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.OnReaderAPIDescribe())
}
return ret
}(),
}
close(req.Res)
2021-08-11 10:45:53 +00:00
}
// OnSourceStaticSetReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.Res = make(chan pathSourceStaticSetReadyRes)
2021-05-10 19:32:59 +00:00
select {
case pa.sourceStaticSetReady <- req:
return <-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
return pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
}
// OnSourceStaticSetNotReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.sourceStaticSetNotReady <- req:
<-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-11-05 11:30:25 +00:00
}
2021-08-01 15:22:28 +00:00
// OnDescribe is called by a reader or publisher through pathManager.
func (pa *path) OnDescribe(req pathDescribeReq) pathDescribeRes {
2021-05-10 19:32:59 +00:00
select {
case pa.describe <- req:
2021-08-01 15:22:28 +00:00
return <-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2021-08-01 15:22:28 +00:00
return pathDescribeRes{Err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// OnPublisherRemove is called by a publisher.
func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.publisherRemove <- req:
<-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
2021-08-01 15:22:28 +00:00
// OnPublisherAnnounce is called by a publisher through pathManager.
func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
2021-05-10 19:32:59 +00:00
select {
case pa.publisherAnnounce <- req:
2021-08-01 15:22:28 +00:00
return <-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2021-08-01 15:22:28 +00:00
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// OnPublisherRecord is called by a publisher.
func (pa *path) OnPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
req.Res = make(chan pathPublisherRecordRes)
select {
case pa.publisherRecord <- req:
return <-req.Res
case <-pa.ctx.Done():
return pathPublisherRecordRes{Err: fmt.Errorf("terminated")}
}
}
// OnPublisherPause is called by a publisher.
func (pa *path) OnPublisherPause(req pathPublisherPauseReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.publisherPause <- req:
<-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
// OnReaderRemove is called by a reader.
func (pa *path) OnReaderRemove(req pathReaderRemoveReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.readerRemove <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
2021-08-01 15:22:28 +00:00
// OnReaderSetupPlay is called by a reader through pathManager.
func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select {
case pa.readerSetupPlay <- req:
2021-08-01 15:22:28 +00:00
return <-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2021-08-01 15:22:28 +00:00
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// OnReaderPlay is called by a reader.
func (pa *path) OnReaderPlay(req pathReaderPlayReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.readerPlay <- req:
<-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
}
// OnReaderPause is called by a reader.
func (pa *path) OnReaderPause(req pathReaderPauseReq) {
req.Res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.readerPause <- req:
<-req.Res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-11-07 21:47:10 +00:00
}
2021-03-23 19:37:43 +00:00
// OnAPIPathsList is called by api.
func (pa *path) OnAPIPathsList(req apiPathsListReq2) {
req.Res = make(chan struct{})
select {
case pa.apiPathsList <- req:
<-req.Res
case <-pa.ctx.Done():
}
}