split RTP packet handling from data handling (#2337)

This commit is contained in:
Alessandro Ros 2023-09-16 17:16:33 +02:00 committed by GitHub
parent f786f64690
commit c4cb4200ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 656 additions and 654 deletions

View File

@ -47,63 +47,68 @@ func (t *formatProcessorAV1) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorAV1) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.AV1)
func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.AV1)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
tu, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.TU = tu
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.TU)
pkts, err := t.encoder.Encode(u.TU)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorAV1) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.AV1{
func (t *formatProcessorAV1) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.AV1{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
tu, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpav1.ErrNonStartingPacketAndNoPrevious || err == rtpav1.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.TU = tu
}
// route packet as is
return u, nil
}

View File

@ -1,29 +0,0 @@
package formatprocessor
import (
"time"
"github.com/pion/rtp"
)
// BaseUnit contains fields shared across all units.
type BaseUnit struct {
RTPPackets []*rtp.Packet
NTP time.Time
PTS time.Duration
}
// GetRTPPackets implements Unit.
func (u *BaseUnit) GetRTPPackets() []*rtp.Packet {
return u.RTPPackets
}
// GetNTP implements Unit.
func (u *BaseUnit) GetNTP() time.Time {
return u.NTP
}
// GetPTS implements Unit.
func (u *BaseUnit) GetPTS() time.Duration {
return u.PTS
}

View File

@ -28,29 +28,32 @@ func newGeneric(
}, nil
}
func (t *formatProcessorGeneric) Process(u unit.Unit, _ bool) error {
tunit := u.(*unit.Generic)
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
return nil
func (t *formatProcessorGeneric) ProcessUnit(_ unit.Unit) error {
return fmt.Errorf("using a generic unit without RTP is not supported")
}
func (t *formatProcessorGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.Generic{
func (t *formatProcessorGeneric) ProcessRTPPacket(
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
_ bool,
) (Unit, error) {
u := &unit.Generic{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
return u, nil
}

View File

@ -2,12 +2,11 @@ package formatprocessor
import (
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/unit"
)
func TestGenericRemovePadding(t *testing.T) {
@ -31,15 +30,11 @@ func TestGenericRemovePadding(t *testing.T) {
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{1, 2, 3, 4},
PaddingSize: 20,
}
err = p.Process(&unit.Generic{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
},
}, false)
_, err = p.ProcessRTPPacket(pkt, time.Time{}, 0, false)
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
@ -51,6 +46,6 @@ func TestGenericRemovePadding(t *testing.T) {
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{1, 2, 3, 4},
}, pkt)
}

View File

@ -220,83 +220,105 @@ func (t *formatProcessorH264) remuxAccessUnit(au [][]byte) [][]byte {
return filteredNALUs
}
func (t *formatProcessorH264) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.H264)
func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error {
u := uu.(*unit.H264)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
t.updateTrackParametersFromRTPPacket(pkt)
t.updateTrackParametersFromAU(u.AU)
u.AU = t.remuxAccessUnit(u.AU)
if t.encoder == nil {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2)
if err != nil {
return err
}
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
au, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtph264.ErrNonStartingPacketAndNoPrevious || err == rtph264.ErrMorePacketsNeeded {
if t.encoder != nil {
tunit.RTPPackets = nil
}
return nil
}
return err
}
tunit.AU = t.remuxAccessUnit(au)
}
// route packet as is
if t.encoder == nil {
return nil
}
} else {
t.updateTrackParametersFromAU(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.AU)
}
// encode into RTP
if len(tunit.AU) != 0 {
pkts, err := t.encoder.Encode(tunit.AU)
if u.AU != nil {
pkts, err := t.encoder.Encode(u.AU)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
} else {
tunit.RTPPackets = nil
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
}
return nil
}
func (t *formatProcessorH264) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.H264{
func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
t.updateTrackParametersFromRTPPacket(pkt)
if t.encoder == nil {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2)
if err != nil {
return nil, err
}
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
au, err := t.decoder.Decode(pkt)
if t.encoder != nil {
u.RTPPackets = nil
}
if err != nil {
if err == rtph264.ErrNonStartingPacketAndNoPrevious || err == rtph264.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.AU = t.remuxAccessUnit(au)
}
// route packet as is
if t.encoder == nil {
return u, nil
}
// encode into RTP
if len(u.AU) != 0 {
pkts, err := t.encoder.Encode(u.AU)
if err != nil {
return nil, err
}
for _, newPKT := range pkts {
newPKT.Timestamp = pkt.Timestamp
}
u.RTPPackets = pkts
}
return u, nil
}

