rtmp: add message reader / writer

This commit is contained in:
aler9 2022-06-05 01:06:40 +02:00
parent 76e47686b2
commit ba83ef65d2
32 changed files with 1015 additions and 497 deletions

View File

@ -85,7 +85,7 @@ test-internal:
./internal/hls \
./internal/logger \
./internal/rlimit \
./internal/rtmp
./internal/rtmp/...
test-core:
$(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \

View File

@ -1,14 +0,0 @@
package base
import (
"io"
)
// HandshakeC0 is the C0 part of an handshake.
type HandshakeC0 struct{}
// Read reads a HandshakeC0.
func (HandshakeC0) Write(w io.Writer) error {
_, err := w.Write([]byte{rtmpVersion})
return err
}

View File

@ -1,45 +0,0 @@
package base
import (
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"io"
)
func hsCalcDigestPos(p []byte, base int) (pos int) {
for i := 0; i < 4; i++ {
pos += int(p[base+i])
}
pos = (pos % 728) + base + 4
return
}
func hsMakeDigest(key []byte, src []byte, gap int) (dst []byte) {
h := hmac.New(sha256.New, key)
if gap <= 0 {
h.Write(src)
} else {
h.Write(src[:gap])
h.Write(src[gap+32:])
}
return h.Sum(nil)
}
// HandshakeC1 is the C1 part of an handshake.
type HandshakeC1 struct{}
// Read reads a HandshakeC1.
func (HandshakeC1) Write(w io.Writer) error {
buf := make([]byte, 1536)
copy(buf[0:4], []byte{0x00, 0x00, 0x00, 0x00})
copy(buf[4:8], []byte{0x09, 0x00, 0x7c, 0x02})
rand.Read(buf[8:])
gap := hsCalcDigestPos(buf[0:], 8)
digest := hsMakeDigest(hsClientPartialKey, buf[0:], gap)
copy(buf[gap+0:], digest)
_, err := w.Write(buf[0:])
return err
}

View File

@ -1,10 +0,0 @@
package base
// RawMessage is a message.
type RawMessage struct {
ChunkStreamID byte
Timestamp uint32
Type MessageType
MessageStreamID uint32
Body []byte
}

View File

@ -1,7 +1,6 @@
package base
package chunk
import (
"fmt"
"io"
)
@ -26,12 +25,8 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error {
return err
}
if header[0]>>6 != 0 {
return fmt.Errorf("wrong chunk header type")
}
c.ChunkStreamID = header[0] & 0x3F
c.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
c.Timestamp = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Type = MessageType(header[7])
c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])
@ -57,7 +52,10 @@ func (c Chunk0) Write(w io.Writer) error {
header[5] = byte(c.BodyLen >> 8)
header[6] = byte(c.BodyLen)
header[7] = byte(c.Type)
header[8] = byte(c.MessageStreamID)
header[8] = byte(c.MessageStreamID >> 24)
header[9] = byte(c.MessageStreamID >> 16)
header[10] = byte(c.MessageStreamID >> 8)
header[11] = byte(c.MessageStreamID)
_, err := w.Write(header)
if err != nil {
return err

View File

@ -0,0 +1,36 @@
package chunk
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var chunk0enc = []byte{
0x19, 0xb1, 0xa1, 0x91, 0x0, 0x0, 0x14, 0x14,
0x3, 0x5d, 0x17, 0x3d, 0x1, 0x2, 0x3, 0x4,
}
var chunk0dec = Chunk0{
ChunkStreamID: 25,
Timestamp: 11641233,
Type: MessageTypeCommandAMF0,
MessageStreamID: 56432445,
BodyLen: 20,
Body: []byte{0x01, 0x02, 0x03, 0x04},
}
func TestChunk0Read(t *testing.T) {
var chunk0 Chunk0
err := chunk0.Read(bytes.NewReader(chunk0enc), 4)
require.NoError(t, err)
require.Equal(t, chunk0dec, chunk0)
}
func TestChunk0Write(t *testing.T) {
var buf bytes.Buffer
err := chunk0dec.Write(&buf)
require.NoError(t, err)
require.Equal(t, chunk0enc, buf.Bytes())
}

View File

@ -1,7 +1,6 @@
package base
package chunk
import (
"fmt"
"io"
)
@ -27,12 +26,8 @@ func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen int) error {
return err
}
if header[0]>>6 != 1 {
return fmt.Errorf("wrong chunk header type")
}
c.ChunkStreamID = header[0] & 0x3F
c.TimestampDelta = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Type = MessageType(header[7])

View File

@ -0,0 +1,35 @@
package chunk
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var chunk1enc = []byte{
0x59, 0xb1, 0xa1, 0x91, 0x0, 0x0, 0x14, 0x14,
0x1, 0x2, 0x3, 0x4,
}
var chunk1dec = Chunk1{
ChunkStreamID: 25,
TimestampDelta: 11641233,
Type: MessageTypeCommandAMF0,
BodyLen: 20,
Body: []byte{0x01, 0x02, 0x03, 0x04},
}
func TestChunk1Read(t *testing.T) {
var chunk1 Chunk1
err := chunk1.Read(bytes.NewReader(chunk1enc), 4)
require.NoError(t, err)
require.Equal(t, chunk1dec, chunk1)
}
func TestChunk1Write(t *testing.T) {
var buf bytes.Buffer
err := chunk1dec.Write(&buf)
require.NoError(t, err)
require.Equal(t, chunk1enc, buf.Bytes())
}

View File

@ -1,7 +1,6 @@
package base
package chunk
import (
"fmt"
"io"
)
@ -23,12 +22,8 @@ func (c *Chunk2) Read(r io.Reader, chunkBodyLen int) error {
return err
}
if header[0]>>6 != 2 {
return fmt.Errorf("wrong chunk header type")
}
c.ChunkStreamID = header[0] & 0x3F
c.TimestampDelta = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
c.Body = make([]byte, chunkBodyLen)
_, err = r.Read(c.Body)

View File

@ -0,0 +1,32 @@
package chunk
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var chunk2enc = []byte{
0x59, 0xb1, 0xa1, 0x91, 0x1, 0x2, 0x3, 0x4,
}
var chunk2dec = Chunk2{
ChunkStreamID: 25,
TimestampDelta: 11641233,
Body: []byte{0x01, 0x02, 0x03, 0x04},
}
func TestChunk2Read(t *testing.T) {
var chunk2 Chunk2
err := chunk2.Read(bytes.NewReader(chunk2enc), 4)
require.NoError(t, err)
require.Equal(t, chunk2dec, chunk2)
}
func TestChunk2Write(t *testing.T) {
var buf bytes.Buffer
err := chunk2dec.Write(&buf)
require.NoError(t, err)
require.Equal(t, chunk2enc, buf.Bytes())
}

View File

@ -1,7 +1,6 @@
package base
package chunk
import (
"fmt"
"io"
)
@ -24,10 +23,6 @@ func (c *Chunk3) Read(r io.Reader, chunkBodyLen int) error {
return err
}
if header[0]>>6 != 2 {
return fmt.Errorf("wrong chunk header type")
}
c.ChunkStreamID = header[0] & 0x3F
c.Body = make([]byte, chunkBodyLen)

View File

@ -0,0 +1,31 @@
package chunk
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var chunk3enc = []byte{
0xd9, 0x1, 0x2, 0x3, 0x4,
}
var chunk3dec = Chunk3{
ChunkStreamID: 25,
Body: []byte{0x01, 0x02, 0x03, 0x04},
}
func TestChunk3Read(t *testing.T) {
var chunk3 Chunk3
err := chunk3.Read(bytes.NewReader(chunk3enc), 4)
require.NoError(t, err)
require.Equal(t, chunk3dec, chunk3)
}
func TestChunk3Write(t *testing.T) {
var buf bytes.Buffer
err := chunk3dec.Write(&buf)
require.NoError(t, err)
require.Equal(t, chunk3enc, buf.Bytes())
}

View File

@ -1,4 +1,4 @@
package base
package chunk
// MessageType is a message type.
type MessageType byte

View File

@ -1,7 +1,6 @@
package rtmp
import (
"bufio"
"net"
"net/url"
"strings"
@ -13,7 +12,8 @@ import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/stretchr/testify/require"
"github.com/aler9/rtsp-simple-server/internal/rtmp/base"
"github.com/aler9/rtsp-simple-server/internal/rtmp/handshake"
"github.com/aler9/rtsp-simple-server/internal/rtmp/message"
)
func splitPath(u *url.URL) (app, stream string) {
@ -115,15 +115,15 @@ func TestReadTracks(t *testing.T) {
defer conn.Close()
// C->S handshake C0
err = base.HandshakeC0{}.Write(conn)
err = handshake.C0{}.Write(conn)
require.NoError(t, err)
// C->S handshake C1
err = base.HandshakeC1{}.Write(conn)
err = handshake.C1{}.Write(conn)
require.NoError(t, err)
// S->C handshake S0
err = base.HandshakeS0{}.Read(conn)
err = handshake.S0{}.Read(conn)
require.NoError(t, err)
// S->C handshake S1+S2
@ -132,59 +132,53 @@ func TestReadTracks(t *testing.T) {
require.NoError(t, err)
// C->S handshake C2
err = base.HandshakeC2{}.Write(conn, s1s2)
err = handshake.C2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewRawMessageWriter(conn)
mr := base.NewRawMessageReader(bufio.NewReader(conn))
mw := message.NewWriter(conn)
mr := message.NewReader(conn)
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
"connect",
1,
flvio.AMFMap{
{K: "app", V: "/stream"},
{K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false},
{K: "capabilities", V: 15},
{K: "audioCodecs", V: 4071},
{K: "videoCodecs", V: 252},
{K: "videoFunction", V: 1},
},
})
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: byts,
Payload: []interface{}{
"connect",
1,
flvio.AMFMap{
{K: "app", V: "/stream"},
{K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false},
{K: "capabilities", V: 15},
{K: "audioCodecs", V: 4071},
{K: "videoCodecs", V: 252},
{K: "videoFunction", V: 1},
},
},
})
require.NoError(t, err)
// S->C window acknowledgement size
msg, err := mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 38, 37, 160},
require.Equal(t, &message.MsgSetWindowAckSize{
Value: 2500000,
}, msg)
// S->C set peer bandwidth
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
require.Equal(t, &message.MsgSetPeerBandwidth{
Value: 2500000,
Type: 2,
}, msg)
// S->C set chunk size
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
require.Equal(t, &message.MsgSetChunkSize{
Value: 65536,
}, msg)
mr.SetChunkSize(65536)
@ -192,151 +186,141 @@ func TestReadTracks(t *testing.T) {
// S->C result
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(3), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err := flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"_result",
float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
}, arr)
}, msg)
// C->S set chunk size
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
err = mw.Write(&message.MsgSetChunkSize{
Value: 65536,
})
require.NoError(t, err)
mw.SetChunkSize(65536)
// C->S releaseStream
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
Payload: []interface{}{
"releaseStream",
float64(2),
nil,
"",
}),
},
})
require.NoError(t, err)
// C->S FCPublish
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
Payload: []interface{}{
"FCPublish",
float64(3),
nil,
"",
}),
},
})
require.NoError(t, err)
// C->S createStream
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
Payload: []interface{}{
"createStream",
float64(4),
nil,
}),
},
})
require.NoError(t, err)
// S->C result
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(3), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"_result",
float64(4),
nil,
float64(1),
}, arr)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(4),
nil,
float64(1),
},
}, msg)
// C->S publish
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 8,
Type: base.MessageTypeCommandAMF0,
MessageStreamID: 1,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
Payload: []interface{}{
"publish",
float64(5),
nil,
"",
"live",
}),
},
})
require.NoError(t, err)
// S->C onStatus
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(5), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onStatus",
float64(5),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Publish.Start"},
{K: "description", V: "publish start"},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(5),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Publish.Start"},
{K: "description", V: "publish start"},
},
},
}, arr)
}, msg)
switch ca {
case "standard":
// C->S metadata
byts = flvio.FillAMF0ValsMalloc([]interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: float64(codecH264),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: float64(codecAAC),
err = mw.Write(&message.MsgDataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: float64(codecH264),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: float64(codecAAC),
},
},
},
})
err = mw.Write(&base.RawMessage{
ChunkStreamID: 4,
Type: base.MessageTypeDataAMF0,
MessageStreamID: 1,
Body: byts,
})
require.NoError(t, err)
// C->S H264 decoder config
@ -352,9 +336,8 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgVideo{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
Body: body,
})
@ -367,9 +350,8 @@ func TestReadTracks(t *testing.T) {
ChannelCount: 2,
}.Encode()
require.NoError(t, err)
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgAudio{
ChunkStreamID: 4,
Type: base.MessageTypeAudio,
MessageStreamID: 1,
Body: append([]byte{
flvio.SOUND_AAC<<4 | flvio.SOUND_44Khz<<2 | flvio.SOUND_16BIT<<1 | flvio.SOUND_STEREO,
@ -380,30 +362,28 @@ func TestReadTracks(t *testing.T) {
case "metadata without codec id":
// C->S metadata
byts = flvio.FillAMF0ValsMalloc([]interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "width",
V: float64(2688),
},
{
K: "height",
V: float64(1520),
},
{
K: "framerate",
V: float64(0o25),
err = mw.Write(&message.MsgDataAMF0{
ChunkStreamID: 4,
MessageStreamID: 1,
Payload: []interface{}{
"@setDataFrame",
"onMetaData",
flvio.AMFMap{
{
K: "width",
V: float64(2688),
},
{
K: "height",
V: float64(1520),
},
{
K: "framerate",
V: float64(0o25),
},
},
},
})
err = mw.Write(&base.RawMessage{
ChunkStreamID: 4,
Type: base.MessageTypeDataAMF0,
MessageStreamID: 1,
Body: byts,
})
require.NoError(t, err)
// C->S H264 decoder config
@ -419,9 +399,8 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgVideo{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
Body: body,
})
@ -441,9 +420,8 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgVideo{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
Body: body,
})
@ -493,15 +471,15 @@ func TestWriteTracks(t *testing.T) {
defer conn.Close()
// C->S handshake C0
err = base.HandshakeC0{}.Write(conn)
err = handshake.C0{}.Write(conn)
require.NoError(t, err)
// C-> handshake C1
err = base.HandshakeC1{}.Write(conn)
err = handshake.C1{}.Write(conn)
require.NoError(t, err)
// S->C handshake S0
err = base.HandshakeS0{}.Read(conn)
err = handshake.S0{}.Read(conn)
require.NoError(t, err)
// S->C handshake S1+S2
@ -510,59 +488,52 @@ func TestWriteTracks(t *testing.T) {
require.NoError(t, err)
// C->S handshake C2
err = base.HandshakeC2{}.Write(conn, s1s2)
err = handshake.C2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewRawMessageWriter(conn)
mr := base.NewRawMessageReader(bufio.NewReader(conn))
mw := message.NewWriter(conn)
mr := message.NewReader(conn)
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
"connect",
1,
flvio.AMFMap{
{K: "app", V: "/stream"},
{K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false},
{K: "capabilities", V: 15},
{K: "audioCodecs", V: 4071},
{K: "videoCodecs", V: 252},
{K: "videoFunction", V: 1},
},
})
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: byts,
Payload: []interface{}{
"connect",
1,
flvio.AMFMap{
{K: "app", V: "/stream"},
{K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false},
{K: "capabilities", V: 15},
{K: "audioCodecs", V: 4071},
{K: "videoCodecs", V: 252},
{K: "videoFunction", V: 1},
},
},
})
require.NoError(t, err)
// S->C window acknowledgement size
msg, err := mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 38, 37, 160},
require.Equal(t, &message.MsgSetWindowAckSize{
Value: 2500000,
}, msg)
// S->C set peer bandwidth
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
require.Equal(t, &message.MsgSetPeerBandwidth{
Value: 2500000,
Type: 2,
}, msg)
// S->C set chunk size
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
require.Equal(t, &message.MsgSetChunkSize{
Value: 65536,
}, msg)
mr.SetChunkSize(65536)
@ -570,222 +541,214 @@ func TestWriteTracks(t *testing.T) {
// S->C result
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(3), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err := flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"_result",
float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
}, arr)
}, msg)
// C->S window acknowledgement size
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 0x26, 0x25, 0xa0},
err = mw.Write(&message.MsgSetWindowAckSize{
Value: 2500000,
})
require.NoError(t, err)
// C->S set chunk size
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
err = mw.Write(&message.MsgSetChunkSize{
Value: 65536,
})
require.NoError(t, err)
mw.SetChunkSize(65536)
// C->S createStream
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
Payload: []interface{}{
"createStream",
float64(2),
nil,
}),
},
})
require.NoError(t, err)
// S->C result
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(3), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"_result",
float64(2),
nil,
float64(1),
}, arr)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(2),
nil,
float64(1),
},
}, msg)
// C->S getStreamLength
byts = flvio.FillAMF0ValsMalloc([]interface{}{
"getStreamLength",
float64(3),
nil,
"",
})
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 8,
Body: byts,
Payload: []interface{}{
"getStreamLength",
float64(3),
nil,
"",
},
})
require.NoError(t, err)
// C->S play
byts = flvio.FillAMF0ValsMalloc([]interface{}{
"play",
float64(4),
nil,
"",
float64(-2000),
})
err = mw.Write(&base.RawMessage{
err = mw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 8,
Type: base.MessageTypeCommandAMF0,
Body: byts,
Payload: []interface{}{
"play",
float64(4),
nil,
"",
float64(-2000),
},
})
require.NoError(t, err)
// S->C event "stream is recorded"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeUserControl,
Body: []byte{0x00, 0x04, 0x00, 0x00, 0x00, 0x01},
require.Equal(t, &message.MsgUserControl{
Type: 4,
Payload: []byte{0x00, 0x00, 0x00, 0x01},
}, msg)
// S->C event "stream begin 1"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeUserControl,
Body: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
require.Equal(t, &message.MsgUserControl{
Type: 0,
Payload: []byte{0x00, 0x00, 0x00, 0x01},
}, msg)
// S->C onStatus
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(5), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.Reset"},
{K: "description", V: "play reset"},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.Reset"},
{K: "description", V: "play reset"},
},
},
}, arr)
}, msg)
// S->C onStatus
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(5), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.Start"},
{K: "description", V: "play start"},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.Start"},
{K: "description", V: "play start"},
},
},
}, arr)
}, msg)
// S->C onStatus
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(5), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Data.Start"},
{K: "description", V: "data start"},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Data.Start"},
{K: "description", V: "data start"},
},
},
}, arr)
}, msg)
// S->C onStatus
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(5), msg.ChunkStreamID)
require.Equal(t, base.MessageTypeCommandAMF0, msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.PublishNotify"},
{K: "description", V: "publish notify"},
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.PublishNotify"},
{K: "description", V: "publish notify"},
},
},
}, arr)
}, msg)
// S->C onMetadata
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(4), msg.ChunkStreamID)
require.Equal(t, base.MessageType(0x12), msg.Type)
arr, err = flvio.ParseAMFVals(msg.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
"onMetaData",
flvio.AMFMap{
{K: "videodatarate", V: float64(0)},
{K: "videocodecid", V: float64(7)},
{K: "audiodatarate", V: float64(0)},
{K: "audiocodecid", V: float64(10)},
require.Equal(t, &message.MsgDataAMF0{
ChunkStreamID: 4,
MessageStreamID: 16777216,
Payload: []interface{}{
"onMetaData",
flvio.AMFMap{
{K: "videodatarate", V: float64(0)},
{K: "videocodecid", V: float64(7)},
{K: "audiodatarate", V: float64(0)},
{K: "audiocodecid", V: float64(10)},
},
},
}, arr)
}, msg)
// S->C H264 decoder config
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(6), msg.ChunkStreamID)
require.Equal(t, base.MessageType(0x09), msg.Type)
require.Equal(t, []byte{
0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0,
0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0,
0xc, 0xac, 0x3b, 0x50, 0xb0, 0x4b, 0x42, 0x0,
0x0, 0x3, 0x0, 0x2, 0x0, 0x0, 0x3, 0x0,
0x3d, 0x8, 0x1, 0x0, 0x4, 0x68, 0xee, 0x3c,
0x80,
}, msg.Body)
require.Equal(t, &message.MsgVideo{
ChunkStreamID: 6,
MessageStreamID: 16777216,
Body: []byte{
0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0,
0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0,
0xc, 0xac, 0x3b, 0x50, 0xb0, 0x4b, 0x42, 0x0,
0x0, 0x3, 0x0, 0x2, 0x0, 0x0, 0x3, 0x0,
0x3d, 0x8, 0x1, 0x0, 0x4, 0x68, 0xee, 0x3c,
0x80,
},
}, msg)
// S->C AAC decoder config
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, uint8(4), msg.ChunkStreamID)
require.Equal(t, base.MessageType(0x08), msg.Type)
require.Equal(t, []byte{0xae, 0x0, 0x12, 0x10}, msg.Body)
require.Equal(t, &message.MsgAudio{
ChunkStreamID: 4,
MessageStreamID: 16777216,
Body: []byte{0xae, 0x0, 0x12, 0x10},
}, msg)
}

