mirror of
https://github.com/bluenviron/mediamtx
synced 2025-01-19 05:21:09 +00:00
start runOnDemand even with clients that don't send DESCRIBE (#155)
This commit is contained in:
parent
40f2d5cd09
commit
b30dbc1315
@ -27,10 +27,21 @@ const (
|
||||
pauseAfterAuthError = 2 * time.Second
|
||||
)
|
||||
|
||||
// ErrNoOnePublishing is a "no one is publishing" error.
|
||||
type ErrNoOnePublishing struct {
|
||||
PathName string
|
||||
}
|
||||
|
||||
// Error implements the error interface.
|
||||
func (e ErrNoOnePublishing) Error() string {
|
||||
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
|
||||
}
|
||||
|
||||
// DescribeRes is a client describe response.
|
||||
type DescribeRes struct {
|
||||
Path Path
|
||||
Err error
|
||||
SDP []byte
|
||||
Redirect string
|
||||
Err error
|
||||
}
|
||||
|
||||
// DescribeReq is a client describe request.
|
||||
@ -95,12 +106,6 @@ type PauseReq struct {
|
||||
Res chan struct{}
|
||||
}
|
||||
|
||||
type describeData struct {
|
||||
sdp []byte
|
||||
redirect string
|
||||
err error
|
||||
}
|
||||
|
||||
// Path is implemented by path.Path.
|
||||
type Path interface {
|
||||
Name() string
|
||||
@ -143,8 +148,7 @@ type Client struct {
|
||||
onPublishCmd *externalcmd.Cmd
|
||||
|
||||
// in
|
||||
describeData chan describeData // from path
|
||||
terminate chan struct{}
|
||||
terminate chan struct{}
|
||||
}
|
||||
|
||||
// New allocates a Client.
|
||||
@ -233,8 +237,6 @@ func (c *Client) run() {
|
||||
}, fmt.Errorf("invalid path (%s)", req.URL)
|
||||
}
|
||||
|
||||
c.describeData = make(chan describeData)
|
||||
|
||||
resc := make(chan DescribeRes)
|
||||
c.parent.OnClientDescribe(DescribeReq{c, reqPath, req, resc})
|
||||
res := <-resc
|
||||
@ -252,6 +254,11 @@ func (c *Client) run() {
|
||||
}
|
||||
return terr.Response, errTerminated
|
||||
|
||||
case ErrNoOnePublishing:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusNotFound,
|
||||
}, res.Err
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
@ -259,58 +266,23 @@ func (c *Client) run() {
|
||||
}
|
||||
}
|
||||
|
||||
c.path = res.Path
|
||||
|
||||
select {
|
||||
case res := <-c.describeData:
|
||||
resc := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, resc})
|
||||
<-resc
|
||||
c.path = nil
|
||||
|
||||
if res.err != nil {
|
||||
c.log(logger.Info, "no one is publishing to path '%s'", reqPath)
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusNotFound,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if res.redirect != "" {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusMovedPermanently,
|
||||
Header: base.Header{
|
||||
"Location": base.HeaderValue{res.redirect},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
if res.Redirect != "" {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
StatusCode: base.StatusMovedPermanently,
|
||||
Header: base.Header{
|
||||
"Content-Base": base.HeaderValue{req.URL.String() + "/"},
|
||||
"Content-Type": base.HeaderValue{"application/sdp"},
|
||||
"Location": base.HeaderValue{res.Redirect},
|
||||
},
|
||||
Body: res.sdp,
|
||||
}, nil
|
||||
|
||||
case <-c.terminate:
|
||||
ch := c.describeData
|
||||
go func() {
|
||||
for range ch {
|
||||
}
|
||||
}()
|
||||
|
||||
resc := make(chan struct{})
|
||||
c.path.OnClientRemove(RemoveReq{c, resc})
|
||||
<-resc
|
||||
c.path = nil
|
||||
|
||||
close(c.describeData)
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, errTerminated
|
||||
}
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Content-Base": base.HeaderValue{req.URL.String() + "/"},
|
||||
"Content-Type": base.HeaderValue{"application/sdp"},
|
||||
},
|
||||
Body: res.SDP,
|
||||
}, nil
|
||||
}
|
||||
|
||||
onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) {
|
||||
@ -353,6 +325,20 @@ func (c *Client) run() {
|
||||
}
|
||||
|
||||
onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) {
|
||||
if th.Protocol == gortsplib.StreamProtocolUDP {
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
} else {
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
switch c.conn.State() {
|
||||
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
|
||||
pathAndQuery, ok := req.URL.RTSPPathAndQuery()
|
||||
@ -377,56 +363,6 @@ func (c *Client) run() {
|
||||
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
|
||||
}
|
||||
|
||||
// play with UDP
|
||||
if th.Protocol == gortsplib.StreamProtocolUDP {
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
resc := make(chan SetupPlayRes)
|
||||
c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc})
|
||||
res := <-resc
|
||||
|
||||
if res.Err != nil {
|
||||
switch terr := res.Err.(type) {
|
||||
case errAuthNotCritical:
|
||||
return terr.Response, nil
|
||||
|
||||
case errAuthCritical:
|
||||
// wait some seconds to stop brute force attacks
|
||||
select {
|
||||
case <-time.After(pauseAfterAuthError):
|
||||
case <-c.terminate:
|
||||
}
|
||||
return terr.Response, errTerminated
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
}, res.Err
|
||||
}
|
||||
}
|
||||
|
||||
c.path = res.Path
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Session": base.HeaderValue{sessionID},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// play with TCP
|
||||
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
resc := make(chan SetupPlayRes)
|
||||
c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc})
|
||||
res := <-resc
|
||||
@ -444,6 +380,11 @@ func (c *Client) run() {
|
||||
}
|
||||
return terr.Response, errTerminated
|
||||
|
||||
case ErrNoOnePublishing:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusNotFound,
|
||||
}, res.Err
|
||||
|
||||
default:
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusBadRequest,
|
||||
@ -453,13 +394,6 @@ func (c *Client) run() {
|
||||
|
||||
c.path = res.Path
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Session": base.HeaderValue{sessionID},
|
||||
},
|
||||
}, nil
|
||||
|
||||
default: // record
|
||||
reqPathAndQuery, ok := req.URL.RTSPPathAndQuery()
|
||||
if !ok {
|
||||
@ -474,38 +408,14 @@ func (c *Client) run() {
|
||||
}, fmt.Errorf("invalid path: must begin with '%s', but is '%s'",
|
||||
c.path.Name(), reqPathAndQuery)
|
||||
}
|
||||
|
||||
// record with UDP
|
||||
if th.Protocol == gortsplib.StreamProtocolUDP {
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Session": base.HeaderValue{sessionID},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// record with TCP
|
||||
|
||||
if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok {
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusUnsupportedTransport,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Session": base.HeaderValue{sessionID},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &base.Response{
|
||||
StatusCode: base.StatusOK,
|
||||
Header: base.Header{
|
||||
"Session": base.HeaderValue{sessionID},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
onPlay := func(req *base.Request) (*base.Response, error) {
|
||||
@ -801,8 +711,3 @@ func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []by
|
||||
|
||||
c.conn.WriteFrame(trackID, streamType, buf)
|
||||
}
|
||||
|
||||
// OnPathDescribeData is called by path.Path.
|
||||
func (c *Client) OnPathDescribeData(sdp []byte, redirect string, err error) {
|
||||
c.describeData <- describeData{sdp, redirect, err}
|
||||
}
|
||||
|
@ -57,8 +57,7 @@ func (*sourceRedirect) IsSource() {}
|
||||
type clientState int
|
||||
|
||||
const (
|
||||
clientStateWaitingDescribe clientState = iota
|
||||
clientStatePrePlay
|
||||
clientStatePrePlay clientState = iota
|
||||
clientStatePlay
|
||||
clientStatePreRecord
|
||||
clientStateRecord
|
||||
@ -88,6 +87,8 @@ type Path struct {
|
||||
|
||||
clients map[*client.Client]clientState
|
||||
clientsWg sync.WaitGroup
|
||||
describeRequests []client.DescribeReq
|
||||
setupPlayRequests []client.SetupPlayReq
|
||||
source source
|
||||
sourceTrackCount int
|
||||
sourceSdp []byte
|
||||
@ -196,12 +197,15 @@ outer:
|
||||
for {
|
||||
select {
|
||||
case <-pa.describeTimer.C:
|
||||
for c, state := range pa.clients {
|
||||
if state == clientStateWaitingDescribe {
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name))
|
||||
}
|
||||
for _, req := range pa.describeRequests {
|
||||
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
|
||||
}
|
||||
pa.describeRequests = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
|
||||
}
|
||||
pa.setupPlayRequests = nil
|
||||
|
||||
// set state after removeClient(), so schedule* works once
|
||||
pa.sourceState = sourceStateNotReady
|
||||
@ -238,23 +242,10 @@ outer:
|
||||
pa.onSourceSetNotReady()
|
||||
|
||||
case req := <-pa.clientDescribe:
|
||||
if _, ok := pa.clients[req.Client]; ok {
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("already subscribed")} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
// reply immediately
|
||||
req.Res <- client.DescribeRes{pa, nil} //nolint:govet
|
||||
|
||||
pa.onClientDescribe(req.Client)
|
||||
pa.onClientDescribe(req)
|
||||
|
||||
case req := <-pa.clientSetupPlay:
|
||||
err := pa.onClientSetupPlay(req.Client, req.TrackID)
|
||||
if err != nil {
|
||||
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet
|
||||
pa.onClientSetupPlay(req)
|
||||
|
||||
case req := <-pa.clientPlay:
|
||||
pa.onClientPlay(req.Client)
|
||||
@ -317,6 +308,14 @@ outer:
|
||||
pa.onDemandCmd.Close()
|
||||
}
|
||||
|
||||
for _, req := range pa.describeRequests {
|
||||
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
|
||||
}
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
}
|
||||
|
||||
for c, state := range pa.clients {
|
||||
if state != clientStatePreRemove {
|
||||
switch state {
|
||||
@ -361,7 +360,7 @@ func (pa *Path) exhaustChannels() {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req, ok := <-pa.clientAnnounce:
|
||||
if !ok {
|
||||
@ -485,7 +484,7 @@ func (pa *Path) removeClient(c *client.Client) {
|
||||
|
||||
// close all clients that are reading or waiting to read
|
||||
for oc, state := range pa.clients {
|
||||
if state != clientStatePreRemove && state != clientStateWaitingDescribe {
|
||||
if state != clientStatePreRemove {
|
||||
pa.removeClient(oc)
|
||||
pa.parent.OnPathClientClose(oc)
|
||||
}
|
||||
@ -505,13 +504,15 @@ func (pa *Path) onSourceSetReady() {
|
||||
|
||||
pa.sourceState = sourceStateReady
|
||||
|
||||
// reply to all clients that are waiting for a description
|
||||
for c, state := range pa.clients {
|
||||
if state == clientStateWaitingDescribe {
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(pa.sourceSdp, "", nil)
|
||||
}
|
||||
for _, req := range pa.describeRequests {
|
||||
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
|
||||
}
|
||||
pa.describeRequests = nil
|
||||
|
||||
for _, req := range pa.setupPlayRequests {
|
||||
pa.onClientSetupPlayPost(req)
|
||||
}
|
||||
pa.setupPlayRequests = nil
|
||||
|
||||
pa.scheduleSourceClose()
|
||||
pa.scheduleRunOnDemandClose()
|
||||
@ -523,9 +524,6 @@ func (pa *Path) onSourceSetNotReady() {
|
||||
|
||||
// close all clients that are reading or waiting to read
|
||||
for c, state := range pa.clients {
|
||||
if state == clientStateWaitingDescribe {
|
||||
panic("not possible")
|
||||
}
|
||||
if c != pa.source && state != clientStatePreRemove {
|
||||
pa.removeClient(c)
|
||||
pa.parent.OnPathClientClose(c)
|
||||
@ -533,21 +531,9 @@ func (pa *Path) onSourceSetNotReady() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pa *Path) onClientDescribe(c *client.Client) {
|
||||
// prevent on-demand source from closing
|
||||
if pa.sourceCloseTimerStarted {
|
||||
pa.sourceCloseTimer = newEmptyTimer()
|
||||
pa.sourceCloseTimerStarted = false
|
||||
}
|
||||
|
||||
// prevent on-demand command from closing
|
||||
if pa.runOnDemandCloseTimerStarted {
|
||||
pa.runOnDemandCloseTimer = newEmptyTimer()
|
||||
pa.runOnDemandCloseTimerStarted = false
|
||||
}
|
||||
|
||||
// start on-demand source
|
||||
func (pa *Path) fixedPublisherStart() {
|
||||
if pa.hasExternalSource() {
|
||||
// start on-demand source
|
||||
if pa.source == nil {
|
||||
pa.startExternalSource()
|
||||
|
||||
@ -555,11 +541,18 @@ func (pa *Path) onClientDescribe(c *client.Client) {
|
||||
pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout)
|
||||
pa.sourceState = sourceStateWaitingDescribe
|
||||
}
|
||||
|
||||
} else {
|
||||
// reset timer
|
||||
if pa.sourceCloseTimerStarted {
|
||||
pa.sourceCloseTimer.Stop()
|
||||
pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// start on-demand command
|
||||
if pa.conf.RunOnDemand != "" {
|
||||
// start on-demand command
|
||||
if pa.onDemandCmd == nil {
|
||||
pa.Log(logger.Info, "on demand command started")
|
||||
pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{
|
||||
@ -571,52 +564,58 @@ func (pa *Path) onClientDescribe(c *client.Client) {
|
||||
pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout)
|
||||
pa.sourceState = sourceStateWaitingDescribe
|
||||
}
|
||||
|
||||
} else {
|
||||
// reset timer
|
||||
if pa.runOnDemandCloseTimerStarted {
|
||||
pa.runOnDemandCloseTimer.Stop()
|
||||
pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pa *Path) onClientDescribe(req client.DescribeReq) {
|
||||
if _, ok := pa.clients[req.Client]; ok {
|
||||
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet
|
||||
return
|
||||
}
|
||||
|
||||
pa.fixedPublisherStart()
|
||||
pa.scheduleClose()
|
||||
|
||||
if _, ok := pa.source.(*sourceRedirect); ok {
|
||||
pa.addClient(c, clientStatePreRemove)
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil)
|
||||
req.Res <- client.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet
|
||||
return
|
||||
}
|
||||
|
||||
switch pa.sourceState {
|
||||
case sourceStateReady:
|
||||
pa.addClient(c, clientStatePreRemove)
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(pa.sourceSdp, "", nil)
|
||||
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
|
||||
return
|
||||
|
||||
case sourceStateWaitingDescribe:
|
||||
pa.addClient(c, clientStateWaitingDescribe)
|
||||
pa.describeRequests = append(pa.describeRequests, req)
|
||||
return
|
||||
|
||||
case sourceStateNotReady:
|
||||
if pa.conf.Fallback != "" {
|
||||
pa.addClient(c, clientStatePreRemove)
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(nil, pa.conf.Fallback, nil)
|
||||
req.Res <- client.DescribeRes{nil, pa.conf.Fallback, nil} //nolint:govet
|
||||
return
|
||||
}
|
||||
|
||||
pa.addClient(c, clientStatePreRemove)
|
||||
pa.removeClient(c)
|
||||
c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name))
|
||||
req.Res <- client.DescribeRes{nil, "", client.ErrNoOnePublishing{pa.name}} //nolint:govet
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (pa *Path) onClientSetupPlay(c *client.Client, trackID int) error {
|
||||
if pa.sourceState != sourceStateReady {
|
||||
return fmt.Errorf("no one is publishing to path '%s'", pa.name)
|
||||
func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
|
||||
if req.TrackID >= pa.sourceTrackCount {
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet
|
||||
return
|
||||
}
|
||||
|
||||
if trackID >= pa.sourceTrackCount {
|
||||
return fmt.Errorf("track %d does not exist", trackID)
|
||||
}
|
||||
|
||||
if _, ok := pa.clients[c]; !ok {
|
||||
if _, ok := pa.clients[req.Client]; !ok {
|
||||
// prevent on-demand source from closing
|
||||
if pa.sourceCloseTimerStarted {
|
||||
pa.sourceCloseTimer = newEmptyTimer()
|
||||
@ -629,10 +628,29 @@ func (pa *Path) onClientSetupPlay(c *client.Client, trackID int) error {
|
||||
pa.runOnDemandCloseTimerStarted = false
|
||||
}
|
||||
|
||||
pa.addClient(c, clientStatePrePlay)
|
||||
pa.addClient(req.Client, clientStatePrePlay)
|
||||
}
|
||||
|
||||
return nil
|
||||
req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet
|
||||
}
|
||||
|
||||
func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) {
|
||||
pa.fixedPublisherStart()
|
||||
pa.scheduleClose()
|
||||
|
||||
switch pa.sourceState {
|
||||
case sourceStateReady:
|
||||
pa.onClientSetupPlayPost(req)
|
||||
return
|
||||
|
||||
case sourceStateWaitingDescribe:
|
||||
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
|
||||
return
|
||||
|
||||
case sourceStateNotReady:
|
||||
req.Res <- client.SetupPlayRes{nil, client.ErrNoOnePublishing{pa.name}} //nolint:govet
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (pa *Path) onClientPlay(c *client.Client) {
|
||||
@ -737,16 +755,18 @@ func (pa *Path) scheduleRunOnDemandClose() {
|
||||
}
|
||||
|
||||
func (pa *Path) scheduleClose() {
|
||||
if pa.closeTimerStarted ||
|
||||
pa.conf.Regexp == nil ||
|
||||
pa.hasClients() ||
|
||||
pa.source != nil {
|
||||
return
|
||||
}
|
||||
if pa.conf.Regexp != nil &&
|
||||
!pa.hasClients() &&
|
||||
pa.source == nil &&
|
||||
pa.sourceState != sourceStateWaitingDescribe &&
|
||||
!pa.sourceCloseTimerStarted &&
|
||||
!pa.runOnDemandCloseTimerStarted &&
|
||||
!pa.closeTimerStarted {
|
||||
|
||||
pa.closeTimer.Stop()
|
||||
pa.closeTimer = time.NewTimer(0)
|
||||
pa.closeTimerStarted = true
|
||||
pa.closeTimer.Stop()
|
||||
pa.closeTimer = time.NewTimer(0)
|
||||
pa.closeTimerStarted = true
|
||||
}
|
||||
}
|
||||
|
||||
// ConfName returns the configuration name of this path.
|
||||
|
@ -149,14 +149,14 @@ outer:
|
||||
case req := <-pm.clientDescribe:
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- client.DescribeRes{nil, err} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed,
|
||||
pathConf.ReadUser, pathConf.ReadPass, req.Req, nil)
|
||||
if err != nil {
|
||||
req.Res <- client.DescribeRes{nil, err} //nolint:govet
|
||||
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
@ -212,12 +212,7 @@ outer:
|
||||
pm.paths[req.PathName].OnPathManAnnounce(req)
|
||||
|
||||
case req := <-pm.clientSetupPlay:
|
||||
if _, ok := pm.paths[req.PathName]; !ok {
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("no one is publishing to path '%s'", req.PathName)} //nolint:govet
|
||||
continue
|
||||
}
|
||||
|
||||
_, pathConf, err := pm.findPathConf(req.PathName)
|
||||
pathName, pathConf, err := pm.findPathConf(req.PathName)
|
||||
if err != nil {
|
||||
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
|
||||
continue
|
||||
@ -239,6 +234,22 @@ outer:
|
||||
continue
|
||||
}
|
||||
|
||||
// create path if it doesn't exist
|
||||
if _, ok := pm.paths[req.PathName]; !ok {
|
||||
pa := path.New(
|
||||
pm.rtspPort,
|
||||
pm.readTimeout,
|
||||
pm.writeTimeout,
|
||||
pm.readBufferCount,
|
||||
pathName,
|
||||
pathConf,
|
||||
req.PathName,
|
||||
&pm.wg,
|
||||
pm.stats,
|
||||
pm)
|
||||
pm.paths[req.PathName] = pa
|
||||
}
|
||||
|
||||
pm.paths[req.PathName].OnPathManSetupPlay(req)
|
||||
|
||||
case <-pm.terminate:
|
||||
@ -259,13 +270,22 @@ outer:
|
||||
return
|
||||
}
|
||||
|
||||
case req := <-pm.clientDescribe:
|
||||
req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
case req, ok := <-pm.clientDescribe:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req := <-pm.clientAnnounce:
|
||||
case req, ok := <-pm.clientAnnounce:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
|
||||
case req := <-pm.clientSetupPlay:
|
||||
case req, ok := <-pm.clientSetupPlay:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
|
||||
}
|
||||
}
|
||||
|
203
main_test.go
203
main_test.go
@ -862,65 +862,162 @@ wait
|
||||
err = os.Chmod(onDemandFile, 0755)
|
||||
require.NoError(t, err)
|
||||
|
||||
p1, ok := testProgram(fmt.Sprintf("paths:\n"+
|
||||
" all:\n"+
|
||||
" runOnDemand: %s\n", onDemandFile))
|
||||
require.Equal(t, true, ok)
|
||||
defer p1.close()
|
||||
t.Run("describe", func(t *testing.T) {
|
||||
defer os.Remove(doneFile)
|
||||
|
||||
func() {
|
||||
conn, err := net.Dial("tcp", ownDockerIP+":8554")
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
p1, ok := testProgram(fmt.Sprintf("paths:\n"+
|
||||
" all:\n"+
|
||||
" runOnDemand: %s\n"+
|
||||
" runOnDemandCloseAfter: 2s\n", onDemandFile))
|
||||
require.Equal(t, true, ok)
|
||||
defer p1.close()
|
||||
|
||||
err = base.Request{
|
||||
Method: base.Describe,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"1"},
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
func() {
|
||||
conn, err := net.Dial("tcp", ownDockerIP+":8554")
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
|
||||
var res base.Response
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
err = base.Request{
|
||||
Method: base.Describe,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"1"},
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = base.Request{
|
||||
Method: base.Setup,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"2"},
|
||||
"Transport": headers.Transport{
|
||||
Protocol: gortsplib.StreamProtocolTCP,
|
||||
Delivery: func() *base.StreamDelivery {
|
||||
v := base.StreamDeliveryUnicast
|
||||
return &v
|
||||
}(),
|
||||
Mode: func() *headers.TransportMode {
|
||||
v := headers.TransportModePlay
|
||||
return &v
|
||||
}(),
|
||||
InterleavedIds: &[2]int{0, 1},
|
||||
}.Write(),
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
var res base.Response
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
}()
|
||||
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
}()
|
||||
|
||||
for {
|
||||
_, err := os.Stat(doneFile)
|
||||
if err == nil {
|
||||
break
|
||||
for {
|
||||
_, err := os.Stat(doneFile)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("describe and setup", func(t *testing.T) {
|
||||
defer os.Remove(doneFile)
|
||||
|
||||
p1, ok := testProgram(fmt.Sprintf("paths:\n"+
|
||||
" all:\n"+
|
||||
" runOnDemand: %s\n"+
|
||||
" runOnDemandCloseAfter: 2s\n", onDemandFile))
|
||||
require.Equal(t, true, ok)
|
||||
defer p1.close()
|
||||
|
||||
func() {
|
||||
conn, err := net.Dial("tcp", ownDockerIP+":8554")
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
|
||||
err = base.Request{
|
||||
Method: base.Describe,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"1"},
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
|
||||
var res base.Response
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
|
||||
err = base.Request{
|
||||
Method: base.Setup,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"2"},
|
||||
"Transport": headers.Transport{
|
||||
Protocol: gortsplib.StreamProtocolTCP,
|
||||
Delivery: func() *base.StreamDelivery {
|
||||
v := base.StreamDeliveryUnicast
|
||||
return &v
|
||||
}(),
|
||||
Mode: func() *headers.TransportMode {
|
||||
v := headers.TransportModePlay
|
||||
return &v
|
||||
}(),
|
||||
InterleavedIds: &[2]int{0, 1},
|
||||
}.Write(),
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
}()
|
||||
|
||||
for {
|
||||
_, err := os.Stat(doneFile)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("setup", func(t *testing.T) {
|
||||
defer os.Remove(doneFile)
|
||||
|
||||
p1, ok := testProgram(fmt.Sprintf("paths:\n"+
|
||||
" all:\n"+
|
||||
" runOnDemand: %s\n"+
|
||||
" runOnDemandCloseAfter: 2s\n", onDemandFile))
|
||||
require.Equal(t, true, ok)
|
||||
defer p1.close()
|
||||
|
||||
func() {
|
||||
conn, err := net.Dial("tcp", ownDockerIP+":8554")
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
|
||||
err = base.Request{
|
||||
Method: base.Setup,
|
||||
URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"),
|
||||
Header: base.Header{
|
||||
"CSeq": base.HeaderValue{"1"},
|
||||
"Transport": headers.Transport{
|
||||
Protocol: gortsplib.StreamProtocolTCP,
|
||||
Delivery: func() *base.StreamDelivery {
|
||||
v := base.StreamDeliveryUnicast
|
||||
return &v
|
||||
}(),
|
||||
Mode: func() *headers.TransportMode {
|
||||
v := headers.TransportModePlay
|
||||
return &v
|
||||
}(),
|
||||
InterleavedIds: &[2]int{0, 1},
|
||||
}.Write(),
|
||||
},
|
||||
}.Write(bconn.Writer)
|
||||
require.NoError(t, err)
|
||||
|
||||
var res base.Response
|
||||
err = res.Read(bconn.Reader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base.StatusOK, res.StatusCode)
|
||||
}()
|
||||
|
||||
for {
|
||||
_, err := os.Stat(doneFile)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHotReloading(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user