diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 55445439..d6bb88c4 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -489,6 +489,32 @@ components: items: $ref: '#/components/schemas/HLSMuxer' + RecordingSegment: + type: object + properties: + start: + type: string + + Recording: + type: object + properties: + name: + type: string + segments: + type: array + items: + $ref: '#/components/schemas/RecordingSegment' + + RecordingList: + type: object + properties: + pageCount: + type: integer + items: + type: array + items: + $ref: '#/components/schemas/Recording' + RTMPConn: type: object properties: @@ -1984,3 +2010,122 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' + + /v3/recordings/list: + get: + operationId: recordingsList + tags: [Recordings] + summary: returns all recordings. + description: '' + parameters: + - name: page + in: query + description: page number. + schema: + type: integer + default: 0 + - name: itemsPerPage + in: query + description: items per page. + schema: + type: integer + default: 100 + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RecordingList' + '400': + description: invalid request. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: server error. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v3/recordings/get/{name}: + get: + operationId: recordingsGet + tags: [Recordings] + summary: returns recordings for a path. + description: '' + parameters: + - name: name + in: path + required: true + description: name of the path. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/Recording' + '400': + description: invalid request. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: path not found. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: server error. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v3/recordings/deletesegment: + delete: + operationId: recordingsDeleteSegment + tags: [Recordings] + summary: deletes a recording segment. + description: '' + parameters: + - name: path + in: query + required: true + description: path. + schema: + type: string + - name: start + in: query + required: true + description: starting date of the segment. + schema: + type: string + responses: + '200': + description: the request was successful. + '400': + description: invalid request. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: connection not found. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: server error. + content: + application/json: + schema: + $ref: '#/components/schemas/Error' diff --git a/internal/api/api.go b/internal/api/api.go index a8020b82..4f650663 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" "net/http" + "os" "reflect" "sort" "strconv" + "strings" "sync" "time" @@ -19,6 +21,7 @@ import ( "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/httpserv" + "github.com/bluenviron/mediamtx/internal/record" "github.com/bluenviron/mediamtx/internal/restrictnetwork" "github.com/bluenviron/mediamtx/internal/servers/hls" "github.com/bluenviron/mediamtx/internal/servers/rtmp" @@ -238,6 +241,10 @@ func (a *API) Initialize() error { group.POST("/v3/srtconns/kick/:id", a.onSRTConnsKick) } + group.GET("/v3/recordings/list", a.onRecordingsList) + group.GET("/v3/recordings/get/*name", a.onRecordingsGet) + group.DELETE("/v3/recordings/deletesegment", a.onRecordingDeleteSegment) + network, address := restrictnetwork.Restrict("tcp", a.Address) var err error @@ -378,7 +385,7 @@ func (a *API) onConfigPathsList(ctx *gin.Context) { } func (a *API) onConfigPathsGet(ctx *gin.Context) { - name, ok := paramName(ctx) + confName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return @@ -388,7 +395,7 @@ func (a *API) onConfigPathsGet(ctx *gin.Context) { c := a.Conf a.mutex.RUnlock() - p, ok := c.Paths[name] + p, ok := c.Paths[confName] if !ok { a.writeError(ctx, http.StatusNotFound, fmt.Errorf("path configuration not found")) return @@ -398,7 +405,7 @@ func (a *API) onConfigPathsGet(ctx *gin.Context) { } func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl - name, ok := paramName(ctx) + confName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return @@ -416,7 +423,7 @@ func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl newConf := a.Conf.Clone() - err = newConf.AddPath(name, &p) + err = newConf.AddPath(confName, &p) if err != nil { a.writeError(ctx, http.StatusBadRequest, err) return @@ -435,7 +442,7 @@ func (a *API) onConfigPathsAdd(ctx *gin.Context) { //nolint:dupl } func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl - name, ok := paramName(ctx) + confName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return @@ -453,7 +460,7 @@ func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl newConf := a.Conf.Clone() - err = newConf.PatchPath(name, &p) + err = newConf.PatchPath(confName, &p) if err != nil { if errors.Is(err, conf.ErrPathNotFound) { a.writeError(ctx, http.StatusNotFound, err) @@ -476,7 +483,7 @@ func (a *API) onConfigPathsPatch(ctx *gin.Context) { //nolint:dupl } func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl - name, ok := paramName(ctx) + confName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return @@ -494,7 +501,7 @@ func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl newConf := a.Conf.Clone() - err = newConf.ReplacePath(name, &p) + err = newConf.ReplacePath(confName, &p) if err != nil { if errors.Is(err, conf.ErrPathNotFound) { a.writeError(ctx, http.StatusNotFound, err) @@ -517,7 +524,7 @@ func (a *API) onConfigPathsReplace(ctx *gin.Context) { //nolint:dupl } func (a *API) onConfigPathsDelete(ctx *gin.Context) { - name, ok := paramName(ctx) + confName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return @@ -528,7 +535,7 @@ func (a *API) onConfigPathsDelete(ctx *gin.Context) { newConf := a.Conf.Clone() - err := newConf.RemovePath(name) + err := newConf.RemovePath(confName) if err != nil { if errors.Is(err, conf.ErrPathNotFound) { a.writeError(ctx, http.StatusNotFound, err) @@ -569,13 +576,13 @@ func (a *API) onPathsList(ctx *gin.Context) { } func (a *API) onPathsGet(ctx *gin.Context) { - name, ok := paramName(ctx) + pathName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return } - data, err := a.PathManager.APIPathsGet(name) + data, err := a.PathManager.APIPathsGet(pathName) if err != nil { if errors.Is(err, conf.ErrPathNotFound) { a.writeError(ctx, http.StatusNotFound, err) @@ -915,13 +922,13 @@ func (a *API) onHLSMuxersList(ctx *gin.Context) { } func (a *API) onHLSMuxersGet(ctx *gin.Context) { - name, ok := paramName(ctx) + pathName, ok := paramName(ctx) if !ok { a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) return } - data, err := a.HLSServer.APIMuxersGet(name) + data, err := a.HLSServer.APIMuxersGet(pathName) if err != nil { if errors.Is(err, hls.ErrMuxerNotFound) { a.writeError(ctx, http.StatusNotFound, err) @@ -1050,6 +1057,100 @@ func (a *API) onSRTConnsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } +func (a *API) onRecordingsList(ctx *gin.Context) { + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + pathNames := getAllPathsWithRecordings(c.Paths) + + data := defs.APIRecordingList{} + + data.ItemCount = len(pathNames) + pageCount, err := paginate(&pathNames, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + data.PageCount = pageCount + + data.Items = make([]*defs.APIRecording, len(pathNames)) + + for i, pathName := range pathNames { + _, pathConf, _, _ := conf.FindPathConf(c.Paths, pathName) + data.Items[i] = recordingEntry(pathConf, pathName) + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *API) onRecordingsGet(ctx *gin.Context) { + pathName, ok := paramName(ctx) + if !ok { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid name")) + return + } + + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + _, pathConf, _, err := conf.FindPathConf(c.Paths, pathName) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + if !pathConf.Playback { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName)) + return + } + + ctx.JSON(http.StatusOK, recordingEntry(pathConf, pathName)) +} + +func (a *API) onRecordingDeleteSegment(ctx *gin.Context) { + pathName := ctx.Query("path") + + start, err := time.Parse(time.RFC3339, ctx.Query("start")) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid 'start' parameter: %w", err)) + return + } + + a.mutex.RLock() + c := a.Conf + a.mutex.RUnlock() + + _, pathConf, _, err := conf.FindPathConf(c.Paths, pathName) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + if !pathConf.Playback { + a.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName)) + return + } + + pathFormat := record.PathAddExtension( + strings.ReplaceAll(pathConf.RecordPath, "%path", pathName), + pathConf.RecordFormat, + ) + + segmentPath := record.Path{ + Start: start, + }.Encode(pathFormat) + + err = os.Remove(segmentPath) + if err != nil { + a.writeError(ctx, http.StatusBadRequest, err) + return + } + + ctx.Status(http.StatusOK) +} + // ReloadConf is called by core. func (a *API) ReloadConf(conf *conf.Conf) { a.mutex.Lock() diff --git a/internal/api/api_test.go b/internal/api/api_test.go index ddc656f3..33f7fad0 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -1,11 +1,92 @@ package api import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "os" + "path/filepath" "testing" + "time" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" "github.com/stretchr/testify/require" ) +type testParent struct{} + +func (testParent) Log(_ logger.Level, _ string, _ ...interface{}) { +} + +func (testParent) APIConfigSet(_ *conf.Conf) {} + +func writeTempFile(byts []byte) (string, error) { + tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") + if err != nil { + return "", err + } + defer tmpf.Close() + + _, err = tmpf.Write(byts) + if err != nil { + return "", err + } + + return tmpf.Name(), nil +} + +func tempConf(t *testing.T, cnt string) *conf.Conf { + fi, err := writeTempFile([]byte(cnt)) + require.NoError(t, err) + defer os.Remove(fi) + + cnf, _, err := conf.Load(fi, nil) + require.NoError(t, err) + + return cnf +} + +func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in interface{}, out interface{}) { + buf := func() io.Reader { + if in == nil { + return nil + } + + byts, err := json.Marshal(in) + require.NoError(t, err) + + return bytes.NewBuffer(byts) + }() + + req, err := http.NewRequest(method, ur, buf) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + t.Errorf("bad status code: %d", res.StatusCode) + } + + if out == nil { + return + } + + err = json.NewDecoder(res.Body).Decode(out) + require.NoError(t, err) +} + +func checkError(t *testing.T, msg string, body io.Reader) { + var resErr map[string]interface{} + err := json.NewDecoder(body).Decode(&resErr) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{"error": msg}, resErr) +} + func TestPaginate(t *testing.T) { items := make([]int, 5) for i := 0; i < 5; i++ { @@ -37,3 +118,530 @@ func TestPaginate(t *testing.T) { require.Equal(t, 2, pageCount) require.Equal(t, []int{4, 5}, items) } + +func TestConfigGlobalGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) + require.Equal(t, true, out["api"]) +} + +func TestConfigGlobalPatch(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{ + "rtmp": false, + "readTimeout": "7s", + "protocols": []string{"tcp"}, + "readBufferCount": 4096, // test setting a deprecated parameter + }, nil) + + time.Sleep(500 * time.Millisecond) + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) + require.Equal(t, false, out["rtmp"]) + require.Equal(t, "7s", out["readTimeout"]) + require.Equal(t, []interface{}{"tcp"}, out["protocols"]) + require.Equal(t, float64(4096), out["readBufferCount"]) +} + +func TestAPIConfigGlobalPatchUnknownField(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + b := map[string]interface{}{ + "test": "asd", + } + + byts, err := json.Marshal(b) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + req, err := http.NewRequest(http.MethodPatch, "http://localhost:9997/v3/config/global/patch", bytes.NewReader(byts)) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusBadRequest, res.StatusCode) + checkError(t, "json: unknown field \"test\"", res.Body) +} + +func TestAPIConfigPathDefaultsGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) + require.Equal(t, "publisher", out["source"]) +} + +func TestAPIConfigPathDefaultsPatch(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/pathdefaults/patch", map[string]interface{}{ + "readUser": "myuser", + "readPass": "mypass", + }, nil) + + time.Sleep(500 * time.Millisecond) + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) + require.Equal(t, "myuser", out["readUser"]) + require.Equal(t, "mypass", out["readPass"]) +} + +func TestAPIConfigPathsList(t *testing.T) { + cnf := tempConf(t, "api: yes\n"+ + "paths:\n"+ + " path1:\n"+ + " readUser: myuser1\n"+ + " readPass: mypass1\n"+ + " path2:\n"+ + " readUser: myuser2\n"+ + " readPass: mypass2\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + type pathConfig map[string]interface{} + + type listRes struct { + ItemCount int `json:"itemCount"` + PageCount int `json:"pageCount"` + Items []pathConfig `json:"items"` + } + + hc := &http.Client{Transport: &http.Transport{}} + + var out listRes + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/list", nil, &out) + require.Equal(t, 2, out.ItemCount) + require.Equal(t, 1, out.PageCount) + require.Equal(t, "path1", out.Items[0]["name"]) + require.Equal(t, "myuser1", out.Items[0]["readUser"]) + require.Equal(t, "mypass1", out.Items[0]["readPass"]) + require.Equal(t, "path2", out.Items[1]["name"]) + require.Equal(t, "myuser2", out.Items[1]["readUser"]) + require.Equal(t, "mypass2", out.Items[1]["readPass"]) +} + +func TestAPIConfigPathsGet(t *testing.T) { + cnf := tempConf(t, "api: yes\n"+ + "paths:\n"+ + " my/path:\n"+ + " readUser: myuser\n"+ + " readPass: mypass\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "my/path", out["name"]) + require.Equal(t, "myuser", out["readUser"]) +} + +func TestAPIConfigPathsAdd(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9999/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, true, out["disablePublisherOverride"]) + require.Equal(t, true, out["rpiCameraVFlip"]) +} + +func TestAPIConfigPathsAddUnknownField(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + b := map[string]interface{}{ + "test": "asd", + } + + byts, err := json.Marshal(b) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + req, err := http.NewRequest(http.MethodPost, + "http://localhost:9997/v3/config/paths/add/my/path", bytes.NewReader(byts)) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusBadRequest, res.StatusCode) + checkError(t, "json: unknown field \"test\"", res.Body) +} + +func TestAPIConfigPathsPatch(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/paths/patch/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9998/mypath", + "sourceOnDemand": true, + }, nil) + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, true, out["disablePublisherOverride"]) + require.Equal(t, true, out["rpiCameraVFlip"]) +} + +func TestAPIConfigPathsReplace(t *testing.T) { //nolint:dupl + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + "disablePublisherOverride": true, // test setting a deprecated parameter + "rpiCameraVFlip": true, + }, nil) + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9998/mypath", + "sourceOnDemand": true, + }, nil) + + var out map[string]interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) + require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) + require.Equal(t, true, out["sourceOnDemand"]) + require.Equal(t, nil, out["disablePublisherOverride"]) + require.Equal(t, false, out["rpiCameraVFlip"]) +} + +func TestAPIConfigPathsDelete(t *testing.T) { + cnf := tempConf(t, "api: yes\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err := api.Initialize() + require.NoError(t, err) + defer api.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ + "source": "rtsp://127.0.0.1:9999/mypath", + "sourceOnDemand": true, + }, nil) + + httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/my/path", nil, nil) + + req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNotFound, res.StatusCode) + checkError(t, "path configuration not found", res.Body) +} + +func TestRecordingsList(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.Mkdir(filepath.Join(dir, "mypath2"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath2", "2009-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + var out interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/list", nil, &out) + require.Equal(t, map[string]interface{}{ + "itemCount": float64(2), + "pageCount": float64(1), + "items": []interface{}{ + map[string]interface{}{ + "name": "mypath1", + "segments": []interface{}{ + map[string]interface{}{ + "start": time.Date(2008, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339), + }, + map[string]interface{}{ + "start": time.Date(2009, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339), + }, + }, + }, + map[string]interface{}{ + "name": "mypath2", + "segments": []interface{}{ + map[string]interface{}{ + "start": time.Date(2009, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339), + }, + }, + }, + }, + }, out) +} + +func TestRecordingsGet(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + var out interface{} + httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/recordings/get/mypath1", nil, &out) + require.Equal(t, map[string]interface{}{ + "name": "mypath1", + "segments": []interface{}{ + map[string]interface{}{ + "start": time.Date(2008, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339), + }, + map[string]interface{}{ + "start": time.Date(2009, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339), + }, + }, + }, out) +} + +func TestRecordingsDeleteSegment(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cnf := tempConf(t, "pathDefaults:\n"+ + " recordPath: "+filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")+"\n"+ + "paths:\n"+ + " all_others:\n") + + api := API{ + Address: "localhost:9997", + ReadTimeout: conf.StringDuration(10 * time.Second), + Conf: cnf, + Parent: &testParent{}, + } + err = api.Initialize() + require.NoError(t, err) + defer api.Close() + + err = os.Mkdir(filepath.Join(dir, "mypath1"), 0o755) + require.NoError(t, err) + + err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-000000.mp4"), []byte(""), 0o644) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + v := url.Values{} + v.Set("path", "mypath1") + v.Set("start", time.Date(2008, 11, 0o7, 11, 22, 0, 0, time.Local).Format(time.RFC3339)) + + u := &url.URL{ + Scheme: "http", + Host: "localhost:9997", + Path: "/v3/recordings/deletesegment", + RawQuery: v.Encode(), + } + + req, err := http.NewRequest(http.MethodDelete, u.String(), nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + require.Equal(t, http.StatusOK, res.StatusCode) +} diff --git a/internal/api/record.go b/internal/api/record.go new file mode 100644 index 00000000..b724ab12 --- /dev/null +++ b/internal/api/record.go @@ -0,0 +1,139 @@ +package api + +import ( + "errors" + "io/fs" + "path/filepath" + "sort" + "strings" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/playback" + "github.com/bluenviron/mediamtx/internal/record" +) + +var errFound = errors.New("found") + +func fixedPathHasRecordings(pathConf *conf.Path) bool { + recordPath := record.PathAddExtension( + strings.ReplaceAll(pathConf.RecordPath, "%path", pathConf.Name), + pathConf.RecordFormat, + ) + + // we have to convert to absolute paths + // otherwise, recordPath and fpath inside Walk() won't have common elements + recordPath, _ = filepath.Abs(recordPath) + + commonPath := record.CommonPath(recordPath) + + err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + var pa record.Path + ok := pa.Decode(recordPath, fpath) + if ok { + return errFound + } + } + + return nil + }) + if err != nil && !errors.Is(err, errFound) { + return false + } + + return errors.Is(err, errFound) +} + +func regexpPathGetRecordings(pathConf *conf.Path) []string { + recordPath := record.PathAddExtension( + pathConf.RecordPath, + pathConf.RecordFormat, + ) + + // we have to convert to absolute paths + // otherwise, recordPath and fpath inside Walk() won't have common elements + recordPath, _ = filepath.Abs(recordPath) + + commonPath := record.CommonPath(recordPath) + + var ret []string + + filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { //nolint:errcheck + if err != nil { + return err + } + + if !info.IsDir() { + var pa record.Path + ok := pa.Decode(recordPath, fpath) + if ok && pathConf.Regexp.FindStringSubmatch(pa.Path) != nil { + ret = append(ret, pa.Path) + } + } + + return nil + }) + + return ret +} + +func removeDuplicatesAndSort(in []string) []string { + ma := make(map[string]struct{}, len(in)) + for _, i := range in { + ma[i] = struct{}{} + } + + out := []string{} + + for k := range ma { + out = append(out, k) + } + + sort.Strings(out) + + return out +} + +func getAllPathsWithRecordings(paths map[string]*conf.Path) []string { + pathNames := []string{} + + for _, pathConf := range paths { + if pathConf.Playback { + if pathConf.Regexp == nil { + if fixedPathHasRecordings(pathConf) { + pathNames = append(pathNames, pathConf.Name) + } + } else { + pathNames = append(pathNames, regexpPathGetRecordings(pathConf)...) + } + } + } + + return removeDuplicatesAndSort(pathNames) +} + +func recordingEntry( + pathConf *conf.Path, + pathName string, +) *defs.APIRecording { + ret := &defs.APIRecording{ + Name: pathName, + } + + segments, _ := playback.FindSegments(pathConf, pathName) + + ret.Segments = make([]*defs.APIRecordingSegment, len(segments)) + + for i, seg := range segments { + ret.Segments[i] = &defs.APIRecordingSegment{ + Start: seg.Start, + } + } + + return ret +} diff --git a/internal/core/api_test.go b/internal/core/api_test.go index f0d4f425..2ceeb197 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -64,13 +64,6 @@ func checkClose(t *testing.T, closeFunc func() error) { require.NoError(t, closeFunc()) } -func checkError(t *testing.T, msg string, body io.Reader) { - var resErr map[string]interface{} - err := json.NewDecoder(body).Decode(&resErr) - require.NoError(t, err) - require.Equal(t, map[string]interface{}{"error": msg}, resErr) -} - func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in interface{}, out interface{}) { buf := func() io.Reader { if in == nil { @@ -102,281 +95,11 @@ func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in int require.NoError(t, err) } -func TestAPIConfigGlobalGet(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) - require.Equal(t, true, out["api"]) -} - -func TestAPIConfigGlobalPatch(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/global/patch", map[string]interface{}{ - "rtmp": false, - "readTimeout": "7s", - "protocols": []string{"tcp"}, - "readBufferCount": 4096, // test setting a deprecated parameter - }, nil) - - time.Sleep(500 * time.Millisecond) - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/global/get", nil, &out) - require.Equal(t, false, out["rtmp"]) - require.Equal(t, "7s", out["readTimeout"]) - require.Equal(t, []interface{}{"tcp"}, out["protocols"]) - require.Equal(t, float64(4096), out["readBufferCount"]) -} - -func TestAPIConfigGlobalPatchUnknownField(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - b := map[string]interface{}{ - "test": "asd", - } - - byts, err := json.Marshal(b) +func checkError(t *testing.T, msg string, body io.Reader) { + var resErr map[string]interface{} + err := json.NewDecoder(body).Decode(&resErr) require.NoError(t, err) - - hc := &http.Client{Transport: &http.Transport{}} - - func() { - req, err := http.NewRequest(http.MethodPatch, "http://localhost:9997/v3/config/global/patch", bytes.NewReader(byts)) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusBadRequest, res.StatusCode) - checkError(t, "json: unknown field \"test\"", res.Body) - }() -} - -func TestAPIConfigPathDefaultsGet(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) - require.Equal(t, "publisher", out["source"]) -} - -func TestAPIConfigPathDefaultsPatch(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/pathdefaults/patch", map[string]interface{}{ - "readUser": "myuser", - "readPass": "mypass", - }, nil) - - time.Sleep(500 * time.Millisecond) - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/pathdefaults/get", nil, &out) - require.Equal(t, "myuser", out["readUser"]) - require.Equal(t, "mypass", out["readPass"]) -} - -func TestAPIConfigPathsList(t *testing.T) { - p, ok := newInstance("api: yes\n" + - "paths:\n" + - " path1:\n" + - " readUser: myuser1\n" + - " readPass: mypass1\n" + - " path2:\n" + - " readUser: myuser2\n" + - " readPass: mypass2\n") - require.Equal(t, true, ok) - defer p.Close() - - type pathConfig map[string]interface{} - - type listRes struct { - ItemCount int `json:"itemCount"` - PageCount int `json:"pageCount"` - Items []pathConfig `json:"items"` - } - - hc := &http.Client{Transport: &http.Transport{}} - - var out listRes - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/list", nil, &out) - require.Equal(t, 2, out.ItemCount) - require.Equal(t, 1, out.PageCount) - require.Equal(t, "path1", out.Items[0]["name"]) - require.Equal(t, "myuser1", out.Items[0]["readUser"]) - require.Equal(t, "mypass1", out.Items[0]["readPass"]) - require.Equal(t, "path2", out.Items[1]["name"]) - require.Equal(t, "myuser2", out.Items[1]["readUser"]) - require.Equal(t, "mypass2", out.Items[1]["readPass"]) -} - -func TestAPIConfigPathsGet(t *testing.T) { - p, ok := newInstance("api: yes\n" + - "paths:\n" + - " my/path:\n" + - " readUser: myuser\n" + - " readPass: mypass\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "my/path", out["name"]) - require.Equal(t, "myuser", out["readUser"]) -} - -func TestAPIConfigPathsAdd(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9999/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, true, out["disablePublisherOverride"]) - require.Equal(t, true, out["rpiCameraVFlip"]) -} - -func TestAPIConfigPathsAddUnknownField(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - b := map[string]interface{}{ - "test": "asd", - } - - byts, err := json.Marshal(b) - require.NoError(t, err) - - hc := &http.Client{Transport: &http.Transport{}} - - func() { - req, err := http.NewRequest(http.MethodPost, - "http://localhost:9997/v3/config/paths/add/my/path", bytes.NewReader(byts)) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusBadRequest, res.StatusCode) - checkError(t, "json: unknown field \"test\"", res.Body) - }() -} - -func TestAPIConfigPathsPatch(t *testing.T) { //nolint:dupl - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/paths/patch/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9998/mypath", - "sourceOnDemand": true, - }, nil) - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, true, out["disablePublisherOverride"]) - require.Equal(t, true, out["rpiCameraVFlip"]) -} - -func TestAPIConfigPathsReplace(t *testing.T) { //nolint:dupl - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - "disablePublisherOverride": true, // test setting a deprecated parameter - "rpiCameraVFlip": true, - }, nil) - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/replace/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9998/mypath", - "sourceOnDemand": true, - }, nil) - - var out map[string]interface{} - httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil, &out) - require.Equal(t, "rtsp://127.0.0.1:9998/mypath", out["source"]) - require.Equal(t, true, out["sourceOnDemand"]) - require.Equal(t, nil, out["disablePublisherOverride"]) - require.Equal(t, false, out["rpiCameraVFlip"]) -} - -func TestAPIConfigPathsDelete(t *testing.T) { - p, ok := newInstance("api: yes\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - httpRequest(t, hc, http.MethodPost, "http://localhost:9997/v3/config/paths/add/my/path", map[string]interface{}{ - "source": "rtsp://127.0.0.1:9999/mypath", - "sourceOnDemand": true, - }, nil) - - httpRequest(t, hc, http.MethodDelete, "http://localhost:9997/v3/config/paths/delete/my/path", nil, nil) - - func() { - req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/config/paths/get/my/path", nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNotFound, res.StatusCode) - checkError(t, "path configuration not found", res.Body) - }() + require.Equal(t, map[string]interface{}{"error": msg}, resErr) } func TestAPIPathsList(t *testing.T) { diff --git a/internal/defs/api.go b/internal/defs/api.go index feaea0e6..758e790f 100644 --- a/internal/defs/api.go +++ b/internal/defs/api.go @@ -195,3 +195,21 @@ type APIWebRTCSessionList struct { PageCount int `json:"pageCount"` Items []*APIWebRTCSession `json:"items"` } + +// APIRecordingSegment is a recording segment. +type APIRecordingSegment struct { + Start time.Time `json:"start"` +} + +// APIRecording is a recording. +type APIRecording struct { + Name string `json:"name"` + Segments []*APIRecordingSegment `json:"segments"` +} + +// APIRecordingList is a list of recordings. +type APIRecordingList struct { + ItemCount int `json:"itemCount"` + PageCount int `json:"pageCount"` + Items []*APIRecording `json:"items"` +} diff --git a/internal/playback/segment.go b/internal/playback/segment.go index 7636815c..33b117f5 100644 --- a/internal/playback/segment.go +++ b/internal/playback/segment.go @@ -12,18 +12,19 @@ import ( "github.com/bluenviron/mediamtx/internal/record" ) -type segment struct { +// Segment is a recording segment. +type Segment struct { fpath string - start time.Time + Start time.Time duration time.Duration } -func findSegments( +func findSegmentsInTimespan( pathConf *conf.Path, pathName string, start time.Time, duration time.Duration, -) ([]*segment, error) { +) ([]*Segment, error) { if !pathConf.Playback { return nil, fmt.Errorf("playback is disabled on path '%s'", pathName) } @@ -39,7 +40,7 @@ func findSegments( commonPath := record.CommonPath(recordPath) end := start.Add(duration) - var segments []*segment + var segments []*Segment err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { if err != nil { @@ -51,10 +52,10 @@ func findSegments( ok := pa.Decode(recordPath, fpath) // gather all segments that starts before the end of the playback - if ok && !end.Before(time.Time(pa)) { - segments = append(segments, &segment{ + if ok && !end.Before(pa.Start) { + segments = append(segments, &Segment{ fpath: fpath, - start: time.Time(pa), + Start: pa.Start, }) } } @@ -70,13 +71,13 @@ func findSegments( } sort.Slice(segments, func(i, j int) bool { - return segments[i].start.Before(segments[j].start) + return segments[i].Start.Before(segments[j].Start) }) // find the segment that may contain the start of the playback and remove all previous ones found := false for i := 0; i < len(segments)-1; i++ { - if !start.Before(segments[i].start) && start.Before(segments[i+1].start) { + if !start.Before(segments[i].Start) && start.Before(segments[i+1].Start) { segments = segments[i:] found = true break @@ -86,7 +87,7 @@ func findSegments( // otherwise, keep the last segment only and check if it may contain the start of the playback if !found { segments = segments[len(segments)-1:] - if segments[len(segments)-1].start.After(start) { + if segments[len(segments)-1].Start.After(start) { return nil, errNoSegmentsFound } } @@ -94,10 +95,11 @@ func findSegments( return segments, nil } -func findAllSegments( +// FindSegments returns all segments of a path. +func FindSegments( pathConf *conf.Path, pathName string, -) ([]*segment, error) { +) ([]*Segment, error) { if !pathConf.Playback { return nil, fmt.Errorf("playback is disabled on path '%s'", pathName) } @@ -112,7 +114,7 @@ func findAllSegments( recordPath, _ = filepath.Abs(recordPath) commonPath := record.CommonPath(recordPath) - var segments []*segment + var segments []*Segment err := filepath.Walk(commonPath, func(fpath string, info fs.FileInfo, err error) error { if err != nil { @@ -123,9 +125,9 @@ func findAllSegments( var pa record.Path ok := pa.Decode(recordPath, fpath) if ok { - segments = append(segments, &segment{ + segments = append(segments, &Segment{ fpath: fpath, - start: time.Time(pa), + Start: pa.Start, }) } } @@ -141,24 +143,24 @@ func findAllSegments( } sort.Slice(segments, func(i, j int) bool { - return segments[i].start.Before(segments[j].start) + return segments[i].Start.Before(segments[j].Start) }) return segments, nil } -func canBeConcatenated(seg1, seg2 *segment) bool { - end1 := seg1.start.Add(seg1.duration) - return !seg2.start.Before(end1.Add(-concatenationTolerance)) && !seg2.start.After(end1.Add(concatenationTolerance)) +func canBeConcatenated(seg1, seg2 *Segment) bool { + end1 := seg1.Start.Add(seg1.duration) + return !seg2.Start.Before(end1.Add(-concatenationTolerance)) && !seg2.Start.After(end1.Add(concatenationTolerance)) } -func mergeConcatenatedSegments(in []*segment) []*segment { - var out []*segment +func mergeConcatenatedSegments(in []*Segment) []*Segment { + var out []*Segment for _, seg := range in { if len(out) != 0 && canBeConcatenated(out[len(out)-1], seg) { - start := out[len(out)-1].start - end := seg.start.Add(seg.duration) + start := out[len(out)-1].Start + end := seg.Start.Add(seg.duration) out[len(out)-1].duration = end.Sub(start) } else { out = append(out, seg) diff --git a/internal/playback/server.go b/internal/playback/server.go index 21a727e9..5d5d5868 100644 --- a/internal/playback/server.go +++ b/internal/playback/server.go @@ -137,7 +137,12 @@ func (p *Server) onList(ctx *gin.Context) { return } - segments, err := findAllSegments(pathConf, pathName) + if !pathConf.Playback { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName)) + return + } + + segments, err := FindSegments(pathConf, pathName) if err != nil { if errors.Is(err, errNoSegmentsFound) { p.writeError(ctx, http.StatusNotFound, err) @@ -166,7 +171,7 @@ func (p *Server) onList(ctx *gin.Context) { out := make([]listEntry, len(segments)) for i, seg := range segments { out[i] = listEntry{ - Start: seg.start, + Start: seg.Start, Duration: seg.duration.Seconds(), } } @@ -201,7 +206,7 @@ func (p *Server) onGet(ctx *gin.Context) { return } - segments, err := findSegments(pathConf, pathName, start, duration) + segments, err := findSegmentsInTimespan(pathConf, pathName, start, duration) if err != nil { if errors.Is(err, errNoSegmentsFound) { p.writeError(ctx, http.StatusNotFound, err) @@ -217,7 +222,7 @@ func (p *Server) onGet(ctx *gin.Context) { } ww := &writerWrapper{ctx: ctx} - minTime := start.Sub(segments[0].start) + minTime := start.Sub(segments[0].Start) maxTime := minTime + duration elapsed, err := fmp4SeekAndMux( @@ -253,7 +258,7 @@ func (p *Server) onGet(ctx *gin.Context) { for _, seg := range segments[1:] { // there's a gap between segments, stop serving the recording - if seg.start.Before(start.Add(-concatenationTolerance)) || seg.start.After(start.Add(concatenationTolerance)) { + if seg.Start.Before(start.Add(-concatenationTolerance)) || seg.Start.After(start.Add(concatenationTolerance)) { return } @@ -270,7 +275,7 @@ func (p *Server) onGet(ctx *gin.Context) { return } - start = seg.start.Add(elapsed) + start = seg.Start.Add(elapsed) duration -= elapsed overallElapsed += elapsed } diff --git a/internal/record/cleaner.go b/internal/record/cleaner.go index 695b0d1d..f285be42 100644 --- a/internal/record/cleaner.go +++ b/internal/record/cleaner.go @@ -98,7 +98,7 @@ func (c *Cleaner) doRunEntry(e *CleanerEntry) error { var pa Path ok := pa.Decode(entryPath, fpath) if ok { - if now.Sub(time.Time(pa)) > e.DeleteAfter { + if now.Sub(pa.Start) > e.DeleteAfter { c.Log(logger.Debug, "removing %s", fpath) os.Remove(fpath) } diff --git a/internal/record/format_fmp4_part.go b/internal/record/format_fmp4_part.go index dbe8a4a5..1f1968fe 100644 --- a/internal/record/format_fmp4_part.go +++ b/internal/record/format_fmp4_part.go @@ -54,7 +54,7 @@ func (p *formatFMP4Part) initialize() { func (p *formatFMP4Part) close() error { if p.s.fi == nil { - p.s.path = Path(p.s.startNTP).Encode(p.s.f.a.pathFormat) + p.s.path = Path{Start: p.s.startNTP}.Encode(p.s.f.a.pathFormat) p.s.f.a.agent.Log(logger.Debug, "creating segment %s", p.s.path) err := os.MkdirAll(filepath.Dir(p.s.path), 0o755) diff --git a/internal/record/format_mpegts_segment.go b/internal/record/format_mpegts_segment.go index a3870241..06e16883 100644 --- a/internal/record/format_mpegts_segment.go +++ b/internal/record/format_mpegts_segment.go @@ -43,7 +43,7 @@ func (s *formatMPEGTSSegment) close() error { func (s *formatMPEGTSSegment) Write(p []byte) (int, error) { if s.fi == nil { - s.path = Path(s.startNTP).Encode(s.f.a.pathFormat) + s.path = Path{Start: s.startNTP}.Encode(s.f.a.pathFormat) s.f.a.agent.Log(logger.Debug, "creating segment %s", s.path) err := os.MkdirAll(filepath.Dir(s.path), 0o755) diff --git a/internal/record/path.go b/internal/record/path.go index 1c31a049..fd58de69 100644 --- a/internal/record/path.go +++ b/internal/record/path.go @@ -62,8 +62,11 @@ func CommonPath(v string) string { return common } -// Path is a record path. -type Path time.Time +// Path is a path of a recording segment. +type Path struct { + Start time.Time + Path string +} // Decode decodes a Path. func (p *Path) Decode(format string, v string) bool { @@ -150,6 +153,9 @@ func (p *Path) Decode(format string, v string) bool { for k, v := range values { switch k { + case "%path": + p.Path = v + case "%Y": tmp, _ := strconv.ParseInt(v, 10, 64) year = int(tmp) @@ -184,9 +190,9 @@ func (p *Path) Decode(format string, v string) bool { } if unixSec > 0 { - *p = Path(time.Unix(unixSec, 0)) + p.Start = time.Unix(unixSec, 0) } else { - *p = Path(time.Date(year, month, day, hour, minute, second, micros*1000, time.Local)) + p.Start = time.Date(year, month, day, hour, minute, second, micros*1000, time.Local) } return true @@ -194,13 +200,14 @@ func (p *Path) Decode(format string, v string) bool { // Encode encodes a path. func (p Path) Encode(format string) string { - format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(time.Time(p).Year()), 10)) - format = strings.ReplaceAll(format, "%m", leadingZeros(int(time.Time(p).Month()), 2)) - format = strings.ReplaceAll(format, "%d", leadingZeros(time.Time(p).Day(), 2)) - format = strings.ReplaceAll(format, "%H", leadingZeros(time.Time(p).Hour(), 2)) - format = strings.ReplaceAll(format, "%M", leadingZeros(time.Time(p).Minute(), 2)) - format = strings.ReplaceAll(format, "%S", leadingZeros(time.Time(p).Second(), 2)) - format = strings.ReplaceAll(format, "%f", leadingZeros(time.Time(p).Nanosecond()/1000, 6)) - format = strings.ReplaceAll(format, "%s", strconv.FormatInt(time.Time(p).Unix(), 10)) + format = strings.ReplaceAll(format, "%path", p.Path) + format = strings.ReplaceAll(format, "%Y", strconv.FormatInt(int64(p.Start.Year()), 10)) + format = strings.ReplaceAll(format, "%m", leadingZeros(int(p.Start.Month()), 2)) + format = strings.ReplaceAll(format, "%d", leadingZeros(p.Start.Day(), 2)) + format = strings.ReplaceAll(format, "%H", leadingZeros(p.Start.Hour(), 2)) + format = strings.ReplaceAll(format, "%M", leadingZeros(p.Start.Minute(), 2)) + format = strings.ReplaceAll(format, "%S", leadingZeros(p.Start.Second(), 2)) + format = strings.ReplaceAll(format, "%f", leadingZeros(p.Start.Nanosecond()/1000, 6)) + format = strings.ReplaceAll(format, "%s", strconv.FormatInt(p.Start.Unix(), 10)) return format } diff --git a/internal/record/path_test.go b/internal/record/path_test.go index 3831561e..80064797 100644 --- a/internal/record/path_test.go +++ b/internal/record/path_test.go @@ -16,14 +16,20 @@ var pathCases = []struct { { "standard", "%path/%Y-%m-%d_%H-%M-%S-%f.mp4", - Path(time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local)), - "%path/2008-11-07_11-22-04-123456.mp4", + Path{ + Start: time.Date(2008, 11, 0o7, 11, 22, 4, 123456000, time.Local), + Path: "mypath", + }, + "mypath/2008-11-07_11-22-04-123456.mp4", }, { "unix seconds", "%path/%s.mp4", - Path(time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local()), - "%path/1638447323.mp4", + Path{ + Start: time.Date(2021, 12, 2, 12, 15, 23, 0, time.UTC).Local(), + Path: "mypath", + }, + "mypath/1638447323.mp4", }, }