View File

@ -0,0 +1,18 @@
package handshake
import (
"io"
)
const (
rtmpVersion = 0x03
)
// C0 is the C0 part of an handshake.
type C0 struct{}
// Read reads a C0.
func (C0) Write(w io.Writer) error {
_, err := w.Write([]byte{rtmpVersion})
return err
}

View File

@ -1,12 +1,10 @@
package base
package handshake
const (
// ControlChunkStreamID is the stream ID used for control messages.
ControlChunkStreamID = 2
)
const (
rtmpVersion = 0x03
import (
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"io"
)
var (
@ -30,3 +28,40 @@ var (
hsClientPartialKey = hsClientFullKey[:30]
hsServerPartialKey = hsServerFullKey[:36]
)
func hsCalcDigestPos(p []byte, base int) (pos int) {
for i := 0; i < 4; i++ {
pos += int(p[base+i])
}
pos = (pos % 728) + base + 4
return
}
func hsMakeDigest(key []byte, src []byte, gap int) (dst []byte) {
h := hmac.New(sha256.New, key)
if gap <= 0 {
h.Write(src)
} else {
h.Write(src[:gap])
h.Write(src[gap+32:])
}
return h.Sum(nil)
}
// C1 is the C1 part of an handshake.
type C1 struct{}
// Read reads a C1.
func (C1) Write(w io.Writer) error {
buf := make([]byte, 1536)
copy(buf[0:4], []byte{0x00, 0x00, 0x00, 0x00})
copy(buf[4:8], []byte{0x09, 0x00, 0x7c, 0x02})
rand.Read(buf[8:])
gap := hsCalcDigestPos(buf[0:], 8)
digest := hsMakeDigest(hsClientPartialKey, buf[0:], gap)
copy(buf[gap+0:], digest)
_, err := w.Write(buf[0:])
return err
}

View File

@ -1,4 +1,4 @@
package base
package handshake
import (
"bytes"
@ -28,11 +28,11 @@ func hsParse1(p []byte, peerkey []byte, key []byte) (ok bool, digest []byte) {
return
}
// HandshakeC2 is the C2 part of an handshake.
type HandshakeC2 struct{}
// C2 is the C2 part of an handshake.
type C2 struct{}
// Read reads a HandshakeC2.
func (HandshakeC2) Write(w io.Writer, s1s2 []byte) error {
// Read reads a C2.
func (C2) Write(w io.Writer, s1s2 []byte) error {
ok, key := hsParse1(s1s2[:1536], hsServerPartialKey, hsClientFullKey)
if !ok {
return fmt.Errorf("unable to parse S1+S2")

View File

@ -1,15 +1,15 @@
package base
package handshake
import (
"fmt"
"io"
)
// HandshakeS0 is the S0 part of an handshake.
type HandshakeS0 struct{}
// S0 is the S0 part of an handshake.
type S0 struct{}
// Read reads a HandshakeS0.
func (HandshakeS0) Read(r io.Reader) error {
// Read reads a S0.
func (S0) Read(r io.Reader) error {
buf := make([]byte, 1)
_, err := io.ReadFull(r, buf)
if err != nil {

View File

@ -0,0 +1,16 @@
package message
import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
const (
// ControlChunkStreamID is the stream ID used for control messages.
ControlChunkStreamID = 2
)
// Message is a message.
type Message interface {
Unmarshal(*rawmessage.Message) error
Marshal() (*rawmessage.Message, error)
}

View File

@ -0,0 +1,31 @@
package message
import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgAudio is an audio message.
type MsgAudio struct {
ChunkStreamID byte
MessageStreamID uint32
Body []byte
}
// Unmarshal implements Message.
func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
m.Body = raw.Body
return nil
}
// Marshal implements Message.
func (m MsgAudio) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeAudio,
MessageStreamID: m.MessageStreamID,
Body: m.Body,
}, nil
}

View File

@ -0,0 +1,39 @@
package message
import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgCommandAMF0 is a AMF0 command message.
type MsgCommandAMF0 struct {
ChunkStreamID byte
MessageStreamID uint32
Payload []interface{}
}
// Unmarshal implements Message.
func (m *MsgCommandAMF0) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
payload, err := flvio.ParseAMFVals(raw.Body, false)
if err != nil {
return err
}
m.Payload = payload
return nil
}
// Marshal implements Message.
func (m MsgCommandAMF0) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeCommandAMF0,
MessageStreamID: m.MessageStreamID,
Body: flvio.FillAMF0ValsMalloc(m.Payload),
}, nil
}

View File

@ -0,0 +1,39 @@
package message
import (
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgDataAMF0 is a AMF0 data message.
type MsgDataAMF0 struct {
ChunkStreamID byte
MessageStreamID uint32
Payload []interface{}
}
// Unmarshal implements Message.
func (m *MsgDataAMF0) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
payload, err := flvio.ParseAMFVals(raw.Body, false)
if err != nil {
return err
}
m.Payload = payload
return nil
}
// Marshal implements Message.
func (m MsgDataAMF0) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeDataAMF0,
MessageStreamID: m.MessageStreamID,
Body: flvio.FillAMF0ValsMalloc(m.Payload),
}, nil
}

View File

@ -0,0 +1,40 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgSetChunkSize is a set chunk size message.
type MsgSetChunkSize struct {
Value uint32
}
// Unmarshal implements Message.
func (m *MsgSetChunkSize) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 4 {
return fmt.Errorf("unexpected body size")
}
m.Value = binary.BigEndian.Uint32(raw.Body)
return nil
}
// Marshal implements Message.
func (m *MsgSetChunkSize) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 4)
binary.BigEndian.PutUint32(body, m.Value)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetChunkSize,
Body: body,
}, nil
}

