diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index 6a0efdb2..727bdb23 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -625,17 +625,15 @@ func TestWriteTracks(t *testing.T) { // S->C event "stream is recorded" msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &message.MsgUserControl{ - Type: 4, - Payload: []byte{0x00, 0x00, 0x00, 0x01}, + require.Equal(t, &message.MsgUserControlStreamIsRecorded{ + StreamID: 1, }, msg) // S->C event "stream begin 1" msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &message.MsgUserControl{ - Type: 0, - Payload: []byte{0x00, 0x00, 0x00, 0x01}, + require.Equal(t, &message.MsgUserControlStreamBegin{ + StreamID: 1, }, msg) // S->C onStatus diff --git a/internal/rtmp/message/msg_usercontrol.go b/internal/rtmp/message/msg_usercontrol.go index c8818aeb..08fc903a 100644 --- a/internal/rtmp/message/msg_usercontrol.go +++ b/internal/rtmp/message/msg_usercontrol.go @@ -1,44 +1,12 @@ package message -import ( - "encoding/binary" - "fmt" - - "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" - "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +// user control types. +const ( + UserControlTypeStreamBegin = 0 + UserControlTypeStreamEOF = 1 + UserControlTypeStreamDry = 2 + UserControlTypeSetBufferLength = 3 + UserControlTypeStreamIsRecorded = 4 + UserControlTypePingRequest = 6 + UserControlTypePingResponse = 7 ) - -// MsgUserControl is a user control message. -type MsgUserControl struct { - Type uint16 - Payload []byte -} - -// Unmarshal implements Message. -func (m *MsgUserControl) Unmarshal(raw *rawmessage.Message) error { - if raw.ChunkStreamID != ControlChunkStreamID { - return fmt.Errorf("unexpected chunk stream ID") - } - - if len(raw.Body) < 2 { - return fmt.Errorf("unexpected body size") - } - - m.Type = binary.BigEndian.Uint16(raw.Body) - m.Payload = raw.Body[2:] - - return nil -} - -// Marshal implements Message. -func (m MsgUserControl) Marshal() (*rawmessage.Message, error) { - body := make([]byte, 2+len(m.Payload)) - binary.BigEndian.PutUint16(body, m.Type) - copy(body[2:], m.Payload) - - return &rawmessage.Message{ - ChunkStreamID: ControlChunkStreamID, - Type: chunk.MessageTypeUserControl, - Body: body, - }, nil -} diff --git a/internal/rtmp/message/msg_usercontrol_pingrequest.go b/internal/rtmp/message/msg_usercontrol_pingrequest.go new file mode 100644 index 00000000..1c945f2a --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_pingrequest.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlPingRequest is a user control message. +type MsgUserControlPingRequest struct { + ServerTime uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlPingRequest) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.ServerTime = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlPingRequest) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypePingRequest) + binary.BigEndian.PutUint32(body[2:], m.ServerTime) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_pingresponse.go b/internal/rtmp/message/msg_usercontrol_pingresponse.go new file mode 100644 index 00000000..39e8f7ca --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_pingresponse.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlPingResponse is a user control message. +type MsgUserControlPingResponse struct { + ServerTime uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlPingResponse) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.ServerTime = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlPingResponse) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypePingResponse) + binary.BigEndian.PutUint32(body[2:], m.ServerTime) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_setbufferlength.go b/internal/rtmp/message/msg_usercontrol_setbufferlength.go new file mode 100644 index 00000000..c27c05a9 --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_setbufferlength.go @@ -0,0 +1,45 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlSetBufferLength is a user control message. +type MsgUserControlSetBufferLength struct { + StreamID uint32 + BufferLength uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 10 { + return fmt.Errorf("invalid body size") + } + + m.StreamID = binary.BigEndian.Uint32(raw.Body[2:]) + m.BufferLength = binary.BigEndian.Uint32(raw.Body[6:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlSetBufferLength) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 10) + binary.BigEndian.PutUint16(body, UserControlTypeSetBufferLength) + binary.BigEndian.PutUint32(body[2:], m.StreamID) + binary.BigEndian.PutUint32(body[6:], m.BufferLength) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_streambegin.go b/internal/rtmp/message/msg_usercontrol_streambegin.go new file mode 100644 index 00000000..7103b72a --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_streambegin.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlStreamBegin is a user control message. +type MsgUserControlStreamBegin struct { + StreamID uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.StreamID = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlStreamBegin) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypeStreamBegin) + binary.BigEndian.PutUint32(body[2:], m.StreamID) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_streamdry.go b/internal/rtmp/message/msg_usercontrol_streamdry.go new file mode 100644 index 00000000..5a6dfb2a --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_streamdry.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlStreamDry is a user control message. +type MsgUserControlStreamDry struct { + StreamID uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlStreamDry) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.StreamID = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlStreamDry) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypeStreamDry) + binary.BigEndian.PutUint32(body[2:], m.StreamID) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_streameof.go b/internal/rtmp/message/msg_usercontrol_streameof.go new file mode 100644 index 00000000..e2572fde --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_streameof.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlStreamEOF is a user control message. +type MsgUserControlStreamEOF struct { + StreamID uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.StreamID = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlStreamEOF) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypeStreamEOF) + binary.BigEndian.PutUint32(body[2:], m.StreamID) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol_streamisrecorded.go b/internal/rtmp/message/msg_usercontrol_streamisrecorded.go new file mode 100644 index 00000000..30482b70 --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol_streamisrecorded.go @@ -0,0 +1,42 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControlStreamIsRecorded is a user control message. +type MsgUserControlStreamIsRecorded struct { + StreamID uint32 +} + +// Unmarshal implements Message. +func (m *MsgUserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 6 { + return fmt.Errorf("invalid body size") + } + + m.StreamID = binary.BigEndian.Uint32(raw.Body[2:]) + + return nil +} + +// Marshal implements Message. +func (m MsgUserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 6) + binary.BigEndian.PutUint16(body, UserControlTypeStreamIsRecorded) + binary.BigEndian.PutUint32(body[2:], m.StreamID) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/reader.go b/internal/rtmp/message/reader.go index e60ed0db..3932c4e5 100644 --- a/internal/rtmp/message/reader.go +++ b/internal/rtmp/message/reader.go @@ -1,6 +1,7 @@ package message import ( + "encoding/binary" "fmt" "io" @@ -8,8 +9,8 @@ import ( "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" ) -func messageFromType(typ chunk.MessageType) (Message, error) { - switch typ { +func allocateMessage(raw *rawmessage.Message) (Message, error) { + switch raw.Type { case chunk.MessageTypeSetChunkSize: return &MsgSetChunkSize{}, nil @@ -20,7 +21,36 @@ func messageFromType(typ chunk.MessageType) (Message, error) { return &MsgSetPeerBandwidth{}, nil case chunk.MessageTypeUserControl: - return &MsgUserControl{}, nil + if len(raw.Body) < 2 { + return nil, fmt.Errorf("invalid body size") + } + + subType := binary.BigEndian.Uint16(raw.Body) + switch subType { + case UserControlTypeStreamBegin: + return &MsgUserControlStreamBegin{}, nil + + case UserControlTypeStreamEOF: + return &MsgUserControlStreamEOF{}, nil + + case UserControlTypeStreamDry: + return &MsgUserControlStreamDry{}, nil + + case UserControlTypeSetBufferLength: + return &MsgUserControlSetBufferLength{}, nil + + case UserControlTypeStreamIsRecorded: + return &MsgUserControlStreamIsRecorded{}, nil + + case UserControlTypePingRequest: + return &MsgUserControlPingRequest{}, nil + + case UserControlTypePingResponse: + return &MsgUserControlPingResponse{}, nil + + default: + return nil, fmt.Errorf("invalid user control type") + } case chunk.MessageTypeCommandAMF0: return &MsgCommandAMF0{}, nil @@ -63,7 +93,7 @@ func (r *Reader) Read() (Message, error) { return nil, err } - msg, err := messageFromType(raw.Type) + msg, err := allocateMessage(raw) if err != nil { return nil, err }