This commit is contained in:
aler9 2020-09-19 17:13:45 +02:00
parent b1b6f669dd
commit 33f703a1a3
10 changed files with 163 additions and 134 deletions

View File

@ -51,7 +51,7 @@ test:
test-nodocker:
$(foreach IMG,$(shell echo test-images/*/ | xargs -n1 basename), \
docker build -q test-images/$(IMG) -t rtsp-simple-server-test-$(IMG)$(NL))
go test -race -v .
go test -race -v -run OnDemand .
stress:
docker build -q . -f stress/$(NAME)/Dockerfile -t temp

View File

@ -201,7 +201,7 @@ There are multiple ways to monitor the server usage over time:
```
2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION
```
means that there are 2 clients, 1 publisher and 1 receiver.
means that there are 2 clients, 1 publisher and 1 reader.
* A metrics exporter, compatible with Prometheus, can be enabled with the option `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
```

View File

@ -27,12 +27,14 @@ const (
type clientDescribeReq struct {
client *client
pathName string
pathConf *pathConf
}
type clientAnnounceReq struct {
res chan error
client *client
pathName string
pathConf *pathConf
trackCount int
sdp []byte
}
@ -369,14 +371,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
pathName = removeQueryFromPath(pathName)
confp := c.p.findConfForPathName(pathName)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", pathName))
pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
err = c.authenticate(pathConf.readIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
@ -384,7 +385,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
return nil
}
c.p.clientDescribe <- clientDescribeReq{c, pathName}
c.p.clientDescribe <- clientDescribeReq{c, pathName, pathConf}
c.describeCSeq = cseq
c.describeUrl = req.Url.String()
@ -400,25 +401,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
pathName = removeQueryFromPath(pathName)
if len(pathName) == 0 {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("empty base path"))
return errRunTerminate
}
err := checkPathName(pathName)
pathConf, err := c.p.conf.checkPathNameAndFindConf(pathName)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid path name: %s (%s)", err, pathName))
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}
confp := c.p.findConfForPathName(pathName)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", pathName))
return errRunTerminate
}
err = c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req)
err = c.authenticate(pathConf.publishIpsParsed, pathConf.PublishUser, pathConf.PublishPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
@ -451,7 +440,7 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
sdp := tracks.Write()
res := make(chan error)
c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, len(tracks), sdp}
c.p.clientAnnounce <- clientAnnounceReq{res, c, pathName, pathConf, len(tracks), sdp}
err = <-res
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
@ -489,14 +478,13 @@ func (c *client) handleRequest(req *gortsplib.Request) error {
switch c.state {
// play
case clientStateInitial, clientStatePrePlay:
confp := c.p.findConfForPathName(basePath)
if confp == nil {
c.writeResError(cseq, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", basePath))
pathConf, err := c.p.conf.checkPathNameAndFindConf(basePath)
if err != nil {
c.writeResError(cseq, gortsplib.StatusBadRequest, err)
return errRunTerminate
}
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
err = c.authenticate(pathConf.readIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return errRunTerminate
@ -899,9 +887,9 @@ func (c *client) runPlay() bool {
}(), c.streamProtocol)
var onReadCmd *exec.Cmd
if c.path.confp.RunOnRead != "" {
if c.path.conf.RunOnRead != "" {
var err error
onReadCmd, err = startExternalCommand(c.path.confp.RunOnRead, c.path.name)
onReadCmd, err = startExternalCommand(c.path.conf.RunOnRead, c.path.name)
if err != nil {
c.log("ERR: %s", err)
}
@ -1050,9 +1038,9 @@ func (c *client) runRecord() bool {
}(), c.streamProtocol)
var onPublishCmd *exec.Cmd
if c.path.confp.RunOnPublish != "" {
if c.path.conf.RunOnPublish != "" {
var err error
onPublishCmd, err = startExternalCommand(c.path.confp.RunOnPublish, c.path.name)
onPublishCmd, err = startExternalCommand(c.path.conf.RunOnPublish, c.path.name)
if err != nil {
c.log("ERR: %s", err)
}

139
conf.go
View File

@ -12,7 +12,8 @@ import (
"gopkg.in/yaml.v2"
)
type confPath struct {
type pathConf struct {
regexp *regexp.Regexp
Source string `yaml:"source"`
sourceUrl *url.URL ``
SourceProtocol string `yaml:"sourceProtocol"`
@ -48,7 +49,7 @@ type conf struct {
LogDestinations []string `yaml:"logDestinations"`
logDestinationsParsed map[logDestination]struct{} ``
LogFile string `yaml:"logFile"`
Paths map[string]*confPath `yaml:"paths"`
Paths map[string]*pathConf `yaml:"paths"`
}
func loadConf(fpath string, stdin io.Reader) (*conf, error) {
@ -172,105 +173,147 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}
if len(conf.Paths) == 0 {
conf.Paths = map[string]*confPath{
conf.Paths = map[string]*pathConf{
"all": {},
}
}
for name, confp := range conf.Paths {
if confp == nil {
conf.Paths[name] = &confPath{}
confp = conf.Paths[name]
// "all" is an alias for "~^.*$"
if _, ok := conf.Paths["all"]; ok {
conf.Paths["~^.*$"] = conf.Paths["all"]
delete(conf.Paths, "all")
}
for name, pconf := range conf.Paths {
if pconf == nil {
conf.Paths[name] = &pathConf{}
pconf = conf.Paths[name]
}
err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
if name == "" {
return nil, fmt.Errorf("path name can not be empty")
}
if confp.Source == "" {
confp.Source = "record"
}
if confp.Source != "record" {
if name == "all" {
return nil, fmt.Errorf("path 'all' cannot have a RTSP source; use another path")
}
confp.sourceUrl, err = url.Parse(confp.Source)
// normal path
if name[0] != '~' {
err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
}
if confp.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
// regular expression path
} else {
pathRegexp, err := regexp.Compile(name[1:])
if err != nil {
return nil, fmt.Errorf("invalid regular expression: %s", name[1:])
}
if confp.sourceUrl.Port() == "" {
confp.sourceUrl.Host += ":554"
pconf.regexp = pathRegexp
}
if pconf.Source == "" {
pconf.Source = "record"
}
if pconf.Source != "record" {
if pconf.regexp != nil {
return nil, fmt.Errorf("a path with a regular expression cannot have a RTSP source; use another path")
}
if confp.sourceUrl.User != nil {
pass, _ := confp.sourceUrl.User.Password()
user := confp.sourceUrl.User.Username()
pconf.sourceUrl, err = url.Parse(pconf.Source)
if err != nil {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
}
if pconf.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
}
if pconf.sourceUrl.Port() == "" {
pconf.sourceUrl.Host += ":554"
}
if pconf.sourceUrl.User != nil {
pass, _ := pconf.sourceUrl.User.Password()
user := pconf.sourceUrl.User.Username()
if user != "" && pass == "" ||
user == "" && pass != "" {
fmt.Errorf("username and password must be both provided")
}
}
if confp.SourceProtocol == "" {
confp.SourceProtocol = "udp"
if pconf.SourceProtocol == "" {
pconf.SourceProtocol = "udp"
}
switch confp.SourceProtocol {
switch pconf.SourceProtocol {
case "udp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolUDP
pconf.sourceProtocolParsed = gortsplib.StreamProtocolUDP
case "tcp":
confp.sourceProtocolParsed = gortsplib.StreamProtocolTCP
pconf.sourceProtocolParsed = gortsplib.StreamProtocolTCP
default:
return nil, fmt.Errorf("unsupported protocol '%s'", confp.SourceProtocol)
return nil, fmt.Errorf("unsupported protocol '%s'", pconf.SourceProtocol)
}
}
if confp.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishUser) {
if pconf.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
}
}
if confp.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishPass) {
if pconf.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishPass) {
return nil, fmt.Errorf("publish password must be alphanumeric")
}
}
confp.publishIpsParsed, err = parseIpCidrList(confp.PublishIps)
pconf.publishIpsParsed, err = parseIpCidrList(pconf.PublishIps)
if err != nil {
return nil, err
}
if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
if confp.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadUser) {
if pconf.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadUser) {
return nil, fmt.Errorf("read username must be alphanumeric")
}
}
if confp.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadPass) {
if pconf.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadPass) {
return nil, fmt.Errorf("read password must be alphanumeric")
}
}
if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
confp.readIpsParsed, err = parseIpCidrList(confp.ReadIps)
pconf.readIpsParsed, err = parseIpCidrList(pconf.ReadIps)
if err != nil {
return nil, err
}
if name == "all" && confp.RunOnInit != "" {
return nil, fmt.Errorf("path 'all' does not support option 'runOnInit'; use another path")
if pconf.regexp != nil && pconf.RunOnInit != "" {
return nil, fmt.Errorf("a path with a regular expression does not support option 'runOnInit'; use another path")
}
}
return conf, nil
}
func (conf *conf) checkPathNameAndFindConf(name string) (*pathConf, error) {
err := checkPathName(name)
if err != nil {
return nil, fmt.Errorf("invalid path name: %s (%s)", err, name)
}
// normal path
if pconf, ok := conf.Paths[name]; ok {
return pconf, nil
}
// regular expression path
for _, pconf := range conf.Paths {
if pconf.regexp != nil && pconf.regexp.MatchString(name) {
return pconf, nil
}
}
return nil, fmt.Errorf("unable to find a valid configuration for path '%s'", name)
}

23
main.go
View File

@ -110,11 +110,10 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
}
}
for name, confp := range conf.Paths {
if name == "all" {
continue
for name, pathConf := range conf.Paths {
if pathConf.regexp == nil {
p.paths[name] = newPath(p, name, pathConf)
}
p.paths[name] = newPath(p, name, confp, true)
}
if _, ok := conf.protocolsParsed[gortsplib.StreamProtocolUDP]; ok {
@ -204,7 +203,7 @@ outer:
case req := <-p.clientDescribe:
// create path if not exist
if _, ok := p.paths[req.pathName]; !ok {
p.paths[req.pathName] = newPath(p, req.pathName, p.findConfForPathName(req.pathName), false)
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
}
p.paths[req.pathName].onDescribe(req.client)
@ -212,7 +211,7 @@ outer:
case req := <-p.clientAnnounce:
// create path if not exist
if path, ok := p.paths[req.pathName]; !ok {
p.paths[req.pathName] = newPath(p, req.pathName, p.findConfForPathName(req.pathName), false)
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
} else {
if path.publisher != nil {
@ -364,18 +363,6 @@ func (p *program) close() {
<-p.done
}
func (p *program) findConfForPathName(name string) *confPath {
if confp, ok := p.conf.Paths[name]; ok {
return confp
}
if confp, ok := p.conf.Paths["all"]; ok {
return confp
}
return nil
}
func main() {
_, err := newProgram(os.Args[1:], os.Stdin)
if err != nil {

View File

@ -8,6 +8,7 @@ import (
"strconv"
"testing"
"time"
"fmt"
"github.com/stretchr/testify/require"
)
@ -480,8 +481,11 @@ func TestProxy(t *testing.T) {
func TestRunOnDemand(t *testing.T) {
stdin := []byte("\n" +
"paths:\n" +
" ondemand:\n" +
" runOnDemand: ffmpeg -hide_banner -loglevel error -re -i test-images/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/ondemand\n")
" all:\n" +
" runOnDemand: ffmpeg -hide_banner -loglevel error -re -i test-images/ffmpeg/emptyvideo.ts -c copy -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH\n")
fmt.Println("TEST", string(stdin))
p1, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin))
require.NoError(t, err)
defer p1.close()

30
path.go
View File

@ -21,8 +21,7 @@ type publisher interface {
type path struct {
p *program
name string
confp *confPath
permanent bool
conf *pathConf
source *source
publisher publisher
publisherReady bool
@ -34,16 +33,15 @@ type path struct {
onDemandCmd *exec.Cmd
}
func newPath(p *program, name string, confp *confPath, permanent bool) *path {
func newPath(p *program, name string, conf *pathConf) *path {
pa := &path{
p: p,
name: name,
confp: confp,
permanent: permanent,
p: p,
name: name,
conf: conf,
}
if confp.Source != "record" {
s := newSource(p, pa, confp)
if conf.Source != "record" {
s := newSource(p, pa, conf)
pa.source = s
pa.publisher = s
}
@ -60,11 +58,11 @@ func (pa *path) onInit() {
go pa.source.run(pa.source.state)
}
if pa.confp.RunOnInit != "" {
if pa.conf.RunOnInit != "" {
pa.log("starting on init command")
var err error
pa.onInitCmd, err = startExternalCommand(pa.confp.RunOnInit, pa.name)
pa.onInitCmd, err = startExternalCommand(pa.conf.RunOnInit, pa.name)
if err != nil {
pa.log("ERR: %s", err)
}
@ -149,7 +147,7 @@ func (pa *path) onCheck() {
// stop on demand source if needed
if pa.source != nil &&
pa.confp.SourceOnDemand &&
pa.conf.SourceOnDemand &&
pa.source.state == sourceStateRunning &&
!pa.hasClients() &&
time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs {
@ -168,8 +166,8 @@ func (pa *path) onCheck() {
pa.onDemandCmd = nil
}
// remove non-permanent paths
if !pa.permanent &&
// remove regular expression paths
if pa.conf.regexp != nil &&
pa.publisher == nil &&
!pa.hasClients() {
pa.onClose(false)
@ -223,13 +221,13 @@ func (pa *path) onDescribe(client *client) {
// publisher not found
if pa.publisher == nil {
// on demand command is available: put the client on hold
if pa.confp.RunOnDemand != "" {
if pa.conf.RunOnDemand != "" {
if pa.onDemandCmd == nil { // start if needed
pa.log("starting on demand command")
pa.lastDescribeActivation = time.Now()
var err error
pa.onDemandCmd, err = startExternalCommand(pa.confp.RunOnDemand, "")
pa.onDemandCmd, err = startExternalCommand(pa.conf.RunOnDemand, "")
if err != nil {
pa.log("ERR: %s", err)
}

View File

@ -29,8 +29,11 @@ logDestinations: [stdout]
# if 'file' is in logDestinations, this is the file that will receive the logs
logFile: rtsp-simple-server.log
# these settings are path-dependent. The settings under the path 'all' are
# applied to all paths that do not match a specific entry.
# these settings are path-dependent.
# The settings under the path 'all' are applied to all paths that do not match
# another entry.
# It's possible to use regular expressions by using a tilde as prefix,
# for instance, '~^(test1|test2)$' will match both test1 and test2.
paths:
all:
# source of the stream - this can be:

View File

@ -24,7 +24,7 @@ const (
type source struct {
p *program
path *path
confp *confPath
pathConf *pathConf
state sourceState
tracks []*gortsplib.Track
innerRunning bool
@ -36,17 +36,17 @@ type source struct {
done chan struct{}
}
func newSource(p *program, path *path, confp *confPath) *source {
func newSource(p *program, path *path, pathConf *pathConf) *source {
s := &source{
p: p,
path: path,
confp: confp,
pathConf: pathConf,
setState: make(chan sourceState),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
if confp.SourceOnDemand {
if pathConf.SourceOnDemand {
s.state = sourceStateStopped
} else {
s.state = sourceStateRunning
@ -134,7 +134,7 @@ func (s *source) runInnerInner() bool {
dialDone := make(chan struct{})
go func() {
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{
Host: s.confp.sourceUrl.Host,
Host: s.pathConf.sourceUrl.Host,
ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout,
})
@ -152,14 +152,14 @@ func (s *source) runInnerInner() bool {
return true
}
_, err = conn.Options(s.confp.sourceUrl)
_, err = conn.Options(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)
return true
}
tracks, _, err := conn.Describe(s.confp.sourceUrl)
tracks, _, err := conn.Describe(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)
@ -173,7 +173,7 @@ func (s *source) runInnerInner() bool {
s.path.publisherTrackCount = len(tracks)
s.path.publisherSdp = serverSdp
if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUDP {
if s.pathConf.sourceProtocolParsed == gortsplib.StreamProtocolUDP {
return s.runUDP(conn)
} else {
return s.runTCP(conn)
@ -191,7 +191,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
rtpRead, rtcpRead, _, err := conn.SetupUDP(s.confp.sourceUrl, track, rtpPort, rtcpPort)
rtpRead, rtcpRead, _, err := conn.SetupUDP(s.pathConf.sourceUrl, track, rtpPort, rtcpPort)
if err != nil {
if isBindError(err) {
continue // retry
@ -208,7 +208,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
}
}
_, err := conn.Play(s.confp.sourceUrl)
_, err := conn.Play(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)
@ -263,7 +263,7 @@ func (s *source) runUDP(conn *gortsplib.ConnClient) bool {
tcpConnDone := make(chan error)
go func() {
tcpConnDone <- conn.LoopUDP(s.confp.sourceUrl)
tcpConnDone <- conn.LoopUDP(s.pathConf.sourceUrl)
}()
var ret bool
@ -294,7 +294,7 @@ outer:
func (s *source) runTCP(conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks {
_, err := conn.SetupTCP(s.confp.sourceUrl, track)
_, err := conn.SetupTCP(s.pathConf.sourceUrl, track)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)
@ -302,7 +302,7 @@ func (s *source) runTCP(conn *gortsplib.ConnClient) bool {
}
}
_, err := conn.Play(s.confp.sourceUrl)
_, err := conn.Play(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
s.path.log("source ERR: %s", err)

View File

@ -118,8 +118,8 @@ func removeQueryFromPath(path string) string {
var rePathName = regexp.MustCompile("^[0-9a-zA-Z_\\-/]+$")
func checkPathName(name string) error {
if !rePathName.MatchString(name) {
return fmt.Errorf("can contain only alfanumeric characters, underscore, minus or slash")
if name == "" {
return fmt.Errorf("cannot be empty")
}
if name[0] == '/' {
@ -130,6 +130,10 @@ func checkPathName(name string) error {
return fmt.Errorf("can't end with a slash")
}
if !rePathName.MatchString(name) {
return fmt.Errorf("can contain only alfanumeric characters, underscore, minus or slash")
}
return nil
}
@ -147,6 +151,8 @@ func startExternalCommand(cmdstr string, pathName string) (*exec.Cmd, error) {
cmd = exec.Command("/bin/sh", "-c", cmdstr)
}
fmt.Println("PATH NAME", pathName)
// variables are available through environment variables
cmd.Env = append(os.Environ(),
"RTSP_SERVER_PATH="+pathName,