mediamtx/internal/stream/stream_format.go
Dr. Ralf S. Engelschall 4bf0d10079
metrics: add paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent (#2620) (#2619) (#2629)
* add missing Prometheus exports (#2620, #2619):
paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent

* protect Stream.BytesSent()

* add tests

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
2023-11-08 11:20:16 +01:00

112 lines
2.5 KiB
Go

package stream
import (
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
func unitSize(u unit.Unit) uint64 {
n := uint64(0)
for _, pkt := range u.GetRTPPackets() {
n += uint64(pkt.MarshalSize())
}
return n
}
type streamFormat struct {
decodeErrLogger logger.Writer
proc formatprocessor.Processor
readers map[*asyncwriter.Writer]readerFunc
}
func newStreamFormat(
udpMaxPayloadSize int,
forma format.Format,
generateRTPPackets bool,
decodeErrLogger logger.Writer,
) (*streamFormat, error) {
proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets)
if err != nil {
return nil, err
}
sf := &streamFormat{
decodeErrLogger: decodeErrLogger,
proc: proc,
readers: make(map[*asyncwriter.Writer]readerFunc),
}
return sf, nil
}
func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb readerFunc) {
sf.readers[r] = cb
}
func (sf *streamFormat) removeReader(r *asyncwriter.Writer) {
delete(sf.readers, r)
}
func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) {
err := sf.proc.ProcessUnit(u)
if err != nil {
sf.decodeErrLogger.Log(logger.Warn, err.Error())
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeRTPPacket(
s *Stream,
medi *description.Media,
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
) {
hasNonRTSPReaders := len(sf.readers) > 0
u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders)
if err != nil {
sf.decodeErrLogger.Log(logger.Warn, err.Error())
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) {
size := unitSize(u)
atomic.AddUint64(s.bytesReceived, size)
if s.rtspStream != nil {
for _, pkt := range u.GetRTPPackets() {
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}
if s.rtspsStream != nil {
for _, pkt := range u.GetRTPPackets() {
s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}
for writer, cb := range sf.readers {
ccb := cb
writer.Push(func() error {
atomic.AddUint64(s.bytesSent, size)
return ccb(u)
})
}
}