update gortsplib

This commit is contained in:
aler9 2020-12-13 13:58:56 +01:00
parent 40c2eb3da2
commit 854f00afdc
3 changed files with 169 additions and 226 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8 h1:nDEZtoFBPDPgu9wxujoTEmMXFNTg+d0ATYKSgGHtsgE=
github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c h1:NEqO8o8hNAEeYB7OZnkT2k9QVLcBlOZXBLZYQeE1aRg=
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -200,181 +200,15 @@ func (c *Client) run() {
defer onConnectCmd.Close()
}
readDone := c.conn.Read(c.onRequest, c.onFrame)
select {
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != errTerminated {
c.log(logger.Info, "ERR: %s", err)
}
switch c.state {
case statePlay:
c.stopPlay()
case stateRecord:
c.stopRecord()
}
if c.path != nil {
c.path.OnClientRemove(c)
c.path = nil
}
c.parent.OnClientClose(c)
<-c.terminate
case <-c.terminate:
c.conn.Close()
<-readDone
switch c.state {
case statePlay:
c.stopPlay()
case stateRecord:
c.stopRecord()
}
if c.path != nil {
c.path.OnClientRemove(c)
c.path = nil
}
}
}
type errAuthNotCritical struct {
*base.Response
}
func (errAuthNotCritical) Error() string {
return "auth not critical"
}
type errAuthCritical struct {
*base.Response
}
func (errAuthCritical) Error() string {
return "auth critical"
}
// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error {
// validate ip
if ips != nil {
ip := c.ip()
if !ipEqualOrInRange(ip, ips) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
}}
}
onRequest := func(req *base.Request) {
c.log(logger.Debug, "[c->s] %v", req)
}
// validate user
if user != "" {
// reset authHelper every time the credentials change
if c.authHelper == nil || c.authUser != user || c.authPass != pass {
c.authUser = user
c.authPass = pass
c.authHelper = auth.NewServer(user, pass, authMethods)
}
err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.URL)
if err != nil {
c.authFailures++
// vlc with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
c.log(logger.Info, "ERR: unauthorized: %s", err)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
}}
}
if c.authFailures > 1 {
c.log(logger.Debug, "WARN: unauthorized: %s", err)
}
return errAuthNotCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
}}
}
onResponse := func(res *base.Response) {
c.log(logger.Debug, "[s->c] %v", res)
}
// login successful, reset authFailures
c.authFailures = 0
return nil
}
func (c *Client) checkState(allowed map[state]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
}
var allowedList []state
for s := range allowed {
allowedList = append(allowedList, s)
}
return fmt.Errorf("client must be in state %v, while is in state %v",
allowedList, c.state)
}
func (c *Client) onRequest(req *base.Request) (*base.Response, error) {
c.log(logger.Debug, "[c->s] %v", req)
res, err := c.onRequestInner(req)
c.log(logger.Debug, "[s->c] %v", res)
return res, err
}
func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
switch req.Method {
case base.Options:
return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.GetParameter),
string(base.Describe),
string(base.Announce),
string(base.Setup),
string(base.Play),
string(base.Record),
string(base.Pause),
string(base.Teardown),
}, ", ")},
},
}, nil
// GET_PARAMETER is used like a ping
case base.GetParameter:
return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"text/parameters"},
},
Content: []byte("\n"),
}, nil
case base.Describe:
onDescribe := func(req *base.Request) (*base.Response, error) {
err := c.checkState(map[state]struct{}{
stateInitial: {},
})
@ -457,8 +291,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
StatusCode: base.StatusBadRequest,
}, errTerminated
}
}
case base.Announce:
onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) {
err := c.checkState(map[state]struct{}{
stateInitial: {},
})
@ -475,32 +310,6 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
}, fmt.Errorf("unable to find base path (%s)", req.URL)
}
ct, ok := req.Header["Content-Type"]
if !ok || len(ct) != 1 {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("Content-Type header missing")
}
if ct[0] != "application/sdp" {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unsupported Content-Type '%s'", ct)
}
tracks, err := gortsplib.ReadTracks(req.Content)
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid SDP: %s", err)
}
if len(tracks) == 0 {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("no tracks defined")
}
path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req)
if err != nil {
switch terr := err.(type) {
@ -528,15 +337,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
case base.Setup:
th, err := headers.ReadTransport(req.Header["Transport"])
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("transport header: %s", err)
}
onSetup := func(req *base.Request, th *headers.Transport) (*base.Response, error) {
if th.Delivery != nil && *th.Delivery == base.StreamDeliveryMulticast {
return &base.Response{
StatusCode: base.StatusBadRequest,
@ -828,8 +631,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("client is in state '%s'", c.state)
}
}
case base.Play:
onPlay := func(req *base.Request) (*base.Response, error) {
// play can be sent twice, allow calling it even if we're already playing
err := c.checkState(map[state]struct{}{
statePrePlay: {},
@ -873,8 +677,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
"Session": base.HeaderValue{sessionID},
},
}, nil
}
case base.Record:
onRecord := func(req *base.Request) (*base.Response, error) {
err := c.checkState(map[state]struct{}{
statePreRecord: {},
})
@ -914,8 +719,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
"Session": base.HeaderValue{sessionID},
},
}, nil
}
case base.Pause:
onPause := func(req *base.Request) (*base.Response, error) {
err := c.checkState(map[state]struct{}{
statePrePlay: {},
statePlay: {},
@ -944,28 +750,165 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) {
"Session": base.HeaderValue{sessionID},
},
}, nil
}
case base.Teardown:
return &base.Response{
StatusCode: base.StatusOK,
}, errTerminated
onFrame := func(trackID int, streamType gortsplib.StreamType, content []byte) {
if c.state == stateRecord {
if trackID >= len(c.streamTracks) {
return
}
default:
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unhandled method '%s'", req.Method)
c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, content)
c.path.OnFrame(trackID, streamType, content)
}
}
readDone := c.conn.Read(gortsplib.ServerConnReadHandlers{
OnRequest: onRequest,
OnResponse: onResponse,
OnDescribe: onDescribe,
OnAnnounce: onAnnounce,
OnSetup: onSetup,
OnPlay: onPlay,
OnRecord: onRecord,
OnPause: onPause,
OnFrame: onFrame,
})
select {
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != errTerminated {
c.log(logger.Info, "ERR: %s", err)
}
switch c.state {
case statePlay:
c.stopPlay()
case stateRecord:
c.stopRecord()
}
if c.path != nil {
c.path.OnClientRemove(c)
c.path = nil
}
c.parent.OnClientClose(c)
<-c.terminate
case <-c.terminate:
c.conn.Close()
<-readDone
switch c.state {
case statePlay:
c.stopPlay()
case stateRecord:
c.stopRecord()
}
if c.path != nil {
c.path.OnClientRemove(c)
c.path = nil
}
}
}
func (c *Client) onFrame(trackID int, streamType gortsplib.StreamType, content []byte) {
if c.state == stateRecord {
if trackID >= len(c.streamTracks) {
return
type errAuthNotCritical struct {
*base.Response
}
func (errAuthNotCritical) Error() string {
return "auth not critical"
}
type errAuthCritical struct {
*base.Response
}
func (errAuthCritical) Error() string {
return "auth critical"
}
// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error {
// validate ip
if ips != nil {
ip := c.ip()
if !ipEqualOrInRange(ip, ips) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
}}
}
}
// validate user
if user != "" {
// reset authHelper every time the credentials change
if c.authHelper == nil || c.authUser != user || c.authPass != pass {
c.authUser = user
c.authPass = pass
c.authHelper = auth.NewServer(user, pass, authMethods)
}
c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, content)
c.path.OnFrame(trackID, streamType, content)
err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.URL)
if err != nil {
c.authFailures++
// vlc with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
c.log(logger.Info, "ERR: unauthorized: %s", err)
return errAuthCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
}}
}
if c.authFailures > 1 {
c.log(logger.Debug, "WARN: unauthorized: %s", err)
}
return errAuthNotCritical{&base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authHelper.GenerateHeader(),
},
}}
}
}
// login successful, reset authFailures
c.authFailures = 0
return nil
}
func (c *Client) checkState(allowed map[state]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
}
var allowedList []state
for s := range allowed {
allowedList = append(allowedList, s)
}
return fmt.Errorf("client must be in state %v, while is in state %v",
allowedList, c.state)
}
func (c *Client) startPlay() {