// File: mediamtx/main.go (409 lines, 8.8 KiB, Go).

package main
import (
"fmt"
"log"
"net"
2019-12-28 21:07:03 +00:00
"os"
"sync"
2020-09-03 14:24:39 +00:00
"sync/atomic"
"time"
2019-12-28 21:07:03 +00:00
"github.com/aler9/gortsplib"
2019-12-28 21:07:03 +00:00
"gopkg.in/alecthomas/kingpin.v2"
2020-10-13 22:50:08 +00:00
"github.com/aler9/rtsp-simple-server/conf"
"github.com/aler9/rtsp-simple-server/loghandler"
2019-12-28 21:07:03 +00:00
)
2020-06-26 09:00:10 +00:00
// Version is the version string of the program. It is shown in the
// command-line help text, printed by the --version flag and written in the
// startup log line.
// NOTE(review): "v0.0.0" looks like a placeholder that is presumably
// overridden at build time (e.g. through -ldflags) — confirm.
var Version = "v0.0.0"
2019-12-28 21:07:03 +00:00
2020-08-30 13:27:03 +00:00
const (
// checkPathPeriod is the interval of the ticker used by program.run to
// call onCheck on every path.
checkPathPeriod = 5 * time.Second
)
2020-05-10 13:33:42 +00:00
type program struct {
2020-10-13 22:50:08 +00:00
conf *conf.Conf
logHandler *loghandler.LogHandler
metrics *metrics
pprof *pprof
2020-08-30 12:15:00 +00:00
paths map[string]*path
2020-10-03 19:10:41 +00:00
serverUdpRtp *serverUDP
serverUdpRtcp *serverUDP
serverTcp *serverTCP
clients map[*client]struct{}
clientsWg sync.WaitGroup
udpPublishersMap *udpPublishersMap
readersMap *readersMap
// use pointers to avoid a crash on 32bit platforms
// https://github.com/golang/go/issues/9959
2020-10-03 19:10:41 +00:00
countClients *int64
countPublishers *int64
countReaders *int64
countSourcesRtsp *int64
countSourcesRtspRunning *int64
countSourcesRtmp *int64
countSourcesRtmpRunning *int64
clientNew chan net.Conn
clientClose chan *client
clientDescribe chan clientDescribeReq
clientAnnounce chan clientAnnounceReq
clientSetupPlay chan clientSetupPlayReq
clientPlay chan *client
clientRecord chan *client
sourceRtspReady chan *sourceRtsp
sourceRtspNotReady chan *sourceRtsp
sourceRtmpReady chan *sourceRtmp
sourceRtmpNotReady chan *sourceRtmp
terminate chan struct{}
done chan struct{}
2020-05-10 13:33:42 +00:00
}
2020-10-13 23:19:14 +00:00
func newProgram(args []string) (*program, error) {
2020-06-27 19:22:50 +00:00
k := kingpin.New("rtsp-simple-server",
"rtsp-simple-server "+Version+"\n\nRTSP server.")
2020-05-11 07:31:56 +00:00
2020-06-27 19:22:50 +00:00
argVersion := k.Flag("version", "print version").Bool()
argConfPath := k.Arg("confpath", "path to a config file. The default is rtsp-simple-server.yml.").Default("rtsp-simple-server.yml").String()
2020-05-11 07:31:56 +00:00
kingpin.MustParse(k.Parse(args))
2020-05-11 07:31:56 +00:00
if *argVersion == true {
2020-05-11 07:39:49 +00:00
fmt.Println(Version)
2020-05-10 19:32:40 +00:00
os.Exit(0)
2020-01-26 11:58:56 +00:00
}
2020-10-13 22:50:08 +00:00
conf, err := conf.Load(*argConfPath)
if err != nil {
return nil, err
}
2020-10-13 22:50:08 +00:00
logHandler, err := loghandler.New(conf.LogDestinationsParsed, conf.LogFile)
2020-09-18 22:19:55 +00:00
if err != nil {
return nil, err
}
p := &program{
conf: conf,
2020-09-18 22:19:55 +00:00
logHandler: logHandler,
2020-08-30 12:15:00 +00:00
paths: make(map[string]*path),
clients: make(map[*client]struct{}),
udpPublishersMap: newUdpPublisherMap(),
readersMap: newReadersMap(),
2020-09-22 06:58:38 +00:00
countClients: func() *int64 {
v := int64(0)
return &v
}(),
2020-09-22 06:58:38 +00:00
countPublishers: func() *int64 {
v := int64(0)
return &v
}(),
2020-09-22 06:58:38 +00:00
countReaders: func() *int64 {
v := int64(0)
return &v
}(),
2020-10-03 19:10:41 +00:00
countSourcesRtsp: func() *int64 {
2020-09-22 06:58:38 +00:00
v := int64(0)
return &v
}(),
2020-10-03 19:10:41 +00:00
countSourcesRtspRunning: func() *int64 {
v := int64(0)
return &v
}(),
2020-10-03 19:10:41 +00:00
countSourcesRtmp: func() *int64 {
v := int64(0)
return &v
}(),
countSourcesRtmpRunning: func() *int64 {
v := int64(0)
return &v
}(),
clientNew: make(chan net.Conn),
clientClose: make(chan *client),
clientDescribe: make(chan clientDescribeReq),
clientAnnounce: make(chan clientAnnounceReq),
clientSetupPlay: make(chan clientSetupPlayReq),
clientPlay: make(chan *client),
clientRecord: make(chan *client),
sourceRtspReady: make(chan *sourceRtsp),
sourceRtspNotReady: make(chan *sourceRtsp),
sourceRtmpReady: make(chan *sourceRtmp),
sourceRtmpNotReady: make(chan *sourceRtmp),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
p.log("rtsp-simple-server %s", Version)
2020-07-30 15:30:50 +00:00
if conf.Metrics {
p.metrics, err = newMetrics(p)
if err != nil {
return nil, err
}
2020-07-30 15:30:50 +00:00
}
if conf.Pprof {
p.pprof, err = newPprof(p)
if err != nil {
return nil, err
}
2020-06-27 13:42:54 +00:00
}
2020-09-19 15:13:45 +00:00
for name, pathConf := range conf.Paths {
2020-10-13 22:50:08 +00:00
if pathConf.Regexp == nil {
2020-09-19 15:13:45 +00:00
p.paths[name] = newPath(p, name, pathConf)
2020-08-30 12:15:00 +00:00
}
}
2020-10-13 22:50:08 +00:00
if _, ok := conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]; ok {
2020-10-03 19:10:41 +00:00
p.serverUdpRtp, err = newServerUDP(p, conf.RtpPort, gortsplib.StreamTypeRtp)
if err != nil {
return nil, err
}
2019-12-28 21:07:03 +00:00
2020-10-03 19:10:41 +00:00
p.serverUdpRtcp, err = newServerUDP(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
if err != nil {
return nil, err
}
2019-12-28 21:07:03 +00:00
}
2020-10-03 19:10:41 +00:00
p.serverTcp, err = newServerTCP(p)
2020-08-29 17:48:41 +00:00
if err != nil {
return nil, err
}
2020-06-27 11:38:35 +00:00
go p.run()
2019-12-28 21:07:03 +00:00
2020-05-10 13:33:42 +00:00
return p, nil
2019-12-28 21:07:03 +00:00
}
2020-06-27 12:18:16 +00:00
func (p *program) log(format string, args ...interface{}) {
2020-09-22 06:58:38 +00:00
countClients := atomic.LoadInt64(p.countClients)
countPublishers := atomic.LoadInt64(p.countPublishers)
countReaders := atomic.LoadInt64(p.countReaders)
2020-09-03 14:24:39 +00:00
2020-09-19 21:37:54 +00:00
log.Printf(fmt.Sprintf("[%d/%d/%d] "+format, append([]interface{}{countClients,
countPublishers, countReaders}, args...)...))
}
2020-06-27 11:38:35 +00:00
func (p *program) run() {
defer close(p.done)
2020-08-30 12:27:26 +00:00
if p.metrics != nil {
go p.metrics.run()
}
if p.pprof != nil {
go p.pprof.run()
}
2020-10-03 19:10:41 +00:00
if p.serverUdpRtp != nil {
go p.serverUdpRtp.run()
2020-08-30 12:27:26 +00:00
}
2020-10-03 19:10:41 +00:00
if p.serverUdpRtcp != nil {
go p.serverUdpRtcp.run()
2020-08-30 12:27:26 +00:00
}
2020-10-03 19:10:41 +00:00
go p.serverTcp.run()
2020-08-30 12:27:26 +00:00
for _, p := range p.paths {
p.onInit()
}
2020-08-30 13:27:03 +00:00
checkPathsTicker := time.NewTicker(checkPathPeriod)
defer checkPathsTicker.Stop()
2020-06-27 11:38:35 +00:00
outer:
for {
select {
case <-checkPathsTicker.C:
for _, path := range p.paths {
path.onCheck()
2020-06-27 11:38:35 +00:00
}
2020-08-31 22:01:17 +00:00
case conn := <-p.clientNew:
newClient(p, conn)
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case client := <-p.clientClose:
if _, ok := p.clients[client]; !ok {
continue
}
client.close()
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <-p.clientDescribe:
2020-10-03 19:10:41 +00:00
// create path if it doesn't exist
2020-08-31 22:01:17 +00:00
if _, ok := p.paths[req.pathName]; !ok {
2020-09-19 15:13:45 +00:00
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
2020-08-31 22:01:17 +00:00
}
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
p.paths[req.pathName].onDescribe(req.client)
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <-p.clientAnnounce:
2020-10-03 19:10:41 +00:00
// create path if it doesn't exist
2020-08-31 22:01:17 +00:00
if path, ok := p.paths[req.pathName]; !ok {
2020-09-19 15:13:45 +00:00
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
2020-08-30 13:27:03 +00:00
2020-08-31 22:01:17 +00:00
} else {
2020-10-13 18:00:40 +00:00
if path.source != nil {
2020-08-31 22:01:17 +00:00
req.res <- fmt.Errorf("someone is already publishing on path '%s'", req.pathName)
continue
}
2020-08-31 22:01:17 +00:00
}
2020-10-13 18:00:40 +00:00
p.paths[req.pathName].source = req.client
p.paths[req.pathName].sourceTrackCount = req.trackCount
p.paths[req.pathName].sourceSdp = req.sdp
2020-08-31 22:01:17 +00:00
req.client.path = p.paths[req.pathName]
req.client.state = clientStatePreRecord
req.res <- nil
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <-p.clientSetupPlay:
path, ok := p.paths[req.pathName]
2020-10-13 18:00:40 +00:00
if !ok || !path.sourceReady {
2020-08-31 22:01:17 +00:00
req.res <- fmt.Errorf("no one is publishing on path '%s'", req.pathName)
continue
}
2020-06-27 11:38:35 +00:00
2020-10-13 18:00:40 +00:00
if req.trackId >= path.sourceTrackCount {
2020-08-31 22:01:17 +00:00
req.res <- fmt.Errorf("track %d does not exist", req.trackId)
continue
}
req.client.path = path
req.client.state = clientStatePrePlay
req.res <- nil
case client := <-p.clientPlay:
2020-09-22 06:58:38 +00:00
atomic.AddInt64(p.countReaders, 1)
2020-08-31 22:01:17 +00:00
client.state = clientStatePlay
p.readersMap.add(client)
2020-08-31 22:01:17 +00:00
case client := <-p.clientRecord:
2020-09-22 06:58:38 +00:00
atomic.AddInt64(p.countPublishers, 1)
2020-08-31 22:01:17 +00:00
client.state = clientStateRecord
2020-09-05 11:19:55 +00:00
if client.streamProtocol == gortsplib.StreamProtocolUDP {
2020-08-31 22:01:17 +00:00
for trackId, track := range client.streamTracks {
addr := makeUDPPublisherAddr(client.ip(), track.rtpPort)
p.udpPublishersMap.add(addr, &udpPublisher{
2020-08-31 22:01:17 +00:00
client: client,
trackId: trackId,
streamType: gortsplib.StreamTypeRtp,
})
addr = makeUDPPublisherAddr(client.ip(), track.rtcpPort)
p.udpPublishersMap.add(addr, &udpPublisher{
2020-08-31 22:01:17 +00:00
client: client,
trackId: trackId,
streamType: gortsplib.StreamTypeRtcp,
})
}
2020-08-31 22:01:17 +00:00
}
2020-10-13 18:00:40 +00:00
client.path.onSourceSetReady()
2020-06-27 11:38:35 +00:00
2020-10-03 19:10:41 +00:00
case s := <-p.sourceRtspReady:
2020-10-13 18:00:40 +00:00
s.path.onSourceSetReady()
2020-10-03 19:10:41 +00:00
case s := <-p.sourceRtspNotReady:
2020-10-13 18:00:40 +00:00
s.path.onSourceSetNotReady()
2020-10-03 19:10:41 +00:00
case s := <-p.sourceRtmpReady:
2020-10-13 18:00:40 +00:00
s.path.onSourceSetReady()
2020-10-03 19:10:41 +00:00
case s := <-p.sourceRtmpNotReady:
2020-10-13 18:00:40 +00:00
s.path.onSourceSetNotReady()
2020-08-31 22:01:17 +00:00
case <-p.terminate:
break outer
2020-06-27 11:38:35 +00:00
}
}
go func() {
2020-08-31 22:01:17 +00:00
for {
select {
2020-09-19 21:37:54 +00:00
case _, ok := <-p.clientNew:
2020-08-31 22:01:17 +00:00
if !ok {
return
}
case <-p.clientClose:
case <-p.clientDescribe:
case req := <-p.clientAnnounce:
req.res <- fmt.Errorf("terminated")
2020-07-30 15:30:50 +00:00
2020-08-31 22:01:17 +00:00
case req := <-p.clientSetupPlay:
req.res <- fmt.Errorf("terminated")
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case <-p.clientPlay:
case <-p.clientRecord:
2020-10-03 19:10:41 +00:00
case <-p.sourceRtspReady:
case <-p.sourceRtspNotReady:
case <-p.sourceRtmpReady:
case <-p.sourceRtmpNotReady:
2020-06-27 11:38:35 +00:00
}
}
}()
2020-09-19 11:51:55 +00:00
p.udpPublishersMap.clear()
p.readersMap.clear()
2020-08-30 11:31:46 +00:00
for _, p := range p.paths {
p.onClose()
2020-08-30 11:31:46 +00:00
}
2020-10-03 19:10:41 +00:00
p.serverTcp.close()
2020-08-29 17:48:41 +00:00
2020-10-03 19:10:41 +00:00
if p.serverUdpRtcp != nil {
p.serverUdpRtcp.close()
2020-08-30 12:27:26 +00:00
}
2020-10-03 19:10:41 +00:00
if p.serverUdpRtp != nil {
p.serverUdpRtp.close()
}
2020-06-27 11:38:35 +00:00
for c := range p.clients {
c.close()
2020-06-27 11:38:35 +00:00
}
p.clientsWg.Wait()
2020-07-30 15:30:50 +00:00
if p.metrics != nil {
p.metrics.close()
}
2020-08-30 12:27:26 +00:00
if p.pprof != nil {
p.pprof.close()
}
p.logHandler.Close()
2020-08-31 22:01:17 +00:00
close(p.clientNew)
close(p.clientClose)
close(p.clientDescribe)
close(p.clientAnnounce)
close(p.clientSetupPlay)
close(p.clientPlay)
close(p.clientRecord)
2020-10-03 19:10:41 +00:00
close(p.sourceRtspReady)
close(p.sourceRtspNotReady)
2020-06-27 11:38:35 +00:00
}
func (p *program) close() {
2020-08-31 22:01:17 +00:00
close(p.terminate)
2020-06-27 11:38:35 +00:00
<-p.done
}
2019-12-28 21:07:03 +00:00
func main() {
2020-10-13 23:19:14 +00:00
_, err := newProgram(os.Args[1:])
2019-12-28 21:07:03 +00:00
if err != nil {
log.Fatal("ERR: ", err)
2019-12-28 21:07:03 +00:00
}
2020-06-24 20:42:39 +00:00
select {}
2019-12-28 21:07:03 +00:00
}