135 lines
2.6 KiB
Go
135 lines
2.6 KiB
Go
// Package recordcleaner contains the recording cleaner.
|
|
package recordcleaner
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/bluenviron/mediamtx/internal/conf"
|
|
"github.com/bluenviron/mediamtx/internal/logger"
|
|
"github.com/bluenviron/mediamtx/internal/recordstore"
|
|
)
|
|
|
|
var timeNow = time.Now
|
|
|
|
// Cleaner removes expired recording segments from disk.
|
|
type Cleaner struct {
|
|
PathConfs map[string]*conf.Path
|
|
Parent logger.Writer
|
|
|
|
ctx context.Context
|
|
ctxCancel func()
|
|
|
|
chReloadConf chan map[string]*conf.Path
|
|
done chan struct{}
|
|
}
|
|
|
|
// Initialize initializes a Cleaner.
|
|
func (c *Cleaner) Initialize() {
|
|
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
|
|
c.chReloadConf = make(chan map[string]*conf.Path)
|
|
c.done = make(chan struct{})
|
|
|
|
go c.run()
|
|
}
|
|
|
|
// Close closes the Cleaner.
|
|
func (c *Cleaner) Close() {
|
|
c.ctxCancel()
|
|
<-c.done
|
|
}
|
|
|
|
// Log implements logger.Writer.
|
|
func (c *Cleaner) Log(level logger.Level, format string, args ...interface{}) {
|
|
c.Parent.Log(level, "[record cleaner]"+format, args...)
|
|
}
|
|
|
|
// ReloadPathConfs is called by core.Core.
|
|
func (c *Cleaner) ReloadPathConfs(pathConfs map[string]*conf.Path) {
|
|
select {
|
|
case c.chReloadConf <- pathConfs:
|
|
case <-c.ctx.Done():
|
|
}
|
|
}
|
|
|
|
func (c *Cleaner) run() {
|
|
defer close(c.done)
|
|
|
|
c.doRun() //nolint:errcheck
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(c.cleanInterval()):
|
|
c.doRun()
|
|
|
|
case cnf := <-c.chReloadConf:
|
|
c.PathConfs = cnf
|
|
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Cleaner) atLeastOneRecordDeleteAfter() bool {
|
|
for _, e := range c.PathConfs {
|
|
if e.RecordDeleteAfter != 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *Cleaner) cleanInterval() time.Duration {
|
|
if !c.atLeastOneRecordDeleteAfter() {
|
|
return 365 * 24 * time.Hour
|
|
}
|
|
|
|
interval := 30 * 60 * time.Second
|
|
|
|
for _, e := range c.PathConfs {
|
|
if e.RecordDeleteAfter != 0 &&
|
|
interval > (time.Duration(e.RecordDeleteAfter)/2) {
|
|
interval = time.Duration(e.RecordDeleteAfter) / 2
|
|
}
|
|
}
|
|
|
|
return interval
|
|
}
|
|
|
|
func (c *Cleaner) doRun() {
|
|
now := timeNow()
|
|
|
|
pathNames := recordstore.FindAllPathsWithSegments(c.PathConfs)
|
|
|
|
for _, pathName := range pathNames {
|
|
c.processPath(now, pathName) //nolint:errcheck
|
|
}
|
|
}
|
|
|
|
func (c *Cleaner) processPath(now time.Time, pathName string) error {
|
|
pathConf, _, err := conf.FindPathConf(c.PathConfs, pathName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if pathConf.RecordDeleteAfter == 0 {
|
|
return nil
|
|
}
|
|
|
|
segments, err := recordstore.FindSegments(pathConf, pathName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, seg := range segments {
|
|
if now.Sub(seg.Start) > time.Duration(pathConf.RecordDeleteAfter) {
|
|
c.Log(logger.Debug, "removing %s", seg.Fpath)
|
|
os.Remove(seg.Fpath)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|