mediamtx/internal/core/path.go

980 lines
23 KiB
Go

package core
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
func newEmptyTimer() *time.Timer {
t := time.NewTimer(0)
<-t.C
return t
}
type pathErrNoOnePublishing struct {
PathName string
}
// Error implements the error interface.
func (e pathErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
}
type pathErrAuthNotCritical struct {
*base.Response
}
// Error implements the error interface.
func (pathErrAuthNotCritical) Error() string {
return "non-critical authentication error"
}
type pathErrAuthCritical struct {
Message string
Response *base.Response
}
// Error implements the error interface.
func (pathErrAuthCritical) Error() string {
return "critical authentication error"
}
type pathParent interface {
Log(logger.Level, string, ...interface{})
OnPathSourceReady(*path)
OnPathClose(*path)
}
type pathRTSPSession interface {
IsRTSPSession()
}
type sourceRedirect struct{}
func (*sourceRedirect) IsSource() {}
type pathReaderState int
const (
pathReaderStatePrePlay pathReaderState = iota
pathReaderStatePlay
)
type pathSourceState int
const (
pathSourceStateNotReady pathSourceState = iota
pathSourceStateCreating
pathSourceStateReady
)
type pathSourceStaticSetReadyReq struct {
Tracks gortsplib.Tracks
Res chan struct{}
}
type pathSourceStaticSetNotReadyReq struct {
Res chan struct{}
}
type pathReaderRemoveReq struct {
Author reader
Res chan struct{}
}
type pathPublisherRemoveReq struct {
Author publisher
Res chan struct{}
}
type pathDescribeRes struct {
Stream *gortsplib.ServerStream
Redirect string
Err error
}
type pathDescribeReq struct {
PathName string
URL *base.URL
IP net.IP
ValidateCredentials func(pathUser string, pathPass string) error
Res chan pathDescribeRes
}
type pathReaderSetupPlayRes struct {
Path *path
Stream *gortsplib.ServerStream
Err error
}
type pathReaderSetupPlayReq struct {
Author reader
PathName string
IP net.IP
ValidateCredentials func(pathUser string, pathPass string) error
Res chan pathReaderSetupPlayRes
}
type pathPublisherAnnounceRes struct {
Path *path
Err error
}
type pathPublisherAnnounceReq struct {
Author publisher
PathName string
Tracks gortsplib.Tracks
IP net.IP
ValidateCredentials func(pathUser string, pathPass string) error
Res chan pathPublisherAnnounceRes
}
type pathReaderPlayReq struct {
Author reader
Res chan struct{}
}
type pathPublisherRecordRes struct {
Err error
}
type pathPublisherRecordReq struct {
Author publisher
Res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
Author reader
Res chan struct{}
}
type pathPublisherPauseReq struct {
Author publisher
Res chan struct{}
}
type pathReadersMap struct {
mutex sync.RWMutex
ma map[reader]struct{}
}
func newPathReadersMap() *pathReadersMap {
return &pathReadersMap{
ma: make(map[reader]struct{}),
}
}
func (m *pathReadersMap) add(r reader) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.ma[r] = struct{}{}
}
func (m *pathReadersMap) remove(r reader) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.ma, r)
}
func (m *pathReadersMap) forwardFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for c := range m.ma {
c.OnReaderFrame(trackID, streamType, payload)
}
}
type path struct {
rtspAddress string
readTimeout time.Duration
writeTimeout time.Duration
readBufferCount int
readBufferSize int
confName string
conf *conf.PathConf
name string
wg *sync.WaitGroup
stats *stats
parent pathParent
ctx context.Context
ctxCancel func()
readers map[reader]pathReaderState
describeRequests []pathDescribeReq
setupPlayRequests []pathReaderSetupPlayReq
source source
sourceStaticWg sync.WaitGroup
stream *gortsplib.ServerStream
nonRTSPReaders *pathReadersMap
onDemandCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
describeTimer *time.Timer
sourceCloseTimer *time.Timer
sourceCloseTimerStarted bool
sourceState pathSourceState
runOnDemandCloseTimer *time.Timer
runOnDemandCloseTimerStarted bool
closeTimer *time.Timer
closeTimerStarted bool
// in
sourceStaticSetReady chan pathSourceStaticSetReadyReq
sourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
describe chan pathDescribeReq
publisherRemove chan pathPublisherRemoveReq
publisherAnnounce chan pathPublisherAnnounceReq
publisherRecord chan pathPublisherRecordReq
publisherPause chan pathPublisherPauseReq
readerRemove chan pathReaderRemoveReq
readerSetupPlay chan pathReaderSetupPlayReq
readerPlay chan pathReaderPlayReq
readerPause chan pathReaderPauseReq
}
func newPath(
parentCtx context.Context,
rtspAddress string,
readTimeout time.Duration,
writeTimeout time.Duration,
readBufferCount int,
readBufferSize int,
confName string,
conf *conf.PathConf,
name string,
wg *sync.WaitGroup,
stats *stats,
parent pathParent) *path {
ctx, ctxCancel := context.WithCancel(parentCtx)
pa := &path{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
readBufferSize: readBufferSize,
confName: confName,
conf: conf,
name: name,
wg: wg,
stats: stats,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
readers: make(map[reader]pathReaderState),
nonRTSPReaders: newPathReadersMap(),
describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
sourceStaticSetReady: make(chan pathSourceStaticSetReadyReq),
sourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
describe: make(chan pathDescribeReq),
publisherRemove: make(chan pathPublisherRemoveReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
publisherRecord: make(chan pathPublisherRecordReq),
publisherPause: make(chan pathPublisherPauseReq),
readerRemove: make(chan pathReaderRemoveReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
readerPlay: make(chan pathReaderPlayReq),
readerPause: make(chan pathReaderPauseReq),
}
pa.wg.Add(1)
go pa.run()
return pa
}
func (pa *path) Close() {
pa.ctxCancel()
}
// Log is the main logging function.
func (pa *path) Log(level logger.Level, format string, args ...interface{}) {
pa.parent.Log(level, "[path "+pa.name+"] "+format, args...)
}
// ConfName returns the configuration name of this path.
func (pa *path) ConfName() string {
return pa.confName
}
// Conf returns the configuration of this path.
func (pa *path) Conf() *conf.PathConf {
return pa.conf
}
// Name returns the name of this path.
func (pa *path) Name() string {
return pa.name
}
func (pa *path) run() {
defer pa.wg.Done()
if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{}
} else if pa.hasStaticSource() && !pa.conf.SourceOnDemand {
pa.startStaticSource()
}
var onInitCmd *externalcmd.Cmd
if pa.conf.RunOnInit != "" {
pa.Log(logger.Info, "on init command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress)
onInitCmd = externalcmd.New(pa.conf.RunOnInit, pa.conf.RunOnInitRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
}
outer:
for {
select {
case <-pa.describeTimer.C:
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
// set state after removeReader(), so schedule* works once
pa.sourceState = pathSourceStateNotReady
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
case <-pa.sourceCloseTimer.C:
pa.sourceCloseTimerStarted = false
pa.source.(sourceStatic).Close()
pa.source = nil
pa.scheduleClose()
case <-pa.runOnDemandCloseTimer.C:
pa.runOnDemandCloseTimerStarted = false
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
pa.onDemandCmd = nil
pa.scheduleClose()
case <-pa.closeTimer.C:
break outer
case req := <-pa.sourceStaticSetReady:
pa.stream = gortsplib.NewServerStream(req.Tracks)
pa.onSourceSetReady()
close(req.Res)
case req := <-pa.sourceStaticSetNotReady:
pa.onSourceSetNotReady()
close(req.Res)
case req := <-pa.describe:
pa.onDescribe(req)
case req := <-pa.publisherRemove:
pa.onPublisherRemove(req)
case req := <-pa.publisherAnnounce:
pa.onPublisherAnnounce(req)
case req := <-pa.publisherRecord:
pa.onPublisherRecord(req)
case req := <-pa.publisherPause:
pa.onPublisherPause(req)
case req := <-pa.readerRemove:
pa.onReaderRemove(req)
case req := <-pa.readerSetupPlay:
pa.onReaderSetupPlay(req)
case req := <-pa.readerPlay:
pa.onReaderPlay(req)
case req := <-pa.readerPause:
pa.onReaderPause(req)
case <-pa.ctx.Done():
break outer
}
}
pa.ctxCancel()
pa.describeTimer.Stop()
pa.sourceCloseTimer.Stop()
pa.runOnDemandCloseTimer.Stop()
pa.closeTimer.Stop()
if onInitCmd != nil {
pa.Log(logger.Info, "on init command stopped")
onInitCmd.Close()
}
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
}
for rp, state := range pa.readers {
if state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
if _, ok := rp.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(rp)
}
}
rp.Close()
}
if pa.onDemandCmd != nil {
pa.Log(logger.Info, "on demand command stopped")
pa.onDemandCmd.Close()
}
if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok {
source.Close()
pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok {
if pa.sourceState == pathSourceStateReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
}
source.Close()
}
}
pa.parent.OnPathClose(pa)
}
func (pa *path) hasStaticSource() bool {
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
strings.HasPrefix(pa.conf.Source, "rtmp://")
}
func (pa *path) startStaticSource() {
if strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") {
pa.source = newRTSPSource(
pa.ctx,
pa.conf.Source,
pa.conf.SourceProtocolParsed,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
pa.readBufferSize,
&pa.sourceStaticWg,
pa.stats,
pa)
} else if strings.HasPrefix(pa.conf.Source, "rtmp://") {
pa.source = newRTMPSource(
pa.ctx,
pa.conf.Source,
pa.readTimeout,
pa.writeTimeout,
&pa.sourceStaticWg,
pa.stats,
pa)
}
}
func (pa *path) removeReader(r reader) {
state := pa.readers[r]
if state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
if _, ok := r.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(r)
}
}
delete(pa.readers, r)
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
}
func (pa *path) removePublisher(p publisher) {
if pa.sourceState == pathSourceStateReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady()
}
pa.source = nil
pa.stream.Close()
pa.stream = nil
for r := range pa.readers {
pa.removeReader(r)
r.Close()
}
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
}
func (pa *path) fixedPublisherStart() {
if pa.hasStaticSource() {
// start on-demand source
if pa.source == nil {
pa.startStaticSource()
if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
pa.sourceState = pathSourceStateCreating
}
// reset timer
} else if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer.Stop()
pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
}
}
if pa.conf.RunOnDemand != "" {
// start on-demand command
if pa.onDemandCmd == nil {
pa.Log(logger.Info, "on demand command started")
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
if pa.sourceState != pathSourceStateCreating {
pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
pa.sourceState = pathSourceStateCreating
}
// reset timer
} else if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer.Stop()
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
}
}
}
func (pa *path) onSourceSetReady() {
if pa.sourceState == pathSourceStateCreating {
pa.describeTimer.Stop()
pa.describeTimer = newEmptyTimer()
}
pa.sourceState = pathSourceStateReady
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
pa.onReaderSetupPlayPost(req)
}
pa.setupPlayRequests = nil
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
pa.parent.OnPathSourceReady(pa)
}
func (pa *path) onSourceSetNotReady() {
pa.sourceState = pathSourceStateNotReady
if pa.onPublishCmd != nil {
pa.onPublishCmd.Close()
pa.onPublishCmd = nil
}
for r := range pa.readers {
pa.removeReader(r)
r.Close()
}
}
func (pa *path) onDescribe(req pathDescribeReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- pathDescribeRes{
Redirect: pa.conf.SourceRedirect,
}
return
}
switch pa.sourceState {
case pathSourceStateReady:
req.Res <- pathDescribeRes{
Stream: pa.stream,
}
return
case pathSourceStateCreating:
pa.describeRequests = append(pa.describeRequests, req)
return
case pathSourceStateNotReady:
if pa.conf.Fallback != "" {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: pa.conf.Fallback,
}
return ur.String()
}
return pa.conf.Fallback
}()
req.Res <- pathDescribeRes{Redirect: fallbackURL}
return
}
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
return
}
}
func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
if pa.source == req.Author {
pa.removePublisher(req.Author)
}
close(req.Res)
}
func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) {
if pa.source != nil {
if pa.hasStaticSource() {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("path '%s' is assigned to a static source", pa.name)}
return
}
if pa.conf.DisablePublisherOverride {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("another publisher is already publishing to path '%s'", pa.name)}
return
}
pa.Log(logger.Info, "closing existing publisher")
curPub := pa.source.(publisher)
pa.removePublisher(curPub)
curPub.Close()
// prevent path closure
if pa.closeTimerStarted {
pa.closeTimer.Stop()
pa.closeTimer = newEmptyTimer()
pa.closeTimerStarted = false
}
}
pa.source = req.Author
pa.stream = gortsplib.NewServerStream(req.Tracks)
req.Res <- pathPublisherAnnounceRes{Path: pa}
}
func (pa *path) onPublisherRecord(req pathPublisherRecordReq) {
if pa.source != req.Author {
req.Res <- pathPublisherRecordRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
}
atomic.AddInt64(pa.stats.CountPublishers, 1)
req.Author.OnPublisherAccepted(len(pa.stream.Tracks()))
pa.onSourceSetReady()
if pa.conf.RunOnPublish != "" {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onPublishCmd = externalcmd.New(pa.conf.RunOnPublish, pa.conf.RunOnPublishRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
}
req.Res <- pathPublisherRecordRes{}
}
func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
if req.Author == pa.source && pa.sourceState == pathSourceStateReady {
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.onSourceSetNotReady()
}
close(req.Res)
}
func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
if _, ok := pa.readers[req.Author]; ok {
pa.removeReader(req.Author)
}
close(req.Res)
}
func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
switch pa.sourceState {
case pathSourceStateReady:
pa.onReaderSetupPlayPost(req)
return
case pathSourceStateCreating:
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return
case pathSourceStateNotReady:
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
return
}
}
func (pa *path) onReaderSetupPlayPost(req pathReaderSetupPlayReq) {
if _, ok := pa.readers[req.Author]; !ok {
// prevent on-demand source from closing
if pa.sourceCloseTimerStarted {
pa.sourceCloseTimer = newEmptyTimer()
pa.sourceCloseTimerStarted = false
}
// prevent on-demand command from closing
if pa.runOnDemandCloseTimerStarted {
pa.runOnDemandCloseTimer = newEmptyTimer()
pa.runOnDemandCloseTimerStarted = false
}
pa.readers[req.Author] = pathReaderStatePrePlay
}
req.Res <- pathReaderSetupPlayRes{
Path: pa,
Stream: pa.stream,
}
}
func (pa *path) onReaderPlay(req pathReaderPlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1)
pa.readers[req.Author] = pathReaderStatePlay
if _, ok := req.Author.(pathRTSPSession); !ok {
pa.nonRTSPReaders.add(req.Author)
}
req.Author.OnReaderAccepted()
close(req.Res)
}
func (pa *path) onReaderPause(req pathReaderPauseReq) {
if state, ok := pa.readers[req.Author]; ok && state == pathReaderStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.readers[req.Author] = pathReaderStatePrePlay
if _, ok := req.Author.(pathRTSPSession); !ok {
pa.nonRTSPReaders.remove(req.Author)
}
}
close(req.Res)
}
func (pa *path) scheduleSourceClose() {
if !pa.hasStaticSource() || !pa.conf.SourceOnDemand || pa.source == nil {
return
}
if pa.sourceCloseTimerStarted ||
pa.sourceState == pathSourceStateCreating ||
len(pa.readers) > 0 ||
pa.source != nil {
return
}
pa.sourceCloseTimer.Stop()
pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
pa.sourceCloseTimerStarted = true
}
func (pa *path) scheduleRunOnDemandClose() {
if pa.conf.RunOnDemand == "" || pa.onDemandCmd == nil {
return
}
if pa.runOnDemandCloseTimerStarted ||
pa.sourceState == pathSourceStateCreating ||
len(pa.readers) > 0 {
return
}
pa.runOnDemandCloseTimer.Stop()
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
pa.runOnDemandCloseTimerStarted = true
}
func (pa *path) scheduleClose() {
if pa.conf.Regexp != nil &&
len(pa.readers) == 0 &&
pa.source == nil &&
pa.sourceState != pathSourceStateCreating &&
!pa.sourceCloseTimerStarted &&
!pa.runOnDemandCloseTimerStarted &&
!pa.closeTimerStarted {
pa.closeTimer.Stop()
pa.closeTimer = time.NewTimer(0)
pa.closeTimerStarted = true
}
}
// OnSourceStaticSetReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetReady(req pathSourceStaticSetReadyReq) {
req.Res = make(chan struct{})
select {
case pa.sourceStaticSetReady <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnSourceStaticSetNotReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.Res = make(chan struct{})
select {
case pa.sourceStaticSetNotReady <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnDescribe is called by pathManager (asynchronous).
func (pa *path) OnDescribe(req pathDescribeReq) {
select {
case pa.describe <- req:
case <-pa.ctx.Done():
req.Res <- pathDescribeRes{Err: fmt.Errorf("terminated")}
}
}
// OnPublisherRemove is called by a publisher.
func (pa *path) OnPublisherRemove(req pathPublisherRemoveReq) {
req.Res = make(chan struct{})
select {
case pa.publisherRemove <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnPublisherAnnounce is called by pathManager (asynchronous).
func (pa *path) OnPublisherAnnounce(req pathPublisherAnnounceReq) {
select {
case pa.publisherAnnounce <- req:
case <-pa.ctx.Done():
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
}
}
// OnPublisherRecord is called by a publisher.
func (pa *path) OnPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
req.Res = make(chan pathPublisherRecordRes)
select {
case pa.publisherRecord <- req:
return <-req.Res
case <-pa.ctx.Done():
return pathPublisherRecordRes{Err: fmt.Errorf("terminated")}
}
}
// OnPublisherPause is called by a publisher.
func (pa *path) OnPublisherPause(req pathPublisherPauseReq) {
req.Res = make(chan struct{})
select {
case pa.publisherPause <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnReaderRemove is called by a reader.
func (pa *path) OnReaderRemove(req pathReaderRemoveReq) {
req.Res = make(chan struct{})
select {
case pa.readerRemove <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnReaderSetupPlay is called by pathManager (asynchronous).
func (pa *path) OnReaderSetupPlay(req pathReaderSetupPlayReq) {
select {
case pa.readerSetupPlay <- req:
case <-pa.ctx.Done():
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
}
}
// OnReaderPlay is called by a reader.
func (pa *path) OnReaderPlay(req pathReaderPlayReq) {
req.Res = make(chan struct{})
select {
case pa.readerPlay <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnReaderPause is called by a reader.
func (pa *path) OnReaderPause(req pathReaderPauseReq) {
req.Res = make(chan struct{})
select {
case pa.readerPause <- req:
<-req.Res
case <-pa.ctx.Done():
}
}
// OnSourceFrame is called by a source.
func (pa *path) OnSourceFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
// forward to RTSP readers
pa.stream.WriteFrame(trackID, streamType, payload)
// forward to non-RTSP readers
pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload)
}