diff --git a/internal/clienthls/client.go b/internal/clienthls/client.go index 45c74bc6..f059af0e 100644 --- a/internal/clienthls/client.go +++ b/internal/clienthls/client.go @@ -3,6 +3,7 @@ package clienthls import ( "bytes" "fmt" + "io" "net" "net/http" "strconv" @@ -20,7 +21,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/h264" "github.com/aler9/rtsp-simple-server/internal/logger" - "github.com/aler9/rtsp-simple-server/internal/serverhls" "github.com/aler9/rtsp-simple-server/internal/stats" ) @@ -100,6 +100,15 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { return false } +// Request is an HTTP request received by an HLS server. +type Request struct { + Path string + Subpath string + Req *http.Request + W http.ResponseWriter + Res chan io.Reader +} + type trackIDPayloadPair struct { trackID int buf []byte @@ -113,7 +122,7 @@ type PathMan interface { // Parent is implemented by clientman.ClientMan. type Parent interface { Log(logger.Level, string, ...interface{}) - OnClientClose(client.Client) + OnClientClose(*Client) } // Client is a HLS client. @@ -136,7 +145,7 @@ type Client struct { lastRequestTime int64 // in - request chan serverhls.Request + request chan Request terminate chan struct{} } @@ -162,12 +171,12 @@ func New( parent: parent, lastRequestTime: time.Now().Unix(), tsByName: make(map[string]*tsFile), - request: make(chan serverhls.Request), + request: make(chan Request), terminate: make(chan struct{}), } atomic.AddInt64(c.stats.CountClients, 1) - c.log(logger.Info, "connected (HLS)") + c.log(logger.Info, "connected") c.wg.Add(1) go c.run() @@ -178,6 +187,7 @@ func New( // Close closes a Client. func (c *Client) Close() { atomic.AddInt64(c.stats.CountClients, -1) + c.log(logger.Info, "disconnected") close(c.terminate) } @@ -193,7 +203,7 @@ func (c *Client) IsClient() {} func (c *Client) IsSource() {} func (c *Client) log(level logger.Level, format string, args ...interface{}) { - c.parent.Log(level, "[client hls/%s] "+format, append([]interface{}{c.pathName}, args...)...) + c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...) } // PathName returns the path name of the client. @@ -203,7 +213,6 @@ func (c *Client) PathName() string { func (c *Client) run() { defer c.wg.Done() - defer c.log(logger.Info, "disconnected") var videoTrack *gortsplib.Track var h264SPS []byte @@ -586,7 +595,7 @@ func (c *Client) runRequestHandler(done chan struct{}) { } // OnRequest is called by clientman.ClientMan. -func (c *Client) OnRequest(req serverhls.Request) { +func (c *Client) OnRequest(req Request) { c.request <- req } diff --git a/internal/clientman/clientman.go b/internal/clientman/clientman.go deleted file mode 100644 index 8ab87dde..00000000 --- a/internal/clientman/clientman.go +++ /dev/null @@ -1,175 +0,0 @@ -package clientman - -import ( - "sync" - "time" - - "github.com/aler9/gortsplib/pkg/base" - - "github.com/aler9/rtsp-simple-server/internal/client" - "github.com/aler9/rtsp-simple-server/internal/clienthls" - "github.com/aler9/rtsp-simple-server/internal/logger" - "github.com/aler9/rtsp-simple-server/internal/serverhls" - "github.com/aler9/rtsp-simple-server/internal/stats" -) - -// PathManager is implemented by pathman.PathManager. -type PathManager interface { - OnClientDescribe(client.DescribeReq) - OnClientAnnounce(client.AnnounceReq) - OnClientSetupPlay(client.SetupPlayReq) -} - -// Parent is implemented by program. -type Parent interface { - Log(logger.Level, string, ...interface{}) -} - -// ClientManager is a client manager. -type ClientManager struct { - hlsSegmentCount int - hlsSegmentDuration time.Duration - rtspAddress string - readTimeout time.Duration - writeTimeout time.Duration - readBufferCount int - runOnConnect string - runOnConnectRestart bool - protocols map[base.StreamProtocol]struct{} - stats *stats.Stats - pathMan PathManager - serverHLS *serverhls.Server - parent Parent - - clients map[client.Client]struct{} - clientsByHLSPath map[string]*clienthls.Client - wg sync.WaitGroup - - // in - clientClose chan client.Client - terminate chan struct{} - - // out - done chan struct{} -} - -// New allocates a ClientManager. -func New( - hlsSegmentCount int, - hlsSegmentDuration time.Duration, - rtspAddress string, - readTimeout time.Duration, - writeTimeout time.Duration, - readBufferCount int, - runOnConnect string, - runOnConnectRestart bool, - protocols map[base.StreamProtocol]struct{}, - stats *stats.Stats, - pathMan PathManager, - serverHLS *serverhls.Server, - parent Parent) *ClientManager { - - cm := &ClientManager{ - hlsSegmentCount: hlsSegmentCount, - hlsSegmentDuration: hlsSegmentDuration, - rtspAddress: rtspAddress, - readTimeout: readTimeout, - writeTimeout: writeTimeout, - readBufferCount: readBufferCount, - runOnConnect: runOnConnect, - runOnConnectRestart: runOnConnectRestart, - protocols: protocols, - stats: stats, - pathMan: pathMan, - serverHLS: serverHLS, - parent: parent, - clients: make(map[client.Client]struct{}), - clientsByHLSPath: make(map[string]*clienthls.Client), - clientClose: make(chan client.Client), - terminate: make(chan struct{}), - done: make(chan struct{}), - } - - go cm.run() - - return cm -} - -// Close closes a ClientManager. -func (cm *ClientManager) Close() { - close(cm.terminate) - <-cm.done -} - -// Log is the main logging function. -func (cm *ClientManager) Log(level logger.Level, format string, args ...interface{}) { - cm.parent.Log(level, format, args...) -} - -func (cm *ClientManager) run() { - defer close(cm.done) - - hlsRequest := func() chan serverhls.Request { - if cm.serverHLS != nil { - return cm.serverHLS.Request() - } - return make(chan serverhls.Request) - }() - -outer: - for { - select { - case req := <-hlsRequest: - c, ok := cm.clientsByHLSPath[req.Path] - if !ok { - c = clienthls.New( - cm.hlsSegmentCount, - cm.hlsSegmentDuration, - cm.readBufferCount, - &cm.wg, - cm.stats, - req.Path, - cm.pathMan, - cm) - cm.clients[c] = struct{}{} - cm.clientsByHLSPath[req.Path] = c - } - c.OnRequest(req) - - case c := <-cm.clientClose: - if _, ok := cm.clients[c]; !ok { - continue - } - cm.onClientClose(c) - - case <-cm.terminate: - break outer - } - } - - go func() { - for range cm.clientClose { - } - }() - - for c := range cm.clients { - c.Close() - } - - cm.wg.Wait() - - close(cm.clientClose) -} - -func (cm *ClientManager) onClientClose(c client.Client) { - delete(cm.clients, c) - if hc, ok := c.(*clienthls.Client); ok { - delete(cm.clientsByHLSPath, hc.PathName()) - } - c.Close() -} - -// OnClientClose is called by a client. -func (cm *ClientManager) OnClientClose(c client.Client) { - cm.clientClose <- c -} diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 5096173e..01f54384 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -129,7 +129,7 @@ func New( } atomic.AddInt64(c.stats.CountClients, 1) - c.log(logger.Info, "connected (RTMP)") + c.log(logger.Info, "connected") c.wg.Add(1) go c.run() @@ -140,6 +140,7 @@ func New( // Close closes a Client. func (c *Client) Close() { atomic.AddInt64(c.stats.CountClients, -1) + c.log(logger.Info, "disconnected") close(c.terminate) } @@ -164,7 +165,6 @@ func (c *Client) ip() net.IP { func (c *Client) run() { defer c.wg.Done() - defer c.log(logger.Info, "disconnected") if c.runOnConnect != "" { _, port, _ := net.SplitHostPort(c.rtspAddress) diff --git a/internal/serverhls/server.go b/internal/serverhls/server.go index a4178047..3291f9f0 100644 --- a/internal/serverhls/server.go +++ b/internal/serverhls/server.go @@ -6,19 +6,15 @@ import ( "net" "net/http" "strings" + "sync" + "time" + "github.com/aler9/rtsp-simple-server/internal/clienthls" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/pathman" + "github.com/aler9/rtsp-simple-server/internal/stats" ) -// Request is an HTTP request received by the HLS server. -type Request struct { - Path string - Subpath string - Req *http.Request - W http.ResponseWriter - Res chan io.Reader -} - // Parent is implemented by program. type Parent interface { Log(logger.Level, string, ...interface{}) @@ -26,18 +22,34 @@ type Parent interface { // Server is an HLS server. type Server struct { - parent Parent + hlsSegmentCount int + hlsSegmentDuration time.Duration + readBufferCount int + stats *stats.Stats + pathMan *pathman.PathManager + parent Parent - ln net.Listener - s *http.Server + ln net.Listener + wg sync.WaitGroup + clients map[string]*clienthls.Client + + // in + request chan clienthls.Request + clientClose chan *clienthls.Client + terminate chan struct{} // out - request chan Request + done chan struct{} } // New allocates a Server. func New( address string, + hlsSegmentCount int, + hlsSegmentDuration time.Duration, + readBufferCount int, + stats *stats.Stats, + pathMan *pathman.PathManager, parent Parent, ) (*Server, error) { @@ -47,40 +59,104 @@ func New( } s := &Server{ - parent: parent, - ln: ln, - request: make(chan Request), + hlsSegmentCount: hlsSegmentCount, + hlsSegmentDuration: hlsSegmentDuration, + readBufferCount: readBufferCount, + stats: stats, + pathMan: pathMan, + parent: parent, + ln: ln, + clients: make(map[string]*clienthls.Client), + request: make(chan clienthls.Request), + clientClose: make(chan *clienthls.Client), + terminate: make(chan struct{}), + done: make(chan struct{}), } - s.s = &http.Server{ - Handler: s, - } + s.Log(logger.Info, "listener opened on "+address) - s.log(logger.Info, "opened on "+address) - - go s.s.Serve(s.ln) + go s.run() return s, nil } -func (s *Server) log(level logger.Level, format string, args ...interface{}) { - s.parent.Log(level, "[HLS listener] "+format, append([]interface{}{}, args...)...) +// Log is the main logging function. +func (s *Server) Log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...) } // Close closes all the server resources. func (s *Server) Close() { + close(s.terminate) + <-s.done +} + +func (s *Server) run() { + defer close(s.done) + + hs := &http.Server{Handler: s} + go hs.Serve(s.ln) + +outer: + for { + select { + case req := <-s.request: + c, ok := s.clients[req.Path] + if !ok { + c = clienthls.New( + s.hlsSegmentCount, + s.hlsSegmentDuration, + s.readBufferCount, + &s.wg, + s.stats, + req.Path, + s.pathMan, + s) + s.clients[req.Path] = c + } + c.OnRequest(req) + + case c := <-s.clientClose: + if c2, ok := s.clients[c.PathName()]; !ok || c2 != c { + continue + } + s.doClientClose(c) + + case <-s.terminate: + break outer + } + } + go func() { - for req := range s.request { - req.Res <- nil + for { + select { + case req, ok := <-s.request: + if !ok { + return + } + req.Res <- nil + + case _, ok := <-s.clientClose: + if !ok { + return + } + } } }() - s.s.Shutdown(context.Background()) + + for _, c := range s.clients { + s.doClientClose(c) + } + + hs.Shutdown(context.Background()) + close(s.request) + close(s.clientClose) } // ServeHTTP implements http.Handler. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) + s.Log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) // remove leading prefix path := r.URL.Path[1:] @@ -98,7 +174,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } cres := make(chan io.Reader) - s.request <- Request{ + s.request <- clienthls.Request{ Path: parts[0], Subpath: parts[1], Req: r, @@ -125,7 +201,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -// Request returns a channel to handle incoming HTTP requests. -func (s *Server) Request() chan Request { - return s.request +func (s *Server) doClientClose(c *clienthls.Client) { + delete(s.clients, c.PathName()) + c.Close() +} + +// OnClientClose is called by a client. +func (s *Server) OnClientClose(c *clienthls.Client) { + s.clientClose <- c } diff --git a/internal/serverrtmp/server.go b/internal/serverrtmp/server.go index a23bdceb..51d11370 100644 --- a/internal/serverrtmp/server.go +++ b/internal/serverrtmp/server.go @@ -97,7 +97,7 @@ func (s *Server) run() { defer close(s.done) s.wg.Add(1) - clientNew := make(chan net.Conn) + connNew := make(chan net.Conn) acceptErr := make(chan error) go func() { defer s.wg.Done() @@ -108,7 +108,7 @@ func (s *Server) run() { return err } - clientNew <- conn + connNew <- conn } }() }() @@ -120,7 +120,7 @@ outer: s.Log(logger.Warn, "ERR: %s", err) break outer - case nconn := <-clientNew: + case nconn := <-connNew: c := clientrtmp.New( s.rtspAddress, s.readTimeout, @@ -154,7 +154,7 @@ outer: return } - case conn, ok := <-clientNew: + case conn, ok := <-connNew: if !ok { return } @@ -177,7 +177,7 @@ outer: s.wg.Wait() close(acceptErr) - close(clientNew) + close(connNew) close(s.clientClose) } diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index 33af3c41..dcbe35ae 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -141,7 +141,7 @@ func (s *Server) run() { defer close(s.done) s.wg.Add(1) - clientNew := make(chan *gortsplib.ServerConn) + connNew := make(chan *gortsplib.ServerConn) acceptErr := make(chan error) go func() { defer s.wg.Done() @@ -152,7 +152,7 @@ func (s *Server) run() { return err } - clientNew <- conn + connNew <- conn } }() }() @@ -164,7 +164,7 @@ outer: s.Log(logger.Warn, "ERR: %s", err) break outer - case conn := <-clientNew: + case conn := <-connNew: c := clientrtsp.New( s.isTLS, s.rtspAddress, @@ -198,7 +198,7 @@ outer: return } - case conn, ok := <-clientNew: + case conn, ok := <-connNew: if !ok { return } @@ -221,7 +221,7 @@ outer: s.wg.Wait() close(acceptErr) - close(clientNew) + close(connNew) close(s.clientClose) } diff --git a/main.go b/main.go index 3c642ebd..c20cb03a 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "github.com/aler9/gortsplib" "gopkg.in/alecthomas/kingpin.v2" - "github.com/aler9/rtsp-simple-server/internal/clientman" "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/confwatcher" "github.com/aler9/rtsp-simple-server/internal/logger" @@ -33,12 +32,11 @@ type program struct { logger *logger.Logger metrics *metrics.Metrics pprof *pprof.PPROF + pathMan *pathman.PathManager serverRTSPPlain *serverrtsp.Server serverRTSPTLS *serverrtsp.Server serverRTMP *serverrtmp.Server serverHLS *serverhls.Server - pathMan *pathman.PathManager - clientMan *clientman.ClientManager confWatcher *confwatcher.ConfWatcher terminate chan struct{} @@ -191,17 +189,6 @@ func (p *program) createResources(initial bool) error { } } - if !p.conf.HLSDisable { - if p.serverHLS == nil { - p.serverHLS, err = serverhls.New( - p.conf.HLSAddress, - p) - if err != nil { - return err - } - } - } - if p.pathMan == nil { p.pathMan = pathman.New( p.conf.RTSPAddress, @@ -215,23 +202,6 @@ func (p *program) createResources(initial bool) error { p) } - if p.clientMan == nil { - p.clientMan = clientman.New( - p.conf.HLSSegmentCount, - p.conf.HLSSegmentDuration, - p.conf.RTSPAddress, - p.conf.ReadTimeout, - p.conf.WriteTimeout, - p.conf.ReadBufferCount, - p.conf.RunOnConnect, - p.conf.RunOnConnectRestart, - p.conf.ProtocolsParsed, - p.stats, - p.pathMan, - p.serverHLS, - p) - } - if !p.conf.RTSPDisable && (p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional) { @@ -310,6 +280,22 @@ func (p *program) createResources(initial bool) error { } } + if !p.conf.HLSDisable { + if p.serverHLS == nil { + p.serverHLS, err = serverhls.New( + p.conf.HLSAddress, + p.conf.HLSSegmentCount, + p.conf.HLSSegmentDuration, + p.conf.ReadBufferCount, + p.stats, + p.pathMan, + p) + if err != nil { + return err + } + } + } + return nil } @@ -342,14 +328,6 @@ func (p *program) closeResources(newConf *conf.Conf) { closePPROF = true } - closeServerHLS := false - if newConf == nil || - newConf.HLSDisable != p.conf.HLSDisable || - newConf.HLSAddress != p.conf.HLSAddress || - closeStats { - closeServerHLS = true - } - closePathMan := false if newConf == nil || newConf.RTSPAddress != p.conf.RTSPAddress || @@ -364,23 +342,6 @@ func (p *program) closeResources(newConf *conf.Conf) { p.pathMan.OnProgramConfReload(newConf.Paths) } - closeClientMan := false - if newConf == nil || - closeServerHLS || - closePathMan || - newConf.HLSSegmentCount != p.conf.HLSSegmentCount || - newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration || - newConf.RTSPAddress != p.conf.RTSPAddress || - newConf.ReadTimeout != p.conf.ReadTimeout || - newConf.WriteTimeout != p.conf.WriteTimeout || - newConf.ReadBufferCount != p.conf.ReadBufferCount || - newConf.RunOnConnect != p.conf.RunOnConnect || - newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || - !reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) || - closeStats { - closeClientMan = true - } - closeServerPlain := false if newConf == nil || newConf.RTSPDisable != p.conf.RTSPDisable || @@ -435,6 +396,18 @@ func (p *program) closeResources(newConf *conf.Conf) { closeServerRTMP = true } + closeServerHLS := false + if newConf == nil || + newConf.HLSDisable != p.conf.HLSDisable || + newConf.HLSAddress != p.conf.HLSAddress || + newConf.HLSSegmentCount != p.conf.HLSSegmentCount || + newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration || + newConf.ReadBufferCount != p.conf.ReadBufferCount || + closeStats || + closePathMan { + closeServerHLS = true + } + if closeServerTLS && p.serverRTSPTLS != nil { p.serverRTSPTLS.Close() p.serverRTSPTLS = nil @@ -445,11 +418,6 @@ func (p *program) closeResources(newConf *conf.Conf) { p.serverRTSPPlain = nil } - if closeClientMan && p.clientMan != nil { - p.clientMan.Close() - p.clientMan = nil - } - if closePathMan && p.pathMan != nil { p.pathMan.Close() p.pathMan = nil