mediamtx/internal/api/api.go

1169 lines
27 KiB
Go

// Package api contains the API server.
package api
import (
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/auth"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
"github.com/bluenviron/mediamtx/internal/recordstore"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
"github.com/bluenviron/mediamtx/internal/servers/hls"
"github.com/bluenviron/mediamtx/internal/servers/rtmp"
"github.com/bluenviron/mediamtx/internal/servers/rtsp"
"github.com/bluenviron/mediamtx/internal/servers/srt"
"github.com/bluenviron/mediamtx/internal/servers/webrtc"
)
func interfaceIsEmpty(i interface{}) bool {
return reflect.ValueOf(i).Kind() != reflect.Ptr || reflect.ValueOf(i).IsNil()
}
func sortedKeys(paths map[string]*conf.Path) []string {
ret := make([]string, len(paths))
i := 0
for name := range paths {
ret[i] = name
i++
}
sort.Strings(ret)
return ret
}
func paramName(ctx *gin.Context) (string, bool) {
name := ctx.Param("name")
if len(name) < 2 || name[0] != '/' {
return "", false
}
return name[1:], true
}
func recordingsOfPath(
pathConf *conf.Path,
pathName string,
) *defs.APIRecording {
ret := &defs.APIRecording{
Name: pathName,
}
segments, _ := recordstore.FindSegments(pathConf, pathName)
ret.Segments = make([]*defs.APIRecordingSegment, len(segments))
for i, seg := range segments {
ret.Segments[i] = &defs.APIRecordingSegment{
Start: seg.Start,
}
}
return ret
}
// PathManager contains methods used by the API and Metrics server.
type PathManager interface {
APIPathsList() (*defs.APIPathList, error)
APIPathsGet(string) (*defs.APIPath, error)
}
// HLSServer contains methods used by the API and Metrics server.
type HLSServer interface {
APIMuxersList() (*defs.APIHLSMuxerList, error)
APIMuxersGet(string) (*defs.APIHLSMuxer, error)
}
// RTSPServer contains methods used by the API and Metrics server.
type RTSPServer interface {
APIConnsList() (*defs.APIRTSPConnsList, error)
APIConnsGet(uuid.UUID) (*defs.APIRTSPConn, error)
APISessionsList() (*defs.APIRTSPSessionList, error)
APISessionsGet(uuid.UUID) (*defs.APIRTSPSession, error)
APISessionsKick(uuid.UUID) error
}
// RTMPServer contains methods used by the API and Metrics server.
type RTMPServer interface {
APIConnsList() (*defs.APIRTMPConnList, error)
APIConnsGet(uuid.UUID) (*defs.APIRTMPConn, error)
APIConnsKick(uuid.UUID) error
}
// SRTServer contains methods used by the API and Metrics server.
type SRTServer interface {
APIConnsList() (*defs.APISRTConnList, error)
APIConnsGet(uuid.UUID) (*defs.APISRTConn, error)
APIConnsKick(uuid.UUID) error
}
// WebRTCServer contains methods used by the API and Metrics server.
type WebRTCServer interface {
APISessionsList() (*defs.APIWebRTCSessionList, error)
APISessionsGet(uuid.UUID) (*defs.APIWebRTCSession, error)
APISessionsKick(uuid.UUID) error
}
type apiAuthManager interface {
Authenticate(req *auth.Request) error
}
type apiParent interface {
logger.Writer
APIConfigSet(conf *conf.Conf)
}
// API is an API server.
type API struct {
Address string
Encryption bool
ServerKey string
ServerCert string
AllowOrigin string
TrustedProxies conf.IPNetworks
ReadTimeout conf.StringDuration
Conf *conf.Conf
AuthManager apiAuthManager
PathManager PathManager
RTSPServer RTSPServer
RTSPSServer RTSPServer
RTMPServer RTMPServer
RTMPSServer RTMPServer
HLSServer HLSServer
WebRTCServer WebRTCServer
SRTServer SRTServer
Parent apiParent
httpServer *httpp.Server
mutex sync.RWMutex
}
// Initialize initializes API.
func (a *API) Initialize() error {
router := gin.New()
router.SetTrustedProxies(a.TrustedProxies.ToTrustedProxies()) //nolint:errcheck
router.Use(a.middlewareOrigin)
router.Use(a.middlewareAuth)
group := router.Group("/v3")
group.GET("/config/global/get", a.onConfigGlobalGet)
group.PATCH("/config/global/patch", a.onConfigGlobalPatch)
group.GET("/config/pathdefaults/get", a.onConfigPathDefaultsGet)
group.PATCH("/config/pathdefaults/patch", a.onConfigPathDefaultsPatch)
group.GET("/config/paths/list", a.onConfigPathsList)
group.GET("/config/paths/get/*name", a.onConfigPathsGet)
group.POST("/config/paths/add/*name", a.onConfigPathsAdd)
group.PATCH("/config/paths/patch/*name", a.onConfigPathsPatch)
group.POST("/config/paths/replace/*name", a.onConfigPathsReplace)
group.DELETE("/config/paths/delete/*name", a.onConfigPathsDelete)
group.GET("/paths/list", a.onPathsList)
group.GET("/paths/get/*name", a.onPathsGet)
if !interfaceIsEmpty(a.HLSServer) {
group.GET("/hlsmuxers/list", a.onHLSMuxersList)
group.GET("/hlsmuxers/get/*name", a.onHLSMuxersGet)
}
if !interfaceIsEmpty(a.RTSPServer) {
group.GET("/rtspconns/list", a.onRTSPConnsList)
group.GET("/rtspconns/get/:id", a.onRTSPConnsGet)
group.GET("/rtspsessions/list", a.onRTSPSessionsList)
group.GET("/rtspsessions/get/:id", a.onRTSPSessionsGet)
group.POST("/rtspsessions/kick/:id", a.onRTSPSessionsKick)
}
if !interfaceIsEmpty(a.RTSPSServer) {
group.GET("/rtspsconns/list", a.onRTSPSConnsList)
group.GET("/rtspsconns/get/:id", a.onRTSPSConnsGet)
group.GET("/rtspssessions/list", a.onRTSPSSessionsList)
group.GET("/rtspssessions/get/:id", a.onRTSPSSessionsGet)
group.POST("/rtspssessions/kick/:id", a.onRTSPSSessionsKick)
}
if !interfaceIsEmpty(a.RTMPServer) {
group.GET("/rtmpconns/list", a.onRTMPConnsList)
group.GET("/rtmpconns/get/:id", a.onRTMPConnsGet)
group.POST("/rtmpconns/kick/:id", a.onRTMPConnsKick)
}
if !interfaceIsEmpty(a.RTMPSServer) {
group.GET("/rtmpsconns/list", a.onRTMPSConnsList)
group.GET("/rtmpsconns/get/:id", a.onRTMPSConnsGet)
group.POST("/rtmpsconns/kick/:id", a.onRTMPSConnsKick)
}
if !interfaceIsEmpty(a.WebRTCServer) {
group.GET("/webrtcsessions/list", a.onWebRTCSessionsList)
group.GET("/webrtcsessions/get/:id", a.onWebRTCSessionsGet)
group.POST("/webrtcsessions/kick/:id", a.onWebRTCSessionsKick)
}
if !interfaceIsEmpty(a.SRTServer) {
group.GET("/srtconns/list", a.onSRTConnsList)
group.GET("/srtconns/get/:id", a.onSRTConnsGet)
group.POST("/srtconns/kick/:id", a.onSRTConnsKick)
}
group.GET("/recordings/list", a.onRecordingsList)
group.GET("/recordings/get/*name", a.onRecordingsGet)
group.DELETE("/recordings/deletesegment", a.onRecordingDeleteSegment)
network, address := restrictnetwork.Restrict("tcp", a.Address)
a.httpServer = &httpp.Server{
Network: network,
Address: address,
ReadTimeout: time.Duration(a.ReadTimeout),
Encryption: a.Encryption,
ServerCert: a.ServerCert,
ServerKey: a.ServerKey,
Handler: router,
Parent: a,
}
err := a.httpServer.Initialize()
if err != nil {
return err
}
a.Log(logger.Info, "listener opened on "+address)
return nil
}
// Close closes the API.
func (a *API) Close() {
a.Log(logger.Info, "listener is closing")
a.httpServer.Close()
}
// Log implements logger.Writer.
func (a *API) Log(level logger.Level, format string, args ...interface{}) {
a.Parent.Log(level, "[API] "+format, args...)
}
func (a *API) writeError(ctx *gin.Context, status int, err error) {
// show error in logs
a.Log(logger.Error, err.Error())
// add error to response
ctx.JSON(status, &defs.APIError{
Error: err.Error(),
})
}
func (a *API) middlewareOrigin(ctx *gin.Context) {
ctx.Header("Access-Control-Allow-Origin", a.AllowOrigin)
ctx.Header("Access-Control-Allow-Credentials", "true")
// preflight requests
if ctx.Request.Method == http.MethodOptions &&
ctx.Request.Header.Get("Access-Control-Request-Method") != "" {
ctx.Header("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH, DELETE")
ctx.Header("Access-Control-Allow-Headers", "Authorization, Content-Type")
ctx.AbortWithStatus(http.StatusNoContent)
return
}
}
func (a *API) middlewareAuth(ctx *gin.Context) {
err := a.AuthManager.Authenticate(&auth.Request{
IP: net.ParseIP(ctx.ClientIP()),
Action: conf.AuthActionAPI,
HTTPRequest: ctx.Request,
})
if err != nil {
if err.(*auth.Error).AskCredentials { //nolint:errorlint
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
ctx.AbortWithStatus(http.StatusUnauthorized)
return
}
// wait some seconds to mitigate brute force attacks
<-time.After(auth.PauseAfterError)
ctx.AbortWithStatus(http.StatusUnauthorized)
return
}
}
func (a *API) onConfigGlobalGet(ctx *gin.Context) {
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
ctx.JSON(http.StatusOK, c.Global())
}
func (a *API) onConfigGlobalPatch(ctx *gin.Context) {
var c conf.OptionalGlobal
err := json.NewDecoder(ctx.Request.Body).Decode(&c)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
newConf.PatchGlobal(&c)
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
// since reloading the configuration can cause the shutdown of the API,
// call it in a goroutine
go a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onConfigPathDefaultsGet(ctx *gin.Context) {
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
ctx.JSON(http.StatusOK, c.PathDefaults)
}
func (a *API) onConfigPathDefaultsPatch(ctx *gin.Context) {
var p conf.OptionalPath
err := json.NewDecoder(ctx.Request.Body).Decode(&p)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
newConf.PatchPathDefaults(&p)
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onConfigPathsList(ctx *gin.Context) {
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
data := &defs.APIPathConfList{
Items: make([]*conf.Path, len(c.Paths)),
}
for i, key := range sortedKeys(c.Paths) {
data.Items[i] = c.Paths[key]
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onConfigPathsGet(ctx *gin.Context) {
confName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
p, ok := c.Paths[confName]
if !ok {
a.writeError(ctx, http.StatusNotFound, fmt.Errorf("path configuration not found"))
return
}
ctx.JSON(http.StatusOK, p)
}
func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl
confName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
var p conf.OptionalPath
err := json.NewDecoder(ctx.Request.Body).Decode(&p)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
err = newConf.AddPath(confName, &p)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl
confName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
var p conf.OptionalPath
err := json.NewDecoder(ctx.Request.Body).Decode(&p)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
err = newConf.PatchPath(confName, &p)
if err != nil {
if errors.Is(err, conf.ErrPathNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusBadRequest, err)
}
return
}
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl
confName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
var p conf.OptionalPath
err := json.NewDecoder(ctx.Request.Body).Decode(&p)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
err = newConf.ReplacePath(confName, &p)
if err != nil {
if errors.Is(err, conf.ErrPathNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusBadRequest, err)
}
return
}
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onConfigPathsDelete(ctx *gin.Context) {
confName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
a.mutex.Lock()
defer a.mutex.Unlock()
newConf := a.Conf.Clone()
err := newConf.RemovePath(confName)
if err != nil {
if errors.Is(err, conf.ErrPathNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusBadRequest, err)
}
return
}
err = newConf.Validate()
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
a.Conf = newConf
a.Parent.APIConfigSet(newConf)
ctx.Status(http.StatusOK)
}
func (a *API) onPathsList(ctx *gin.Context) {
data, err := a.PathManager.APIPathsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onPathsGet(ctx *gin.Context) {
pathName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
data, err := a.PathManager.APIPathsGet(pathName)
if err != nil {
if errors.Is(err, conf.ErrPathNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPConnsList(ctx *gin.Context) {
data, err := a.RTSPServer.APIConnsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTSPServer.APIConnsGet(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSessionsList(ctx *gin.Context) {
data, err := a.RTSPServer.APISessionsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTSPServer.APISessionsGet(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.RTSPServer.APISessionsKick(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onRTSPSConnsList(ctx *gin.Context) {
data, err := a.RTSPSServer.APIConnsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTSPSServer.APIConnsGet(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSSessionsList(ctx *gin.Context) {
data, err := a.RTSPSServer.APISessionsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTSPSServer.APISessionsGet(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTSPSSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.RTSPSServer.APISessionsKick(uuid)
if err != nil {
if errors.Is(err, rtsp.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onRTMPConnsList(ctx *gin.Context) {
data, err := a.RTMPServer.APIConnsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTMPConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTMPServer.APIConnsGet(uuid)
if err != nil {
if errors.Is(err, rtmp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTMPConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.RTMPServer.APIConnsKick(uuid)
if err != nil {
if errors.Is(err, rtmp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onRTMPSConnsList(ctx *gin.Context) {
data, err := a.RTMPSServer.APIConnsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTMPSConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.RTMPSServer.APIConnsGet(uuid)
if err != nil {
if errors.Is(err, rtmp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRTMPSConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.RTMPSServer.APIConnsKick(uuid)
if err != nil {
if errors.Is(err, rtmp.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onHLSMuxersList(ctx *gin.Context) {
data, err := a.HLSServer.APIMuxersList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onHLSMuxersGet(ctx *gin.Context) {
pathName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
data, err := a.HLSServer.APIMuxersGet(pathName)
if err != nil {
if errors.Is(err, hls.ErrMuxerNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onWebRTCSessionsList(ctx *gin.Context) {
data, err := a.WebRTCServer.APISessionsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onWebRTCSessionsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.WebRTCServer.APISessionsGet(uuid)
if err != nil {
if errors.Is(err, webrtc.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onWebRTCSessionsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.WebRTCServer.APISessionsKick(uuid)
if err != nil {
if errors.Is(err, webrtc.ErrSessionNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onSRTConnsList(ctx *gin.Context) {
data, err := a.SRTServer.APIConnsList()
if err != nil {
a.writeError(ctx, http.StatusInternalServerError, err)
return
}
data.ItemCount = len(data.Items)
pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
ctx.JSON(http.StatusOK, data)
}
func (a *API) onSRTConnsGet(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data, err := a.SRTServer.APIConnsGet(uuid)
if err != nil {
if errors.Is(err, srt.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onSRTConnsKick(ctx *gin.Context) {
uuid, err := uuid.Parse(ctx.Param("id"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
err = a.SRTServer.APIConnsKick(uuid)
if err != nil {
if errors.Is(err, srt.ErrConnNotFound) {
a.writeError(ctx, http.StatusNotFound, err)
} else {
a.writeError(ctx, http.StatusInternalServerError, err)
}
return
}
ctx.Status(http.StatusOK)
}
func (a *API) onRecordingsList(ctx *gin.Context) {
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
pathNames := recordstore.FindAllPathsWithSegments(c.Paths)
data := defs.APIRecordingList{}
data.ItemCount = len(pathNames)
pageCount, err := paginate(&pathNames, ctx.Query("itemsPerPage"), ctx.Query("page"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
data.PageCount = pageCount
data.Items = make([]*defs.APIRecording, len(pathNames))
for i, pathName := range pathNames {
pathConf, _, _ := conf.FindPathConf(c.Paths, pathName)
data.Items[i] = recordingsOfPath(pathConf, pathName)
}
ctx.JSON(http.StatusOK, data)
}
func (a *API) onRecordingsGet(ctx *gin.Context) {
pathName, ok := paramName(ctx)
if !ok {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name"))
return
}
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
ctx.JSON(http.StatusOK, recordingsOfPath(pathConf, pathName))
}
func (a *API) onRecordingDeleteSegment(ctx *gin.Context) {
pathName := ctx.Query("path")
start, err := time.Parse(time.RFC3339, ctx.Query("start"))
if err != nil {
a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid 'start' parameter: %w", err))
return
}
a.mutex.RLock()
c := a.Conf
a.mutex.RUnlock()
pathConf, _, err := conf.FindPathConf(c.Paths, pathName)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
pathFormat := recordstore.PathAddExtension(
strings.ReplaceAll(pathConf.RecordPath, "%path", pathName),
pathConf.RecordFormat,
)
segmentPath := recordstore.Path{
Start: start,
}.Encode(pathFormat)
err = os.Remove(segmentPath)
if err != nil {
a.writeError(ctx, http.StatusBadRequest, err)
return
}
ctx.Status(http.StatusOK)
}
// ReloadConf is called by core.
func (a *API) ReloadConf(conf *conf.Conf) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.Conf = conf
}