diff --git a/internal/serverrtmp/server.go b/internal/serverrtmp/server.go index 4ed6fd46..48a88e8d 100644 --- a/internal/serverrtmp/server.go +++ b/internal/serverrtmp/server.go @@ -4,6 +4,7 @@ import ( "net" "strconv" "sync" + "sync/atomic" "github.com/notedit/rtmp/format/rtmp" @@ -18,9 +19,12 @@ type Parent interface { // Server is a RTMP listener. type Server struct { - l net.Listener - srv *rtmp.Server - wg sync.WaitGroup + parent Parent + + l net.Listener + srv *rtmp.Server + closed uint32 + wg sync.WaitGroup accept chan *rtmputils.Conn } @@ -38,6 +42,7 @@ func New( } s := &Server{ + parent: parent, l: l, accept: make(chan *rtmputils.Conn), } @@ -45,7 +50,7 @@ func New( s.srv = rtmp.NewServer() s.srv.HandleConn = s.innerHandleConn - parent.Log(logger.Info, "[RTMP listener] opened on %s", address) + s.log(logger.Info, "opened on %s", address) s.wg.Add(1) go s.run() @@ -53,6 +58,10 @@ func New( return s, nil } +func (s *Server) log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[RTMP listener] "+format, append([]interface{}{}, args...)...) +} + // Close closes a Server. func (s *Server) Close() { go func() { @@ -60,6 +69,7 @@ func (s *Server) Close() { co.NetConn().Close() } }() + atomic.StoreUint32(&s.closed, 1) s.l.Close() s.wg.Wait() close(s.accept) @@ -71,7 +81,11 @@ func (s *Server) run() { for { nconn, err := s.l.Accept() if err != nil { - break + if atomic.LoadUint32(&s.closed) == 1 { + break + } + s.log(logger.Warn, "ERR: %s", err) + continue } s.wg.Add(1) diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index 0e0cfb7f..ad951e59 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -3,6 +3,7 @@ package serverrtsp import ( "crypto/tls" "strconv" + "sync/atomic" "time" "github.com/aler9/gortsplib" @@ -17,9 +18,11 @@ type Parent interface { // Server is a RTSP listener. type Server struct { + useTLS bool parent Parent - srv *gortsplib.Server + srv *gortsplib.Server + closed uint32 // out accept chan *gortsplib.ServerConn @@ -70,6 +73,7 @@ func New( } s := &Server{ + useTLS: useTLS, parent: parent, srv: srv, accept: make(chan *gortsplib.ServerConn), @@ -84,19 +88,23 @@ func New( parent.Log(logger.Info, "[RTSP/UDP/RTCP listener] opened on %s", conf.UDPRTCPAddress) } - label := func() string { - if conf.TLSConfig != nil { - return "RTSP/TLS" - } - return "RTSP/TCP" - }() - parent.Log(logger.Info, "[%s listener] opened on %s", label, address) + s.log(logger.Info, "opened on %s", address) go s.run() return s, nil } +func (s *Server) log(level logger.Level, format string, args ...interface{}) { + label := func() string { + if s.useTLS { + return "RTSP/TLS" + } + return "RTSP/TCP" + }() + s.parent.Log(level, "[%s listener] "+format, append([]interface{}{label}, args...)...) +} + // Close closes a Server. func (s *Server) Close() { go func() { @@ -104,7 +112,7 @@ func (s *Server) Close() { co.Close() } }() - + atomic.StoreUint32(&s.closed, 1) s.srv.Close() <-s.done } @@ -115,7 +123,11 @@ func (s *Server) run() { for { conn, err := s.srv.Accept() if err != nil { - break + if atomic.LoadUint32(&s.closed) == 1 { + break + } + s.log(logger.Warn, "ERR: %s", err) + continue } s.accept <- conn