diff --git a/README.md b/README.md index 538ad7d2..8b17419b 100644 --- a/README.md +++ b/README.md @@ -1272,6 +1272,7 @@ The server allows to specify commands that are executed when a certain event hap `runOnConnect` allows to run a command when a client connects to the server: ```yml +# Command to run when a client connects to the server. # This is terminated with SIGINT when a client disconnects from the server. # The following environment variables are available: # * RTSP_PORT: RTSP server port @@ -1285,6 +1286,7 @@ runOnConnectRestart: no `runOnDisconnect` allows to run a command when a client disconnects from the server: ```yml +# Command to run when a client disconnects from the server. # Environment variables are the same of runOnConnect. runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID ``` @@ -1294,7 +1296,8 @@ runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&c ```yml paths: mypath: - # This is terminated with SIGINT when the program closes. + # Command to run when this path is initialized. + # This can be used to publish a stream when the server is launched. # The following environment variables are available: # * MTX_PATH: path name # * RTSP_PORT: RTSP server port @@ -1310,6 +1313,8 @@ paths: ```yml paths: mypath: + # Command to run when this path is requested by a reader + # and no one is publishing to this path yet. # This is terminated with SIGINT when the program closes. # The following environment variables are available: # * MTX_PATH: path name @@ -1326,6 +1331,8 @@ paths: ```yml pathDefaults: + # Command to run when the stream is ready to be read, whenever it is + # published by a client or pulled from a server / camera. # This is terminated with SIGINT when the stream is not ready anymore. # The following environment variables are available: # * MTX_PATH: path name @@ -1344,6 +1351,7 @@ pathDefaults: ```yml pathDefaults: + # Command to run when the stream is not available anymore. # Environment variables are the same of runOnReady. runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID ``` @@ -1352,6 +1360,7 @@ pathDefaults: ```yml pathDefaults: + # Command to run when a client starts reading. # This is terminated with SIGINT when a client stops reading. # The following environment variables are available: # * MTX_PATH: path name diff --git a/internal/core/path.go b/internal/core/path.go index 7b57a94d..e6cdc531 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -201,8 +201,8 @@ type path struct { chStaticSourceSetReady chan defs.PathSourceStaticSetReadyReq chStaticSourceSetNotReady chan defs.PathSourceStaticSetNotReadyReq chDescribe chan pathDescribeReq - chRemovePublisher chan pathRemovePublisherReq chAddPublisher chan pathAddPublisherReq + chRemovePublisher chan pathRemovePublisherReq chStartPublisher chan pathStartPublisherReq chStopPublisher chan pathStopPublisherReq chAddReader chan pathAddReaderReq @@ -254,8 +254,8 @@ func newPath( chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq), chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq), chDescribe: make(chan pathDescribeReq), - chRemovePublisher: make(chan pathRemovePublisherReq), chAddPublisher: make(chan pathAddPublisherReq), + chRemovePublisher: make(chan pathRemovePublisherReq), chStartPublisher: make(chan pathStartPublisherReq), chStopPublisher: make(chan pathStopPublisherReq), chAddReader: make(chan pathAddReaderReq), @@ -357,7 +357,7 @@ func (pa *path) run() { } if pa.onUnDemandHook != nil { - pa.onUnDemandHook("path closed") + pa.onUnDemandHook("path destroyed") } pa.Log(logger.Debug, "destroyed: %v", err) @@ -414,6 +414,9 @@ func (pa *path) runInner() error { return fmt.Errorf("not in use") } + case req := <-pa.chAddPublisher: + pa.doAddPublisher(req) + case req := <-pa.chRemovePublisher: pa.doRemovePublisher(req) @@ -421,9 +424,6 @@ func (pa *path) runInner() error { return fmt.Errorf("not in use") } - case req := <-pa.chAddPublisher: - pa.doAddPublisher(req) - case req := <-pa.chStartPublisher: pa.doStartPublisher(req) @@ -519,22 +519,11 @@ func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) { if pa.conf.HasOnDemandStaticSource() { pa.onDemandStaticSourceReadyTimer.Stop() pa.onDemandStaticSourceReadyTimer = newEmptyTimer() - pa.onDemandStaticSourceScheduleClose() - - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{ - stream: pa.stream, - } - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - pa.addReaderPost(req) - } - pa.readerAddRequestsOnHold = nil } + pa.consumeOnHoldRequests() + req.Res <- defs.PathSourceStaticSetReadyRes{Stream: pa.stream} } @@ -649,25 +638,14 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) { pa.name, mediaInfo(req.desc.Medias)) - if pa.conf.HasOnDemandPublisher() { + if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer = newEmptyTimer() - pa.onDemandPublisherScheduleClose() - - for _, req := range pa.describeRequestsOnHold { - req.res <- pathDescribeRes{ - stream: pa.stream, - } - } - pa.describeRequestsOnHold = nil - - for _, req := range pa.readerAddRequestsOnHold { - pa.addReaderPost(req) - } - pa.readerAddRequestsOnHold = nil } + pa.consumeOnHoldRequests() + req.res <- pathStartPublisherRes{stream: pa.stream} } @@ -840,20 +818,15 @@ func (pa *path) onDemandPublisherScheduleClose() { } func (pa *path) onDemandPublisherStop(reason string) { - if pa.source != nil { - pa.source.(publisher).close() - pa.executeRemovePublisher() - } - if pa.onDemandPublisherState == pathOnDemandStateClosing { pa.onDemandPublisherCloseTimer.Stop() pa.onDemandPublisherCloseTimer = newEmptyTimer() } - pa.onDemandPublisherState = pathOnDemandStateInitial - pa.onUnDemandHook(reason) pa.onUnDemandHook = nil + + pa.onDemandPublisherState = pathOnDemandStateInitial } func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { @@ -881,6 +854,20 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error return nil } +func (pa *path) consumeOnHoldRequests() { + for _, req := range pa.describeRequestsOnHold { + req.res <- pathDescribeRes{ + stream: pa.stream, + } + } + pa.describeRequestsOnHold = nil + + for _, req := range pa.readerAddRequestsOnHold { + pa.addReaderPost(req) + } + pa.readerAddRequestsOnHold = nil +} + func (pa *path) setNotReady() { pa.parent.pathNotReady(pa) @@ -1048,16 +1035,6 @@ func (pa *path) describe(req pathDescribeReq) pathDescribeRes { } } -// removePublisher is called by a publisher. -func (pa *path) removePublisher(req pathRemovePublisherReq) { - req.res = make(chan struct{}) - select { - case pa.chRemovePublisher <- req: - <-req.res - case <-pa.ctx.Done(): - } -} - // addPublisher is called by a publisher through pathManager. func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes { select { @@ -1068,6 +1045,16 @@ func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes { } } +// removePublisher is called by a publisher. +func (pa *path) removePublisher(req pathRemovePublisherReq) { + req.res = make(chan struct{}) + select { + case pa.chRemovePublisher <- req: + <-req.res + case <-pa.ctx.Done(): + } +} + // startPublisher is called by a publisher. func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes { req.res = make(chan pathStartPublisherRes) diff --git a/mediamtx.yml b/mediamtx.yml index a9c60706..e5beee19 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -468,7 +468,8 @@ pathDefaults: # Restart the command if it exits. runOnInitRestart: no - # Command to run when this path is requested by a reader. + # Command to run when this path is requested by a reader + # and no one is publishing to this path yet. # This can be used to publish a stream on demand. # This is terminated with SIGINT when the path is not requested anymore. # The following environment variables are available: