add metrics exporter (#37)

This commit is contained in:
aler9 2020-07-30 17:30:50 +02:00
parent bd012fbf1a
commit 90aa47d2a3
11 changed files with 180 additions and 45 deletions

View File

@ -68,6 +68,7 @@ define CONFIG_RUN
#rtspPort: 8555 #rtspPort: 8555
#rtpPort: 8002 #rtpPort: 8002
#rtcpPort: 8003 #rtcpPort: 8003
metrics: yes
paths: paths:
all: all:

View File

@ -188,14 +188,32 @@ systemctl enable rtsp-simple-server
systemctl start rtsp-simple-server systemctl start rtsp-simple-server
``` ```
#### Client count #### Monitoring
The current number of clients, publishers and readers is printed in each log line; for instance, the line: There are multiple ways to monitor the server usage over time:
``` * The current number of clients, publishers and readers is printed in each log line; for instance, the line:
2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION ```
``` 2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION
```
means that there are 2 clients, 1 publisher and 1 receiver.
means that there are 2 clients, 1 publisher and 1 receiver. * A metrics exporter, compatible with Prometheus, can be enabled with the option `metrics: yes`; then the server can be queried for metrics with Prometheus or with a simple HTTP request:
```
wget -qO- localhost:9998
```
Obtaining:
```
clients 23 1596122687740
publishers 15 1596122687740
readers 8 1596122687740
```
* A performance monitor, compatible with pprof, can be enabled with the option `pprof: yes`; then the server can be queried for metrics with pprof-compatible tools, like:
```
docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/goroutine
docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/heap
docker run --rm -it --network=host golang:1.14-alpine3.12 go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30
```
#### Full command-line usage #### Full command-line usage

View File

@ -932,7 +932,7 @@ func (c *client) runRecord(path string) {
case <-receiverReportTicker.C: case <-receiverReportTicker.C:
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report() frame := c.rtcpReceivers[trackId].Report()
c.p.rtcpl.writeChan <- &udpAddrBufPair{ c.p.serverRtcp.writeChan <- &udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: c.ip(),
Zone: c.zone(), Zone: c.zone(),

View File

@ -42,6 +42,7 @@ type conf struct {
WriteTimeout time.Duration `yaml:"writeTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"`
AuthMethods []string `yaml:"authMethods"` AuthMethods []string `yaml:"authMethods"`
authMethodsParsed []gortsplib.AuthMethod `` authMethodsParsed []gortsplib.AuthMethod ``
Metrics bool `yaml:"metrics"`
Pprof bool `yaml:"pprof"` Pprof bool `yaml:"pprof"`
Paths map[string]*confPath `yaml:"paths"` Paths map[string]*confPath `yaml:"paths"`
} }

5
go.mod
View File

@ -5,9 +5,10 @@ go 1.12
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921 github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.2.2 gopkg.in/yaml.v2 v2.2.8
) )

8
go.sum
View File

@ -2,12 +2,14 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921 h1:GkF8VqCVRDK1jlOwnorAh2uLEhzZoqNJ7et/uYBkHI4= github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05 h1:DWxXmtYvLYt3FNAZYKENS8NmxtkpBZJIbtZyOZ8Xu+c=
github.com/aler9/gortsplib v0.0.0-20200728150440-efac0eb60921/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/gortsplib v0.0.0-20200730132448-3b14ef755b05/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
@ -27,5 +29,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

73
main.go
View File

