diff --git a/internal/client/client.go b/internal/client/client.go index 3a3c47dc..28346045 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -27,10 +27,21 @@ const ( pauseAfterAuthError = 2 * time.Second ) +// ErrNoOnePublishing is a "no one is publishing" error. +type ErrNoOnePublishing struct { + PathName string +} + +// Error implements the error interface. +func (e ErrNoOnePublishing) Error() string { + return fmt.Sprintf("no one is publishing to path '%s'", e.PathName) +} + // DescribeRes is a client describe response. type DescribeRes struct { - Path Path - Err error + SDP []byte + Redirect string + Err error } // DescribeReq is a client describe request. @@ -95,12 +106,6 @@ type PauseReq struct { Res chan struct{} } -type describeData struct { - sdp []byte - redirect string - err error -} - // Path is implemented by path.Path. type Path interface { Name() string @@ -143,8 +148,7 @@ type Client struct { onPublishCmd *externalcmd.Cmd // in - describeData chan describeData // from path - terminate chan struct{} + terminate chan struct{} } // New allocates a Client. @@ -233,8 +237,6 @@ func (c *Client) run() { }, fmt.Errorf("invalid path (%s)", req.URL) } - c.describeData = make(chan describeData) - resc := make(chan DescribeRes) c.parent.OnClientDescribe(DescribeReq{c, reqPath, req, resc}) res := <-resc @@ -252,6 +254,11 @@ func (c *Client) run() { } return terr.Response, errTerminated + case ErrNoOnePublishing: + return &base.Response{ + StatusCode: base.StatusNotFound, + }, res.Err + default: return &base.Response{ StatusCode: base.StatusBadRequest, @@ -259,58 +266,23 @@ func (c *Client) run() { } } - c.path = res.Path - - select { - case res := <-c.describeData: - resc := make(chan struct{}) - c.path.OnClientRemove(RemoveReq{c, resc}) - <-resc - c.path = nil - - if res.err != nil { - c.log(logger.Info, "no one is publishing to path '%s'", reqPath) - return &base.Response{ - StatusCode: base.StatusNotFound, - }, nil - } - - if res.redirect != "" { - return &base.Response{ - StatusCode: base.StatusMovedPermanently, - Header: base.Header{ - "Location": base.HeaderValue{res.redirect}, - }, - }, nil - } - + if res.Redirect != "" { return &base.Response{ - StatusCode: base.StatusOK, + StatusCode: base.StatusMovedPermanently, Header: base.Header{ - "Content-Base": base.HeaderValue{req.URL.String() + "/"}, - "Content-Type": base.HeaderValue{"application/sdp"}, + "Location": base.HeaderValue{res.Redirect}, }, - Body: res.sdp, }, nil - - case <-c.terminate: - ch := c.describeData - go func() { - for range ch { - } - }() - - resc := make(chan struct{}) - c.path.OnClientRemove(RemoveReq{c, resc}) - <-resc - c.path = nil - - close(c.describeData) - - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, errTerminated } + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Base": base.HeaderValue{req.URL.String() + "/"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: res.SDP, + }, nil } onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) { @@ -353,6 +325,20 @@ func (c *Client) run() { } onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + if th.Protocol == gortsplib.StreamProtocolUDP { + if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok { + return &base.Response{ + StatusCode: base.StatusUnsupportedTransport, + }, nil + } + } else { + if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok { + return &base.Response{ + StatusCode: base.StatusUnsupportedTransport, + }, nil + } + } + switch c.conn.State() { case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play pathAndQuery, ok := req.URL.RTSPPathAndQuery() @@ -377,56 +363,6 @@ func (c *Client) run() { }, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath) } - // play with UDP - if th.Protocol == gortsplib.StreamProtocolUDP { - if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok { - return &base.Response{ - StatusCode: base.StatusUnsupportedTransport, - }, nil - } - - resc := make(chan SetupPlayRes) - c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc}) - res := <-resc - - if res.Err != nil { - switch terr := res.Err.(type) { - case errAuthNotCritical: - return terr.Response, nil - - case errAuthCritical: - // wait some seconds to stop brute force attacks - select { - case <-time.After(pauseAfterAuthError): - case <-c.terminate: - } - return terr.Response, errTerminated - - default: - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, res.Err - } - } - - c.path = res.Path - - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Session": base.HeaderValue{sessionID}, - }, - }, nil - } - - // play with TCP - - if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok { - return &base.Response{ - StatusCode: base.StatusUnsupportedTransport, - }, nil - } - resc := make(chan SetupPlayRes) c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc}) res := <-resc @@ -444,6 +380,11 @@ func (c *Client) run() { } return terr.Response, errTerminated + case ErrNoOnePublishing: + return &base.Response{ + StatusCode: base.StatusNotFound, + }, res.Err + default: return &base.Response{ StatusCode: base.StatusBadRequest, @@ -453,13 +394,6 @@ func (c *Client) run() { c.path = res.Path - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Session": base.HeaderValue{sessionID}, - }, - }, nil - default: // record reqPathAndQuery, ok := req.URL.RTSPPathAndQuery() if !ok { @@ -474,38 +408,14 @@ func (c *Client) run() { }, fmt.Errorf("invalid path: must begin with '%s', but is '%s'", c.path.Name(), reqPathAndQuery) } - - // record with UDP - if th.Protocol == gortsplib.StreamProtocolUDP { - if _, ok := c.protocols[gortsplib.StreamProtocolUDP]; !ok { - return &base.Response{ - StatusCode: base.StatusUnsupportedTransport, - }, nil - } - - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Session": base.HeaderValue{sessionID}, - }, - }, nil - } - - // record with TCP - - if _, ok := c.protocols[gortsplib.StreamProtocolTCP]; !ok { - return &base.Response{ - StatusCode: base.StatusUnsupportedTransport, - }, nil - } - - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Session": base.HeaderValue{sessionID}, - }, - }, nil } + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": base.HeaderValue{sessionID}, + }, + }, nil } onPlay := func(req *base.Request) (*base.Response, error) { @@ -801,8 +711,3 @@ func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []by c.conn.WriteFrame(trackID, streamType, buf) } - -// OnPathDescribeData is called by path.Path. -func (c *Client) OnPathDescribeData(sdp []byte, redirect string, err error) { - c.describeData <- describeData{sdp, redirect, err} -} diff --git a/internal/path/path.go b/internal/path/path.go index 7726282e..04aa21ad 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -57,8 +57,7 @@ func (*sourceRedirect) IsSource() {} type clientState int const ( - clientStateWaitingDescribe clientState = iota - clientStatePrePlay + clientStatePrePlay clientState = iota clientStatePlay clientStatePreRecord clientStateRecord @@ -88,6 +87,8 @@ type Path struct { clients map[*client.Client]clientState clientsWg sync.WaitGroup + describeRequests []client.DescribeReq + setupPlayRequests []client.SetupPlayReq source source sourceTrackCount int sourceSdp []byte @@ -196,12 +197,15 @@ outer: for { select { case <-pa.describeTimer.C: - for c, state := range pa.clients { - if state == clientStateWaitingDescribe { - pa.removeClient(c) - c.OnPathDescribeData(nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)) - } + for _, req := range pa.describeRequests { + req.Res <- client.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet } + pa.describeRequests = nil + + for _, req := range pa.setupPlayRequests { + req.Res <- client.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet + } + pa.setupPlayRequests = nil // set state after removeClient(), so schedule* works once pa.sourceState = sourceStateNotReady @@ -238,23 +242,10 @@ outer: pa.onSourceSetNotReady() case req := <-pa.clientDescribe: - if _, ok := pa.clients[req.Client]; ok { - req.Res <- client.DescribeRes{nil, fmt.Errorf("already subscribed")} //nolint:govet - continue - } - - // reply immediately - req.Res <- client.DescribeRes{pa, nil} //nolint:govet - - pa.onClientDescribe(req.Client) + pa.onClientDescribe(req) case req := <-pa.clientSetupPlay: - err := pa.onClientSetupPlay(req.Client, req.TrackID) - if err != nil { - req.Res <- client.SetupPlayRes{nil, err} //nolint:govet - continue - } - req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet + pa.onClientSetupPlay(req) case req := <-pa.clientPlay: pa.onClientPlay(req.Client) @@ -317,6 +308,14 @@ outer: pa.onDemandCmd.Close() } + for _, req := range pa.describeRequests { + req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet + } + + for _, req := range pa.setupPlayRequests { + req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet + } + for c, state := range pa.clients { if state != clientStatePreRemove { switch state { @@ -361,7 +360,7 @@ func (pa *Path) exhaustChannels() { if !ok { return } - req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet + req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet case req, ok := <-pa.clientAnnounce: if !ok { @@ -485,7 +484,7 @@ func (pa *Path) removeClient(c *client.Client) { // close all clients that are reading or waiting to read for oc, state := range pa.clients { - if state != clientStatePreRemove && state != clientStateWaitingDescribe { + if state != clientStatePreRemove { pa.removeClient(oc) pa.parent.OnPathClientClose(oc) } @@ -505,13 +504,15 @@ func (pa *Path) onSourceSetReady() { pa.sourceState = sourceStateReady - // reply to all clients that are waiting for a description - for c, state := range pa.clients { - if state == clientStateWaitingDescribe { - pa.removeClient(c) - c.OnPathDescribeData(pa.sourceSdp, "", nil) - } + for _, req := range pa.describeRequests { + req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet } + pa.describeRequests = nil + + for _, req := range pa.setupPlayRequests { + pa.onClientSetupPlayPost(req) + } + pa.setupPlayRequests = nil pa.scheduleSourceClose() pa.scheduleRunOnDemandClose() @@ -523,9 +524,6 @@ func (pa *Path) onSourceSetNotReady() { // close all clients that are reading or waiting to read for c, state := range pa.clients { - if state == clientStateWaitingDescribe { - panic("not possible") - } if c != pa.source && state != clientStatePreRemove { pa.removeClient(c) pa.parent.OnPathClientClose(c) @@ -533,21 +531,9 @@ func (pa *Path) onSourceSetNotReady() { } } -func (pa *Path) onClientDescribe(c *client.Client) { - // prevent on-demand source from closing - if pa.sourceCloseTimerStarted { - pa.sourceCloseTimer = newEmptyTimer() - pa.sourceCloseTimerStarted = false - } - - // prevent on-demand command from closing - if pa.runOnDemandCloseTimerStarted { - pa.runOnDemandCloseTimer = newEmptyTimer() - pa.runOnDemandCloseTimerStarted = false - } - - // start on-demand source +func (pa *Path) fixedPublisherStart() { if pa.hasExternalSource() { + // start on-demand source if pa.source == nil { pa.startExternalSource() @@ -555,11 +541,18 @@ func (pa *Path) onClientDescribe(c *client.Client) { pa.describeTimer = time.NewTimer(pa.conf.SourceOnDemandStartTimeout) pa.sourceState = sourceStateWaitingDescribe } + + } else { + // reset timer + if pa.sourceCloseTimerStarted { + pa.sourceCloseTimer.Stop() + pa.sourceCloseTimer = time.NewTimer(pa.conf.SourceOnDemandCloseAfter) + } } } - // start on-demand command if pa.conf.RunOnDemand != "" { + // start on-demand command if pa.onDemandCmd == nil { pa.Log(logger.Info, "on demand command started") pa.onDemandCmd = externalcmd.New(pa.conf.RunOnDemand, pa.conf.RunOnDemandRestart, externalcmd.Environment{ @@ -571,52 +564,58 @@ func (pa *Path) onClientDescribe(c *client.Client) { pa.describeTimer = time.NewTimer(pa.conf.RunOnDemandStartTimeout) pa.sourceState = sourceStateWaitingDescribe } + + } else { + // reset timer + if pa.runOnDemandCloseTimerStarted { + pa.runOnDemandCloseTimer.Stop() + pa.runOnDemandCloseTimer = time.NewTimer(pa.conf.RunOnDemandCloseAfter) + } } } +} + +func (pa *Path) onClientDescribe(req client.DescribeReq) { + if _, ok := pa.clients[req.Client]; ok { + req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet + return + } + + pa.fixedPublisherStart() + pa.scheduleClose() if _, ok := pa.source.(*sourceRedirect); ok { - pa.addClient(c, clientStatePreRemove) - pa.removeClient(c) - c.OnPathDescribeData(nil, pa.conf.SourceRedirect, nil) + req.Res <- client.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet return } switch pa.sourceState { case sourceStateReady: - pa.addClient(c, clientStatePreRemove) - pa.removeClient(c) - c.OnPathDescribeData(pa.sourceSdp, "", nil) + req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet return case sourceStateWaitingDescribe: - pa.addClient(c, clientStateWaitingDescribe) + pa.describeRequests = append(pa.describeRequests, req) return case sourceStateNotReady: if pa.conf.Fallback != "" { - pa.addClient(c, clientStatePreRemove) - pa.removeClient(c) - c.OnPathDescribeData(nil, pa.conf.Fallback, nil) + req.Res <- client.DescribeRes{nil, pa.conf.Fallback, nil} //nolint:govet return } - pa.addClient(c, clientStatePreRemove) - pa.removeClient(c) - c.OnPathDescribeData(nil, "", fmt.Errorf("no one is publishing to path '%s'", pa.name)) + req.Res <- client.DescribeRes{nil, "", client.ErrNoOnePublishing{pa.name}} //nolint:govet return } } -func (pa *Path) onClientSetupPlay(c *client.Client, trackID int) error { - if pa.sourceState != sourceStateReady { - return fmt.Errorf("no one is publishing to path '%s'", pa.name) +func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { + if req.TrackID >= pa.sourceTrackCount { + req.Res <- client.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet + return } - if trackID >= pa.sourceTrackCount { - return fmt.Errorf("track %d does not exist", trackID) - } - - if _, ok := pa.clients[c]; !ok { + if _, ok := pa.clients[req.Client]; !ok { // prevent on-demand source from closing if pa.sourceCloseTimerStarted { pa.sourceCloseTimer = newEmptyTimer() @@ -629,10 +628,29 @@ func (pa *Path) onClientSetupPlay(c *client.Client, trackID int) error { pa.runOnDemandCloseTimerStarted = false } - pa.addClient(c, clientStatePrePlay) + pa.addClient(req.Client, clientStatePrePlay) } - return nil + req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet +} + +func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) { + pa.fixedPublisherStart() + pa.scheduleClose() + + switch pa.sourceState { + case sourceStateReady: + pa.onClientSetupPlayPost(req) + return + + case sourceStateWaitingDescribe: + pa.setupPlayRequests = append(pa.setupPlayRequests, req) + return + + case sourceStateNotReady: + req.Res <- client.SetupPlayRes{nil, client.ErrNoOnePublishing{pa.name}} //nolint:govet + return + } } func (pa *Path) onClientPlay(c *client.Client) { @@ -737,16 +755,18 @@ func (pa *Path) scheduleRunOnDemandClose() { } func (pa *Path) scheduleClose() { - if pa.closeTimerStarted || - pa.conf.Regexp == nil || - pa.hasClients() || - pa.source != nil { - return - } + if pa.conf.Regexp != nil && + !pa.hasClients() && + pa.source == nil && + pa.sourceState != sourceStateWaitingDescribe && + !pa.sourceCloseTimerStarted && + !pa.runOnDemandCloseTimerStarted && + !pa.closeTimerStarted { - pa.closeTimer.Stop() - pa.closeTimer = time.NewTimer(0) - pa.closeTimerStarted = true + pa.closeTimer.Stop() + pa.closeTimer = time.NewTimer(0) + pa.closeTimerStarted = true + } } // ConfName returns the configuration name of this path. diff --git a/internal/pathman/pathman.go b/internal/pathman/pathman.go index f16f2f6d..6bb6ab10 100644 --- a/internal/pathman/pathman.go +++ b/internal/pathman/pathman.go @@ -149,14 +149,14 @@ outer: case req := <-pm.clientDescribe: pathName, pathConf, err := pm.findPathConf(req.PathName) if err != nil { - req.Res <- client.DescribeRes{nil, err} //nolint:govet + req.Res <- client.DescribeRes{nil, "", err} //nolint:govet continue } err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req.Req, nil) if err != nil { - req.Res <- client.DescribeRes{nil, err} //nolint:govet + req.Res <- client.DescribeRes{nil, "", err} //nolint:govet continue } @@ -212,12 +212,7 @@ outer: pm.paths[req.PathName].OnPathManAnnounce(req) case req := <-pm.clientSetupPlay: - if _, ok := pm.paths[req.PathName]; !ok { - req.Res <- client.SetupPlayRes{nil, fmt.Errorf("no one is publishing to path '%s'", req.PathName)} //nolint:govet - continue - } - - _, pathConf, err := pm.findPathConf(req.PathName) + pathName, pathConf, err := pm.findPathConf(req.PathName) if err != nil { req.Res <- client.SetupPlayRes{nil, err} //nolint:govet continue @@ -239,6 +234,22 @@ outer: continue } + // create path if it doesn't exist + if _, ok := pm.paths[req.PathName]; !ok { + pa := path.New( + pm.rtspPort, + pm.readTimeout, + pm.writeTimeout, + pm.readBufferCount, + pathName, + pathConf, + req.PathName, + &pm.wg, + pm.stats, + pm) + pm.paths[req.PathName] = pa + } + pm.paths[req.PathName].OnPathManSetupPlay(req) case <-pm.terminate: @@ -259,13 +270,22 @@ outer: return } - case req := <-pm.clientDescribe: - req.Res <- client.DescribeRes{nil, fmt.Errorf("terminated")} //nolint:govet + case req, ok := <-pm.clientDescribe: + if !ok { + return + } + req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet - case req := <-pm.clientAnnounce: + case req, ok := <-pm.clientAnnounce: + if !ok { + return + } req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet - case req := <-pm.clientSetupPlay: + case req, ok := <-pm.clientSetupPlay: + if !ok { + return + } req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet } } diff --git a/main_test.go b/main_test.go index 002caea7..deacf564 100644 --- a/main_test.go +++ b/main_test.go @@ -862,65 +862,162 @@ wait err = os.Chmod(onDemandFile, 0755) require.NoError(t, err) - p1, ok := testProgram(fmt.Sprintf("paths:\n"+ - " all:\n"+ - " runOnDemand: %s\n", onDemandFile)) - require.Equal(t, true, ok) - defer p1.close() + t.Run("describe", func(t *testing.T) { + defer os.Remove(doneFile) - func() { - conn, err := net.Dial("tcp", ownDockerIP+":8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + p1, ok := testProgram(fmt.Sprintf("paths:\n"+ + " all:\n"+ + " runOnDemand: %s\n"+ + " runOnDemandCloseAfter: 2s\n", onDemandFile)) + require.Equal(t, true, ok) + defer p1.close() - err = base.Request{ - Method: base.Describe, - URL: base.MustParseURL("rtsp://localhost:8554/ondemand"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) + func() { + conn, err := net.Dial("tcp", ownDockerIP+":8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) + err = base.Request{ + Method: base.Describe, + URL: base.MustParseURL("rtsp://localhost:8554/ondemand"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) - err = base.Request{ - Method: base.Setup, - URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - "Transport": headers.Transport{ - Protocol: gortsplib.StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIds: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }() - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - }() - - for { - _, err := os.Stat(doneFile) - if err == nil { - break + for { + _, err := os.Stat(doneFile) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) } - time.Sleep(500 * time.Millisecond) - } + }) + + t.Run("describe and setup", func(t *testing.T) { + defer os.Remove(doneFile) + + p1, ok := testProgram(fmt.Sprintf("paths:\n"+ + " all:\n"+ + " runOnDemand: %s\n"+ + " runOnDemandCloseAfter: 2s\n", onDemandFile)) + require.Equal(t, true, ok) + defer p1.close() + + func() { + conn, err := net.Dial("tcp", ownDockerIP+":8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Describe, + URL: base.MustParseURL("rtsp://localhost:8554/ondemand"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": headers.Transport{ + Protocol: gortsplib.StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }() + + for { + _, err := os.Stat(doneFile) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + }) + + t.Run("setup", func(t *testing.T) { + defer os.Remove(doneFile) + + p1, ok := testProgram(fmt.Sprintf("paths:\n"+ + " all:\n"+ + " runOnDemand: %s\n"+ + " runOnDemandCloseAfter: 2s\n", onDemandFile)) + require.Equal(t, true, ok) + defer p1.close() + + func() { + conn, err := net.Dial("tcp", ownDockerIP+":8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/ondemand/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: gortsplib.StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }() + + for { + _, err := os.Stat(doneFile) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + }) } func TestHotReloading(t *testing.T) {