View File

@ -0,0 +1,43 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgSetPeerBandwidth is a set peer bandwidth message.
type MsgSetPeerBandwidth struct {
Value uint32
Type byte
}
// Unmarshal implements Message.
func (m *MsgSetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 5 {
return fmt.Errorf("unexpected body size")
}
m.Value = binary.BigEndian.Uint32(raw.Body)
m.Type = raw.Body[4]
return nil
}
// Marshal implements Message.
func (m *MsgSetPeerBandwidth) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 5)
binary.BigEndian.PutUint32(body, m.Value)
body[4] = m.Type
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetChunkSize,
Body: body,
}, nil
}

View File

@ -0,0 +1,40 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgSetWindowAckSize is a set window acknowledgement message.
type MsgSetWindowAckSize struct {
Value uint32
}
// Unmarshal implements Message.
func (m *MsgSetWindowAckSize) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) != 4 {
return fmt.Errorf("unexpected body size")
}
m.Value = binary.BigEndian.Uint32(raw.Body)
return nil
}
// Marshal implements Message.
func (m *MsgSetWindowAckSize) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 4)
binary.BigEndian.PutUint32(body, m.Value)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeSetWindowAckSize,
Body: body,
}, nil
}

View File

@ -0,0 +1,44 @@
package message
import (
"encoding/binary"
"fmt"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgUserControl is a user control message.
type MsgUserControl struct {
Type uint16
Payload []byte
}
// Unmarshal implements Message.
func (m *MsgUserControl) Unmarshal(raw *rawmessage.Message) error {
if raw.ChunkStreamID != ControlChunkStreamID {
return fmt.Errorf("unexpected chunk stream ID")
}
if len(raw.Body) < 2 {
return fmt.Errorf("unexpected body size")
}
m.Type = binary.BigEndian.Uint16(raw.Body)
m.Payload = raw.Body[2:]
return nil
}
// Marshal implements Message.
func (m MsgUserControl) Marshal() (*rawmessage.Message, error) {
body := make([]byte, 2+len(m.Payload))
binary.BigEndian.PutUint16(body, m.Type)
copy(body[2:], m.Payload)
return &rawmessage.Message{
ChunkStreamID: ControlChunkStreamID,
Type: chunk.MessageTypeUserControl,
Body: body,
}, nil
}

View File

@ -0,0 +1,31 @@
package message
import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// MsgVideo is a video message.
type MsgVideo struct {
ChunkStreamID byte
MessageStreamID uint32
Body []byte
}
// Unmarshal implements Message.
func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error {
m.ChunkStreamID = raw.ChunkStreamID
m.MessageStreamID = raw.MessageStreamID
m.Body = raw.Body
return nil
}
// Marshal implements Message.
func (m MsgVideo) Marshal() (*rawmessage.Message, error) {
return &rawmessage.Message{
ChunkStreamID: m.ChunkStreamID,
Type: chunk.MessageTypeVideo,
MessageStreamID: m.MessageStreamID,
Body: m.Body,
}, nil
}

View File

@ -0,0 +1,77 @@
package message
import (
"fmt"
"io"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
func messageFromType(typ chunk.MessageType) (Message, error) {
switch typ {
case chunk.MessageTypeSetChunkSize:
return &MsgSetChunkSize{}, nil
case chunk.MessageTypeSetWindowAckSize:
return &MsgSetWindowAckSize{}, nil
case chunk.MessageTypeSetPeerBandwidth:
return &MsgSetPeerBandwidth{}, nil
case chunk.MessageTypeUserControl:
return &MsgUserControl{}, nil
case chunk.MessageTypeCommandAMF0:
return &MsgCommandAMF0{}, nil
case chunk.MessageTypeDataAMF0:
return &MsgDataAMF0{}, nil
case chunk.MessageTypeAudio:
return &MsgAudio{}, nil
case chunk.MessageTypeVideo:
return &MsgVideo{}, nil
default:
return nil, fmt.Errorf("unhandled message")
}
}
// Reader is a message reader.
type Reader struct {
r *rawmessage.Reader
}
// NewReader allocates a Reader.
func NewReader(r io.Reader) *Reader {
return &Reader{
r: rawmessage.NewReader(r),
}
}
// SetChunkSize sets the maximum chunk size.
func (r *Reader) SetChunkSize(v int) {
r.r.SetChunkSize(v)
}
// Read reads a essage.
func (r *Reader) Read() (Message, error) {
raw, err := r.r.Read()
if err != nil {
return nil, err
}
msg, err := messageFromType(raw.Type)
if err != nil {
return nil, err
}
err = msg.Unmarshal(raw)
if err != nil {
return nil, err
}
return msg, nil
}

View File

@ -0,0 +1,34 @@
package message
import (
"io"
"github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage"
)
// Writer is a message writer.
type Writer struct {
w *rawmessage.Writer
}
// NewWriter allocates a Writer.
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: rawmessage.NewWriter(w),
}
}
// SetChunkSize sets the maximum chunk size.
func (mw *Writer) SetChunkSize(v int) {
mw.w.SetChunkSize(v)
}
// Write writes a message.
func (mw *Writer) Write(msg Message) error {
raw, err := msg.Marshal()
if err != nil {
return err
}
return mw.w.Write(raw)
}

