add 'restart' option to all external commands

This commit is contained in:
aler9 2020-10-31 16:36:09 +01:00
parent e511eb4ef1
commit 38b07ab68a
9 changed files with 191 additions and 115 deletions

View File

@ -109,6 +109,7 @@ Edit `rtsp-simple-server.yml` and replace everything inside section `paths` with
paths:
cam:
runOnInit: ffmpeg -f v4l2 -i /dev/video0 -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH
runOnInitRestart: yes
```
After starting the server, the webcam is available on `rtsp://localhost:8554/cam`. The ffmpeg command works only on Linux; for Windows and Mac equivalents, read the [ffmpeg wiki](https://trac.ffmpeg.org/wiki/Capture/Webcam).
@ -128,6 +129,7 @@ Then edit `rtsp-simple-server.yml` and replace everything inside section `paths`
paths:
cam:
runOnInit: gst-launch-1.0 rpicamsrc preview=false bitrate=2000000 keyframe-interval=50 ! video/x-h264,width=1920,height=1080,framerate=25/1 ! rtspclientsink location=rtsp://localhost:8554/$RTSP_SERVER_PATH
runOnInitRestart: yes
```
After starting the server, the webcam is available on `rtsp://localhost:8554/cam`.
@ -139,6 +141,7 @@ Edit `rtsp-simple-server.yml` and replace everything inside section `paths` with
paths:
ondemand:
runOnDemand: ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/$RTSP_SERVER_PATH
runOnDemandRestart: yes
```
The command inserted into `runOnDemand` will start only when a client requests the path `ondemand`, therefore the file will start streaming only when requested.
@ -151,6 +154,7 @@ paths:
all:
original:
runOnPublish: ffmpeg -i rtsp://localhost:8554/$RTSP_SERVER_PATH -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed
runOnPublishRestart: yes
```
### Authentication

View File

