mediamtx/internal/rtmp/message/reader.go

152 lines
3.1 KiB
Go
Raw Normal View History

2022-06-04 23:06:40 +00:00
package message
import (
"fmt"
2023-05-16 14:14:20 +00:00
"github.com/bluenviron/mediamtx/internal/rtmp/bytecounter"
"github.com/bluenviron/mediamtx/internal/rtmp/rawmessage"
2022-06-04 23:06:40 +00:00
)
2022-06-05 20:44:55 +00:00
func allocateMessage(raw *rawmessage.Message) (Message, error) {
switch Type(raw.Type) {
case TypeSetChunkSize:
return &SetChunkSize{}, nil
2022-06-04 23:06:40 +00:00
case TypeAcknowledge:
return &Acknowledge{}, nil
2022-06-08 18:47:36 +00:00
case TypeSetWindowAckSize:
return &SetWindowAckSize{}, nil
2022-06-04 23:06:40 +00:00
case TypeSetPeerBandwidth:
return &SetPeerBandwidth{}, nil
2022-06-04 23:06:40 +00:00
case TypeUserControl:
2022-06-05 20:44:55 +00:00
if len(raw.Body) < 2 {
return nil, fmt.Errorf("not enough bytes")
2022-06-05 20:44:55 +00:00
}
userControlType := UserControlType(uint16(raw.Body[0])<<8 | uint16(raw.Body[1]))
switch userControlType {
2022-06-05 20:44:55 +00:00
case UserControlTypeStreamBegin:
return &UserControlStreamBegin{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypeStreamEOF:
return &UserControlStreamEOF{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypeStreamDry:
return &UserControlStreamDry{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypeSetBufferLength:
return &UserControlSetBufferLength{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypeStreamIsRecorded:
return &UserControlStreamIsRecorded{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypePingRequest:
return &UserControlPingRequest{}, nil
2022-06-05 20:44:55 +00:00
case UserControlTypePingResponse:
return &UserControlPingResponse{}, nil
2022-06-05 20:44:55 +00:00
default:
return nil, fmt.Errorf("invalid user control type: %v", userControlType)
2022-06-05 20:44:55 +00:00
}
2022-06-04 23:06:40 +00:00
case TypeCommandAMF0:
return &CommandAMF0{}, nil
case TypeDataAMF0:
return &DataAMF0{}, nil
case TypeAudio:
return &Audio{}, nil
case TypeVideo:
if len(raw.Body) < 5 {
return nil, fmt.Errorf("not enough bytes")
}
if (raw.Body[0] & 0b10000000) != 0 {
var fourCC [4]byte
copy(fourCC[:], raw.Body[1:5])
switch fourCC {
case FourCCAV1, FourCCVP9, FourCCHEVC:
default:
return nil, fmt.Errorf("invalid fourCC: %v", fourCC)
}
2022-06-04 23:06:40 +00:00
extendedType := ExtendedType(raw.Body[0] & 0x0F)
2022-06-04 23:06:40 +00:00
switch extendedType {
case ExtendedTypeSequenceStart:
return &ExtendedSequenceStart{}, nil
2022-06-04 23:06:40 +00:00
case ExtendedTypeCodedFrames:
return &ExtendedCodedFrames{}, nil
case ExtendedTypeSequenceEnd:
return &ExtendedSequenceEnd{}, nil
case ExtendedTypeFramesX:
return &ExtendedFramesX{}, nil
case ExtendedTypeMetadata:
return &ExtendedMetadata{}, nil
case ExtendedTypeMPEG2TSSequenceStart:
return &ExtendedMPEG2TSSequenceStart{}, nil
default:
return nil, fmt.Errorf("invalid extended type: %v", extendedType)
}
}
return &Video{}, nil
2022-06-04 23:06:40 +00:00
default:
return nil, fmt.Errorf("invalid message type: %v", raw.Type)
2022-06-04 23:06:40 +00:00
}
}
// Reader is a message reader.
type Reader struct {
r *rawmessage.Reader
}
// NewReader allocates a Reader.
2022-06-08 18:47:36 +00:00
func NewReader(r *bytecounter.Reader, onAckNeeded func(uint32) error) *Reader {
2022-06-04 23:06:40 +00:00
return &Reader{
2022-06-08 18:47:36 +00:00
r: rawmessage.NewReader(r, onAckNeeded),
2022-06-04 23:06:40 +00:00
}
}
2022-06-08 18:47:36 +00:00
// Read reads a Message.
2022-06-04 23:06:40 +00:00
func (r *Reader) Read() (Message, error) {
raw, err := r.r.Read()
if err != nil {
return nil, err
}
2022-06-05 20:44:55 +00:00
msg, err := allocateMessage(raw)
2022-06-04 23:06:40 +00:00
if err != nil {
return nil, err
}
err = msg.Unmarshal(raw)
if err != nil {
return nil, err
}
2022-06-08 18:47:36 +00:00
switch tmsg := msg.(type) {
case *SetChunkSize:
2022-06-08 18:47:36 +00:00
r.r.SetChunkSize(tmsg.Value)
case *SetWindowAckSize:
2022-06-08 18:47:36 +00:00
r.r.SetWindowAckSize(tmsg.Value)
}
2022-06-04 23:06:40 +00:00
return msg, nil
}