View File

@ -0,0 +1,14 @@
package rawmessage
import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
)
// Message is a raw message.
type Message struct {
ChunkStreamID byte
Timestamp uint32
Type chunk.MessageType
MessageStreamID uint32
Body []byte
}

View File

@ -1,30 +1,33 @@
package base
package rawmessage
import (
"bufio"
"errors"
"fmt"
"io"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
)
var errMoreChunksNeeded = errors.New("more chunks are needed")
type rawRawMessageReaderChunkStream struct {
mr *RawMessageReader
type readerChunkStream struct {
mr *Reader
curTimestamp *uint32
curType *MessageType
curType *chunk.MessageType
curMessageStreamID *uint32
curBodyLen *uint32
curBody *[]byte
}
func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
func (rc *readerChunkStream) read(typ byte) (*Message, error) {
switch typ {
case 0:
if rc.curBody != nil {
return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk")
}
var c0 Chunk0
var c0 chunk.Chunk0
err := c0.Read(rc.mr.r, rc.mr.chunkSize)
if err != nil {
return nil, err
@ -44,7 +47,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
return nil, errMoreChunksNeeded
}
return &RawMessage{
return &Message{
Timestamp: c0.Timestamp,
Type: c0.Type,
MessageStreamID: c0.MessageStreamID,
@ -60,7 +63,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk")
}
var c1 Chunk1
var c1 chunk.Chunk1
err := c1.Read(rc.mr.r, rc.mr.chunkSize)
if err != nil {
return nil, err
@ -78,7 +81,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
return nil, errMoreChunksNeeded
}
return &RawMessage{
return &Message{
Timestamp: *rc.curTimestamp + c1.TimestampDelta,
Type: c1.Type,
MessageStreamID: *rc.curMessageStreamID,
@ -99,7 +102,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
chunkBodyLen = rc.mr.chunkSize
}
var c2 Chunk2
var c2 chunk.Chunk2
err := c2.Read(rc.mr.r, chunkBodyLen)
if err != nil {
return nil, err
@ -113,7 +116,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
return nil, errMoreChunksNeeded
}
return &RawMessage{
return &Message{
Timestamp: *rc.curTimestamp + c2.TimestampDelta,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
@ -134,7 +137,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
chunkBodyLen = rc.mr.chunkSize
}
var c3 Chunk3
var c3 chunk.Chunk3
err := c3.Read(rc.mr.r, chunkBodyLen)
if err != nil {
return nil, err
@ -149,7 +152,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
body := *rc.curBody
rc.curBody = nil
return &RawMessage{
return &Message{
Timestamp: *rc.curTimestamp,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
@ -158,30 +161,31 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
}
}
// RawMessageReader is a message reader.
type RawMessageReader struct {
// Reader is a raw message reader.
type Reader struct {
r *bufio.Reader
chunkSize int
chunkStreams map[byte]*rawRawMessageReaderChunkStream
chunkStreams map[byte]*readerChunkStream
}
// NewRawMessageReader allocates a RawMessageReader.
func NewRawMessageReader(r *bufio.Reader) *RawMessageReader {
return &RawMessageReader{
r: r,
// NewReader allocates a Reader.
func NewReader(r io.Reader) *Reader {
return &Reader{
r: bufio.NewReader(r),
chunkSize: 128,
chunkStreams: make(map[byte]*rawRawMessageReaderChunkStream),
chunkStreams: make(map[byte]*readerChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (mr *RawMessageReader) SetChunkSize(v int) {
mr.chunkSize = v
func (r *Reader) SetChunkSize(v int) {
r.chunkSize = v
}
func (mr *RawMessageReader) Read() (*RawMessage, error) {
// Read reads a Message.
func (r *Reader) Read() (*Message, error) {
for {
byt, err := mr.r.ReadByte()
byt, err := r.r.ReadByte()
if err != nil {
return nil, err
}
@ -189,13 +193,13 @@ func (mr *RawMessageReader) Read() (*RawMessage, error) {
typ := byt >> 6
chunkStreamID := byt & 0x3F
rc, ok := mr.chunkStreams[chunkStreamID]
rc, ok := r.chunkStreams[chunkStreamID]
if !ok {
rc = &rawRawMessageReaderChunkStream{mr: mr}
mr.chunkStreams[chunkStreamID] = rc
rc = &readerChunkStream{mr: r}
r.chunkStreams[chunkStreamID] = rc
}
mr.r.UnreadByte()
r.r.UnreadByte()
msg, err := rc.read(typ)
if err != nil {

View File

@ -1,19 +1,21 @@
package base
package rawmessage
import (
"io"
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
)
type rawMessageWriterChunkStream struct {
mw *RawMessageWriter
type writerChunkStream struct {
mw *Writer
lastMessageStreamID *uint32
lastType *MessageType
lastType *chunk.MessageType
lastBodyLen *int
lastTimestamp *uint32
lastTimestampDelta *uint32
}
func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
func (wc *writerChunkStream) write(msg *Message) error {
bodyLen := len(msg.Body)
pos := 0
firstChunk := true
@ -40,7 +42,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
switch {
case wc.lastMessageStreamID == nil || timestampDelta == nil || *wc.lastMessageStreamID != msg.MessageStreamID:
err := Chunk0{
err := chunk.Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Timestamp: msg.Timestamp,
Type: msg.Type,
@ -53,7 +55,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
}
case *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen:
err := Chunk1{
err := chunk.Chunk1{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: *timestampDelta,
Type: msg.Type,
@ -65,7 +67,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
}
case wc.lastTimestampDelta == nil || *wc.lastTimestampDelta != *timestampDelta:
err := Chunk2{
err := chunk.Chunk2{
ChunkStreamID: msg.ChunkStreamID,
TimestampDelta: *timestampDelta,
Body: msg.Body[pos : pos+chunkBodyLen],
@ -75,7 +77,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
}
default:
err := Chunk3{
err := chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
@ -98,7 +100,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
wc.lastTimestampDelta = &v5
}
} else {
err := Chunk3{
err := chunk.Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(wc.mw.w)
@ -115,33 +117,33 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
}
}
// RawMessageWriter is a message writer.
type RawMessageWriter struct {
// Writer is a raw message writer.
type Writer struct {
w io.Writer
chunkSize int
chunkStreams map[byte]*rawMessageWriterChunkStream
chunkStreams map[byte]*writerChunkStream
}
// NewRawMessageWriter allocates a RawMessageWriter.
func NewRawMessageWriter(w io.Writer) *RawMessageWriter {
return &RawMessageWriter{
// NewWriter allocates a Writer.
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: w,
chunkSize: 128,
chunkStreams: make(map[byte]*rawMessageWriterChunkStream),
chunkStreams: make(map[byte]*writerChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (mw *RawMessageWriter) SetChunkSize(v int) {
mw.chunkSize = v
func (w *Writer) SetChunkSize(v int) {
w.chunkSize = v
}
// Write writes a Message.
func (mw *RawMessageWriter) Write(msg *RawMessage) error {
wc, ok := mw.chunkStreams[msg.ChunkStreamID]
func (w *Writer) Write(msg *Message) error {
wc, ok := w.chunkStreams[msg.ChunkStreamID]
if !ok {
wc = &rawMessageWriterChunkStream{mw: mw}
mw.chunkStreams[msg.ChunkStreamID] = wc
wc = &writerChunkStream{mw: w}
w.chunkStreams[msg.ChunkStreamID] = wc
}
return wc.write(msg)