implement proxy mode; add conf.yml to release assets

This commit is contained in:
aler9 2020-06-30 15:12:39 +02:00
parent 70d5735fec
commit 9c3554d719
10 changed files with 1028 additions and 114 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
/tmp
/release

View File

@ -91,25 +91,27 @@ release-nodocker:
$(eval export CGO_ENABLED=0)
$(eval VERSION := $(shell git describe --tags))
$(eval GOBUILD := go build -ldflags '-X main.Version=$(VERSION)')
rm -rf tmp && mkdir tmp
rm -rf release && mkdir release
cp conf.yml tmp/
GOOS=windows GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server.exe
cd /tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe
GOOS=windows GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server.exe
cd tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe conf.yml
GOOS=linux GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server
GOOS=linux GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server
tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml
GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server
GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o tmp/rtsp-simple-server
tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml
GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server
GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o tmp/rtsp-simple-server
tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml
GOOS=linux GOARCH=arm64 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server
GOOS=linux GOARCH=arm64 $(GOBUILD) -o tmp/rtsp-simple-server
tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml
GOOS=darwin GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_darwin_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server
GOOS=darwin GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server
tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_darwin_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml
define DOCKERFILE_IMAGE
FROM --platform=linux/amd64 $(BASE_IMAGE) AS build
@ -125,7 +127,7 @@ RUN export CGO_ENABLED=0 $${OPTS} \
FROM scratch
COPY --from=build /rtsp-simple-server /rtsp-simple-server
ENTRYPOINT [ "/rtsp-simple-server"]
ENTRYPOINT [ "/rtsp-simple-server" ]
endef
export DOCKERFILE_IMAGE

View File

