diff --git a/README.md b/README.md index 3b2f8c02..b5ea7ad9 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Live streams can be published to the server with: |RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG4 Audio (AAC)| |RTMP servers and cameras|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| |HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, H265, MPEG4 Audio (AAC), Opus| +|UDP/MPEG-TS streams|Unicast, broadcast, multicast|H264, H265, MPEG4 Audio (AAC), Opus| |Raspberry Pi Cameras||H264| And can be read from the server with: @@ -84,6 +85,7 @@ In the next months, the repository name and the docker image name will be change * [From a Raspberry Pi Camera](#from-a-raspberry-pi-camera) * [From OBS Studio](#from-obs-studio) * [From OpenCV](#from-opencv) + * [From a UDP stream](#from-a-udp-stream) * [Read from the server](#read-from-the-server) * [From VLC and Ubuntu](#from-vlc-and-ubuntu) * [RTSP protocol](#rtsp-protocol) @@ -751,6 +753,26 @@ while True: sleep(1 / fps) ``` +### From a UDP stream + +The server supports ingesting UDP/MPEG-TS packets (i.e. MPEG-TS packets sent with UDP). Packets can be unicast, broadcast or multicast. For instance, you can generate a multicast UDP/MPEG-TS stream with: + +``` +gst-launch-1.0 -v mpegtsmux name=mux alignment=1 ! udpsink host=238.0.0.1 port=1234 \ +videotestsrc ! video/x-raw,width=1280,height=720 ! x264enc speed-preset=ultrafast bitrate=6000 key-int-max=40 ! mux. \ +audiotestsrc ! audioconvert ! avenc_aac ! mux. +``` + +Edit `rtsp-simple-server.yml` and replace everything inside section `paths` with the following content: + +```yml +paths: + udp: + source: udp://238.0.0.1:1234 +``` + +After starting the server, the stream can be reached on `rtsp://localhost:8554/udp`. + ## Read from the server ### From VLC and Ubuntu diff --git a/go.mod b/go.mod index 6e99455a..12f7547a 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/pion/webrtc/v3 v3.1.47 github.com/stretchr/testify v1.8.2 golang.org/x/crypto v0.5.0 + golang.org/x/net v0.7.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -63,7 +64,6 @@ require ( github.com/ugorji/go/codec v1.2.9 // indirect github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect - golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect golang.org/x/text v0.7.0 // indirect google.golang.org/protobuf v1.28.1 // indirect diff --git a/internal/conf/path.go b/internal/conf/path.go index f002173f..914fcc43 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -2,6 +2,7 @@ package conf import ( "fmt" + "net" gourl "net/url" "reflect" "regexp" @@ -178,6 +179,21 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error { } } + case strings.HasPrefix(pconf.Source, "udp://"): + if pconf.Regexp != nil { + return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a HLS source. use another path") + } + + host, _, err := net.SplitHostPort(pconf.Source[len("udp://"):]) + if err != nil { + return fmt.Errorf("'%s' is not a valid UDP URL", pconf.Source) + } + + ip := net.ParseIP(host) + if ip == nil { + return fmt.Errorf("'%s' is not a valid IP", host) + } + case pconf.Source == "redirect": if pconf.SourceRedirect == "" { return fmt.Errorf("source redirect must be filled") @@ -324,3 +340,25 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error { func (pconf *PathConf) Equal(other *PathConf) bool { return reflect.DeepEqual(pconf, other) } + +// HasStaticSource checks whether the path has a static source. +func (pconf PathConf) HasStaticSource() bool { + return strings.HasPrefix(pconf.Source, "rtsp://") || + strings.HasPrefix(pconf.Source, "rtsps://") || + strings.HasPrefix(pconf.Source, "rtmp://") || + strings.HasPrefix(pconf.Source, "rtmps://") || + strings.HasPrefix(pconf.Source, "http://") || + strings.HasPrefix(pconf.Source, "https://") || + strings.HasPrefix(pconf.Source, "udp://") || + pconf.Source == "rpiCamera" +} + +// HasOnDemandStaticSource checks whether the path has a on demand static source. +func (pconf PathConf) HasOnDemandStaticSource() bool { + return pconf.HasStaticSource() && pconf.SourceOnDemand +} + +// HasOnDemandPublisher checks whether the path has a on-demand publisher. +func (pconf PathConf) HasOnDemandPublisher() bool { + return pconf.RunOnDemand != "" +} diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 17f578df..d11d5b3f 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -61,14 +61,14 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan Formats: []format.Format{track}, } medias = append(medias, medi) - ctrack := track + cformat := track switch track.(type) { case *format.H264: medi.Type = media.TypeVideo c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.UnitH264{ + err := stream.writeData(medi, cformat, &formatprocessor.UnitH264{ PTS: pts, AU: unit.([][]byte), NTP: time.Now(), @@ -82,7 +82,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan medi.Type = media.TypeVideo c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.UnitH265{ + err := stream.writeData(medi, cformat, &formatprocessor.UnitH265{ PTS: pts, AU: unit.([][]byte), NTP: time.Now(), @@ -96,7 +96,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan medi.Type = media.TypeAudio c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.UnitMPEG4Audio{ + err := stream.writeData(medi, cformat, &formatprocessor.UnitMPEG4Audio{ PTS: pts, AUs: [][]byte{unit.([]byte)}, NTP: time.Now(), @@ -110,7 +110,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan medi.Type = media.TypeAudio c.OnData(track, func(pts time.Duration, unit interface{}) { - err := stream.writeData(medi, ctrack, &formatprocessor.UnitOpus{ + err := stream.writeData(medi, cformat, &formatprocessor.UnitOpus{ PTS: pts, Frame: unit.([]byte), NTP: time.Now(), diff --git a/internal/core/path.go b/internal/core/path.go index b04a4287..8745c947 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -305,24 +305,6 @@ func (pa *path) log(level logger.Level, format string, args ...interface{}) { pa.parent.log(level, "[path "+pa.name+"] "+format, args...) } -func (pa *path) hasStaticSource() bool { - return strings.HasPrefix(pa.conf.Source, "rtsp://") || - strings.HasPrefix(pa.conf.Source, "rtsps://") || - strings.HasPrefix(pa.conf.Source, "rtmp://") || - strings.HasPrefix(pa.conf.Source, "rtmps://") || - strings.HasPrefix(pa.conf.Source, "http://") || - strings.HasPrefix(pa.conf.Source, "https://") || - pa.conf.Source == "rpiCamera" -} - -func (pa *path) hasOnDemandStaticSource() bool { - return pa.hasStaticSource() && pa.conf.SourceOnDemand -} - -func (pa *path) hasOnDemandPublisher() bool { - return pa.conf.RunOnDemand != "" -} - func (pa *path) safeConf() *conf.PathConf { pa.confMutex.RLock() defer pa.confMutex.RUnlock() @@ -335,7 +317,7 @@ func (pa *path) run() { if pa.conf.Source == "redirect" { pa.source = &sourceRedirect{} - } else if pa.hasStaticSource() { + } else if pa.conf.HasStaticSource() { pa.source = newSourceStatic( pa.conf, pa.readTimeout, @@ -414,7 +396,7 @@ func (pa *path) run() { } case newConf := <-pa.chReloadConf: - if pa.hasStaticSource() { + if pa.conf.HasStaticSource() { go pa.source.(*sourceStatic).reloadConf(newConf) } @@ -427,7 +409,7 @@ func (pa *path) run() { if err != nil { req.res <- pathSourceStaticSetReadyRes{err: err} } else { - if pa.hasOnDemandStaticSource() { + if pa.conf.HasOnDemandStaticSource() { pa.onDemandStaticSourceReadyTimer.Stop() pa.onDemandStaticSourceReadyTimer = newEmptyTimer() @@ -456,7 +438,7 @@ func (pa *path) run() { // in order to avoid a deadlock due to sourceStatic.stop() close(req.res) - if pa.hasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { + if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial { pa.onDemandStaticSourceStop() } @@ -700,7 +682,7 @@ func (pa *path) doReaderRemove(r reader) { func (pa *path) doPublisherRemove() { if pa.stream != nil { - if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { + if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { pa.onDemandPublisherStop() } else { pa.sourceSetNotReady() @@ -725,7 +707,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) { return } - if pa.hasOnDemandStaticSource() { + if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateInitial { pa.onDemandStaticSourceStart() } @@ -733,7 +715,7 @@ func (pa *path) handleDescribe(req pathDescribeReq) { return } - if pa.hasOnDemandPublisher() { + if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { pa.onDemandPublisherStart() } @@ -804,7 +786,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) { return } - if pa.hasOnDemandPublisher() { + if pa.conf.HasOnDemandPublisher() { pa.onDemandPublisherReadyTimer.Stop() pa.onDemandPublisherReadyTimer = newEmptyTimer() @@ -828,7 +810,7 @@ func (pa *path) handlePublisherStart(req pathPublisherStartReq) { func (pa *path) handlePublisherStop(req pathPublisherStopReq) { if req.author == pa.source && pa.stream != nil { - if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { + if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial { pa.onDemandPublisherStop() } else { pa.sourceSetNotReady() @@ -844,11 +826,11 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { close(req.res) if len(pa.readers) == 0 { - if pa.hasOnDemandStaticSource() { + if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateReady { pa.onDemandStaticSourceScheduleClose() } - } else if pa.hasOnDemandPublisher() { + } else if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateReady { pa.onDemandPublisherScheduleClose() } @@ -862,7 +844,7 @@ func (pa *path) handleReaderAdd(req pathReaderAddReq) { return } - if pa.hasOnDemandStaticSource() { + if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateInitial { pa.onDemandStaticSourceStart() } @@ -870,7 +852,7 @@ func (pa *path) handleReaderAdd(req pathReaderAddReq) { return } - if pa.hasOnDemandPublisher() { + if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateInitial { pa.onDemandPublisherStart() } @@ -884,13 +866,13 @@ func (pa *path) handleReaderAdd(req pathReaderAddReq) { func (pa *path) handleReaderAddPost(req pathReaderAddReq) { pa.readers[req.author] = struct{}{} - if pa.hasOnDemandStaticSource() { + if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateClosing { pa.onDemandStaticSourceState = pathOnDemandStateReady pa.onDemandStaticSourceCloseTimer.Stop() pa.onDemandStaticSourceCloseTimer = newEmptyTimer() } - } else if pa.hasOnDemandPublisher() { + } else if pa.conf.HasOnDemandPublisher() { if pa.onDemandPublisherState == pathOnDemandStateClosing { pa.onDemandPublisherState = pathOnDemandStateReady pa.onDemandPublisherCloseTimer.Stop() diff --git a/internal/core/source_static.go b/internal/core/source_static.go index 8844c629..60274a08 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -81,6 +81,11 @@ func newSourceStatic( s.impl = newHLSSource( s) + case strings.HasPrefix(cnf.Source, "udp://"): + s.impl = newUDPSource( + readTimeout, + s) + case cnf.Source == "rpiCamera": s.impl = newRPICameraSource( s) diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go new file mode 100644 index 00000000..147de175 --- /dev/null +++ b/internal/core/udp_source.go @@ -0,0 +1,350 @@ +package core + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/aler9/gortsplib/v2/pkg/codecs/h264" + "github.com/aler9/gortsplib/v2/pkg/codecs/mpeg4audio" + "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/asticode/go-astits" + "github.com/bluenviron/gohlslib/pkg/mpegts" + "golang.org/x/net/ipv4" + + "github.com/aler9/rtsp-simple-server/internal/conf" + "github.com/aler9/rtsp-simple-server/internal/formatprocessor" + "github.com/aler9/rtsp-simple-server/internal/logger" +) + +const ( + multicastTTL = 16 +) + +var opusDurations = [32]int{ + 480, 960, 1920, 2880, /* Silk NB */ + 480, 960, 1920, 2880, /* Silk MB */ + 480, 960, 1920, 2880, /* Silk WB */ + 480, 960, /* Hybrid SWB */ + 480, 960, /* Hybrid FB */ + 120, 240, 480, 960, /* CELT NB */ + 120, 240, 480, 960, /* CELT NB */ + 120, 240, 480, 960, /* CELT NB */ + 120, 240, 480, 960, /* CELT NB */ +} + +func opusGetPacketDuration(pkt []byte) time.Duration { + if len(pkt) == 0 { + return 0 + } + + frameDuration := opusDurations[pkt[0]>>3] + + frameCount := 0 + switch pkt[0] & 3 { + case 0: + frameCount = 1 + case 1: + frameCount = 2 + case 2: + frameCount = 2 + case 3: + if len(pkt) < 2 { + return 0 + } + frameCount = int(pkt[1] & 63) + } + + return (time.Duration(frameDuration) * time.Duration(frameCount) * time.Millisecond) / 48 +} + +type readerFunc func([]byte) (int, error) + +func (rf readerFunc) Read(p []byte) (int, error) { + return rf(p) +} + +type udpSourceParent interface { + log(logger.Level, string, ...interface{}) + sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) +} + +type udpSource struct { + readTimeout conf.StringDuration + parent udpSourceParent +} + +func newUDPSource( + readTimeout conf.StringDuration, + parent udpSourceParent, +) *udpSource { + return &udpSource{ + readTimeout: readTimeout, + parent: parent, + } +} + +func (s *udpSource) Log(level logger.Level, format string, args ...interface{}) { + s.parent.log(level, "[udp source] "+format, args...) +} + +// run implements sourceStaticImpl. +func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { + s.Log(logger.Debug, "connecting") + + hostPort := cnf.Source[len("udp://"):] + + pc, err := net.ListenPacket("udp", hostPort) + if err != nil { + return err + } + defer pc.Close() + + host, _, _ := net.SplitHostPort(hostPort) + ip := net.ParseIP(host) + + if ip.IsMulticast() { + p := ipv4.NewPacketConn(pc) + + err = p.SetMulticastTTL(multicastTTL) + if err != nil { + return err + } + + intfs, err := net.Interfaces() + if err != nil { + return err + } + + for _, intf := range intfs { + err := p.JoinGroup(&intf, &net.UDPAddr{IP: ip}) + if err != nil { + return err + } + } + } + + midbuffer := make([]byte, 0, 1472) // UDP MTU + midbufferPos := 0 + + readPacket := func(buf []byte) (int, error) { + if midbufferPos < len(midbuffer) { + n := copy(buf, midbuffer[midbufferPos:]) + midbufferPos += n + return n, nil + } + + mn, _, err := pc.ReadFrom(midbuffer[:cap(midbuffer)]) + if err != nil { + return 0, err + } + + if (mn % 188) != 0 { + return 0, fmt.Errorf("received packet with size %d not multiple of 188", mn) + } + + midbuffer = midbuffer[:mn] + n := copy(buf, midbuffer) + midbufferPos = n + return n, nil + } + + dem := astits.NewDemuxer( + context.Background(), + readerFunc(readPacket), + astits.DemuxerOptPacketSize(188)) + + readerErr := make(chan error) + + go func() { + readerErr <- func() error { + pc.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + tracks, err := mpegts.FindTracks(dem) + if err != nil { + return err + } + + var medias media.Medias + mediaCallbacks := make(map[uint16]func(time.Duration, []byte), len(tracks)) + var stream *stream + + for _, track := range tracks { + medi := &media.Media{ + Formats: []format.Format{track.Format}, + } + medias = append(medias, medi) + cformat := track.Format + + switch track.Format.(type) { + case *format.H264: + medi.Type = media.TypeVideo + + mediaCallbacks[track.ES.ElementaryPID] = func(pts time.Duration, data []byte) { + au, err := h264.AnnexBUnmarshal(data) + if err != nil { + s.Log(logger.Warn, "%v", err) + return + } + + err = stream.writeData(medi, cformat, &formatprocessor.UnitH264{ + PTS: pts, + AU: au, + NTP: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + } + + case *format.H265: + medi.Type = media.TypeVideo + + mediaCallbacks[track.ES.ElementaryPID] = func(pts time.Duration, data []byte) { + au, err := h264.AnnexBUnmarshal(data) + if err != nil { + s.Log(logger.Warn, "%v", err) + return + } + + err = stream.writeData(medi, cformat, &formatprocessor.UnitH265{ + PTS: pts, + AU: au, + NTP: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + } + + case *format.MPEG4Audio: + medi.Type = media.TypeAudio + + mediaCallbacks[track.ES.ElementaryPID] = func(pts time.Duration, data []byte) { + var pkts mpeg4audio.ADTSPackets + err := pkts.Unmarshal(data) + if err != nil { + s.Log(logger.Warn, "%v", err) + return + } + + aus := make([][]byte, len(pkts)) + for i, pkt := range pkts { + aus[i] = pkt.AU + } + + err = stream.writeData(medi, cformat, &formatprocessor.UnitMPEG4Audio{ + PTS: pts, + AUs: aus, + NTP: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + } + + case *format.Opus: + medi.Type = media.TypeAudio + + mediaCallbacks[track.ES.ElementaryPID] = func(pts time.Duration, data []byte) { + pos := 0 + + for { + var au mpegts.OpusAccessUnit + n, err := au.Unmarshal(data[pos:]) + if err != nil { + s.Log(logger.Warn, "%v", err) + return + } + pos += n + + err = stream.writeData(medi, cformat, &formatprocessor.UnitOpus{ + PTS: pts, + Frame: au.Frame, + NTP: time.Now(), + }) + if err != nil { + s.Log(logger.Warn, "%v", err) + } + + if len(data[pos:]) == 0 { + break + } + + pts += opusGetPacketDuration(au.Frame) + } + } + } + } + + res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + medias: medias, + generateRTPPackets: true, + }) + if res.err != nil { + return res.err + } + + defer func() { + s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{}) + }() + + s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) + + stream = res.stream + var timedec *mpegts.TimeDecoder + + for { + pc.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + data, err := dem.NextData() + if err != nil { + return err + } + + if data.PES == nil { + continue + } + + if data.PES.Header.OptionalHeader == nil || + data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorNoPTSOrDTS || + data.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorIsForbidden { + return fmt.Errorf("PTS is missing") + } + + var pts time.Duration + if timedec == nil { + timedec = mpegts.NewTimeDecoder(data.PES.Header.OptionalHeader.PTS.Base) + pts = 0 + } else { + pts = timedec.Decode(data.PES.Header.OptionalHeader.PTS.Base) + } + + cb, ok := mediaCallbacks[data.PID] + if !ok { + continue + } + + cb(pts, data.PES.Data) + } + }() + }() + + select { + case err := <-readerErr: + return err + + case <-ctx.Done(): + pc.Close() + <-readerErr + return fmt.Errorf("terminated") + } +} + +// apiSourceDescribe implements sourceStaticImpl. +func (*udpSource) apiSourceDescribe() interface{} { + return struct { + Type string `json:"type"` + }{"udpSource"} +} diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index ddb83a47..ac491e33 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -235,6 +235,7 @@ paths: # * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS + # * udp://ip:port -> the stream is pulled from UDP, by listening on the specified IP and port # * redirect -> the stream is provided by another path or server # * rpiCamera -> the stream is provided by a Raspberry Pi Camera source: publisher