From e6fa56dd0629aaabab567b4f099172ce0f49bbc5 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 30 Nov 2024 11:25:31 +0100 Subject: [PATCH] rpicamera: fix restarting stream in case of disconnections or driver errors (#3988) --- internal/staticsources/rpicamera/camera.go | 78 +++++++++++-------- .../rpicamera/camera_disabled.go | 4 + .../rpicamera/mtxrpicamdownloader/VERSION | 2 +- internal/staticsources/rpicamera/source.go | 3 + 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/internal/staticsources/rpicamera/camera.go b/internal/staticsources/rpicamera/camera.go index 15b35a99..c06d55a0 100644 --- a/internal/staticsources/rpicamera/camera.go +++ b/internal/staticsources/rpicamera/camera.go @@ -17,9 +17,9 @@ type camera struct { Params params OnData func(time.Duration, [][]byte) - cmd *exec.Cmd - pipeConf *pipe - pipeVideo *pipe + cmd *exec.Cmd + pipeOut *pipe + pipeIn *pipe waitDone chan error readerDone chan error @@ -31,22 +31,22 @@ func (c *camera) initialize() error { return err } - c.pipeConf, err = newPipe() + c.pipeOut, err = newPipe() if err != nil { freeComponent() return err } - c.pipeVideo, err = newPipe() + c.pipeIn, err = newPipe() if err != nil { - c.pipeConf.close() + c.pipeOut.close() freeComponent() return err } env := []string{ - "PIPE_CONF_FD=" + strconv.FormatInt(int64(c.pipeConf.readFD), 10), - "PIPE_VIDEO_FD=" + strconv.FormatInt(int64(c.pipeVideo.writeFD), 10), + "PIPE_CONF_FD=" + strconv.FormatInt(int64(c.pipeOut.readFD), 10), + "PIPE_VIDEO_FD=" + strconv.FormatInt(int64(c.pipeIn.writeFD), 10), "LD_LIBRARY_PATH=" + dumpPath, } @@ -58,13 +58,13 @@ func (c *camera) initialize() error { err = c.cmd.Start() if err != nil { - c.pipeConf.close() - c.pipeVideo.close() + c.pipeOut.close() + c.pipeIn.close() freeComponent() return err } - c.pipeConf.write(append([]byte{'c'}, c.Params.serialize()...)) + c.pipeOut.write(append([]byte{'c'}, c.Params.serialize()...)) c.waitDone = make(chan error) go func() { @@ -78,18 +78,18 @@ func (c *camera) initialize() error { select { case err := <-c.waitDone: - c.pipeConf.close() - c.pipeVideo.close() + c.pipeOut.close() + c.pipeIn.close() <-c.readerDone freeComponent() return fmt.Errorf("process exited unexpectedly: %v", err) case err := <-c.readerDone: if err != nil { - c.pipeConf.write([]byte{'e'}) + c.pipeOut.write([]byte{'e'}) <-c.waitDone - c.pipeConf.close() - c.pipeVideo.close() + c.pipeOut.close() + c.pipeIn.close() freeComponent() return err } @@ -98,26 +98,27 @@ func (c *camera) initialize() error { c.readerDone = make(chan error) go func() { c.readerDone <- c.readData() + close(c.readerDone) }() return nil } func (c *camera) close() { - c.pipeConf.write([]byte{'e'}) + c.pipeOut.write([]byte{'e'}) <-c.waitDone - c.pipeConf.close() - c.pipeVideo.close() + c.pipeOut.close() + c.pipeIn.close() <-c.readerDone freeComponent() } func (c *camera) reloadParams(params params) { - c.pipeConf.write(append([]byte{'c'}, params.serialize()...)) + c.pipeOut.write(append([]byte{'c'}, params.serialize()...)) } func (c *camera) readReady() error { - buf, err := c.pipeVideo.read() + buf, err := c.pipeIn.read() if err != nil { return err } @@ -136,24 +137,33 @@ func (c *camera) readReady() error { func (c *camera) readData() error { for { - buf, err := c.pipeVideo.read() + buf, err := c.pipeIn.read() if err != nil { return err } - if buf[0] != 'b' { + switch buf[0] { + case 'e': + return fmt.Errorf(string(buf[1:])) + + case 'b': + tmp := uint64(buf[8])<<56 | uint64(buf[7])<<48 | uint64(buf[6])<<40 | uint64(buf[5])<<32 | + uint64(buf[4])<<24 | uint64(buf[3])<<16 | uint64(buf[2])<<8 | uint64(buf[1]) + dts := time.Duration(tmp) * time.Microsecond + + nalus, err := h264.AnnexBUnmarshal(buf[9:]) + if err != nil { + return err + } + + c.OnData(dts, nalus) + + default: return fmt.Errorf("unexpected output from pipe (%c)", buf[0]) } - - tmp := uint64(buf[8])<<56 | uint64(buf[7])<<48 | uint64(buf[6])<<40 | uint64(buf[5])<<32 | - uint64(buf[4])<<24 | uint64(buf[3])<<16 | uint64(buf[2])<<8 | uint64(buf[1]) - dts := time.Duration(tmp) * time.Microsecond - - nalus, err := h264.AnnexBUnmarshal(buf[9:]) - if err != nil { - return err - } - - c.OnData(dts, nalus) } } + +func (c *camera) error() chan error { + return c.readerDone +} diff --git a/internal/staticsources/rpicamera/camera_disabled.go b/internal/staticsources/rpicamera/camera_disabled.go index ccfc5a63..461649d6 100644 --- a/internal/staticsources/rpicamera/camera_disabled.go +++ b/internal/staticsources/rpicamera/camera_disabled.go @@ -21,3 +21,7 @@ func (c *camera) close() { func (c *camera) reloadParams(_ params) { } + +func (c *camera) error() chan error { + return nil +} diff --git a/internal/staticsources/rpicamera/mtxrpicamdownloader/VERSION b/internal/staticsources/rpicamera/mtxrpicamdownloader/VERSION index 20dd6322..45a16068 100644 --- a/internal/staticsources/rpicamera/mtxrpicamdownloader/VERSION +++ b/internal/staticsources/rpicamera/mtxrpicamdownloader/VERSION @@ -1 +1 @@ -v2.3.4 +v2.3.5 diff --git a/internal/staticsources/rpicamera/source.go b/internal/staticsources/rpicamera/source.go index 37e4b856..9331300f 100644 --- a/internal/staticsources/rpicamera/source.go +++ b/internal/staticsources/rpicamera/source.go @@ -135,6 +135,9 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { for { select { + case err := <-cam.error(): + return err + case cnf := <-params.ReloadConf: cam.reloadParams(paramsFromConf(s.LogLevel, cnf))