@ -3,12 +3,13 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/aler9/rtsp-simple-server)](https://goreportcard.com/report/github.com/aler9/rtsp-simple-server)
[![Build Status](https://travis-ci.org/aler9/rtsp-simple-server.svg?branch=master)](https://travis-ci.org/aler9/rtsp-simple-server)
[![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--proxy-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server)
[![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server)
_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server, a software that allows multiple users to publish and read live video and audio streams. RTSP is a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol. The server is then responsible of relaying the publisher streams to the readers.
_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish and read live video and audio streams. RTSP is a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol. The server is then responsible of relaying the publisher streams to the readers.
Features:
* Read and publish streams via UDP and TCP
* Pull and serve streams from other RTSP servers (RTSP proxy)
* Each stream can have multiple video and audio tracks, encoded in any format
* Publish multiple streams at once, each in a separate path, that can be read by multiple users
* Supports the RTP/RTCP streaming protocol
@ -66,50 +67,33 @@ docker run --rm -it -v $PWD/conf.yml:/conf.yml -p 8554:8554 aler9/rtsp-simple-se
#### Full configuration file
To change the configuration, it's enough to create a file named `conf.yml` in the same folder of the executable. The default configuration is the following:
To change the configuration, it's enough to edit the file `conf.yml`, provided with the executable. The default configuration is [available here](conf.yml).
#### Usage as an RTSP Proxy
An RTSP proxy is usually deployed in one of these scenarios:
* when there are multiple users that are receiving a stream and the bandwidth is limited, so the proxy is used to receive the stream once. Users can then connect to the proxy instead of the original source.
* when there's a NAT / firewall between a stream and the users, in this case the proxy is installed in the NAT and makes the stream available to the outside world.
Edit `conf.yml` and replace everything inside section `paths` with the following content:
```yaml
# supported stream protocols (the handshake is always performed with TCP)
protocols: [udp, tcp]
# port of the TCP rtsp listener
rtspPort: 8554
# port of the UDP rtp listener
rtpPort: 8000
# port of the UDP rtcp listener
rtcpPort: 8001
# timeout of read operations
readTimeout: 5s
# timeout of write operations
writeTimeout: 5s
# script to run when a client connects
preScript:
# script to run when a client disconnects
postScript:
# enable pprof on port 9999 to monitor performance
pprof: false
# these settings are path-dependent. The settings under the path 'all' are
# applied to all paths that do not match a specific entry.
paths:
all:
# username required to publish
publishUser:
# password required to publish
publishPass:
# IPs or networks (x.x.x.x/24) allowed to publish
publishIps: []
# username required to read
readUser:
# password required to read
readPass:
# IPs or networks (x.x.x.x/24) allowed to read
readIps: []
# insert one or more entries
proxied:
# url of the source stream, in the format rtsp://user:pass@host:port/path
source: rtsp://original-url
```
Start the server:
```
./rtsp-simple-server
```
Users can then connect to `rtsp://localhost:8554/proxied`, instead of connecting to the original url.
#### Publisher authentication
Create a file named `conf.yml` in the same folder of the executable, with the following content:
Edit `conf.yml` and replace everything inside section `paths` with the following content:
```yaml
paths:
all:

44
conf.yml Normal file
View File

@ -0,0 +1,44 @@
# supported stream protocols (the handshake is always performed with TCP)
protocols: [udp, tcp]
# port of the TCP rtsp listener
rtspPort: 8554
# port of the UDP rtp listener
rtpPort: 8000
# port of the UDP rtcp listener
rtcpPort: 8001
# timeout of read operations
readTimeout: 5s
# timeout of write operations
writeTimeout: 5s
# script to run when a client connects
preScript:
# script to run when a client disconnects
postScript:
# enable pprof on port 9999 to monitor performance
pprof: false
# these settings are path-dependent. The settings under the path 'all' are
# applied to all paths that do not match a specific entry.
paths:
all:
# source of the stream - this can be:
# * record -> the stream is provided by a client through the RECORD command (like ffmpeg)
# * rtsp://url -> the stream is pulled from another RTSP server
source: record
# if the source is an RTSP url, this is the protocol that will be used to pull the stream
sourceProtocol: udp
# username required to publish
publishUser:
# password required to publish
publishPass:
# IPs or networks (x.x.x.x/24) allowed to publish
publishIps: []
# username required to read
readUser:
# password required to read
readPass:
# IPs or networks (x.x.x.x/24) allowed to read
readIps: []

202
main.go
View File

@ -85,19 +85,17 @@ type programEventClientClose struct {
func (programEventClientClose) isProgramEvent() {}
type programEventClientGetStreamSdp struct {
type programEventClientDescribe struct {
path string
res chan []byte
}
func (programEventClientGetStreamSdp) isProgramEvent() {}
func (programEventClientDescribe) isProgramEvent() {}
type programEventClientAnnounce struct {
res chan error
client *serverClient
path string
sdpText []byte
sdpParsed *sdp.Message
res chan error
client *serverClient
path string
}
func (programEventClientAnnounce) isProgramEvent() {}
@ -151,36 +149,59 @@ type programEventClientRecord struct {
func (programEventClientRecord) isProgramEvent() {}
type programEventFrameUdp struct {
type programEventClientFrameUdp struct {
trackFlowType trackFlowType
addr *net.UDPAddr
buf []byte
}
func (programEventFrameUdp) isProgramEvent() {}
func (programEventClientFrameUdp) isProgramEvent() {}
type programEventFrameTcp struct {
type programEventClientFrameTcp struct {
path string
trackId int
trackFlowType trackFlowType
buf []byte
}
func (programEventFrameTcp) isProgramEvent() {}
func (programEventClientFrameTcp) isProgramEvent() {}
type programEventStreamerReady struct {
streamer *streamer
}
func (programEventStreamerReady) isProgramEvent() {}
type programEventStreamerNotReady struct {
streamer *streamer
}
func (programEventStreamerNotReady) isProgramEvent() {}
type programEventStreamerFrame struct {
streamer *streamer
trackId int
trackFlowType trackFlowType
buf []byte
}
func (programEventStreamerFrame) isProgramEvent() {}
type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {}
type ConfPath struct {
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`
publishIps []interface{}
ReadUser string `yaml:"readUser"`
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
readIps []interface{}
Source string `yaml:"source"`
SourceProtocol string `yaml:"sourceProtocol"`
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`
publishIps []interface{}
ReadUser string `yaml:"readUser"`
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
readIps []interface{}
}
type conf struct {
@ -230,6 +251,13 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}
}
// a publisher can be either a serverClient or a streamer
type publisher interface {
publisherIsReady() bool
publisherSdpText() []byte
publisherSdpParsed() *sdp.Message
}
type program struct {
conf *conf
protocols map[streamProtocol]struct{}
@ -237,7 +265,8 @@ type program struct {
udplRtp *serverUdpListener
udplRtcp *serverUdpListener
clients map[*serverClient]struct{}
publishers map[string]*serverClient
streamers []*streamer
publishers map[string]publisher
publisherCount int
receiverCount int
@ -313,7 +342,20 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
}
}
for _, pconf := range conf.Paths {
p := &program{
conf: conf,
protocols: protocols,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]publisher),
events: make(chan programEvent),
done: make(chan struct{}),
}
for path, pconf := range conf.Paths {
if pconf.Source == "" {
pconf.Source = "record"
}
if pconf.PublishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
@ -349,15 +391,24 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
if err != nil {
return nil, err
}
}
p := &program{
conf: conf,
protocols: protocols,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]*serverClient),
events: make(chan programEvent),
done: make(chan struct{}),
if pconf.Source != "record" {
if path == "all" {
return nil, fmt.Errorf("path 'all' cannot have a RTSP source")
}
if pconf.SourceProtocol == "" {
pconf.SourceProtocol = "udp"
}
s, err := newStreamer(p, path, pconf.Source, pconf.SourceProtocol)
if err != nil {
return nil, err
}
p.streamers = append(p.streamers, s)
p.publishers[path] = s
}
}
p.log("rtsp-simple-server %s", Version)
@ -392,6 +443,9 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) {
go p.udplRtp.run()
go p.udplRtcp.run()
go p.tcpl.run()
for _, s := range p.streamers {
go s.run()
}
go p.run()
return p, nil
@ -424,11 +478,13 @@ outer:
if pub, ok := p.publishers[evt.client.path]; ok && pub == evt.client {
delete(p.publishers, evt.client.path)
// if the publisher has disconnected
// close all other connections that share the same path
for oc := range p.clients {
if oc.path == evt.client.path {
go oc.close()
// if the publisher has disconnected and was ready
// close all other clients that share the same path
if pub.publisherIsReady() {
for oc := range p.clients {
if oc.path == evt.client.path {
go oc.close()
}
}
}
}
@ -445,36 +501,37 @@ outer:
evt.client.log("disconnected")
close(evt.done)
case programEventClientGetStreamSdp:
case programEventClientDescribe:
pub, ok := p.publishers[evt.path]
if !ok {
if !ok || !pub.publisherIsReady() {
evt.res <- nil
continue
}
evt.res <- pub.streamSdpText
evt.res <- pub.publisherSdpText()
case programEventClientAnnounce:
_, ok := p.publishers[evt.path]
if ok {
evt.res <- fmt.Errorf("another client is already publishing on path '%s'", evt.path)
evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path)
continue
}
evt.client.path = evt.path
evt.client.streamSdpText = evt.sdpText
evt.client.streamSdpParsed = evt.sdpParsed
evt.client.state = _CLIENT_STATE_ANNOUNCE
p.publishers[evt.path] = evt.client
evt.res <- nil
case programEventClientSetupPlay:
pub, ok := p.publishers[evt.path]
if !ok {
if !ok || !pub.publisherIsReady() {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.path)
continue
}
if len(evt.client.streamTracks) >= len(pub.streamSdpParsed.Medias) {
sdpParsed := pub.publisherSdpParsed()
if len(evt.client.streamTracks) >= len(sdpParsed.Medias) {
evt.res <- fmt.Errorf("all the tracks have already been setup")
continue
}
@ -499,12 +556,14 @@ outer:
case programEventClientPlay1:
pub, ok := p.publishers[evt.client.path]
if !ok {
if !ok || !pub.publisherIsReady() {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.client.path)
continue
}
if len(evt.client.streamTracks) != len(pub.streamSdpParsed.Medias) {
sdpParsed := pub.publisherSdpParsed()
if len(evt.client.streamTracks) != len(sdpParsed.Medias) {
evt.res <- fmt.Errorf("not all tracks have been setup")
continue
}
@ -526,40 +585,65 @@ outer:
evt.client.state = _CLIENT_STATE_RECORD
evt.res <- nil
case programEventFrameUdp:
case programEventClientFrameUdp:
// find publisher and track id from ip and port
pub, trackId := func() (*serverClient, int) {
cl, trackId := func() (*serverClient, int) {
for _, pub := range p.publishers {
if pub.streamProtocol != _STREAM_PROTOCOL_UDP ||
pub.state != _CLIENT_STATE_RECORD ||
!pub.ip().Equal(evt.addr.IP) {
cl, ok := pub.(*serverClient)
if !ok {
continue
}
for i, t := range pub.streamTracks {
if cl.streamProtocol != _STREAM_PROTOCOL_UDP ||
cl.state != _CLIENT_STATE_RECORD ||
!cl.ip().Equal(evt.addr.IP) {
continue
}
for i, t := range cl.streamTracks {
if evt.trackFlowType == _TRACK_FLOW_RTP {
if t.rtpPort == evt.addr.Port {
return pub, i
return cl, i
}
} else {
if t.rtcpPort == evt.addr.Port {
return pub, i
return cl, i
}
}
}
}
return nil, -1
}()
if pub == nil {
if cl == nil {
continue
}
pub.udpLastFrameTime = time.Now()
p.forwardTrack(pub.path, trackId, evt.trackFlowType, evt.buf)
cl.udpLastFrameTime = time.Now()
p.forwardTrack(cl.path, trackId, evt.trackFlowType, evt.buf)
case programEventFrameTcp:
case programEventClientFrameTcp:
p.forwardTrack(evt.path, evt.trackId, evt.trackFlowType, evt.buf)
case programEventStreamerReady:
evt.streamer.ready = true
p.publisherCount += 1
evt.streamer.log("ready")
case programEventStreamerNotReady:
evt.streamer.ready = false
p.publisherCount -= 1
evt.streamer.log("not ready")
// close all clients that share the same path
for oc := range p.clients {
if oc.path == evt.streamer.path {
go oc.close()
}
}
case programEventStreamerFrame:
p.forwardTrack(evt.streamer.path, evt.trackId, evt.trackFlowType, evt.buf)
case programEventTerminate:
break outer
}
@ -571,7 +655,7 @@ outer:
case programEventClientClose:
close(evt.done)
case programEventClientGetStreamSdp:
case programEventClientDescribe:
evt.res <- nil
case programEventClientAnnounce:
@ -598,6 +682,10 @@ outer:
}
}()
for _, s := range p.streamers {
s.close()
}
p.tcpl.close()
p.udplRtcp.close()
p.udplRtp.close()

View File

@ -229,3 +229,70 @@ func TestReadAuth(t *testing.T) {
require.Equal(t, "all right\n", string(cnt2.stdout.Bytes()))
}
func TestProxy(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
} {
t.Run(proto, func(t *testing.T) {
stdin := []byte("\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
p1, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin))
require.NoError(t, err)
defer p1.close()
time.Sleep(1 * time.Second)
cnt1, err := newContainer("ffmpeg", "source", []string{
"-hide_banner",
"-loglevel", "panic",
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://" + ownDockerIp + ":8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
stdin = []byte("\n" +
"rtspPort: 8555\n" +
"rtpPort: 8100\n" +
"rtcpPort: 8101\n" +
"\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://testuser:testpass@localhost:8554/teststream\n" +
" sourceProtocol: " + proto + "\n")
p2, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin))
require.NoError(t, err)
defer p2.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-hide_banner",
"-loglevel", "panic",
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIp + ":8555/proxied",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
cnt2.wait()
require.Equal(t, "all right\n", string(cnt2.stdout.Bytes()))
})
}
}

View File

@ -123,6 +123,18 @@ func (c *serverClient) zone() string {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone
}
func (c *serverClient) publisherIsReady() bool {
return c.state == _CLIENT_STATE_RECORD
}
func (c *serverClient) publisherSdpText() []byte {
return c.streamSdpText
}
func (c *serverClient) publisherSdpParsed() *sdp.Message {
return c.streamSdpParsed
}
func (c *serverClient) run() {
if c.p.conf.PreScript != "" {
preScript := exec.Command(c.p.conf.PreScript)
@ -367,7 +379,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
res := make(chan []byte)
c.p.events <- programEventClientGetStreamSdp{path, res}
c.p.events <- programEventClientDescribe{path, res}
sdp := <-res
if sdp == nil {
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no one is streaming on path '%s'", path))
@ -431,13 +443,16 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
res := make(chan error)
c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed}
c.p.events <- programEventClientAnnounce{res, c, path}
err = <-res
if err != nil {
c.writeResError(req, gortsplib.StatusBadRequest, err)
return false
}
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.conn.WriteResponse(&gortsplib.Response{
StatusCode: gortsplib.StatusOK,
Header: gortsplib.Header{
@ -869,7 +884,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
return false
}
c.p.events <- programEventFrameTcp{
c.p.events <- programEventClientFrameTcp{
c.path,
trackId,
trackFlowType,

View File

@ -81,7 +81,7 @@ func (l *serverUdpListener) run() {
break
}
l.p.events <- programEventFrameUdp{
l.p.events <- programEventClientFrameUdp{
l.trackFlowType,
addr,
buf[:n],

95
streamer-udpl.go Normal file
View File

@ -0,0 +1,95 @@
package main
import (
"net"
"time"
)
type streamerUdpListenerState int
const (
_UDPL_STATE_STARTING streamerUdpListenerState = iota
_UDPL_STATE_RUNNING
)
type streamerUdpListener struct {
p *program
streamer *streamer
trackId int
trackFlowType trackFlowType
publisherIp net.IP
publisherPort int
nconn *net.UDPConn
running bool
readBuf1 []byte
readBuf2 []byte
readCurBuf bool
lastFrameTime time.Time
done chan struct{}
}
func newStreamerUdpListener(p *program, port int, streamer *streamer,
trackId int, trackFlowType trackFlowType, publisherIp net.IP) (*streamerUdpListener, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
if err != nil {
return nil, err
}
l := &streamerUdpListener{
p: p,
streamer: streamer,
trackId: trackId,
trackFlowType: trackFlowType,
publisherIp: publisherIp,
nconn: nconn,
readBuf1: make([]byte, 2048),
readBuf2: make([]byte, 2048),
lastFrameTime: time.Now(),
done: make(chan struct{}),
}
return l, nil
}
func (l *streamerUdpListener) close() {
l.nconn.Close()
if l.running {
<-l.done
}
}
func (l *streamerUdpListener) start() {
l.running = true
go l.run()
}
func (l *streamerUdpListener) run() {
for {
var buf []byte
if !l.readCurBuf {
buf = l.readBuf1
} else {
buf = l.readBuf2
}
l.readCurBuf = !l.readCurBuf
n, addr, err := l.nconn.ReadFromUDP(buf)
if err != nil {
break
}
if !l.publisherIp.Equal(addr.IP) || addr.Port != l.publisherPort {
continue
}
l.lastFrameTime = time.Now()
l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]}
}
close(l.done)
}

618
streamer.go Normal file
View File

@ -0,0 +1,618 @@
package main
import (
"fmt"
"math/rand"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/aler9/gortsplib"
"gortc.io/sdp"
)
const (
_DIAL_TIMEOUT = 10 * time.Second
_RETRY_INTERVAL = 5 * time.Second
_CHECK_STREAM_INTERVAL = 6 * time.Second
_STREAM_DEAD_AFTER = 5 * time.Second
_KEEPALIVE_INTERVAL = 60 * time.Second
)
type streamerUdpListenerPair struct {
udplRtp *streamerUdpListener
udplRtcp *streamerUdpListener
}
type streamer struct {
p *program
path string
ur *url.URL
proto streamProtocol
ready bool
clientSdpParsed *sdp.Message
serverSdpText []byte
serverSdpParsed *sdp.Message
firstTime bool
readBuf1 []byte
readBuf2 []byte
readCurBuf bool
terminate chan struct{}
done chan struct{}
}
func newStreamer(p *program, path string, source string, sourceProtocol string) (*streamer, error) {
ur, err := url.Parse(source)
if err != nil {
return nil, fmt.Errorf("'%s' is not a valid source not an RTSP url", source)
}
if ur.Scheme != "rtsp" {
return nil, fmt.Errorf("'%s' is not a valid RTSP url", source)
}
if ur.User != nil {
pass, _ := ur.User.Password()
user := ur.User.Username()
if user != "" && pass == "" ||
user == "" && pass != "" {
fmt.Errorf("username and password must be both provided")
}
}
proto, err := func() (streamProtocol, error) {
switch sourceProtocol {
case "udp":
return _STREAM_PROTOCOL_UDP, nil
case "tcp":
return _STREAM_PROTOCOL_TCP, nil
}
return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol)
}()
if err != nil {
return nil, err
}
s := &streamer{
p: p,
path: path,
ur: ur,
proto: proto,
firstTime: true,
readBuf1: make([]byte, 0, 512*1024),
readBuf2: make([]byte, 0, 512*1024),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
return s, nil
}
func (s *streamer) log(format string, args ...interface{}) {
s.p.log("[streamer "+s.path+"] "+format, args...)
}
func (s *streamer) publisherIsReady() bool {
return s.ready
}
func (s *streamer) publisherSdpText() []byte {
return s.serverSdpText
}
func (s *streamer) publisherSdpParsed() *sdp.Message {
return s.serverSdpParsed
}
func (s *streamer) run() {
for {
ok := s.do()
if !ok {
break
}
}
close(s.done)
}
func (s *streamer) do() bool {
if s.firstTime {
s.firstTime = false
} else {
t := time.NewTimer(_RETRY_INTERVAL)
select {
case <-s.terminate:
return false
case <-t.C:
}
}
s.log("initializing with protocol %s", s.proto)
var nconn net.Conn
var err error
dialDone := make(chan struct{})
go func() {
nconn, err = net.DialTimeout("tcp", s.ur.Host, _DIAL_TIMEOUT)
close(dialDone)
}()
select {
case <-s.terminate:
return false
case <-dialDone:
}
if err != nil {
s.log("ERR: %s", err)
return true
}
defer nconn.Close()
conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{
NConn: nconn,
Username: func() string {
if s.ur.User != nil {
return s.ur.User.Username()
}
return ""
}(),
Password: func() string {
if s.ur.User != nil {
pass, _ := s.ur.User.Password()
return pass
}
return ""
}(),
ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout,
})
if err != nil {
s.log("ERR: %s", err)
return true
}
res, err := conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.OPTIONS,
Url: &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: "/",
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
// OPTIONS is not available in some cameras
if res.StatusCode != gortsplib.StatusOK && res.StatusCode != gortsplib.StatusNotFound {
s.log("ERR: OPTIONS returned code %d (%s)", res.StatusCode, res.StatusMessage)
return true
}
res, err = conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.DESCRIBE,
Url: &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: s.ur.Path,
RawQuery: s.ur.RawQuery,
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: DESCRIBE returned code %d (%s)", res.StatusCode, res.StatusMessage)
return true
}
contentType, ok := res.Header["Content-Type"]
if !ok || len(contentType) != 1 {
s.log("ERR: Content-Type not provided")
return true
}
if contentType[0] != "application/sdp" {
s.log("ERR: wrong Content-Type, expected application/sdp")
return true
}
clientSdpParsed, err := gortsplib.SDPParse(res.Content)
if err != nil {
s.log("ERR: invalid SDP: %s", err)
return true
}
// create a filtered SDP that is used by the server (not by the client)
serverSdpParsed, serverSdpText := gortsplib.SDPFilter(clientSdpParsed, res.Content)
s.clientSdpParsed = clientSdpParsed
s.serverSdpText = serverSdpText
s.serverSdpParsed = serverSdpParsed
if s.proto == _STREAM_PROTOCOL_UDP {
return s.runUdp(conn)
} else {
return s.runTcp(conn)
}
}
func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool {
publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
var streamerUdpListenerPairs []streamerUdpListenerPair
defer func() {
for _, pair := range streamerUdpListenerPairs {
pair.udplRtp.close()
pair.udplRtcp.close()
}
}()
for i, media := range s.clientSdpParsed.Medias {
var rtpPort int
var rtcpPort int
var udplRtp *streamerUdpListener
var udplRtcp *streamerUdpListener
func() {
for {
// choose two consecutive ports in range 65536-10000
// rtp must be pair and rtcp odd
rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort = rtpPort + 1
var err error
udplRtp, err = newStreamerUdpListener(s.p, rtpPort, s, i,
_TRACK_FLOW_RTP, publisherIp)
if err != nil {
continue
}
udplRtcp, err = newStreamerUdpListener(s.p, rtcpPort, s, i,
_TRACK_FLOW_RTCP, publisherIp)
if err != nil {
udplRtp.close()
continue
}
return
}
}()
res, err := conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.SETUP,
Url: func() *url.URL {
control := media.Attributes.Value("control")
// no control attribute
if control == "" {
return s.ur
}
// absolute path
if strings.HasPrefix(control, "rtsp://") {
ur, err := url.Parse(control)
if err != nil {
return s.ur
}
return ur
}
// relative path
return &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: func() string {
ret := s.ur.Path
if len(ret) == 0 || ret[len(ret)-1] != '/' {
ret += "/"
}
control := media.Attributes.Value("control")
if control != "" {
ret += control
} else {
ret += "trackID=" + strconv.FormatInt(int64(i+1), 10)
}
return ret
}(),
RawQuery: s.ur.RawQuery,
}
}(),
Header: gortsplib.Header{
"Transport": []string{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
}, ";")},
},
})
if err != nil {
s.log("ERR: %s", err)
udplRtp.close()
udplRtcp.close()
return true
}
if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage)
udplRtp.close()
udplRtcp.close()
return true
}
tsRaw, ok := res.Header["Transport"]
if !ok || len(tsRaw) != 1 {
s.log("ERR: transport header not provided")
udplRtp.close()
udplRtcp.close()
return true
}
th := gortsplib.ReadHeaderTransport(tsRaw[0])
rtpServerPort, rtcpServerPort := th.GetPorts("server_port")
if rtpServerPort == 0 {
s.log("ERR: server ports not provided")
udplRtp.close()
udplRtcp.close()
return true
}
udplRtp.publisherPort = rtpServerPort
udplRtcp.publisherPort = rtcpServerPort
streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{
udplRtp: udplRtp,
udplRtcp: udplRtcp,
})
}
res, err := conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.PLAY,
Url: &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: s.ur.Path,
RawQuery: s.ur.RawQuery,
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: PLAY returned code %d (%s)", res.StatusCode, res.StatusMessage)
return true
}
for _, pair := range streamerUdpListenerPairs {
pair.udplRtp.start()
pair.udplRtcp.start()
}
tickerSendKeepalive := time.NewTicker(_KEEPALIVE_INTERVAL)
defer tickerSendKeepalive.Stop()
tickerCheckStream := time.NewTicker(_CHECK_STREAM_INTERVAL)
defer tickerSendKeepalive.Stop()
s.p.events <- programEventStreamerReady{s}
defer func() {
s.p.events <- programEventStreamerNotReady{s}
}()
for {
select {
case <-s.terminate:
return false
case <-tickerSendKeepalive.C:
_, err = conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.OPTIONS,
Url: &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: "/",
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
case <-tickerCheckStream.C:
lastFrameTime := time.Time{}
for _, pair := range streamerUdpListenerPairs {
lft := pair.udplRtp.lastFrameTime
if lft.After(lastFrameTime) {
lastFrameTime = lft
}
lft = pair.udplRtp.lastFrameTime
if lft.After(lastFrameTime) {
lastFrameTime = lft
}
}
if time.Since(lastFrameTime) >= _STREAM_DEAD_AFTER {
s.log("ERR: stream is dead")
return true
}
}
}
}
func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool {
for i, media := range s.clientSdpParsed.Medias {
interleaved := fmt.Sprintf("interleaved=%d-%d", (i * 2), (i*2)+1)
res, err := conn.WriteRequest(&gortsplib.Request{
Method: gortsplib.SETUP,
Url: func() *url.URL {
control := media.Attributes.Value("control")
// no control attribute
if control == "" {
return s.ur
}
// absolute path
if strings.HasPrefix(control, "rtsp://") {
ur, err := url.Parse(control)
if err != nil {
return s.ur
}
return ur
}
// relative path
return &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: func() string {
ret := s.ur.Path
if len(ret) == 0 || ret[len(ret)-1] != '/' {
ret += "/"
}
control := media.Attributes.Value("control")
if control != "" {
ret += control
} else {
ret += "trackID=" + strconv.FormatInt(int64(i+1), 10)
}
return ret
}(),
RawQuery: s.ur.RawQuery,
}
}(),
Header: gortsplib.Header{
"Transport": []string{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
interleaved,
}, ";")},
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage)
return true
}
tsRaw, ok := res.Header["Transport"]
if !ok || len(tsRaw) != 1 {
s.log("ERR: transport header not provided")
return true
}
th := gortsplib.ReadHeaderTransport(tsRaw[0])
_, ok = th[interleaved]
if !ok {
s.log("ERR: transport header does not have %s (%s)", interleaved, tsRaw[0])
return true
}
}
err := conn.WriteRequestNoResponse(&gortsplib.Request{
Method: gortsplib.PLAY,
Url: &url.URL{
Scheme: "rtsp",
Host: s.ur.Host,
Path: s.ur.Path,
RawQuery: s.ur.RawQuery,
},
})
if err != nil {
s.log("ERR: %s", err)
return true
}
frame := &gortsplib.InterleavedFrame{}
outer:
for {
if !s.readCurBuf {
frame.Content = s.readBuf1
} else {
frame.Content = s.readBuf2
}
frame.Content = frame.Content[:cap(frame.Content)]
s.readCurBuf = !s.readCurBuf
vres, err := conn.ReadInterleavedFrameOrResponse(frame)
if err != nil {
s.log("ERR: %s", err)
return true
}
switch res := vres.(type) {
case *gortsplib.Response:
if res.StatusCode != gortsplib.StatusOK {
s.log("ERR: PLAY returned code %d (%s)", res.StatusCode, res.StatusMessage)
return true
}
break outer
case *gortsplib.InterleavedFrame:
// ignore the frames sent before the response
}
}
s.p.events <- programEventStreamerReady{s}
defer func() {
s.p.events <- programEventStreamerNotReady{s}
}()
chanConnError := make(chan struct{})
go func() {
for {
frame := &gortsplib.InterleavedFrame{
Content: make([]byte, 512*1024),
}
err := conn.ReadInterleavedFrame(frame)
if err != nil {
s.log("ERR: %s", err)
close(chanConnError)
break
}
trackId, trackFlowType := interleavedChannelToTrack(frame.Channel)
s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content}
}
}()
select {
case <-s.terminate:
return false
case <-chanConnError:
return true
}
}
func (s *streamer) close() {
close(s.terminate)
<-s.done
}