mediamtx/internal/asyncwriter/async_writer.go

76 lines
1.3 KiB
Go

// Package asyncwriter contains an asynchronous writer.
package asyncwriter
import (
"fmt"
"github.com/bluenviron/gortsplib/v4/pkg/ringbuffer"
"github.com/bluenviron/mediamtx/internal/logger"
)
// Writer is an asynchronous writer.
type Writer struct {
writeErrLogger logger.Writer
buffer *ringbuffer.RingBuffer
// out
err chan error
}
// New allocates a Writer.
func New(
queueSize int,
parent logger.Writer,
) *Writer {
buffer, _ := ringbuffer.New(uint64(queueSize))
return &Writer{
writeErrLogger: logger.NewLimitedLogger(parent),
buffer: buffer,
err: make(chan error),
}
}
// Start starts the writer routine.
func (w *Writer) Start() {
go w.run()
}
// Stop stops the writer routine.
func (w *Writer) Stop() {
w.buffer.Close()
<-w.err
}
// Error returns whenever there's an error.
func (w *Writer) Error() chan error {
return w.err
}
func (w *Writer) run() {
w.err <- w.runInner()
}
func (w *Writer) runInner() error {
for {
cb, ok := w.buffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := cb.(func() error)()
if err != nil {
return err
}
}
}
// Push appends an element to the queue.
func (w *Writer) Push(cb func() error) {
ok := w.buffer.Push(cb)
if !ok {
w.writeErrLogger.Log(logger.Warn, "write queue is full")
}
}