mirror of
https://github.com/bluenviron/mediamtx
synced 2025-01-18 13:01:49 +00:00
409 lines
8.8 KiB
Go
409 lines
8.8 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/aler9/gortsplib"
|
|
"gopkg.in/alecthomas/kingpin.v2"
|
|
|
|
"github.com/aler9/rtsp-simple-server/conf"
|
|
"github.com/aler9/rtsp-simple-server/loghandler"
|
|
)
|
|
|
|
var Version = "v0.0.0"
|
|
|
|
const (
|
|
checkPathPeriod = 5 * time.Second
|
|
)
|
|
|
|
type program struct {
|
|
conf *conf.Conf
|
|
logHandler *loghandler.LogHandler
|
|
metrics *metrics
|
|
pprof *pprof
|
|
paths map[string]*path
|
|
serverUdpRtp *serverUDP
|
|
serverUdpRtcp *serverUDP
|
|
serverTcp *serverTCP
|
|
clients map[*client]struct{}
|
|
clientsWg sync.WaitGroup
|
|
udpPublishersMap *udpPublishersMap
|
|
readersMap *readersMap
|
|
// use pointers to avoid a crash on 32bit platforms
|
|
// https://github.com/golang/go/issues/9959
|
|
countClients *int64
|
|
countPublishers *int64
|
|
countReaders *int64
|
|
countSourcesRtsp *int64
|
|
countSourcesRtspRunning *int64
|
|
countSourcesRtmp *int64
|
|
countSourcesRtmpRunning *int64
|
|
|
|
clientNew chan net.Conn
|
|
clientClose chan *client
|
|
clientDescribe chan clientDescribeReq
|
|
clientAnnounce chan clientAnnounceReq
|
|
clientSetupPlay chan clientSetupPlayReq
|
|
clientPlay chan *client
|
|
clientRecord chan *client
|
|
sourceRtspReady chan *sourceRtsp
|
|
sourceRtspNotReady chan *sourceRtsp
|
|
sourceRtmpReady chan *sourceRtmp
|
|
sourceRtmpNotReady chan *sourceRtmp
|
|
terminate chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
func newProgram(args []string) (*program, error) {
|
|
k := kingpin.New("rtsp-simple-server",
|
|
"rtsp-simple-server "+Version+"\n\nRTSP server.")
|
|
|
|
argVersion := k.Flag("version", "print version").Bool()
|
|
argConfPath := k.Arg("confpath", "path to a config file. The default is rtsp-simple-server.yml.").Default("rtsp-simple-server.yml").String()
|
|
|
|
kingpin.MustParse(k.Parse(args))
|
|
|
|
if *argVersion == true {
|
|
fmt.Println(Version)
|
|
os.Exit(0)
|
|
}
|
|
|
|
conf, err := conf.Load(*argConfPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logHandler, err := loghandler.New(conf.LogDestinationsParsed, conf.LogFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p := &program{
|
|
conf: conf,
|
|
logHandler: logHandler,
|
|
paths: make(map[string]*path),
|
|
clients: make(map[*client]struct{}),
|
|
udpPublishersMap: newUdpPublisherMap(),
|
|
readersMap: newReadersMap(),
|
|
countClients: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countPublishers: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countReaders: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countSourcesRtsp: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countSourcesRtspRunning: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countSourcesRtmp: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
countSourcesRtmpRunning: func() *int64 {
|
|
v := int64(0)
|
|
return &v
|
|
}(),
|
|
clientNew: make(chan net.Conn),
|
|
clientClose: make(chan *client),
|
|
clientDescribe: make(chan clientDescribeReq),
|
|
clientAnnounce: make(chan clientAnnounceReq),
|
|
clientSetupPlay: make(chan clientSetupPlayReq),
|
|
clientPlay: make(chan *client),
|
|
clientRecord: make(chan *client),
|
|
sourceRtspReady: make(chan *sourceRtsp),
|
|
sourceRtspNotReady: make(chan *sourceRtsp),
|
|
sourceRtmpReady: make(chan *sourceRtmp),
|
|
sourceRtmpNotReady: make(chan *sourceRtmp),
|
|
terminate: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
p.log("rtsp-simple-server %s", Version)
|
|
|
|
if conf.Metrics {
|
|
p.metrics, err = newMetrics(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if conf.Pprof {
|
|
p.pprof, err = newPprof(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
for name, pathConf := range conf.Paths {
|
|
if pathConf.Regexp == nil {
|
|
p.paths[name] = newPath(p, name, pathConf)
|
|
}
|
|
}
|
|
|
|
if _, ok := conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]; ok {
|
|
p.serverUdpRtp, err = newServerUDP(p, conf.RtpPort, gortsplib.StreamTypeRtp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p.serverUdpRtcp, err = newServerUDP(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
p.serverTcp, err = newServerTCP(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go p.run()
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *program) log(format string, args ...interface{}) {
|
|
countClients := atomic.LoadInt64(p.countClients)
|
|
countPublishers := atomic.LoadInt64(p.countPublishers)
|
|
countReaders := atomic.LoadInt64(p.countReaders)
|
|
|
|
log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients,
|
|
countPublishers, countReaders}, args...)...))
|
|
}
|
|
|
|
func (p *program) run() {
|
|
defer close(p.done)
|
|
|
|
if p.metrics != nil {
|
|
go p.metrics.run()
|
|
}
|
|
|
|
if p.pprof != nil {
|
|
go p.pprof.run()
|
|
}
|
|
|
|
if p.serverUdpRtp != nil {
|
|
go p.serverUdpRtp.run()
|
|
}
|
|
|
|
if p.serverUdpRtcp != nil {
|
|
go p.serverUdpRtcp.run()
|
|
}
|
|
|
|
go p.serverTcp.run()
|
|
|
|
for _, p := range p.paths {
|
|
p.onInit()
|
|
}
|
|
|
|
checkPathsTicker := time.NewTicker(checkPathPeriod)
|
|
defer checkPathsTicker.Stop()
|
|
|
|
outer:
|
|
for {
|
|
select {
|
|
case <-checkPathsTicker.C:
|
|
for _, path := range p.paths {
|
|
path.onCheck()
|
|
}
|
|
|
|
case conn := <-p.clientNew:
|
|
newClient(p, conn)
|
|
|
|
case client := <-p.clientClose:
|
|
if _, ok := p.clients[client]; !ok {
|
|
continue
|
|
}
|
|
client.close()
|
|
|
|
case req := <-p.clientDescribe:
|
|
// create path if it doesn't exist
|
|
if _, ok := p.paths[req.pathName]; !ok {
|
|
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
|
|
}
|
|
|
|
p.paths[req.pathName].onDescribe(req.client)
|
|
|
|
case req := <-p.clientAnnounce:
|
|
// create path if it doesn't exist
|
|
if path, ok := p.paths[req.pathName]; !ok {
|
|
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
|
|
|
|
} else {
|
|
if path.source != nil {
|
|
req.res <- fmt.Errorf("someone is already publishing on path '%s'", req.pathName)
|
|
continue
|
|
}
|
|
}
|
|
|
|
p.paths[req.pathName].source = req.client
|
|
p.paths[req.pathName].sourceTrackCount = req.trackCount
|
|
p.paths[req.pathName].sourceSdp = req.sdp
|
|
|
|
req.client.path = p.paths[req.pathName]
|
|
req.client.state = clientStatePreRecord
|
|
req.res <- nil
|
|
|
|
case req := <-p.clientSetupPlay:
|
|
path, ok := p.paths[req.pathName]
|
|
if !ok || !path.sourceReady {
|
|
req.res <- fmt.Errorf("no one is publishing on path '%s'", req.pathName)
|
|
continue
|
|
}
|
|
|
|
if req.trackId >= path.sourceTrackCount {
|
|
req.res <- fmt.Errorf("track %d does not exist", req.trackId)
|
|
continue
|
|
}
|
|
|
|
req.client.path = path
|
|
req.client.state = clientStatePrePlay
|
|
req.res <- nil
|
|
|
|
case client := <-p.clientPlay:
|
|
atomic.AddInt64(p.countReaders, 1)
|
|
client.state = clientStatePlay
|
|
p.readersMap.add(client)
|
|
|
|
case client := <-p.clientRecord:
|
|
atomic.AddInt64(p.countPublishers, 1)
|
|
client.state = clientStateRecord
|
|
|
|
if client.streamProtocol == gortsplib.StreamProtocolUDP {
|
|
for trackId, track := range client.streamTracks {
|
|
addr := makeUDPPublisherAddr(client.ip(), track.rtpPort)
|
|
p.udpPublishersMap.add(addr, &udpPublisher{
|
|
client: client,
|
|
trackId: trackId,
|
|
streamType: gortsplib.StreamTypeRtp,
|
|
})
|
|
|
|
addr = makeUDPPublisherAddr(client.ip(), track.rtcpPort)
|
|
p.udpPublishersMap.add(addr, &udpPublisher{
|
|
client: client,
|
|
trackId: trackId,
|
|
streamType: gortsplib.StreamTypeRtcp,
|
|
})
|
|
}
|
|
}
|
|
|
|
client.path.onSourceSetReady()
|
|
|
|
case s := <-p.sourceRtspReady:
|
|
s.path.onSourceSetReady()
|
|
|
|
case s := <-p.sourceRtspNotReady:
|
|
s.path.onSourceSetNotReady()
|
|
|
|
case s := <-p.sourceRtmpReady:
|
|
s.path.onSourceSetReady()
|
|
|
|
case s := <-p.sourceRtmpNotReady:
|
|
s.path.onSourceSetNotReady()
|
|
|
|
case <-p.terminate:
|
|
break outer
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case _, ok := <-p.clientNew:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
case <-p.clientClose:
|
|
case <-p.clientDescribe:
|
|
|
|
case req := <-p.clientAnnounce:
|
|
req.res <- fmt.Errorf("terminated")
|
|
|
|
case req := <-p.clientSetupPlay:
|
|
req.res <- fmt.Errorf("terminated")
|
|
|
|
case <-p.clientPlay:
|
|
case <-p.clientRecord:
|
|
case <-p.sourceRtspReady:
|
|
case <-p.sourceRtspNotReady:
|
|
case <-p.sourceRtmpReady:
|
|
case <-p.sourceRtmpNotReady:
|
|
}
|
|
}
|
|
}()
|
|
|
|
p.udpPublishersMap.clear()
|
|
p.readersMap.clear()
|
|
|
|
for _, p := range p.paths {
|
|
p.onClose()
|
|
}
|
|
|
|
p.serverTcp.close()
|
|
|
|
if p.serverUdpRtcp != nil {
|
|
p.serverUdpRtcp.close()
|
|
}
|
|
|
|
if p.serverUdpRtp != nil {
|
|
p.serverUdpRtp.close()
|
|
}
|
|
|
|
for c := range p.clients {
|
|
c.close()
|
|
}
|
|
|
|
p.clientsWg.Wait()
|
|
|
|
if p.metrics != nil {
|
|
p.metrics.close()
|
|
}
|
|
|
|
if p.pprof != nil {
|
|
p.pprof.close()
|
|
}
|
|
|
|
p.logHandler.Close()
|
|
|
|
close(p.clientNew)
|
|
close(p.clientClose)
|
|
close(p.clientDescribe)
|
|
close(p.clientAnnounce)
|
|
close(p.clientSetupPlay)
|
|
close(p.clientPlay)
|
|
close(p.clientRecord)
|
|
close(p.sourceRtspReady)
|
|
close(p.sourceRtspNotReady)
|
|
}
|
|
|
|
func (p *program) close() {
|
|
close(p.terminate)
|
|
<-p.done
|
|
}
|
|
|
|
func main() {
|
|
_, err := newProgram(os.Args[1:])
|
|
if err != nil {
|
|
log.Fatal("ERR: ", err)
|
|
}
|
|
|
|
select {}
|
|
}
|