View File

@ -3,6 +3,7 @@ package formatprocessor
import (
"bytes"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
@ -27,36 +28,23 @@ func TestH264DynamicParams(t *testing.T) {
pkts, err := enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}})
require.NoError(t, err)
data := &unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
err = p.Process(data, true)
data, err := p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true)
require.NoError(t, err)
require.Equal(t, [][]byte{
{byte(h264.NALUTypeIDR)},
}, data.AU)
}, data.(*unit.H264).AU)
pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}) // SPS
require.NoError(t, err)
err = p.Process(&unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
_, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false)
require.NoError(t, err)
pkts, err = enc.Encode([][]byte{{8, 1}}) // PPS
require.NoError(t, err)
err = p.Process(&unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
_, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false)
require.NoError(t, err)
require.Equal(t, []byte{7, 4, 5, 6}, forma.SPS)
@ -65,19 +53,14 @@ func TestH264DynamicParams(t *testing.T) {
pkts, err = enc.Encode([][]byte{{byte(h264.NALUTypeIDR)}})
require.NoError(t, err)
data = &unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
err = p.Process(data, true)
data, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true)
require.NoError(t, err)
require.Equal(t, [][]byte{
{0x07, 4, 5, 6},
{0x08, 1},
{byte(h264.NALUTypeIDR)},
}, data.AU)
}, data.(*unit.H264).AU)
}
func TestH264OversizedPackets(t *testing.T) {
@ -131,15 +114,10 @@ func TestH264OversizedPackets(t *testing.T) {
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
},
} {
data := &unit.H264{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
},
}
err := p.Process(data, false)
data, err := p.ProcessRTPPacket(pkt, time.Time{}, 0, false)
require.NoError(t, err)
out = append(out, data.RTPPackets...)
out = append(out, data.GetRTPPackets()...)
}
require.Equal(t, []*rtp.Packet{
@ -201,7 +179,7 @@ func TestH264EmptyPacket(t *testing.T) {
},
}
err = p.Process(unit, false)
err = p.ProcessUnit(unit)
require.NoError(t, err)
// if all NALUs have been removed, no RTP packets must be generated.

View File

@ -242,83 +242,105 @@ func (t *formatProcessorH265) remuxAccessUnit(au [][]byte) [][]byte {
return filteredNALUs
}
func (t *formatProcessorH265) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.H265)
func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.H265)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
t.updateTrackParametersFromRTPPacket(pkt)
t.updateTrackParametersFromAU(u.AU)
u.AU = t.remuxAccessUnit(u.AU)
if t.encoder == nil {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2)
if err != nil {
return err
}
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
au, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded {
if t.encoder != nil {
tunit.RTPPackets = nil
}
return nil
}
return err
}
tunit.AU = t.remuxAccessUnit(au)
}
// route packet as is
if t.encoder == nil {
return nil
}
} else {
t.updateTrackParametersFromAU(tunit.AU)
tunit.AU = t.remuxAccessUnit(tunit.AU)
}
// encode into RTP
if len(tunit.AU) != 0 {
pkts, err := t.encoder.Encode(tunit.AU)
if u.AU != nil {
pkts, err := t.encoder.Encode(u.AU)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
} else {
tunit.RTPPackets = nil
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
}
return nil
}
func (t *formatProcessorH265) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.H265{
func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
t.updateTrackParametersFromRTPPacket(pkt)
if t.encoder == nil {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
// RTP packets exceed maximum size: start re-encoding them
if pkt.MarshalSize() > t.udpMaxPayloadSize {
v1 := pkt.SSRC
v2 := pkt.SequenceNumber
err := t.createEncoder(&v1, &v2)
if err != nil {
return nil, err
}
}
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || t.encoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
au, err := t.decoder.Decode(pkt)
if t.encoder != nil {
u.RTPPackets = nil
}
if err != nil {
if err == rtph265.ErrNonStartingPacketAndNoPrevious || err == rtph265.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.AU = t.remuxAccessUnit(au)
}
// route packet as is
if t.encoder == nil {
return u, nil
}
// encode into RTP
if len(u.AU) != 0 {
pkts, err := t.encoder.Encode(u.AU)
if err != nil {
return nil, err
}
for _, newPKT := range pkts {
newPKT.Timestamp = pkt.Timestamp
}
u.RTPPackets = pkts
}
return u, nil
}

View File

@ -3,6 +3,7 @@ package formatprocessor
import (
"bytes"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
@ -26,46 +27,29 @@ func TestH265DynamicParams(t *testing.T) {
pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}})
require.NoError(t, err)
data := &unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
err = p.Process(data, true)
data, err := p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true)
require.NoError(t, err)
require.Equal(t, [][]byte{
{byte(h265.NALUType_CRA_NUT) << 1, 0},
}, data.AU)
}, data.(*unit.H265).AU)
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}})
require.NoError(t, err)
err = p.Process(&unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
_, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false)
require.NoError(t, err)
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}})
require.NoError(t, err)
err = p.Process(&unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
_, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false)
require.NoError(t, err)
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}})
require.NoError(t, err)
err = p.Process(&unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}, false)
_, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, false)
require.NoError(t, err)
require.Equal(t, []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}, forma.VPS)
@ -75,12 +59,7 @@ func TestH265DynamicParams(t *testing.T) {
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_CRA_NUT) << 1, 0}})
require.NoError(t, err)
data = &unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkts[0]},
},
}
err = p.Process(data, true)
data, err = p.ProcessRTPPacket(pkts[0], time.Time{}, 0, true)
require.NoError(t, err)
require.Equal(t, [][]byte{
@ -88,7 +67,7 @@ func TestH265DynamicParams(t *testing.T) {
{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6},
{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9},
{byte(h265.NALUType_CRA_NUT) << 1, 0},
}, data.AU)
}, data.(*unit.H265).AU)
}
func TestH265OversizedPackets(t *testing.T) {
@ -130,15 +109,10 @@ func TestH265OversizedPackets(t *testing.T) {
Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4),
},
} {
data := &unit.H265{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
},
}
err = p.Process(data, false)
data, err := p.ProcessRTPPacket(pkt, time.Time{}, 0, false)
require.NoError(t, err)
out = append(out, data.RTPPackets...)
out = append(out, data.GetRTPPackets()...)
}
require.Equal(t, []*rtp.Packet{
@ -200,7 +174,7 @@ func TestH265EmptyPacket(t *testing.T) {
},
}
err = p.Process(unit, false)
err = p.ProcessUnit(unit)
require.NoError(t, err)
// if all NALUs have been removed, no RTP packets must be generated.

View File

@ -0,0 +1,112 @@
package formatprocessor //nolint:dupl
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/unit"
)
type formatProcessorMPEG1Audio struct {
udpMaxPayloadSize int
format *format.MPEG1Audio
encoder *rtpmpeg1audio.Encoder
decoder *rtpmpeg1audio.Decoder
}
func newMPEG1Audio(
udpMaxPayloadSize int,
forma *format.MPEG1Audio,
generateRTPPackets bool,
) (*formatProcessorMPEG1Audio, error) {
t := &formatProcessorMPEG1Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder()
if err != nil {
return nil, err
}
}
return t, nil
}
func (t *formatProcessorMPEG1Audio) createEncoder() error {
t.encoder = &rtpmpeg1audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
return t.encoder.Init()
}
func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.MPEG1Audio)
pkts, err := t.encoder.Encode(u.Frames)
if err != nil {
return err
}
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorMPEG1Audio) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.MPEG1Audio{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
frames, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg1audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1audio.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.Frames = frames
}
// route packet as is
return u, nil
}

