support clients that don't specify track ID, like tvheadend (#155)

This commit is contained in:
aler9 2021-01-20 22:02:09 +01:00
parent b6277dc7cf
commit cc703fe5c5
5 changed files with 87 additions and 71 deletions

View File

@ -10,7 +10,7 @@ _rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server a
Features: Features:
* Read and publish live streams with UDP and TCP * Read and publish live streams with UDP and TCP
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MP3, AAC, Opus, PCM) * Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM)
* Serve multiple streams at once in separate paths * Serve multiple streams at once in separate paths
* Encrypt streams with TLS (RTSPS) * Encrypt streams with TLS (RTSPS)
* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy) * Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy)

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.15
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400 github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400 h1:mRaK+tJuQQ39M6poSfluxSu94kdqfzMrIi24+zacWeQ= github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b h1:gNkdUoWMsgM9URQIWPF8fUgShKiSxteG463yNWZbIiA=
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@ -158,16 +158,16 @@ func (c *Client) run() {
} }
onDescribe := func(req *base.Request) (*base.Response, error) { onDescribe := func(req *base.Request) (*base.Response, error) {
basePath, ok := req.URL.BasePath() reqPath, ok := req.URL.RTSPPath()
if !ok { if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL) }, fmt.Errorf("invalid path (%s)", req.URL)
} }
c.describeData = make(chan describeData) c.describeData = make(chan describeData)
path, err := c.parent.OnClientDescribe(c, basePath, req) path, err := c.parent.OnClientDescribe(c, reqPath, req)
if err != nil { if err != nil {
switch terr := err.(type) { switch terr := err.(type) {
case errAuthNotCritical: case errAuthNotCritical:
@ -199,7 +199,7 @@ func (c *Client) run() {
c.path = nil c.path = nil
if res.err != nil { if res.err != nil {
c.log(logger.Info, "no one is publishing to path '%s'", basePath) c.log(logger.Info, "no one is publishing to path '%s'", reqPath)
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, nil }, nil
@ -242,14 +242,14 @@ func (c *Client) run() {
} }
onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) { onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) {
basePath, ok := req.URL.BasePath() reqPath, ok := req.URL.RTSPPath()
if !ok { if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL) }, fmt.Errorf("invalid path (%s)", req.URL)
} }
path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req) path, err := c.parent.OnClientAnnounce(c, reqPath, tracks, req)
if err != nil { if err != nil {
switch terr := err.(type) { switch terr := err.(type) {
case errAuthNotCritical: case errAuthNotCritical:
@ -281,19 +281,28 @@ func (c *Client) run() {
} }
onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) {
basePath, _, ok := req.URL.BasePathControlAttr()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find control attribute (%s)", req.URL)
}
switch c.conn.State() { switch c.conn.State() {
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
if c.path != nil && basePath != c.path.Name() { pathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath) }, fmt.Errorf("invalid path (%s)", req.URL)
}
_, pathAndQuery, ok = base.PathSplitControlAttribute(pathAndQuery)
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path (%s)", req.URL)
}
reqPath, _ := base.PathSplitQuery(pathAndQuery)
if c.path != nil && reqPath != c.path.Name() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
} }
// play with UDP // play with UDP
@ -304,7 +313,7 @@ func (c *Client) run() {
}, nil }, nil
} }
path, err := c.parent.OnClientSetupPlay(c, basePath, trackID, req) path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
if err != nil { if err != nil {
switch terr := err.(type) { switch terr := err.(type) {
case errAuthNotCritical: case errAuthNotCritical:
@ -346,7 +355,7 @@ func (c *Client) run() {
}, nil }, nil
} }
path, err := c.parent.OnClientSetupPlay(c, basePath, trackID, req) path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
if err != nil { if err != nil {
switch terr := err.(type) { switch terr := err.(type) {
case errAuthNotCritical: case errAuthNotCritical:
@ -380,11 +389,18 @@ func (c *Client) run() {
}, nil }, nil
default: // record default: // record
// after ANNOUNCE, c.path is already set reqPathAndQuery, ok := req.URL.RTSPPathAndQuery()
if basePath != c.path.Name() { if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath) }, fmt.Errorf("invalid path (%s)", req.URL)
}
if !strings.HasPrefix(reqPathAndQuery, c.path.Name()) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path: must begin with '%s', but is '%s'",
c.path.Name(), reqPathAndQuery)
} }
// record with UDP // record with UDP
@ -395,18 +411,6 @@ func (c *Client) run() {
}, nil }, nil
} }
if th.ClientPorts == nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])
}
if c.conn.TracksLen() >= c.path.SourceTrackCount() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("all the tracks have already been setup")
}
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
Header: base.Header{ Header: base.Header{
@ -423,12 +427,6 @@ func (c *Client) run() {
}, nil }, nil
} }
if c.conn.TracksLen() >= c.path.SourceTrackCount() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("all the tracks have already been setup")
}
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
Header: base.Header{ Header: base.Header{
@ -440,20 +438,20 @@ func (c *Client) run() {
onPlay := func(req *base.Request) (*base.Response, error) { onPlay := func(req *base.Request) (*base.Response, error) {
if c.conn.State() == gortsplib.ServerConnStatePrePlay { if c.conn.State() == gortsplib.ServerConnStatePrePlay {
basePath, ok := req.URL.BasePath() reqPath, ok := req.URL.RTSPPath()
if !ok { if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL) }, fmt.Errorf("invalid path (%s)", req.URL)
} }
// path can end with a slash, remove it // path can end with a slash, remove it
basePath = strings.TrimSuffix(basePath, "/") reqPath = strings.TrimSuffix(reqPath, "/")
if basePath != c.path.Name() { if reqPath != c.path.Name() {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath) }, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
} }
c.startPlay() c.startPlay()
@ -468,20 +466,20 @@ func (c *Client) run() {
} }
onRecord := func(req *base.Request) (*base.Response, error) { onRecord := func(req *base.Request) (*base.Response, error) {
basePath, ok := req.URL.BasePath() reqPath, ok := req.URL.RTSPPath()
if !ok { if !ok {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL) }, fmt.Errorf("invalid path (%s)", req.URL)
} }
// path can end with a slash, remove it // path can end with a slash, remove it
basePath = strings.TrimSuffix(basePath, "/") reqPath = strings.TrimSuffix(reqPath, "/")
if basePath != c.path.Name() { if reqPath != c.path.Name() {
return &base.Response{ return &base.Response{
StatusCode: base.StatusBadRequest, StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath) }, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
} }
c.startRecord() c.startRecord()
@ -592,7 +590,8 @@ func (errAuthCritical) Error() string {
} }
// Authenticate performs an authentication. // Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error { func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{},
user string, pass string, req *base.Request, altURL *base.URL) error {
// validate ip // validate ip
if ips != nil { if ips != nil {
ip := c.ip() ip := c.ip()
@ -615,7 +614,8 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
c.authValidator = auth.NewValidator(user, pass, authMethods) c.authValidator = auth.NewValidator(user, pass, authMethods)
} }
err := c.authValidator.ValidateHeader(req.Header["Authorization"], req.Method, req.URL) err := c.authValidator.ValidateHeader(req.Header["Authorization"],
req.Method, req.URL, altURL)
if err != nil { if err != nil {
c.authFailures++ c.authFailures++
@ -658,12 +658,15 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
func (c *Client) startPlay() { func (c *Client) startPlay() {
c.path.OnClientPlay(c) c.path.OnClientPlay(c)
c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(), c.conn.TracksLen(), func() string { c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(),
if c.conn.TracksLen() == 1 { c.conn.SetuppedTracksLen(),
return "track" func() string {
} if c.conn.SetuppedTracksLen() == 1 {
return "tracks" return "track"
}(), *c.conn.TracksProtocol()) }
return "tracks"
}(),
*c.conn.SetuppedTracksProtocol())
if c.path.Conf().RunOnRead != "" { if c.path.Conf().RunOnRead != "" {
c.onReadCmd = externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{ c.onReadCmd = externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{
@ -682,12 +685,15 @@ func (c *Client) stopPlay() {
func (c *Client) startRecord() { func (c *Client) startRecord() {
c.path.OnClientRecord(c) c.path.OnClientRecord(c)
c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(), c.conn.TracksLen(), func() string { c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(),
if c.conn.TracksLen() == 1 { c.conn.SetuppedTracksLen(),
return "track" func() string {
} if c.conn.SetuppedTracksLen() == 1 {
return "tracks" return "track"
}(), *c.conn.TracksProtocol()) }
return "tracks"
}(),
*c.conn.SetuppedTracksProtocol())
if c.path.Conf().RunOnPublish != "" { if c.path.Conf().RunOnPublish != "" {
c.onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ c.onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
@ -705,7 +711,7 @@ func (c *Client) stopRecord() {
// OnReaderFrame implements path.Reader. // OnReaderFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) { func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) {
if !c.conn.HasTrack(trackID) { if !c.conn.HasSetuppedTrack(trackID) {
return return
} }

View File

@ -155,7 +155,7 @@ outer:
} }
err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed, err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed,
pathConf.ReadUser, pathConf.ReadPass, req.Req) pathConf.ReadUser, pathConf.ReadPass, req.Req, nil)
if err != nil { if err != nil {
req.Res <- path.ClientDescribeRes{nil, err} //nolint:govet req.Res <- path.ClientDescribeRes{nil, err} //nolint:govet
continue continue
@ -187,7 +187,8 @@ outer:
} }
err = req.Client.Authenticate(pm.authMethods, err = req.Client.Authenticate(pm.authMethods,
pathConf.PublishIpsParsed, pathConf.PublishUser, pathConf.PublishPass, req.Req) pathConf.PublishIpsParsed, pathConf.PublishUser,
pathConf.PublishPass, req.Req, nil)
if err != nil { if err != nil {
req.Res <- path.ClientAnnounceRes{nil, err} //nolint:govet req.Res <- path.ClientAnnounceRes{nil, err} //nolint:govet
continue continue
@ -223,8 +224,17 @@ outer:
continue continue
} }
// VLC strips the control attribute
// provide an alternative URL without the control attribute
altURL := &base.URL{
Scheme: req.Req.URL.Scheme,
Host: req.Req.URL.Host,
Path: "/" + req.PathName + "/",
}
err = req.Client.Authenticate(pm.authMethods, err = req.Client.Authenticate(pm.authMethods,
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req.Req) pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass,
req.Req, altURL)
if err != nil { if err != nil {
req.Res <- path.ClientSetupPlayRes{nil, err} //nolint:govet req.Res <- path.ClientSetupPlayRes{nil, err} //nolint:govet
continue continue