From 6702cb41ed54414198b9e0194aadc796c10935ab Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 12 Aug 2021 10:50:29 +0200 Subject: [PATCH] api: make sure that entities are deleted immediately after a kick request --- internal/core/hls_remuxer.go | 6 +---- internal/core/hls_server.go | 11 +-------- internal/core/rtmp_conn.go | 6 +---- internal/core/rtmp_server.go | 13 ++--------- internal/core/rtsp_conn.go | 26 +++++++++++----------- internal/core/rtsp_server.go | 14 +++++++----- internal/core/rtsp_session.go | 42 +++++++++++++++++------------------ 7 files changed, 48 insertions(+), 70 deletions(-) diff --git a/internal/core/hls_remuxer.go b/internal/core/hls_remuxer.go index c3696e25..8dd1c38b 100644 --- a/internal/core/hls_remuxer.go +++ b/internal/core/hls_remuxer.go @@ -158,11 +158,6 @@ func newHLSRemuxer( return r } -// ParentClose closes a Remuxer. -func (r *hlsRemuxer) ParentClose() { - r.log(logger.Info, "destroyed") -} - func (r *hlsRemuxer) Close() { r.ctxCancel() } @@ -178,6 +173,7 @@ func (r *hlsRemuxer) PathName() string { func (r *hlsRemuxer) run() { defer r.wg.Done() + defer r.log(logger.Info, "destroyed") remuxerCtx, remuxerCtxCancel := context.WithCancel(context.Background()) remuxerReady := make(chan struct{}) diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 6ad8947e..448dd253 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -119,7 +119,7 @@ outer: if c2, ok := s.remuxers[c.PathName()]; !ok || c2 != c { continue } - s.doRemuxerClose(c) + delete(s.remuxers, c.PathName()) case <-s.ctx.Done(): break outer @@ -128,10 +128,6 @@ outer: s.ctxCancel() - for _, c := range s.remuxers { - s.doRemuxerClose(c) - } - hs.Shutdown(context.Background()) s.pathManager.OnHLSServer(nil) @@ -234,11 +230,6 @@ func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer { return r } -func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) { - delete(s.remuxers, c.PathName()) - c.ParentClose() -} - // OnRemuxerClose is called by hlsRemuxer. func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) { select { diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index a5221f3f..2b2637f8 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -120,11 +120,6 @@ func newRTMPConn( return c } -// ParentClose closes a Conn. -func (c *rtmpConn) ParentClose() { - c.log(logger.Info, "closed") -} - // Close closes a Conn. func (c *rtmpConn) Close() { c.ctxCancel() @@ -156,6 +151,7 @@ func (c *rtmpConn) safeState() gortsplib.ServerSessionState { func (c *rtmpConn) run() { defer c.wg.Done() + defer c.log(logger.Info, "closed") if c.runOnConnect != "" { _, port, _ := net.SplitHostPort(c.rtspAddress) diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index 0905ed25..a8b069ad 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -157,7 +157,7 @@ outer: if _, ok := s.conns[c]; !ok { continue } - s.doConnClose(c) + delete(s.conns, c) case req := <-s.apiRTMPConnsList: for c := range s.conns { @@ -181,6 +181,7 @@ outer: res := func() bool { for c := range s.conns { if c.ID() == req.ID { + delete(s.conns, c) c.Close() return true } @@ -201,10 +202,6 @@ outer: s.ctxCancel() s.l.Close() - - for c := range s.conns { - s.doConnClose(c) - } } func (s *rtmpServer) newConnID() (string, error) { @@ -235,12 +232,6 @@ func (s *rtmpServer) newConnID() (string, error) { } } -func (s *rtmpServer) doConnClose(c *rtmpConn) { - delete(s.conns, c) - c.ParentClose() - c.Close() -} - // OnConnClose is called by rtmpConn. func (s *rtmpServer) OnConnClose(c *rtmpConn) { select { diff --git a/internal/core/rtsp_conn.go b/internal/core/rtsp_conn.go index 77d52089..803f43ca 100644 --- a/internal/core/rtsp_conn.go +++ b/internal/core/rtsp_conn.go @@ -87,19 +87,6 @@ func newRTSPConn( return c } -// ParentClose closes a Conn. -func (c *rtspConn) ParentClose(err error) { - if err != io.EOF && !isTeardownErr(err) && !isTerminatedErr(err) { - c.log(logger.Info, "ERR: %v", err) - } - - c.log(logger.Info, "closed") - - if c.onConnectCmd != nil { - c.onConnectCmd.Close() - } -} - func (c *rtspConn) log(level logger.Level, format string, args ...interface{}) { c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) } @@ -178,6 +165,19 @@ func (c *rtspConn) validateCredentials( return nil } +// OnClose is called by rtspServer. +func (c *rtspConn) OnClose(err error) { + if err != io.EOF && !isTeardownErr(err) && !isTerminatedErr(err) { + c.log(logger.Info, "ERR: %v", err) + } + + c.log(logger.Info, "closed") + + if c.onConnectCmd != nil { + c.onConnectCmd.Close() + } +} + // OnRequest is called by rtspServer. func (c *rtspConn) OnRequest(req *base.Request) { c.log(logger.Debug, "[c->s] %v", req) diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index 18cdeb59..0ce9c1bc 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -234,7 +234,7 @@ func (s *rtspServer) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { delete(s.conns, ctx.Conn) s.mutex.Unlock() - c.ParentClose(ctx.Error) + c.OnClose(ctx.Error) } // OnRequest implements gortsplib.ServerHandlerOnRequest. @@ -281,7 +281,9 @@ func (s *rtspServer) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCt delete(s.sessions, ctx.Session) s.mutex.Unlock() - se.ParentClose() + if se != nil { + se.OnClose() + } } // OnDescribe implements gortsplib.ServerHandlerOnDescribe. @@ -384,9 +386,11 @@ func (s *rtspServer) OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSe s.mutex.RLock() defer s.mutex.RUnlock() - for _, s := range s.sessions { - if s.ID() == req.ID { - s.Close() + for key, se := range s.sessions { + if se.ID() == req.ID { + se.Close() + delete(s.sessions, key) + se.OnClose() return apiRTSPSessionsKickRes{} } } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 8a3dae17..49f73ee2 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -69,27 +69,6 @@ func newRTSPSession( return s } -// ParentClose closes a Session. -func (s *rtspSession) ParentClose() { - if s.ss.State() == gortsplib.ServerSessionStateRead { - if s.onReadCmd != nil { - s.onReadCmd.Close() - } - } - - switch s.ss.State() { - case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead: - s.path.OnReaderRemove(pathReaderRemoveReq{Author: s}) - s.path = nil - - case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish: - s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s}) - s.path = nil - } - - s.log(logger.Info, "closed") -} - // Close closes a Session. func (s *rtspSession) Close() { s.ss.Close() @@ -125,6 +104,27 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{} s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.id}, args...)...) } +// OnClose is called by rtspServer. +func (s *rtspSession) OnClose() { + if s.ss.State() == gortsplib.ServerSessionStateRead { + if s.onReadCmd != nil { + s.onReadCmd.Close() + } + } + + switch s.ss.State() { + case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead: + s.path.OnReaderRemove(pathReaderRemoveReq{Author: s}) + s.path = nil + + case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish: + s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s}) + s.path = nil + } + + s.log(logger.Info, "closed") +} + // OnAnnounce is called by rtspServer. func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { res := s.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{