mediamtx/internal/rtmp/base/messagewriter.go

143 lines
3.1 KiB
Go
Raw Normal View History

2022-05-13 18:53:52 +00:00
package base
import (
"io"
)
type messageWriterChunkStream struct {
mw *MessageWriter
lastMessageStreamID *uint32
2022-05-13 22:11:01 +00:00
lastType *MessageType
lastBodyLen *int
lastTimestamp *uint32
lastTimestampDelta *uint32
2022-05-13 18:53:52 +00:00
}
func (wc *messageWriterChunkStream) write(msg *Message) error {
2022-05-13 18:53:52 +00:00
bodyLen := len(msg.Body)
pos := 0
firstChunk := true
2022-05-13 18:53:52 +00:00
2022-05-13 22:11:01 +00:00
var timestampDelta *uint32
if wc.lastTimestamp != nil {
v := msg.Timestamp - *wc.lastTimestamp
timestampDelta = &v
}
2022-05-13 18:53:52 +00:00
for {
chunkBodyLen := bodyLen - pos
if chunkBodyLen > wc.mw.chunkSize {
chunkBodyLen = wc.mw.chunkSize
2022-05-13 18:53:52 +00:00
}
if firstChunk {
firstChunk = false
2022-05-13 18:53:52 +00:00
2022-05-13 22:11:01 +00:00
switch {
case wc.lastMessageStreamID == nil || *wc.lastMessageStreamID != msg.MessageStreamID:
2022-05-13 18:53:52 +00:00
err := Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Type: msg.Type,
2022-05-13 18:53:52 +00:00
MessageStreamID: msg.MessageStreamID,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
2022-05-13 18:53:52 +00:00
if err != nil {
return err
}
2022-05-13 22:11:01 +00:00
case wc.lastTimestampDelta == nil || *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen:
2022-05-13 18:53:52 +00:00
err := Chunk1{
2022-05-13 22:11:01 +00:00
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: *timestampDelta,
Type: msg.Type,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
if err != nil {
return err
}
case *wc.lastTimestampDelta != *timestampDelta:
err := Chunk2{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: *timestampDelta,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
if err != nil {
return err
}
default:
err := Chunk3{
2022-05-13 18:53:52 +00:00
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
2022-05-13 18:53:52 +00:00
if err != nil {
return err
}
}
2022-05-13 22:11:01 +00:00
v1 := msg.MessageStreamID
wc.lastMessageStreamID = &v1
v2 := msg.Type
wc.lastType = &v2
v3 := bodyLen
wc.lastBodyLen = &v3
v4 := msg.Timestamp
wc.lastTimestamp = &v4
if timestampDelta != nil {
v5 := *timestampDelta
wc.lastTimestampDelta = &v5
}
2022-05-13 18:53:52 +00:00
} else {
err := Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
2022-05-13 18:53:52 +00:00
if err != nil {
return err
}
}
pos += chunkBodyLen
if (bodyLen - pos) == 0 {
return nil
}
}
}
// MessageWriter is a message writer.
// It splits each Message into chunks and writes them to an io.Writer,
// keeping separate header state for every chunk stream ID it has seen.
type MessageWriter struct {
// w is the destination of every chunk.
w io.Writer
// chunkSize is the maximum number of body bytes placed in a single chunk.
chunkSize int
// chunkStreams holds the per-chunk-stream state, keyed by the chunk stream
// ID of the messages; entries are created by Write on first use.
chunkStreams map[byte]*messageWriterChunkStream
}
// NewMessageWriter returns a MessageWriter that writes its chunks to w.
// The writer starts with a chunk size of 128 and no chunk stream state.
func NewMessageWriter(w io.Writer) *MessageWriter {
	writer := new(MessageWriter)
	writer.w = w
	writer.chunkSize = 128
	writer.chunkStreams = map[byte]*messageWriterChunkStream{}
	return writer
}
// SetChunkSize sets the maximum chunk size.
// The value is stored as is, without validation, and applies to the chunks
// of subsequently written messages.
// NOTE(review): v is expected to be positive; the chunking loop caps each
// chunk body at this value, so zero would never advance through a non-empty
// body and a negative value would produce an invalid slice bound.
func (mw *MessageWriter) SetChunkSize(v int) {
mw.chunkSize = v
}
// Write writes a Message.
// The message is handed to the state of its chunk stream, which is created
// the first time a given chunk stream ID is seen.
func (mw *MessageWriter) Write(msg *Message) error {
	id := msg.ChunkStreamID
	if _, known := mw.chunkStreams[id]; !known {
		mw.chunkStreams[id] = &messageWriterChunkStream{mw: mw}
	}
	return mw.chunkStreams[id].write(msg)
}