mediamtx/internal/rtmp/rawmessage/writer.go

196 lines
4.5 KiB
Go

package rawmessage
import (
"bufio"
"fmt"
"time"
"github.com/bluenviron/mediamtx/internal/rtmp/bytecounter"
"github.com/bluenviron/mediamtx/internal/rtmp/chunk"
)
type writerChunkStream struct {
mw *Writer
lastMessageStreamID *uint32
lastType *uint8
lastBodyLen *uint32
lastTimestamp *int64
lastTimestampDelta *int64
}
func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error {
// check if we received an acknowledge
if wc.mw.checkAcknowledge && wc.mw.ackWindowSize != 0 {
diff := uint32(wc.mw.w.Count()) - wc.mw.ackValue
if diff > (wc.mw.ackWindowSize * 3 / 2) {
return fmt.Errorf("no acknowledge received within window")
}
}
buf, err := c.Marshal()
if err != nil {
return err
}
_, err = wc.mw.bw.Write(buf)
if err != nil {
return err
}
return nil
}
func (wc *writerChunkStream) writeMessage(msg *Message) error {
bodyLen := uint32(len(msg.Body))
pos := uint32(0)
firstChunk := true
// convert timestamp to milliseconds before splitting message in chunks
/// otherwise timestampDelta gets messed up.
timestamp := int64(msg.Timestamp / time.Millisecond)
var timestampDelta *int64
if wc.lastTimestamp != nil {
diff := timestamp - *wc.lastTimestamp
// use delta only if it is positive
if diff >= 0 {
timestampDelta = &diff
}
}
for {
chunkBodyLen := bodyLen - pos
if chunkBodyLen > wc.mw.chunkSize {
chunkBodyLen = wc.mw.chunkSize
}
if firstChunk {
firstChunk = false
switch {
case wc.lastMessageStreamID == nil || timestampDelta == nil || *wc.lastMessageStreamID != msg.MessageStreamID:
err := wc.writeChunk(&chunk.Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Timestamp: uint32(timestamp),
Type: msg.Type,
MessageStreamID: msg.MessageStreamID,
BodyLen: (bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
})
if err != nil {
return err
}
case *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen:
err := wc.writeChunk(&chunk.Chunk1{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: uint32(*timestampDelta),
Type: msg.Type,
BodyLen: (bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
})
if err != nil {
return err
}
case wc.lastTimestampDelta == nil || *wc.lastTimestampDelta != *timestampDelta:
err := wc.writeChunk(&chunk.Chunk2{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: uint32(*timestampDelta),
Body: msg.Body[pos : pos+chunkBodyLen],
})
if err != nil {
return err
}
default:
err := wc.writeChunk(&chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
})
if err != nil {
return err
}
}
v1 := msg.MessageStreamID
wc.lastMessageStreamID = &v1
v2 := msg.Type
wc.lastType = &v2
v3 := bodyLen
wc.lastBodyLen = &v3
v4 := timestamp
wc.lastTimestamp = &v4
if timestampDelta != nil {
v5 := *timestampDelta
wc.lastTimestampDelta = &v5
}
} else {
err := wc.writeChunk(&chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
})
if err != nil {
return err
}
}
pos += chunkBodyLen
if (bodyLen - pos) == 0 {
return wc.mw.bw.Flush()
}
}
}
// Writer is a raw message writer.
type Writer struct {
w *bytecounter.Writer
bw *bufio.Writer
checkAcknowledge bool
chunkSize uint32
ackWindowSize uint32
ackValue uint32
chunkStreams map[byte]*writerChunkStream
}
// NewWriter allocates a Writer.
func NewWriter(w *bytecounter.Writer, checkAcknowledge bool) *Writer {
return &Writer{
w: w,
bw: bufio.NewWriter(w),
checkAcknowledge: checkAcknowledge,
chunkSize: 128,
chunkStreams: make(map[byte]*writerChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (w *Writer) SetChunkSize(v uint32) {
w.chunkSize = v
}
// SetWindowAckSize sets the window acknowledgement size.
func (w *Writer) SetWindowAckSize(v uint32) {
w.ackWindowSize = v
}
// SetAcknowledgeValue sets the acknowledge sequence number.
func (w *Writer) SetAcknowledgeValue(v uint32) {
w.ackValue = v
}
// Write writes a Message.
func (w *Writer) Write(msg *Message) error {
wc, ok := w.chunkStreams[msg.ChunkStreamID]
if !ok {
wc = &writerChunkStream{mw: w}
w.chunkStreams[msg.ChunkStreamID] = wc
}
return wc.writeMessage(msg)
}