rtmp client: speed up acceptance of clients by moving handshake inside client routine

This commit is contained in:
aler9 2021-04-03 12:08:07 +02:00
parent 897322e3a6
commit 99a07c0d33
7 changed files with 55 additions and 33 deletions

2
go.mod
View File

@ -17,4 +17,4 @@ require (
gopkg.in/yaml.v2 v2.2.8
)
replace github.com/notedit/rtmp => github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d
replace github.com/notedit/rtmp => github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927

4
go.sum
View File

@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210402161256-ab6826e6a7ce h1:Fv9ds+JR8aE3EHJG6kWK2UyfqvPTg6WuYg6nAN9t77A=
github.com/aler9/gortsplib v0.0.0-20210402161256-ab6826e6a7ce/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -169,6 +169,17 @@ func (c *Client) run() {
defer onConnectCmd.Close()
}
c.conn.NetConn().SetDeadline(time.Now().Add(c.readTimeout))
err := c.conn.ServerHandshake()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
c.conn.NetConn().Close()
c.parent.OnClientClose(c)
<-c.terminate
return
}
if c.conn.IsPublishing() {
c.runPublish()
} else {

View File

@ -11,5 +11,8 @@ func Dial(address string) (*Conn, error) {
return nil, err
}
return NewConn(rconn, nconn), nil
return &Conn{
rconn: rconn,
nconn: nconn,
}, nil
}

View File

@ -8,20 +8,12 @@ import (
"github.com/notedit/rtmp/format/rtmp"
)
// Conn contains a RTMP connection and a net connection.
// Conn is a RTMP connection.
type Conn struct {
rconn *rtmp.Conn
nconn net.Conn
}
// NewConn allocates a Conn.
func NewConn(rconn *rtmp.Conn, nconn net.Conn) *Conn {
return &Conn{
rconn: rconn,
nconn: nconn,
}
}
// NetConn returns the underlying net.Conn.
func (c *Conn) NetConn() net.Conn {
return c.nconn

29
internal/rtmp/server.go Normal file
View File

@ -0,0 +1,29 @@
package rtmp
import (
"bufio"
"net"
"github.com/notedit/rtmp/format/rtmp"
)
// NewServerConn initializes a server-side connection.
func NewServerConn(nconn net.Conn) *Conn {
// https://github.com/aler9/rtmp/blob/master/format/rtmp/server.go#L46
rw := &bufio.ReadWriter{
Reader: bufio.NewReaderSize(nconn, 4096),
Writer: bufio.NewWriterSize(nconn, 4096),
}
c := rtmp.NewConn(rw)
c.IsServer = true
return &Conn{
rconn: c,
nconn: nconn,
}
}
// ServerHandshake performs the handshake of a server-side connection.
func (c *Conn) ServerHandshake() error {
return c.rconn.Prepare(rtmp.StageGotPublishOrPlayCommand, 0)
}

View File

@ -3,11 +3,8 @@ package serverrtmp
import (
"net"
"strconv"
"sync"
"sync/atomic"
nrtmp "github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
)
@ -22,11 +19,11 @@ type Server struct {
parent Parent
l net.Listener
srv *nrtmp.Server
closed uint32
wg sync.WaitGroup
// out
accept chan *rtmp.Conn
done chan struct{}
}
// New allocates a Server.
@ -45,14 +42,11 @@ func New(
parent: parent,
l: l,
accept: make(chan *rtmp.Conn),
done: make(chan struct{}),
}
s.srv = nrtmp.NewServer()
s.srv.HandleConn = s.innerHandleConn
s.log(logger.Info, "opened on %s", address)
s.wg.Add(1)
go s.run()
return s, nil
@ -71,12 +65,11 @@ func (s *Server) Close() {
}()
atomic.StoreUint32(&s.closed, 1)
s.l.Close()
s.wg.Wait()
close(s.accept)
<-s.done
}
func (s *Server) run() {
defer s.wg.Done()
defer close(s.done)
for {
nconn, err := s.l.Accept()
@ -88,16 +81,10 @@ func (s *Server) run() {
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.srv.HandleNetConn(nconn)
}()
s.accept <- rtmp.NewServerConn(nconn)
}
}
func (s *Server) innerHandleConn(rconn *nrtmp.Conn, nconn net.Conn) {
s.accept <- rtmp.NewConn(rconn, nconn)
close(s.accept)
}
// Accept returns a channel to accept incoming connections.