mediamtx/internal/pathman/pathman.go

396 lines
9.0 KiB
Go
Raw Normal View History

2020-10-19 20:17:48 +00:00
package pathman
import (
"fmt"
2021-05-07 21:07:31 +00:00
"net"
2020-10-19 20:17:48 +00:00
"sync"
"time"
2021-05-07 21:07:31 +00:00
"github.com/aler9/gortsplib/pkg/base"
2020-11-15 16:56:54 +00:00
"github.com/aler9/gortsplib/pkg/headers"
2020-10-19 20:17:48 +00:00
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger"
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/path"
2021-04-27 17:19:47 +00:00
"github.com/aler9/rtsp-simple-server/internal/readpublisher"
2020-11-01 21:56:56 +00:00
"github.com/aler9/rtsp-simple-server/internal/stats"
2020-10-19 20:17:48 +00:00
)
2021-05-07 21:07:31 +00:00
// ipEqualOrInRange reports whether ip matches at least one entry of ips.
// An entry of type net.IP matches when it is equal to ip; an entry of type
// *net.IPNet matches when the network contains ip. Entries of any other
// type are ignored.
func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
	for i := range ips {
		if addr, isIP := ips[i].(net.IP); isIP {
			if addr.Equal(ip) {
				return true
			}
			continue
		}

		if network, isNet := ips[i].(*net.IPNet); isNet && network.Contains(ip) {
			return true
		}
	}

	return false
}
2020-11-05 11:30:25 +00:00
// Parent is implemented by program.
2020-10-19 20:17:48 +00:00
type Parent interface {
Log(logger.Level, string, ...interface{})
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// PathManager is a path.Path manager.
2020-10-19 20:17:48 +00:00
type PathManager struct {
rtspAddress string
2021-01-10 11:55:53 +00:00
readTimeout time.Duration
writeTimeout time.Duration
readBufferCount int
readBufferSize int
2021-01-10 11:55:53 +00:00
authMethods []headers.AuthMethod
pathConfs map[string]*conf.PathConf
stats *stats.Stats
parent Parent
2020-10-19 20:17:48 +00:00
paths map[string]*path.Path
wg sync.WaitGroup
// in
2021-05-09 12:41:18 +00:00
confReload chan map[string]*conf.PathConf
pathClose chan *path.Path
rpDescribe chan readpublisher.DescribeReq
rpSetupPlay chan readpublisher.SetupPlayReq
rpAnnounce chan readpublisher.AnnounceReq
terminate chan struct{}
2020-10-19 20:17:48 +00:00
// out
2021-04-27 11:43:15 +00:00
done chan struct{}
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// New allocates a PathManager.
func New(
rtspAddress string,
2020-10-19 20:17:48 +00:00
readTimeout time.Duration,
writeTimeout time.Duration,
readBufferCount int,
readBufferSize int,
2020-10-19 20:17:48 +00:00
authMethods []headers.AuthMethod,
pathConfs map[string]*conf.PathConf,
stats *stats.Stats,
2020-10-19 20:17:48 +00:00
parent Parent) *PathManager {
pm := &PathManager{
rtspAddress: rtspAddress,
2020-10-19 20:17:48 +00:00
readTimeout: readTimeout,
writeTimeout: writeTimeout,
2021-01-10 11:55:53 +00:00
readBufferCount: readBufferCount,
readBufferSize: readBufferSize,
2020-10-19 20:17:48 +00:00
authMethods: authMethods,
pathConfs: pathConfs,
stats: stats,
2020-10-19 20:17:48 +00:00
parent: parent,
paths: make(map[string]*path.Path),
confReload: make(chan map[string]*conf.PathConf),
2020-10-19 20:17:48 +00:00
pathClose: make(chan *path.Path),
2021-05-09 12:41:18 +00:00
rpDescribe: make(chan readpublisher.DescribeReq),
rpSetupPlay: make(chan readpublisher.SetupPlayReq),
rpAnnounce: make(chan readpublisher.AnnounceReq),
2020-10-19 20:17:48 +00:00
terminate: make(chan struct{}),
done: make(chan struct{}),
}
pm.createPaths()
2020-10-19 20:17:48 +00:00
go pm.run()
return pm
}
2020-11-05 11:30:25 +00:00
// Close closes a PathManager.
2020-10-19 20:17:48 +00:00
func (pm *PathManager) Close() {
close(pm.terminate)
<-pm.done
}
2020-11-05 11:30:25 +00:00
// Log is the main logging function.
func (pm *PathManager) Log(level logger.Level, format string, args ...interface{}) {
pm.parent.Log(level, format, args...)
2020-10-19 20:17:48 +00:00
}
func (pm *PathManager) run() {
defer close(pm.done)
outer:
for {
select {
case pathConfs := <-pm.confReload:
// remove confs
for pathName := range pm.pathConfs {
if _, ok := pathConfs[pathName]; !ok {
delete(pm.pathConfs, pathName)
}
}
// update confs
for pathName, oldConf := range pm.pathConfs {
if !oldConf.Equal(pathConfs[pathName]) {
pm.pathConfs[pathName] = pathConfs[pathName]
}
}
// add confs
for pathName, pathConf := range pathConfs {
if _, ok := pm.pathConfs[pathName]; !ok {
pm.pathConfs[pathName] = pathConf
}
}
// remove paths associated with a conf which doesn't exist anymore
// or has changed
for _, pa := range pm.paths {
if pathConf, ok := pm.pathConfs[pa.ConfName()]; !ok || pathConf != pa.Conf() {
delete(pm.paths, pa.Name())
pa.Close()
}
}
// add paths
pm.createPaths()
2020-10-19 20:17:48 +00:00
case pa := <-pm.pathClose:
if _, ok := pm.paths[pa.Name()]; !ok {
continue
}
2020-10-19 20:17:48 +00:00
delete(pm.paths, pa.Name())
pa.Close()
2021-05-09 12:41:18 +00:00
case req := <-pm.rpDescribe:
pathName, pathConf, err := pm.findPathConf(req.PathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
2021-05-07 21:07:31 +00:00
err = pm.authenticate(
req.IP,
req.ValidateCredentials,
2021-03-10 14:06:45 +00:00
req.PathName,
2021-05-07 21:07:31 +00:00
pathConf.ReadIPsParsed,
2021-03-10 14:06:45 +00:00
pathConf.ReadUser,
pathConf.ReadPass,
2021-05-07 21:07:31 +00:00
)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
2021-03-10 14:06:45 +00:00
pm.createPath(pathName, pathConf, req.PathName)
2020-10-19 20:17:48 +00:00
}
pm.paths[req.PathName].OnPathManDescribe(req)
2021-05-09 12:41:18 +00:00
case req := <-pm.rpSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
2021-05-07 21:07:31 +00:00
err = pm.authenticate(
req.IP,
req.ValidateCredentials,
2021-03-10 14:06:45 +00:00
req.PathName,
2021-05-07 21:07:31 +00:00
pathConf.ReadIPsParsed,
2021-03-10 14:06:45 +00:00
pathConf.ReadUser,
pathConf.ReadPass,
2021-05-07 21:07:31 +00:00
)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
2021-03-10 14:06:45 +00:00
pm.createPath(pathName, pathConf, req.PathName)
2020-10-19 20:17:48 +00:00
}
2021-03-10 14:06:45 +00:00
pm.paths[req.PathName].OnPathManSetupPlay(req)
2020-10-19 20:17:48 +00:00
2021-05-09 12:41:18 +00:00
case req := <-pm.rpAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
2021-05-07 21:07:31 +00:00
err = pm.authenticate(
req.IP,
req.ValidateCredentials,
req.PathName,
2021-05-07 21:07:31 +00:00
pathConf.PublishIPsParsed,
2021-03-10 14:06:45 +00:00
pathConf.PublishUser,
pathConf.PublishPass,
2021-05-07 21:07:31 +00:00
)
2020-10-19 20:17:48 +00:00
if err != nil {
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
2020-10-19 20:17:48 +00:00
continue
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
2021-03-10 14:06:45 +00:00
pm.createPath(pathName, pathConf, req.PathName)
}
2021-03-10 14:06:45 +00:00
pm.paths[req.PathName].OnPathManAnnounce(req)
2020-10-19 20:17:48 +00:00
case <-pm.terminate:
break outer
}
}
go func() {
for {
select {
case _, ok := <-pm.confReload:
if !ok {
return
}
2020-10-19 20:17:48 +00:00
case _, ok := <-pm.pathClose:
if !ok {
return
}
2021-05-09 12:41:18 +00:00
case req, ok := <-pm.rpDescribe:
if !ok {
return
}
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
2020-10-19 20:17:48 +00:00
2021-05-09 12:41:18 +00:00
case req, ok := <-pm.rpSetupPlay:
if !ok {
return
}
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
2020-10-19 20:17:48 +00:00
2021-05-09 12:41:18 +00:00
case req, ok := <-pm.rpAnnounce:
if !ok {
return
}
2021-04-27 17:19:47 +00:00
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
2020-10-19 20:17:48 +00:00
}
}
}()
for _, pa := range pm.paths {
pa.Close()
}
pm.wg.Wait()
close(pm.confReload)
2020-10-19 20:17:48 +00:00
close(pm.pathClose)
2021-05-09 12:41:18 +00:00
close(pm.rpDescribe)
close(pm.rpSetupPlay)
close(pm.rpAnnounce)
2020-10-19 20:17:48 +00:00
}
2021-03-10 14:06:45 +00:00
// createPath builds a new path.Path called name from the configuration
// conf (registered under confName) and stores it in pm.paths, replacing
// any entry already present under that name.
func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) {
	created := path.New(
		pm.rtspAddress,
		pm.readTimeout,
		pm.writeTimeout,
		pm.readBufferCount,
		pm.readBufferSize,
		confName,
		conf,
		name,
		&pm.wg,
		pm.stats,
		pm,
	)

	pm.paths[name] = created
}
func (pm *PathManager) createPaths() {
for pathName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil {
2021-03-10 14:06:45 +00:00
pm.createPath(pathName, pathConf, pathName)
}
}
}
func (pm *PathManager) findPathConf(name string) (string, *conf.PathConf, error) {
2020-10-19 20:17:48 +00:00
err := conf.CheckPathName(name)
if err != nil {
return "", nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
2020-10-19 20:17:48 +00:00
}
// normal path
if pathConf, ok := pm.pathConfs[name]; ok {
return name, pathConf, nil
2020-10-19 20:17:48 +00:00
}
// regular expression path
for pathName, pathConf := range pm.pathConfs {
2020-10-19 20:17:48 +00:00
if pathConf.Regexp != nil && pathConf.Regexp.MatchString(name) {
return pathName, pathConf, nil
2020-10-19 20:17:48 +00:00
}
}
return "", nil, fmt.Errorf("unable to find a valid configuration for path '%s'", name)
}
2020-11-05 11:30:25 +00:00
// OnProgramConfReload is called by program.
func (pm *PathManager) OnProgramConfReload(pathConfs map[string]*conf.PathConf) {
pm.confReload <- pathConfs
2020-10-19 20:17:48 +00:00
}
2020-11-05 11:30:25 +00:00
// OnPathClose is called by path.Path.
2020-10-19 20:17:48 +00:00
func (pm *PathManager) OnPathClose(pa *path.Path) {
pm.pathClose <- pa
}
2021-05-09 12:41:18 +00:00
// OnReadPublisherDescribe is called by a ReadPublisher.
2021-04-27 17:19:47 +00:00
func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) {
2021-05-09 12:41:18 +00:00
pm.rpDescribe <- req
2020-10-19 20:17:48 +00:00
}
2021-05-09 12:41:18 +00:00
// OnReadPublisherAnnounce is called by a ReadPublisher.
2021-04-27 17:19:47 +00:00
func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) {
2021-05-09 12:41:18 +00:00
pm.rpAnnounce <- req
2020-10-19 20:17:48 +00:00
}
2021-05-09 12:41:18 +00:00
// OnReadPublisherSetupPlay is called by a ReadPublisher.
2021-04-27 17:19:47 +00:00
func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
2021-05-09 12:41:18 +00:00
pm.rpSetupPlay <- req
2020-10-19 20:17:48 +00:00
}
2021-05-07 21:07:31 +00:00
// authenticate checks whether a request is allowed to use a path.
//
// The IP check is performed only when both pathIPs and ip are non-nil; it
// fails with a readpublisher.ErrAuthCritical carrying an Unauthorized
// response when ip matches none of pathIPs. The credential check is
// performed only when pathUser is non-empty and validateCredentials is
// non-nil; its error, if any, is returned as is. pathName is currently
// unused.
func (pm *PathManager) authenticate(
	ip net.IP,
	validateCredentials func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error,
	pathName string,
	pathIPs []interface{},
	pathUser string,
	pathPass string,
) error {
	// validate ip
	ipRestricted := pathIPs != nil && ip != nil
	if ipRestricted && !ipEqualOrInRange(ip, pathIPs) {
		return readpublisher.ErrAuthCritical{
			Message: fmt.Sprintf("IP '%s' not allowed", ip),
			Response: &base.Response{
				StatusCode: base.StatusUnauthorized,
			},
		}
	}

	// validate user
	if pathUser == "" || validateCredentials == nil {
		return nil
	}

	if credErr := validateCredentials(pm.authMethods, pathUser, pathPass); credErr != nil {
		return credErr
	}

	return nil
}