View File

@ -1,107 +0,0 @@
package formatprocessor //nolint:dupl
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/unit"
)
type formatProcessorMPEG1Audio struct {
udpMaxPayloadSize int
format *format.MPEG1Audio
encoder *rtpmpeg1audio.Encoder
decoder *rtpmpeg1audio.Decoder
}
func newMPEG1Audio(
udpMaxPayloadSize int,
forma *format.MPEG1Audio,
generateRTPPackets bool,
) (*formatProcessorMPEG1Audio, error) {
t := &formatProcessorMPEG1Audio{
udpMaxPayloadSize: udpMaxPayloadSize,
format: forma,
}
if generateRTPPackets {
err := t.createEncoder()
if err != nil {
return nil, err
}
}
return t, nil
}
func (t *formatProcessorMPEG1Audio) createEncoder() error {
t.encoder = &rtpmpeg1audio.Encoder{
PayloadMaxSize: t.udpMaxPayloadSize - 12,
}
return t.encoder.Init()
}
func (t *formatProcessorMPEG1Audio) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.MPEG1Audio)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
frames, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg1audio.ErrNonStartingPacketAndNoPrevious || err == rtpmpeg1audio.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.Frames = frames
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frames)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
return nil
}
func (t *formatProcessorMPEG1Audio) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.MPEG1Audio{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
}

