add option runOnDemand (#36)

This commit is contained in:
aler9 2020-07-30 13:31:18 +02:00
parent d7d2ba38f1
commit e409f673d4
8 changed files with 352 additions and 254 deletions

View File

@ -75,9 +75,10 @@ paths:
# readPass: tast
proxied:
source: rtsp://192.168.2.198:8554/stream
sourceProtocol: tcp
sourceOnDemand: yes
# source: rtsp://192.168.2.198:8554/stream
# sourceProtocol: tcp
# sourceOnDemand: yes
runOnDemand: ffmpeg -i rtsp://192.168.2.198:8554/stream -c copy -f rtsp rtsp://localhost:8554/proxied2
# original:
# runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed

119
client.go
View File

@ -23,6 +23,11 @@ const (
clientUdpWriteBufferSize = 128 * 1024
)
type describeRes struct {
sdp []byte
err error
}
type clientTrack struct {
rtpPort int
rtcpPort int
@ -77,7 +82,7 @@ type client struct {
p *program
conn *gortsplib.ConnServer
state clientState
path string
pathId string
authUser string
authPass string
authHelper *gortsplib.AuthServer
@ -88,12 +93,12 @@ type client struct {
readBuf *doubleBuffer
writeBuf *doubleBuffer
describeRes chan []byte
describeRes chan describeRes
events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
done chan struct{}
}
func newServerClient(p *program, nconn net.Conn) *client {
func newClient(p *program, nconn net.Conn) *client {
c := &client{
p: p,
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
@ -125,12 +130,12 @@ func (c *client) zone() string {
}
func (c *client) run() {
var runOnConnectCmd *exec.Cmd
var onConnectCmd *exec.Cmd
if c.p.conf.RunOnConnect != "" {
runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
runOnConnectCmd.Stdout = os.Stdout
runOnConnectCmd.Stderr = os.Stderr
err := runOnConnectCmd.Start()
onConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
onConnectCmd.Stdout = os.Stdout
onConnectCmd.Stderr = os.Stderr
err := onConnectCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
@ -158,9 +163,9 @@ outer:
c.conn.NetConn().Close() // close socket in case it has not been closed yet
if runOnConnectCmd != nil {
runOnConnectCmd.Process.Signal(os.Interrupt)
runOnConnectCmd.Wait()
if onConnectCmd != nil {
onConnectCmd.Process.Signal(os.Interrupt)
onConnectCmd.Wait()
}
close(c.done) // close() never blocks
@ -311,14 +316,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
pconf := c.p.findConfForPath(path)
if pconf == nil {
confp := c.p.findConfForPath(path)
if confp == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
@ -326,11 +331,11 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return true
}
c.describeRes = make(chan []byte)
c.describeRes = make(chan describeRes)
c.p.events <- programEventClientDescribe{c, path}
sdp := <-c.describeRes
if sdp == nil {
c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path))
describeRes := <-c.describeRes
if describeRes.err != nil {
c.writeResError(req, gortsplib.StatusNotFound, describeRes.err)
return false
}
@ -341,7 +346,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
"Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"},
"Content-Type": gortsplib.HeaderValue{"application/sdp"},
},
Content: sdp,
Content: describeRes.sdp,
})
return true
@ -357,14 +362,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
pconf := c.p.findConfForPath(path)
if pconf == nil {
confp := c.p.findConfForPath(path)
if confp == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.publishIpsParsed, pconf.PublishUser, pconf.PublishPass, req)
err := c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req)
if err != nil {
if err == errAuthCritical {
return false
@ -435,14 +440,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
switch c.state {
// play
case clientStateInitial, clientStatePrePlay:
pconf := c.p.findConfForPath(path)
if pconf == nil {
confp := c.p.findConfForPath(path)
if confp == nil {
c.writeResError(req, gortsplib.StatusBadRequest,
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
return false
}
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req)
if err != nil {
if err == errAuthCritical {
return false
@ -473,7 +478,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if c.path != "" && path != c.path {
if c.pathId != "" && path != c.pathId {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
@ -513,7 +518,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if c.path != "" && path != c.path {
if c.pathId != "" && path != c.pathId {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
@ -559,8 +564,8 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
// after ANNOUNCE, c.path is already set
if path != c.path {
// after ANNOUNCE, c.pathId is already set
if path != c.pathId {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
@ -593,7 +598,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
if len(c.streamTracks) >= len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
@ -645,7 +650,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
if len(c.streamTracks) >= len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return false
}
@ -689,7 +694,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if path != c.path {
if path != c.pathId {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
@ -724,12 +729,12 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}
if path != c.path {
if path != c.pathId {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
return false
}
if len(c.streamTracks) != len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) {
if len(c.streamTracks) != len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
return false
}
@ -756,7 +761,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
}
func (c *client) runPlay(path string) {
pconf := c.p.findConfForPath(path)
confp := c.p.findConfForPath(path)
if c.streamProtocol == gortsplib.StreamProtocolTcp {
c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize)
@ -767,19 +772,19 @@ func (c *client) runPlay(path string) {
c.p.events <- programEventClientPlay2{done, c}
<-done
c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
c.log("is receiving on path '%s', %d %s via %s", c.pathId, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var runOnReadCmd *exec.Cmd
if pconf.RunOnRead != "" {
runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead)
runOnReadCmd.Stdout = os.Stdout
runOnReadCmd.Stderr = os.Stderr
err := runOnReadCmd.Start()
var onReadCmd *exec.Cmd
if confp.RunOnRead != "" {
onReadCmd = exec.Command("/bin/sh", "-c", confp.RunOnRead)
onReadCmd.Stdout = os.Stdout
onReadCmd.Stderr = os.Stderr
err := onReadCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
@ -848,14 +853,14 @@ func (c *client) runPlay(path string) {
close(c.events)
}
if runOnReadCmd != nil {
runOnReadCmd.Process.Signal(os.Interrupt)
runOnReadCmd.Wait()
if onReadCmd != nil {
onReadCmd.Process.Signal(os.Interrupt)
onReadCmd.Wait()
}
}
func (c *client) runRecord(path string) {
pconf := c.p.findConfForPath(path)
confp := c.p.findConfForPath(path)
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
@ -866,19 +871,19 @@ func (c *client) runRecord(path string) {
c.p.events <- programEventClientRecord{done, c}
<-done
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
c.log("is publishing on path '%s', %d %s via %s", c.pathId, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
var runOnPublishCmd *exec.Cmd
if pconf.RunOnPublish != "" {
runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish)
runOnPublishCmd.Stdout = os.Stdout
runOnPublishCmd.Stderr = os.Stderr
err := runOnPublishCmd.Start()
var onPublishCmd *exec.Cmd
if confp.RunOnPublish != "" {
onPublishCmd = exec.Command("/bin/sh", "-c", confp.RunOnPublish)
onPublishCmd.Stdout = os.Stdout
onPublishCmd.Stderr = os.Stderr
err := onPublishCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
@ -967,7 +972,7 @@ func (c *client) runRecord(path string) {
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
c.pathId,
frame.TrackId,
frame.StreamType,
frame.Content,
@ -1017,8 +1022,8 @@ func (c *client) runRecord(path string) {
c.rtcpReceivers[trackId].Close()
}
if runOnPublishCmd != nil {
runOnPublishCmd.Process.Signal(os.Interrupt)
runOnPublishCmd.Wait()
if onPublishCmd != nil {
onPublishCmd.Process.Signal(os.Interrupt)
onPublishCmd.Wait()
}
}

119
conf.go
View File

@ -26,6 +26,7 @@ type confPath struct {
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
readIpsParsed []interface{}
RunOnDemand string `yaml:"runOnDemand"`
RunOnPublish string `yaml:"runOnPublish"`
RunOnRead string `yaml:"runOnRead"`
}
@ -148,91 +149,95 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}
}
for path, pconf := range conf.Paths {
if pconf == nil {
for path, confp := range conf.Paths {
if confp == nil {
conf.Paths[path] = &confPath{}
pconf = conf.Paths[path]
confp = conf.Paths[path]
}
if pconf.Source == "" {
pconf.Source = "record"
if confp.Source == "" {
confp.Source = "record"
}
if pconf.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
}
}
if pconf.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishPass) {
return nil, fmt.Errorf("publish password must be alphanumeric")
}
}
pconf.publishIpsParsed, err = parseIpCidrList(pconf.PublishIps)
if err != nil {
return nil, err
}
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
if pconf.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadUser) {
return nil, fmt.Errorf("read username must be alphanumeric")
}
}
if pconf.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadPass) {
return nil, fmt.Errorf("read password must be alphanumeric")
}
}
if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
pconf.readIpsParsed, err = parseIpCidrList(pconf.ReadIps)
if err != nil {
return nil, err
}
if pconf.Source != "record" {
if confp.Source != "record" {
if path == "all" {
return nil, fmt.Errorf("path 'all' cannot have a RTSP source")
}
if pconf.SourceProtocol == "" {
pconf.SourceProtocol = "udp"
if confp.SourceProtocol == "" {
confp.SourceProtocol = "udp"
}
pconf.sourceUrl, err = url.Parse(pconf.Source)
confp.sourceUrl, err = url.Parse(confp.Source)
if err != nil {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
}
if pconf.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source)
if confp.sourceUrl.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source)
}
if pconf.sourceUrl.Port() == "" {
pconf.sourceUrl.Host += ":554"
if confp.sourceUrl.Port() == "" {
confp.sourceUrl.Host += ":554"
}
if pconf.sourceUrl.User != nil {
pass, _ := pconf.sourceUrl.User.Password()
user := pconf.sourceUrl.User.Username()
if confp.sourceUrl.User != nil {
pass, _ := confp.sourceUrl.User.Password()
user := confp.sourceUrl.User.Username()
if user != "" && pass == "" ||
user == "" && pass != "" {
fmt.Errorf("username and password must be both provided")
}
}
switch pconf.SourceProtocol {
switch confp.SourceProtocol {
case "udp":
pconf.sourceProtocolParsed = gortsplib.StreamProtocolUdp
confp.sourceProtocolParsed = gortsplib.StreamProtocolUdp
case "tcp":
pconf.sourceProtocolParsed = gortsplib.StreamProtocolTcp
confp.sourceProtocolParsed = gortsplib.StreamProtocolTcp
default:
return nil, fmt.Errorf("unsupported protocol '%s'", pconf.SourceProtocol)
return nil, fmt.Errorf("unsupported protocol '%s'", confp.SourceProtocol)
}
}
if confp.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
}
}
if confp.PublishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishPass) {
return nil, fmt.Errorf("publish password must be alphanumeric")
}
}
confp.publishIpsParsed, err = parseIpCidrList(confp.PublishIps)
if err != nil {
return nil, err
}
if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
if confp.ReadUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadUser) {
return nil, fmt.Errorf("read username must be alphanumeric")
}
}
if confp.ReadPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadPass) {
return nil, fmt.Errorf("read password must be alphanumeric")
}
}
if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
confp.readIpsParsed, err = parseIpCidrList(confp.ReadIps)
if err != nil {
return nil, err
}
if confp.RunOnDemand != "" && path == "all" {
return nil, fmt.Errorf("option 'runOnDemand' cannot be used in path 'all'")
}
}
return conf, nil

120
main.go
View File

@ -145,12 +145,6 @@ type programEventSourceFrame struct {
func (programEventSourceFrame) isProgramEvent() {}
type programEventSourceReset struct {
source *source
}
func (programEventSourceReset) isProgramEvent() {}
type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {}
@ -197,11 +191,17 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
done: make(chan struct{}),
}
for path, pconf := range conf.Paths {
if pconf.Source != "record" {
s := newSource(p, path, pconf)
for path, confp := range conf.Paths {
if path == "all" {
continue
}
newPath(p, path, confp, true)
if confp.Source != "record" {
s := newSource(p, path, confp)
p.sources = append(p.sources, s)
p.paths[path] = newPath(p, path, s)
p.paths[path].publisher = s
}
}
@ -265,21 +265,21 @@ outer:
case rawEvt := <-p.events:
switch evt := rawEvt.(type) {
case programEventClientNew:
c := newServerClient(p, evt.nconn)
c := newClient(p, evt.nconn)
p.clients[c] = struct{}{}
c.log("connected")
case programEventClientClose:
delete(p.clients, evt.client)
if evt.client.path != "" {
if path, ok := p.paths[evt.client.path]; ok {
// if this is a publisher
if evt.client.pathId != "" {
if path, ok := p.paths[evt.client.pathId]; ok {
if path.publisher == evt.client {
path.publisherReset()
path.publisherRemove()
// delete the path
delete(p.paths, evt.client.path)
if !path.permanent {
delete(p.paths, evt.client.pathId)
}
}
}
}
@ -289,35 +289,30 @@ outer:
case programEventClientDescribe:
path, ok := p.paths[evt.path]
// no path: return 404
if !ok {
evt.client.describeRes <- nil
evt.client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", evt.path)}
continue
}
sdpText, wait := path.describe()
if wait {
evt.client.path = evt.path
evt.client.state = clientStateWaitingDescription
continue
}
evt.client.describeRes <- sdpText
path.describe(evt.client)
case programEventClientAnnounce:
_, ok := p.paths[evt.path]
if ok {
evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path)
continue
if path, ok := p.paths[evt.path]; ok {
if path.publisher != nil {
evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path)
continue
}
} else {
newPath(p, evt.path, p.findConfForPath(evt.path), false)
}
evt.client.path = evt.path
evt.client.state = clientStateAnnounce
p.paths[evt.path] = newPath(p, evt.path, evt.client)
p.paths[evt.path].publisher = evt.client
p.paths[evt.path].publisherSdpText = evt.sdpText
p.paths[evt.path].publisherSdpParsed = evt.sdpParsed
evt.client.pathId = evt.path
evt.client.state = clientStateAnnounce
evt.res <- nil
case programEventClientSetupPlay:
@ -332,7 +327,7 @@ outer:
continue
}
evt.client.path = evt.path
evt.client.pathId = evt.path
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{
rtpPort: evt.rtpPort,
@ -351,9 +346,9 @@ outer:
evt.res <- nil
case programEventClientPlay1:
path, ok := p.paths[evt.client.path]
path, ok := p.paths[evt.client.pathId]
if !ok || !path.publisherReady {
evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.path)
evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.pathId)
continue
}
@ -377,13 +372,13 @@ outer:
case programEventClientRecord:
p.publisherCount += 1
evt.client.state = clientStateRecord
p.paths[evt.client.path].publisherSetReady()
p.paths[evt.client.pathId].publisherSetReady()
close(evt.done)
case programEventClientRecordStop:
p.publisherCount -= 1
evt.client.state = clientStatePreRecord
p.paths[evt.client.path].publisherSetNotReady()
p.paths[evt.client.pathId].publisherSetNotReady()
close(evt.done)
case programEventClientFrameUdp:
@ -393,24 +388,21 @@ outer:
}
client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf)
p.forwardFrame(client.path, trackId, evt.streamType, evt.buf)
p.forwardFrame(client.pathId, trackId, evt.streamType, evt.buf)
case programEventClientFrameTcp:
p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf)
case programEventSourceReady:
evt.source.log("ready")
p.paths[evt.source.path].publisherSetReady()
p.paths[evt.source.pathId].publisherSetReady()
case programEventSourceNotReady:
evt.source.log("not ready")
p.paths[evt.source.path].publisherSetNotReady()
p.paths[evt.source.pathId].publisherSetNotReady()
case programEventSourceFrame:
p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf)
case programEventSourceReset:
p.paths[evt.source.path].publisherReset()
p.forwardFrame(evt.source.pathId, evt.trackId, evt.streamType, evt.buf)
case programEventTerminate:
break outer
@ -425,7 +417,7 @@ outer:
close(evt.done)
case programEventClientDescribe:
evt.client.describeRes <- nil
evt.client.describeRes <- describeRes{nil, fmt.Errorf("terminated")}
case programEventClientAnnounce:
evt.res <- fmt.Errorf("terminated")
@ -478,12 +470,12 @@ func (p *program) close() {
}
func (p *program) findConfForPath(path string) *confPath {
if pconf, ok := p.conf.Paths[path]; ok {
return pconf
if confp, ok := p.conf.Paths[path]; ok {
return confp
}
if pconf, ok := p.conf.Paths["all"]; ok {
return pconf
if confp, ok := p.conf.Paths["all"]; ok {
return confp
}
return nil
@ -518,35 +510,35 @@ func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.St
}
func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) {
for client := range p.clients {
if client.path == path && client.state == clientStatePlay {
if client.streamProtocol == gortsplib.StreamProtocolUdp {
for c := range p.clients {
if c.pathId == path && c.state == clientStatePlay {
if c.streamProtocol == gortsplib.StreamProtocolUdp {
if streamType == gortsplib.StreamTypeRtp {
p.rtpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{
IP: client.ip(),
Zone: client.zone(),
Port: client.streamTracks[trackId].rtpPort,
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[trackId].rtpPort,
},
buf: frame,
})
} else {
p.rtcpl.write(&udpAddrBufPair{
addr: &net.UDPAddr{
IP: client.ip(),
Zone: client.zone(),
Port: client.streamTracks[trackId].rtcpPort,
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[trackId].rtcpPort,
},
buf: frame,
})
}
} else {
buf := client.writeBuf.swap()
buf := c.writeBuf.swap()
buf = buf[:len(frame)]
copy(buf, frame)
client.events <- clientEventFrameTcp{
c.events <- clientEventFrameTcp{
frame: &gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,

180
path.go
View File

@ -1,6 +1,9 @@
package main
import (
"fmt"
"os"
"os/exec"
"time"
"github.com/aler9/sdp/v3"
@ -14,97 +17,182 @@ type publisher interface {
type path struct {
p *program
id string
confp *confPath
permanent bool
publisher publisher
publisherReady bool
publisherSdpText []byte
publisherSdpParsed *sdp.SessionDescription
lastRequested time.Time
lastActivation time.Time
onDemandCmd *exec.Cmd
}
func newPath(p *program, id string, publisher publisher) *path {
return &path{
func newPath(p *program, id string, confp *confPath, permanent bool) {
pa := &path{
p: p,
id: id,
publisher: publisher,
confp: confp,
permanent: permanent,
}
p.paths[id] = pa
}
func (p *path) check() {
hasClients := func() bool {
for c := range p.p.clients {
if c.path == p.id {
func (pa *path) check() {
hasClientsWaitingDescribe := func() bool {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription && c.pathId == pa.id {
return true
}
}
return false
}()
source, publisherIsSource := p.publisher.(*source)
// stop source if needed
if !hasClients &&
publisherIsSource &&
source.state == sourceStateRunning &&
time.Since(p.lastRequested) >= 10*time.Second {
source.log("stopping due to inactivity")
source.state = sourceStateStopped
source.events <- sourceEventApplyState{source.state}
// reply to DESCRIBE requests if they are in timeout
if hasClientsWaitingDescribe &&
time.Since(pa.lastActivation) >= 5*time.Second {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathId == pa.id {
c.pathId = ""
c.state = clientStateInitial
c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.id)}
}
}
// perform actions below in next run
return
}
if source, ok := pa.publisher.(*source); ok {
if source.state == sourceStateRunning {
hasClients := func() bool {
for c := range pa.p.clients {
if c.pathId == pa.id {
return true
}
}
return false
}()
// stop source if needed
if !hasClients &&
time.Since(pa.lastRequested) >= 10*time.Second {
source.log("stopping since we're not requested anymore")
source.state = sourceStateStopped
source.events <- sourceEventApplyState{source.state}
}
}
} else {
if pa.onDemandCmd != nil {
hasClientReaders := func() bool {
for c := range pa.p.clients {
if c.pathId == pa.id && c != pa.publisher {
return true
}
}
return false
}()
// stop on demand command if needed
if !hasClientReaders &&
time.Since(pa.lastRequested) >= 10*time.Second {
pa.p.log("stopping on demand command since it is not requested anymore")
pa.onDemandCmd.Process.Signal(os.Interrupt)
pa.onDemandCmd.Wait()
pa.onDemandCmd = nil
}
}
}
}
func (p *path) describe() ([]byte, bool) {
p.lastRequested = time.Now()
func (pa *path) describe(client *client) {
pa.lastRequested = time.Now()
// publisher was found but is not ready: wait
if !p.publisherReady {
// publisher not found
if pa.publisher == nil {
if pa.confp.RunOnDemand != "" {
if pa.onDemandCmd == nil {
pa.p.log("starting on demand command")
pa.lastActivation = time.Now()
pa.onDemandCmd = exec.Command("/bin/sh", "-c", pa.confp.RunOnDemand)
pa.onDemandCmd.Stdout = os.Stdout
pa.onDemandCmd.Stderr = os.Stderr
err := pa.onDemandCmd.Start()
if err != nil {
pa.p.log("ERR: %s", err)
}
}
client.pathId = pa.id
client.state = clientStateWaitingDescription
return
} else {
client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.id)}
return
}
}
// publisher was found but is not ready: put the client on hold
if !pa.publisherReady {
// start source if needed
if source, ok := p.publisher.(*source); ok && source.state == sourceStateStopped {
if source, ok := pa.publisher.(*source); ok && source.state == sourceStateStopped {
source.log("starting on demand")
pa.lastActivation = time.Now()
source.state = sourceStateRunning
source.events <- sourceEventApplyState{source.state}
}
return nil, true
client.pathId = pa.id
client.state = clientStateWaitingDescription
return
}
// publisher was found and is ready
return p.publisherSdpText, false
client.describeRes <- describeRes{pa.publisherSdpText, nil}
}
func (p *path) publisherSetReady() {
p.publisherReady = true
func (pa *path) publisherRemove() {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.pathId == pa.id {
c.pathId = ""
c.state = clientStateInitial
c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' is not available anymore", pa.id)}
}
}
pa.publisher = nil
}
func (pa *path) publisherSetReady() {
pa.publisherReady = true
// reply to all clients that are waiting for a description
for c := range p.p.clients {
for c := range pa.p.clients {
if c.state == clientStateWaitingDescription &&
c.path == p.id {
c.path = ""
c.pathId == pa.id {
c.pathId = ""
c.state = clientStateInitial
c.describeRes <- p.publisherSdpText
c.describeRes <- describeRes{pa.publisherSdpText, nil}
}
}
}
func (p *path) publisherSetNotReady() {
p.publisherReady = false
func (pa *path) publisherSetNotReady() {
pa.publisherReady = false
// close all clients that are reading
for c := range p.p.clients {
for c := range pa.p.clients {
if c.state != clientStateWaitingDescription &&
c != p.publisher &&
c.path == p.id {
c != pa.publisher &&
c.pathId == pa.id {
c.conn.NetConn().Close()
}
}
}
func (p *path) publisherReset() {
// reply to all clients that were waiting for a description
for oc := range p.p.clients {
if oc.state == clientStateWaitingDescription &&
oc.path == p.id {
oc.path = ""
oc.state = clientStateInitial
oc.describeRes <- nil
}
}
}

View File

@ -48,6 +48,11 @@ paths:
# IPs or networks (x.x.x.x/24) allowed to read
readIps: []
# command to run when this path is requested.
# This can be used, for example, to publish a stream on demand.
# This is terminated with SIGINT when a client stops publishing.
runOnDemand:
# command to run when a client starts publishing.
# This is terminated with SIGINT when a client stops publishing.
runOnPublish:

View File

@ -39,8 +39,8 @@ func (sourceEventTerminate) isSourceEvent() {}
type source struct {
p *program
path string
pconf *confPath
pathId string
confp *confPath
state sourceState
tracks []*gortsplib.Track
@ -48,16 +48,16 @@ type source struct {
done chan struct{}
}
func newSource(p *program, path string, pconf *confPath) *source {
func newSource(p *program, pathId string, confp *confPath) *source {
s := &source{
p: p,
path: path,
pconf: pconf,
pathId: pathId,
confp: confp,
events: make(chan sourceEvent),
done: make(chan struct{}),
}
if pconf.SourceOnDemand {
if confp.SourceOnDemand {
s.state = sourceStateStopped
} else {
s.state = sourceStateRunning
@ -67,7 +67,7 @@ func newSource(p *program, path string, pconf *confPath) *source {
}
func (s *source) log(format string, args ...interface{}) {
s.p.log("[source "+s.path+"] "+format, args...)
s.p.log("[source "+s.pathId+"] "+format, args...)
}
func (s *source) isPublisher() {}
@ -121,23 +121,24 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) {
defer close(done)
for {
ok := s.doInner(terminate)
if !ok {
break
}
ok := func() bool {
ok := s.doInner(terminate)
if !ok {
return false
}
s.p.events <- programEventSourceReset{s}
if !func() bool {
t := time.NewTimer(sourceRetryInterval)
defer t.Stop()
select {
case <-terminate:
return false
case <-t.C:
return true
}
}() {
return true
}()
if !ok {
break
}
}
@ -151,7 +152,7 @@ func (s *source) doInner(terminate chan struct{}) bool {
dialDone := make(chan struct{})
go func() {
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{
Host: s.pconf.sourceUrl.Host,
Host: s.confp.sourceUrl.Host,
ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout,
})
@ -171,13 +172,13 @@ func (s *source) doInner(terminate chan struct{}) bool {
defer conn.Close()
_, err = conn.Options(s.pconf.sourceUrl)
_, err = conn.Options(s.confp.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
}
tracks, _, err := conn.Describe(s.pconf.sourceUrl)
tracks, _, err := conn.Describe(s.confp.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
@ -187,10 +188,10 @@ func (s *source) doInner(terminate chan struct{}) bool {
serverSdpParsed, serverSdpText := sdpForServer(tracks)
s.tracks = tracks
s.p.paths[s.path].publisherSdpText = serverSdpText
s.p.paths[s.path].publisherSdpParsed = serverSdpParsed
s.p.paths[s.pathId].publisherSdpText = serverSdpText
s.p.paths[s.pathId].publisherSdpParsed = serverSdpParsed
if s.pconf.sourceProtocolParsed == gortsplib.StreamProtocolUdp {
if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp {
return s.runUdp(terminate, conn)
} else {
return s.runTcp(terminate, conn)
@ -215,7 +216,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
rtpl, rtcpl, _, err = conn.SetupUdp(s.pconf.sourceUrl, track, rtpPort, rtcpPort)
rtpl, rtcpl, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort)
if err != nil {
// retry if it's a bind error
if nerr, ok := err.(*net.OpError); ok {
@ -239,7 +240,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
})
}
_, err := conn.Play(s.pconf.sourceUrl)
_, err := conn.Play(s.confp.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true
@ -289,7 +290,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
tcpConnDone := make(chan error)
go func() {
tcpConnDone <- conn.LoopUDP(s.pconf.sourceUrl)
tcpConnDone <- conn.LoopUDP(s.confp.sourceUrl)
}()
var ret bool
@ -323,14 +324,14 @@ outer:
func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool {
for _, track := range s.tracks {
_, err := conn.SetupTcp(s.pconf.sourceUrl, track)
_, err := conn.SetupTcp(s.confp.sourceUrl, track)
if err != nil {
s.log("ERR: %s", err)
return true
}
}
_, err := conn.Play(s.pconf.sourceUrl)
_, err := conn.Play(s.confp.sourceUrl)
if err != nil {
s.log("ERR: %s", err)
return true

View File

@ -74,6 +74,7 @@ func (db *doubleBuffer) swap() []byte {
return ret
}
// generate a sdp from scratch
func sdpForServer(tracks []*gortsplib.Track) (*sdp.SessionDescription, []byte) {
sout := &sdp.SessionDescription{
SessionName: func() *sdp.SessionName {