server*: prevent crash that happens in case of socket errors, instead print errors (#193)

This commit is contained in:
aler9 2021-03-19 13:13:38 +01:00
parent 201fcd4b34
commit 3605b6ed2a
2 changed files with 41 additions and 15 deletions

View File

@ -4,6 +4,7 @@ import (
"net"
"strconv"
"sync"
"sync/atomic"
"github.com/notedit/rtmp/format/rtmp"
@ -18,9 +19,12 @@ type Parent interface {
// Server is a RTMP listener.
type Server struct {
l net.Listener
srv *rtmp.Server
wg sync.WaitGroup
parent Parent
l net.Listener
srv *rtmp.Server
closed uint32
wg sync.WaitGroup
accept chan *rtmputils.Conn
}
@ -38,6 +42,7 @@ func New(
}
s := &Server{
parent: parent,
l: l,
accept: make(chan *rtmputils.Conn),
}
@ -45,7 +50,7 @@ func New(
s.srv = rtmp.NewServer()
s.srv.HandleConn = s.innerHandleConn
parent.Log(logger.Info, "[RTMP listener] opened on %s", address)
s.log(logger.Info, "opened on %s", address)
s.wg.Add(1)
go s.run()
@ -53,6 +58,10 @@ func New(
return s, nil
}
func (s *Server) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[RTMP listener] "+format, append([]interface{}{}, args...)...)
}
// Close closes a Server.
func (s *Server) Close() {
go func() {
@ -60,6 +69,7 @@ func (s *Server) Close() {
co.NetConn().Close()
}
}()
atomic.StoreUint32(&s.closed, 1)
s.l.Close()
s.wg.Wait()
close(s.accept)
@ -71,7 +81,11 @@ func (s *Server) run() {
for {
nconn, err := s.l.Accept()
if err != nil {
break
if atomic.LoadUint32(&s.closed) == 1 {
break
}
s.log(logger.Warn, "ERR: %s", err)
continue
}
s.wg.Add(1)

View File

@ -3,6 +3,7 @@ package serverrtsp
import (
"crypto/tls"
"strconv"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
@ -17,9 +18,11 @@ type Parent interface {
// Server is a RTSP listener.
type Server struct {
useTLS bool
parent Parent
srv *gortsplib.Server
srv *gortsplib.Server
closed uint32
// out
accept chan *gortsplib.ServerConn
@ -70,6 +73,7 @@ func New(
}
s := &Server{
useTLS: useTLS,
parent: parent,
srv: srv,
accept: make(chan *gortsplib.ServerConn),
@ -84,19 +88,23 @@ func New(
parent.Log(logger.Info, "[RTSP/UDP/RTCP listener] opened on %s", conf.UDPRTCPAddress)
}
label := func() string {
if conf.TLSConfig != nil {
return "RTSP/TLS"
}
return "RTSP/TCP"
}()
parent.Log(logger.Info, "[%s listener] opened on %s", label, address)
s.log(logger.Info, "opened on %s", address)
go s.run()
return s, nil
}
func (s *Server) log(level logger.Level, format string, args ...interface{}) {
label := func() string {
if s.useTLS {
return "RTSP/TLS"
}
return "RTSP/TCP"
}()
s.parent.Log(level, "[%s listener] "+format, append([]interface{}{label}, args...)...)
}
// Close closes a Server.
func (s *Server) Close() {
go func() {
@ -104,7 +112,7 @@ func (s *Server) Close() {
co.Close()
}
}()
atomic.StoreUint32(&s.closed, 1)
s.srv.Close()
<-s.done
}
@ -115,7 +123,11 @@ func (s *Server) run() {
for {
conn, err := s.srv.Accept()
if err != nil {
break
if atomic.LoadUint32(&s.closed) == 1 {
break
}
s.log(logger.Warn, "ERR: %s", err)
continue
}
s.accept <- conn