mediamtx/internal/rtmp/rawmessage/reader.go

281 lines
6.1 KiB
Go
Raw Normal View History

2022-06-04 23:06:40 +00:00
package rawmessage
2022-05-16 09:57:29 +00:00
import (
"errors"
"fmt"
"time"
2022-06-04 23:06:40 +00:00
2022-06-08 18:47:36 +00:00
"github.com/aler9/rtsp-simple-server/internal/rtmp/bytecounter"
2022-06-04 23:06:40 +00:00
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
2022-05-16 09:57:29 +00:00
)
var errMoreChunksNeeded = errors.New("more chunks are needed")
2022-06-04 23:06:40 +00:00
type readerChunkStream struct {
mr *Reader
2022-05-16 09:57:29 +00:00
curTimestamp *uint32
2022-06-04 23:06:40 +00:00
curType *chunk.MessageType
2022-05-16 09:57:29 +00:00
curMessageStreamID *uint32
curBodyLen *uint32
curBody *[]byte
2022-06-07 19:09:57 +00:00
curTimestampDelta *uint32
2022-05-16 09:57:29 +00:00
}
2022-06-08 18:47:36 +00:00
func (rc *readerChunkStream) readChunk(c chunk.Chunk, chunkBodySize uint32) error {
err := c.Read(rc.mr.r, chunkBodySize)
if err != nil {
return err
}
// check if an ack is needed
if rc.mr.ackWindowSize != 0 {
count := rc.mr.r.Count()
diff := count - rc.mr.lastAckCount
if diff > (rc.mr.ackWindowSize) {
err := rc.mr.onAckNeeded(count)
if err != nil {
return err
}
rc.mr.lastAckCount += (rc.mr.ackWindowSize)
}
}
return nil
}
func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
2022-05-16 09:57:29 +00:00
switch typ {
case 0:
if rc.curBody != nil {
return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk")
}
2022-06-04 23:06:40 +00:00
var c0 chunk.Chunk0
2022-06-08 18:47:36 +00:00
err := rc.readChunk(&c0, rc.mr.chunkSize)
2022-05-16 09:57:29 +00:00
if err != nil {
return nil, err
}
v1 := c0.MessageStreamID
rc.curMessageStreamID = &v1
v2 := c0.Type
rc.curType = &v2
v3 := c0.Timestamp
rc.curTimestamp = &v3
v4 := c0.BodyLen
rc.curBodyLen = &v4
2022-06-07 19:09:57 +00:00
rc.curTimestampDelta = nil
2022-05-16 09:57:29 +00:00
if c0.BodyLen != uint32(len(c0.Body)) {
rc.curBody = &c0.Body
return nil, errMoreChunksNeeded
}
2022-06-04 23:06:40 +00:00
return &Message{
Timestamp: time.Duration(c0.Timestamp) * time.Millisecond,
2022-05-16 09:57:29 +00:00
Type: c0.Type,
MessageStreamID: c0.MessageStreamID,
Body: c0.Body,
}, nil
case 1:
if rc.curTimestamp == nil {
return nil, fmt.Errorf("received type 1 chunk without previous chunk")
}
if rc.curBody != nil {
return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk")
}
2022-06-04 23:06:40 +00:00
var c1 chunk.Chunk1
2022-06-08 18:47:36 +00:00
err := rc.readChunk(&c1, rc.mr.chunkSize)
2022-05-16 09:57:29 +00:00
if err != nil {
return nil, err
}
v2 := c1.Type
rc.curType = &v2
v3 := *rc.curTimestamp + c1.TimestampDelta
rc.curTimestamp = &v3
v4 := c1.BodyLen
rc.curBodyLen = &v4
2022-06-07 19:09:57 +00:00
v5 := c1.TimestampDelta
rc.curTimestampDelta = &v5
2022-05-16 09:57:29 +00:00
if c1.BodyLen != uint32(len(c1.Body)) {
rc.curBody = &c1.Body
return nil, errMoreChunksNeeded
}
2022-06-04 23:06:40 +00:00
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
2022-05-16 09:57:29 +00:00
Type: c1.Type,
MessageStreamID: *rc.curMessageStreamID,
Body: c1.Body,
}, nil
case 2:
if rc.curTimestamp == nil {
return nil, fmt.Errorf("received type 2 chunk without previous chunk")
}
if rc.curBody != nil {
return nil, fmt.Errorf("received type 2 chunk but expected type 3 chunk")
}
chunkBodyLen := *rc.curBodyLen
2022-05-16 09:57:29 +00:00
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}
2022-06-04 23:06:40 +00:00
var c2 chunk.Chunk2
2022-06-08 18:47:36 +00:00
err := rc.readChunk(&c2, chunkBodyLen)
2022-05-16 09:57:29 +00:00
if err != nil {
return nil, err
}
2022-06-07 19:09:57 +00:00
v1 := *rc.curTimestamp + c2.TimestampDelta
rc.curTimestamp = &v1
v2 := c2.TimestampDelta
rc.curTimestampDelta = &v2
2022-05-16 09:57:29 +00:00
if *rc.curBodyLen != uint32(len(c2.Body)) {
2022-05-16 09:57:29 +00:00
rc.curBody = &c2.Body
return nil, errMoreChunksNeeded
}
2022-06-04 23:06:40 +00:00
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
2022-05-16 09:57:29 +00:00
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: c2.Body,
}, nil
default: // 3
2022-06-07 19:09:57 +00:00
if rc.curBody == nil && rc.curTimestampDelta == nil {
2022-05-16 09:57:29 +00:00
return nil, fmt.Errorf("received type 3 chunk without previous chunk")
}
2022-06-07 19:09:57 +00:00
if rc.curBody != nil {
2022-06-08 18:47:36 +00:00
chunkBodyLen := (*rc.curBodyLen) - uint32(len(*rc.curBody))
2022-06-07 19:09:57 +00:00
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}
var c3 chunk.Chunk3
2022-06-08 18:47:36 +00:00
err := rc.readChunk(&c3, chunkBodyLen)
2022-06-07 19:09:57 +00:00
if err != nil {
return nil, err
}
*rc.curBody = append(*rc.curBody, c3.Body...)
if *rc.curBodyLen != uint32(len(*rc.curBody)) {
return nil, errMoreChunksNeeded
}
body := *rc.curBody
rc.curBody = nil
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
2022-06-07 19:09:57 +00:00
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: body,
}, nil
2022-05-16 09:57:29 +00:00
}
2022-06-08 18:47:36 +00:00
chunkBodyLen := (*rc.curBodyLen)
2022-05-16 09:57:29 +00:00
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}
2022-06-04 23:06:40 +00:00
var c3 chunk.Chunk3
2022-06-08 18:47:36 +00:00
err := rc.readChunk(&c3, chunkBodyLen)
2022-05-16 09:57:29 +00:00
if err != nil {
return nil, err
}
2022-06-07 19:09:57 +00:00
v1 := *rc.curTimestamp + *rc.curTimestampDelta
rc.curTimestamp = &v1
2022-05-16 09:57:29 +00:00
2022-07-19 14:42:11 +00:00
if *rc.curBodyLen != uint32(len(c3.Body)) {
rc.curBody = &c3.Body
return nil, errMoreChunksNeeded
}
2022-06-04 23:06:40 +00:00
return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
2022-05-16 09:57:29 +00:00
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
2022-06-07 19:09:57 +00:00
Body: c3.Body,
2022-05-16 09:57:29 +00:00
}, nil
}
}
2022-06-04 23:06:40 +00:00
// Reader is a raw message reader.
type Reader struct {
2022-06-08 18:47:36 +00:00
r *bytecounter.Reader
onAckNeeded func(uint32) error
chunkSize uint32
ackWindowSize uint32
lastAckCount uint32
chunkStreams map[byte]*readerChunkStream
2022-05-16 09:57:29 +00:00
}
2022-06-04 23:06:40 +00:00
// NewReader allocates a Reader.
2022-06-08 18:47:36 +00:00
func NewReader(r *bytecounter.Reader, onAckNeeded func(uint32) error) *Reader {
2022-06-04 23:06:40 +00:00
return &Reader{
r: r,
2022-06-08 18:47:36 +00:00
onAckNeeded: onAckNeeded,
2022-05-16 09:57:29 +00:00
chunkSize: 128,
2022-06-04 23:06:40 +00:00
chunkStreams: make(map[byte]*readerChunkStream),
2022-05-16 09:57:29 +00:00
}
}
// SetChunkSize sets the maximum chunk size.
2022-06-08 18:47:36 +00:00
func (r *Reader) SetChunkSize(v uint32) {
2022-06-04 23:06:40 +00:00
r.chunkSize = v
2022-05-16 09:57:29 +00:00
}
2022-06-08 18:47:36 +00:00
// SetWindowAckSize sets the window acknowledgement size.
func (r *Reader) SetWindowAckSize(v uint32) {
r.ackWindowSize = v
}
2022-06-04 23:06:40 +00:00
// Read reads a Message.
func (r *Reader) Read() (*Message, error) {
2022-05-16 09:57:29 +00:00
for {
2022-06-04 23:06:40 +00:00
byt, err := r.r.ReadByte()
2022-05-16 09:57:29 +00:00
if err != nil {
return nil, err
}
typ := byt >> 6
chunkStreamID := byt & 0x3F
2022-06-04 23:06:40 +00:00
rc, ok := r.chunkStreams[chunkStreamID]
2022-05-16 09:57:29 +00:00
if !ok {
2022-06-04 23:06:40 +00:00
rc = &readerChunkStream{mr: r}
r.chunkStreams[chunkStreamID] = rc
2022-05-16 09:57:29 +00:00
}
2022-06-04 23:06:40 +00:00
r.r.UnreadByte()
2022-05-16 09:57:29 +00:00
2022-06-08 18:47:36 +00:00
msg, err := rc.readMessage(typ)
2022-05-16 09:57:29 +00:00
if err != nil {
if err == errMoreChunksNeeded {
continue
}
return nil, err
}
msg.ChunkStreamID = chunkStreamID
return msg, err
}
}