mediamtx/internal/core/path.go

1088 lines
26 KiB
Go
Raw Normal View History

package core
2020-10-19 20:17:48 +00:00
import (
2021-05-10 19:32:59 +00:00
"context"
2020-10-19 20:17:48 +00:00
"fmt"
"net"
"strconv"
2020-10-19 20:17:48 +00:00
"strings"
"sync"
"time"
"github.com/aler9/gortsplib"
2021-02-09 21:33:50 +00:00
"github.com/aler9/gortsplib/pkg/base"
2022-06-04 23:36:29 +00:00
"github.com/aler9/gortsplib/pkg/url"
2020-10-19 20:17:48 +00:00
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
2020-10-19 20:17:48 +00:00
)
func newEmptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}
2020-10-19 20:17:48 +00:00
type authenticateFunc func(
pathIPs []fmt.Stringer,
pathUser conf.Credential,
pathPass conf.Credential,
) error
type pathErrNoOnePublishing struct {
2022-01-14 22:42:41 +00:00
pathName string
}
// Error implements the error interface.
func (e pathErrNoOnePublishing) Error() string {
2022-01-14 22:42:41 +00:00
return fmt.Sprintf("no one is publishing to path '%s'", e.pathName)
}
type pathErrAuthNotCritical struct {
2022-01-14 22:42:41 +00:00
message string
response *base.Response
}
// Error implements the error interface.
func (pathErrAuthNotCritical) Error() string {
return "non-critical authentication error"
}
type pathErrAuthCritical struct {
2022-01-14 22:42:41 +00:00
message string
response *base.Response
}
// Error implements the error interface.
func (pathErrAuthCritical) Error() string {
return "critical authentication error"
}
type pathParent interface {
2021-10-27 19:01:00 +00:00
log(logger.Level, string, ...interface{})
pathSourceReady(*path)
pathSourceNotReady(*path)
2021-10-27 19:01:00 +00:00
onPathClose(*path)
2020-10-19 20:17:48 +00:00
}
type pathRTSPSession interface {
isRTSPSession()
}
type pathReaderState int
2020-10-19 20:17:48 +00:00
const (
pathReaderStatePrePlay pathReaderState = iota
pathReaderStatePlay
2020-10-19 20:17:48 +00:00
)
2021-08-03 20:40:47 +00:00
type pathOnDemandState int
const (
2021-08-03 20:40:47 +00:00
pathOnDemandStateInitial pathOnDemandState = iota
pathOnDemandStateWaitingReady
pathOnDemandStateReady
pathOnDemandStateClosing
)
type pathSourceStaticSetReadyRes struct {
2022-01-14 22:42:41 +00:00
stream *stream
err error
}
type pathSourceStaticSetReadyReq struct {
2022-01-14 22:42:41 +00:00
tracks gortsplib.Tracks
res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
res chan struct{}
}
type pathReaderRemoveReq struct {
2022-01-14 22:42:41 +00:00
author reader
res chan struct{}
}
type pathPublisherRemoveReq struct {
2022-01-14 22:42:41 +00:00
author publisher
res chan struct{}
}
type pathDescribeRes struct {
2022-01-14 22:42:41 +00:00
path *path
stream *stream
redirect string
err error
}
type pathDescribeReq struct {
2022-01-14 22:42:41 +00:00
pathName string
2022-06-04 23:36:29 +00:00
url *url.URL
2022-01-14 22:42:41 +00:00
authenticate authenticateFunc
res chan pathDescribeRes
}
type pathReaderSetupPlayRes struct {
2022-01-14 22:42:41 +00:00
path *path
stream *stream
err error
}
type pathReaderSetupPlayReq struct {
2022-01-14 22:42:41 +00:00
author reader
pathName string
authenticate authenticateFunc
res chan pathReaderSetupPlayRes
}
type pathPublisherAnnounceRes struct {
2022-01-14 22:42:41 +00:00
path *path
err error
}
type pathPublisherAnnounceReq struct {
2022-01-14 22:42:41 +00:00
author publisher
pathName string
authenticate authenticateFunc
res chan pathPublisherAnnounceRes
}
type pathReaderPlayReq struct {
2022-01-14 22:42:41 +00:00
author reader
res chan struct{}
}
type pathPublisherRecordRes struct {
2022-01-14 22:42:41 +00:00
stream *stream
err error
}
type pathPublisherRecordReq struct {
2022-01-14 22:42:41 +00:00
author publisher
tracks gortsplib.Tracks
res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
2022-01-14 22:42:41 +00:00
author reader
res chan struct{}
}
type pathPublisherPauseReq struct {
2022-01-14 22:42:41 +00:00
author publisher
res chan struct{}
}
type pathAPIPathsListItem struct {
ConfName string `json:"confName"`
Conf *conf.PathConf `json:"conf"`
Source interface{} `json:"source"`
SourceReady bool `json:"sourceReady"`
Readers []interface{} `json:"readers"`
}
type pathAPIPathsListData struct {
Items map[string]pathAPIPathsListItem `json:"items"`
}
type pathAPIPathsListRes struct {
2022-01-14 22:42:41 +00:00
data *pathAPIPathsListData
paths map[string]*path
err error
}
type pathAPIPathsListReq struct {
2022-01-14 22:42:41 +00:00
res chan pathAPIPathsListRes
}
type pathAPIPathsListSubReq struct {
2022-01-14 22:42:41 +00:00
data *pathAPIPathsListData
res chan struct{}
}
type path struct {
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
2021-01-10 11:55:53 +00:00
confName string
conf *conf.PathConf
name string
matches []string
2021-01-10 11:55:53 +00:00
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
parent pathParent
2020-10-19 20:17:48 +00:00
ctx context.Context
ctxCancel func()
source source
sourceReady bool
stream *stream
readers map[reader]pathReaderState
2022-08-14 09:24:05 +00:00
describeRequestsOnHold []pathDescribeReq
setupPlayRequestsOnHold []pathReaderSetupPlayReq
onDemandCmd *externalcmd.Cmd
onReadyCmd *externalcmd.Cmd
onDemandStaticSourceState pathOnDemandState
onDemandStaticSourceReadyTimer *time.Timer
onDemandStaticSourceCloseTimer *time.Timer
onDemandPublisherState pathOnDemandState
onDemandPublisherReadyTimer *time.Timer
onDemandPublisherCloseTimer *time.Timer
2020-10-19 20:17:48 +00:00
// in
chSourceStaticSetReady chan pathSourceStaticSetReadyReq
chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
chDescribe chan pathDescribeReq
chPublisherRemove chan pathPublisherRemoveReq
chPublisherAnnounce chan pathPublisherAnnounceReq
chPublisherRecord chan pathPublisherRecordReq
chPublisherPause chan pathPublisherPauseReq
chReaderRemove chan pathReaderRemoveReq
chReaderSetupPlay chan pathReaderSetupPlayReq
chReaderPlay chan pathReaderPlayReq
chReaderPause chan pathReaderPauseReq
chAPIPathsList chan pathAPIPathsListSubReq
}
func newPath(
parentCtx context.Context,
rtspAddress string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
confName string,
2020-10-19 20:17:48 +00:00
conf *conf.PathConf,
name string,
matches []string,
wg *sync.WaitGroup,
externalCmdPool *externalcmd.Pool,
2022-04-07 10:50:35 +00:00
parent pathParent,
) *path {
ctx, ctxCancel := context.WithCancel(parentCtx)
2021-05-10 19:32:59 +00:00
pa := &path{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
confName: confName,
conf: conf,
name: name,
matches: matches,
wg: wg,
externalCmdPool: externalCmdPool,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
readers: make(map[reader]pathReaderState),
onDemandStaticSourceReadyTimer: newEmptyTimer(),
onDemandStaticSourceCloseTimer: newEmptyTimer(),
onDemandPublisherReadyTimer: newEmptyTimer(),
onDemandPublisherCloseTimer: newEmptyTimer(),
chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
chDescribe: make(chan pathDescribeReq),
chPublisherRemove: make(chan pathPublisherRemoveReq),
chPublisherAnnounce: make(chan pathPublisherAnnounceReq),
chPublisherRecord: make(chan pathPublisherRecordReq),
chPublisherPause: make(chan pathPublisherPauseReq),
chReaderRemove: make(chan pathReaderRemoveReq),
chReaderSetupPlay: make(chan pathReaderSetupPlayReq),
chReaderPlay: make(chan pathReaderPlayReq),
chReaderPause: make(chan pathReaderPauseReq),
chAPIPathsList: make(chan pathAPIPathsListSubReq),
2020-10-19 20:17:48 +00:00
}
2022-02-19 21:15:37 +00:00
pa.log(logger.Debug, "created")
2020-10-19 20:17:48 +00:00
pa.wg.Add(1)
go pa.run()
2021-08-01 14:58:46 +00:00
2020-10-19 20:17:48 +00:00
return pa
}
2021-10-27 19:01:00 +00:00
func (pa *path) close() {
2021-05-10 19:32:59 +00:00
pa.ctxCancel()
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// Log is the main logging function.
2021-10-27 19:01:00 +00:00
func (pa *path) log(level logger.Level, format string, args ...interface{}) {
pa.parent.log(level, "[path "+pa.name+"] "+format, args...)
2020-10-19 20:17:48 +00:00
}
2021-07-31 16:32:00 +00:00
// ConfName returns the configuration name of this path.
func (pa *path) ConfName() string {
return pa.confName
}
// Conf returns the configuration of this path.
func (pa *path) Conf() *conf.PathConf {
return pa.conf
}
// Name returns the name of this path.
func (pa *path) Name() string {
return pa.name
}
func (pa *path) hasStaticSource() bool {
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
strings.HasPrefix(pa.conf.Source, "rtmp://") ||
strings.HasPrefix(pa.conf.Source, "http://") ||
strings.HasPrefix(pa.conf.Source, "https://")
}
func (pa *path) hasOnDemandStaticSource() bool {
return pa.hasStaticSource() && pa.conf.SourceOnDemand
}
func (pa *path) hasOnDemandPublisher() bool {
return pa.conf.RunOnDemand != ""
}
func (pa *path) run() {
2020-10-19 20:17:48 +00:00
defer pa.wg.Done()
if pa.conf.Source == "redirect" {
2020-10-27 23:29:53 +00:00
pa.source = &sourceRedirect{}
} else if pa.hasStaticSource() {
pa.source = newSourceStatic(
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
pa)
if !pa.conf.SourceOnDemand {
pa.source.(*sourceStatic).start()
}
2020-10-19 20:17:48 +00:00
}
2020-11-21 12:35:33 +00:00
var onInitCmd *externalcmd.Cmd
2020-10-19 20:17:48 +00:00
if pa.conf.RunOnInit != "" {
2021-10-27 19:01:00 +00:00
pa.log(logger.Info, "runOnInit command started")
onInitCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnInit,
pa.conf.RunOnInitRestart,
pa.externalCmdEnv(),
func(co int) {
pa.log(logger.Info, "runOnInit command exited with code %d", co)
})
2020-10-19 20:17:48 +00:00
}
err := func() error {
for {
select {
case <-pa.onDemandStaticSourceReadyTimer.C:
2022-08-14 09:24:05 +00:00
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequestsOnHold = nil
pa.onDemandStaticSourceStop()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case <-pa.onDemandStaticSourceCloseTimer.C:
pa.sourceSetNotReady()
pa.onDemandStaticSourceStop()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case <-pa.onDemandPublisherReadyTimer.C:
2022-08-14 09:24:05 +00:00
for _, req := range pa.describeRequestsOnHold {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = nil
2020-10-19 20:17:48 +00:00
for _, req := range pa.setupPlayRequestsOnHold {
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequestsOnHold = nil
pa.onDemandPublisherStop()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case <-pa.onDemandPublisherCloseTimer.C:
pa.onDemandPublisherStop()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
2020-10-19 20:17:48 +00:00
case req := <-pa.chSourceStaticSetReady:
pa.sourceSetReady(req.tracks)
if pa.hasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
2022-08-14 09:24:05 +00:00
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
}
2020-10-19 20:17:48 +00:00
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
case req := <-pa.chSourceStaticSetNotReady:
pa.sourceSetNotReady()
if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
2022-01-14 22:42:41 +00:00
close(req.res)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
2021-08-03 20:40:47 +00:00
case req := <-pa.chDescribe:
pa.handleDescribe(req)
2020-10-19 20:17:48 +00:00
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chPublisherRemove:
pa.handlePublisherRemove(req)
2020-10-19 20:17:48 +00:00
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
2021-08-03 20:40:47 +00:00
case req := <-pa.chPublisherAnnounce:
pa.handlePublisherAnnounce(req)
2021-03-10 14:06:45 +00:00
case req := <-pa.chPublisherRecord:
pa.handlePublisherRecord(req)
2020-10-19 20:17:48 +00:00
case req := <-pa.chPublisherPause:
pa.handlePublisherPause(req)
2020-10-19 20:17:48 +00:00
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
2021-08-03 20:40:47 +00:00
case req := <-pa.chReaderRemove:
pa.handleReaderRemove(req)
case req := <-pa.chReaderSetupPlay:
pa.handleReaderSetupPlay(req)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chReaderPlay:
pa.handleReaderPlay(req)
case req := <-pa.chReaderPause:
pa.handleReaderPause(req)
2020-10-19 20:17:48 +00:00
case req := <-pa.chAPIPathsList:
pa.handleAPIPathsList(req)
case <-pa.ctx.Done():
return fmt.Errorf("terminated")
}
2020-10-19 20:17:48 +00:00
}
}()
2020-10-19 20:17:48 +00:00
2021-05-10 19:32:59 +00:00
pa.ctxCancel()
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherCloseTimer.Stop()
if onInitCmd != nil {
onInitCmd.Close()
2021-10-27 19:01:00 +00:00
pa.log(logger.Info, "runOnInit command stopped")
}
2022-08-14 09:24:05 +00:00
for _, req := range pa.describeRequestsOnHold {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequestsOnHold {
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}
if pa.sourceReady {
pa.sourceSetNotReady()
}
if pa.source != nil {
if source, ok := pa.source.(*sourceStatic); ok {
2021-10-27 19:01:00 +00:00
source.close()
} else if source, ok := pa.source.(publisher); ok {
2021-10-27 19:01:00 +00:00
source.close()
}
}
2020-10-19 20:17:48 +00:00
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
2021-10-27 19:01:00 +00:00
pa.log(logger.Info, "runOnDemand command stopped")
}
2022-02-19 21:15:37 +00:00
pa.log(logger.Debug, "destroyed (%v)", err)
2021-10-27 19:01:00 +00:00
pa.parent.onPathClose(pa)
2020-10-19 20:17:48 +00:00
}
func (pa *path) shouldClose() bool {
return pa.conf.Regexp != nil &&
pa.source == nil &&
len(pa.readers) == 0 &&
2022-08-14 09:24:05 +00:00
len(pa.describeRequestsOnHold) == 0 &&
len(pa.setupPlayRequestsOnHold) == 0
2021-08-03 20:40:47 +00:00
}
func (pa *path) externalCmdEnv() externalcmd.Environment {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
env := externalcmd.Environment{
"RTSP_PATH": pa.name,
"RTSP_PORT": port,
}
2021-12-08 20:14:03 +00:00
if len(pa.matches) > 1 {
for i, ma := range pa.matches[1:] {
env["G"+strconv.FormatInt(int64(i+1), 10)] = ma
2021-12-08 20:14:03 +00:00
}
}
return env
}
func (pa *path) onDemandStaticSourceStart() {
pa.source.(*sourceStatic).start()
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))
2021-08-03 20:40:47 +00:00
pa.onDemandStaticSourceState = pathOnDemandStateWaitingReady
2021-08-03 20:40:47 +00:00
}
func (pa *path) onDemandStaticSourceScheduleClose() {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandCloseAfter))
pa.onDemandStaticSourceState = pathOnDemandStateClosing
}
func (pa *path) onDemandStaticSourceStop() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
2021-08-03 20:40:47 +00:00
}
pa.onDemandStaticSourceState = pathOnDemandStateInitial
pa.source.(*sourceStatic).stop()
}
func (pa *path) onDemandPublisherStart() {
pa.log(logger.Info, "runOnDemand command started")
pa.onDemandCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart,
pa.externalCmdEnv(),
func(co int) {
pa.log(logger.Info, "runOnDemand command exited with code %d", co)
})
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandStartTimeout))
pa.onDemandPublisherState = pathOnDemandStateWaitingReady
2021-08-03 20:40:47 +00:00
}
func (pa *path) onDemandPublisherScheduleClose() {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = time.NewTimer(time.Duration(pa.conf.RunOnDemandCloseAfter))
pa.onDemandPublisherState = pathOnDemandStateClosing
}
func (pa *path) onDemandPublisherStop() {
if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
2021-08-03 20:40:47 +00:00
}
// set state before doPublisherRemove()
pa.onDemandPublisherState = pathOnDemandStateInitial
2021-08-03 20:40:47 +00:00
if pa.source != nil {
pa.source.(publisher).close()
pa.doPublisherRemove()
}
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.log(logger.Info, "runOnDemand command stopped")
2021-08-03 20:40:47 +00:00
}
}
func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
2021-08-03 20:40:47 +00:00
pa.sourceReady = true
pa.stream = newStream(tracks)
2021-08-03 20:40:47 +00:00
if pa.conf.RunOnReady != "" {
pa.log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd(
pa.externalCmdPool,
pa.conf.RunOnReady,
pa.conf.RunOnReadyRestart,
pa.externalCmdEnv(),
func(co int) {
pa.log(logger.Info, "runOnReady command exited with code %d", co)
})
}
pa.parent.pathSourceReady(pa)
2021-08-03 20:40:47 +00:00
}
func (pa *path) sourceSetNotReady() {
pa.parent.pathSourceNotReady(pa)
for r := range pa.readers {
pa.doReaderRemove(r)
2021-10-27 19:01:00 +00:00
r.close()
}
if pa.onReadyCmd != nil {
pa.onReadyCmd.Close()
pa.onReadyCmd = nil
pa.log(logger.Info, "runOnReady command stopped")
2021-08-03 20:40:47 +00:00
}
pa.sourceReady = false
if pa.stream != nil {
pa.stream.close()
pa.stream = nil
}
2021-08-03 20:40:47 +00:00
}
func (pa *path) doReaderRemove(r reader) {
state := pa.readers[r]
if state == pathReaderStatePlay {
pa.stream.readerRemove(r)
2020-10-19 20:17:48 +00:00
}
delete(pa.readers, r)
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
func (pa *path) doPublisherRemove() {
if pa.sourceReady {
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
pa.onDemandPublisherStop()
} else {
pa.sourceSetNotReady()
}
}
pa.source = nil
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{
redirect: pa.conf.SourceRedirect,
}
return
}
2021-08-03 20:40:47 +00:00
if pa.sourceReady {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{
stream: pa.stream,
}
return
2021-08-03 20:40:47 +00:00
}
if pa.hasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart()
2021-08-03 20:40:47 +00:00
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
return
}
if pa.hasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart()
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
return
2021-08-03 20:40:47 +00:00
}
2021-08-03 20:40:47 +00:00
if pa.conf.Fallback != "" {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
2022-06-04 23:36:29 +00:00
ur := url.URL{
2022-01-14 22:42:41 +00:00
Scheme: req.url.Scheme,
User: req.url.User,
Host: req.url.Host,
2021-08-03 20:40:47 +00:00
Path: pa.conf.Fallback,
2021-02-09 21:33:50 +00:00
}
2021-08-03 20:40:47 +00:00
return ur.String()
}
return pa.conf.Fallback
}()
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{redirect: fallbackURL}
return
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{err: pathErrNoOnePublishing{pathName: pa.name}}
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
2022-01-14 22:42:41 +00:00
if pa.source == req.author {
2021-08-03 20:40:47 +00:00
pa.doPublisherRemove()
2021-08-01 14:37:37 +00:00
}
2022-01-14 22:42:41 +00:00
close(req.res)
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
if pa.conf.Source != "publisher" {
req.res <- pathPublisherAnnounceRes{
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
2021-08-01 14:37:37 +00:00
}
return
}
2021-08-01 14:37:37 +00:00
if pa.source != nil {
2021-08-01 14:37:37 +00:00
if pa.conf.DisablePublisherOverride {
req.res <- pathPublisherAnnounceRes{err: fmt.Errorf("someone is already publishing to path '%s'", pa.name)}
2021-08-01 14:37:37 +00:00
return
}
2021-10-27 19:01:00 +00:00
pa.log(logger.Info, "closing existing publisher")
pa.source.(publisher).close()
2021-08-03 20:40:47 +00:00
pa.doPublisherRemove()
2021-08-01 14:37:37 +00:00
}
2022-01-14 22:42:41 +00:00
pa.source = req.author
2021-08-03 20:40:47 +00:00
2022-01-14 22:42:41 +00:00
req.res <- pathPublisherAnnounceRes{path: pa}
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
2022-01-14 22:42:41 +00:00
if pa.source != req.author {
req.res <- pathPublisherRecordRes{err: fmt.Errorf("publisher is not assigned to this path anymore")}
2021-08-01 14:37:37 +00:00
return
}
2022-01-14 22:42:41 +00:00
req.author.onPublisherAccepted(len(req.tracks))
2021-08-01 14:37:37 +00:00
2022-01-14 22:42:41 +00:00
pa.sourceSetReady(req.tracks)
2021-08-01 14:37:37 +00:00
if pa.hasOnDemandPublisher() {
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = newEmptyTimer()
pa.onDemandPublisherScheduleClose()
2022-08-14 09:24:05 +00:00
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
2022-08-14 09:24:05 +00:00
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
}
2022-01-14 22:42:41 +00:00
req.res <- pathPublisherRecordRes{stream: pa.stream}
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
2022-01-14 22:42:41 +00:00
if req.author == pa.source && pa.sourceReady {
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
pa.onDemandPublisherStop()
} else {
pa.sourceSetNotReady()
}
2021-08-01 14:37:37 +00:00
}
2022-01-14 22:42:41 +00:00
close(req.res)
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
2022-01-14 22:42:41 +00:00
if _, ok := pa.readers[req.author]; ok {
pa.doReaderRemove(req.author)
2021-08-01 14:37:37 +00:00
}
2022-01-14 22:42:41 +00:00
close(req.res)
2021-08-03 20:40:47 +00:00
if len(pa.readers) == 0 {
if pa.hasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateReady {
pa.onDemandStaticSourceScheduleClose()
}
} else if pa.hasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateReady {
pa.onDemandPublisherScheduleClose()
}
}
2021-08-03 20:40:47 +00:00
}
2021-08-01 14:37:37 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
2021-08-03 20:40:47 +00:00
if pa.sourceReady {
2021-08-11 10:45:53 +00:00
pa.handleReaderSetupPlayPost(req)
2021-03-10 14:06:45 +00:00
return
2021-08-03 20:40:47 +00:00
}
2021-03-10 14:06:45 +00:00
if pa.hasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart()
}
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
return
}
if pa.hasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart()
2021-08-03 20:40:47 +00:00
}
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
2021-03-10 14:06:45 +00:00
return
2020-10-19 20:17:48 +00:00
}
2021-08-03 20:40:47 +00:00
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{err: pathErrNoOnePublishing{pathName: pa.name}}
2021-03-10 14:06:45 +00:00
}
2020-10-19 20:17:48 +00:00
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
2022-01-14 22:42:41 +00:00
pa.readers[req.author] = pathReaderStatePrePlay
if pa.hasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateClosing {
pa.onDemandStaticSourceState = pathOnDemandStateReady
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandStaticSourceCloseTimer = newEmptyTimer()
}
} else if pa.hasOnDemandPublisher() {
if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherState = pathOnDemandStateReady
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
}
}
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{
path: pa,
stream: pa.stream,
}
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
2022-01-14 22:42:41 +00:00
pa.readers[req.author] = pathReaderStatePlay
2022-01-14 22:42:41 +00:00
pa.stream.readerAdd(req.author)
2022-01-14 22:42:41 +00:00
req.author.onReaderAccepted()
2022-01-14 22:42:41 +00:00
close(req.res)
2020-10-19 20:17:48 +00:00
}
2021-08-11 10:45:53 +00:00
func (pa *path) handleReaderPause(req pathReaderPauseReq) {
2022-01-14 22:42:41 +00:00
if state, ok := pa.readers[req.author]; ok && state == pathReaderStatePlay {
pa.readers[req.author] = pathReaderStatePrePlay
pa.stream.readerRemove(req.author)
}
2022-01-14 22:42:41 +00:00
close(req.res)
}
2020-11-07 21:47:10 +00:00
func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
2022-01-14 22:42:41 +00:00
req.data.Items[pa.name] = pathAPIPathsListItem{
2021-08-11 10:45:53 +00:00
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
if pa.source == nil {
return nil
}
return pa.source.apiSourceDescribe()
2021-08-11 10:45:53 +00:00
}(),
SourceReady: pa.sourceReady,
Readers: func() []interface{} {
ret := []interface{}{}
for r := range pa.readers {
ret = append(ret, r.apiReaderDescribe())
2021-08-11 10:45:53 +00:00
}
return ret
}(),
}
2022-01-14 22:42:41 +00:00
close(req.res)
2021-08-11 10:45:53 +00:00
}
// sourceStaticSetReady is called by sourceStatic.
func (pa *path) sourceStaticSetReady(sourceStaticCtx context.Context, req pathSourceStaticSetReadyReq) {
2021-05-10 19:32:59 +00:00
select {
case pa.chSourceStaticSetReady <- req:
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
// this avoids:
// - invalid requests sent after the source has been terminated
// - freezes caused by <-done inside stop()
case <-sourceStaticCtx.Done():
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
}
// sourceStaticSetNotReady is called by sourceStatic.
func (pa *path) sourceStaticSetNotReady(sourceStaticCtx context.Context, req pathSourceStaticSetNotReadyReq) {
2021-05-10 19:32:59 +00:00
select {
case pa.chSourceStaticSetNotReady <- req:
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
close(req.res)
// this avoids:
// - invalid requests sent after the source has been terminated
// - freezes caused by <-done inside stop()
case <-sourceStaticCtx.Done():
close(req.res)
2021-05-10 19:32:59 +00:00
}
2020-11-05 11:30:25 +00:00
}
// describe is called by a reader or publisher through pathManager.
func (pa *path) describe(req pathDescribeReq) pathDescribeRes {
2021-05-10 19:32:59 +00:00
select {
case pa.chDescribe <- req:
2022-01-14 22:42:41 +00:00
return <-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathDescribeRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// publisherRemove is called by a publisher.
func (pa *path) publisherRemove(req pathPublisherRemoveReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.chPublisherRemove <- req:
2022-01-14 22:42:41 +00:00
<-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
// publisherAnnounce is called by a publisher through pathManager.
func (pa *path) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
2021-05-10 19:32:59 +00:00
select {
case pa.chPublisherAnnounce <- req:
2022-01-14 22:42:41 +00:00
return <-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// publisherRecord is called by a publisher.
func (pa *path) publisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
2022-01-14 22:42:41 +00:00
req.res = make(chan pathPublisherRecordRes)
select {
case pa.chPublisherRecord <- req:
2022-01-14 22:42:41 +00:00
return <-req.res
case <-pa.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathPublisherRecordRes{err: fmt.Errorf("terminated")}
}
}
// publisherPause is called by a publisher.
func (pa *path) publisherPause(req pathPublisherPauseReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.chPublisherPause <- req:
2022-01-14 22:42:41 +00:00
<-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
// readerRemove is called by a reader.
func (pa *path) readerRemove(req pathReaderRemoveReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.chReaderRemove <- req:
2022-01-14 22:42:41 +00:00
<-req.res
case <-pa.ctx.Done():
}
}
// readerSetupPlay is called by a reader through pathManager.
func (pa *path) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select {
case pa.chReaderSetupPlay <- req:
2022-01-14 22:42:41 +00:00
return <-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
// readerPlay is called by a reader.
func (pa *path) readerPlay(req pathReaderPlayReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.chReaderPlay <- req:
2022-01-14 22:42:41 +00:00
<-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
}
// readerPause is called by a reader.
func (pa *path) readerPause(req pathReaderPauseReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
2021-05-10 19:32:59 +00:00
select {
case pa.chReaderPause <- req:
2022-01-14 22:42:41 +00:00
<-req.res
2021-05-10 19:32:59 +00:00
case <-pa.ctx.Done():
}
2020-11-07 21:47:10 +00:00
}
2021-03-23 19:37:43 +00:00
// apiPathsList is called by api.
func (pa *path) apiPathsList(req pathAPIPathsListSubReq) {
2022-01-14 22:42:41 +00:00
req.res = make(chan struct{})
select {
case pa.chAPIPathsList <- req:
2022-01-14 22:42:41 +00:00
<-req.res
case <-pa.ctx.Done():
}
}