mediamtx/internal/core/stream.go

142 lines
3.1 KiB
Go

package core
import (
"bytes"
"sync"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/h264"
)
// streamNonRTSPReadersMap is a set of readers protected by a read/write mutex.
// Within this file it is only populated through stream.readerAdd, which adds
// readers that are not pathRTSPSession values.
type streamNonRTSPReadersMap struct {
// mutex guards every access to ma.
mutex sync.RWMutex
// ma is the reader set; the empty-struct value carries no information.
// It is set to nil by close.
ma map[reader]struct{}
}
// newStreamNonRTSPReadersMap allocates a streamNonRTSPReadersMap whose reader
// set is empty and ready to be written to.
func newStreamNonRTSPReadersMap() *streamNonRTSPReadersMap {
	readers := streamNonRTSPReadersMap{
		ma: map[reader]struct{}{},
	}
	return &readers
}
// close discards the reader set by replacing it with nil while holding the
// write lock.
func (m *streamNonRTSPReadersMap) close() {
	m.mutex.Lock()
	m.ma = nil
	m.mutex.Unlock()
}
// add registers r in the reader set so that forwardPacketRTP delivers data
// to it.
//
// Calling add after close is a no-op: close replaces the set with nil, and
// writing to a nil map would panic.
func (m *streamNonRTSPReadersMap) add(r reader) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.ma == nil {
		// Already closed: there is no set left to register the reader in.
		return
	}

	m.ma[r] = struct{}{}
}
// remove takes r out of the reader set under the write lock. Removing a
// reader that is not in the set leaves the set unchanged.
func (m *streamNonRTSPReadersMap) remove(r reader) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	delete(m.ma, r)
}
// forwardPacketRTP hands data for the given track to every reader in the set
// by calling its onReaderData method. The read lock is held for the whole
// iteration, including the reader callbacks.
func (m *streamNonRTSPReadersMap) forwardPacketRTP(trackID int, data *data) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	for rdr := range m.ma {
		rdr.onReaderData(trackID, data)
	}
}
// stream distributes data written through writeData to two groups of
// readers: those served by a gortsplib.ServerStream and those kept in
// nonRTSPReaders.
type stream struct {
// nonRTSPReaders holds the readers added through readerAdd that are not
// pathRTSPSession values.
nonRTSPReaders *streamNonRTSPReadersMap
// rtspStream receives the RTP packet of every writeData call and is the
// source of the track list returned by tracks.
rtspStream *gortsplib.ServerStream
}
// newStream builds a stream for the given tracks, with an empty non-RTSP
// reader set and a fresh gortsplib server stream.
func newStream(tracks gortsplib.Tracks) *stream {
	return &stream{
		nonRTSPReaders: newStreamNonRTSPReadersMap(),
		rtspStream:     gortsplib.NewServerStream(tracks),
	}
}
// close shuts the stream down: the non-RTSP reader set is closed first, then
// the underlying RTSP server stream.
func (s *stream) close() {
	s.nonRTSPReaders.close()
	s.rtspStream.Close()
}
// tracks returns the track list reported by the underlying RTSP server
// stream.
func (s *stream) tracks() gortsplib.Tracks {
	return s.rtspStream.Tracks()
}
// readerAdd registers r with the non-RTSP reader set. Readers that satisfy
// pathRTSPSession are not tracked here and are left untouched.
func (s *stream) readerAdd(r reader) {
	if _, isRTSPSession := r.(pathRTSPSession); isRTSPSession {
		return
	}

	s.nonRTSPReaders.add(r)
}
// readerRemove unregisters r from the non-RTSP reader set. Readers that
// satisfy pathRTSPSession were never added to that set, so they are ignored.
func (s *stream) readerRemove(r reader) {
	if _, isRTSPSession := r.(pathRTSPSession); isRTSPSession {
		return
	}

	s.nonRTSPReaders.remove(r)
}
// updateH264TrackParameters scans nalus for SPS and PPS units and stores a
// copy of each one in h264track whenever it differs from the value the track
// currently reports.
//
// Empty NALUs have no header byte to inspect and are skipped; indexing them
// would panic.
func (s *stream) updateH264TrackParameters(h264track *gortsplib.TrackH264, nalus [][]byte) {
	for _, nalu := range nalus {
		if len(nalu) == 0 {
			continue
		}

		// The NALU type is read from the low 5 bits of the first byte.
		switch h264.NALUType(nalu[0] & 0x1F) {
		case h264.NALUTypeSPS:
			if !bytes.Equal(nalu, h264track.SPS()) {
				// Store a copy so the track does not alias the caller's buffer.
				h264track.SetSPS(append([]byte(nil), nalu...))
			}

		case h264.NALUTypePPS:
			if !bytes.Equal(nalu, h264track.PPS()) {
				// Store a copy so the track does not alias the caller's buffer.
				h264track.SetPPS(append([]byte(nil), nalu...))
			}
		}
	}
}
// remuxH264NALUs replaces data.h264NALUs with a filtered copy of the list.
// The remux is needed to
//   - fix corrupted streams
//   - make streams compatible with all protocols
//
// SPS, PPS and access unit delimiter NALUs are removed, and the SPS and PPS
// currently held by h264track are inserted before every IDR NALU. Empty NALUs
// are dropped, and a parameter set the track does not hold yet is not
// inserted, so the resulting list never contains an empty NALU.
func (s *stream) remuxH264NALUs(h264track *gortsplib.TrackH264, data *data) {
	var filteredNALUs [][]byte //nolint:prealloc

	for _, nalu := range data.h264NALUs {
		if len(nalu) == 0 {
			// No header byte to inspect; indexing it would panic.
			continue
		}

		// The NALU type is read from the low 5 bits of the first byte.
		switch h264.NALUType(nalu[0] & 0x1F) {
		case h264.NALUTypeSPS, h264.NALUTypePPS:
			// remove since they're automatically added before every IDR
			continue

		case h264.NALUTypeAccessUnitDelimiter:
			// remove since it is not needed
			continue

		case h264.NALUTypeIDR:
			// add SPS and PPS before every IDR, skipping any parameter set
			// that is still empty
			if sps := h264track.SPS(); len(sps) != 0 {
				filteredNALUs = append(filteredNALUs, sps)
			}
			if pps := h264track.PPS(); len(pps) != 0 {
				filteredNALUs = append(filteredNALUs, pps)
			}
		}

		filteredNALUs = append(filteredNALUs, nalu)
	}

	data.h264NALUs = filteredNALUs
}
// writeData publishes data for the track at index trackID.
//
// When that track is an H264 track, the track's SPS/PPS are first refreshed
// from data.h264NALUs and the NALU list is remuxed. The RTP packet is then
// written to the RTSP server stream, and data is handed to every non-RTSP
// reader.
func (s *stream) writeData(trackID int, data *data) {
	h264track, isH264 := s.rtspStream.Tracks()[trackID].(*gortsplib.TrackH264)
	if isH264 {
		s.updateH264TrackParameters(h264track, data.h264NALUs)
		s.remuxH264NALUs(h264track, data)
	}

	// RTSP readers are served by the server stream
	s.rtspStream.WritePacketRTP(trackID, data.rtp, data.ptsEqualsDTS)

	// every other reader is served through the reader set
	s.nonRTSPReaders.forwardPacketRTP(trackID, data)
}