diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 9ac3716a..f881cd1b 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -47,6 +47,7 @@ func decrypt(key string, byts []byte) ([]byte, error) { // Conf is the main program configuration. type Conf struct { + // general LogLevel string `yaml:"logLevel"` LogLevelParsed logger.Level `yaml:"-" json:"-"` LogDestinations []string `yaml:"logDestinations"` @@ -61,6 +62,7 @@ type Conf struct { RunOnConnect string `yaml:"runOnConnect"` RunOnConnectRestart bool `yaml:"runOnConnectRestart"` + // rtsp Protocols []string `yaml:"protocols"` ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"` Encryption string `yaml:"encryption"` @@ -75,9 +77,11 @@ type Conf struct { AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"` ReadBufferSize int `yaml:"readBufferSize"` + // rtmp RTMPEnable bool `yaml:"rtmpEnable"` RTMPPort int `yaml:"rtmpPort"` + // path Paths map[string]*PathConf `yaml:"paths"` } diff --git a/internal/serverrtsp/server.go b/internal/serverrtsp/server.go index e23bb152..0e0cfb7f 100644 --- a/internal/serverrtsp/server.go +++ b/internal/serverrtsp/server.go @@ -33,6 +33,7 @@ func New( readTimeout time.Duration, writeTimeout time.Duration, readBufferCount int, + readBufferSize int, useUDP bool, rtpPort int, rtcpPort int, @@ -45,6 +46,7 @@ func New( ReadTimeout: readTimeout, WriteTimeout: writeTimeout, ReadBufferCount: readBufferCount, + ReadBufferSize: readBufferSize, } if useUDP { diff --git a/main.go b/main.go index e239ab1f..2e05a210 100644 --- a/main.go +++ b/main.go @@ -193,6 +193,7 @@ func (p *program) createResources(initial bool) error { p.conf.ReadTimeout, p.conf.WriteTimeout, p.conf.ReadBufferCount, + p.conf.ReadBufferSize, useUDP, p.conf.RTPPort, p.conf.RTCPPort, @@ -215,6 +216,7 @@ func (p *program) createResources(initial bool) error { p.conf.ReadTimeout, p.conf.WriteTimeout, p.conf.ReadBufferCount, + p.conf.ReadBufferSize, false, 0, 0, diff --git a/main_test.go b/main_test.go index e2ca5b14..3392393b 100644 --- a/main_test.go +++ b/main_test.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "bytes" "fmt" "io/ioutil" "net" @@ -691,6 +692,93 @@ func TestRTSPPath(t *testing.T) { } } +func TestRTSPNonCompliantFrameSize(t *testing.T) { + t.Run("publish", func(t *testing.T) { + p, ok := testProgram("readBufferSize: 4500\n") + require.Equal(t, true, ok) + defer p.close() + + track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + conf := gortsplib.ClientConf{ + StreamProtocol: func() *gortsplib.StreamProtocol { + v := gortsplib.StreamProtocolTCP + return &v + }(), + } + + source, err := conf.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + buf := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5) + err = source.WriteFrame(track.ID, gortsplib.StreamTypeRTP, buf) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = source.WriteFrame(track.ID, gortsplib.StreamTypeRTP, buf) + require.NoError(t, err) + }) + + t.Run("proxy", func(t *testing.T) { + p1, ok := testProgram("protocols: [tcp]\n" + + "readBufferSize: 4500\n") + require.Equal(t, true, ok) + defer p1.close() + + track, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + conf := gortsplib.ClientConf{ + StreamProtocol: func() *gortsplib.StreamProtocol { + v := gortsplib.StreamProtocolTCP + return &v + }(), + ReadBufferSize: 4500, + } + + source, err := conf.DialPublish("rtsp://"+ownDockerIP+":8554/teststream", + gortsplib.Tracks{track}) + require.NoError(t, err) + defer source.Close() + + p2, ok := testProgram("protocols: [tcp]\n" + + "readBufferSize: 4500\n" + + "rtspPort: 8555\n" + + "paths:\n" + + " teststream:\n" + + " source: rtsp://" + ownDockerIP + ":8554/teststream\n") + require.Equal(t, true, ok) + defer p2.close() + + time.Sleep(100 * time.Millisecond) + + dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8555/teststream") + require.NoError(t, err) + defer dest.Close() + + done := make(chan struct{}) + cerr := dest.ReadFrames(func(trackID int, typ gortsplib.StreamType, buf []byte) { + if typ == gortsplib.StreamTypeRTP { + close(done) + } + }) + + buf := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5) + err = source.WriteFrame(track.ID, gortsplib.StreamTypeRTP, buf) + require.NoError(t, err) + + select { + case err := <-cerr: + t.Error(err) + case <-done: + } + }) +} + func TestRTSPRedirect(t *testing.T) { p1, ok := testProgram("paths:\n" + " path1:\n" +