From 13414214124d6cfb96fc03956bcb598581413516 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 26 Dec 2023 13:41:15 +0100 Subject: [PATCH] move api, metrics and pprof into dedicated packages (#2843) --- internal/{core => api}/api.go | 286 ++++++++++++-------------- internal/api/api_test.go | 39 ++++ internal/core/api_test.go | 34 +-- internal/core/core.go | 103 +++++----- internal/core/path.go | 4 +- internal/core/path_manager.go | 16 +- internal/core/path_manager_test.go | 2 +- internal/{core => metrics}/metrics.go | 72 ++++--- internal/{core => pprof}/pprof.go | 36 ++-- 9 files changed, 299 insertions(+), 293 deletions(-) rename internal/{core => api}/api.go (75%) create mode 100644 internal/api/api_test.go rename internal/{core => metrics}/metrics.go (82%) rename internal/{core => pprof}/pprof.go (56%) diff --git a/internal/core/api.go b/internal/api/api.go similarity index 75% rename from internal/core/api.go rename to internal/api/api.go index ddbc2dac..60c3f808 100644 --- a/internal/core/api.go +++ b/internal/api/api.go @@ -1,4 +1,5 @@ -package core +// Package api contains the API server. +package api import ( "encoding/json" @@ -97,17 +98,20 @@ func paramName(ctx *gin.Context) (string, bool) { return name[1:], true } -type apiPathManager interface { - apiPathsList() (*defs.APIPathList, error) - apiPathsGet(string) (*defs.APIPath, error) +// PathManager contains methods used by the API and Metrics server. +type PathManager interface { + APIPathsList() (*defs.APIPathList, error) + APIPathsGet(string) (*defs.APIPath, error) } -type apiHLSServer interface { +// HLSServer contains methods used by the API and Metrics server. +type HLSServer interface { APIMuxersList() (*defs.APIHLSMuxerList, error) APIMuxersGet(string) (*defs.APIHLSMuxer, error) } -type apiRTSPServer interface { +// RTSPServer contains methods used by the API and Metrics server. +type RTSPServer interface { APIConnsList() (*defs.APIRTSPConnsList, error) APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error) APISessionsList() (*defs.APIRTSPSessionList, error) @@ -115,19 +119,22 @@ type apiRTSPServer interface { APISessionsKick(uuid.UUID) error } -type apiRTMPServer interface { +// RTMPServer contains methods used by the API and Metrics server. +type RTMPServer interface { APIConnsList() (*defs.APIRTMPConnList, error) APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error) APIConnsKick(uuid.UUID) error } -type apiSRTServer interface { +// SRTServer contains methods used by the API and Metrics server. +type SRTServer interface { APIConnsList() (*defs.APISRTConnList, error) APIConnsGet(uuid.UUID) (*defs.APISRTConn, error) APIConnsKick(uuid.UUID) error } -type apiWebRTCServer interface { +// WebRTCServer contains methods used by the API and Metrics server. +type WebRTCServer interface { APISessionsList() (*defs.APIWebRTCSessionList, error) APISessionsGet(uuid.UUID) (*defs.APIWebRTCSession, error) APISessionsKick(uuid.UUID) error @@ -135,52 +142,30 @@ type apiWebRTCServer interface { type apiParent interface { logger.Writer - apiConfigSet(conf *conf.Conf) + APIConfigSet(conf *conf.Conf) } -type api struct { - conf *conf.Conf - pathManager apiPathManager - rtspServer apiRTSPServer - rtspsServer apiRTSPServer - rtmpServer apiRTMPServer - rtmpsServer apiRTMPServer - hlsManager apiHLSServer - webRTCServer apiWebRTCServer - srtServer apiSRTServer - parent apiParent +// API is an API server. +type API struct { + Address string + ReadTimeout conf.StringDuration + Conf *conf.Conf + PathManager PathManager + RTSPServer RTSPServer + RTSPSServer RTSPServer + RTMPServer RTMPServer + RTMPSServer RTMPServer + HLSServer HLSServer + WebRTCServer WebRTCServer + SRTServer SRTServer + Parent apiParent httpServer *httpserv.WrappedServer mutex sync.Mutex } -func newAPI( - address string, - readTimeout conf.StringDuration, - conf *conf.Conf, - pathManager apiPathManager, - rtspServer apiRTSPServer, - rtspsServer apiRTSPServer, - rtmpServer apiRTMPServer, - rtmpsServer apiRTMPServer, - hlsManager apiHLSServer, - webRTCServer apiWebRTCServer, - srtServer apiSRTServer, - parent apiParent, -) (*api, error) { - a := &api{ - conf: conf, - pathManager: pathManager, - rtspServer: rtspServer, - rtspsServer: rtspsServer, - rtmpServer: rtmpServer, - rtmpsServer: rtmpsServer, - hlsManager: hlsManager, - webRTCServer: webRTCServer, - srtServer: srtServer, - parent: parent, - } - +// Initialize initializes API. +func (a *API) Initialize() error { router := gin.New() router.SetTrustedProxies(nil) //nolint:errcheck @@ -202,12 +187,12 @@ func newAPI( group.GET("/v3/paths/list", a.onPathsList) group.GET("/v3/paths/get/*name", a.onPathsGet) - if !interfaceIsEmpty(a.hlsManager) { + if !interfaceIsEmpty(a.HLSServer) { group.GET("/v3/hlsmuxers/list", a.onHLSMuxersList) group.GET("/v3/hlsmuxers/get/*name", a.onHLSMuxersGet) } - if !interfaceIsEmpty(a.rtspServer) { + if !interfaceIsEmpty(a.RTSPServer) { group.GET("/v3/rtspconns/list", a.onRTSPConnsList) group.GET("/v3/rtspconns/get/:id", a.onRTSPConnsGet) group.GET("/v3/rtspsessions/list", a.onRTSPSessionsList) @@ -215,7 +200,7 @@ func newAPI( group.POST("/v3/rtspsessions/kick/:id", a.onRTSPSessionsKick) } - if !interfaceIsEmpty(a.rtspsServer) { + if !interfaceIsEmpty(a.RTSPSServer) { group.GET("/v3/rtspsconns/list", a.onRTSPSConnsList) group.GET("/v3/rtspsconns/get/:id", a.onRTSPSConnsGet) group.GET("/v3/rtspssessions/list", a.onRTSPSSessionsList) @@ -223,63 +208,64 @@ func newAPI( group.POST("/v3/rtspssessions/kick/:id", a.onRTSPSSessionsKick) } - if !interfaceIsEmpty(a.rtmpServer) { + if !interfaceIsEmpty(a.RTMPServer) { group.GET("/v3/rtmpconns/list", a.onRTMPConnsList) group.GET("/v3/rtmpconns/get/:id", a.onRTMPConnsGet) group.POST("/v3/rtmpconns/kick/:id", a.onRTMPConnsKick) } - if !interfaceIsEmpty(a.rtmpsServer) { + if !interfaceIsEmpty(a.RTMPSServer) { group.GET("/v3/rtmpsconns/list", a.onRTMPSConnsList) group.GET("/v3/rtmpsconns/get/:id", a.onRTMPSConnsGet) group.POST("/v3/rtmpsconns/kick/:id", a.onRTMPSConnsKick) } - if !interfaceIsEmpty(a.webRTCServer) { + if !interfaceIsEmpty(a.WebRTCServer) { group.GET("/v3/webrtcsessions/list", a.onWebRTCSessionsList) group.GET("/v3/webrtcsessions/get/:id", a.onWebRTCSessionsGet) group.POST("/v3/webrtcsessions/kick/:id", a.onWebRTCSessionsKick) } - if !interfaceIsEmpty(a.srtServer) { + if !interfaceIsEmpty(a.SRTServer) { group.GET("/v3/srtconns/list", a.onSRTConnsList) group.GET("/v3/srtconns/get/:id", a.onSRTConnsGet) group.POST("/v3/srtconns/kick/:id", a.onSRTConnsKick) } - network, address := restrictnetwork.Restrict("tcp", address) + network, address := restrictnetwork.Restrict("tcp", a.Address) var err error a.httpServer, err = httpserv.NewWrappedServer( network, address, - time.Duration(readTimeout), + time.Duration(a.ReadTimeout), "", "", router, a, ) if err != nil { - return nil, err + return err } a.Log(logger.Info, "listener opened on "+address) - return a, nil + return nil } -func (a *api) close() { +// Close closes the API. +func (a *API) Close() { a.Log(logger.Info, "listener is closing") a.httpServer.Close() } // Log implements logger.Writer. -func (a *api) Log(level logger.Level, format string, args ...interface{}) { - a.parent.Log(level, "[API] "+format, args...) +func (a *API) Log(level logger.Level, format string, args ...interface{}) { + a.Parent.Log(level, "[API] "+format, args...) } // error coming from something the user inserted into the request. -func (a *api) writeError(ctx *gin.Context, status int, err error) { +func (a *API) writeError(ctx *gin.Context, status int, err error) { // show error in logs a.Log(logger.Error, err.Error()) @@ -289,15 +275,15 @@ func (a *api) writeError(ctx *gin.Context, status int, err error) { }) } -func (a *api) onConfigGlobalGet(ctx *gin.Context) { +func (a *API) onConfigGlobalGet(ctx *gin.Context) { a.mutex.Lock() - c := a.conf + c := a.Conf a.mutex.Unlock() ctx.JSON(http.StatusOK, c.Global()) } -func (a *api) onConfigGlobalPatch(ctx *gin.Context) { +func (a *API) onConfigGlobalPatch(ctx *gin.Context) { var c conf.OptionalGlobal err := json.NewDecoder(ctx.Request.Body).Decode(&c) if err != nil { @@ -308,7 +294,7 @@ func (a *api) onConfigGlobalPatch(ctx *gin.Context) { a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() newConf.PatchGlobal(&c) @@ -318,24 +304,24 @@ func (a *api) onConfigGlobalPatch(ctx *gin.Context) { return } - a.conf = newConf + a.Conf = newConf // since reloading the configuration can cause the shutdown of the API, // call it in a goroutine - go a.parent.apiConfigSet(newConf) + go a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onConfigPathDefaultsGet(ctx *gin.Context) { +func (a *API) onConfigPathDefaultsGet(ctx *gin.Context) { a.mutex.Lock() - c := a.conf + c := a.Conf a.mutex.Unlock() ctx.JSON(http.StatusOK, c.PathDefaults) } -func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) { +func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) { var p conf.OptionalPath err := json.NewDecoder(ctx.Request.Body).Decode(&p) if err != nil { @@ -346,7 +332,7 @@ func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) { a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() newConf.PatchPathDefaults(&p) @@ -356,15 +342,15 @@ func (a *api) onConfigPathDefaultsPatch(ctx *gin.Context) { return } - a.conf = newConf - a.parent.apiConfigSet(newConf) + a.Conf = newConf + a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onConfigPathsList(ctx *gin.Context) { +func (a *API) onConfigPathsList(ctx *gin.Context) { a.mutex.Lock() - c := a.conf + c := a.Conf a.mutex.Unlock() data := &defs.APIPathConfList{ @@ -386,7 +372,7 @@ func (a *api) onConfigPathsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onConfigPathsGet(ctx *gin.Context) { +func (a *API) onConfigPathsGet(ctx *gin.Context) { name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) @@ -394,7 +380,7 @@ func (a *api) onConfigPathsGet(ctx *gin.Context) { } a.mutex.Lock() - c := a.conf + c := a.Conf a.mutex.Unlock() p, ok := c.Paths[name] @@ -406,7 +392,7 @@ func (a *api) onConfigPathsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, p) } -func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl +func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) @@ -423,7 +409,7 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() err = newConf.AddPath(name, &p) if err != nil { @@ -437,13 +423,13 @@ func (a *api) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl return } - a.conf = newConf - a.parent.apiConfigSet(newConf) + a.Conf = newConf + a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl +func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) @@ -460,7 +446,7 @@ func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() err = newConf.PatchPath(name, &p) if err != nil { @@ -474,13 +460,13 @@ func (a *api) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl return } - a.conf = newConf - a.parent.apiConfigSet(newConf) + a.Conf = newConf + a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl +func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) @@ -497,7 +483,7 @@ func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() err = newConf.ReplacePath(name, &p) if err != nil { @@ -511,13 +497,13 @@ func (a *api) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl return } - a.conf = newConf - a.parent.apiConfigSet(newConf) + a.Conf = newConf + a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onConfigPathsDelete(ctx *gin.Context) { +func (a *API) onConfigPathsDelete(ctx *gin.Context) { name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) @@ -527,7 +513,7 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { a.mutex.Lock() defer a.mutex.Unlock() - newConf := a.conf.Clone() + newConf := a.Conf.Clone() err := newConf.RemovePath(name) if err != nil { @@ -541,14 +527,14 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { return } - a.conf = newConf - a.parent.apiConfigSet(newConf) + a.Conf = newConf + a.Parent.APIConfigSet(newConf) ctx.Status(http.StatusOK) } -func (a *api) onPathsList(ctx *gin.Context) { - data, err := a.pathManager.apiPathsList() +func (a *API) onPathsList(ctx *gin.Context) { + data, err := a.PathManager.APIPathsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -565,14 +551,14 @@ func (a *api) onPathsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onPathsGet(ctx *gin.Context) { +func (a *API) onPathsGet(ctx *gin.Context) { name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return } - data, err := a.pathManager.apiPathsGet(name) + data, err := a.PathManager.APIPathsGet(name) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -581,8 +567,8 @@ func (a *api) onPathsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPConnsList(ctx *gin.Context) { - data, err := a.rtspServer.APIConnsList() +func (a *API) onRTSPConnsList(ctx *gin.Context) { + data, err := a.RTSPServer.APIConnsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -599,14 +585,14 @@ func (a *api) onRTSPConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPConnsGet(ctx *gin.Context) { +func (a *API) onRTSPConnsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtspServer.APIConnsGet(uuid) + data, err := a.RTSPServer.APIConnsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -615,8 +601,8 @@ func (a *api) onRTSPConnsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSessionsList(ctx *gin.Context) { - data, err := a.rtspServer.APISessionsList() +func (a *API) onRTSPSessionsList(ctx *gin.Context) { + data, err := a.RTSPServer.APISessionsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -633,14 +619,14 @@ func (a *api) onRTSPSessionsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSessionsGet(ctx *gin.Context) { +func (a *API) onRTSPSessionsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtspServer.APISessionsGet(uuid) + data, err := a.RTSPServer.APISessionsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -649,14 +635,14 @@ func (a *api) onRTSPSessionsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSessionsKick(ctx *gin.Context) { +func (a *API) onRTSPSessionsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.rtspServer.APISessionsKick(uuid) + err = a.RTSPServer.APISessionsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -665,8 +651,8 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *api) onRTSPSConnsList(ctx *gin.Context) { - data, err := a.rtspsServer.APIConnsList() +func (a *API) onRTSPSConnsList(ctx *gin.Context) { + data, err := a.RTSPSServer.APIConnsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -683,14 +669,14 @@ func (a *api) onRTSPSConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSConnsGet(ctx *gin.Context) { +func (a *API) onRTSPSConnsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtspsServer.APIConnsGet(uuid) + data, err := a.RTSPSServer.APIConnsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -699,8 +685,8 @@ func (a *api) onRTSPSConnsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSSessionsList(ctx *gin.Context) { - data, err := a.rtspsServer.APISessionsList() +func (a *API) onRTSPSSessionsList(ctx *gin.Context) { + data, err := a.RTSPSServer.APISessionsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -717,14 +703,14 @@ func (a *api) onRTSPSSessionsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSSessionsGet(ctx *gin.Context) { +func (a *API) onRTSPSSessionsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtspsServer.APISessionsGet(uuid) + data, err := a.RTSPSServer.APISessionsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -733,14 +719,14 @@ func (a *api) onRTSPSSessionsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { +func (a *API) onRTSPSSessionsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.rtspsServer.APISessionsKick(uuid) + err = a.RTSPSServer.APISessionsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -749,8 +735,8 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *api) onRTMPConnsList(ctx *gin.Context) { - data, err := a.rtmpServer.APIConnsList() +func (a *API) onRTMPConnsList(ctx *gin.Context) { + data, err := a.RTMPServer.APIConnsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -767,14 +753,14 @@ func (a *api) onRTMPConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTMPConnsGet(ctx *gin.Context) { +func (a *API) onRTMPConnsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtmpServer.APIConnsGet(uuid) + data, err := a.RTMPServer.APIConnsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -783,14 +769,14 @@ func (a *api) onRTMPConnsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTMPConnsKick(ctx *gin.Context) { +func (a *API) onRTMPConnsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.rtmpServer.APIConnsKick(uuid) + err = a.RTMPServer.APIConnsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -799,8 +785,8 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *api) onRTMPSConnsList(ctx *gin.Context) { - data, err := a.rtmpsServer.APIConnsList() +func (a *API) onRTMPSConnsList(ctx *gin.Context) { + data, err := a.RTMPSServer.APIConnsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -817,14 +803,14 @@ func (a *api) onRTMPSConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTMPSConnsGet(ctx *gin.Context) { +func (a *API) onRTMPSConnsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.rtmpsServer.APIConnsGet(uuid) + data, err := a.RTMPSServer.APIConnsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -833,14 +819,14 @@ func (a *api) onRTMPSConnsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onRTMPSConnsKick(ctx *gin.Context) { +func (a *API) onRTMPSConnsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.rtmpsServer.APIConnsKick(uuid) + err = a.RTMPSServer.APIConnsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -849,8 +835,8 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *api) onHLSMuxersList(ctx *gin.Context) { - data, err := a.hlsManager.APIMuxersList() +func (a *API) onHLSMuxersList(ctx *gin.Context) { + data, err := a.HLSServer.APIMuxersList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -867,14 +853,14 @@ func (a *api) onHLSMuxersList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onHLSMuxersGet(ctx *gin.Context) { +func (a *API) onHLSMuxersGet(ctx *gin.Context) { name, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return } - data, err := a.hlsManager.APIMuxersGet(name) + data, err := a.HLSServer.APIMuxersGet(name) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -883,8 +869,8 @@ func (a *api) onHLSMuxersGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onWebRTCSessionsList(ctx *gin.Context) { - data, err := a.webRTCServer.APISessionsList() +func (a *API) onWebRTCSessionsList(ctx *gin.Context) { + data, err := a.WebRTCServer.APISessionsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -901,14 +887,14 @@ func (a *api) onWebRTCSessionsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onWebRTCSessionsGet(ctx *gin.Context) { +func (a *API) onWebRTCSessionsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.webRTCServer.APISessionsGet(uuid) + data, err := a.WebRTCServer.APISessionsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -917,14 +903,14 @@ func (a *api) onWebRTCSessionsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { +func (a *API) onWebRTCSessionsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.webRTCServer.APISessionsKick(uuid) + err = a.WebRTCServer.APISessionsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -933,8 +919,8 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -func (a *api) onSRTConnsList(ctx *gin.Context) { - data, err := a.srtServer.APIConnsList() +func (a *API) onSRTConnsList(ctx *gin.Context) { + data, err := a.SRTServer.APIConnsList() if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -951,14 +937,14 @@ func (a *api) onSRTConnsList(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onSRTConnsGet(ctx *gin.Context) { +func (a *API) onSRTConnsGet(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - data, err := a.srtServer.APIConnsGet(uuid) + data, err := a.SRTServer.APIConnsGet(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -967,14 +953,14 @@ func (a *api) onSRTConnsGet(ctx *gin.Context) { ctx.JSON(http.StatusOK, data) } -func (a *api) onSRTConnsKick(ctx *gin.Context) { +func (a *API) onSRTConnsKick(ctx *gin.Context) { uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return } - err = a.srtServer.APIConnsKick(uuid) + err = a.SRTServer.APIConnsKick(uuid) if err != nil { a.writeError(ctx, http.StatusInternalServerError, err) return @@ -983,9 +969,9 @@ func (a *api) onSRTConnsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } -// confReload is called by core. -func (a *api) confReload(conf *conf.Conf) { +// ReloadConf is called by core. +func (a *API) ReloadConf(conf *conf.Conf) { a.mutex.Lock() defer a.mutex.Unlock() - a.conf = conf + a.Conf = conf } diff --git a/internal/api/api_test.go b/internal/api/api_test.go new file mode 100644 index 00000000..ddc656f3 --- /dev/null +++ b/internal/api/api_test.go @@ -0,0 +1,39 @@ +package api + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPaginate(t *testing.T) { + items := make([]int, 5) + for i := 0; i < 5; i++ { + items[i] = i + } + + pageCount, err := paginate(&items, "1", "1") + require.NoError(t, err) + require.Equal(t, 5, pageCount) + require.Equal(t, []int{1}, items) + + items = make([]int, 5) + for i := 0; i < 5; i++ { + items[i] = i + } + + pageCount, err = paginate(&items, "3", "2") + require.NoError(t, err) + require.Equal(t, 2, pageCount) + require.Equal(t, []int{}, items) + + items = make([]int, 6) + for i := 0; i < 6; i++ { + items[i] = i + } + + pageCount, err = paginate(&items, "4", "1") + require.NoError(t, err) + require.Equal(t, 2, pageCount) + require.Equal(t, []int{4, 5}, items) +} diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 7964c8ca..635f10ab 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -20,7 +20,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" - "github.com/datarhei/gosrt" + srt "github.com/datarhei/gosrt" "github.com/google/uuid" "github.com/pion/rtp" "github.com/stretchr/testify/require" @@ -102,38 +102,6 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in int require.NoError(t, err) } -func TestPagination(t *testing.T) { - items := make([]int, 5) - for i := 0; i < 5; i++ { - items[i] = i - } - - pageCount, err := paginate(&items, "1", "1") - require.NoError(t, err) - require.Equal(t, 5, pageCount) - require.Equal(t, []int{1}, items) - - items = make([]int, 5) - for i := 0; i < 5; i++ { - items[i] = i - } - - pageCount, err = paginate(&items, "3", "2") - require.NoError(t, err) - require.Equal(t, 2, pageCount) - require.Equal(t, []int{}, items) - - items = make([]int, 6) - for i := 0; i < 6; i++ { - items[i] = i - } - - pageCount, err = paginate(&items, "4", "1") - require.NoError(t, err) - require.Equal(t, 2, pageCount) - require.Equal(t, []int{4, 5}, items) -} - func TestAPIConfigGlobalGet(t *testing.T) { p, ok := newInstance("api: yes\n") require.Equal(t, true, ok) diff --git a/internal/core/core.go b/internal/core/core.go index e7e88820..688bdadf 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -16,10 +16,13 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/gin-gonic/gin" + "github.com/bluenviron/mediamtx/internal/api" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/confwatcher" "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/metrics" + "github.com/bluenviron/mediamtx/internal/pprof" "github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/rlimit" "github.com/bluenviron/mediamtx/internal/servers/hls" @@ -84,8 +87,8 @@ type Core struct { conf *conf.Conf logger *logger.Logger externalCmdPool *externalcmd.Pool - metrics *metrics - pprof *pprof + metrics *metrics.Metrics + pprof *pprof.PPROF recordCleaner *record.Cleaner pathManager *pathManager rtspServer *rtsp.Server @@ -95,7 +98,7 @@ type Core struct { hlsServer *hls.Server webRTCServer *webrtc.Server srtServer *srt.Server - api *api + api *api.API confWatcher *confwatcher.ConfWatcher // in @@ -275,12 +278,12 @@ func (p *Core) createResources(initial bool) error { if p.conf.Metrics && p.metrics == nil { - p.metrics = &metrics{ + p.metrics = &metrics.Metrics{ Address: p.conf.MetricsAddress, ReadTimeout: p.conf.ReadTimeout, Parent: p, } - err = p.metrics.initialize() + err := p.metrics.Initialize() if err != nil { return err } @@ -288,11 +291,12 @@ func (p *Core) createResources(initial bool) error { if p.conf.PPROF && p.pprof == nil { - p.pprof, err = newPPROF( - p.conf.PPROFAddress, - p.conf.ReadTimeout, - p, - ) + p.pprof = &pprof.PPROF{ + Address: p.conf.PPROFAddress, + ReadTimeout: p.conf.ReadTimeout, + Parent: p, + } + err := p.pprof.Initialize() if err != nil { return err } @@ -324,7 +328,7 @@ func (p *Core) createResources(initial bool) error { ) if p.metrics != nil { - p.metrics.setPathManager(p.pathManager) + p.metrics.SetPathManager(p.pathManager) } } @@ -366,7 +370,7 @@ func (p *Core) createResources(initial bool) error { } if p.metrics != nil { - p.metrics.setRTSPServer(p.rtspServer) + p.metrics.SetRTSPServer(p.rtspServer) } } @@ -405,7 +409,7 @@ func (p *Core) createResources(initial bool) error { } if p.metrics != nil { - p.metrics.setRTSPSServer(p.rtspsServer) + p.metrics.SetRTSPSServer(p.rtspsServer) } } @@ -435,7 +439,7 @@ func (p *Core) createResources(initial bool) error { } if p.metrics != nil { - p.metrics.setRTMPServer(p.rtmpServer) + p.metrics.SetRTMPServer(p.rtmpServer) } } @@ -465,7 +469,7 @@ func (p *Core) createResources(initial bool) error { } if p.metrics != nil { - p.metrics.setRTMPSServer(p.rtmpsServer) + p.metrics.SetRTMPSServer(p.rtmpsServer) } } @@ -499,7 +503,7 @@ func (p *Core) createResources(initial bool) error { p.pathManager.setHLSServer(p.hlsServer) if p.metrics != nil { - p.metrics.setHLSServer(p.hlsServer) + p.metrics.SetHLSServer(p.hlsServer) } } @@ -524,14 +528,14 @@ func (p *Core) createResources(initial bool) error { PathManager: p.pathManager, Parent: p, } - err = p.webRTCServer.Initialize() + err := p.webRTCServer.Initialize() if err != nil { p.webRTCServer = nil return err } if p.metrics != nil { - p.metrics.setWebRTCServer(p.webRTCServer) + p.metrics.SetWebRTCServer(p.webRTCServer) } } @@ -551,32 +555,33 @@ func (p *Core) createResources(initial bool) error { PathManager: p.pathManager, Parent: p, } - err = p.srtServer.Initialize() + err := p.srtServer.Initialize() if err != nil { return err } if p.metrics != nil { - p.metrics.setSRTServer(p.srtServer) + p.metrics.SetSRTServer(p.srtServer) } } if p.conf.API && p.api == nil { - p.api, err = newAPI( - p.conf.APIAddress, - p.conf.ReadTimeout, - p.conf, - p.pathManager, - p.rtspServer, - p.rtspsServer, - p.rtmpServer, - p.rtmpsServer, - p.hlsServer, - p.webRTCServer, - p.srtServer, - p, - ) + p.api = &api.API{ + Address: p.conf.APIAddress, + ReadTimeout: p.conf.ReadTimeout, + Conf: p.conf, + PathManager: p.pathManager, + RTSPServer: p.rtspServer, + RTSPSServer: p.rtspsServer, + RTMPServer: p.rtmpServer, + RTMPSServer: p.rtmpsServer, + HLSServer: p.hlsServer, + WebRTCServer: p.webRTCServer, + SRTServer: p.srtServer, + Parent: p, + } + err := p.api.Initialize() if err != nil { return err } @@ -626,7 +631,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closeMetrics || closeLogger if !closePathManager && !reflect.DeepEqual(newConf.Paths, p.conf.Paths) { - p.pathManager.confReload(newConf.Paths) + p.pathManager.ReloadConf(newConf.Paths) } closeRTSPServer := newConf == nil || @@ -779,16 +784,16 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if p.api != nil { if closeAPI { - p.api.close() + p.api.Close() p.api = nil } else if !calledByAPI { // avoid a loop - p.api.confReload(newConf) + p.api.ReloadConf(newConf) } } if closeSRTServer && p.srtServer != nil { if p.metrics != nil { - p.metrics.setSRTServer(nil) + p.metrics.SetSRTServer(nil) } p.srtServer.Close() @@ -797,7 +802,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeWebRTCServer && p.webRTCServer != nil { if p.metrics != nil { - p.metrics.setWebRTCServer(nil) + p.metrics.SetWebRTCServer(nil) } p.webRTCServer.Close() @@ -806,7 +811,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeHLSServer && p.hlsServer != nil { if p.metrics != nil { - p.metrics.setHLSServer(nil) + p.metrics.SetHLSServer(nil) } p.pathManager.setHLSServer(nil) @@ -817,7 +822,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeRTMPSServer && p.rtmpsServer != nil { if p.metrics != nil { - p.metrics.setRTMPSServer(nil) + p.metrics.SetRTMPSServer(nil) } p.rtmpsServer.Close() @@ -826,7 +831,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeRTMPServer && p.rtmpServer != nil { if p.metrics != nil { - p.metrics.setRTMPServer(nil) + p.metrics.SetRTMPServer(nil) } p.rtmpServer.Close() @@ -835,7 +840,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeRTSPSServer && p.rtspsServer != nil { if p.metrics != nil { - p.metrics.setRTSPSServer(nil) + p.metrics.SetRTSPSServer(nil) } p.rtspsServer.Close() @@ -844,7 +849,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closeRTSPServer && p.rtspServer != nil { if p.metrics != nil { - p.metrics.setRTSPServer(nil) + p.metrics.SetRTSPServer(nil) } p.rtspServer.Close() @@ -853,7 +858,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { if closePathManager && p.pathManager != nil { if p.metrics != nil { - p.metrics.setPathManager(nil) + p.metrics.SetPathManager(nil) } p.pathManager.close() @@ -866,12 +871,12 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { } if closePPROF && p.pprof != nil { - p.pprof.close() + p.pprof.Close() p.pprof = nil } if closeMetrics && p.metrics != nil { - p.metrics.close() + p.metrics.Close() p.metrics = nil } @@ -892,8 +897,8 @@ func (p *Core) reloadConf(newConf *conf.Conf, calledByAPI bool) error { return p.createResources(false) } -// apiConfigSet is called by api. -func (p *Core) apiConfigSet(conf *conf.Conf) { +// APIConfigSet is called by api. +func (p *Core) APIConfigSet(conf *conf.Conf) { select { case p.chAPIConfigSet <- conf: case <-p.ctx.Done(): diff --git a/internal/core/path.go b/internal/core/path.go index e43a9e3e..64eeca0f 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -1019,8 +1019,8 @@ func (pa *path) RemoveReader(req defs.PathRemoveReaderReq) { } } -// apiPathsGet is called by api. -func (pa *path) apiPathsGet(req pathAPIPathsGetReq) (*defs.APIPath, error) { +// APIPathsGet is called by api. +func (pa *path) APIPathsGet(req pathAPIPathsGetReq) (*defs.APIPath, error) { req.res = make(chan pathAPIPathsGetRes) select { case pa.chAPIPathsGet <- req: diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 69d69f00..f08ecfcd 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -433,8 +433,8 @@ func (pm *pathManager) removePath(pa *path) { delete(pm.paths, pa.name) } -// confReload is called by core. -func (pm *pathManager) confReload(pathConfs map[string]*conf.Path) { +// ReloadConf is called by core. +func (pm *pathManager) ReloadConf(pathConfs map[string]*conf.Path) { select { case pm.chReloadConf <- pathConfs: case <-pm.ctx.Done(): @@ -545,8 +545,8 @@ func (pm *pathManager) setHLSServer(s pathManagerHLSServer) { } } -// apiPathsList is called by api. -func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) { +// APIPathsList is called by api. +func (pm *pathManager) APIPathsList() (*defs.APIPathList, error) { req := pathAPIPathsListReq{ res: make(chan pathAPIPathsListRes), } @@ -560,7 +560,7 @@ func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) { } for _, pa := range res.paths { - item, err := pa.apiPathsGet(pathAPIPathsGetReq{}) + item, err := pa.APIPathsGet(pathAPIPathsGetReq{}) if err == nil { res.data.Items = append(res.data.Items, item) } @@ -577,8 +577,8 @@ func (pm *pathManager) apiPathsList() (*defs.APIPathList, error) { } } -// apiPathsGet is called by api. -func (pm *pathManager) apiPathsGet(name string) (*defs.APIPath, error) { +// APIPathsGet is called by api. +func (pm *pathManager) APIPathsGet(name string) (*defs.APIPath, error) { req := pathAPIPathsGetReq{ name: name, res: make(chan pathAPIPathsGetRes), @@ -591,7 +591,7 @@ func (pm *pathManager) apiPathsGet(name string) (*defs.APIPath, error) { return nil, res.err } - data, err := res.path.apiPathsGet(req) + data, err := res.path.APIPathsGet(req) return data, err case <-pm.ctx.Done(): diff --git a/internal/core/path_manager_test.go b/internal/core/path_manager_test.go index 79c33380..85d14c8d 100644 --- a/internal/core/path_manager_test.go +++ b/internal/core/path_manager_test.go @@ -78,7 +78,7 @@ func TestPathAutoDeletion(t *testing.T) { } }() - data, err := p.pathManager.apiPathsList() + data, err := p.pathManager.APIPathsList() require.NoError(t, err) require.Equal(t, 0, len(data.Items)) diff --git a/internal/core/metrics.go b/internal/metrics/metrics.go similarity index 82% rename from internal/core/metrics.go rename to internal/metrics/metrics.go index 9068ecb8..b495d612 100644 --- a/internal/core/metrics.go +++ b/internal/metrics/metrics.go @@ -1,20 +1,27 @@ -package core +// Package metrics contains the metrics provider. +package metrics import ( "io" "net/http" + "reflect" "strconv" "sync" "time" "github.com/gin-gonic/gin" + "github.com/bluenviron/mediamtx/internal/api" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpserv" "github.com/bluenviron/mediamtx/internal/restrictnetwork" ) +func interfaceIsEmpty(i interface{}) bool { + return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil() +} + func metric(key string, tags string, value int64) string { return key + tags + " " + strconv.FormatInt(value, 10) + "\n" } @@ -23,24 +30,26 @@ type metricsParent interface { logger.Writer } -type metrics struct { +// Metrics is a metrics provider. +type Metrics struct { Address string ReadTimeout conf.StringDuration Parent metricsParent httpServer *httpserv.WrappedServer mutex sync.Mutex - pathManager apiPathManager - rtspServer apiRTSPServer - rtspsServer apiRTSPServer - rtmpServer apiRTMPServer - rtmpsServer apiRTMPServer - srtServer apiSRTServer - hlsManager apiHLSServer - webRTCServer apiWebRTCServer + pathManager api.PathManager + rtspServer api.RTSPServer + rtspsServer api.RTSPServer + rtmpServer api.RTMPServer + rtmpsServer api.RTMPServer + srtServer api.SRTServer + hlsManager api.HLSServer + webRTCServer api.WebRTCServer } -func (m *metrics) initialize() error { +// Initialize initializes metrics. +func (m *Metrics) Initialize() error { router := gin.New() router.SetTrustedProxies(nil) //nolint:errcheck @@ -67,20 +76,21 @@ func (m *metrics) initialize() error { return nil } -func (m *metrics) close() { +// Close closes Metrics. +func (m *Metrics) Close() { m.Log(logger.Info, "listener is closing") m.httpServer.Close() } // Log implements logger.Writer. -func (m *metrics) Log(level logger.Level, format string, args ...interface{}) { +func (m *Metrics) Log(level logger.Level, format string, args ...interface{}) { m.Parent.Log(level, "[metrics] "+format, args...) } -func (m *metrics) onMetrics(ctx *gin.Context) { +func (m *Metrics) onMetrics(ctx *gin.Context) { out := "" - data, err := m.pathManager.apiPathsList() + data, err := m.pathManager.APIPathsList() if err == nil && len(data.Items) != 0 { for _, i := range data.Items { var state string @@ -249,57 +259,57 @@ func (m *metrics) onMetrics(ctx *gin.Context) { io.WriteString(ctx.Writer, out) //nolint:errcheck } -// setPathManager is called by core. -func (m *metrics) setPathManager(s apiPathManager) { +// SetPathManager is called by core. +func (m *Metrics) SetPathManager(s api.PathManager) { m.mutex.Lock() defer m.mutex.Unlock() m.pathManager = s } -// setHLSServer is called by core. -func (m *metrics) setHLSServer(s apiHLSServer) { +// SetHLSServer is called by core. +func (m *Metrics) SetHLSServer(s api.HLSServer) { m.mutex.Lock() defer m.mutex.Unlock() m.hlsManager = s } -// setRTSPServer is called by core. -func (m *metrics) setRTSPServer(s apiRTSPServer) { +// SetRTSPServer is called by core. +func (m *Metrics) SetRTSPServer(s api.RTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspServer = s } -// setRTSPSServer is called by core. -func (m *metrics) setRTSPSServer(s apiRTSPServer) { +// SetRTSPSServer is called by core. +func (m *Metrics) SetRTSPSServer(s api.RTSPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtspsServer = s } -// setRTMPServer is called by core. -func (m *metrics) setRTMPServer(s apiRTMPServer) { +// SetRTMPServer is called by core. +func (m *Metrics) SetRTMPServer(s api.RTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpServer = s } -// setRTMPSServer is called by core. -func (m *metrics) setRTMPSServer(s apiRTMPServer) { +// SetRTMPSServer is called by core. +func (m *Metrics) SetRTMPSServer(s api.RTMPServer) { m.mutex.Lock() defer m.mutex.Unlock() m.rtmpsServer = s } -// setSRTServer is called by core. -func (m *metrics) setSRTServer(s apiSRTServer) { +// SetSRTServer is called by core. +func (m *Metrics) SetSRTServer(s api.SRTServer) { m.mutex.Lock() defer m.mutex.Unlock() m.srtServer = s } -// setWebRTCServer is called by core. -func (m *metrics) setWebRTCServer(s apiWebRTCServer) { +// SetWebRTCServer is called by core. +func (m *Metrics) SetWebRTCServer(s api.WebRTCServer) { m.mutex.Lock() defer m.mutex.Unlock() m.webRTCServer = s diff --git a/internal/core/pprof.go b/internal/pprof/pprof.go similarity index 56% rename from internal/core/pprof.go rename to internal/pprof/pprof.go index b92396c0..63803fc9 100644 --- a/internal/core/pprof.go +++ b/internal/pprof/pprof.go @@ -1,4 +1,5 @@ -package core +// Package pprof contains a pprof exporter. +package pprof import ( "net/http" @@ -17,48 +18,45 @@ type pprofParent interface { logger.Writer } -type pprof struct { - parent pprofParent +// PPROF is a pprof exporter. +type PPROF struct { + Address string + ReadTimeout conf.StringDuration + Parent pprofParent httpServer *httpserv.WrappedServer } -func newPPROF( - address string, - readTimeout conf.StringDuration, - parent pprofParent, -) (*pprof, error) { - pp := &pprof{ - parent: parent, - } - - network, address := restrictnetwork.Restrict("tcp", address) +// Initialize initializes PPROF. +func (pp *PPROF) Initialize() error { + network, address := restrictnetwork.Restrict("tcp", pp.Address) var err error pp.httpServer, err = httpserv.NewWrappedServer( network, address, - time.Duration(readTimeout), + time.Duration(pp.ReadTimeout), "", "", http.DefaultServeMux, pp, ) if err != nil { - return nil, err + return err } pp.Log(logger.Info, "listener opened on "+address) - return pp, nil + return nil } -func (pp *pprof) close() { +// Close closes PPROF. +func (pp *PPROF) Close() { pp.Log(logger.Info, "listener is closing") pp.httpServer.Close() } // Log implements logger.Writer. -func (pp *pprof) Log(level logger.Level, format string, args ...interface{}) { - pp.parent.Log(level, "[pprof] "+format, args...) +func (pp *PPROF) Log(level logger.Level, format string, args ...interface{}) { + pp.Parent.Log(level, "[pprof] "+format, args...) }