rtmp: add user control messages

This commit is contained in:
aler9 2022-06-05 22:44:55 +02:00
parent 9d3fd3bc37
commit 6a24c82589
10 changed files with 344 additions and 51 deletions

View File

@ -625,17 +625,15 @@ func TestWriteTracks(t *testing.T) {
// S->C event "stream is recorded"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgUserControl{
Type: 4,
Payload: []byte{0x00, 0x00, 0x00, 0x01},
require.Equal(t, &message.MsgUserControlStreamIsRecorded{
StreamID: 1,
}, msg)
// S->C event "stream begin 1"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgUserControl{
Type: 0,
Payload: []byte{0x00, 0x00, 0x00, 0x01},
require.Equal(t, &message.MsgUserControlStreamBegin{
StreamID: 1,
}, msg)
// S->C onStatus

View File

@ -1,44 +1,12 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
// user control types.
const (
UserControlTypeStreamBegin = 0
UserControlTypeStreamEOF = 1
UserControlTypeStreamDry = 2
UserControlTypeSetBufferLength = 3
UserControlTypeStreamIsRecorded = 4
UserControlTypePingRequest = 6
UserControlTypePingResponse = 7
)
// MsgUserControl is a user control message.
type MsgUserControl struct {
Type uint16
Payload []byte
}
// Unmarshal implements Message.
func (m *MsgUserControl) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) < 2 {
return fmt.Errorf("unexpected body size")
}
m.Type = binary.BigEndian.Uint16(raw.Body)
m.Payload = raw.Body[2:]
return nil
}
// Marshal implements Message.
func (m MsgUserControl) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 2+len(m.Payload))
binary.BigEndian.PutUint16(body, m.Type)
copy(body[2:], m.Payload)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlPingRequest is a user control message.
type MsgUserControlPingRequest struct {
ServerTime uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlPingRequest) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.ServerTime = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlPingRequest) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypePingRequest)
binary.BigEndian.PutUint32(body[2:], m.ServerTime)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlPingResponse is a user control message.
type MsgUserControlPingResponse struct {
ServerTime uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlPingResponse) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.ServerTime = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlPingResponse) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypePingResponse)
binary.BigEndian.PutUint32(body[2:], m.ServerTime)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,45 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlSetBufferLength is a user control message.
type MsgUserControlSetBufferLength struct {
StreamID uint32
BufferLength uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlSetBufferLength) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 10 {
return fmt.Errorf("invalid body size")
}
m.StreamID = binary.BigEndian.Uint32(raw.Body[2:])
m.BufferLength = binary.BigEndian.Uint32(raw.Body[6:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlSetBufferLength) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 10)
binary.BigEndian.PutUint16(body, UserControlTypeSetBufferLength)
binary.BigEndian.PutUint32(body[2:], m.StreamID)
binary.BigEndian.PutUint32(body[6:], m.BufferLength)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlStreamBegin is a user control message.
type MsgUserControlStreamBegin struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamBegin) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.StreamID = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlStreamBegin) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypeStreamBegin)
binary.BigEndian.PutUint32(body[2:], m.StreamID)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlStreamDry is a user control message.
type MsgUserControlStreamDry struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamDry) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.StreamID = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlStreamDry) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypeStreamDry)
binary.BigEndian.PutUint32(body[2:], m.StreamID)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlStreamEOF is a user control message.
type MsgUserControlStreamEOF struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamEOF) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.StreamID = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlStreamEOF) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypeStreamEOF)
binary.BigEndian.PutUint32(body[2:], m.StreamID)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,42 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControlStreamIsRecorded is a user control message.
type MsgUserControlStreamIsRecorded struct {
StreamID uint32
}
// Unmarshal implements Message.
func (m *MsgUserControlStreamIsRecorded) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 6 {
return fmt.Errorf("invalid body size")
}
m.StreamID = binary.BigEndian.Uint32(raw.Body[2:])
return nil
}
// Marshal implements Message.
func (m MsgUserControlStreamIsRecorded) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 6)
binary.BigEndian.PutUint16(body, UserControlTypeStreamIsRecorded)
binary.BigEndian.PutUint32(body[2:], m.StreamID)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -1,6 +1,7 @@
package message
import (
"encoding/binary"
"fmt"
"io"
@ -8,8 +9,8 @@ import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
func messageFromType(typ chunk.MessageType) (Message, error) {
switch typ {
func allocateMessage(raw *rawmessage.Message) (Message, error) {
switch raw.Type {
case chunk.MessageTypeSetChunkSize:
return &MsgSetChunkSize{}, nil
@ -20,7 +21,36 @@ func messageFromType(typ chunk.MessageType) (Message, error) {
return &MsgSetPeerBandwidth{}, nil
case chunk.MessageTypeUserControl:
return &MsgUserControl{}, nil
if len(raw.Body) < 2 {
return nil, fmt.Errorf("invalid body size")
}
subType := binary.BigEndian.Uint16(raw.Body)
switch subType {
case UserControlTypeStreamBegin:
return &MsgUserControlStreamBegin{}, nil
case UserControlTypeStreamEOF:
return &MsgUserControlStreamEOF{}, nil
case UserControlTypeStreamDry:
return &MsgUserControlStreamDry{}, nil
case UserControlTypeSetBufferLength:
return &MsgUserControlSetBufferLength{}, nil
case UserControlTypeStreamIsRecorded:
return &MsgUserControlStreamIsRecorded{}, nil
case UserControlTypePingRequest:
return &MsgUserControlPingRequest{}, nil
case UserControlTypePingResponse:
return &MsgUserControlPingResponse{}, nil
default:
return nil, fmt.Errorf("invalid user control type")
}
case chunk.MessageTypeCommandAMF0:
return &MsgCommandAMF0{}, nil
@ -63,7 +93,7 @@ func (r *Reader) Read() (Message, error) {
return nil, err
}
msg, err := messageFromType(raw.Type)
msg, err := allocateMessage(raw)
if err != nil {
return nil, err
}