diff --git a/client.go b/client.go index 75e0d353..f2c45a51 100644 --- a/client.go +++ b/client.go @@ -16,10 +16,6 @@ import ( const ( clientCheckStreamInterval = 5 * time.Second clientReceiverReportInterval = 10 * time.Second - clientTCPReadBufferSize = 128 * 1024 - clientTCPWriteBufferSize = 128 * 1024 - clientUDPReadBufferSize = 2048 - clientUDPWriteBufferSize = 128 * 1024 ) type clientDescribeReq struct { @@ -119,9 +115,10 @@ func newClient(p *program, nconn net.Conn) *client { c := &client{ p: p, conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{ - Conn: nconn, - ReadTimeout: p.conf.ReadTimeout, - WriteTimeout: p.conf.WriteTimeout, + Conn: nconn, + ReadTimeout: p.conf.ReadTimeout, + WriteTimeout: p.conf.WriteTimeout, + ReadBufferCount: 2, }), state: clientStateInitial, streamTracks: make(map[int]*clientTrack), @@ -946,14 +943,8 @@ func (c *client) runPlayTCP() { readDone := make(chan error) go func() { - frame := &gortsplib.InterleavedFrame{} - readBuf := make([]byte, clientTCPReadBufferSize) - for { - frame.Content = readBuf - frame.Content = frame.Content[:cap(frame.Content)] - - recv, err := c.conn.ReadFrameOrRequest(frame, false) + recv, err := c.conn.ReadFrameOrRequest(false) if err != nil { readDone <- err break @@ -1144,19 +1135,13 @@ func (c *client) runRecordUDP() { } func (c *client) runRecordTCP() { - frame := &gortsplib.InterleavedFrame{} - readBuf := newMultiBuffer(2, clientTCPReadBufferSize) - readRequest := make(chan readRequestPair) defer close(readRequest) readDone := make(chan error) go func() { for { - frame.Content = readBuf.next() - frame.Content = frame.Content[:cap(frame.Content)] - - recv, err := c.conn.ReadFrameOrRequest(frame, true) + recv, err := c.conn.ReadFrameOrRequest(true) if err != nil { readDone <- err break @@ -1164,14 +1149,14 @@ func (c *client) runRecordTCP() { switch recvt := recv.(type) { case *gortsplib.InterleavedFrame: - if frame.TrackId >= len(c.streamTracks) { - readDone <- fmt.Errorf("invalid track id '%d'", frame.TrackId) + if recvt.TrackId >= len(c.streamTracks) { + readDone <- fmt.Errorf("invalid track id '%d'", recvt.TrackId) break } - c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + c.rtcpReceivers[recvt.TrackId].OnFrame(recvt.StreamType, recvt.Content) - c.p.readersMap.forwardFrame(c.path, frame.TrackId, frame.StreamType, frame.Content) + c.p.readersMap.forwardFrame(c.path, recvt.TrackId, recvt.StreamType, recvt.Content) case *gortsplib.Request: err := c.handleRequest(recvt) diff --git a/go.mod b/go.mod index 2f018180..f5887054 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777 + github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c github.com/davecgh/go-spew v1.1.1 // indirect github.com/stretchr/testify v1.6.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index b5d1545f..2e391932 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777 h1:WpP46odBYEPXa1GLtGrf8W8gZGLohDJCJTYmlPJCo2w= -github.com/aler9/gortsplib v0.0.0-20200920093758-10469faa0777/go.mod h1:IQy51zikcH4wQFNwYPHtC0+HTcPlahJcxcYiMqlCyiw= +github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c h1:tXQF5q+mlu5KEALSgHR+ReWfF+cS7h3oM4+nJuTBf7E= +github.com/aler9/gortsplib v0.0.0-20200920120711-81b5754b0b0c/go.mod h1:IQy51zikcH4wQFNwYPHtC0+HTcPlahJcxcYiMqlCyiw= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQTuBPvVleu1zd6R8jInhg5ifimSO7ku/o= github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/proxy.go b/proxy.go index 237d6211..53a3e83b 100644 --- a/proxy.go +++ b/proxy.go @@ -9,9 +9,7 @@ import ( ) const ( - proxyRetryInterval = 5 * time.Second - proxyUDPReadBufferSize = 2048 - proxyTCPReadBufferSize = 128 * 1024 + proxyRetryInterval = 5 * time.Second ) type proxyState int @@ -137,9 +135,10 @@ func (s *proxy) runInnerInner() bool { dialDone := make(chan struct{}) go func() { conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{ - Host: s.pathConf.sourceUrl.Host, - ReadTimeout: s.p.conf.ReadTimeout, - WriteTimeout: s.p.conf.WriteTimeout, + Host: s.pathConf.sourceUrl.Host, + ReadTimeout: s.p.conf.ReadTimeout, + WriteTimeout: s.p.conf.WriteTimeout, + ReadBufferCount: 2, }) close(dialDone) }() @@ -216,17 +215,14 @@ func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool { go func(trackId int, rtpRead gortsplib.UDPReadFunc) { defer wg.Done() - multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize) for { - buf := multiBuf.next() - - n, err := rtpRead(buf) + buf, err := rtpRead() if err != nil { break } s.p.readersMap.forwardFrame(s.path, trackId, - gortsplib.StreamTypeRtp, buf[:n]) + gortsplib.StreamTypeRtp, buf) } }(trackId, rtpRead) } @@ -237,17 +233,14 @@ func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool { go func(trackId int, rtcpRead gortsplib.UDPReadFunc) { defer wg.Done() - multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize) for { - buf := multiBuf.next() - - n, err := rtcpRead(buf) + buf, err := rtcpRead() if err != nil { break } s.p.readersMap.forwardFrame(s.path, trackId, - gortsplib.StreamTypeRtcp, buf[:n]) + gortsplib.StreamTypeRtcp, buf) } }(trackId, rtcpRead) } @@ -302,16 +295,10 @@ func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool { s.p.proxyReady <- s - frame := &gortsplib.InterleavedFrame{} - multiBuf := newMultiBuffer(2, proxyTCPReadBufferSize) - tcpConnDone := make(chan error) go func() { for { - frame.Content = multiBuf.next() - frame.Content = frame.Content[:cap(frame.Content)] - - err := conn.ReadFrame(frame) + frame, err := conn.ReadFrame() if err != nil { tcpConnDone <- err return diff --git a/serverudp.go b/serverudp.go index bd2a1cb5..d2e05b5e 100644 --- a/serverudp.go +++ b/serverudp.go @@ -8,6 +8,10 @@ import ( "github.com/aler9/gortsplib" ) +const ( + udpReadBufferSize = 2048 +) + type udpBufAddrPair struct { buf []byte addr *net.UDPAddr @@ -17,7 +21,7 @@ type serverUDP struct { p *program pc *net.UDPConn streamType gortsplib.StreamType - readBuf *multiBuffer + readBuf *gortsplib.MultiBuffer writec chan udpBufAddrPair done chan struct{} @@ -35,7 +39,7 @@ func newServerUDP(p *program, port int, streamType gortsplib.StreamType) (*serve p: p, pc: pc, streamType: streamType, - readBuf: newMultiBuffer(2, clientUDPReadBufferSize), + readBuf: gortsplib.NewMultiBuffer(2, udpReadBufferSize), writec: make(chan udpBufAddrPair), done: make(chan struct{}), } @@ -65,7 +69,7 @@ func (l *serverUDP) run() { }() for { - buf := l.readBuf.next() + buf := l.readBuf.Next() n, addr, err := l.pc.ReadFromUDP(buf) if err != nil { break diff --git a/utils.go b/utils.go index e547cfb0..6c57eb6f 100644 --- a/utils.go +++ b/utils.go @@ -51,31 +51,6 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { return false } -type multiBuffer struct { - buffers [][]byte - curBuf int -} - -func newMultiBuffer(count int, size int) *multiBuffer { - buffers := make([][]byte, count) - for i := 0; i < count; i++ { - buffers[i] = make([]byte, size) - } - - return &multiBuffer{ - buffers: buffers, - } -} - -func (mb *multiBuffer) next() []byte { - ret := mb.buffers[mb.curBuf] - mb.curBuf += 1 - if mb.curBuf >= len(mb.buffers) { - mb.curBuf = 0 - } - return ret -} - func splitPath(path string) (string, string, error) { pos := func() int { for i := len(path) - 1; i >= 0; i-- {