From ba83ef65d2dc6d6660b4e7174c5660e618ed559b Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 5 Jun 2022 01:06:40 +0200 Subject: [PATCH] rtmp: add message reader / writer --- Makefile | 2 +- internal/rtmp/base/handshakec0.go | 14 - internal/rtmp/base/handshakec1.go | 45 -- internal/rtmp/base/rawmessage.go | 10 - internal/rtmp/{base => chunk}/chunk0.go | 14 +- internal/rtmp/chunk/chunk0_test.go | 36 + internal/rtmp/{base => chunk}/chunk1.go | 9 +- internal/rtmp/chunk/chunk1_test.go | 35 + internal/rtmp/{base => chunk}/chunk2.go | 9 +- internal/rtmp/chunk/chunk2_test.go | 32 + internal/rtmp/{base => chunk}/chunk3.go | 7 +- internal/rtmp/chunk/chunk3_test.go | 31 + internal/rtmp/{base => chunk}/messagetype.go | 2 +- internal/rtmp/conn_test.go | 619 ++++++++---------- internal/rtmp/handshake/c0.go | 18 + .../rtmp/{base/base.go => handshake/c1.go} | 51 +- .../{base/handshakec2.go => handshake/c2.go} | 10 +- .../{base/handshakes0.go => handshake/s0.go} | 10 +- internal/rtmp/message/message.go | 16 + internal/rtmp/message/msg_audio.go | 31 + internal/rtmp/message/msg_command_amf0.go | 39 ++ internal/rtmp/message/msg_dataamf0.go | 39 ++ internal/rtmp/message/msg_setchunksize.go | 40 ++ internal/rtmp/message/msg_setpeerbandwidth.go | 43 ++ internal/rtmp/message/msg_setwindowacksize.go | 40 ++ internal/rtmp/message/msg_usercontrol.go | 44 ++ internal/rtmp/message/msg_video.go | 31 + internal/rtmp/message/reader.go | 77 +++ internal/rtmp/message/writer.go | 34 + internal/rtmp/rawmessage/message.go | 14 + .../reader.go} | 62 +- .../writer.go} | 48 +- 32 files changed, 1015 insertions(+), 497 deletions(-) delete mode 100644 internal/rtmp/base/handshakec0.go delete mode 100644 internal/rtmp/base/handshakec1.go delete mode 100644 internal/rtmp/base/rawmessage.go rename internal/rtmp/{base => chunk}/chunk0.go (84%) create mode 100644 internal/rtmp/chunk/chunk0_test.go rename internal/rtmp/{base => chunk}/chunk1.go (88%) create mode 100644 internal/rtmp/chunk/chunk1_test.go rename internal/rtmp/{base => chunk}/chunk2.go (83%) create mode 100644 internal/rtmp/chunk/chunk2_test.go rename internal/rtmp/{base => chunk}/chunk3.go (90%) create mode 100644 internal/rtmp/chunk/chunk3_test.go rename internal/rtmp/{base => chunk}/messagetype.go (97%) create mode 100644 internal/rtmp/handshake/c0.go rename internal/rtmp/{base/base.go => handshake/c1.go} (51%) rename internal/rtmp/{base/handshakec2.go => handshake/c2.go} (82%) rename internal/rtmp/{base/handshakes0.go => handshake/s0.go} (57%) create mode 100644 internal/rtmp/message/message.go create mode 100644 internal/rtmp/message/msg_audio.go create mode 100644 internal/rtmp/message/msg_command_amf0.go create mode 100644 internal/rtmp/message/msg_dataamf0.go create mode 100644 internal/rtmp/message/msg_setchunksize.go create mode 100644 internal/rtmp/message/msg_setpeerbandwidth.go create mode 100644 internal/rtmp/message/msg_setwindowacksize.go create mode 100644 internal/rtmp/message/msg_usercontrol.go create mode 100644 internal/rtmp/message/msg_video.go create mode 100644 internal/rtmp/message/reader.go create mode 100644 internal/rtmp/message/writer.go create mode 100644 internal/rtmp/rawmessage/message.go rename internal/rtmp/{base/rawmessagereader.go => rawmessage/reader.go} (77%) rename internal/rtmp/{base/rawmessagewriter.go => rawmessage/writer.go} (74%) diff --git a/Makefile b/Makefile index f8f49c90..dacf8cce 100644 --- a/Makefile +++ b/Makefile @@ -85,7 +85,7 @@ test-internal: ./internal/hls \ ./internal/logger \ ./internal/rlimit \ - ./internal/rtmp + ./internal/rtmp/... test-core: $(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \ diff --git a/internal/rtmp/base/handshakec0.go b/internal/rtmp/base/handshakec0.go deleted file mode 100644 index 3a873bad..00000000 --- a/internal/rtmp/base/handshakec0.go +++ /dev/null @@ -1,14 +0,0 @@ -package base - -import ( - "io" -) - -// HandshakeC0 is the C0 part of an handshake. -type HandshakeC0 struct{} - -// Read reads a HandshakeC0. -func (HandshakeC0) Write(w io.Writer) error { - _, err := w.Write([]byte{rtmpVersion}) - return err -} diff --git a/internal/rtmp/base/handshakec1.go b/internal/rtmp/base/handshakec1.go deleted file mode 100644 index 8081fb2b..00000000 --- a/internal/rtmp/base/handshakec1.go +++ /dev/null @@ -1,45 +0,0 @@ -package base - -import ( - "crypto/hmac" - "crypto/rand" - "crypto/sha256" - "io" -) - -func hsCalcDigestPos(p []byte, base int) (pos int) { - for i := 0; i < 4; i++ { - pos += int(p[base+i]) - } - pos = (pos % 728) + base + 4 - return -} - -func hsMakeDigest(key []byte, src []byte, gap int) (dst []byte) { - h := hmac.New(sha256.New, key) - if gap <= 0 { - h.Write(src) - } else { - h.Write(src[:gap]) - h.Write(src[gap+32:]) - } - return h.Sum(nil) -} - -// HandshakeC1 is the C1 part of an handshake. -type HandshakeC1 struct{} - -// Read reads a HandshakeC1. -func (HandshakeC1) Write(w io.Writer) error { - buf := make([]byte, 1536) - copy(buf[0:4], []byte{0x00, 0x00, 0x00, 0x00}) - copy(buf[4:8], []byte{0x09, 0x00, 0x7c, 0x02}) - - rand.Read(buf[8:]) - gap := hsCalcDigestPos(buf[0:], 8) - digest := hsMakeDigest(hsClientPartialKey, buf[0:], gap) - copy(buf[gap+0:], digest) - - _, err := w.Write(buf[0:]) - return err -} diff --git a/internal/rtmp/base/rawmessage.go b/internal/rtmp/base/rawmessage.go deleted file mode 100644 index 8e9da338..00000000 --- a/internal/rtmp/base/rawmessage.go +++ /dev/null @@ -1,10 +0,0 @@ -package base - -// RawMessage is a message. -type RawMessage struct { - ChunkStreamID byte - Timestamp uint32 - Type MessageType - MessageStreamID uint32 - Body []byte -} diff --git a/internal/rtmp/base/chunk0.go b/internal/rtmp/chunk/chunk0.go similarity index 84% rename from internal/rtmp/base/chunk0.go rename to internal/rtmp/chunk/chunk0.go index 17a4596e..88612dd4 100644 --- a/internal/rtmp/base/chunk0.go +++ b/internal/rtmp/chunk/chunk0.go @@ -1,7 +1,6 @@ -package base +package chunk import ( - "fmt" "io" ) @@ -26,12 +25,8 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error { return err } - if header[0]>>6 != 0 { - return fmt.Errorf("wrong chunk header type") - } - c.ChunkStreamID = header[0] & 0x3F - c.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1]) + c.Timestamp = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6]) c.Type = MessageType(header[7]) c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11]) @@ -57,7 +52,10 @@ func (c Chunk0) Write(w io.Writer) error { header[5] = byte(c.BodyLen >> 8) header[6] = byte(c.BodyLen) header[7] = byte(c.Type) - header[8] = byte(c.MessageStreamID) + header[8] = byte(c.MessageStreamID >> 24) + header[9] = byte(c.MessageStreamID >> 16) + header[10] = byte(c.MessageStreamID >> 8) + header[11] = byte(c.MessageStreamID) _, err := w.Write(header) if err != nil { return err diff --git a/internal/rtmp/chunk/chunk0_test.go b/internal/rtmp/chunk/chunk0_test.go new file mode 100644 index 00000000..2021a2ae --- /dev/null +++ b/internal/rtmp/chunk/chunk0_test.go @@ -0,0 +1,36 @@ +package chunk + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var chunk0enc = []byte{ + 0x19, 0xb1, 0xa1, 0x91, 0x0, 0x0, 0x14, 0x14, + 0x3, 0x5d, 0x17, 0x3d, 0x1, 0x2, 0x3, 0x4, +} + +var chunk0dec = Chunk0{ + ChunkStreamID: 25, + Timestamp: 11641233, + Type: MessageTypeCommandAMF0, + MessageStreamID: 56432445, + BodyLen: 20, + Body: []byte{0x01, 0x02, 0x03, 0x04}, +} + +func TestChunk0Read(t *testing.T) { + var chunk0 Chunk0 + err := chunk0.Read(bytes.NewReader(chunk0enc), 4) + require.NoError(t, err) + require.Equal(t, chunk0dec, chunk0) +} + +func TestChunk0Write(t *testing.T) { + var buf bytes.Buffer + err := chunk0dec.Write(&buf) + require.NoError(t, err) + require.Equal(t, chunk0enc, buf.Bytes()) +} diff --git a/internal/rtmp/base/chunk1.go b/internal/rtmp/chunk/chunk1.go similarity index 88% rename from internal/rtmp/base/chunk1.go rename to internal/rtmp/chunk/chunk1.go index 4523d482..b594c92e 100644 --- a/internal/rtmp/base/chunk1.go +++ b/internal/rtmp/chunk/chunk1.go @@ -1,7 +1,6 @@ -package base +package chunk import ( - "fmt" "io" ) @@ -27,12 +26,8 @@ func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen int) error { return err } - if header[0]>>6 != 1 { - return fmt.Errorf("wrong chunk header type") - } - c.ChunkStreamID = header[0] & 0x3F - c.TimestampDelta = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1]) + c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6]) c.Type = MessageType(header[7]) diff --git a/internal/rtmp/chunk/chunk1_test.go b/internal/rtmp/chunk/chunk1_test.go new file mode 100644 index 00000000..903a33de --- /dev/null +++ b/internal/rtmp/chunk/chunk1_test.go @@ -0,0 +1,35 @@ +package chunk + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var chunk1enc = []byte{ + 0x59, 0xb1, 0xa1, 0x91, 0x0, 0x0, 0x14, 0x14, + 0x1, 0x2, 0x3, 0x4, +} + +var chunk1dec = Chunk1{ + ChunkStreamID: 25, + TimestampDelta: 11641233, + Type: MessageTypeCommandAMF0, + BodyLen: 20, + Body: []byte{0x01, 0x02, 0x03, 0x04}, +} + +func TestChunk1Read(t *testing.T) { + var chunk1 Chunk1 + err := chunk1.Read(bytes.NewReader(chunk1enc), 4) + require.NoError(t, err) + require.Equal(t, chunk1dec, chunk1) +} + +func TestChunk1Write(t *testing.T) { + var buf bytes.Buffer + err := chunk1dec.Write(&buf) + require.NoError(t, err) + require.Equal(t, chunk1enc, buf.Bytes()) +} diff --git a/internal/rtmp/base/chunk2.go b/internal/rtmp/chunk/chunk2.go similarity index 83% rename from internal/rtmp/base/chunk2.go rename to internal/rtmp/chunk/chunk2.go index 7515f70b..18040d11 100644 --- a/internal/rtmp/base/chunk2.go +++ b/internal/rtmp/chunk/chunk2.go @@ -1,7 +1,6 @@ -package base +package chunk import ( - "fmt" "io" ) @@ -23,12 +22,8 @@ func (c *Chunk2) Read(r io.Reader, chunkBodyLen int) error { return err } - if header[0]>>6 != 2 { - return fmt.Errorf("wrong chunk header type") - } - c.ChunkStreamID = header[0] & 0x3F - c.TimestampDelta = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1]) + c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) c.Body = make([]byte, chunkBodyLen) _, err = r.Read(c.Body) diff --git a/internal/rtmp/chunk/chunk2_test.go b/internal/rtmp/chunk/chunk2_test.go new file mode 100644 index 00000000..066f810d --- /dev/null +++ b/internal/rtmp/chunk/chunk2_test.go @@ -0,0 +1,32 @@ +package chunk + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var chunk2enc = []byte{ + 0x59, 0xb1, 0xa1, 0x91, 0x1, 0x2, 0x3, 0x4, +} + +var chunk2dec = Chunk2{ + ChunkStreamID: 25, + TimestampDelta: 11641233, + Body: []byte{0x01, 0x02, 0x03, 0x04}, +} + +func TestChunk2Read(t *testing.T) { + var chunk2 Chunk2 + err := chunk2.Read(bytes.NewReader(chunk2enc), 4) + require.NoError(t, err) + require.Equal(t, chunk2dec, chunk2) +} + +func TestChunk2Write(t *testing.T) { + var buf bytes.Buffer + err := chunk2dec.Write(&buf) + require.NoError(t, err) + require.Equal(t, chunk2enc, buf.Bytes()) +} diff --git a/internal/rtmp/base/chunk3.go b/internal/rtmp/chunk/chunk3.go similarity index 90% rename from internal/rtmp/base/chunk3.go rename to internal/rtmp/chunk/chunk3.go index dca1cacd..e8008abd 100644 --- a/internal/rtmp/base/chunk3.go +++ b/internal/rtmp/chunk/chunk3.go @@ -1,7 +1,6 @@ -package base +package chunk import ( - "fmt" "io" ) @@ -24,10 +23,6 @@ func (c *Chunk3) Read(r io.Reader, chunkBodyLen int) error { return err } - if header[0]>>6 != 2 { - return fmt.Errorf("wrong chunk header type") - } - c.ChunkStreamID = header[0] & 0x3F c.Body = make([]byte, chunkBodyLen) diff --git a/internal/rtmp/chunk/chunk3_test.go b/internal/rtmp/chunk/chunk3_test.go new file mode 100644 index 00000000..f6a1e874 --- /dev/null +++ b/internal/rtmp/chunk/chunk3_test.go @@ -0,0 +1,31 @@ +package chunk + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var chunk3enc = []byte{ + 0xd9, 0x1, 0x2, 0x3, 0x4, +} + +var chunk3dec = Chunk3{ + ChunkStreamID: 25, + Body: []byte{0x01, 0x02, 0x03, 0x04}, +} + +func TestChunk3Read(t *testing.T) { + var chunk3 Chunk3 + err := chunk3.Read(bytes.NewReader(chunk3enc), 4) + require.NoError(t, err) + require.Equal(t, chunk3dec, chunk3) +} + +func TestChunk3Write(t *testing.T) { + var buf bytes.Buffer + err := chunk3dec.Write(&buf) + require.NoError(t, err) + require.Equal(t, chunk3enc, buf.Bytes()) +} diff --git a/internal/rtmp/base/messagetype.go b/internal/rtmp/chunk/messagetype.go similarity index 97% rename from internal/rtmp/base/messagetype.go rename to internal/rtmp/chunk/messagetype.go index 3e870fe6..5002d98b 100644 --- a/internal/rtmp/base/messagetype.go +++ b/internal/rtmp/chunk/messagetype.go @@ -1,4 +1,4 @@ -package base +package chunk // MessageType is a message type. type MessageType byte diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index a5e478a7..6a0efdb2 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -1,7 +1,6 @@ package rtmp import ( - "bufio" "net" "net/url" "strings" @@ -13,7 +12,8 @@ import ( "github.com/notedit/rtmp/format/flv/flvio" "github.com/stretchr/testify/require" - "github.com/aler9/rtsp-simple-server/internal/rtmp/base" + "github.com/aler9/rtsp-simple-server/internal/rtmp/handshake" + "github.com/aler9/rtsp-simple-server/internal/rtmp/message" ) func splitPath(u *url.URL) (app, stream string) { @@ -115,15 +115,15 @@ func TestReadTracks(t *testing.T) { defer conn.Close() // C->S handshake C0 - err = base.HandshakeC0{}.Write(conn) + err = handshake.C0{}.Write(conn) require.NoError(t, err) // C->S handshake C1 - err = base.HandshakeC1{}.Write(conn) + err = handshake.C1{}.Write(conn) require.NoError(t, err) // S->C handshake S0 - err = base.HandshakeS0{}.Read(conn) + err = handshake.S0{}.Read(conn) require.NoError(t, err) // S->C handshake S1+S2 @@ -132,59 +132,53 @@ func TestReadTracks(t *testing.T) { require.NoError(t, err) // C->S handshake C2 - err = base.HandshakeC2{}.Write(conn, s1s2) + err = handshake.C2{}.Write(conn, s1s2) require.NoError(t, err) - mw := base.NewRawMessageWriter(conn) - mr := base.NewRawMessageReader(bufio.NewReader(conn)) + mw := message.NewWriter(conn) + mr := message.NewReader(conn) // C->S connect - byts := flvio.FillAMF0ValsMalloc([]interface{}{ - "connect", - 1, - flvio.AMFMap{ - {K: "app", V: "/stream"}, - {K: "flashVer", V: "LNX 9,0,124,2"}, - {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, - {K: "fpad", V: false}, - {K: "capabilities", V: 15}, - {K: "audioCodecs", V: 4071}, - {K: "videoCodecs", V: 252}, - {K: "videoFunction", V: 1}, - }, - }) - err = mw.Write(&base.RawMessage{ + + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: byts, + Payload: []interface{}{ + "connect", + 1, + flvio.AMFMap{ + {K: "app", V: "/stream"}, + {K: "flashVer", V: "LNX 9,0,124,2"}, + {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, + {K: "fpad", V: false}, + {K: "capabilities", V: 15}, + {K: "audioCodecs", V: 4071}, + {K: "videoCodecs", V: 252}, + {K: "videoFunction", V: 1}, + }, + }, }) require.NoError(t, err) // S->C window acknowledgement size msg, err := mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetWindowAckSize, - Body: []byte{0x00, 38, 37, 160}, + require.Equal(t, &message.MsgSetWindowAckSize{ + Value: 2500000, }, msg) // S->C set peer bandwidth msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetPeerBandwidth, - Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02}, + require.Equal(t, &message.MsgSetPeerBandwidth{ + Value: 2500000, + Type: 2, }, msg) // S->C set chunk size msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetChunkSize, - Body: []byte{0x00, 0x01, 0x00, 0x00}, + require.Equal(t, &message.MsgSetChunkSize{ + Value: 65536, }, msg) mr.SetChunkSize(65536) @@ -192,151 +186,141 @@ func TestReadTracks(t *testing.T) { // S->C result msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(3), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err := flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "_result", - float64(1), - flvio.AMFMap{ - {K: "fmsVer", V: "LNX 9,0,124,2"}, - {K: "capabilities", V: float64(31)}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(1), + flvio.AMFMap{ + {K: "fmsVer", V: "LNX 9,0,124,2"}, + {K: "capabilities", V: float64(31)}, + }, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetConnection.Connect.Success"}, + {K: "description", V: "Connection succeeded."}, + {K: "objectEncoding", V: float64(0)}, + }, }, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetConnection.Connect.Success"}, - {K: "description", V: "Connection succeeded."}, - {K: "objectEncoding", V: float64(0)}, - }, - }, arr) + }, msg) // C->S set chunk size - err = mw.Write(&base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetChunkSize, - Body: []byte{0x00, 0x01, 0x00, 0x00}, + err = mw.Write(&message.MsgSetChunkSize{ + Value: 65536, }) require.NoError(t, err) mw.SetChunkSize(65536) // C->S releaseStream - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: flvio.FillAMF0ValsMalloc([]interface{}{ + Payload: []interface{}{ "releaseStream", float64(2), nil, "", - }), + }, }) require.NoError(t, err) // C->S FCPublish - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: flvio.FillAMF0ValsMalloc([]interface{}{ + Payload: []interface{}{ "FCPublish", float64(3), nil, "", - }), + }, }) require.NoError(t, err) // C->S createStream - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: flvio.FillAMF0ValsMalloc([]interface{}{ + Payload: []interface{}{ "createStream", float64(4), nil, - }), + }, }) require.NoError(t, err) // S->C result msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(3), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "_result", - float64(4), - nil, - float64(1), - }, arr) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(4), + nil, + float64(1), + }, + }, msg) // C->S publish - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 8, - Type: base.MessageTypeCommandAMF0, MessageStreamID: 1, - Body: flvio.FillAMF0ValsMalloc([]interface{}{ + Payload: []interface{}{ "publish", float64(5), nil, "", "live", - }), + }, }) require.NoError(t, err) // S->C onStatus msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(5), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onStatus", - float64(5), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Publish.Start"}, - {K: "description", V: "publish start"}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(5), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Publish.Start"}, + {K: "description", V: "publish start"}, + }, }, - }, arr) + }, msg) switch ca { case "standard": // C->S metadata - byts = flvio.FillAMF0ValsMalloc([]interface{}{ - "@setDataFrame", - "onMetaData", - flvio.AMFMap{ - { - K: "videodatarate", - V: float64(0), - }, - { - K: "videocodecid", - V: float64(codecH264), - }, - { - K: "audiodatarate", - V: float64(0), - }, - { - K: "audiocodecid", - V: float64(codecAAC), + err = mw.Write(&message.MsgDataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + { + K: "videodatarate", + V: float64(0), + }, + { + K: "videocodecid", + V: float64(codecH264), + }, + { + K: "audiodatarate", + V: float64(0), + }, + { + K: "audiocodecid", + V: float64(codecAAC), + }, }, }, }) - err = mw.Write(&base.RawMessage{ - ChunkStreamID: 4, - Type: base.MessageTypeDataAMF0, - MessageStreamID: 1, - Body: byts, - }) require.NoError(t, err) // C->S H264 decoder config @@ -352,9 +336,8 @@ func TestReadTracks(t *testing.T) { var n int codec.ToConfig(b, &n) body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgVideo{ ChunkStreamID: 6, - Type: base.MessageTypeVideo, MessageStreamID: 1, Body: body, }) @@ -367,9 +350,8 @@ func TestReadTracks(t *testing.T) { ChannelCount: 2, }.Encode() require.NoError(t, err) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgAudio{ ChunkStreamID: 4, - Type: base.MessageTypeAudio, MessageStreamID: 1, Body: append([]byte{ flvio.SOUND_AAC<<4 | flvio.SOUND_44Khz<<2 | flvio.SOUND_16BIT<<1 | flvio.SOUND_STEREO, @@ -380,30 +362,28 @@ func TestReadTracks(t *testing.T) { case "metadata without codec id": // C->S metadata - byts = flvio.FillAMF0ValsMalloc([]interface{}{ - "@setDataFrame", - "onMetaData", - flvio.AMFMap{ - { - K: "width", - V: float64(2688), - }, - { - K: "height", - V: float64(1520), - }, - { - K: "framerate", - V: float64(0o25), + err = mw.Write(&message.MsgDataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + { + K: "width", + V: float64(2688), + }, + { + K: "height", + V: float64(1520), + }, + { + K: "framerate", + V: float64(0o25), + }, }, }, }) - err = mw.Write(&base.RawMessage{ - ChunkStreamID: 4, - Type: base.MessageTypeDataAMF0, - MessageStreamID: 1, - Body: byts, - }) require.NoError(t, err) // C->S H264 decoder config @@ -419,9 +399,8 @@ func TestReadTracks(t *testing.T) { var n int codec.ToConfig(b, &n) body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgVideo{ ChunkStreamID: 6, - Type: base.MessageTypeVideo, MessageStreamID: 1, Body: body, }) @@ -441,9 +420,8 @@ func TestReadTracks(t *testing.T) { var n int codec.ToConfig(b, &n) body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgVideo{ ChunkStreamID: 6, - Type: base.MessageTypeVideo, MessageStreamID: 1, Body: body, }) @@ -493,15 +471,15 @@ func TestWriteTracks(t *testing.T) { defer conn.Close() // C->S handshake C0 - err = base.HandshakeC0{}.Write(conn) + err = handshake.C0{}.Write(conn) require.NoError(t, err) // C-> handshake C1 - err = base.HandshakeC1{}.Write(conn) + err = handshake.C1{}.Write(conn) require.NoError(t, err) // S->C handshake S0 - err = base.HandshakeS0{}.Read(conn) + err = handshake.S0{}.Read(conn) require.NoError(t, err) // S->C handshake S1+S2 @@ -510,59 +488,52 @@ func TestWriteTracks(t *testing.T) { require.NoError(t, err) // C->S handshake C2 - err = base.HandshakeC2{}.Write(conn, s1s2) + err = handshake.C2{}.Write(conn, s1s2) require.NoError(t, err) - mw := base.NewRawMessageWriter(conn) - mr := base.NewRawMessageReader(bufio.NewReader(conn)) + mw := message.NewWriter(conn) + mr := message.NewReader(conn) // C->S connect - byts := flvio.FillAMF0ValsMalloc([]interface{}{ - "connect", - 1, - flvio.AMFMap{ - {K: "app", V: "/stream"}, - {K: "flashVer", V: "LNX 9,0,124,2"}, - {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, - {K: "fpad", V: false}, - {K: "capabilities", V: 15}, - {K: "audioCodecs", V: 4071}, - {K: "videoCodecs", V: 252}, - {K: "videoFunction", V: 1}, - }, - }) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: byts, + Payload: []interface{}{ + "connect", + 1, + flvio.AMFMap{ + {K: "app", V: "/stream"}, + {K: "flashVer", V: "LNX 9,0,124,2"}, + {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, + {K: "fpad", V: false}, + {K: "capabilities", V: 15}, + {K: "audioCodecs", V: 4071}, + {K: "videoCodecs", V: 252}, + {K: "videoFunction", V: 1}, + }, + }, }) require.NoError(t, err) // S->C window acknowledgement size msg, err := mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetWindowAckSize, - Body: []byte{0x00, 38, 37, 160}, + require.Equal(t, &message.MsgSetWindowAckSize{ + Value: 2500000, }, msg) // S->C set peer bandwidth msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetPeerBandwidth, - Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02}, + require.Equal(t, &message.MsgSetPeerBandwidth{ + Value: 2500000, + Type: 2, }, msg) // S->C set chunk size msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetChunkSize, - Body: []byte{0x00, 0x01, 0x00, 0x00}, + require.Equal(t, &message.MsgSetChunkSize{ + Value: 65536, }, msg) mr.SetChunkSize(65536) @@ -570,222 +541,214 @@ func TestWriteTracks(t *testing.T) { // S->C result msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(3), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err := flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "_result", - float64(1), - flvio.AMFMap{ - {K: "fmsVer", V: "LNX 9,0,124,2"}, - {K: "capabilities", V: float64(31)}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(1), + flvio.AMFMap{ + {K: "fmsVer", V: "LNX 9,0,124,2"}, + {K: "capabilities", V: float64(31)}, + }, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetConnection.Connect.Success"}, + {K: "description", V: "Connection succeeded."}, + {K: "objectEncoding", V: float64(0)}, + }, }, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetConnection.Connect.Success"}, - {K: "description", V: "Connection succeeded."}, - {K: "objectEncoding", V: float64(0)}, - }, - }, arr) + }, msg) // C->S window acknowledgement size - err = mw.Write(&base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetWindowAckSize, - Body: []byte{0x00, 0x26, 0x25, 0xa0}, + err = mw.Write(&message.MsgSetWindowAckSize{ + Value: 2500000, }) require.NoError(t, err) // C->S set chunk size - err = mw.Write(&base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeSetChunkSize, - Body: []byte{0x00, 0x01, 0x00, 0x00}, + err = mw.Write(&message.MsgSetChunkSize{ + Value: 65536, }) require.NoError(t, err) mw.SetChunkSize(65536) // C->S createStream - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3, - Type: base.MessageTypeCommandAMF0, - Body: flvio.FillAMF0ValsMalloc([]interface{}{ + Payload: []interface{}{ "createStream", float64(2), nil, - }), + }, }) require.NoError(t, err) // S->C result msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(3), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "_result", - float64(2), - nil, - float64(1), - }, arr) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(2), + nil, + float64(1), + }, + }, msg) // C->S getStreamLength - byts = flvio.FillAMF0ValsMalloc([]interface{}{ - "getStreamLength", - float64(3), - nil, - "", - }) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 8, - Body: byts, + Payload: []interface{}{ + "getStreamLength", + float64(3), + nil, + "", + }, }) require.NoError(t, err) // C->S play - byts = flvio.FillAMF0ValsMalloc([]interface{}{ - "play", - float64(4), - nil, - "", - float64(-2000), - }) - err = mw.Write(&base.RawMessage{ + err = mw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 8, - Type: base.MessageTypeCommandAMF0, - Body: byts, + Payload: []interface{}{ + "play", + float64(4), + nil, + "", + float64(-2000), + }, }) require.NoError(t, err) // S->C event "stream is recorded" msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeUserControl, - Body: []byte{0x00, 0x04, 0x00, 0x00, 0x00, 0x01}, + require.Equal(t, &message.MsgUserControl{ + Type: 4, + Payload: []byte{0x00, 0x00, 0x00, 0x01}, }, msg) // S->C event "stream begin 1" msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, &base.RawMessage{ - ChunkStreamID: base.ControlChunkStreamID, - Type: base.MessageTypeUserControl, - Body: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + require.Equal(t, &message.MsgUserControl{ + Type: 0, + Payload: []byte{0x00, 0x00, 0x00, 0x01}, }, msg) // S->C onStatus msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(5), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onStatus", - float64(4), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Play.Reset"}, - {K: "description", V: "play reset"}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(4), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Play.Reset"}, + {K: "description", V: "play reset"}, + }, }, - }, arr) + }, msg) // S->C onStatus msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(5), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onStatus", - float64(4), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Play.Start"}, - {K: "description", V: "play start"}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(4), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Play.Start"}, + {K: "description", V: "play start"}, + }, }, - }, arr) + }, msg) // S->C onStatus msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(5), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onStatus", - float64(4), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Data.Start"}, - {K: "description", V: "data start"}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(4), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Data.Start"}, + {K: "description", V: "data start"}, + }, }, - }, arr) + }, msg) // S->C onStatus msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(5), msg.ChunkStreamID) - require.Equal(t, base.MessageTypeCommandAMF0, msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onStatus", - float64(4), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Play.PublishNotify"}, - {K: "description", V: "publish notify"}, + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(4), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Play.PublishNotify"}, + {K: "description", V: "publish notify"}, + }, }, - }, arr) + }, msg) // S->C onMetadata msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(4), msg.ChunkStreamID) - require.Equal(t, base.MessageType(0x12), msg.Type) - arr, err = flvio.ParseAMFVals(msg.Body, false) - require.NoError(t, err) - require.Equal(t, []interface{}{ - "onMetaData", - flvio.AMFMap{ - {K: "videodatarate", V: float64(0)}, - {K: "videocodecid", V: float64(7)}, - {K: "audiodatarate", V: float64(0)}, - {K: "audiocodecid", V: float64(10)}, + require.Equal(t, &message.MsgDataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onMetaData", + flvio.AMFMap{ + {K: "videodatarate", V: float64(0)}, + {K: "videocodecid", V: float64(7)}, + {K: "audiodatarate", V: float64(0)}, + {K: "audiocodecid", V: float64(10)}, + }, }, - }, arr) + }, msg) // S->C H264 decoder config msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(6), msg.ChunkStreamID) - require.Equal(t, base.MessageType(0x09), msg.Type) - require.Equal(t, []byte{ - 0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0, - 0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0, - 0xc, 0xac, 0x3b, 0x50, 0xb0, 0x4b, 0x42, 0x0, - 0x0, 0x3, 0x0, 0x2, 0x0, 0x0, 0x3, 0x0, - 0x3d, 0x8, 0x1, 0x0, 0x4, 0x68, 0xee, 0x3c, - 0x80, - }, msg.Body) + require.Equal(t, &message.MsgVideo{ + ChunkStreamID: 6, + MessageStreamID: 16777216, + Body: []byte{ + 0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0, + 0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0, + 0xc, 0xac, 0x3b, 0x50, 0xb0, 0x4b, 0x42, 0x0, + 0x0, 0x3, 0x0, 0x2, 0x0, 0x0, 0x3, 0x0, + 0x3d, 0x8, 0x1, 0x0, 0x4, 0x68, 0xee, 0x3c, + 0x80, + }, + }, msg) // S->C AAC decoder config msg, err = mr.Read() require.NoError(t, err) - require.Equal(t, uint8(4), msg.ChunkStreamID) - require.Equal(t, base.MessageType(0x08), msg.Type) - require.Equal(t, []byte{0xae, 0x0, 0x12, 0x10}, msg.Body) + require.Equal(t, &message.MsgAudio{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Body: []byte{0xae, 0x0, 0x12, 0x10}, + }, msg) } diff --git a/internal/rtmp/handshake/c0.go b/internal/rtmp/handshake/c0.go new file mode 100644 index 00000000..fe5ea892 --- /dev/null +++ b/internal/rtmp/handshake/c0.go @@ -0,0 +1,18 @@ +package handshake + +import ( + "io" +) + +const ( + rtmpVersion = 0x03 +) + +// C0 is the C0 part of an handshake. +type C0 struct{} + +// Read reads a C0. +func (C0) Write(w io.Writer) error { + _, err := w.Write([]byte{rtmpVersion}) + return err +} diff --git a/internal/rtmp/base/base.go b/internal/rtmp/handshake/c1.go similarity index 51% rename from internal/rtmp/base/base.go rename to internal/rtmp/handshake/c1.go index abf062d2..fdce9221 100644 --- a/internal/rtmp/base/base.go +++ b/internal/rtmp/handshake/c1.go @@ -1,12 +1,10 @@ -package base +package handshake -const ( - // ControlChunkStreamID is the stream ID used for control messages. - ControlChunkStreamID = 2 -) - -const ( - rtmpVersion = 0x03 +import ( + "crypto/hmac" + "crypto/rand" + "crypto/sha256" + "io" ) var ( @@ -30,3 +28,40 @@ var ( hsClientPartialKey = hsClientFullKey[:30] hsServerPartialKey = hsServerFullKey[:36] ) + +func hsCalcDigestPos(p []byte, base int) (pos int) { + for i := 0; i < 4; i++ { + pos += int(p[base+i]) + } + pos = (pos % 728) + base + 4 + return +} + +func hsMakeDigest(key []byte, src []byte, gap int) (dst []byte) { + h := hmac.New(sha256.New, key) + if gap <= 0 { + h.Write(src) + } else { + h.Write(src[:gap]) + h.Write(src[gap+32:]) + } + return h.Sum(nil) +} + +// C1 is the C1 part of an handshake. +type C1 struct{} + +// Read reads a C1. +func (C1) Write(w io.Writer) error { + buf := make([]byte, 1536) + copy(buf[0:4], []byte{0x00, 0x00, 0x00, 0x00}) + copy(buf[4:8], []byte{0x09, 0x00, 0x7c, 0x02}) + + rand.Read(buf[8:]) + gap := hsCalcDigestPos(buf[0:], 8) + digest := hsMakeDigest(hsClientPartialKey, buf[0:], gap) + copy(buf[gap+0:], digest) + + _, err := w.Write(buf[0:]) + return err +} diff --git a/internal/rtmp/base/handshakec2.go b/internal/rtmp/handshake/c2.go similarity index 82% rename from internal/rtmp/base/handshakec2.go rename to internal/rtmp/handshake/c2.go index 883cf4b9..6c156ed3 100644 --- a/internal/rtmp/base/handshakec2.go +++ b/internal/rtmp/handshake/c2.go @@ -1,4 +1,4 @@ -package base +package handshake import ( "bytes" @@ -28,11 +28,11 @@ func hsParse1(p []byte, peerkey []byte, key []byte) (ok bool, digest []byte) { return } -// HandshakeC2 is the C2 part of an handshake. -type HandshakeC2 struct{} +// C2 is the C2 part of an handshake. +type C2 struct{} -// Read reads a HandshakeC2. -func (HandshakeC2) Write(w io.Writer, s1s2 []byte) error { +// Read reads a C2. +func (C2) Write(w io.Writer, s1s2 []byte) error { ok, key := hsParse1(s1s2[:1536], hsServerPartialKey, hsClientFullKey) if !ok { return fmt.Errorf("unable to parse S1+S2") diff --git a/internal/rtmp/base/handshakes0.go b/internal/rtmp/handshake/s0.go similarity index 57% rename from internal/rtmp/base/handshakes0.go rename to internal/rtmp/handshake/s0.go index 24a7facb..d7ddcc2a 100644 --- a/internal/rtmp/base/handshakes0.go +++ b/internal/rtmp/handshake/s0.go @@ -1,15 +1,15 @@ -package base +package handshake import ( "fmt" "io" ) -// HandshakeS0 is the S0 part of an handshake. -type HandshakeS0 struct{} +// S0 is the S0 part of an handshake. +type S0 struct{} -// Read reads a HandshakeS0. -func (HandshakeS0) Read(r io.Reader) error { +// Read reads a S0. +func (S0) Read(r io.Reader) error { buf := make([]byte, 1) _, err := io.ReadFull(r, buf) if err != nil { diff --git a/internal/rtmp/message/message.go b/internal/rtmp/message/message.go new file mode 100644 index 00000000..b6d78652 --- /dev/null +++ b/internal/rtmp/message/message.go @@ -0,0 +1,16 @@ +package message + +import ( + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +const ( + // ControlChunkStreamID is the stream ID used for control messages. + ControlChunkStreamID = 2 +) + +// Message is a message. +type Message interface { + Unmarshal(*rawmessage.Message) error + Marshal() (*rawmessage.Message, error) +} diff --git a/internal/rtmp/message/msg_audio.go b/internal/rtmp/message/msg_audio.go new file mode 100644 index 00000000..10a74e24 --- /dev/null +++ b/internal/rtmp/message/msg_audio.go @@ -0,0 +1,31 @@ +package message + +import ( + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgAudio is an audio message. +type MsgAudio struct { + ChunkStreamID byte + MessageStreamID uint32 + Body []byte +} + +// Unmarshal implements Message. +func (m *MsgAudio) Unmarshal(raw *rawmessage.Message) error { + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + m.Body = raw.Body + return nil +} + +// Marshal implements Message. +func (m MsgAudio) Marshal() (*rawmessage.Message, error) { + return &rawmessage.Message{ + ChunkStreamID: m.ChunkStreamID, + Type: chunk.MessageTypeAudio, + MessageStreamID: m.MessageStreamID, + Body: m.Body, + }, nil +} diff --git a/internal/rtmp/message/msg_command_amf0.go b/internal/rtmp/message/msg_command_amf0.go new file mode 100644 index 00000000..412a9af8 --- /dev/null +++ b/internal/rtmp/message/msg_command_amf0.go @@ -0,0 +1,39 @@ +package message + +import ( + "github.com/notedit/rtmp/format/flv/flvio" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgCommandAMF0 is a AMF0 command message. +type MsgCommandAMF0 struct { + ChunkStreamID byte + MessageStreamID uint32 + Payload []interface{} +} + +// Unmarshal implements Message. +func (m *MsgCommandAMF0) Unmarshal(raw *rawmessage.Message) error { + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + + payload, err := flvio.ParseAMFVals(raw.Body, false) + if err != nil { + return err + } + m.Payload = payload + + return nil +} + +// Marshal implements Message. +func (m MsgCommandAMF0) Marshal() (*rawmessage.Message, error) { + return &rawmessage.Message{ + ChunkStreamID: m.ChunkStreamID, + Type: chunk.MessageTypeCommandAMF0, + MessageStreamID: m.MessageStreamID, + Body: flvio.FillAMF0ValsMalloc(m.Payload), + }, nil +} diff --git a/internal/rtmp/message/msg_dataamf0.go b/internal/rtmp/message/msg_dataamf0.go new file mode 100644 index 00000000..c7979595 --- /dev/null +++ b/internal/rtmp/message/msg_dataamf0.go @@ -0,0 +1,39 @@ +package message + +import ( + "github.com/notedit/rtmp/format/flv/flvio" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgDataAMF0 is a AMF0 data message. +type MsgDataAMF0 struct { + ChunkStreamID byte + MessageStreamID uint32 + Payload []interface{} +} + +// Unmarshal implements Message. +func (m *MsgDataAMF0) Unmarshal(raw *rawmessage.Message) error { + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + + payload, err := flvio.ParseAMFVals(raw.Body, false) + if err != nil { + return err + } + m.Payload = payload + + return nil +} + +// Marshal implements Message. +func (m MsgDataAMF0) Marshal() (*rawmessage.Message, error) { + return &rawmessage.Message{ + ChunkStreamID: m.ChunkStreamID, + Type: chunk.MessageTypeDataAMF0, + MessageStreamID: m.MessageStreamID, + Body: flvio.FillAMF0ValsMalloc(m.Payload), + }, nil +} diff --git a/internal/rtmp/message/msg_setchunksize.go b/internal/rtmp/message/msg_setchunksize.go new file mode 100644 index 00000000..418b3716 --- /dev/null +++ b/internal/rtmp/message/msg_setchunksize.go @@ -0,0 +1,40 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgSetChunkSize is a set chunk size message. +type MsgSetChunkSize struct { + Value uint32 +} + +// Unmarshal implements Message. +func (m *MsgSetChunkSize) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 4 { + return fmt.Errorf("unexpected body size") + } + + m.Value = binary.BigEndian.Uint32(raw.Body) + return nil +} + +// Marshal implements Message. +func (m *MsgSetChunkSize) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 4) + binary.BigEndian.PutUint32(body, m.Value) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeSetChunkSize, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_setpeerbandwidth.go b/internal/rtmp/message/msg_setpeerbandwidth.go new file mode 100644 index 00000000..d5da16ac --- /dev/null +++ b/internal/rtmp/message/msg_setpeerbandwidth.go @@ -0,0 +1,43 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgSetPeerBandwidth is a set peer bandwidth message. +type MsgSetPeerBandwidth struct { + Value uint32 + Type byte +} + +// Unmarshal implements Message. +func (m *MsgSetPeerBandwidth) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 5 { + return fmt.Errorf("unexpected body size") + } + + m.Value = binary.BigEndian.Uint32(raw.Body) + m.Type = raw.Body[4] + return nil +} + +// Marshal implements Message. +func (m *MsgSetPeerBandwidth) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 5) + binary.BigEndian.PutUint32(body, m.Value) + body[4] = m.Type + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeSetChunkSize, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_setwindowacksize.go b/internal/rtmp/message/msg_setwindowacksize.go new file mode 100644 index 00000000..7e45a9b8 --- /dev/null +++ b/internal/rtmp/message/msg_setwindowacksize.go @@ -0,0 +1,40 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgSetWindowAckSize is a set window acknowledgement message. +type MsgSetWindowAckSize struct { + Value uint32 +} + +// Unmarshal implements Message. +func (m *MsgSetWindowAckSize) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) != 4 { + return fmt.Errorf("unexpected body size") + } + + m.Value = binary.BigEndian.Uint32(raw.Body) + return nil +} + +// Marshal implements Message. +func (m *MsgSetWindowAckSize) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 4) + binary.BigEndian.PutUint32(body, m.Value) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeSetWindowAckSize, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_usercontrol.go b/internal/rtmp/message/msg_usercontrol.go new file mode 100644 index 00000000..c8818aeb --- /dev/null +++ b/internal/rtmp/message/msg_usercontrol.go @@ -0,0 +1,44 @@ +package message + +import ( + "encoding/binary" + "fmt" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgUserControl is a user control message. +type MsgUserControl struct { + Type uint16 + Payload []byte +} + +// Unmarshal implements Message. +func (m *MsgUserControl) Unmarshal(raw *rawmessage.Message) error { + if raw.ChunkStreamID != ControlChunkStreamID { + return fmt.Errorf("unexpected chunk stream ID") + } + + if len(raw.Body) < 2 { + return fmt.Errorf("unexpected body size") + } + + m.Type = binary.BigEndian.Uint16(raw.Body) + m.Payload = raw.Body[2:] + + return nil +} + +// Marshal implements Message. +func (m MsgUserControl) Marshal() (*rawmessage.Message, error) { + body := make([]byte, 2+len(m.Payload)) + binary.BigEndian.PutUint16(body, m.Type) + copy(body[2:], m.Payload) + + return &rawmessage.Message{ + ChunkStreamID: ControlChunkStreamID, + Type: chunk.MessageTypeUserControl, + Body: body, + }, nil +} diff --git a/internal/rtmp/message/msg_video.go b/internal/rtmp/message/msg_video.go new file mode 100644 index 00000000..81f2e579 --- /dev/null +++ b/internal/rtmp/message/msg_video.go @@ -0,0 +1,31 @@ +package message + +import ( + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// MsgVideo is a video message. +type MsgVideo struct { + ChunkStreamID byte + MessageStreamID uint32 + Body []byte +} + +// Unmarshal implements Message. +func (m *MsgVideo) Unmarshal(raw *rawmessage.Message) error { + m.ChunkStreamID = raw.ChunkStreamID + m.MessageStreamID = raw.MessageStreamID + m.Body = raw.Body + return nil +} + +// Marshal implements Message. +func (m MsgVideo) Marshal() (*rawmessage.Message, error) { + return &rawmessage.Message{ + ChunkStreamID: m.ChunkStreamID, + Type: chunk.MessageTypeVideo, + MessageStreamID: m.MessageStreamID, + Body: m.Body, + }, nil +} diff --git a/internal/rtmp/message/reader.go b/internal/rtmp/message/reader.go new file mode 100644 index 00000000..e60ed0db --- /dev/null +++ b/internal/rtmp/message/reader.go @@ -0,0 +1,77 @@ +package message + +import ( + "fmt" + "io" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +func messageFromType(typ chunk.MessageType) (Message, error) { + switch typ { + case chunk.MessageTypeSetChunkSize: + return &MsgSetChunkSize{}, nil + + case chunk.MessageTypeSetWindowAckSize: + return &MsgSetWindowAckSize{}, nil + + case chunk.MessageTypeSetPeerBandwidth: + return &MsgSetPeerBandwidth{}, nil + + case chunk.MessageTypeUserControl: + return &MsgUserControl{}, nil + + case chunk.MessageTypeCommandAMF0: + return &MsgCommandAMF0{}, nil + + case chunk.MessageTypeDataAMF0: + return &MsgDataAMF0{}, nil + + case chunk.MessageTypeAudio: + return &MsgAudio{}, nil + + case chunk.MessageTypeVideo: + return &MsgVideo{}, nil + + default: + return nil, fmt.Errorf("unhandled message") + } +} + +// Reader is a message reader. +type Reader struct { + r *rawmessage.Reader +} + +// NewReader allocates a Reader. +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: rawmessage.NewReader(r), + } +} + +// SetChunkSize sets the maximum chunk size. +func (r *Reader) SetChunkSize(v int) { + r.r.SetChunkSize(v) +} + +// Read reads a essage. +func (r *Reader) Read() (Message, error) { + raw, err := r.r.Read() + if err != nil { + return nil, err + } + + msg, err := messageFromType(raw.Type) + if err != nil { + return nil, err + } + + err = msg.Unmarshal(raw) + if err != nil { + return nil, err + } + + return msg, nil +} diff --git a/internal/rtmp/message/writer.go b/internal/rtmp/message/writer.go new file mode 100644 index 00000000..0661cda9 --- /dev/null +++ b/internal/rtmp/message/writer.go @@ -0,0 +1,34 @@ +package message + +import ( + "io" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/rawmessage" +) + +// Writer is a message writer. +type Writer struct { + w *rawmessage.Writer +} + +// NewWriter allocates a Writer. +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: rawmessage.NewWriter(w), + } +} + +// SetChunkSize sets the maximum chunk size. +func (mw *Writer) SetChunkSize(v int) { + mw.w.SetChunkSize(v) +} + +// Write writes a message. +func (mw *Writer) Write(msg Message) error { + raw, err := msg.Marshal() + if err != nil { + return err + } + + return mw.w.Write(raw) +} diff --git a/internal/rtmp/rawmessage/message.go b/internal/rtmp/rawmessage/message.go new file mode 100644 index 00000000..631e6225 --- /dev/null +++ b/internal/rtmp/rawmessage/message.go @@ -0,0 +1,14 @@ +package rawmessage + +import ( + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" +) + +// Message is a raw message. +type Message struct { + ChunkStreamID byte + Timestamp uint32 + Type chunk.MessageType + MessageStreamID uint32 + Body []byte +} diff --git a/internal/rtmp/base/rawmessagereader.go b/internal/rtmp/rawmessage/reader.go similarity index 77% rename from internal/rtmp/base/rawmessagereader.go rename to internal/rtmp/rawmessage/reader.go index 3fc2ec71..e87ef364 100644 --- a/internal/rtmp/base/rawmessagereader.go +++ b/internal/rtmp/rawmessage/reader.go @@ -1,30 +1,33 @@ -package base +package rawmessage import ( "bufio" "errors" "fmt" + "io" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" ) var errMoreChunksNeeded = errors.New("more chunks are needed") -type rawRawMessageReaderChunkStream struct { - mr *RawMessageReader +type readerChunkStream struct { + mr *Reader curTimestamp *uint32 - curType *MessageType + curType *chunk.MessageType curMessageStreamID *uint32 curBodyLen *uint32 curBody *[]byte } -func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { +func (rc *readerChunkStream) read(typ byte) (*Message, error) { switch typ { case 0: if rc.curBody != nil { return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk") } - var c0 Chunk0 + var c0 chunk.Chunk0 err := c0.Read(rc.mr.r, rc.mr.chunkSize) if err != nil { return nil, err @@ -44,7 +47,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { return nil, errMoreChunksNeeded } - return &RawMessage{ + return &Message{ Timestamp: c0.Timestamp, Type: c0.Type, MessageStreamID: c0.MessageStreamID, @@ -60,7 +63,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk") } - var c1 Chunk1 + var c1 chunk.Chunk1 err := c1.Read(rc.mr.r, rc.mr.chunkSize) if err != nil { return nil, err @@ -78,7 +81,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { return nil, errMoreChunksNeeded } - return &RawMessage{ + return &Message{ Timestamp: *rc.curTimestamp + c1.TimestampDelta, Type: c1.Type, MessageStreamID: *rc.curMessageStreamID, @@ -99,7 +102,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { chunkBodyLen = rc.mr.chunkSize } - var c2 Chunk2 + var c2 chunk.Chunk2 err := c2.Read(rc.mr.r, chunkBodyLen) if err != nil { return nil, err @@ -113,7 +116,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { return nil, errMoreChunksNeeded } - return &RawMessage{ + return &Message{ Timestamp: *rc.curTimestamp + c2.TimestampDelta, Type: *rc.curType, MessageStreamID: *rc.curMessageStreamID, @@ -134,7 +137,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { chunkBodyLen = rc.mr.chunkSize } - var c3 Chunk3 + var c3 chunk.Chunk3 err := c3.Read(rc.mr.r, chunkBodyLen) if err != nil { return nil, err @@ -149,7 +152,7 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { body := *rc.curBody rc.curBody = nil - return &RawMessage{ + return &Message{ Timestamp: *rc.curTimestamp, Type: *rc.curType, MessageStreamID: *rc.curMessageStreamID, @@ -158,30 +161,31 @@ func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) { } } -// RawMessageReader is a message reader. -type RawMessageReader struct { +// Reader is a raw message reader. +type Reader struct { r *bufio.Reader chunkSize int - chunkStreams map[byte]*rawRawMessageReaderChunkStream + chunkStreams map[byte]*readerChunkStream } -// NewRawMessageReader allocates a RawMessageReader. -func NewRawMessageReader(r *bufio.Reader) *RawMessageReader { - return &RawMessageReader{ - r: r, +// NewReader allocates a Reader. +func NewReader(r io.Reader) *Reader { + return &Reader{ + r: bufio.NewReader(r), chunkSize: 128, - chunkStreams: make(map[byte]*rawRawMessageReaderChunkStream), + chunkStreams: make(map[byte]*readerChunkStream), } } // SetChunkSize sets the maximum chunk size. -func (mr *RawMessageReader) SetChunkSize(v int) { - mr.chunkSize = v +func (r *Reader) SetChunkSize(v int) { + r.chunkSize = v } -func (mr *RawMessageReader) Read() (*RawMessage, error) { +// Read reads a Message. +func (r *Reader) Read() (*Message, error) { for { - byt, err := mr.r.ReadByte() + byt, err := r.r.ReadByte() if err != nil { return nil, err } @@ -189,13 +193,13 @@ func (mr *RawMessageReader) Read() (*RawMessage, error) { typ := byt >> 6 chunkStreamID := byt & 0x3F - rc, ok := mr.chunkStreams[chunkStreamID] + rc, ok := r.chunkStreams[chunkStreamID] if !ok { - rc = &rawRawMessageReaderChunkStream{mr: mr} - mr.chunkStreams[chunkStreamID] = rc + rc = &readerChunkStream{mr: r} + r.chunkStreams[chunkStreamID] = rc } - mr.r.UnreadByte() + r.r.UnreadByte() msg, err := rc.read(typ) if err != nil { diff --git a/internal/rtmp/base/rawmessagewriter.go b/internal/rtmp/rawmessage/writer.go similarity index 74% rename from internal/rtmp/base/rawmessagewriter.go rename to internal/rtmp/rawmessage/writer.go index 23add0a3..20b069c4 100644 --- a/internal/rtmp/base/rawmessagewriter.go +++ b/internal/rtmp/rawmessage/writer.go @@ -1,19 +1,21 @@ -package base +package rawmessage import ( "io" + + "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" ) -type rawMessageWriterChunkStream struct { - mw *RawMessageWriter +type writerChunkStream struct { + mw *Writer lastMessageStreamID *uint32 - lastType *MessageType + lastType *chunk.MessageType lastBodyLen *int lastTimestamp *uint32 lastTimestampDelta *uint32 } -func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { +func (wc *writerChunkStream) write(msg *Message) error { bodyLen := len(msg.Body) pos := 0 firstChunk := true @@ -40,7 +42,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { switch { case wc.lastMessageStreamID == nil || timestampDelta == nil || *wc.lastMessageStreamID != msg.MessageStreamID: - err := Chunk0{ + err := chunk.Chunk0{ ChunkStreamID: msg.ChunkStreamID, Timestamp: msg.Timestamp, Type: msg.Type, @@ -53,7 +55,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { } case *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen: - err := Chunk1{ + err := chunk.Chunk1{ ChunkStreamID: msg.ChunkStreamID, TimestampDelta: *timestampDelta, Type: msg.Type, @@ -65,7 +67,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { } case wc.lastTimestampDelta == nil || *wc.lastTimestampDelta != *timestampDelta: - err := Chunk2{ + err := chunk.Chunk2{ ChunkStreamID: msg.ChunkStreamID, TimestampDelta: *timestampDelta, Body: msg.Body[pos : pos+chunkBodyLen], @@ -75,7 +77,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { } default: - err := Chunk3{ + err := chunk.Chunk3{ ChunkStreamID: msg.ChunkStreamID, Body: msg.Body[pos : pos+chunkBodyLen], }.Write(wc.mw.w) @@ -98,7 +100,7 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { wc.lastTimestampDelta = &v5 } } else { - err := Chunk3{ + err := chunk.Chunk3{ ChunkStreamID: msg.ChunkStreamID, Body: msg.Body[pos : pos+chunkBodyLen], }.Write(wc.mw.w) @@ -115,33 +117,33 @@ func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error { } } -// RawMessageWriter is a message writer. -type RawMessageWriter struct { +// Writer is a raw message writer. +type Writer struct { w io.Writer chunkSize int - chunkStreams map[byte]*rawMessageWriterChunkStream + chunkStreams map[byte]*writerChunkStream } -// NewRawMessageWriter allocates a RawMessageWriter. -func NewRawMessageWriter(w io.Writer) *RawMessageWriter { - return &RawMessageWriter{ +// NewWriter allocates a Writer. +func NewWriter(w io.Writer) *Writer { + return &Writer{ w: w, chunkSize: 128, - chunkStreams: make(map[byte]*rawMessageWriterChunkStream), + chunkStreams: make(map[byte]*writerChunkStream), } } // SetChunkSize sets the maximum chunk size. -func (mw *RawMessageWriter) SetChunkSize(v int) { - mw.chunkSize = v +func (w *Writer) SetChunkSize(v int) { + w.chunkSize = v } // Write writes a Message. -func (mw *RawMessageWriter) Write(msg *RawMessage) error { - wc, ok := mw.chunkStreams[msg.ChunkStreamID] +func (w *Writer) Write(msg *Message) error { + wc, ok := w.chunkStreams[msg.ChunkStreamID] if !ok { - wc = &rawMessageWriterChunkStream{mw: mw} - mw.chunkStreams[msg.ChunkStreamID] = wc + wc = &writerChunkStream{mw: w} + w.chunkStreams[msg.ChunkStreamID] = wc } return wc.write(msg)