RTMP client: speed up video reading by 1 frame

This commit is contained in:
aler9 2021-04-05 18:14:06 +02:00
parent 7387783506
commit fb0122ba18
5 changed files with 65 additions and 80 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf
github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf h1:X8ToQ1H9I7gLp37fxH7mo/U0y+GgF+kwmHpeht4MUHE=
github.com/aler9/gortsplib v0.0.0-20210404174403-c2d5ced43bcf/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2 h1:AYmBhFE5DTGoQ8XKqmklWO3+CCYYR1X18XL/b/sndmM=
github.com/aler9/gortsplib v0.0.0-20210405155604-491b36cdc4b2/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

View File

@ -223,6 +223,7 @@ func (c *Client) runRead() {
var videoTrack *gortsplib.Track
var h264Decoder *rtph264.Decoder
var audioTrack *gortsplib.Track
var audioClockRate int
var aacDecoder *rtpaac.Decoder
err = func() error {
@ -241,8 +242,8 @@ func (c *Client) runRead() {
}
audioTrack = t
clockRate, _ := audioTrack.ClockRate()
aacDecoder = rtpaac.NewDecoder(clockRate)
audioClockRate, _ = audioTrack.ClockRate()
aacDecoder = rtpaac.NewDecoder(audioClockRate)
}
}
@ -284,9 +285,8 @@ func (c *Client) runRead() {
go func() {
writerDone <- func() error {
videoInitialized := false
var videoStartDTS time.Time
var videoBuf [][]byte
var videoPTS time.Duration
var videoStartDTS time.Time
for {
data, ok := c.ringBuffer.Pull()
@ -295,10 +295,8 @@ func (c *Client) runRead() {
}
pair := data.(trackIDPayloadPair)
now := time.Now()
if videoTrack != nil && pair.trackID == videoTrack.ID {
nts, err := h264Decoder.Decode(pair.buf)
nalus, _, err := h264Decoder.Decode(pair.buf)
if err != nil {
if err != rtph264.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode video track: %v", err)
@ -306,46 +304,44 @@ func (c *Client) runRead() {
continue
}
for _, nt := range nts {
if !videoInitialized {
videoInitialized = true
videoStartDTS = time.Now()
}
for _, nalu := range nalus {
// remove SPS, PPS and AUD, not needed by RTMP
typ := h264.NALUType(nt.NALU[0] & 0x1F)
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
if !videoInitialized {
videoInitialized = true
videoStartDTS = now
videoPTS = nt.Timestamp
}
videoBuf = append(videoBuf, nalu)
}
// aggregate NALUs by PTS.
// for instance, aggregate a SEI and a IDR.
// this delays the stream by one frame, but is required by RTMP.
if nt.Timestamp != videoPTS {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteH264(videoBuf, now.Sub(videoStartDTS))
if err != nil {
return err
}
videoBuf = nil
// RTP marker means that all the NALUs with the same PTS have been received.
// send them together.
marker := (pair.buf[1] >> 7 & 0x1) > 0
if marker {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteH264(videoBuf, time.Since(videoStartDTS))
if err != nil {
return err
}
videoPTS = nt.Timestamp
videoBuf = append(videoBuf, nt.NALU)
videoBuf = nil
}
} else if audioTrack != nil && pair.trackID == audioTrack.ID {
ats, err := aacDecoder.Decode(pair.buf)
aus, pts, err := aacDecoder.Decode(pair.buf)
if err != nil {
c.log(logger.Warn, "unable to decode audio track: %v", err)
continue
}
for _, at := range ats {
for i, au := range aus {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WriteAAC(at.AU, at.Timestamp)
err := c.conn.WriteAAC(au, pts+time.Duration(i)*1000*time.Second/time.Duration(audioClockRate))
if err != nil {
return err
}
@ -520,8 +516,7 @@ func (c *Client) runPublish() {
return err
}
ts := pkt.Time + pkt.CTime
var nts []*rtph264.NALUAndTimestamp
var outNALUs [][]byte
for _, nalu := range nalus {
// remove SPS, PPS and AUD, not needed by RTSP / RTMP
typ := h264.NALUType(nalu[0] & 0x1F)
@ -530,13 +525,10 @@ func (c *Client) runPublish() {
continue
}
nts = append(nts, &rtph264.NALUAndTimestamp{
Timestamp: ts,
NALU: nalu,
})
outNALUs = append(outNALUs, nalu)
}
frames, err := h264Encoder.Encode(nts)
frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding H264: %v", err)
}
@ -550,12 +542,7 @@ func (c *Client) runPublish() {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
}
frames, err := aacEncoder.Encode([]*rtpaac.AUAndTimestamp{
{
Timestamp: pkt.Time + pkt.CTime,
AU: pkt.Data,
},
})
frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding AAC: %v", err)
}

View File

@ -14,7 +14,7 @@ const (
NALUTypeDataPartitionB NALUType = 3
NALUTypeDataPartitionC NALUType = 4
NALUTypeIDR NALUType = 5
NALUTypeSei NALUType = 6
NALUTypeSEI NALUType = 6
NALUTypeSPS NALUType = 7
NALUTypePPS NALUType = 8
NALUTypeAccessUnitDelimiter NALUType = 9
@ -32,12 +32,12 @@ const (
NALUTypeSliceExtensionDepth NALUType = 21
NALUTypeReserved22 NALUType = 22
NALUTypeReserved23 NALUType = 23
NALUTypeStapA NALUType = 24
NALUTypeStapB NALUType = 25
NALUTypeMtap16 NALUType = 26
NALUTypeMtap24 NALUType = 27
NALUTypeFuA NALUType = 28
NALUTypeFuB NALUType = 29
NALUTypeSTAPA NALUType = 24
NALUTypeSTAPB NALUType = 25
NALUTypeMTAP16 NALUType = 26
NALUTypeMTAP24 NALUType = 27
NALUTypeFUA NALUType = 28
NALUTypeFUB NALUType = 29
)
// String implements fmt.Stringer.
@ -53,7 +53,7 @@ func (nt NALUType) String() string {
return "DataPartitionC"
case NALUTypeIDR:
return "IDR"
case NALUTypeSei:
case NALUTypeSEI:
return "Sei"
case NALUTypeSPS:
return "SPS"
@ -89,18 +89,18 @@ func (nt NALUType) String() string {
return "Reserved22"
case NALUTypeReserved23:
return "Reserved23"
case NALUTypeStapA:
return "StapA"
case NALUTypeStapB:
return "StapB"
case NALUTypeMtap16:
return "Mtap16"
case NALUTypeMtap24:
return "Mtap24"
case NALUTypeFuA:
return "FuA"
case NALUTypeFuB:
return "FuB"
case NALUTypeSTAPA:
return "STAPA"
case NALUTypeSTAPB:
return "STAPB"
case NALUTypeMTAP16:
return "MTAP16"
case NALUTypeMTAP24:
return "MTAP24"
case NALUTypeFUA:
return "FUA"
case NALUTypeFUB:
return "FUB"
}
return fmt.Sprintf("unknown (%d)", nt)
}

View File

@ -214,16 +214,19 @@ func (s *Source) runInner() bool {
return err
}
ts := pkt.Time + pkt.CTime
var nts []*rtph264.NALUAndTimestamp
for _, nt := range nalus {
nts = append(nts, &rtph264.NALUAndTimestamp{
Timestamp: ts,
NALU: nt,
})
var outNALUs [][]byte
for _, nalu := range nalus {
// remove SPS, PPS and AUD, not needed by RTSP / RTMP
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
outNALUs = append(outNALUs, nalu)
}
frames, err := h264Encoder.Encode(nts)
frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding H264: %v", err)
}
@ -237,12 +240,7 @@ func (s *Source) runInner() bool {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
}
frames, err := aacEncoder.Encode([]*rtpaac.AUAndTimestamp{
{
Timestamp: pkt.Time + pkt.CTime,
AU: pkt.Data,
},
})
frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding AAC: %v", err)
}