rtmp: improve performance

reuse existing structs instead of allocating them during every read()
This commit is contained in:
aler9 2022-08-15 15:42:16 +02:00
parent 4f023b25e8
commit 0db2d3eb8c
3 changed files with 91 additions and 66 deletions

View File

@ -3,7 +3,7 @@ package rtmp
import (
"errors"
"fmt"
"net"
"io"
"net/url"
"strings"
"time"
@ -112,9 +112,9 @@ type Conn struct {
}
// NewConn initializes a connection.
func NewConn(nconn net.Conn) *Conn {
func NewConn(rw io.ReadWriter) *Conn {
c := &Conn{}
c.bc = bytecounter.NewReadWriter(nconn)
c.bc = bytecounter.NewReadWriter(rw)
c.mrw = message.NewReadWriter(c.bc, false)
return c
}

View File

@ -1,6 +1,7 @@
package rtmp
import (
"bytes"
"net"
"net/url"
"testing"
@ -1210,3 +1211,32 @@ func TestWriteTracks(t *testing.T) {
Payload: []byte{0x12, 0x10},
}, msg)
}
func BenchmarkRead(b *testing.B) {
var buf bytes.Buffer
for n := 0; n < b.N; n++ {
buf.Write([]byte{
7, 0, 0, 23, 0, 0, 98, 8,
0, 0, 0, 64, 175, 1, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4,
})
}
conn := NewConn(&buf)
for n := 0; n < b.N; n++ {
conn.ReadMessage()
}
}

View File

@ -17,7 +17,7 @@ type readerChunkStream struct {
curType *chunk.MessageType
curMessageStreamID *uint32
curBodyLen *uint32
curBody *[]byte
curBody []byte
curTimestampDelta *uint32
}
@ -52,33 +52,31 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk")
}
var c0 chunk.Chunk0
err := rc.readChunk(&c0, rc.mr.chunkSize)
err := rc.readChunk(&rc.mr.c0, rc.mr.chunkSize)
if err != nil {
return nil, err
}
v1 := c0.MessageStreamID
v1 := rc.mr.c0.MessageStreamID
rc.curMessageStreamID = &v1
v2 := c0.Type
v2 := rc.mr.c0.Type
rc.curType = &v2
v3 := c0.Timestamp
v3 := rc.mr.c0.Timestamp
rc.curTimestamp = &v3
v4 := c0.BodyLen
v4 := rc.mr.c0.BodyLen
rc.curBodyLen = &v4
rc.curTimestampDelta = nil
if c0.BodyLen != uint32(len(c0.Body)) {
rc.curBody = &c0.Body
if rc.mr.c0.BodyLen != uint32(len(rc.mr.c0.Body)) {
rc.curBody = rc.mr.c0.Body
return nil, errMoreChunksNeeded
}
return &Message{
Timestamp: time.Duration(c0.Timestamp) * time.Millisecond,
Type: c0.Type,
MessageStreamID: c0.MessageStreamID,
Body: c0.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(rc.mr.c0.Timestamp) * time.Millisecond
rc.mr.msg.Type = rc.mr.c0.Type
rc.mr.msg.MessageStreamID = rc.mr.c0.MessageStreamID
rc.mr.msg.Body = rc.mr.c0.Body
return &rc.mr.msg, nil
case 1:
if rc.curTimestamp == nil {
@ -89,32 +87,30 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk")
}
var c1 chunk.Chunk1
err := rc.readChunk(&c1, rc.mr.chunkSize)
err := rc.readChunk(&rc.mr.c1, rc.mr.chunkSize)
if err != nil {
return nil, err
}
v2 := c1.Type
v2 := rc.mr.c1.Type
rc.curType = &v2
v3 := *rc.curTimestamp + c1.TimestampDelta
v3 := *rc.curTimestamp + rc.mr.c1.TimestampDelta
rc.curTimestamp = &v3
v4 := c1.BodyLen
v4 := rc.mr.c1.BodyLen
rc.curBodyLen = &v4
v5 := c1.TimestampDelta
v5 := rc.mr.c1.TimestampDelta
rc.curTimestampDelta = &v5
if c1.BodyLen != uint32(len(c1.Body)) {
rc.curBody = &c1.Body
if rc.mr.c1.BodyLen != uint32(len(rc.mr.c1.Body)) {
rc.curBody = rc.mr.c1.Body
return nil, errMoreChunksNeeded
}
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: c1.Type,
MessageStreamID: *rc.curMessageStreamID,
Body: c1.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = rc.mr.c1.Type
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c1.Body
return &rc.mr.msg, nil
case 2:
if rc.curTimestamp == nil {
@ -130,28 +126,26 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
chunkBodyLen = rc.mr.chunkSize
}
var c2 chunk.Chunk2
err := rc.readChunk(&c2, chunkBodyLen)
err := rc.readChunk(&rc.mr.c2, chunkBodyLen)
if err != nil {
return nil, err
}
v1 := *rc.curTimestamp + c2.TimestampDelta
v1 := *rc.curTimestamp + rc.mr.c2.TimestampDelta
rc.curTimestamp = &v1
v2 := c2.TimestampDelta
v2 := rc.mr.c2.TimestampDelta
rc.curTimestampDelta = &v2
if *rc.curBodyLen != uint32(len(c2.Body)) {
rc.curBody = &c2.Body
if *rc.curBodyLen != uint32(len(rc.mr.c2.Body)) {
rc.curBody = rc.mr.c2.Body
return nil, errMoreChunksNeeded
}
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: c2.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c2.Body
return &rc.mr.msg, nil
default: // 3
if rc.curBody == nil && rc.curTimestampDelta == nil {
@ -159,32 +153,30 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
}
if rc.curBody != nil {
chunkBodyLen := (*rc.curBodyLen) - uint32(len(*rc.curBody))
chunkBodyLen := (*rc.curBodyLen) - uint32(len(rc.curBody))
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}
var c3 chunk.Chunk3
err := rc.readChunk(&c3, chunkBodyLen)
err := rc.readChunk(&rc.mr.c3, chunkBodyLen)
if err != nil {
return nil, err
}
*rc.curBody = append(*rc.curBody, c3.Body...)
rc.curBody = append(rc.curBody, rc.mr.c3.Body...)
if *rc.curBodyLen != uint32(len(*rc.curBody)) {
if *rc.curBodyLen != uint32(len(rc.curBody)) {
return nil, errMoreChunksNeeded
}
body := *rc.curBody
body := rc.curBody
rc.curBody = nil
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = body
return &rc.mr.msg, nil
}
chunkBodyLen := (*rc.curBodyLen)
@ -192,8 +184,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
chunkBodyLen = rc.mr.chunkSize
}
var c3 chunk.Chunk3
err := rc.readChunk(&c3, chunkBodyLen)
err := rc.readChunk(&rc.mr.c3, chunkBodyLen)
if err != nil {
return nil, err
}
@ -201,17 +192,16 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
v1 := *rc.curTimestamp + *rc.curTimestampDelta
rc.curTimestamp = &v1
if *rc.curBodyLen != uint32(len(c3.Body)) {
rc.curBody = &c3.Body
if *rc.curBodyLen != uint32(len(rc.mr.c3.Body)) {
rc.curBody = rc.mr.c3.Body
return nil, errMoreChunksNeeded
}
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: c3.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c3.Body
return &rc.mr.msg, nil
}
}
@ -223,6 +213,11 @@ type Reader struct {
chunkSize uint32
ackWindowSize uint32
lastAckCount uint32
msg Message
c0 chunk.Chunk0
c1 chunk.Chunk1
c2 chunk.Chunk2
c3 chunk.Chunk3
chunkStreams map[byte]*readerChunkStream
}