mediamtx/proxy.go

360 lines
6.4 KiB
Go
Raw Normal View History

package main
import (
"math/rand"
2020-07-19 20:39:38 +00:00
"sync"
2020-09-19 21:52:06 +00:00
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
)
const (
2020-09-19 21:56:08 +00:00
proxyRetryInterval = 5 * time.Second
proxyUDPReadBufferSize = 2048
proxyTCPReadBufferSize = 128 * 1024
)
2020-09-19 21:56:08 +00:00
type proxyState int
const (
2020-09-19 21:56:08 +00:00
proxyStateStopped proxyState = iota
proxyStateRunning
)
2020-09-19 21:56:08 +00:00
type proxy struct {
2020-08-31 13:46:03 +00:00
p *program
path *path
2020-09-19 15:13:45 +00:00
pathConf *pathConf
2020-09-19 21:56:08 +00:00
state proxyState
2020-08-31 13:46:03 +00:00
tracks []*gortsplib.Track
innerRunning bool
innerTerminate chan struct{}
innerDone chan struct{}
2020-09-19 21:56:08 +00:00
setState chan proxyState
2020-08-31 13:46:03 +00:00
terminate chan struct{}
done chan struct{}
}
2020-09-19 21:56:08 +00:00
func newProxy(p *program, path *path, pathConf *pathConf) *proxy {
s := &proxy{
p: p,
path: path,
2020-09-19 15:13:45 +00:00
pathConf: pathConf,
2020-09-19 21:56:08 +00:00
setState: make(chan proxyState),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
2020-09-19 21:52:06 +00:00
atomic.AddInt64(&p.countProxies, +1)
2020-09-19 15:13:45 +00:00
if pathConf.SourceOnDemand {
2020-09-19 21:56:08 +00:00
s.state = proxyStateStopped
} else {
2020-09-19 21:56:08 +00:00
s.state = proxyStateRunning
2020-09-19 21:52:06 +00:00
atomic.AddInt64(&p.countProxiesRunning, +1)
}
return s
}
2020-09-19 21:56:08 +00:00
func (s *proxy) isPublisher() {}
2020-09-19 21:56:08 +00:00
func (s *proxy) run(initialState proxyState) {
2020-09-03 14:31:52 +00:00
s.applyState(initialState)
outer:
for {
select {
case state := <-s.setState:
2020-08-31 13:46:03 +00:00
s.applyState(state)
case <-s.terminate:
break outer
}
}
2020-08-31 13:46:03 +00:00
if s.innerRunning {
close(s.innerTerminate)
<-s.innerDone
}
close(s.setState)
close(s.done)
}
2020-09-19 21:56:08 +00:00
func (s *proxy) applyState(state proxyState) {
if state == proxyStateRunning {
2020-08-31 13:46:03 +00:00
if !s.innerRunning {
2020-09-19 21:56:08 +00:00
s.path.log("proxy started")
2020-08-31 13:46:03 +00:00
s.innerRunning = true
s.innerTerminate = make(chan struct{})
s.innerDone = make(chan struct{})
go s.runInner()
}
} else {
if s.innerRunning {
close(s.innerTerminate)
<-s.innerDone
s.innerRunning = false
2020-09-19 21:56:08 +00:00
s.path.log("proxy stopped")
2020-08-31 13:46:03 +00:00
}
}
}
2020-09-19 21:56:08 +00:00
func (s *proxy) runInner() {
2020-08-31 13:46:03 +00:00
defer close(s.innerDone)
for {
2020-07-30 11:31:18 +00:00
ok := func() bool {
2020-08-31 13:46:03 +00:00
ok := s.runInnerInner()
2020-07-30 11:31:18 +00:00
if !ok {
return false
}
2020-09-19 21:56:08 +00:00
t := time.NewTimer(proxyRetryInterval)
defer t.Stop()
2020-07-30 11:31:18 +00:00
select {
2020-08-31 13:46:03 +00:00
case <-s.innerTerminate:
return false
case <-t.C:
}
2020-07-30 11:31:18 +00:00
return true
}()
if !ok {
break
}
}
}
2020-09-19 21:56:08 +00:00
func (s *proxy) runInnerInner() bool {
s.path.log("proxy connecting")
2020-07-19 20:39:38 +00:00
var conn *gortsplib.ConnClient
var err error
dialDone := make(chan struct{})
go func() {
2020-07-19 20:39:38 +00:00
conn, err = gortsplib.NewConnClient(gortsplib.ConnClientConf{
2020-09-19 15:13:45 +00:00
Host: s.pathConf.sourceUrl.Host,
2020-07-19 20:39:38 +00:00
ReadTimeout: s.p.conf.ReadTimeout,
WriteTimeout: s.p.conf.WriteTimeout,
})
close(dialDone)
}()
select {
2020-08-31 13:46:03 +00:00
case <-s.innerTerminate:
return false
case <-dialDone:
}
if err != nil {
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
2020-09-19 15:13:45 +00:00
_, err = conn.Options(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
2020-09-19 15:13:45 +00:00
tracks, _, err := conn.Describe(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
// create a filtered SDP that is used by the server (not by the client)
2020-09-05 12:51:36 +00:00
serverSdp := tracks.Write()
2020-07-19 15:54:31 +00:00
s.tracks = tracks
2020-09-05 12:51:36 +00:00
s.path.publisherTrackCount = len(tracks)
s.path.publisherSdp = serverSdp
2020-09-19 15:13:45 +00:00
if s.pathConf.sourceProtocolParsed == gortsplib.StreamProtocolUDP {
2020-09-05 11:19:55 +00:00
return s.runUDP(conn)
} else {
2020-09-05 11:19:55 +00:00
return s.runTCP(conn)
}
}
2020-09-19 21:56:08 +00:00
func (s *proxy) runUDP(conn *gortsplib.ConnClient) bool {
2020-09-05 11:19:55 +00:00
var rtpReads []gortsplib.UDPReadFunc
var rtcpReads []gortsplib.UDPReadFunc
2020-07-19 20:39:38 +00:00
for _, track := range s.tracks {
for {
2020-09-15 08:40:32 +00:00
// choose two consecutive ports in range 65535-10000
2020-08-17 16:43:53 +00:00
// rtp must be even and rtcp odd
2020-07-19 20:39:38 +00:00
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1
2020-09-19 15:13:45 +00:00
rtpRead, rtcpRead, _, err := conn.SetupUDP(s.pathConf.sourceUrl, track, rtpPort, rtcpPort)
2020-07-19 20:39:38 +00:00
if err != nil {
2020-09-15 08:40:32 +00:00
if isBindError(err) {
continue // retry
}
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
2020-07-19 20:39:38 +00:00
return true
}
rtpReads = append(rtpReads, rtpRead)
rtcpReads = append(rtcpReads, rtcpRead)
2020-07-19 20:39:38 +00:00
break
}
}
2020-09-19 15:13:45 +00:00
_, err := conn.Play(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
2020-09-19 21:56:08 +00:00
s.p.proxyReady <- s
2020-07-19 20:39:38 +00:00
var wg sync.WaitGroup
// receive RTP packets
for trackId, rtpRead := range rtpReads {
wg.Add(1)
2020-09-05 11:19:55 +00:00
go func(trackId int, rtpRead gortsplib.UDPReadFunc) {
2020-07-19 20:39:38 +00:00
defer wg.Done()
2020-09-19 21:56:08 +00:00
multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
2020-07-19 20:39:38 +00:00
for {
buf := multiBuf.next()
2020-07-19 20:39:38 +00:00
n, err := rtpRead(buf)
2020-07-19 20:39:38 +00:00
if err != nil {
break
}
s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtp, buf[:n])
2020-07-19 20:39:38 +00:00
}
}(trackId, rtpRead)
}
2020-07-19 20:39:38 +00:00
// receive RTCP packets
for trackId, rtcpRead := range rtcpReads {
wg.Add(1)
2020-09-05 11:19:55 +00:00
go func(trackId int, rtcpRead gortsplib.UDPReadFunc) {
2020-07-19 20:39:38 +00:00
defer wg.Done()
2020-09-19 21:56:08 +00:00
multiBuf := newMultiBuffer(2, proxyUDPReadBufferSize)
2020-07-19 20:39:38 +00:00
for {
buf := multiBuf.next()
2020-07-19 20:39:38 +00:00
n, err := rtcpRead(buf)
2020-07-19 20:39:38 +00:00
if err != nil {
break
}
s.p.readersMap.forwardFrame(s.path, trackId,
gortsplib.StreamTypeRtcp, buf[:n])
2020-07-19 20:39:38 +00:00
}
}(trackId, rtcpRead)
2020-07-19 20:39:38 +00:00
}
2020-07-19 15:54:31 +00:00
2020-07-19 16:50:43 +00:00
tcpConnDone := make(chan error)
2020-07-19 15:54:31 +00:00
go func() {
2020-09-19 15:13:45 +00:00
tcpConnDone <- conn.LoopUDP(s.pathConf.sourceUrl)
2020-07-19 15:54:31 +00:00
}()
var ret bool
outer:
for {
select {
2020-08-31 13:46:03 +00:00
case <-s.innerTerminate:
conn.Close()
2020-07-19 16:50:43 +00:00
<-tcpConnDone
ret = false
break outer
2020-07-19 15:54:31 +00:00
case err := <-tcpConnDone:
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
2020-07-19 15:54:31 +00:00
ret = true
break outer
}
}
2020-07-19 20:39:38 +00:00
wg.Wait()
2020-09-19 21:56:08 +00:00
s.p.proxyNotReady <- s
return ret
}
2020-09-19 21:56:08 +00:00
func (s *proxy) runTCP(conn *gortsplib.ConnClient) bool {
2020-07-19 15:54:31 +00:00
for _, track := range s.tracks {
2020-09-19 15:13:45 +00:00
_, err := conn.SetupTCP(s.pathConf.sourceUrl, track)
if err != nil {
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
}
2020-09-19 15:13:45 +00:00
_, err := conn.Play(s.pathConf.sourceUrl)
if err != nil {
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
return true
}
2020-09-19 21:56:08 +00:00
s.p.proxyReady <- s
2020-07-12 20:53:22 +00:00
frame := &gortsplib.InterleavedFrame{}
2020-09-19 21:56:08 +00:00
multiBuf := newMultiBuffer(2, proxyTCPReadBufferSize)
2020-07-12 20:53:22 +00:00
2020-07-19 16:50:43 +00:00
tcpConnDone := make(chan error)
go func() {
for {
frame.Content = multiBuf.next()
frame.Content = frame.Content[:cap(frame.Content)]
2020-07-12 20:53:22 +00:00
err := conn.ReadFrame(frame)
if err != nil {
tcpConnDone <- err
return
}
s.p.readersMap.forwardFrame(s.path, frame.TrackId, frame.StreamType, frame.Content)
}
}()
var ret bool
2020-07-12 20:53:22 +00:00
outer:
for {
select {
2020-08-31 13:46:03 +00:00
case <-s.innerTerminate:
conn.Close()
2020-07-19 16:50:43 +00:00
<-tcpConnDone
ret = false
2020-07-12 20:53:22 +00:00
break outer
case err := <-tcpConnDone:
conn.Close()
2020-09-19 21:56:08 +00:00
s.path.log("proxy ERR: %s", err)
ret = true
2020-07-12 20:53:22 +00:00
break outer
}
}
2020-09-19 21:56:08 +00:00
s.p.proxyNotReady <- s
return ret
}