diff --git a/internal/core/api.go b/internal/core/api.go index 036a786b..cf9831ca 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -165,123 +165,22 @@ func loadConfPathData(ctx *gin.Context) (interface{}, error) { return in, err } -type apiPathsListItem struct { - ConfName string `json:"confName"` - Conf *conf.PathConf `json:"conf"` - Source interface{} `json:"source"` - SourceReady bool `json:"sourceReady"` - Readers []interface{} `json:"readers"` -} - -type apiPathsListData struct { - Items map[string]apiPathsListItem `json:"items"` -} - -type apiPathsListRes struct { - Data *apiPathsListData - Paths map[string]*path - Err error -} - -type apiPathsListReq struct { - Res chan apiPathsListRes -} - -type apiPathsListSubReq struct { - Data *apiPathsListData - Res chan struct{} -} - -type apiRTSPSessionsListItem struct { - RemoteAddr string `json:"remoteAddr"` - State string `json:"state"` -} - -type apiRTSPSessionsListData struct { - Items map[string]apiRTSPSessionsListItem `json:"items"` -} - -type apiRTSPSessionsListRes struct { - Data *apiRTSPSessionsListData - Err error -} - -type apiRTSPSessionsListReq struct{} - -type apiRTSPSessionsKickRes struct { - Err error -} - -type apiRTSPSessionsKickReq struct { - ID string -} - -type apiRTMPConnsListItem struct { - RemoteAddr string `json:"remoteAddr"` - State string `json:"state"` -} - -type apiRTMPConnsListData struct { - Items map[string]apiRTMPConnsListItem `json:"items"` -} - -type apiRTMPConnsListRes struct { - Data *apiRTMPConnsListData - Err error -} - -type apiRTMPConnsListReq struct { - Res chan apiRTMPConnsListRes -} - -type apiRTMPConnsKickRes struct { - Err error -} - -type apiRTMPConnsKickReq struct { - ID string - Res chan apiRTMPConnsKickRes -} - -type apiHLSMuxersListItem struct { - LastRequest string `json:"lastRequest"` -} - -type apiHLSMuxersListData struct { - Items map[string]apiHLSMuxersListItem `json:"items"` -} - -type apiHLSMuxersListRes struct { - Data *apiHLSMuxersListData - Muxers map[string]*hlsMuxer - Err error -} - -type apiHLSMuxersListReq struct { - Res chan apiHLSMuxersListRes -} - -type apiHLSMuxersListSubReq struct { - Data *apiHLSMuxersListData - Res chan struct{} -} - type apiPathManager interface { - onAPIPathsList(req apiPathsListReq) apiPathsListRes + onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes } type apiRTSPServer interface { - onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes - onAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes + onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes + onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes } type apiRTMPServer interface { - onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes - onAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes + onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes + onAPIConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes } type apiHLSServer interface { - onAPIHLSMuxersList(req apiHLSMuxersListReq) apiHLSMuxersListRes + onAPIHLSMuxersList(req hlsServerMuxersListReq) hlsServerMuxersListRes } type apiParent interface { @@ -336,6 +235,7 @@ func newAPI( group.POST("/v1/config/paths/add/*name", a.onConfigPathsAdd) group.POST("/v1/config/paths/edit/*name", a.onConfigPathsEdit) group.POST("/v1/config/paths/remove/*name", a.onConfigPathsDelete) + group.GET("/v1/paths/list", a.onPathsList) if !interfaceIsEmpty(a.rtspServer) { @@ -554,7 +454,7 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { } func (a *api) onPathsList(ctx *gin.Context) { - res := a.pathManager.onAPIPathsList(apiPathsListReq{}) + res := a.pathManager.onAPIPathsList(pathAPIPathsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -564,7 +464,7 @@ func (a *api) onPathsList(ctx *gin.Context) { } func (a *api) onRTSPSessionsList(ctx *gin.Context) { - res := a.rtspServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + res := a.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -576,7 +476,7 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { func (a *api) onRTSPSessionsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtspServer.onAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + res := a.rtspServer.onAPISessionsKick(rtspServerAPISessionsKickReq{ID: id}) if res.Err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -586,7 +486,7 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { } func (a *api) onRTSPSSessionsList(ctx *gin.Context) { - res := a.rtspsServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + res := a.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -598,7 +498,7 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) { func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtspsServer.onAPIRTSPSessionsKick(apiRTSPSessionsKickReq{ID: id}) + res := a.rtspsServer.onAPISessionsKick(rtspServerAPISessionsKickReq{ID: id}) if res.Err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -608,7 +508,7 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { } func (a *api) onRTMPConnsList(ctx *gin.Context) { - res := a.rtmpServer.onAPIRTMPConnsList(apiRTMPConnsListReq{}) + res := a.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return @@ -620,7 +520,7 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) { func (a *api) onRTMPConnsKick(ctx *gin.Context) { id := ctx.Param("id") - res := a.rtmpServer.onAPIRTMPConnsKick(apiRTMPConnsKickReq{ID: id}) + res := a.rtmpServer.onAPIConnsKick(rtmpServerAPIConnsKickReq{ID: id}) if res.Err != nil { ctx.AbortWithStatus(http.StatusNotFound) return @@ -630,7 +530,7 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { } func (a *api) onHLSMuxersList(ctx *gin.Context) { - res := a.hlsServer.onAPIHLSMuxersList(apiHLSMuxersListReq{}) + res := a.hlsServer.onAPIHLSMuxersList(hlsServerMuxersListReq{}) if res.Err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return diff --git a/internal/core/core_test.go b/internal/core/core_test.go index fd9d1e8b..f352114d 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -188,7 +188,7 @@ func TestCorePathAutoDeletion(t *testing.T) { } }() - res := p.pathManager.onAPIPathsList(apiPathsListReq{}) + res := p.pathManager.onAPIPathsList(pathAPIPathsListReq{}) require.NoError(t, res.Err) require.Equal(t, 0, len(res.Data.Items)) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 5b991178..84cbba34 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -141,8 +141,8 @@ type hlsMuxer struct { requests []hlsMuxerRequest // in - request chan hlsMuxerRequest - apiHLSMuxersList chan apiHLSMuxersListSubReq + request chan hlsMuxerRequest + hlsServerMuxersList chan hlsServerMuxersListSubReq } func newHLSMuxer( @@ -174,8 +174,8 @@ func newHLSMuxer( v := time.Now().Unix() return &v }(), - request: make(chan hlsMuxerRequest), - apiHLSMuxersList: make(chan apiHLSMuxersListSubReq), + request: make(chan hlsMuxerRequest), + hlsServerMuxersList: make(chan hlsServerMuxersListSubReq), } m.log(logger.Info, "opened") @@ -226,8 +226,8 @@ func (m *hlsMuxer) run() { m.requests = append(m.requests, req) } - case req := <-m.apiHLSMuxersList: - req.Data.Items[m.name] = apiHLSMuxersListItem{ + case req := <-m.hlsServerMuxersList: + req.Data.Items[m.name] = hlsServerMuxersListItem{ LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(), } close(req.Res) @@ -512,10 +512,10 @@ func (m *hlsMuxer) onReaderAPIDescribe() interface{} { } // onAPIHLSMuxersList is called by api. -func (m *hlsMuxer) onAPIHLSMuxersList(req apiHLSMuxersListSubReq) { +func (m *hlsMuxer) onAPIHLSMuxersList(req hlsServerMuxersListSubReq) { req.Res = make(chan struct{}) select { - case m.apiHLSMuxersList <- req: + case m.hlsServerMuxersList <- req: <-req.Res case <-m.ctx.Done(): diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 02713b19..0e2fa92a 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -17,6 +17,29 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +type hlsServerMuxersListItem struct { + LastRequest string `json:"lastRequest"` +} + +type hlsServerMuxersListData struct { + Items map[string]hlsServerMuxersListItem `json:"items"` +} + +type hlsServerMuxersListRes struct { + Data *hlsServerMuxersListData + Muxers map[string]*hlsMuxer + Err error +} + +type hlsServerMuxersListReq struct { + Res chan hlsServerMuxersListRes +} + +type hlsServerMuxersListSubReq struct { + Data *hlsServerMuxersListData + Res chan struct{} +} + type hlsServerParent interface { Log(logger.Level, string, ...interface{}) } @@ -38,10 +61,10 @@ type hlsServer struct { muxers map[string]*hlsMuxer // in - pathSourceReady chan *path - request chan hlsMuxerRequest - muxerClose chan *hlsMuxer - apiHLSMuxersList chan apiHLSMuxersListReq + pathSourceReady chan *path + request chan hlsMuxerRequest + muxerClose chan *hlsMuxer + apiMuxersList chan hlsServerMuxersListReq } func newHLSServer( @@ -79,7 +102,7 @@ func newHLSServer( pathSourceReady: make(chan *path), request: make(chan hlsMuxerRequest), muxerClose: make(chan *hlsMuxer), - apiHLSMuxersList: make(chan apiHLSMuxersListReq), + apiMuxersList: make(chan hlsServerMuxersListReq), } s.log(logger.Info, "listener opened on "+address) @@ -134,14 +157,14 @@ outer: } delete(s.muxers, c.PathName()) - case req := <-s.apiHLSMuxersList: + case req := <-s.apiMuxersList: muxers := make(map[string]*hlsMuxer) for name, m := range s.muxers { muxers[name] = m } - req.Res <- apiHLSMuxersListRes{ + req.Res <- hlsServerMuxersListRes{ Muxers: muxers, } @@ -275,23 +298,23 @@ func (s *hlsServer) onPathSourceReady(pa *path) { } // onAPIHLSMuxersList is called by api. -func (s *hlsServer) onAPIHLSMuxersList(req apiHLSMuxersListReq) apiHLSMuxersListRes { - req.Res = make(chan apiHLSMuxersListRes) +func (s *hlsServer) onAPIHLSMuxersList(req hlsServerMuxersListReq) hlsServerMuxersListRes { + req.Res = make(chan hlsServerMuxersListRes) select { - case s.apiHLSMuxersList <- req: + case s.apiMuxersList <- req: res := <-req.Res - res.Data = &apiHLSMuxersListData{ - Items: make(map[string]apiHLSMuxersListItem), + res.Data = &hlsServerMuxersListData{ + Items: make(map[string]hlsServerMuxersListItem), } for _, pa := range res.Muxers { - pa.onAPIHLSMuxersList(apiHLSMuxersListSubReq{Data: res.Data}) + pa.onAPIHLSMuxersList(hlsServerMuxersListSubReq{Data: res.Data}) } return res case <-s.ctx.Done(): - return apiHLSMuxersListRes{Err: fmt.Errorf("terminated")} + return hlsServerMuxersListRes{Err: fmt.Errorf("terminated")} } } diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 41d002d8..1b71689c 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -18,19 +18,19 @@ func metric(key string, value int64) string { } type metricsPathManager interface { - onAPIPathsList(req apiPathsListReq) apiPathsListRes + onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes } type metricsRTSPServer interface { - onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes + onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes } type metricsRTMPServer interface { - onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes + onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes } type metricsHLSServer interface { - onAPIHLSMuxersList(req apiHLSMuxersListReq) apiHLSMuxersListRes + onAPIHLSMuxersList(req hlsServerMuxersListReq) hlsServerMuxersListRes } type metricsParent interface { @@ -88,7 +88,7 @@ func (m *metrics) run() { func (m *metrics) onMetrics(ctx *gin.Context) { out := "" - res := m.pathManager.onAPIPathsList(apiPathsListReq{}) + res := m.pathManager.onAPIPathsList(pathAPIPathsListReq{}) if res.Err == nil { for name, p := range res.Data.Items { if p.SourceReady { @@ -100,7 +100,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.rtspServer) { - res := m.rtspServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + res := m.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{}) if res.Err == nil { idleCount := int64(0) readCount := int64(0) @@ -127,7 +127,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.rtspsServer) { - res := m.rtspsServer.onAPIRTSPSessionsList(apiRTSPSessionsListReq{}) + res := m.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{}) if res.Err == nil { idleCount := int64(0) readCount := int64(0) @@ -154,7 +154,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.rtmpServer) { - res := m.rtmpServer.onAPIRTMPConnsList(apiRTMPConnsListReq{}) + res := m.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{}) if res.Err == nil { idleCount := int64(0) readCount := int64(0) @@ -181,7 +181,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.hlsServer) { - res := m.hlsServer.onAPIHLSMuxersList(apiHLSMuxersListReq{}) + res := m.hlsServer.onAPIHLSMuxersList(hlsServerMuxersListReq{}) if res.Err == nil { for name := range res.Data.Items { out += metric("hls_muxers{name=\""+name+"\"}", 1) diff --git a/internal/core/path.go b/internal/core/path.go index d6b0d354..3ba8a6ad 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -179,6 +179,33 @@ type pathPublisherPauseReq struct { Res chan struct{} } +type pathAPIPathsListItem struct { + ConfName string `json:"confName"` + Conf *conf.PathConf `json:"conf"` + Source interface{} `json:"source"` + SourceReady bool `json:"sourceReady"` + Readers []interface{} `json:"readers"` +} + +type pathAPIPathsListData struct { + Items map[string]pathAPIPathsListItem `json:"items"` +} + +type pathAPIPathsListRes struct { + Data *pathAPIPathsListData + Paths map[string]*path + Err error +} + +type pathAPIPathsListReq struct { + Res chan pathAPIPathsListRes +} + +type pathAPIPathsListSubReq struct { + Data *pathAPIPathsListData + Res chan struct{} +} + type path struct { rtspAddress string readTimeout conf.StringDuration @@ -218,7 +245,7 @@ type path struct { readerSetupPlay chan pathReaderSetupPlayReq readerPlay chan pathReaderPlayReq readerPause chan pathReaderPauseReq - apiPathsList chan apiPathsListSubReq + apiPathsList chan pathAPIPathsListSubReq } func newPath( @@ -262,7 +289,7 @@ func newPath( readerSetupPlay: make(chan pathReaderSetupPlayReq), readerPlay: make(chan pathReaderPlayReq), readerPause: make(chan pathReaderPauseReq), - apiPathsList: make(chan apiPathsListSubReq), + apiPathsList: make(chan pathAPIPathsListSubReq), } pa.log(logger.Info, "opened") @@ -832,8 +859,8 @@ func (pa *path) handleReaderPause(req pathReaderPauseReq) { close(req.Res) } -func (pa *path) handleAPIPathsList(req apiPathsListSubReq) { - req.Data.Items[pa.name] = apiPathsListItem{ +func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { + req.Data.Items[pa.name] = pathAPIPathsListItem{ ConfName: pa.confName, Conf: pa.conf, Source: func() interface{} { @@ -967,7 +994,7 @@ func (pa *path) onReaderPause(req pathReaderPauseReq) { } // onAPIPathsList is called by api. -func (pa *path) onAPIPathsList(req apiPathsListSubReq) { +func (pa *path) onAPIPathsList(req pathAPIPathsListSubReq) { req.Res = make(chan struct{}) select { case pa.apiPathsList <- req: diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 542fc4cb..f6840bf8 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -44,7 +44,7 @@ type pathManager struct { readerSetupPlay chan pathReaderSetupPlayReq publisherAnnounce chan pathPublisherAnnounceReq hlsServerSet chan pathManagerHLSServer - apiPathsList chan apiPathsListReq + apiPathsList chan pathAPIPathsListReq } func newPathManager( @@ -78,7 +78,7 @@ func newPathManager( readerSetupPlay: make(chan pathReaderSetupPlayReq), publisherAnnounce: make(chan pathPublisherAnnounceReq), hlsServerSet: make(chan pathManagerHLSServer), - apiPathsList: make(chan apiPathsListReq), + apiPathsList: make(chan pathAPIPathsListReq), } for pathName, pathConf := range pm.pathConfs { @@ -254,7 +254,7 @@ outer: paths[name] = pa } - req.Res <- apiPathsListRes{ + req.Res <- pathAPIPathsListRes{ Paths: paths, } @@ -421,23 +421,23 @@ func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) { } // onAPIPathsList is called by api. -func (pm *pathManager) onAPIPathsList(req apiPathsListReq) apiPathsListRes { - req.Res = make(chan apiPathsListRes) +func (pm *pathManager) onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes { + req.Res = make(chan pathAPIPathsListRes) select { case pm.apiPathsList <- req: res := <-req.Res - res.Data = &apiPathsListData{ - Items: make(map[string]apiPathsListItem), + res.Data = &pathAPIPathsListData{ + Items: make(map[string]pathAPIPathsListItem), } for _, pa := range res.Paths { - pa.onAPIPathsList(apiPathsListSubReq{Data: res.Data}) + pa.onAPIPathsList(pathAPIPathsListSubReq{Data: res.Data}) } return res case <-pm.ctx.Done(): - return apiPathsListRes{Err: fmt.Errorf("terminated")} + return pathAPIPathsListRes{Err: fmt.Errorf("terminated")} } } diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index fc71a4a8..ef547c99 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -15,6 +15,33 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +type rtmpServerAPIConnsListItem struct { + RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` +} + +type rtmpServerAPIConnsListData struct { + Items map[string]rtmpServerAPIConnsListItem `json:"items"` +} + +type rtmpServerAPIConnsListRes struct { + Data *rtmpServerAPIConnsListData + Err error +} + +type rtmpServerAPIConnsListReq struct { + Res chan rtmpServerAPIConnsListRes +} + +type rtmpServerAPIConnsKickRes struct { + Err error +} + +type rtmpServerAPIConnsKickReq struct { + ID string + Res chan rtmpServerAPIConnsKickRes +} + type rtmpServerParent interface { Log(logger.Level, string, ...interface{}) } @@ -37,9 +64,9 @@ type rtmpServer struct { conns map[*rtmpConn]struct{} // in - connClose chan *rtmpConn - apiRTMPConnsList chan apiRTMPConnsListReq - apiRTMPConnsKick chan apiRTMPConnsKickReq + connClose chan *rtmpConn + apiConnsList chan rtmpServerAPIConnsListReq + apiConnsKick chan rtmpServerAPIConnsKickReq } func newRTMPServer( @@ -76,8 +103,8 @@ func newRTMPServer( l: l, conns: make(map[*rtmpConn]struct{}), connClose: make(chan *rtmpConn), - apiRTMPConnsList: make(chan apiRTMPConnsListReq), - apiRTMPConnsKick: make(chan apiRTMPConnsKickReq), + apiConnsList: make(chan rtmpServerAPIConnsListReq), + apiConnsKick: make(chan rtmpServerAPIConnsKickReq), } s.log(logger.Info, "listener opened on %s", address) @@ -162,13 +189,13 @@ outer: } delete(s.conns, c) - case req := <-s.apiRTMPConnsList: - data := &apiRTMPConnsListData{ - Items: make(map[string]apiRTMPConnsListItem), + case req := <-s.apiConnsList: + data := &rtmpServerAPIConnsListData{ + Items: make(map[string]rtmpServerAPIConnsListItem), } for c := range s.conns { - data.Items[c.ID()] = apiRTMPConnsListItem{ + data.Items[c.ID()] = rtmpServerAPIConnsListItem{ RemoteAddr: c.RemoteAddr().String(), State: func() string { switch c.safeState() { @@ -183,9 +210,9 @@ outer: } } - req.Res <- apiRTMPConnsListRes{Data: data} + req.Res <- rtmpServerAPIConnsListRes{Data: data} - case req := <-s.apiRTMPConnsKick: + case req := <-s.apiConnsKick: res := func() bool { for c := range s.conns { if c.ID() == req.ID { @@ -197,9 +224,9 @@ outer: return false }() if res { - req.Res <- apiRTMPConnsKickRes{} + req.Res <- rtmpServerAPIConnsKickRes{} } else { - req.Res <- apiRTMPConnsKickRes{fmt.Errorf("not found")} + req.Res <- rtmpServerAPIConnsKickRes{fmt.Errorf("not found")} } case <-s.ctx.Done(): @@ -252,24 +279,26 @@ func (s *rtmpServer) onConnClose(c *rtmpConn) { } } -// onAPIRTMPConnsList is called by api. -func (s *rtmpServer) onAPIRTMPConnsList(req apiRTMPConnsListReq) apiRTMPConnsListRes { - req.Res = make(chan apiRTMPConnsListRes) +// onAPIConnsList is called by api. +func (s *rtmpServer) onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes { + req.Res = make(chan rtmpServerAPIConnsListRes) select { - case s.apiRTMPConnsList <- req: + case s.apiConnsList <- req: return <-req.Res + case <-s.ctx.Done(): - return apiRTMPConnsListRes{Err: fmt.Errorf("terminated")} + return rtmpServerAPIConnsListRes{Err: fmt.Errorf("terminated")} } } -// onAPIRTMPConnsKick is called by api. -func (s *rtmpServer) onAPIRTMPConnsKick(req apiRTMPConnsKickReq) apiRTMPConnsKickRes { - req.Res = make(chan apiRTMPConnsKickRes) +// onAPIConnsKick is called by api. +func (s *rtmpServer) onAPIConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes { + req.Res = make(chan rtmpServerAPIConnsKickRes) select { - case s.apiRTMPConnsKick <- req: + case s.apiConnsKick <- req: return <-req.Res + case <-s.ctx.Done(): - return apiRTMPConnsKickRes{Err: fmt.Errorf("terminated")} + return rtmpServerAPIConnsKickRes{Err: fmt.Errorf("terminated")} } } diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index be7616a3..f4c11af5 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -20,6 +20,30 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +type rtspServerAPISessionsListItem struct { + RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` +} + +type rtspServerAPISessionsListData struct { + Items map[string]rtspServerAPISessionsListItem `json:"items"` +} + +type rtspServerAPISessionsListRes struct { + Data *rtspServerAPISessionsListData + Err error +} + +type rtspServerAPISessionsListReq struct{} + +type rtspServerAPISessionsKickRes struct { + Err error +} + +type rtspServerAPISessionsKickReq struct { + ID string +} + type rtspServerParent interface { Log(logger.Level, string, ...interface{}) } @@ -359,23 +383,23 @@ func (s *rtspServer) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) { se.onFrame(ctx) } -// onAPIRTSPSessionsList is called by api and metrics. -func (s *rtspServer) onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSessionsListRes { +// onAPISessionsList is called by api and metrics. +func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes { select { case <-s.ctx.Done(): - return apiRTSPSessionsListRes{Err: fmt.Errorf("terminated")} + return rtspServerAPISessionsListRes{Err: fmt.Errorf("terminated")} default: } s.mutex.RLock() defer s.mutex.RUnlock() - data := &apiRTSPSessionsListData{ - Items: make(map[string]apiRTSPSessionsListItem), + data := &rtspServerAPISessionsListData{ + Items: make(map[string]rtspServerAPISessionsListItem), } for _, s := range s.sessions { - data.Items[s.ID()] = apiRTSPSessionsListItem{ + data.Items[s.ID()] = rtspServerAPISessionsListItem{ RemoteAddr: s.RemoteAddr().String(), State: func() string { switch s.safeState() { @@ -392,14 +416,14 @@ func (s *rtspServer) onAPIRTSPSessionsList(req apiRTSPSessionsListReq) apiRTSPSe } } - return apiRTSPSessionsListRes{Data: data} + return rtspServerAPISessionsListRes{Data: data} } -// onAPIRTSPSessionsKick is called by api. -func (s *rtspServer) onAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSessionsKickRes { +// onAPISessionsKick is called by api. +func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes { select { case <-s.ctx.Done(): - return apiRTSPSessionsKickRes{Err: fmt.Errorf("terminated")} + return rtspServerAPISessionsKickRes{Err: fmt.Errorf("terminated")} default: } @@ -411,9 +435,9 @@ func (s *rtspServer) onAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSe se.close() delete(s.sessions, key) se.onClose(liberrors.ErrServerTerminated{}) - return apiRTSPSessionsKickRes{} + return rtspServerAPISessionsKickRes{} } } - return apiRTSPSessionsKickRes{Err: fmt.Errorf("not found")} + return rtspServerAPISessionsKickRes{Err: fmt.Errorf("not found")} }