From 1f11d95059a125a68018833f47eb552184d91cc9 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Fri, 22 Sep 2023 12:35:35 +0200 Subject: [PATCH] support recording M-JPEG tracks (#2391) --- README.md | 6 +- go.mod | 2 +- go.sum | 4 +- internal/formatprocessor/mjpeg.go | 113 ++++++++++++++++++++++++++ internal/formatprocessor/processor.go | 3 + internal/record/agent.go | 89 +++++++++++++++++++- internal/unit/mjpeg.go | 7 ++ 7 files changed, 217 insertions(+), 7 deletions(-) create mode 100644 internal/formatprocessor/mjpeg.go create mode 100644 internal/unit/mjpeg.go diff --git a/README.md b/README.md index 54283067..d1f01467 100644 --- a/README.md +++ b/README.md @@ -1155,7 +1155,7 @@ All available recording parameters are listed in the [sample configuration file] Currently the server supports recording tracks encoded with the following codecs: -* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video +* Video: AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG * Audio: Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3 ### Forward streams to another server @@ -1187,7 +1187,7 @@ The command inserted into `runOnDemand` will start only when a client requests t ### Start on boot -#### Linux* +#### Linux Systemd is the service manager used by Ubuntu, Debian and many other Linux distributions, and allows to launch _MediaMTX_ on boot. @@ -1219,7 +1219,7 @@ sudo systemctl enable mediamtx sudo systemctl start mediamtx ``` -#### Windows* +#### Windows Download the [WinSW v2 executable](https://github.com/winsw/winsw/releases/download/v2.11.0/WinSW-x64.exe) and place it into the same folder of `mediamtx.exe`. diff --git a/go.mod b/go.mod index f6997d75..1ef21016 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/aler9/writerseeker v1.1.0 github.com/bluenviron/gohlslib v1.0.3 github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230921145131-44da79f72d5e - github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe + github.com/bluenviron/mediacommon v1.3.1-0.20230922102827-7fae03fb0e62 github.com/datarhei/gosrt v0.5.4 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 diff --git a/go.sum b/go.sum index 16989169..bfbd561a 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/bluenviron/gohlslib v1.0.3 h1:FMHevlIrrZ67uzCXmlTSGflsfYREEtHb8L9BDyf github.com/bluenviron/gohlslib v1.0.3/go.mod h1:R/aIsSxLI61N0CVMjtcHqJouK6+Ddd5YIihcCr7IFIw= github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230921145131-44da79f72d5e h1:Y8b0vKPQLerALedmNNBmxrJR6sBcnge+fQeCH+Kfh3A= github.com/bluenviron/gortsplib/v4 v4.1.1-0.20230921145131-44da79f72d5e/go.mod h1:0rVtKDafUA14isZuaBTm5+X9NPqLYs/lY8JIww6+doM= -github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe h1:8kvIJfRXvv1Za1hdArKjvd/l8WCHJF+d+oLtANdFbr8= -github.com/bluenviron/mediacommon v1.3.1-0.20230919191723-607668055ebe/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= +github.com/bluenviron/mediacommon v1.3.1-0.20230922102827-7fae03fb0e62 h1:kTUPhZIvCP8zRFWtsTx6Yl8OxDYvjBFcogo7yTkQwXI= +github.com/bluenviron/mediacommon v1.3.1-0.20230922102827-7fae03fb0e62/go.mod h1:/vlOVSebDwzdRtQONOKLua0fOSJg1tUDHpP+h9a0uqM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/formatprocessor/mjpeg.go b/internal/formatprocessor/mjpeg.go new file mode 100644 index 00000000..4dd5474a --- /dev/null +++ b/internal/formatprocessor/mjpeg.go @@ -0,0 +1,113 @@ +package formatprocessor //nolint:dupl + +import ( + "fmt" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmjpeg" + "github.com/pion/rtp" + + "github.com/bluenviron/mediamtx/internal/unit" +) + +type formatProcessorMJPEG struct { + udpMaxPayloadSize int + format *format.MJPEG + encoder *rtpmjpeg.Encoder + decoder *rtpmjpeg.Decoder +} + +func newMJPEG( + udpMaxPayloadSize int, + forma *format.MJPEG, + generateRTPPackets bool, +) (*formatProcessorMJPEG, error) { + t := &formatProcessorMJPEG{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + } + + if generateRTPPackets { + err := t.createEncoder() + if err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *formatProcessorMJPEG) createEncoder() error { + t.encoder = &rtpmjpeg.Encoder{ + PayloadMaxSize: t.udpMaxPayloadSize - 12, + } + return t.encoder.Init() +} + +func (t *formatProcessorMJPEG) ProcessUnit(uu unit.Unit) error { //nolint:dupl + u := uu.(*unit.MJPEG) + + // encode into RTP + pkts, err := t.encoder.Encode(u.Frame) + if err != nil { + return err + } + + ts := uint32(multiplyAndDivide(u.PTS, time.Duration(t.format.ClockRate()), time.Second)) + for _, pkt := range pkts { + pkt.Timestamp += ts + } + + u.RTPPackets = pkts + + return nil +} + +func (t *formatProcessorMJPEG) ProcessRTPPacket( //nolint:dupl + pkt *rtp.Packet, + ntp time.Time, + pts time.Duration, + hasNonRTSPReaders bool, +) (Unit, error) { + u := &unit.MJPEG{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: ntp, + PTS: pts, + }, + } + + // remove padding + pkt.Header.Padding = false + pkt.PaddingSize = 0 + + if pkt.MarshalSize() > t.udpMaxPayloadSize { + return nil, fmt.Errorf("payload size (%d) is greater than maximum allowed (%d)", + pkt.MarshalSize(), t.udpMaxPayloadSize) + } + + // decode from RTP + if hasNonRTSPReaders || t.decoder != nil { + if t.decoder == nil { + var err error + t.decoder, err = t.format.CreateDecoder() + if err != nil { + return nil, err + } + } + + frame, err := t.decoder.Decode(pkt) + if err != nil { + if err == rtpmjpeg.ErrNonStartingPacketAndNoPrevious || err == rtpmjpeg.ErrMorePacketsNeeded { + return u, nil + } + return nil, err + } + + u.Frame = frame + } + + // route packet as is + return u, nil +} diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 4968b870..a2fb68d1 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -69,6 +69,9 @@ func New( case *format.MPEG1Audio: return newMPEG1Audio(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.MJPEG: + return newMJPEG(udpMaxPayloadSize, forma, generateRTPPackets) + case *format.AC3: return newAC3(udpMaxPayloadSize, forma, generateRTPPackets) diff --git a/internal/record/agent.go b/internal/record/agent.go index 04ac5e36..93ac5156 100644 --- a/internal/record/agent.go +++ b/internal/record/agent.go @@ -13,6 +13,7 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/av1" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/codecs/jpeg" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" @@ -45,6 +46,61 @@ func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { } } +func jpegExtractSize(image []byte) (int, int, error) { + l := len(image) + if l < 2 || image[0] != 0xFF || image[1] != jpeg.MarkerStartOfImage { + return 0, 0, fmt.Errorf("invalid header") + } + + image = image[2:] + + for { + if len(image) < 2 { + return 0, 0, fmt.Errorf("not enough bits") + } + + h0, h1 := image[0], image[1] + image = image[2:] + + if h0 != 0xFF { + return 0, 0, fmt.Errorf("invalid image") + } + + switch h1 { + case 0xE0, 0xE1, 0xE2, // JFIF + jpeg.MarkerDefineHuffmanTable, + jpeg.MarkerComment, + jpeg.MarkerDefineQuantizationTable, + jpeg.MarkerDefineRestartInterval: + mlen := int(image[0])<<8 | int(image[1]) + if len(image) < mlen { + return 0, 0, fmt.Errorf("not enough bits") + } + image = image[mlen:] + + case jpeg.MarkerStartOfFrame1: + mlen := int(image[0])<<8 | int(image[1]) + if len(image) < mlen { + return 0, 0, fmt.Errorf("not enough bits") + } + + var sof jpeg.StartOfFrame1 + err := sof.Unmarshal(image[2:mlen]) + if err != nil { + return 0, 0, err + } + + return sof.Width, sof.Height, nil + + case jpeg.MarkerStartOfScan: + return 0, 0, fmt.Errorf("SOF not found") + + default: + return 0, 0, fmt.Errorf("unknown marker: 0x%.2x", h1) + } + } +} + type sample struct { *fmp4.PartSample dts time.Duration @@ -533,7 +589,38 @@ func NewAgent( }) case *format.MJPEG: - // TODO + codec := &fmp4.CodecMJPEG{ + Width: 800, + Height: 600, + } + track := addTrack(codec) + + parsed := false + + stream.AddReader(r.writer, media, forma, func(u unit.Unit) error { + tunit := u.(*unit.MJPEG) + if tunit.Frame == nil { + return nil + } + + if !parsed { + parsed = true + width, height, err := jpegExtractSize(tunit.Frame) + if err != nil { + return err + } + codec.Width = width + codec.Height = height + r.updateCodecs() + } + + return track.record(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + }, + dts: tunit.PTS, + }) + }) case *format.Opus: codec := &fmp4.CodecOpus{ diff --git a/internal/unit/mjpeg.go b/internal/unit/mjpeg.go new file mode 100644 index 00000000..6c3b445f --- /dev/null +++ b/internal/unit/mjpeg.go @@ -0,0 +1,7 @@ +package unit + +// MJPEG is a M-JPEG data unit. +type MJPEG struct { + Base + Frame []byte +}