@ -97,15 +97,16 @@ type Parent interface {
}
type Client struct {
wg *sync.WaitGroup
stats *stats.Stats
serverUdpRtp *serverudp.Server
serverUdpRtcp *serverudp.Server
readTimeout time.Duration
runOnConnect string
protocols map[gortsplib.StreamProtocol]struct{}
conn *gortsplib.ConnServer
parent Parent
wg *sync.WaitGroup
stats *stats.Stats
serverUdpRtp *serverudp.Server
serverUdpRtcp *serverudp.Server
readTimeout time.Duration
runOnConnect string
runOnConnectRestart bool
protocols map[gortsplib.StreamProtocol]struct{}
conn *gortsplib.ConnServer
parent Parent
state state
path Path
@ -134,18 +135,20 @@ func New(
readTimeout time.Duration,
writeTimeout time.Duration,
runOnConnect string,
runOnConnectRestart bool,
protocols map[gortsplib.StreamProtocol]struct{},
nconn net.Conn,
parent Parent) *Client {
c := &Client{
wg: wg,
stats: stats,
serverUdpRtp: serverUdpRtp,
serverUdpRtcp: serverUdpRtcp,
readTimeout: readTimeout,
runOnConnect: runOnConnect,
protocols: protocols,
wg: wg,
stats: stats,
serverUdpRtp: serverUdpRtp,
serverUdpRtcp: serverUdpRtcp,
readTimeout: readTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
protocols: protocols,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
Conn: nconn,
ReadTimeout: readTimeout,
@ -198,11 +201,7 @@ func (c *Client) run() {
var onConnectCmd *externalcmd.ExternalCmd
if c.runOnConnect != "" {
var err error
onConnectCmd, err = externalcmd.New(c.runOnConnect, "")
if err != nil {
c.log("ERR: %s", err)
}
onConnectCmd = externalcmd.New(c.runOnConnect, c.runOnConnectRestart, "")
}
for {
@ -939,11 +938,8 @@ func (c *Client) runPlay() bool {
var onReadCmd *externalcmd.ExternalCmd
if c.path.Conf().RunOnRead != "" {
var err error
onReadCmd, err = externalcmd.New(c.path.Conf().RunOnRead, c.path.Name())
if err != nil {
c.log("ERR: %s", err)
}
onReadCmd = externalcmd.New(c.path.Conf().RunOnRead,
c.path.Conf().RunOnReadRestart, c.path.Name())
}
if c.streamProtocol == gortsplib.StreamProtocolUDP {
@ -1121,11 +1117,8 @@ func (c *Client) runRecord() bool {
var onPublishCmd *externalcmd.ExternalCmd
if c.path.Conf().RunOnPublish != "" {
var err error
onPublishCmd, err = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Name())
if err != nil {
c.log("ERR: %s", err)
}
onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish,
c.path.Conf().RunOnPublishRestart, c.path.Name())
}
if c.streamProtocol == gortsplib.StreamProtocolUDP {

View File

@ -20,16 +20,17 @@ type Parent interface {
}
type ClientManager struct {
readTimeout time.Duration
writeTimeout time.Duration
runOnConnect string
protocols map[headers.StreamProtocol]struct{}
stats *stats.Stats
serverUdpRtp *serverudp.Server
serverUdpRtcp *serverudp.Server
pathMan *pathman.PathManager
serverTcp *servertcp.Server
parent Parent
readTimeout time.Duration
writeTimeout time.Duration
runOnConnect string
runOnConnectRestart bool
protocols map[headers.StreamProtocol]struct{}
stats *stats.Stats
serverUdpRtp *serverudp.Server
serverUdpRtcp *serverudp.Server
pathMan *pathman.PathManager
serverTcp *servertcp.Server
parent Parent
clients map[*client.Client]struct{}
wg sync.WaitGroup
@ -46,6 +47,7 @@ func New(
readTimeout time.Duration,
writeTimeout time.Duration,
runOnConnect string,
runOnConnectRestart bool,
protocols map[headers.StreamProtocol]struct{},
stats *stats.Stats,
serverUdpRtp *serverudp.Server,
@ -55,20 +57,21 @@ func New(
parent Parent) *ClientManager {
cm := &ClientManager{
readTimeout: readTimeout,
writeTimeout: writeTimeout,
runOnConnect: runOnConnect,
protocols: protocols,
stats: stats,
serverUdpRtp: serverUdpRtp,
serverUdpRtcp: serverUdpRtcp,
pathMan: pathMan,
serverTcp: serverTcp,
parent: parent,
clients: make(map[*client.Client]struct{}),
clientClose: make(chan *client.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
protocols: protocols,
stats: stats,
serverUdpRtp: serverUdpRtp,
serverUdpRtcp: serverUdpRtcp,
pathMan: pathMan,
serverTcp: serverTcp,
parent: parent,
clients: make(map[*client.Client]struct{}),
clientClose: make(chan *client.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go cm.run()
@ -98,6 +101,7 @@ outer:
cm.readTimeout,
cm.writeTimeout,
cm.runOnConnect,
cm.runOnConnectRestart,
cm.protocols,
conn,
cm)

View File

@ -20,6 +20,7 @@ type Conf struct {
RtpPort int `yaml:"rtpPort"`
RtcpPort int `yaml:"rtcpPort"`
RunOnConnect string `yaml:"runOnConnect"`
RunOnConnectRestart bool `yaml:"runOnConnectRestart"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
AuthMethods []string `yaml:"authMethods"`

View File

@ -22,9 +22,13 @@ type PathConf struct {
SourceOnDemand bool `yaml:"sourceOnDemand"`
SourceRedirect string `yaml:"sourceRedirect"`
RunOnInit string `yaml:"runOnInit"`
RunOnInitRestart bool `yaml:"runOnInitRestart"`
RunOnDemand string `yaml:"runOnDemand"`
RunOnDemandRestart bool `yaml:"runOnDemandRestart"`
RunOnPublish string `yaml:"runOnPublish"`
RunOnPublishRestart bool `yaml:"runOnPublishRestart"`
RunOnRead string `yaml:"runOnRead"`
RunOnReadRestart bool `yaml:"runOnReadRestart"`
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`

View File

@ -5,29 +5,91 @@ import (
"os/exec"
"runtime"
"strings"
"time"
)
const (
restartPause = 2 * time.Second
)
type ExternalCmd struct {
cmd *exec.Cmd
cmdstr string
restart bool
pathName string
// in
terminate chan struct{}
// out
done chan struct{}
}
func New(cmdstr string, pathName string) (*ExternalCmd, error) {
func New(cmdstr string, restart bool, pathName string) *ExternalCmd {
e := &ExternalCmd{
cmdstr: cmdstr,
restart: restart,
pathName: pathName,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go e.run()
return e
}
func (e *ExternalCmd) Close() {
close(e.terminate)
<-e.done
}
func (e *ExternalCmd) run() {
defer close(e.done)
for {
if !e.runInner() {
break
}
}
}
func (e *ExternalCmd) runInner() bool {
ok := e.runInnerInner()
if !ok {
return false
}
if !e.restart {
<-e.terminate
return false
}
t := time.NewTimer(restartPause)
defer t.Stop()
select {
case <-t.C:
return true
case <-e.terminate:
return false
}
}
func (e *ExternalCmd) runInnerInner() bool {
var cmd *exec.Cmd
if runtime.GOOS == "windows" {
// on Windows the shell is not used and command is started directly
// variables are replaced manually in order to allow
// compatibility with linux commands
cmdstr = strings.ReplaceAll(cmdstr, "$RTSP_SERVER_PATH", pathName)
args := strings.Fields(cmdstr)
// variables are replaced manually in order to guarantee compatibility
// with Linux commands
args := strings.Fields(strings.ReplaceAll(e.cmdstr, "$RTSP_SERVER_PATH", e.pathName))
cmd = exec.Command(args[0], args[1:]...)
} else {
cmd = exec.Command("/bin/sh", "-c", "exec "+cmdstr)
cmd = exec.Command("/bin/sh", "-c", "exec "+e.cmdstr)
}
// variables are available through environment variables
// variables are inserted into the environment
cmd.Env = append(os.Environ(),
"RTSP_SERVER_PATH="+pathName,
"RTSP_SERVER_PATH="+e.pathName,
)
cmd.Stdout = os.Stdout
@ -35,21 +97,28 @@ func New(cmdstr string, pathName string) (*ExternalCmd, error) {
err := cmd.Start()
if err != nil {
return nil, err
return true
}
return &ExternalCmd{
cmd: cmd,
}, nil
}
cmdDone := make(chan struct{})
go func() {
defer close(cmdDone)
cmd.Wait()
}()
func (e *ExternalCmd) Close() {
// on Windows it's not possible to send os.Interrupt to a process
// Kill() is the only supported way
if runtime.GOOS == "windows" {
e.cmd.Process.Kill()
} else {
e.cmd.Process.Signal(os.Interrupt)
select {
case <-e.terminate:
// on Windows it's not possible to send os.Interrupt to a process
// Kill() is the only supported way
if runtime.GOOS == "windows" {
cmd.Process.Kill()
} else {
cmd.Process.Signal(os.Interrupt)
}
<-cmdDone
return false
case <-cmdDone:
return true
}
e.cmd.Wait()
}

View File

@ -181,8 +181,9 @@ func (p *program) createResources(initial bool) error {
if p.clientMan == nil {
p.clientMan = clientman.New(p.conf.ReadTimeout, p.conf.WriteTimeout,
p.conf.RunOnConnect, p.conf.ProtocolsParsed, p.stats,
p.serverUdpRtp, p.serverUdpRtcp, p.pathMan, p.serverTcp, p)
p.conf.RunOnConnect, p.conf.RunOnConnectRestart,
p.conf.ProtocolsParsed, p.stats, p.serverUdpRtp, p.serverUdpRtcp,
p.pathMan, p.serverTcp, p)
}
if p.confWatcher == nil {
@ -295,6 +296,7 @@ func (p *program) reloadConf() error {
conf.ReadTimeout != p.conf.ReadTimeout ||
conf.WriteTimeout != p.conf.WriteTimeout ||
conf.RunOnConnect != p.conf.RunOnConnect ||
conf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
!reflect.DeepEqual(conf.ProtocolsParsed, p.conf.ProtocolsParsed) {
closeClientMan = true
}

View File

@ -223,11 +223,8 @@ func (pa *Path) run() {
if pa.conf.RunOnInit != "" {
pa.Log("starting on init command")
var err error
pa.onInitCmd, err = externalcmd.New(pa.conf.RunOnInit, pa.name)
if err != nil {
pa.Log("ERR: %s", err)
}
pa.onInitCmd = externalcmd.New(pa.conf.RunOnInit,
pa.conf.RunOnInitRestart, pa.name)
}
tickerCheck := time.NewTicker(pathCheckPeriod)
@ -516,11 +513,8 @@ func (pa *Path) onClientDescribe(c *client.Client) {
if pa.onDemandCmd == nil { // start if needed
pa.Log("starting on demand command")
pa.lastDescribeActivation = time.Now()
var err error
pa.onDemandCmd, err = externalcmd.New(pa.conf.RunOnDemand, pa.name)
if err != nil {
pa.Log("ERR: %s", err)
}
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand,
pa.conf.RunOnDemandRestart, pa.name)
}
pa.clients[c] = clientStateWaitingDescribe

View File

@ -17,10 +17,6 @@ writeTimeout: 5s
# to enforce security).
authMethods: [basic, digest]
# command to run when a client connects.
# this is terminated with SIGINT when a client disconnects.
runOnConnect:
# enable Prometheus-compatible metrics on port 9998.
metrics: no
# enable pprof on port 9999 to monitor performances.
@ -31,6 +27,11 @@ logDestinations: [stdout]
# if "file" is in logDestinations, this is the file that will receive the logs.
logFile: rtsp-simple-server.log
# command to run when a client connects to the server.
# this is terminated with SIGINT when a client disconnects from the server.
runOnConnect:
runOnConnectRestart: no
# these settings are path-dependent.
# it's possible to use regular expressions by using a tilde as prefix.
# for example, "~^(test1|test2)$" will match both "test1" and "test2".
@ -57,28 +58,6 @@ paths:
# redirected to.
sourceRedirect:
# command to run when this path is loaded by the program.
# this can be used, for example, to publish a stream and keep it always opened.
# this is terminated with SIGINT when the program closes.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnInit:
# command to run when this path is requested.
# this can be used, for example, to publish a stream on demand.
# this is terminated with SIGINT when the path is not requested anymore.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnDemand:
# command to run when a client starts publishing.
# this is terminated with SIGINT when a client stops publishing.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnPublish:
# command to run when a clients starts reading.
# this is terminated with SIGINT when a client stops reading.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnRead:
# username required to publish.
publishUser:
# password required to publish.
@ -92,3 +71,29 @@ paths:
readPass:
# ips or networks (x.x.x.x/24) allowed to read.
readIps: []
# command to run when this path is loaded by the program.
# this can be used, for example, to publish a stream and keep it always opened.
# this is terminated with SIGINT when the program closes.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnInit:
runOnInitRestart: no
# command to run when this path is requested.
# this can be used, for example, to publish a stream on demand.
# this is terminated with SIGINT when the path is not requested anymore.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnDemand:
runOnDemandRestart: no
# command to run when a client starts publishing.
# this is terminated with SIGINT when a client stops publishing.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnPublish:
runOnPublishRestart: no
# command to run when a clients starts reading.
# this is terminated with SIGINT when a client stops reading.
# the path name is available in the RTSP_SERVER_PATH variable.
runOnRead:
runOnReadRestart: no