View File

@ -49,63 +49,68 @@ func (t *formatProcessorMPEG4AudioGeneric) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorMPEG4AudioGeneric) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.MPEG4AudioGeneric)
func (t *formatProcessorMPEG4AudioGeneric) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.MPEG4AudioGeneric)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil || true {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
aus, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg4audio.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.AUs = aus
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.AUs)
pkts, err := t.encoder.Encode(u.AUs)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorMPEG4AudioGeneric) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.MPEG4AudioGeneric{
func (t *formatProcessorMPEG4AudioGeneric) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.MPEG4AudioGeneric{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
aus, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg4audio.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.AUs = aus
}
// route packet as is
return u, nil
}

View File

@ -45,63 +45,68 @@ func (t *formatProcessorMPEG4AudioLATM) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorMPEG4AudioLATM) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.MPEG4AudioLATM)
func (t *formatProcessorMPEG4AudioLATM) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.MPEG4AudioLATM)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
au, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg4audiolatm.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.AU = au
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.AU)
pkts, err := t.encoder.Encode(u.AU)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorMPEG4AudioLATM) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.MPEG4AudioLATM{
func (t *formatProcessorMPEG4AudioLATM) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.MPEG4AudioLATM{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
au, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpmpeg4audiolatm.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.AU = au
}
// route packet as is
return u, nil
}

View File

@ -47,67 +47,71 @@ func (t *formatProcessorOpus) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorOpus) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.Opus)
func (t *formatProcessorOpus) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.Opus)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
packet, err := t.decoder.Decode(pkt)
if err != nil {
return err
}
tunit.Packets = [][]byte{packet}
}
// route packet as is
return nil
}
// encode into RTP
var rtpPackets []*rtp.Packet //nolint:prealloc
pts := tunit.PTS
for _, packet := range tunit.Packets {
pts := u.PTS
for _, packet := range u.Packets {
pkt, err := t.encoder.Encode(packet)
if err != nil {
return err
}
setTimestamp([]*rtp.Packet{pkt}, tunit.RTPPackets, t.format.ClockRate(), pts)
ts := uint32(multiplyAndDivide(pts, time.Duration(t.format.ClockRate()), time.Second))
pkt.Timestamp = ts
rtpPackets = append(rtpPackets, pkt)
pts += opus.PacketDuration(packet)
}
tunit.RTPPackets = rtpPackets
u.RTPPackets = rtpPackets
return nil
}
func (t *formatProcessorOpus) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.Opus{
func (t *formatProcessorOpus) ProcessRTPPacket(
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.Opus{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
packet, err := t.decoder.Decode(pkt)
if err != nil {
return nil, err
}
u.Packets = [][]byte{packet}
}
// route packet as is
return u, nil
}

View File

@ -18,25 +18,18 @@ func multiplyAndDivide(v, m, d time.Duration) time.Duration {
return (secs*m + dec*m/d)
}
func setTimestamp(newPackets []*rtp.Packet, oldPackets []*rtp.Packet, clockRate int, pts time.Duration) {
if oldPackets != nil { // get timestamp from old packets
for _, pkt := range newPackets {
pkt.Timestamp = oldPackets[0].Timestamp
}
} else { // get timestamp from PTS
for _, pkt := range newPackets {
pkt.Timestamp = uint32(multiplyAndDivide(pts, time.Duration(clockRate), time.Second))
}
}
}
// Processor cleans and normalizes streams.
type Processor interface {
// cleans and normalizes a data unit.
Process(unit.Unit, bool) error
// process a Unit.
ProcessUnit(unit.Unit) error
// wraps a RTP packet into a Unit.
UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit
// process a RTP packet and convert it into a unit.
ProcessRTPPacket(
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error)
}
// New allocates a Processor.

View File

@ -46,63 +46,68 @@ func (t *formatProcessorVP8) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorVP8) Process(y unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := y.(*unit.VP8)
func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.VP8)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
frame, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpvp8.ErrNonStartingPacketAndNoPrevious || err == rtpvp8.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.Frame = frame
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frame)
pkts, err := t.encoder.Encode(u.Frame)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorVP8) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.VP8{
func (t *formatProcessorVP8) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.VP8{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
frame, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpvp8.ErrNonStartingPacketAndNoPrevious || err == rtpvp8.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.Frame = frame
}
// route packet as is
return u, nil
}

