mediamtx/internal/rtmp/message/reader.go

113 lines
2.4 KiB
Go
Raw Normal View History

2022-06-04 23:06:40 +00:00
package message
import (
"fmt"
2022-06-08 18:47:36 +00:00
"github.com/aler9/rtsp-simple-server/internal/rtmp/bytecounter"
2022-06-04 23:06:40 +00:00
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
2022-06-05 20:44:55 +00:00
func allocateMessage(raw *rawmessage.Message) (Message, error) {
switch raw.Type {
2022-06-04 23:06:40 +00:00
case chunk.MessageTypeSetChunkSize:
return &MsgSetChunkSize{}, nil
2022-06-08 18:47:36 +00:00
case chunk.MessageTypeAcknowledge:
return &MsgAcknowledge{}, nil
2022-06-04 23:06:40 +00:00
case chunk.MessageTypeSetWindowAckSize:
return &MsgSetWindowAckSize{}, nil
case chunk.MessageTypeSetPeerBandwidth:
return &MsgSetPeerBandwidth{}, nil
case chunk.MessageTypeUserControl:
2022-06-05 20:44:55 +00:00
if len(raw.Body) < 2 {
return nil, fmt.Errorf("invalid body size")
}
subType := uint16(raw.Body[0])<<8 | uint16(raw.Body[1])
2022-06-05 20:44:55 +00:00
switch subType {
case UserControlTypeStreamBegin:
return &MsgUserControlStreamBegin{}, nil
case UserControlTypeStreamEOF:
return &MsgUserControlStreamEOF{}, nil
case UserControlTypeStreamDry:
return &MsgUserControlStreamDry{}, nil
case UserControlTypeSetBufferLength:
return &MsgUserControlSetBufferLength{}, nil
case UserControlTypeStreamIsRecorded:
return &MsgUserControlStreamIsRecorded{}, nil
case UserControlTypePingRequest:
return &MsgUserControlPingRequest{}, nil
case UserControlTypePingResponse:
return &MsgUserControlPingResponse{}, nil
default:
return nil, fmt.Errorf("invalid user control type")
}
2022-06-04 23:06:40 +00:00
case chunk.MessageTypeCommandAMF0:
return &MsgCommandAMF0{}, nil
case chunk.MessageTypeDataAMF0:
return &MsgDataAMF0{}, nil
case chunk.MessageTypeAudio:
return &MsgAudio{}, nil
case chunk.MessageTypeVideo:
return &MsgVideo{}, nil
default:
return nil, fmt.Errorf("unhandled message type (%v)", raw.Type)
2022-06-04 23:06:40 +00:00
}
}
// Reader is a message reader.
// It wraps a rawmessage.Reader; its Read method turns each raw message
// obtained from that reader into a typed Message.
type Reader struct {
// r is the underlying raw message reader that Read pulls from.
r *rawmessage.Reader
}
// NewReader allocates a Reader.
2022-06-08 18:47:36 +00:00
func NewReader(r *bytecounter.Reader, onAckNeeded func(uint32) error) *Reader {
2022-06-04 23:06:40 +00:00
return &Reader{
2022-06-08 18:47:36 +00:00
r: rawmessage.NewReader(r, onAckNeeded),
2022-06-04 23:06:40 +00:00
}
}
2022-06-08 18:47:36 +00:00
// Read reads a Message.
2022-06-04 23:06:40 +00:00
func (r *Reader) Read() (Message, error) {
raw, err := r.r.Read()
if err != nil {
return nil, err
}
2022-06-05 20:44:55 +00:00
msg, err := allocateMessage(raw)
2022-06-04 23:06:40 +00:00
if err != nil {
return nil, err
}
err = msg.Unmarshal(raw)
if err != nil {
return nil, err
}
2022-06-08 18:47:36 +00:00
switch tmsg := msg.(type) {
case *MsgSetChunkSize:
r.r.SetChunkSize(tmsg.Value)
case *MsgSetWindowAckSize:
r.r.SetWindowAckSize(tmsg.Value)
}
2022-06-04 23:06:40 +00:00
return msg, nil
}