diff --git a/conf.go b/conf.go index a39dfb17..730a5305 100644 --- a/conf.go +++ b/conf.go @@ -86,10 +86,10 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { for _, proto := range conf.Protocols { switch proto { case "udp": - conf.protocolsParsed[_STREAM_PROTOCOL_UDP] = struct{}{} + conf.protocolsParsed[streamProtocolUdp] = struct{}{} case "tcp": - conf.protocolsParsed[_STREAM_PROTOCOL_TCP] = struct{}{} + conf.protocolsParsed[streamProtocolTcp] = struct{}{} default: return nil, fmt.Errorf("unsupported protocol: %s", proto) diff --git a/go.mod b/go.mod index ee78d7bf..b3823fc6 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,7 @@ go 1.13 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 - github.com/pion/rtcp v1.2.3 + github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787 github.com/pion/sdp v1.3.0 github.com/stretchr/testify v1.5.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index 7980594a..6aa05a35 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0 h1:9Ph5Zl7JkTIEXkot6Q3Acag+9klW9cpwI9navzSE0gs= -github.com/aler9/gortsplib v0.0.0-20200712204813-71824f42dca0/go.mod h1:VsK6bzyxOh2ymYRX/U7ZfM4fEsXKXd1ylL73c2eNzUA= +github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787 h1:6svRLsZW0bSOLSU/P3KjkLDJdU0KQjJkfc+Ttcd0veg= +github.com/aler9/gortsplib v0.0.0-20200713074216-643dce1dd787/go.mod h1:17dcA4Qak5TLqgun8OR0wnSbFQIg4cvYVSf1nbCt+qU= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= diff --git a/main.go b/main.go index c030e893..b781266f 100644 --- a/main.go +++ b/main.go @@ -24,12 +24,12 @@ type track struct { type streamProtocol int const ( - _STREAM_PROTOCOL_UDP streamProtocol = iota - _STREAM_PROTOCOL_TCP + streamProtocolUdp streamProtocol = iota + streamProtocolTcp ) func (s streamProtocol) String() string { - if s == _STREAM_PROTOCOL_UDP { + if s == streamProtocolUdp { return "udp" } return "tcp" @@ -292,10 +292,10 @@ outer: } switch evt.client.state { - case _CLIENT_STATE_PLAY: + case clientStatePlay: p.receiverCount -= 1 - case _CLIENT_STATE_RECORD: + case clientStateRecord: p.publisherCount -= 1 } @@ -319,7 +319,7 @@ outer: } evt.client.path = evt.path - evt.client.state = _CLIENT_STATE_ANNOUNCE + evt.client.state = clientStateAnnounce p.publishers[evt.path] = evt.client evt.res <- nil @@ -343,7 +343,7 @@ outer: rtpPort: evt.rtpPort, rtcpPort: evt.rtcpPort, }) - evt.client.state = _CLIENT_STATE_PRE_PLAY + evt.client.state = clientStatePrePlay evt.res <- nil case programEventClientSetupRecord: @@ -352,7 +352,7 @@ outer: rtpPort: evt.rtpPort, rtcpPort: evt.rtcpPort, }) - evt.client.state = _CLIENT_STATE_PRE_RECORD + evt.client.state = clientStatePreRecord evt.res <- nil case programEventClientPlay1: @@ -373,12 +373,12 @@ outer: case programEventClientPlay2: p.receiverCount += 1 - evt.client.state = _CLIENT_STATE_PLAY + evt.client.state = clientStatePlay evt.res <- nil case programEventClientRecord: p.publisherCount += 1 - evt.client.state = _CLIENT_STATE_RECORD + evt.client.state = clientStateRecord evt.res <- nil case programEventClientFrameUdp: @@ -387,7 +387,7 @@ outer: continue } - client.rtcpReceivers[trackId].onFrame(evt.streamType, evt.buf) + client.RtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf) p.forwardFrame(client.path, trackId, evt.streamType, evt.buf) case programEventClientFrameTcp: @@ -476,8 +476,8 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy continue } - if cl.streamProtocol != _STREAM_PROTOCOL_UDP || - cl.state != _CLIENT_STATE_RECORD || + if cl.streamProtocol != streamProtocolUdp || + cl.state != clientStateRecord || !cl.ip().Equal(addr.IP) { continue } @@ -499,8 +499,8 @@ func (p *program) findPublisher(addr *net.UDPAddr, streamType gortsplib.StreamTy func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { for client := range p.clients { - if client.path == path && client.state == _CLIENT_STATE_PLAY { - if client.streamProtocol == _STREAM_PROTOCOL_UDP { + if client.path == path && client.state == clientStatePlay { + if client.streamProtocol == streamProtocolUdp { if streamType == gortsplib.StreamTypeRtp { p.rtpl.write(&udpAddrBufPair{ addr: &net.UDPAddr{ @@ -522,16 +522,15 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St } } else { - channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, streamType) - buf := client.writeBuf.swap() buf = buf[:len(frame)] copy(buf, frame) client.events <- serverClientEventFrameTcp{ frame: &gortsplib.InterleavedFrame{ - Channel: channel, - Content: buf, + TrackId: trackId, + StreamType: streamType, + Content: buf, }, } } diff --git a/server-client.go b/server-client.go index a3144cd0..e705ab22 100644 --- a/server-client.go +++ b/server-client.go @@ -14,8 +14,8 @@ import ( ) const ( - _CLIENT_CHECK_STREAM_INTERVAL = 5 * time.Second - _CLIENT_RECEIVER_REPORT_INTERVAL = 10 * time.Second + clientCheckStreamInterval = 5 * time.Second + clientReceiverReportInterval = 10 * time.Second ) type serverClientEvent interface { @@ -31,32 +31,32 @@ func (serverClientEventFrameTcp) isServerClientEvent() {} type serverClientState int const ( - _CLIENT_STATE_STARTING serverClientState = iota - _CLIENT_STATE_ANNOUNCE - _CLIENT_STATE_PRE_PLAY - _CLIENT_STATE_PLAY - _CLIENT_STATE_PRE_RECORD - _CLIENT_STATE_RECORD + clientStateStarting serverClientState = iota + clientStateAnnounce + clientStatePrePlay + clientStatePlay + clientStatePreRecord + clientStateRecord ) func (cs serverClientState) String() string { switch cs { - case _CLIENT_STATE_STARTING: + case clientStateStarting: return "STARTING" - case _CLIENT_STATE_ANNOUNCE: + case clientStateAnnounce: return "ANNOUNCE" - case _CLIENT_STATE_PRE_PLAY: + case clientStatePrePlay: return "PRE_PLAY" - case _CLIENT_STATE_PLAY: + case clientStatePlay: return "PLAY" - case _CLIENT_STATE_PRE_RECORD: + case clientStatePreRecord: return "PRE_RECORD" - case _CLIENT_STATE_RECORD: + case clientStateRecord: return "RECORD" } return "UNKNOWN" @@ -75,7 +75,7 @@ type serverClient struct { streamSdpParsed *sdp.SessionDescription // only if publisher streamProtocol streamProtocol streamTracks []*track - rtcpReceivers []*rtcpReceiver + RtcpReceivers []*gortsplib.RtcpReceiver readBuf *doubleBuffer writeBuf *doubleBuffer @@ -91,7 +91,7 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, }), - state: _CLIENT_STATE_STARTING, + state: clientStateStarting, readBuf: newDoubleBuffer(512 * 1024), done: make(chan struct{}), } @@ -113,7 +113,7 @@ func (c *serverClient) zone() string { } func (c *serverClient) publisherIsReady() bool { - return c.state == _CLIENT_STATE_RECORD + return c.state == clientStateRecord } func (c *serverClient) publisherSdpText() []byte { @@ -136,13 +136,13 @@ func (c *serverClient) run() { outer: for { switch c.state { - case _CLIENT_STATE_PLAY: + case clientStatePlay: ok := c.runPlay() if !ok { break outer } - case _CLIENT_STATE_RECORD: + case clientStateRecord: ok := c.runRecord() if !ok { break outer @@ -210,7 +210,7 @@ outer: } func (c *serverClient) runPlay() bool { - if c.streamProtocol == _STREAM_PROTOCOL_TCP { + if c.streamProtocol == streamProtocolTcp { readDone := make(chan error) go func() { buf := make([]byte, 2048) @@ -276,7 +276,7 @@ func (c *serverClient) runPlay() bool { } func (c *serverClient) runRecord() bool { - if c.streamProtocol == _STREAM_PROTOCOL_TCP { + if c.streamProtocol == streamProtocolTcp { frame := &gortsplib.InterleavedFrame{} readDone := make(chan error) @@ -292,18 +292,17 @@ func (c *serverClient) runRecord() bool { switch recvt := recv.(type) { case *gortsplib.InterleavedFrame: - trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) - if trackId >= len(c.streamTracks) { - c.log("ERR: invalid track id '%d'", trackId) + if frame.TrackId >= len(c.streamTracks) { + c.log("ERR: invalid track id '%d'", frame.TrackId) readDone <- nil break } - c.rtcpReceivers[trackId].onFrame(streamType, frame.Content) + c.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.p.events <- programEventClientFrameTcp{ c.path, - trackId, - streamType, + frame.TrackId, + frame.StreamType, frame.Content, } @@ -317,8 +316,8 @@ func (c *serverClient) runRecord() bool { } }() - checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) - receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) + checkStreamTicker := time.NewTicker(clientCheckStreamInterval) + receiverReportTicker := time.NewTicker(clientReceiverReportInterval) outer1: for { @@ -331,7 +330,7 @@ func (c *serverClient) runRecord() bool { case <-checkStreamTicker.C: for trackId := range c.streamTracks { - if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= c.p.conf.StreamDeadAfter { + if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { c.log("ERR: stream is dead") c.conn.NetConn().Close() <-readDone @@ -341,12 +340,11 @@ func (c *serverClient) runRecord() bool { case <-receiverReportTicker.C: for trackId := range c.streamTracks { - channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp) - - frame := c.rtcpReceivers[trackId].report() + frame := c.RtcpReceivers[trackId].Report() c.conn.WriteFrame(&gortsplib.InterleavedFrame{ - Channel: channel, - Content: frame, + TrackId: trackId, + StreamType: gortsplib.StreamTypeRtcp, + Content: frame, }) } } @@ -373,8 +371,8 @@ func (c *serverClient) runRecord() bool { } }() - checkStreamTicker := time.NewTicker(_CLIENT_CHECK_STREAM_INTERVAL) - receiverReportTicker := time.NewTicker(_CLIENT_RECEIVER_REPORT_INTERVAL) + checkStreamTicker := time.NewTicker(clientCheckStreamInterval) + receiverReportTicker := time.NewTicker(clientReceiverReportInterval) outer2: for { @@ -387,7 +385,7 @@ func (c *serverClient) runRecord() bool { case <-checkStreamTicker.C: for trackId := range c.streamTracks { - if time.Since(c.rtcpReceivers[trackId].lastFrameTime()) >= c.p.conf.StreamDeadAfter { + if time.Since(c.RtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.StreamDeadAfter { c.log("ERR: stream is dead") c.conn.NetConn().Close() <-readDone @@ -397,7 +395,7 @@ func (c *serverClient) runRecord() bool { case <-receiverReportTicker.C: for trackId := range c.streamTracks { - frame := c.rtcpReceivers[trackId].report() + frame := c.RtcpReceivers[trackId].Report() c.p.rtcpl.writeChan <- &udpAddrBufPair{ addr: &net.UDPAddr{ IP: c.ip(), @@ -419,7 +417,7 @@ func (c *serverClient) runRecord() bool { <-done for trackId := range c.streamTracks { - c.rtcpReceivers[trackId].close() + c.RtcpReceivers[trackId].Close() } return false @@ -584,9 +582,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return nil case gortsplib.DESCRIBE: - if c.state != _CLIENT_STATE_STARTING { + if c.state != clientStateStarting { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_STARTING)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting)) return errClientTerminate } @@ -625,9 +623,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return nil case gortsplib.ANNOUNCE: - if c.state != _CLIENT_STATE_STARTING { + if c.state != clientStateStarting { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_STARTING)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting)) return errClientTerminate } @@ -709,7 +707,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { switch c.state { // play - case _CLIENT_STATE_STARTING, _CLIENT_STATE_PRE_PLAY: + case clientStateStarting, clientStatePrePlay: pconf := c.findConfForPath(path) if pconf == nil { c.writeResError(req, gortsplib.StatusBadRequest, @@ -737,7 +735,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } return false }() { - if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_UDP]; !ok { + if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return errClientTerminate } @@ -753,13 +751,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return errClientTerminate } - if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { + if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return errClientTerminate } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, path, _STREAM_PROTOCOL_UDP, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolUdp, rtpPort, rtcpPort} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -783,7 +781,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { // play via TCP } else if _, ok := th["RTP/AVP/TCP"]; ok { - if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_TCP]; !ok { + if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return errClientTerminate } @@ -793,13 +791,13 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return errClientTerminate } - if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { + if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return errClientTerminate } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, path, _STREAM_PROTOCOL_TCP, 0, 0} + c.p.events <- programEventClientSetupPlay{res, c, path, streamProtocolTcp, 0, 0} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -828,7 +826,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } // record - case _CLIENT_STATE_ANNOUNCE, _CLIENT_STATE_PRE_RECORD: + case clientStateAnnounce, clientStatePreRecord: if _, ok := th["mode=record"]; !ok { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record")) return errClientTerminate @@ -852,7 +850,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } return false }() { - if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_UDP]; !ok { + if _, ok := c.p.conf.protocolsParsed[streamProtocolUdp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) return errClientTerminate } @@ -863,7 +861,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return errClientTerminate } - if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { + if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolUdp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return errClientTerminate } @@ -874,7 +872,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, _STREAM_PROTOCOL_UDP, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupRecord{res, c, streamProtocolUdp, rtpPort, rtcpPort} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -898,12 +896,12 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { // record via TCP } else if _, ok := th["RTP/AVP/TCP"]; ok { - if _, ok := c.p.conf.protocolsParsed[_STREAM_PROTOCOL_TCP]; !ok { + if _, ok := c.p.conf.protocolsParsed[streamProtocolTcp]; !ok { c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) return errClientTerminate } - if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { + if len(c.streamTracks) > 0 && c.streamProtocol != streamProtocolTcp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return errClientTerminate } @@ -926,7 +924,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, _STREAM_PROTOCOL_TCP, 0, 0} + c.p.events <- programEventClientSetupRecord{res, c, streamProtocolTcp, 0, 0} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) @@ -958,9 +956,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { } case gortsplib.PLAY: - if c.state != _CLIENT_STATE_PRE_PLAY { + if c.state != clientStatePrePlay { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_PRE_PLAY)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay)) return errClientTerminate } @@ -989,7 +987,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { }, }) - if c.streamProtocol == _STREAM_PROTOCOL_TCP { + if c.streamProtocol == streamProtocolTcp { c.writeBuf = newDoubleBuffer(2048) c.events = make(chan serverClientEvent) } @@ -1009,9 +1007,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { return errClientChangeRunMode case gortsplib.RECORD: - if c.state != _CLIENT_STATE_PRE_RECORD { + if c.state != clientStatePreRecord { c.writeResError(req, gortsplib.StatusBadRequest, - fmt.Errorf("client is in state '%s' instead of '%s'", c.state, _CLIENT_STATE_PRE_RECORD)) + fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord)) return errClientTerminate } @@ -1033,9 +1031,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) error { }, }) - c.rtcpReceivers = make([]*rtcpReceiver, len(c.streamTracks)) + c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { - c.rtcpReceivers[trackId] = newRtcpReceiver() + c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } res := make(chan error) diff --git a/streamer-udpl.go b/streamer-udpl.go index 910f9640..ad5ae2ac 100644 --- a/streamer-udpl.go +++ b/streamer-udpl.go @@ -80,7 +80,7 @@ func (l *streamerUdpListener) run() { continue } - l.streamer.rtcpReceivers[l.trackId].onFrame(l.streamType, buf[:n]) + l.streamer.RtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.streamType, buf[:n]} } diff --git a/streamer.go b/streamer.go index 2fcdf381..a53099be 100644 --- a/streamer.go +++ b/streamer.go @@ -12,10 +12,10 @@ import ( ) const ( - _STREAMER_RETRY_INTERVAL = 5 * time.Second - _STREAMER_CHECK_STREAM_INTERVAL = 5 * time.Second - _STREAMER_KEEPALIVE_INTERVAL = 60 * time.Second - _STREAMER_RECEIVER_REPORT_INTERVAL = 10 * time.Second + streamerRetryInterval = 5 * time.Second + streamerCheckStreamInterval = 5 * time.Second + streamerKeepaliveInterval = 60 * time.Second + streamerReceiverReportInterval = 10 * time.Second ) type streamerUdpListenerPair struct { @@ -32,7 +32,7 @@ type streamer struct { clientSdpParsed *sdp.SessionDescription serverSdpText []byte serverSdpParsed *sdp.SessionDescription - rtcpReceivers []*rtcpReceiver + RtcpReceivers []*gortsplib.RtcpReceiver readBuf *doubleBuffer terminate chan struct{} @@ -62,10 +62,10 @@ func newStreamer(p *program, path string, source string, sourceProtocol string) proto, err := func() (streamProtocol, error) { switch sourceProtocol { case "udp": - return _STREAM_PROTOCOL_UDP, nil + return streamProtocolUdp, nil case "tcp": - return _STREAM_PROTOCOL_TCP, nil + return streamProtocolTcp, nil } return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol) }() @@ -109,7 +109,7 @@ func (s *streamer) run() { break } - t := time.NewTimer(_STREAMER_RETRY_INTERVAL) + t := time.NewTimer(streamerRetryInterval) select { case <-s.terminate: break @@ -143,15 +143,11 @@ func (s *streamer) do() bool { } defer nconn.Close() - conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{ + conn := gortsplib.NewConnClient(gortsplib.ConnClientConf{ Conn: nconn, ReadTimeout: s.p.conf.ReadTimeout, WriteTimeout: s.p.conf.WriteTimeout, }) - if err != nil { - s.log("ERR: %s", err) - return true - } _, err = conn.Options(s.u) if err != nil { @@ -172,7 +168,7 @@ func (s *streamer) do() bool { s.serverSdpText = serverSdpText s.serverSdpParsed = serverSdpParsed - if s.proto == _STREAM_PROTOCOL_UDP { + if s.proto == streamProtocolUdp { return s.runUdp(conn) } else { return s.runTcp(conn) @@ -244,9 +240,9 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) + s.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) for trackId := range s.clientSdpParsed.MediaDescriptions { - s.rtcpReceivers[trackId] = newRtcpReceiver() + s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } for _, pair := range streamerUdpListenerPairs { @@ -254,9 +250,9 @@ func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { pair.rtcpl.start() } - sendKeepaliveTicker := time.NewTicker(_STREAMER_KEEPALIVE_INTERVAL) - checkStreamTicker := time.NewTicker(_STREAMER_CHECK_STREAM_INTERVAL) - receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) + sendKeepaliveTicker := time.NewTicker(streamerKeepaliveInterval) + checkStreamTicker := time.NewTicker(streamerCheckStreamInterval) + receiverReportTicker := time.NewTicker(streamerReceiverReportInterval) s.p.events <- programEventStreamerReady{s} @@ -270,14 +266,7 @@ outer: break outer case <-sendKeepaliveTicker.C: - _, err = conn.Do(&gortsplib.Request{ - Method: gortsplib.OPTIONS, - Url: &url.URL{ - Scheme: "rtsp", - Host: s.u.Host, - Path: "/", - }, - }) + _, err := conn.Options(s.u) if err != nil { s.log("ERR: %s", err) ret = true @@ -286,7 +275,7 @@ outer: case <-checkStreamTicker.C: for trackId := range s.clientSdpParsed.MediaDescriptions { - if time.Since(s.rtcpReceivers[trackId].lastFrameTime()) >= s.p.conf.StreamDeadAfter { + if time.Since(s.RtcpReceivers[trackId].LastFrameTime()) >= s.p.conf.StreamDeadAfter { s.log("ERR: stream is dead") ret = true break outer @@ -295,7 +284,7 @@ outer: case <-receiverReportTicker.C: for trackId := range s.clientSdpParsed.MediaDescriptions { - frame := s.rtcpReceivers[trackId].report() + frame := s.RtcpReceivers[trackId].Report() streamerUdpListenerPairs[trackId].rtcpl.writeChan <- &udpAddrBufPair{ addr: &net.UDPAddr{ IP: conn.NetConn().RemoteAddr().(*net.TCPAddr).IP, @@ -320,7 +309,7 @@ outer: } for trackId := range s.clientSdpParsed.MediaDescriptions { - s.rtcpReceivers[trackId].close() + s.RtcpReceivers[trackId].Close() } return ret @@ -341,9 +330,9 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { return true } - s.rtcpReceivers = make([]*rtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) + s.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(s.clientSdpParsed.MediaDescriptions)) for trackId := range s.clientSdpParsed.MediaDescriptions { - s.rtcpReceivers[trackId] = newRtcpReceiver() + s.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } s.p.events <- programEventStreamerReady{s} @@ -363,16 +352,14 @@ func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { break } - trackId, streamType := gortsplib.ConvChannelToTrackIdAndStreamType(frame.Channel) - - s.rtcpReceivers[trackId].onFrame(streamType, frame.Content) - s.p.events <- programEventStreamerFrame{s, trackId, streamType, frame.Content} + s.RtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + s.p.events <- programEventStreamerFrame{s, frame.TrackId, frame.StreamType, frame.Content} } }() // a ticker to check the stream is not needed since there's already a deadline // on the RTSP reads - receiverReportTicker := time.NewTicker(_STREAMER_RECEIVER_REPORT_INTERVAL) + receiverReportTicker := time.NewTicker(streamerReceiverReportInterval) var ret bool @@ -389,13 +376,12 @@ outer: case <-receiverReportTicker.C: for trackId := range s.clientSdpParsed.MediaDescriptions { - frame := s.rtcpReceivers[trackId].report() - - channel := gortsplib.ConvTrackIdAndStreamTypeToChannel(trackId, gortsplib.StreamTypeRtcp) + frame := s.RtcpReceivers[trackId].Report() conn.WriteFrame(&gortsplib.InterleavedFrame{ - Channel: channel, - Content: frame, + TrackId: trackId, + StreamType: gortsplib.StreamTypeRtcp, + Content: frame, }) } } @@ -406,7 +392,7 @@ outer: s.p.events <- programEventStreamerNotReady{s} for trackId := range s.clientSdpParsed.MediaDescriptions { - s.rtcpReceivers[trackId].close() + s.RtcpReceivers[trackId].Close() } return ret diff --git a/utils.go b/utils.go index 6573d283..7bd2e619 100644 --- a/utils.go +++ b/utils.go @@ -2,13 +2,9 @@ package main import ( "fmt" - "math/rand" "net" "strconv" - "time" - "github.com/aler9/gortsplib" - "github.com/pion/rtcp" "github.com/pion/sdp" ) @@ -77,142 +73,6 @@ func (db *doubleBuffer) swap() []byte { return ret } -type rtcpReceiverEvent interface { - isRtpReceiverEvent() -} - -type rtcpReceiverEventFrameRtp struct { - sequenceNumber uint16 -} - -func (rtcpReceiverEventFrameRtp) isRtpReceiverEvent() {} - -type rtcpReceiverEventFrameRtcp struct { - ssrc uint32 - ntpTimeMiddle uint32 -} - -func (rtcpReceiverEventFrameRtcp) isRtpReceiverEvent() {} - -type rtcpReceiverEventLastFrameTime struct { - res chan time.Time -} - -func (rtcpReceiverEventLastFrameTime) isRtpReceiverEvent() {} - -type rtcpReceiverEventReport struct { - res chan []byte -} - -func (rtcpReceiverEventReport) isRtpReceiverEvent() {} - -type rtcpReceiverEventTerminate struct{} - -func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {} - -type rtcpReceiver struct { - events chan rtcpReceiverEvent - done chan struct{} -} - -func newRtcpReceiver() *rtcpReceiver { - rr := &rtcpReceiver{ - events: make(chan rtcpReceiverEvent), - done: make(chan struct{}), - } - go rr.run() - return rr -} - -func (rr *rtcpReceiver) run() { - lastFrameTime := time.Now() - publisherSSRC := uint32(0) - receiverSSRC := rand.Uint32() - sequenceNumberCycles := uint16(0) - lastSequenceNumber := uint16(0) - lastSenderReport := uint32(0) - -outer: - for rawEvt := range rr.events { - switch evt := rawEvt.(type) { - case rtcpReceiverEventFrameRtp: - if evt.sequenceNumber < lastSequenceNumber { - sequenceNumberCycles += 1 - } - lastSequenceNumber = evt.sequenceNumber - lastFrameTime = time.Now() - - case rtcpReceiverEventFrameRtcp: - publisherSSRC = evt.ssrc - lastSenderReport = evt.ntpTimeMiddle - - case rtcpReceiverEventLastFrameTime: - evt.res <- lastFrameTime - - case rtcpReceiverEventReport: - rr := &rtcp.ReceiverReport{ - SSRC: receiverSSRC, - Reports: []rtcp.ReceptionReport{ - { - SSRC: publisherSSRC, - LastSequenceNumber: uint32(sequenceNumberCycles)<<8 | uint32(lastSequenceNumber), - LastSenderReport: lastSenderReport, - }, - }, - } - frame, _ := rr.Marshal() - evt.res <- frame - - case rtcpReceiverEventTerminate: - break outer - } - } - - close(rr.events) - - close(rr.done) -} - -func (rr *rtcpReceiver) close() { - rr.events <- rtcpReceiverEventTerminate{} - <-rr.done -} - -func (rr *rtcpReceiver) onFrame(streamType gortsplib.StreamType, buf []byte) { - if streamType == gortsplib.StreamTypeRtp { - // extract sequence number of first frame - if len(buf) >= 3 { - sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1])) - rr.events <- rtcpReceiverEventFrameRtp{sequenceNumber} - } - - } else { - frames, err := rtcp.Unmarshal(buf) - if err == nil { - for _, frame := range frames { - if senderReport, ok := (frame).(*rtcp.SenderReport); ok { - rr.events <- rtcpReceiverEventFrameRtcp{ - senderReport.SSRC, - uint32(senderReport.NTPTime >> 16), - } - } - } - } - } -} - -func (rr *rtcpReceiver) lastFrameTime() time.Time { - res := make(chan time.Time) - rr.events <- rtcpReceiverEventLastFrameTime{res} - return <-res -} - -func (rr *rtcpReceiver) report() []byte { - res := make(chan []byte) - rr.events <- rtcpReceiverEventReport{res} - return <-res -} - func sdpForServer(sin *sdp.SessionDescription) (*sdp.SessionDescription, []byte) { sout := &sdp.SessionDescription{ SessionName: "Stream",