@ -17,10 +17,20 @@ import (
var Version = "v0.0.0" var Version = "v0.0.0"
const (
pprofAddress = ":9999"
)
type programEvent interface { type programEvent interface {
isProgramEvent() isProgramEvent()
} }
type programEventMetrics struct {
res chan *metricsData
}
func (programEventMetrics) isProgramEvent() {}
type programEventClientNew struct { type programEventClientNew struct {
nconn net.Conn nconn net.Conn
} }
@ -151,9 +161,10 @@ func (programEventTerminate) isProgramEvent() {}
type program struct { type program struct {
conf *conf conf *conf
rtspl *serverTcp metrics *metrics
rtpl *serverUdp serverRtsp *serverTcp
rtcpl *serverUdp serverRtp *serverUdp
serverRtcp *serverUdp
sources []*source sources []*source
clients map[*client]struct{} clients map[*client]struct{}
paths map[string]*path paths map[string]*path
@ -196,7 +207,7 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
continue continue
} }
newPath(p, path, confp, true) p.paths[path] = newPath(p, path, confp, true)
if confp.Source != "record" { if confp.Source != "record" {
s := newSource(p, path, confp) s := newSource(p, path, confp)
@ -207,36 +218,42 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
p.log("rtsp-simple-server %s", Version) p.log("rtsp-simple-server %s", Version)
if conf.Metrics {
p.metrics = newMetrics(p)
}
if conf.Pprof { if conf.Pprof {
go func(mux *http.ServeMux) { go func(mux *http.ServeMux) {
server := &http.Server{ p.log("[pprof] opened on " + pprofAddress)
Addr: ":9999", panic((&http.Server{
Addr: pprofAddress,
Handler: mux, Handler: mux,
} }).ListenAndServe())
p.log("pprof is available on :9999")
panic(server.ListenAndServe())
}(http.DefaultServeMux) }(http.DefaultServeMux)
http.DefaultServeMux = http.NewServeMux() http.DefaultServeMux = http.NewServeMux()
} }
p.rtpl, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp) p.serverRtp, err = newServerUdp(p, conf.RtpPort, gortsplib.StreamTypeRtp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.rtcpl, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp) p.serverRtcp, err = newServerUdp(p, conf.RtcpPort, gortsplib.StreamTypeRtcp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.rtspl, err = newServerTcp(p) p.serverRtsp, err = newServerTcp(p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go p.rtpl.run() if p.metrics != nil {
go p.rtcpl.run() go p.metrics.run()
go p.rtspl.run() }
go p.serverRtp.run()
go p.serverRtcp.run()
go p.serverRtsp.run()
for _, s := range p.sources { for _, s := range p.sources {
go s.run() go s.run()
} }
@ -264,6 +281,13 @@ outer:
case rawEvt := <-p.events: case rawEvt := <-p.events:
switch evt := rawEvt.(type) { switch evt := rawEvt.(type) {
case programEventMetrics:
evt.res <- &metricsData{
clientCount: len(p.clients),
publisherCount: p.publisherCount,
readerCount: p.readerCount,
}
case programEventClientNew: case programEventClientNew:
c := newClient(p, evt.nconn) c := newClient(p, evt.nconn)
p.clients[c] = struct{}{} p.clients[c] = struct{}{}
@ -304,7 +328,7 @@ outer:
} }
} else { } else {
newPath(p, evt.path, p.findConfForPath(evt.path), false) p.paths[evt.path] = newPath(p, evt.path, p.findConfForPath(evt.path), false)
} }
p.paths[evt.path].publisher = evt.client p.paths[evt.path].publisher = evt.client
@ -413,6 +437,9 @@ outer:
go func() { go func() {
for rawEvt := range p.events { for rawEvt := range p.events {
switch evt := rawEvt.(type) { switch evt := rawEvt.(type) {
case programEventMetrics:
evt.res <- nil
case programEventClientClose: case programEventClientClose:
close(evt.done) close(evt.done)
@ -451,15 +478,19 @@ outer:
<-s.done <-s.done
} }
p.rtspl.close() p.serverRtsp.close()
p.rtcpl.close() p.serverRtcp.close()
p.rtpl.close() p.serverRtp.close()
for c := range p.clients { for c := range p.clients {
c.conn.NetConn().Close() c.conn.NetConn().Close()
<-c.done <-c.done
} }
if p.metrics != nil {
p.metrics.close()
}
close(p.events) close(p.events)
close(p.done) close(p.done)
} }
@ -514,7 +545,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St
if c.pathId == path && c.state == clientStatePlay { if c.pathId == path && c.state == clientStatePlay {
if c.streamProtocol == gortsplib.StreamProtocolUdp { if c.streamProtocol == gortsplib.StreamProtocolUdp {
if streamType == gortsplib.StreamTypeRtp { if streamType == gortsplib.StreamTypeRtp {
p.rtpl.write(&udpAddrBufPair{ p.serverRtp.write(&udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: c.ip(),
Zone: c.zone(), Zone: c.zone(),
@ -523,7 +554,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St
buf: frame, buf: frame,
}) })
} else { } else {
p.rtcpl.write(&udpAddrBufPair{ p.serverRtcp.write(&udpAddrBufPair{
addr: &net.UDPAddr{ addr: &net.UDPAddr{
IP: c.ip(), IP: c.ip(),
Zone: c.zone(), Zone: c.zone(),

77
metrics.go Normal file
View File

@ -0,0 +1,77 @@
package main
import (
"context"
"fmt"
"io"
"net/http"
"time"
)
const (
metricsAddress = ":9998"
)
type metricsData struct {
clientCount int
publisherCount int
readerCount int
}
type metrics struct {
p *program
mux *http.ServeMux
server *http.Server
}
func newMetrics(p *program) *metrics {
m := &metrics{
p: p,
}
m.mux = http.NewServeMux()
m.mux.HandleFunc("/metrics", m.onMetrics)
m.server = &http.Server{
Addr: metricsAddress,
Handler: m.mux,
}
m.log("opened on " + metricsAddress)
return m
}
func (m *metrics) log(format string, args ...interface{}) {
m.p.log("[metrics] "+format, args...)
}
func (m *metrics) run() {
err := m.server.ListenAndServe()
if err != http.ErrServerClosed {
panic(err)
}
}
func (m *metrics) close() {
m.server.Shutdown(context.Background())
}
func (m *metrics) onMetrics(w http.ResponseWriter, req *http.Request) {
res := make(chan *metricsData)
m.p.events <- programEventMetrics{res}
data := <-res
if data == nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
out := ""
now := time.Now().UnixNano() / 1000000
out += fmt.Sprintf("clients %d %v\n", data.clientCount, now)
out += fmt.Sprintf("publishers %d %v\n", data.publisherCount, now)
out += fmt.Sprintf("readers %d %v\n", data.readerCount, now)
w.WriteHeader(http.StatusOK)
io.WriteString(w, out)
}

View File

@ -28,7 +28,7 @@ type path struct {
onDemandCmd *exec.Cmd onDemandCmd *exec.Cmd
} }
func newPath(p *program, id string, confp *confPath, permanent bool) { func newPath(p *program, id string, confp *confPath, permanent bool) *path {
pa := &path{ pa := &path{
p: p, p: p,
id: id, id: id,
@ -36,7 +36,7 @@ func newPath(p *program, id string, confp *confPath, permanent bool) {
permanent: permanent, permanent: permanent,
} }
p.paths[id] = pa return pa
} }
func (pa *path) check() { func (pa *path) check() {

View File

@ -17,6 +17,8 @@ writeTimeout: 5s
# supported authentication methods # supported authentication methods
# WARNING: both methods are insecure, use RTSP inside a VPN to enforce security. # WARNING: both methods are insecure, use RTSP inside a VPN to enforce security.
authMethods: [basic, digest] authMethods: [basic, digest]
# enable Prometheus-compatible metrics on port 9998
metrics: false
# enable pprof on port 9999 to monitor performances # enable pprof on port 9999 to monitor performances
pprof: false pprof: false

View File

@ -200,14 +200,14 @@ func (s *source) doInner(terminate chan struct{}) bool {
func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool { func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) bool {
type trackListenerPair struct { type trackListenerPair struct {
rtpl *gortsplib.ConnClientUdpListener serverRtp *gortsplib.ConnClientUdpListener
rtcpl *gortsplib.ConnClientUdpListener serverRtcp *gortsplib.ConnClientUdpListener
} }
var listeners []*trackListenerPair var listeners []*trackListenerPair
for _, track := range s.tracks { for _, track := range s.tracks {
var rtpl *gortsplib.ConnClientUdpListener var serverRtp *gortsplib.ConnClientUdpListener
var rtcpl *gortsplib.ConnClientUdpListener var serverRtcp *gortsplib.ConnClientUdpListener
var err error var err error
for { for {
@ -216,7 +216,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000 rtpPort := (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort := rtpPort + 1 rtcpPort := rtpPort + 1
rtpl, rtcpl, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort) serverRtp, serverRtcp, _, err = conn.SetupUdp(s.confp.sourceUrl, track, rtpPort, rtcpPort)
if err != nil { if err != nil {
// retry if it's a bind error // retry if it's a bind error
if nerr, ok := err.(*net.OpError); ok { if nerr, ok := err.(*net.OpError); ok {
@ -235,8 +235,8 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
} }
listeners = append(listeners, &trackListenerPair{ listeners = append(listeners, &trackListenerPair{
rtpl: rtpl, serverRtp: serverRtp,
rtcpl: rtcpl, serverRtcp: serverRtcp,
}) })
} }
@ -268,7 +268,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]} s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtp, buf[:n]}
} }
}(trackId, lp.rtpl) }(trackId, lp.serverRtp)
// receive RTCP packets // receive RTCP packets
go func(trackId int, l *gortsplib.ConnClientUdpListener) { go func(trackId int, l *gortsplib.ConnClientUdpListener) {
@ -285,7 +285,7 @@ func (s *source) runUdp(terminate chan struct{}, conn *gortsplib.ConnClient) boo
s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]} s.p.events <- programEventSourceFrame{s, trackId, gortsplib.StreamTypeRtcp, buf[:n]}
} }
}(trackId, lp.rtcpl) }(trackId, lp.serverRtcp)
} }
tcpConnDone := make(chan error) tcpConnDone := make(chan error)
@ -314,8 +314,8 @@ outer:
s.p.events <- programEventSourceNotReady{s} s.p.events <- programEventSourceNotReady{s}
for _, lp := range listeners { for _, lp := range listeners {
lp.rtpl.Close() lp.serverRtp.Close()
lp.rtcpl.Close() lp.serverRtcp.Close()
} }
wg.Wait() wg.Wait()