improve performance by using static buffers instead of make()

This commit is contained in:
aler9 2020-06-27 16:19:02 +02:00
parent 05a35ee545
commit 0219d24e99
5 changed files with 108 additions and 47 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.13
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898
github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8
github.com/stretchr/testify v1.4.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gortc.io/sdp v0.18.2

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898 h1:ssMA0uv5d6RQQ6uZBHJklnOG0210xKboDpfVKkjmexY=
github.com/aler9/gortsplib v0.0.0-20200614194126-1173d41d7898/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc=
github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8 h1:uWmnp0579ky3QLU3cwo+WuBX+S13NC1Jx4aswCJ3Mws=
github.com/aler9/gortsplib v0.0.0-20200627140616-6bbbfefb75c8/go.mod h1:sL64nUkmrTVhlT/GCaxRXyI2Xk7m8XSdw5Uv8xKGPdc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

33
main.go
View File

@ -11,7 +11,6 @@ import (
"strings"
"time"
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2"
"gortc.io/sdp"
)
@ -586,30 +585,22 @@ func (p *program) forwardTrack(path string, id int, trackFlowType trackFlowType,
if c.path == path && c.state == _CLIENT_STATE_PLAY {
if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if trackFlowType == _TRACK_FLOW_RTP {
p.udplRtp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtpPort,
},
buf: frame,
}
p.udplRtp.write(&net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtpPort,
}, frame)
} else {
p.udplRtcp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtcpPort,
},
buf: frame,
}
p.udplRtcp.write(&net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtcpPort,
}, frame)
}
} else {
c.write <- &gortsplib.InterleavedFrame{
Channel: trackToInterleavedChannel(id, trackFlowType),
Content: frame,
}
c.writeFrame(trackToInterleavedChannel(id, trackFlowType), frame)
}
}
}

View File

@ -79,9 +79,15 @@ type serverClient struct {
streamTracks []*track
udpLastFrameTime time.Time
udpCheckStreamTicker *time.Ticker
readBuf1 []byte
readBuf2 []byte
readCurBuf bool
writeBuf1 []byte
writeBuf2 []byte
writeCurBuf bool
write chan *gortsplib.InterleavedFrame
done chan struct{}
writec chan *gortsplib.InterleavedFrame
done chan struct{}
}
func newServerClient(p *program, nconn net.Conn) *serverClient {
@ -92,20 +98,19 @@ func newServerClient(p *program, nconn net.Conn) *serverClient {
ReadTimeout: p.args.readTimeout,
WriteTimeout: p.args.writeTimeout,
}),
state: _CLIENT_STATE_STARTING,
write: make(chan *gortsplib.InterleavedFrame),
done: make(chan struct{}),
state: _CLIENT_STATE_STARTING,
readBuf1: make([]byte, 0, 512*1024),
readBuf2: make([]byte, 0, 512*1024),
writeBuf1: make([]byte, 2048),
writeBuf2: make([]byte, 2048),
writec: make(chan *gortsplib.InterleavedFrame),
done: make(chan struct{}),
}
go c.run()
return c
}
func (c *serverClient) close() {
c.conn.NetConn().Close()
<-c.done
}
func (c *serverClient) log(format string, args ...interface{}) {
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
@ -147,7 +152,7 @@ func (c *serverClient) run() {
}
go func() {
for range c.write {
for range c.writec {
}
}()
@ -165,11 +170,34 @@ func (c *serverClient) run() {
c.p.events <- programEventClientClose{done, c}
<-done
close(c.write)
close(c.writec)
close(c.done)
}
func (c *serverClient) close() {
c.conn.NetConn().Close()
<-c.done
}
func (c *serverClient) writeFrame(channel uint8, inbuf []byte) {
var buf []byte
if !c.writeCurBuf {
buf = c.writeBuf1
} else {
buf = c.writeBuf2
}
buf = buf[:len(inbuf)]
copy(buf, inbuf)
c.writeCurBuf = !c.writeCurBuf
c.writec <- &gortsplib.InterleavedFrame{
Channel: channel,
Content: buf,
}
}
func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
c.log("ERR: %s", err)
@ -696,7 +724,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
// write RTP frames sequentially
go func() {
for frame := range c.write {
for frame := range c.writec {
c.conn.WriteInterleavedFrame(frame)
}
}()
@ -782,8 +810,18 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
frame := &gortsplib.InterleavedFrame{}
for {
frame, err := c.conn.ReadInterleavedFrame()
if !c.readCurBuf {
frame.Content = c.readBuf1
} else {
frame.Content = c.readBuf2
}
frame.Content = frame.Content[:cap(frame.Content)]
c.readCurBuf = !c.readCurBuf
err := c.conn.ReadInterleavedFrame(frame)
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)

View File

@ -14,9 +14,15 @@ type serverUdpListener struct {
p *program
nconn *net.UDPConn
trackFlowType trackFlowType
readBuf1 []byte
readBuf2 []byte
readCurBuf bool
writeBuf1 []byte
writeBuf2 []byte
writeCurBuf bool
write chan *udpWrite
done chan struct{}
writec chan *udpWrite
done chan struct{}
}
func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*serverUdpListener, error) {
@ -31,7 +37,11 @@ func newServerUdpListener(p *program, port int, trackFlowType trackFlowType) (*s
p: p,
nconn: nconn,
trackFlowType: trackFlowType,
write: make(chan *udpWrite),
readBuf1: make([]byte, 2048),
readBuf2: make([]byte, 2048),
writeBuf1: make([]byte, 2048),
writeBuf2: make([]byte, 2048),
writec: make(chan *udpWrite),
done: make(chan struct{}),
}
@ -51,17 +61,21 @@ func (l *serverUdpListener) log(format string, args ...interface{}) {
func (l *serverUdpListener) run() {
go func() {
for w := range l.write {
for w := range l.writec {
l.nconn.SetWriteDeadline(time.Now().Add(l.p.args.writeTimeout))
l.nconn.WriteTo(w.buf, w.addr)
}
}()
for {
// create a buffer for each read.
// this is necessary since the buffer is propagated with channels
// so it must be unique.
buf := make([]byte, 2048) // UDP MTU is 1400
var buf []byte
if !l.readCurBuf {
buf = l.readBuf1
} else {
buf = l.readBuf2
}
l.readCurBuf = !l.readCurBuf
n, addr, err := l.nconn.ReadFromUDP(buf)
if err != nil {
break
@ -74,7 +88,7 @@ func (l *serverUdpListener) run() {
}
}
close(l.write)
close(l.writec)
close(l.done)
}
@ -83,3 +97,21 @@ func (l *serverUdpListener) close() {
l.nconn.Close()
<-l.done
}
func (l *serverUdpListener) write(addr *net.UDPAddr, inbuf []byte) {
var buf []byte
if !l.writeCurBuf {
buf = l.writeBuf1
} else {
buf = l.writeBuf2
}
buf = buf[:len(inbuf)]
copy(buf, inbuf)
l.writeCurBuf = !l.writeCurBuf
l.writec <- &udpWrite{
addr: addr,
buf: buf,
}
}