RTMP: filter out SPS, PPS and AU NALUs

This commit is contained in:
aler9 2021-04-05 14:53:55 +02:00
parent ede82808bb
commit 7387783506
6 changed files with 270 additions and 125 deletions

View File

@ -57,7 +57,7 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values) {
return pathName, ur.Query()
}
type trackIDBufPair struct {
type trackIDPayloadPair struct {
trackID int
buf []byte
}
@ -82,8 +82,8 @@ type Client struct {
readBufferCount int
runOnConnect string
runOnConnectRestart bool
stats *stats.Stats
wg *sync.WaitGroup
stats *stats.Stats
conn *rtmp.Conn
pathMan PathMan
parent Parent
@ -293,8 +293,7 @@ func (c *Client) runRead() {
if !ok {
return fmt.Errorf("terminated")
}
pair := data.(trackIDBufPair)
pair := data.(trackIDPayloadPair)
now := time.Now()
@ -308,14 +307,22 @@ func (c *Client) runRead() {
}
for _, nt := range nts {
// remove SPS, PPS and AUD, not needed by RTMP
typ := h264.NALUType(nt.NALU[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
if !videoInitialized {
videoInitialized = true
videoStartDTS = now
videoPTS = nt.Timestamp
}
// aggregate NALUs by PTS
// this delays the stream by one frame, but is required by RTMP
// aggregate NALUs by PTS.
// for instance, aggregate a SEI and a IDR.
// this delays the stream by one frame, but is required by RTMP.
if nt.Timestamp != videoPTS {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteH264(videoBuf, now.Sub(videoStartDTS))
@ -515,10 +522,17 @@ func (c *Client) runPublish() {
ts := pkt.Time + pkt.CTime
var nts []*rtph264.NALUAndTimestamp
for _, nt := range nalus {
for _, nalu := range nalus {
// remove SPS, PPS and AUD, not needed by RTSP / RTMP
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
nts = append(nts, &rtph264.NALUAndTimestamp{
Timestamp: ts,
NALU: nt,
NALU: nalu,
})
}
@ -616,8 +630,8 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
}
// OnFrame implements path.Reader.
func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(trackIDBufPair{trackID, buf})
c.ringBuffer.Push(trackIDPayloadPair{trackID, payload})
}
}

View File

@ -4,93 +4,6 @@ import (
"fmt"
)
func removeAntiCompetition(nalu []byte) []byte {
// 0x00 0x00 0x03 0x00 -> 0x00 0x00 0x00
// 0x00 0x00 0x03 0x01 -> 0x00 0x00 0x01
// 0x00 0x00 0x03 0x02 -> 0x00 0x00 0x02
// 0x00 0x00 0x03 0x03 -> 0x00 0x00 0x03
var ret []byte
step := 0
start := 0
for i, b := range nalu {
switch step {
case 0:
if b == 0 {
step++
}
case 1:
if b == 0 {
step++
} else {
step = 0
}
case 2:
if b == 3 {
step++
} else {
step = 0
}
case 3:
switch b {
case 3, 2, 1, 0:
ret = append(ret, nalu[start:i-3]...)
ret = append(ret, []byte{0x00, 0x00, b}...)
step = 0
start = i + 1
default:
step = 0
}
}
}
ret = append(ret, nalu[start:]...)
return ret
}
func addAntiCompetition(dest []byte, nalu []byte) []byte {
step := 0
start := 0
for i, b := range nalu {
switch step {
case 0:
if b == 0 {
step++
}
case 1:
if b == 0 {
step++
} else {
step = 0
}
case 2:
switch b {
case 3, 2, 1, 0:
dest = append(dest, nalu[start:i-2]...)
dest = append(dest, []byte{0x00, 0x00, 0x03, b}...)
step = 0
start = i + 1
default:
step = 0
}
}
}
dest = append(dest, nalu[start:]...)
return dest
}
// DecodeAnnexB decodes NALUs from the Annex-B code stream format.
func DecodeAnnexB(byts []byte) ([][]byte, error) {
bl := len(byts)
@ -134,7 +47,7 @@ func DecodeAnnexB(byts []byte) ([][]byte, error) {
if len(nalu) == 0 {
return nil, fmt.Errorf("empty NALU")
}
ret = append(ret, removeAntiCompetition(nalu))
ret = append(ret, nalu)
start = i + 1
}
zeros = 0
@ -148,7 +61,7 @@ func DecodeAnnexB(byts []byte) ([][]byte, error) {
if len(nalu) == 0 {
return nil, fmt.Errorf("empty NALU")
}
ret = append(ret, removeAntiCompetition(nalu))
ret = append(ret, nalu)
return ret, nil
}
@ -159,7 +72,7 @@ func EncodeAnnexB(nalus [][]byte) ([]byte, error) {
for _, nalu := range nalus {
ret = append(ret, []byte{0x00, 0x00, 0x00, 0x01}...)
ret = addAntiCompetition(ret, nalu)
ret = append(ret, nalu...)
}
return ret, nil

View File

@ -63,31 +63,6 @@ var annexBCases = []struct {
{0xee, 0xff},
},
},
{
"anti-competition",
[]byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x03, 0x01,
0x00, 0x00, 0x03, 0x02,
0x00, 0x00, 0x03, 0x03,
},
[]byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x03, 0x01,
0x00, 0x00, 0x03, 0x02,
0x00, 0x00, 0x03, 0x03,
},
[][]byte{
{
0x00, 0x00, 0x00,
0x00, 0x00, 0x01,
0x00, 0x00, 0x02,
0x00, 0x00, 0x03,
},
},
},
}
func TestAnnexBDecode(t *testing.T) {

View File

@ -0,0 +1,90 @@
package h264
// AntiCompetitionAdd adds the anti-competition bytes to a NALU.
func AntiCompetitionAdd(nalu []byte) []byte {
var ret []byte
step := 0
start := 0
for i, b := range nalu {
switch step {
case 0:
if b == 0 {
step++
}
case 1:
if b == 0 {
step++
} else {
step = 0
}
case 2:
switch b {
case 3, 2, 1, 0:
ret = append(ret, nalu[start:i-2]...)
ret = append(ret, []byte{0x00, 0x00, 0x03, b}...)
step = 0
start = i + 1
default:
step = 0
}
}
}
ret = append(ret, nalu[start:]...)
return ret
}
// AntiCompetitionRemove removes the anti-competition bytes from a NALU.
func AntiCompetitionRemove(nalu []byte) []byte {
// 0x00 0x00 0x03 0x00 -> 0x00 0x00 0x00
// 0x00 0x00 0x03 0x01 -> 0x00 0x00 0x01
// 0x00 0x00 0x03 0x02 -> 0x00 0x00 0x02
// 0x00 0x00 0x03 0x03 -> 0x00 0x00 0x03
var ret []byte
step := 0
start := 0
for i, b := range nalu {
switch step {
case 0:
if b == 0 {
step++
}
case 1:
if b == 0 {
step++
} else {
step = 0
}
case 2:
if b == 3 {
step++
} else {
step = 0
}
case 3:
switch b {
case 3, 2, 1, 0:
ret = append(ret, nalu[start:i-3]...)
ret = append(ret, []byte{0x00, 0x00, b}...)
step = 0
start = i + 1
default:
step = 0
}
}
}
ret = append(ret, nalu[start:]...)
return ret
}

View File

@ -0,0 +1,47 @@
package h264
import (
"testing"
"github.com/stretchr/testify/require"
)
var casesAntiCompetition = []struct {
name string
unproc []byte
proc []byte
}{
{
"base",
[]byte{
0x00, 0x00, 0x00,
0x00, 0x00, 0x01,
0x00, 0x00, 0x02,
0x00, 0x00, 0x03,
},
[]byte{
0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x03, 0x01,
0x00, 0x00, 0x03, 0x02,
0x00, 0x00, 0x03, 0x03,
},
},
}
func TestAntiCompetitionAdd(t *testing.T) {
for _, ca := range casesAntiCompetition {
t.Run(ca.name, func(t *testing.T) {
proc := AntiCompetitionAdd(ca.unproc)
require.Equal(t, ca.proc, proc)
})
}
}
func TestAntiCompetitionRemove(t *testing.T) {
for _, ca := range casesAntiCompetition {
t.Run(ca.name, func(t *testing.T) {
unproc := AntiCompetitionRemove(ca.proc)
require.Equal(t, ca.unproc, unproc)
})
}
}

106
internal/h264/nalutype.go Normal file
View File

@ -0,0 +1,106 @@
package h264
import (
"fmt"
)
// NALUType is the type of a NALU.
type NALUType uint8
// standard NALU types.
const (
NALUTypeNonIDR NALUType = 1
NALUTypeDataPartitionA NALUType = 2
NALUTypeDataPartitionB NALUType = 3
NALUTypeDataPartitionC NALUType = 4
NALUTypeIDR NALUType = 5
NALUTypeSei NALUType = 6
NALUTypeSPS NALUType = 7
NALUTypePPS NALUType = 8
NALUTypeAccessUnitDelimiter NALUType = 9
NALUTypeEndOfSequence NALUType = 10
NALUTypeEndOfStream NALUType = 11
NALUTypeFillerData NALUType = 12
NALUTypeSPSExtension NALUType = 13
NALUTypePrefix NALUType = 14
NALUTypeSubsetSPS NALUType = 15
NALUTypeReserved16 NALUType = 16
NALUTypeReserved17 NALUType = 17
NALUTypeReserved18 NALUType = 18
NALUTypeSliceLayerWithoutPartitioning NALUType = 19
NALUTypeSliceExtension NALUType = 20
NALUTypeSliceExtensionDepth NALUType = 21
NALUTypeReserved22 NALUType = 22
NALUTypeReserved23 NALUType = 23
NALUTypeStapA NALUType = 24
NALUTypeStapB NALUType = 25
NALUTypeMtap16 NALUType = 26
NALUTypeMtap24 NALUType = 27
NALUTypeFuA NALUType = 28
NALUTypeFuB NALUType = 29
)
// String implements fmt.Stringer.
func (nt NALUType) String() string {
switch nt {
case NALUTypeNonIDR:
return "NonIDR"
case NALUTypeDataPartitionA:
return "DataPartitionA"
case NALUTypeDataPartitionB:
return "DataPartitionB"
case NALUTypeDataPartitionC:
return "DataPartitionC"
case NALUTypeIDR:
return "IDR"
case NALUTypeSei:
return "Sei"
case NALUTypeSPS:
return "SPS"
case NALUTypePPS:
return "PPS"
case NALUTypeAccessUnitDelimiter:
return "AccessUnitDelimiter"
case NALUTypeEndOfSequence:
return "EndOfSequence"
case NALUTypeEndOfStream:
return "EndOfStream"
case NALUTypeFillerData:
return "FillerData"
case NALUTypeSPSExtension:
return "SPSExtension"
case NALUTypePrefix:
return "Prefix"
case NALUTypeSubsetSPS:
return "SubsetSPS"
case NALUTypeReserved16:
return "Reserved16"
case NALUTypeReserved17:
return "Reserved17"
case NALUTypeReserved18:
return "Reserved18"
case NALUTypeSliceLayerWithoutPartitioning:
return "SliceLayerWithoutPartitioning"
case NALUTypeSliceExtension:
return "SliceExtension"
case NALUTypeSliceExtensionDepth:
return "SliceExtensionDepth"
case NALUTypeReserved22:
return "Reserved22"
case NALUTypeReserved23:
return "Reserved23"
case NALUTypeStapA:
return "StapA"
case NALUTypeStapB:
return "StapB"
case NALUTypeMtap16:
return "Mtap16"
case NALUTypeMtap24:
return "Mtap24"
case NALUTypeFuA:
return "FuA"
case NALUTypeFuB:
return "FuB"
}
return fmt.Sprintf("unknown (%d)", nt)
}