From 0219d24e99769ce833ebb16a20f8c46e575ee45e Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 27 Jun 2020 16:19:02 +0200 Subject: [PATCH] improve performance by using static buffers instead of make() --- go.mod | 2 +- go.sum | 4 +-- main.go | 33 +++++++++--------------- server-client.go | 66 ++++++++++++++++++++++++++++++++++++++---------- server-udpl.go | 50 +++++++++++++++++++++++++++++------- 5 files changed, 108 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 777deab5..82679c6e 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898 + github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8 github.com/stretchr/testify v1.4.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gortc.io/sdp v0.18.2 diff --git a/go.sum b/go.sum index f278583f..ff13dfe1 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898 h1:ssMA0uv5d6RQQ6uZBHJklnOG0210xKboDpfVKkjmexY= -github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc= +github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8 h1:uWmnp0579ky3QLU3cwo+WuBX+S13NC1Jx4aswCJ3Mws= +github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/main.go b/main.go index 1cbe31cb..bd5ea87a 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/aler9/gortsplib" "gopkg.in/alecthomas/kingpin.v2" "gortc.io/sdp" ) @@ -586,30 +585,22 @@ func (p *program) forwardTrack(path string, id int, trackFlowType trackFlowType, if c.path == path && c.state == _CLIENT_STATE_PLAY { if c.streamProtocol == _STREAM_PROTOCOL_UDP { if trackFlowType == _TRACK_FLOW_RTP { - p.udplRtp.write <- &udpWrite{ - addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[id].rtpPort, - }, - buf: frame, - } + p.udplRtp.write(&net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[id].rtpPort, + }, frame) + } else { - p.udplRtcp.write <- &udpWrite{ - addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[id].rtcpPort, - }, - buf: frame, - } + p.udplRtcp.write(&net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[id].rtcpPort, + }, frame) } } else { - c.write <- &gortsplib.InterleavedFrame{ - Channel: trackToInterleavedChannel(id, trackFlowType), - Content: frame, - } + c.writeFrame(trackToInterleavedChannel(id, trackFlowType), frame) } } } diff --git a/server-client.go b/server-client.go index 266efb22..ba22f62f 100644 --- a/server-client.go +++ b/server-client.go @@ -79,9 +79,15 @@ type serverClient struct { streamTracks []*track udpLastFrameTime time.Time udpCheckStreamTicker *time.Ticker + readBuf1 []byte + readBuf2 []byte + readCurBuf bool + writeBuf1 []byte + writeBuf2 []byte + writeCurBuf bool - write chan *gortsplib.InterleavedFrame - done chan struct{} + writec chan *gortsplib.InterleavedFrame + done chan struct{} } func newServerClient(p *program, nconn net.Conn) *serverClient { @@ -92,20 +98,19 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { ReadTimeout: p.args.readTimeout, WriteTimeout: p.args.writeTimeout, }), - state: _CLIENT_STATE_STARTING, - write: make(chan *gortsplib.InterleavedFrame), - done: make(chan struct{}), + state: _CLIENT_STATE_STARTING, + readBuf1: make([]byte, 0, 512*1024), + readBuf2: make([]byte, 0, 512*1024), + writeBuf1: make([]byte, 2048), + writeBuf2: make([]byte, 2048), + writec: make(chan *gortsplib.InterleavedFrame), + done: make(chan struct{}), } go c.run() return c } -func (c *serverClient) close() { - c.conn.NetConn().Close() - <-c.done -} - func (c *serverClient) log(format string, args ...interface{}) { c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) } @@ -147,7 +152,7 @@ func (c *serverClient) run() { } go func() { - for range c.write { + for range c.writec { } }() @@ -165,11 +170,34 @@ func (c *serverClient) run() { c.p.events <- programEventClientClose{done, c} <-done - close(c.write) + close(c.writec) close(c.done) } +func (c *serverClient) close() { + c.conn.NetConn().Close() + <-c.done +} + +func (c *serverClient) writeFrame(channel uint8, inbuf []byte) { + var buf []byte + if !c.writeCurBuf { + buf = c.writeBuf1 + } else { + buf = c.writeBuf2 + } + + buf = buf[:len(inbuf)] + copy(buf, inbuf) + c.writeCurBuf = !c.writeCurBuf + + c.writec <- &gortsplib.InterleavedFrame{ + Channel: channel, + Content: buf, + } +} + func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { c.log("ERR: %s", err) @@ -696,7 +724,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { if c.streamProtocol == _STREAM_PROTOCOL_TCP { // write RTP frames sequentially go func() { - for frame := range c.write { + for frame := range c.writec { c.conn.WriteInterleavedFrame(frame) } }() @@ -782,8 +810,18 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { // when protocol is TCP, the RTSP connection becomes a RTP connection // receive RTP data and parse it if c.streamProtocol == _STREAM_PROTOCOL_TCP { + frame := &gortsplib.InterleavedFrame{} for { - frame, err := c.conn.ReadInterleavedFrame() + if !c.readCurBuf { + frame.Content = c.readBuf1 + } else { + frame.Content = c.readBuf2 + } + + frame.Content = frame.Content[:cap(frame.Content)] + c.readCurBuf = !c.readCurBuf + + err := c.conn.ReadInterleavedFrame(frame) if err != nil { if err != io.EOF { c.log("ERR: %s", err) diff --git a/server-udpl.go b/server-udpl.go index d8fbfa23..cf135262 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -14,9 +14,15 @@ type serverUdpListener struct { p *program nconn *net.UDPConn trackFlowType trackFlowType + readBuf1 []byte + readBuf2 []byte + readCurBuf bool + writeBuf1 []byte + writeBuf2 []byte + writeCurBuf bool - write chan *udpWrite - done chan struct{} + writec chan *udpWrite + done chan struct{} } func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*serverUdpListener, error) { @@ -31,7 +37,11 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s p: p, nconn: nconn, trackFlowType: trackFlowType, - write: make(chan *udpWrite), + readBuf1: make([]byte, 2048), + readBuf2: make([]byte, 2048), + writeBuf1: make([]byte, 2048), + writeBuf2: make([]byte, 2048), + writec: make(chan *udpWrite), done: make(chan struct{}), } @@ -51,17 +61,21 @@ func (l *serverUdpListener) log(format string, args ...interface{}) { func (l *serverUdpListener) run() { go func() { - for w := range l.write { + for w := range l.writec { l.nconn.SetWriteDeadline(time.Now().Add(l.p.args.writeTimeout)) l.nconn.WriteTo(w.buf, w.addr) } }() for { - // create a buffer for each read. - // this is necessary since the buffer is propagated with channels - // so it must be unique. - buf := make([]byte, 2048) // UDP MTU is 1400 + var buf []byte + if !l.readCurBuf { + buf = l.readBuf1 + } else { + buf = l.readBuf2 + } + l.readCurBuf = !l.readCurBuf + n, addr, err := l.nconn.ReadFromUDP(buf) if err != nil { break @@ -74,7 +88,7 @@ func (l *serverUdpListener) run() { } } - close(l.write) + close(l.writec) close(l.done) } @@ -83,3 +97,21 @@ func (l *serverUdpListener) close() { l.nconn.Close() <-l.done } + +func (l *serverUdpListener) write(addr *net.UDPAddr, inbuf []byte) { + var buf []byte + if !l.writeCurBuf { + buf = l.writeBuf1 + } else { + buf = l.writeBuf2 + } + + buf = buf[:len(inbuf)] + copy(buf, inbuf) + l.writeCurBuf = !l.writeCurBuf + + l.writec <- &udpWrite{ + addr: addr, + buf: buf, + } +}