mediamtx/internal/core/path_manager.go

416 lines
9.7 KiB
Go
Raw Normal View History

package core
2020-10-19 20:17:48 +00:00
import (
2021-05-10 19:32:59 +00:00
"context"
2020-10-19 20:17:48 +00:00
"fmt"
"sync"
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
2020-10-19 20:17:48 +00:00
)
2021-08-11 10:45:53 +00:00
type pathManagerHLSServer interface {
2021-10-27 19:01:00 +00:00
onPathSourceReady(pa *path)
2021-08-11 10:45:53 +00:00
}
type pathManagerParent interface {
Log(logger.Level, string, ...interface{})
2020-10-19 20:17:48 +00:00
}
type pathManager struct {
rtspAddress string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
readBufferSize int
2021-01-10 11:55:53 +00:00
pathConfs map[string]*conf.PathConf
externalCmdPool *externalcmd.Pool
metrics *metrics
parent pathManagerParent
2020-10-19 20:17:48 +00:00
2021-05-10 19:32:59 +00:00
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
2021-08-11 10:45:53 +00:00
hlsServer pathManagerHLSServer
paths map[string]*path
2020-10-19 20:17:48 +00:00
// in
confReload chan map[string]*conf.PathConf
pathClose chan *path
pathSourceReady chan *path
describe chan pathDescribeReq
readerSetupPlay chan pathReaderSetupPlayReq
publisherAnnounce chan pathPublisherAnnounceReq
2021-08-11 10:45:53 +00:00
hlsServerSet chan pathManagerHLSServer
apiPathsList chan pathAPIPathsListReq
2020-10-19 20:17:48 +00:00
}
func newPathManager(
parentCtx context.Context,
rtspAddress string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
readBufferSize int,
pathConfs map[string]*conf.PathConf,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
parent pathManagerParent) *pathManager {
ctx, ctxCancel := context.WithCancel(parentCtx)
2021-05-10 19:32:59 +00:00
pm := &pathManager{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
readBufferSize: readBufferSize,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path),
pathSourceReady: make(chan *path),
describe: make(chan pathDescribeReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
2021-08-11 10:45:53 +00:00
hlsServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan pathAPIPathsListReq),
2020-10-19 20:17:48 +00:00
}
for pathConfName, pathConf := range pm.pathConfs {
2021-08-01 15:22:28 +00:00
if pathConf.Regexp == nil {
pm.createPath(pathConfName, pathConf, pathConfName, nil)
2021-08-01 15:22:28 +00:00
}
}
2020-10-19 20:17:48 +00:00
if pm.metrics != nil {
2021-10-27 19:01:00 +00:00
pm.metrics.onPathManagerSet(pm)
}
2022-02-19 21:15:37 +00:00
pm.log(logger.Debug, "path manager created")
2022-01-25 14:05:44 +00:00
2021-05-10 19:32:59 +00:00
pm.wg.Add(1)
2020-10-19 20:17:48 +00:00
go pm.run()
2021-05-10 19:32:59 +00:00
2020-10-19 20:17:48 +00:00
return pm
}
func (pm *pathManager) close() {
2022-02-19 21:15:37 +00:00
pm.log(logger.Debug, "path manager is shutting down")
2021-05-10 19:32:59 +00:00
pm.ctxCancel()
pm.wg.Wait()
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// Log is the main logging function.
2021-10-27 19:01:00 +00:00
func (pm *pathManager) log(level logger.Level, format string, args ...interface{}) {
pm.parent.Log(level, format, args...)
2020-10-19 20:17:48 +00:00
}
func (pm *pathManager) run() {
2021-05-10 19:32:59 +00:00
defer pm.wg.Done()
2020-10-19 20:17:48 +00:00
outer:
for {
select {
case pathConfs := <-pm.confReload:
// remove confs
for pathConfName := range pm.pathConfs {
if _, ok := pathConfs[pathConfName]; !ok {
delete(pm.pathConfs, pathConfName)
}
}
// update confs
for pathConfName, oldConf := range pm.pathConfs {
if !oldConf.Equal(pathConfs[pathConfName]) {
pm.pathConfs[pathConfName] = pathConfs[pathConfName]
}
}
// add confs
for pathConfName, pathConf := range pathConfs {
if _, ok := pm.pathConfs[pathConfName]; !ok {
pm.pathConfs[pathConfName] = pathConf
}
}
// remove paths associated with a conf which doesn't exist anymore
// or has changed
for _, pa := range pm.paths {
if pathConf, ok := pm.pathConfs[pa.ConfName()]; !ok || pathConf != pa.Conf() {
delete(pm.paths, pa.Name())
2021-10-27 19:01:00 +00:00
pa.close()
}
}
2021-08-01 15:22:28 +00:00
// add new paths
for pathConfName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathConfName]; !ok && pathConf.Regexp == nil {
pm.createPath(pathConfName, pathConf, pathConfName, nil)
2021-08-01 15:22:28 +00:00
}
}
2020-10-19 20:17:48 +00:00
case pa := <-pm.pathClose:
2021-05-10 19:32:59 +00:00
if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa {
continue
}
2020-10-19 20:17:48 +00:00
delete(pm.paths, pa.Name())
2021-10-27 19:01:00 +00:00
pa.close()
2020-10-19 20:17:48 +00:00
case pa := <-pm.pathSourceReady:
if pm.hlsServer != nil {
2021-10-27 19:01:00 +00:00
pm.hlsServer.onPathSourceReady(pa)
}
case req := <-pm.describe:
2022-01-14 22:42:41 +00:00
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{err: err}
2020-10-19 20:17:48 +00:00
continue
}
2022-01-14 22:42:41 +00:00
err = req.authenticate(
pathConf.ReadIPs,
2021-03-10 14:06:45 +00:00
pathConf.ReadUser,
pathConf.ReadPass)
2020-10-19 20:17:48 +00:00
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{err: err}
2020-10-19 20:17:48 +00:00
continue
}
// create path if it doesn't exist
2022-01-14 22:42:41 +00:00
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
2020-10-19 20:17:48 +00:00
}
2022-01-14 22:42:41 +00:00
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
2020-10-19 20:17:48 +00:00
case req := <-pm.readerSetupPlay:
2022-01-14 22:42:41 +00:00
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{err: err}
2020-10-19 20:17:48 +00:00
continue
}
2022-01-14 22:42:41 +00:00
if req.authenticate != nil {
err = req.authenticate(
pathConf.ReadIPs,
pathConf.ReadUser,
pathConf.ReadPass)
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{err: err}
continue
}
2020-10-19 20:17:48 +00:00
}
// create path if it doesn't exist
2022-01-14 22:42:41 +00:00
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
2020-10-19 20:17:48 +00:00
}
2022-01-14 22:42:41 +00:00
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]}
2020-10-19 20:17:48 +00:00
case req := <-pm.publisherAnnounce:
2022-01-14 22:42:41 +00:00
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathPublisherAnnounceRes{err: err}
2020-10-19 20:17:48 +00:00
continue
}
2022-01-14 22:42:41 +00:00
err = req.authenticate(
pathConf.PublishIPs,
2021-03-10 14:06:45 +00:00
pathConf.PublishUser,
pathConf.PublishPass)
2020-10-19 20:17:48 +00:00
if err != nil {
2022-01-14 22:42:41 +00:00
req.res <- pathPublisherAnnounceRes{err: err}
2020-10-19 20:17:48 +00:00
continue
}
// create path if it doesn't exist
2022-01-14 22:42:41 +00:00
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
2022-01-14 22:42:41 +00:00
req.res <- pathPublisherAnnounceRes{path: pm.paths[req.pathName]}
2020-10-19 20:17:48 +00:00
case s := <-pm.hlsServerSet:
pm.hlsServer = s
case req := <-pm.apiPathsList:
paths := make(map[string]*path)
for name, pa := range pm.paths {
paths[name] = pa
}
2022-01-14 22:42:41 +00:00
req.res <- pathAPIPathsListRes{
paths: paths,
}
2021-05-10 19:32:59 +00:00
case <-pm.ctx.Done():
2020-10-19 20:17:48 +00:00
break outer
}
}
2021-05-10 19:32:59 +00:00
pm.ctxCancel()
if pm.metrics != nil {
2021-10-27 19:01:00 +00:00
pm.metrics.onPathManagerSet(nil)
}
2020-10-19 20:17:48 +00:00
}
func (pm *pathManager) createPath(
pathConfName string,
pathConf *conf.PathConf,
name string,
matches []string) {
pm.paths[name] = newPath(
2021-05-11 15:20:32 +00:00
pm.ctx,
pm.rtspAddress,
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pm.readBufferSize,
pathConfName,
pathConf,
name,
matches,
&pm.wg,
pm.externalCmdPool,
pm)
}
func (pm *pathManager) findPathConf(name string) (string, *conf.PathConf, []string, error) {
err := conf.IsValidPathName(name)
2020-10-19 20:17:48 +00:00
if err != nil {
return "", nil, nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
2020-10-19 20:17:48 +00:00
}
// normal path
if pathConf, ok := pm.pathConfs[name]; ok {
return name, pathConf, nil, nil
2020-10-19 20:17:48 +00:00
}
// regular expression path
for pathConfName, pathConf := range pm.pathConfs {
if pathConf.Regexp != nil {
m := pathConf.Regexp.FindStringSubmatch(name)
if m != nil {
return pathConfName, pathConf, m, nil
}
2020-10-19 20:17:48 +00:00
}
}
return "", nil, nil, fmt.Errorf("path '%s' is not configured", name)
}
2021-10-27 19:01:00 +00:00
// onConfReload is called by core.
func (pm *pathManager) onConfReload(pathConfs map[string]*conf.PathConf) {
2021-05-10 19:32:59 +00:00
select {
case pm.confReload <- pathConfs:
case <-pm.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
2021-10-27 19:01:00 +00:00
// onPathSourceReady is called by path.
func (pm *pathManager) onPathSourceReady(pa *path) {
select {
case pm.pathSourceReady <- pa:
case <-pm.ctx.Done():
}
}
2021-10-27 19:01:00 +00:00
// onPathClose is called by path.
func (pm *pathManager) onPathClose(pa *path) {
2021-05-10 19:32:59 +00:00
select {
case pm.pathClose <- pa:
case <-pm.ctx.Done():
}
2020-10-19 20:17:48 +00:00
}
2021-10-27 19:01:00 +00:00
// onDescribe is called by a reader or publisher.
func (pm *pathManager) onDescribe(req pathDescribeReq) pathDescribeRes {
2022-01-14 22:42:41 +00:00
req.res = make(chan pathDescribeRes)
2021-05-10 19:32:59 +00:00
select {
case pm.describe <- req:
2022-01-14 22:42:41 +00:00
res := <-req.res
if res.err != nil {
2021-08-01 15:22:28 +00:00
return res
}
2022-01-14 22:42:41 +00:00
return res.path.onDescribe(req)
2021-08-01 15:22:28 +00:00
2021-05-10 19:32:59 +00:00
case <-pm.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathDescribeRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
2021-10-27 19:01:00 +00:00
// onPublisherAnnounce is called by a publisher.
func (pm *pathManager) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
2022-01-14 22:42:41 +00:00
req.res = make(chan pathPublisherAnnounceRes)
2021-05-10 19:32:59 +00:00
select {
case pm.publisherAnnounce <- req:
2022-01-14 22:42:41 +00:00
res := <-req.res
if res.err != nil {
2021-08-01 15:22:28 +00:00
return res
}
2022-01-14 22:42:41 +00:00
return res.path.onPublisherAnnounce(req)
2021-08-01 15:22:28 +00:00
2021-05-10 19:32:59 +00:00
case <-pm.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
2021-10-27 19:01:00 +00:00
// onReaderSetupPlay is called by a reader.
func (pm *pathManager) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
2022-01-14 22:42:41 +00:00
req.res = make(chan pathReaderSetupPlayRes)
2021-05-10 19:32:59 +00:00
select {
case pm.readerSetupPlay <- req:
2022-01-14 22:42:41 +00:00
res := <-req.res
if res.err != nil {
2021-08-01 15:22:28 +00:00
return res
}
2022-01-14 22:42:41 +00:00
return res.path.onReaderSetupPlay(req)
2021-08-01 15:22:28 +00:00
2021-05-10 19:32:59 +00:00
case <-pm.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
2021-05-10 19:32:59 +00:00
}
2020-10-19 20:17:48 +00:00
}
2021-05-07 21:07:31 +00:00
2021-10-27 19:01:00 +00:00
// onHLSServerSet is called by hlsServer.
func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) {
select {
case pm.hlsServerSet <- s:
case <-pm.ctx.Done():
}
}
2021-10-27 19:01:00 +00:00
// onAPIPathsList is called by api.
func (pm *pathManager) onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes {
2022-01-14 22:42:41 +00:00
req.res = make(chan pathAPIPathsListRes)
select {
case pm.apiPathsList <- req:
2022-01-14 22:42:41 +00:00
res := <-req.res
2022-01-14 22:42:41 +00:00
res.data = &pathAPIPathsListData{
Items: make(map[string]pathAPIPathsListItem),
}
2022-01-14 22:42:41 +00:00
for _, pa := range res.paths {
pa.onAPIPathsList(pathAPIPathsListSubReq{data: res.data})
}
2021-11-05 16:14:31 +00:00
return res
case <-pm.ctx.Done():
2022-01-14 22:42:41 +00:00
return pathAPIPathsListRes{err: fmt.Errorf("terminated")}
}
}