diff --git a/Makefile b/Makefile index e22b30ad..2e9a9ee8 100644 --- a/Makefile +++ b/Makefile @@ -75,9 +75,10 @@ paths: # readPass: tast proxied: - source: rtsp://192.168.2.198:8554/stream - sourceProtocol: tcp - sourceOnDemand: yes +# source: rtsp://192.168.2.198:8554/stream +# sourceProtocol: tcp +# sourceOnDemand: yes + runOnDemand: ffmpeg -i rtsp://192.168.2.198:8554/stream -c copy -f rtsp rtsp://localhost:8554/proxied2 # original: # runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed diff --git a/client.go b/client.go index be50cbbc..f55f98d5 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,11 @@ const ( clientUdpWriteBufferSize = 128 * 1024 ) +type describeRes struct { + sdp []byte + err error +} + type clientTrack struct { rtpPort int rtcpPort int @@ -77,7 +82,7 @@ type client struct { p *program conn *gortsplib.ConnServer state clientState - path string + pathId string authUser string authPass string authHelper *gortsplib.AuthServer @@ -88,12 +93,12 @@ type client struct { readBuf *doubleBuffer writeBuf *doubleBuffer - describeRes chan []byte + describeRes chan describeRes events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP done chan struct{} } -func newServerClient(p *program, nconn net.Conn) *client { +func newClient(p *program, nconn net.Conn) *client { c := &client{ p: p, conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ @@ -125,12 +130,12 @@ func (c *client) zone() string { } func (c *client) run() { - var runOnConnectCmd *exec.Cmd + var onConnectCmd *exec.Cmd if c.p.conf.RunOnConnect != "" { - runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect) - runOnConnectCmd.Stdout = os.Stdout - runOnConnectCmd.Stderr = os.Stderr - err := runOnConnectCmd.Start() + onConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect) + onConnectCmd.Stdout = os.Stdout + onConnectCmd.Stderr = os.Stderr + err := onConnectCmd.Start() if err != nil { c.log("ERR: %s", err) } @@ -158,9 +163,9 @@ outer: c.conn.NetConn().Close() // close socket in case it has not been closed yet - if runOnConnectCmd != nil { - runOnConnectCmd.Process.Signal(os.Interrupt) - runOnConnectCmd.Wait() + if onConnectCmd != nil { + onConnectCmd.Process.Signal(os.Interrupt) + onConnectCmd.Wait() } close(c.done) // close() never blocks @@ -311,14 +316,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - pconf := c.p.findConfForPath(path) - if pconf == nil { + confp := c.p.findConfForPath(path) + if confp == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) return false } - err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req) + err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req) if err != nil { if err == errAuthCritical { return false @@ -326,11 +331,11 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return true } - c.describeRes = make(chan []byte) + c.describeRes = make(chan describeRes) c.p.events <- programEventClientDescribe{c, path} - sdp := <-c.describeRes - if sdp == nil { - c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path)) + describeRes := <-c.describeRes + if describeRes.err != nil { + c.writeResError(req, gortsplib.StatusNotFound, describeRes.err) return false } @@ -341,7 +346,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"}, "Content-Type": gortsplib.HeaderValue{"application/sdp"}, }, - Content: sdp, + Content: describeRes.sdp, }) return true @@ -357,14 +362,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - pconf := c.p.findConfForPath(path) - if pconf == nil { + confp := c.p.findConfForPath(path) + if confp == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) return false } - err := c.authenticate(pconf.publishIpsParsed, pconf.PublishUser, pconf.PublishPass, req) + err := c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req) if err != nil { if err == errAuthCritical { return false @@ -435,14 +440,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { switch c.state { // play case clientStateInitial, clientStatePrePlay: - pconf := c.p.findConfForPath(path) - if pconf == nil { + confp := c.p.findConfForPath(path) + if confp == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", path)) return false } - err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req) + err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req) if err != nil { if err == errAuthCritical { return false @@ -473,7 +478,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if c.path != "" && path != c.path { + if c.pathId != "" && path != c.pathId { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed")) return false } @@ -513,7 +518,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if c.path != "" && path != c.path { + if c.pathId != "" && path != c.pathId { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed")) return false } @@ -559,8 +564,8 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - // after ANNOUNCE, c.path is already set - if path != c.path { + // after ANNOUNCE, c.pathId is already set + if path != c.pathId { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed")) return false } @@ -593,7 +598,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -645,7 +650,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if len(c.streamTracks) >= len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) >= len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) return false } @@ -689,7 +694,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if path != c.path { + if path != c.pathId { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed")) return false } @@ -724,12 +729,12 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if path != c.path { + if path != c.pathId { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed")) return false } - if len(c.streamTracks) != len(c.p.paths[c.path].publisherSdpParsed.MediaDescriptions) { + if len(c.streamTracks) != len(c.p.paths[c.pathId].publisherSdpParsed.MediaDescriptions) { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) return false } @@ -756,7 +761,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { } func (c *client) runPlay(path string) { - pconf := c.p.findConfForPath(path) + confp := c.p.findConfForPath(path) if c.streamProtocol == gortsplib.StreamProtocolTcp { c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize) @@ -767,19 +772,19 @@ func (c *client) runPlay(path string) { c.p.events <- programEventClientPlay2{done, c} <-done - c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string { + c.log("is receiving on path '%s', %d %s via %s", c.pathId, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } return "tracks" }(), c.streamProtocol) - var runOnReadCmd *exec.Cmd - if pconf.RunOnRead != "" { - runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead) - runOnReadCmd.Stdout = os.Stdout - runOnReadCmd.Stderr = os.Stderr - err := runOnReadCmd.Start() + var onReadCmd *exec.Cmd + if confp.RunOnRead != "" { + onReadCmd = exec.Command("/bin/sh", "-c", confp.RunOnRead) + onReadCmd.Stdout = os.Stdout + onReadCmd.Stderr = os.Stderr + err := onReadCmd.Start() if err != nil { c.log("ERR: %s", err) } @@ -848,14 +853,14 @@ func (c *client) runPlay(path string) { close(c.events) } - if runOnReadCmd != nil { - runOnReadCmd.Process.Signal(os.Interrupt) - runOnReadCmd.Wait() + if onReadCmd != nil { + onReadCmd.Process.Signal(os.Interrupt) + onReadCmd.Wait() } } func (c *client) runRecord(path string) { - pconf := c.p.findConfForPath(path) + confp := c.p.findConfForPath(path) c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { @@ -866,19 +871,19 @@ func (c *client) runRecord(path string) { c.p.events <- programEventClientRecord{done, c} <-done - c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string { + c.log("is publishing on path '%s', %d %s via %s", c.pathId, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } return "tracks" }(), c.streamProtocol) - var runOnPublishCmd *exec.Cmd - if pconf.RunOnPublish != "" { - runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish) - runOnPublishCmd.Stdout = os.Stdout - runOnPublishCmd.Stderr = os.Stderr - err := runOnPublishCmd.Start() + var onPublishCmd *exec.Cmd + if confp.RunOnPublish != "" { + onPublishCmd = exec.Command("/bin/sh", "-c", confp.RunOnPublish) + onPublishCmd.Stdout = os.Stdout + onPublishCmd.Stderr = os.Stderr + err := onPublishCmd.Start() if err != nil { c.log("ERR: %s", err) } @@ -967,7 +972,7 @@ func (c *client) runRecord(path string) { c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.p.events <- programEventClientFrameTcp{ - c.path, + c.pathId, frame.TrackId, frame.StreamType, frame.Content, @@ -1017,8 +1022,8 @@ func (c *client) runRecord(path string) { c.rtcpReceivers[trackId].Close() } - if runOnPublishCmd != nil { - runOnPublishCmd.Process.Signal(os.Interrupt) - runOnPublishCmd.Wait() + if onPublishCmd != nil { + onPublishCmd.Process.Signal(os.Interrupt) + onPublishCmd.Wait() } } diff --git a/conf.go b/conf.go index 6ae3fdc5..f79e1fcc 100644 --- a/conf.go +++ b/conf.go @@ -26,6 +26,7 @@ type confPath struct { ReadPass string `yaml:"readPass"` ReadIps []string `yaml:"readIps"` readIpsParsed []interface{} + RunOnDemand string `yaml:"runOnDemand"` RunOnPublish string `yaml:"runOnPublish"` RunOnRead string `yaml:"runOnRead"` } @@ -148,91 +149,95 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { } } - for path, pconf := range conf.Paths { - if pconf == nil { + for path, confp := range conf.Paths { + if confp == nil { conf.Paths[path] = &confPath{} - pconf = conf.Paths[path] + confp = conf.Paths[path] } - if pconf.Source == "" { - pconf.Source = "record" + if confp.Source == "" { + confp.Source = "record" } - if pconf.PublishUser != "" { - if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) { - return nil, fmt.Errorf("publish username must be alphanumeric") - } - } - if pconf.PublishPass != "" { - if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishPass) { - return nil, fmt.Errorf("publish password must be alphanumeric") - } - } - pconf.publishIpsParsed, err = parseIpCidrList(pconf.PublishIps) - if err != nil { - return nil, err - } - - if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" { - return nil, fmt.Errorf("read username and password must be both filled") - } - if pconf.ReadUser != "" { - if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadUser) { - return nil, fmt.Errorf("read username must be alphanumeric") - } - } - if pconf.ReadPass != "" { - if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.ReadPass) { - return nil, fmt.Errorf("read password must be alphanumeric") - } - } - if pconf.ReadUser != "" && pconf.ReadPass == "" || pconf.ReadUser == "" && pconf.ReadPass != "" { - return nil, fmt.Errorf("read username and password must be both filled") - } - pconf.readIpsParsed, err = parseIpCidrList(pconf.ReadIps) - if err != nil { - return nil, err - } - - if pconf.Source != "record" { + if confp.Source != "record" { if path == "all" { return nil, fmt.Errorf("path 'all' cannot have a RTSP source") } - if pconf.SourceProtocol == "" { - pconf.SourceProtocol = "udp" + if confp.SourceProtocol == "" { + confp.SourceProtocol = "udp" } - pconf.sourceUrl, err = url.Parse(pconf.Source) + confp.sourceUrl, err = url.Parse(confp.Source) if err != nil { - return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source) + return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source) } - if pconf.sourceUrl.Scheme != "rtsp" { - return nil, fmt.Errorf("'%s' is not a valid RTSP url", pconf.Source) + if confp.sourceUrl.Scheme != "rtsp" { + return nil, fmt.Errorf("'%s' is not a valid RTSP url", confp.Source) } - if pconf.sourceUrl.Port() == "" { - pconf.sourceUrl.Host += ":554" + if confp.sourceUrl.Port() == "" { + confp.sourceUrl.Host += ":554" } - if pconf.sourceUrl.User != nil { - pass, _ := pconf.sourceUrl.User.Password() - user := pconf.sourceUrl.User.Username() + if confp.sourceUrl.User != nil { + pass, _ := confp.sourceUrl.User.Password() + user := confp.sourceUrl.User.Username() if user != "" && pass == "" || user == "" && pass != "" { fmt.Errorf("username and password must be both provided") } } - switch pconf.SourceProtocol { + switch confp.SourceProtocol { case "udp": - pconf.sourceProtocolParsed = gortsplib.StreamProtocolUdp + confp.sourceProtocolParsed = gortsplib.StreamProtocolUdp case "tcp": - pconf.sourceProtocolParsed = gortsplib.StreamProtocolTcp + confp.sourceProtocolParsed = gortsplib.StreamProtocolTcp default: - return nil, fmt.Errorf("unsupported protocol '%s'", pconf.SourceProtocol) + return nil, fmt.Errorf("unsupported protocol '%s'", confp.SourceProtocol) } } + + if confp.PublishUser != "" { + if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishUser) { + return nil, fmt.Errorf("publish username must be alphanumeric") + } + } + if confp.PublishPass != "" { + if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.PublishPass) { + return nil, fmt.Errorf("publish password must be alphanumeric") + } + } + confp.publishIpsParsed, err = parseIpCidrList(confp.PublishIps) + if err != nil { + return nil, err + } + + if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" { + return nil, fmt.Errorf("read username and password must be both filled") + } + if confp.ReadUser != "" { + if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadUser) { + return nil, fmt.Errorf("read username must be alphanumeric") + } + } + if confp.ReadPass != "" { + if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(confp.ReadPass) { + return nil, fmt.Errorf("read password must be alphanumeric") + } + } + if confp.ReadUser != "" && confp.ReadPass == "" || confp.ReadUser == "" && confp.ReadPass != "" { + return nil, fmt.Errorf("read username and password must be both filled") + } + confp.readIpsParsed, err = parseIpCidrList(confp.ReadIps) + if err != nil { + return nil, err + } + + if confp.RunOnDemand != "" && path == "all" { + return nil, fmt.Errorf("option 'runOnDemand' cannot be used in path 'all'") + } } return conf, nil diff --git a/main.go b/main.go index 6c390f27..45780c37 100644 --- a/main.go +++ b/main.go @@ -145,12 +145,6 @@ type programEventSourceFrame struct { func (programEventSourceFrame) isProgramEvent() {} -type programEventSourceReset struct { - source *source -} - -func (programEventSourceReset) isProgramEvent() {} - type programEventTerminate struct{} func (programEventTerminate) isProgramEvent() {} @@ -197,11 +191,17 @@ func newProgram(args []string, stdin io.Reader) (*program, error) { done: make(chan struct{}), } - for path, pconf := range conf.Paths { - if pconf.Source != "record" { - s := newSource(p, path, pconf) + for path, confp := range conf.Paths { + if path == "all" { + continue + } + + newPath(p, path, confp, true) + + if confp.Source != "record" { + s := newSource(p, path, confp) p.sources = append(p.sources, s) - p.paths[path] = newPath(p, path, s) + p.paths[path].publisher = s } } @@ -265,21 +265,21 @@ outer: case rawEvt := <-p.events: switch evt := rawEvt.(type) { case programEventClientNew: - c := newServerClient(p, evt.nconn) + c := newClient(p, evt.nconn) p.clients[c] = struct{}{} c.log("connected") case programEventClientClose: delete(p.clients, evt.client) - if evt.client.path != "" { - if path, ok := p.paths[evt.client.path]; ok { - // if this is a publisher + if evt.client.pathId != "" { + if path, ok := p.paths[evt.client.pathId]; ok { if path.publisher == evt.client { - path.publisherReset() + path.publisherRemove() - // delete the path - delete(p.paths, evt.client.path) + if !path.permanent { + delete(p.paths, evt.client.pathId) + } } } } @@ -289,35 +289,30 @@ outer: case programEventClientDescribe: path, ok := p.paths[evt.path] - - // no path: return 404 if !ok { - evt.client.describeRes <- nil + evt.client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", evt.path)} continue } - sdpText, wait := path.describe() - - if wait { - evt.client.path = evt.path - evt.client.state = clientStateWaitingDescription - continue - } - - evt.client.describeRes <- sdpText + path.describe(evt.client) case programEventClientAnnounce: - _, ok := p.paths[evt.path] - if ok { - evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) - continue + if path, ok := p.paths[evt.path]; ok { + if path.publisher != nil { + evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) + continue + } + + } else { + newPath(p, evt.path, p.findConfForPath(evt.path), false) } - evt.client.path = evt.path - evt.client.state = clientStateAnnounce - p.paths[evt.path] = newPath(p, evt.path, evt.client) + p.paths[evt.path].publisher = evt.client p.paths[evt.path].publisherSdpText = evt.sdpText p.paths[evt.path].publisherSdpParsed = evt.sdpParsed + + evt.client.pathId = evt.path + evt.client.state = clientStateAnnounce evt.res <- nil case programEventClientSetupPlay: @@ -332,7 +327,7 @@ outer: continue } - evt.client.path = evt.path + evt.client.pathId = evt.path evt.client.streamProtocol = evt.protocol evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{ rtpPort: evt.rtpPort, @@ -351,9 +346,9 @@ outer: evt.res <- nil case programEventClientPlay1: - path, ok := p.paths[evt.client.path] + path, ok := p.paths[evt.client.pathId] if !ok || !path.publisherReady { - evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.path) + evt.res <- fmt.Errorf("no one is publishing on path '%s'", evt.client.pathId) continue } @@ -377,13 +372,13 @@ outer: case programEventClientRecord: p.publisherCount += 1 evt.client.state = clientStateRecord - p.paths[evt.client.path].publisherSetReady() + p.paths[evt.client.pathId].publisherSetReady() close(evt.done) case programEventClientRecordStop: p.publisherCount -= 1 evt.client.state = clientStatePreRecord - p.paths[evt.client.path].publisherSetNotReady() + p.paths[evt.client.pathId].publisherSetNotReady() close(evt.done) case programEventClientFrameUdp: @@ -393,24 +388,21 @@ outer: } client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) - p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) + p.forwardFrame(client.pathId, trackId, evt.streamType, evt.buf) case programEventClientFrameTcp: p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf) case programEventSourceReady: evt.source.log("ready") - p.paths[evt.source.path].publisherSetReady() + p.paths[evt.source.pathId].publisherSetReady() case programEventSourceNotReady: evt.source.log("not ready") - p.paths[evt.source.path].publisherSetNotReady() + p.paths[evt.source.pathId].publisherSetNotReady() case programEventSourceFrame: - p.forwardFrame(evt.source.path, evt.trackId, evt.streamType, evt.buf) - - case programEventSourceReset: - p.paths[evt.source.path].publisherReset() + p.forwardFrame(evt.source.pathId, evt.trackId, evt.streamType, evt.buf) case programEventTerminate: break outer @@ -425,7 +417,7 @@ outer: close(evt.done) case programEventClientDescribe: - evt.client.describeRes <- nil + evt.client.describeRes <- describeRes{nil, fmt.Errorf("terminated")} case programEventClientAnnounce: evt.res <- fmt.Errorf("terminated") @@ -478,12 +470,12 @@ func (p *program) close() { } func (p *program) findConfForPath(path string) *confPath { - if pconf, ok := p.conf.Paths[path]; ok { - return pconf + if confp, ok := p.conf.Paths[path]; ok { + return confp } - if pconf, ok := p.conf.Paths["all"]; ok { - return pconf + if confp, ok := p.conf.Paths["all"]; ok { + return confp } return nil @@ -518,35 +510,35 @@ func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.St } func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { - for client := range p.clients { - if client.path == path && client.state == clientStatePlay { - if client.streamProtocol == gortsplib.StreamProtocolUdp { + for c := range p.clients { + if c.pathId == path && c.state == clientStatePlay { + if c.streamProtocol == gortsplib.StreamProtocolUdp { if streamType == gortsplib.StreamTypeRtp { p.rtpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ - IP: client.ip(), - Zone: client.zone(), - Port: client.streamTracks[trackId].rtpPort, + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[trackId].rtpPort, }, buf: frame, }) } else { p.rtcpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ - IP: client.ip(), - Zone: client.zone(), - Port: client.streamTracks[trackId].rtcpPort, + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[trackId].rtcpPort, }, buf: frame, }) } } else { - buf := client.writeBuf.swap() + buf := c.writeBuf.swap() buf = buf[:len(frame)] copy(buf, frame) - client.events <- clientEventFrameTcp{ + c.events <- clientEventFrameTcp{ frame: &gortsplib.InterleavedFrame{ TrackId: trackId, StreamType: streamType, diff --git a/path.go b/path.go index 862ebd93..4ac50917 100644 --- a/path.go +++ b/path.go @@ -1,6 +1,9 @@ package main import ( + "fmt" + "os" + "os/exec" "time" "github.com/aler9/sdp/v3" @@ -14,97 +17,182 @@ type publisher interface { type path struct { p *program id string + confp *confPath + permanent bool publisher publisher publisherReady bool publisherSdpText []byte publisherSdpParsed *sdp.SessionDescription lastRequested time.Time + lastActivation time.Time + onDemandCmd *exec.Cmd } -func newPath(p *program, id string, publisher publisher) *path { - return &path{ +func newPath(p *program, id string, confp *confPath, permanent bool) { + pa := &path{ p: p, id: id, - publisher: publisher, + confp: confp, + permanent: permanent, } + + p.paths[id] = pa } -func (p *path) check() { - hasClients := func() bool { - for c := range p.p.clients { - if c.path == p.id { +func (pa *path) check() { + hasClientsWaitingDescribe := func() bool { + for c := range pa.p.clients { + if c.state == clientStateWaitingDescription && c.pathId == pa.id { return true } } return false }() - source, publisherIsSource := p.publisher.(*source) - // stop source if needed - if !hasClients && - publisherIsSource && - source.state == sourceStateRunning && - time.Since(p.lastRequested) >= 10*time.Second { - source.log("stopping due to inactivity") - source.state = sourceStateStopped - source.events <- sourceEventApplyState{source.state} + // reply to DESCRIBE requests if they are in timeout + if hasClientsWaitingDescribe && + time.Since(pa.lastActivation) >= 5*time.Second { + for c := range pa.p.clients { + if c.state == clientStateWaitingDescription && + c.pathId == pa.id { + c.pathId = "" + c.state = clientStateInitial + c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.id)} + } + } + + // perform actions below in next run + return + } + + if source, ok := pa.publisher.(*source); ok { + if source.state == sourceStateRunning { + hasClients := func() bool { + for c := range pa.p.clients { + if c.pathId == pa.id { + return true + } + } + return false + }() + + // stop source if needed + if !hasClients && + time.Since(pa.lastRequested) >= 10*time.Second { + source.log("stopping since we're not requested anymore") + source.state = sourceStateStopped + source.events <- sourceEventApplyState{source.state} + } + } + + } else { + if pa.onDemandCmd != nil { + hasClientReaders := func() bool { + for c := range pa.p.clients { + if c.pathId == pa.id && c != pa.publisher { + return true + } + } + return false + }() + + // stop on demand command if needed + if !hasClientReaders && + time.Since(pa.lastRequested) >= 10*time.Second { + pa.p.log("stopping on demand command since it is not requested anymore") + + pa.onDemandCmd.Process.Signal(os.Interrupt) + pa.onDemandCmd.Wait() + pa.onDemandCmd = nil + } + } } } -func (p *path) describe() ([]byte, bool) { - p.lastRequested = time.Now() +func (pa *path) describe(client *client) { + pa.lastRequested = time.Now() - // publisher was found but is not ready: wait - if !p.publisherReady { + // publisher not found + if pa.publisher == nil { + if pa.confp.RunOnDemand != "" { + if pa.onDemandCmd == nil { + pa.p.log("starting on demand command") + + pa.lastActivation = time.Now() + pa.onDemandCmd = exec.Command("/bin/sh", "-c", pa.confp.RunOnDemand) + pa.onDemandCmd.Stdout = os.Stdout + pa.onDemandCmd.Stderr = os.Stderr + err := pa.onDemandCmd.Start() + if err != nil { + pa.p.log("ERR: %s", err) + } + } + + client.pathId = pa.id + client.state = clientStateWaitingDescription + return + + } else { + client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.id)} + return + } + } + + // publisher was found but is not ready: put the client on hold + if !pa.publisherReady { // start source if needed - if source, ok := p.publisher.(*source); ok && source.state == sourceStateStopped { + if source, ok := pa.publisher.(*source); ok && source.state == sourceStateStopped { source.log("starting on demand") + pa.lastActivation = time.Now() source.state = sourceStateRunning source.events <- sourceEventApplyState{source.state} } - return nil, true + client.pathId = pa.id + client.state = clientStateWaitingDescription + return } // publisher was found and is ready - return p.publisherSdpText, false + client.describeRes <- describeRes{pa.publisherSdpText, nil} } -func (p *path) publisherSetReady() { - p.publisherReady = true +func (pa *path) publisherRemove() { + for c := range pa.p.clients { + if c.state == clientStateWaitingDescription && + c.pathId == pa.id { + c.pathId = "" + c.state = clientStateInitial + c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' is not available anymore", pa.id)} + } + } + + pa.publisher = nil +} + +func (pa *path) publisherSetReady() { + pa.publisherReady = true // reply to all clients that are waiting for a description - for c := range p.p.clients { + for c := range pa.p.clients { if c.state == clientStateWaitingDescription && - c.path == p.id { - c.path = "" + c.pathId == pa.id { + c.pathId = "" c.state = clientStateInitial - c.describeRes <- p.publisherSdpText + c.describeRes <- describeRes{pa.publisherSdpText, nil} } } } -func (p *path) publisherSetNotReady() { - p.publisherReady = false +func (pa *path) publisherSetNotReady() { + pa.publisherReady = false // close all clients that are reading - for c := range p.p.clients { + for c := range pa.p.clients { if c.state != clientStateWaitingDescription && - c != p.publisher && - c.path == p.id { + c != pa.publisher && + c.pathId == pa.id { c.conn.NetConn().Close() } } } - -func (p *path) publisherReset() { - // reply to all clients that were waiting for a description - for oc := range p.p.clients { - if oc.state == clientStateWaitingDescription && - oc.path == p.id { - oc.path = "" - oc.state = clientStateInitial - oc.describeRes <- nil - } - } -} diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index 9c093934..a99bcd52 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -48,6 +48,11 @@ paths: # IPs or networks (x.x.x.x/24) allowed to read readIps: [] + # command to run when this path is requested. + # This can be used, for example, to publish a stream on demand. + # This is terminated with SIGINT when a client stops publishing. + runOnDemand: + # command to run when a client starts publishing. # This is terminated with SIGINT when a client stops publishing. runOnPublish: diff --git a/source.go b/source.go index 79a0e474..9aa7e7d0 100644 --- a/source.go +++ b/source.go @@ -39,8 +39,8 @@ func (sourceEventTerminate) isSourceEvent() {} type source struct { p *program - path string - pconf *confPath + pathId string + confp *confPath state sourceState tracks []*gortsplib.Track @@ -48,16 +48,16 @@ type source struct { done chan struct{} } -func newSource(p *program, path string, pconf *confPath) *source { +func newSource(p *program, pathId string, confp *confPath) *source { s := &source{ p: p, - path: path, - pconf: pconf, + pathId: pathId, + confp: confp, events: make(chan sourceEvent), done: make(chan struct{}), } - if pconf.SourceOnDemand { + if confp.SourceOnDemand { s.state = sourceStateStopped } else { s.state = sourceStateRunning @@ -67,7 +67,7 @@ func newSource(p *program, path string, pconf *confPath) *source { } func (s *source) log(format string, args ...interface{}) { - s.p.log("[source "+s.path+"] "+format, args...) + s.p.log("[source "+s.pathId+"] "+format, args...) } func (s *source) isPublisher() {} @@ -121,23 +121,24 @@ func (s *source) do(terminate chan struct{}, done chan struct{}) { defer close(done) for { - ok := s.doInner(terminate) - if !ok { - break - } + ok := func() bool { + ok := s.doInner(terminate) + if !ok { + return false + } - s.p.events <- programEventSourceReset{s} - - if !func() bool { t := time.NewTimer(sourceRetryInterval) defer t.Stop() + select { case <-terminate: return false case <-t.C: - return true } - }() { + + return true + }() + if !ok { break } } @@ -151,7 +152,7 @@ func (s *source) doInner(terminate chan struct{}) bool { dialDone := make(chan struct{}) go func() { conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ - Host: s.pconf.sourceUrl.Host, + Host: s.confp.sourceUrl.Host, ReadTimeout: s.p.conf.ReadTimeout, WriteTimeout: s.p.conf.WriteTimeout, }) @@ -171,13 +172,13 @@ func (s *source) doInner(terminate chan struct{}) bool { defer conn.Close() - _, err = conn.Options(s.pconf.sourceUrl) + _, err = conn.Options(s.confp.sourceUrl) if err != nil { s.log("ERR: %s", err) return true } - tracks, _, err := conn.Describe(s.pconf.sourceUrl) + tracks, _, err := conn.Describe(s.confp.sourceUrl) if err != nil { s.log("ERR: %s", err) return true @@ -187,10 +188,10 @@ func (s *source) doInner(terminate chan struct{}) bool { serverSdpParsed, serverSdpText := sdpForServer(tracks) s.tracks = tracks - s.p.paths[s.path].publisherSdpText = serverSdpText - s.p.paths[s.path].publisherSdpParsed = serverSdpParsed + s.p.paths[s.pathId].publisherSdpText = serverSdpText + s.p.paths[s.pathId].publisherSdpParsed = serverSdpParsed - if s.pconf.sourceProtocolParsed == gortsplib.StreamProtocolUdp { + if s.confp.sourceProtocolParsed == gortsplib.StreamProtocolUdp { return s.runUdp(terminate, conn) } else { return s.runTcp(terminate, conn) @@ -215,7 +216,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort := rtpPort + 1 - rtpl, rtcpl, _, err = conn.SetupUdp(s.pconf.sourceUrl, track, rtpPort, rtcpPort) + rtpl, rtcpl, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) if err != nil { // retry if it's a bind error if nerr, ok := err.(*net.OpError); ok { @@ -239,7 +240,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo }) } - _, err := conn.Play(s.pconf.sourceUrl) + _, err := conn.Play(s.confp.sourceUrl) if err != nil { s.log("ERR: %s", err) return true @@ -289,7 +290,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo tcpConnDone := make(chan error) go func() { - tcpConnDone <- conn.LoopUDP(s.pconf.sourceUrl) + tcpConnDone <- conn.LoopUDP(s.confp.sourceUrl) }() var ret bool @@ -323,14 +324,14 @@ outer: func (s *source) runTcp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { for _, track := range s.tracks { - _, err := conn.SetupTcp(s.pconf.sourceUrl, track) + _, err := conn.SetupTcp(s.confp.sourceUrl, track) if err != nil { s.log("ERR: %s", err) return true } } - _, err := conn.Play(s.pconf.sourceUrl) + _, err := conn.Play(s.confp.sourceUrl) if err != nil { s.log("ERR: %s", err) return true diff --git a/utils.go b/utils.go index f9895f40..75f6fcad 100644 --- a/utils.go +++ b/utils.go @@ -74,6 +74,7 @@ func (db *doubleBuffer) swap() []byte { return ret } +// generate a sdp from scratch func sdpForServer(tracks []*gortsplib.Track) (*sdp.SessionDescription, []byte) { sout := &sdp.SessionDescription{ SessionName: func() *sdp.SessionName {