View File

@ -46,63 +46,68 @@ func (t *formatProcessorVP9) createEncoder() error {
return t.encoder.Init()
}
func (t *formatProcessorVP9) Process(u unit.Unit, hasNonRTSPReaders bool) error { //nolint:dupl
tunit := u.(*unit.VP9)
func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.VP9)
if tunit.RTPPackets != nil {
pkt := tunit.RTPPackets[0]
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return err
}
}
frame, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpvp9.ErrNonStartingPacketAndNoPrevious || err == rtpvp9.ErrMorePacketsNeeded {
return nil
}
return err
}
tunit.Frame = frame
}
// route packet as is
return nil
}
// encode into RTP
pkts, err := t.encoder.Encode(tunit.Frame)
pkts, err := t.encoder.Encode(u.Frame)
if err != nil {
return err
}
setTimestamp(pkts, tunit.RTPPackets, t.format.ClockRate(), tunit.PTS)
tunit.RTPPackets = pkts
ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second))
for _, pkt := range pkts {
pkt.Timestamp = ts
}
u.RTPPackets = pkts
return nil
}
func (t *formatProcessorVP9) UnitForRTPPacket(pkt *rtp.Packet, ntp time.Time, pts time.Duration) Unit {
return &unit.VP9{
func (t *formatProcessorVP9) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
hasNonRTSPReaders bool,
) (Unit, error) {
u := &unit.VP9{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0
if pkt.MarshalSize() > t.udpMaxPayloadSize {
return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)",
pkt.MarshalSize(), t.udpMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.format.CreateDecoder()
if err != nil {
return nil, err
}
}
frame, err := t.decoder.Decode(pkt)
if err != nil {
if err == rtpvp9.ErrNonStartingPacketAndNoPrevious || err == rtpvp9.ErrMorePacketsNeeded {
return u, nil
}
return nil, err
}
u.Frame = frame
}
// route packet as is
return u, nil
}

View File

@ -57,14 +57,34 @@ func (sf *streamFormat) removeReader(r *asyncwriter.Writer) {
}
func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) {
hasNonRTSPReaders := len(sf.readers) > 0
err := sf.proc.Process(u, hasNonRTSPReaders)
err := sf.proc.ProcessUnit(u)
if err != nil {
sf.decodeErrLogger.Log(logger.Warn, err.Error())
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeRTPPacket(
s *Stream,
medi *description.Media,
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
) {
hasNonRTSPReaders := len(sf.readers) > 0
u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders)
if err != nil {
sf.decodeErrLogger.Log(logger.Warn, err.Error())
return
}
sf.writeUnitInner(s, medi, u)
}
func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u unit.Unit) {
atomic.AddUint64(s.bytesReceived, unitSize(u))
if s.rtspStream != nil {
@ -86,13 +106,3 @@ func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Uni
})
}
}
func (sf *streamFormat) writeRTPPacket(
s *Stream,
medi *description.Media,
pkt *rtp.Packet,
ntp time.Time,
pts time.Duration,
) {
sf.writeUnit(s, medi, sf.proc.UnitForRTPPacket(pkt, ntp, pts))
}