mediamtx/main.go

626 lines
15 KiB
Go

package main
import (
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"regexp"
"strings"
"time"
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2"
"gortc.io/sdp"
)
var Version = "v0.0.0"
func parseIpCidrList(in string) ([]interface{}, error) {
if in == "" {
return nil, nil
}
var ret []interface{}
for _, t := range strings.Split(in, ",") {
_, ipnet, err := net.ParseCIDR(t)
if err == nil {
ret = append(ret, ipnet)
continue
}
ip := net.ParseIP(t)
if ip != nil {
ret = append(ret, ip)
continue
}
return nil, fmt.Errorf("unable to parse ip/network '%s'", t)
}
return ret, nil
}
type trackFlowType int
const (
_TRACK_FLOW_RTP trackFlowType = iota
_TRACK_FLOW_RTCP
)
type track struct {
rtpPort int
rtcpPort int
}
type streamProtocol int
const (
_STREAM_PROTOCOL_UDP streamProtocol = iota
_STREAM_PROTOCOL_TCP
)
func (s streamProtocol) String() string {
if s == _STREAM_PROTOCOL_UDP {
return "udp"
}
return "tcp"
}
type programEvent interface {
isProgramEvent()
}
type programEventClientNew struct {
nconn net.Conn
}
func (programEventClientNew) isProgramEvent() {}
type programEventClientClose struct {
done chan struct{}
client *serverClient
}
func (programEventClientClose) isProgramEvent() {}
type programEventClientGetStreamSdp struct {
path string
res chan []byte
}
func (programEventClientGetStreamSdp) isProgramEvent() {}
type programEventClientAnnounce struct {
res chan error
client *serverClient
path string
sdpText []byte
sdpParsed *sdp.Message
}
func (programEventClientAnnounce) isProgramEvent() {}
type programEventClientSetupPlay struct {
res chan error
client *serverClient
path string
protocol streamProtocol
rtpPort int
rtcpPort int
}
func (programEventClientSetupPlay) isProgramEvent() {}
type programEventClientSetupRecord struct {
res chan error
client *serverClient
protocol streamProtocol
rtpPort int
rtcpPort int
}
func (programEventClientSetupRecord) isProgramEvent() {}
type programEventClientPlay1 struct {
res chan error
client *serverClient
}
func (programEventClientPlay1) isProgramEvent() {}
type programEventClientPlay2 struct {
res chan error
client *serverClient
}
func (programEventClientPlay2) isProgramEvent() {}
type programEventClientPause struct {
res chan error
client *serverClient
}
func (programEventClientPause) isProgramEvent() {}
type programEventClientRecord struct {
res chan error
client *serverClient
}
func (programEventClientRecord) isProgramEvent() {}
type programEventFrameUdp struct {
trackFlowType trackFlowType
addr *net.UDPAddr
buf []byte
}
func (programEventFrameUdp) isProgramEvent() {}
type programEventFrameTcp struct {
path string
trackId int
trackFlowType trackFlowType
buf []byte
}
func (programEventFrameTcp) isProgramEvent() {}
type programEventTerminate struct{}
func (programEventTerminate) isProgramEvent() {}
type args struct {
version bool
protocolsStr string
rtspPort int
rtpPort int
rtcpPort int
readTimeout time.Duration
writeTimeout time.Duration
publishUser string
publishPass string
publishIps string
readUser string
readPass string
readIps string
preScript string
postScript string
pprof bool
}
type program struct {
args args
protocols map[streamProtocol]struct{}
publishIps []interface{}
readIps []interface{}
tcpl *serverTcpListener
udplRtp *serverUdpListener
udplRtcp *serverUdpListener
clients map[*serverClient]struct{}
publishers map[string]*serverClient
publisherCount int
receiverCount int
events chan programEvent
done chan struct{}
}
func newProgram(sargs []string) (*program, error) {
kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" +
"RTSP server."
argVersion := kingpin.Flag("version", "print version").Bool()
argProtocolsStr := kingpin.Flag("protocols", "supported protocols").Default("udp,tcp").String()
argRtspPort := kingpin.Flag("rtsp-port", "port of the RTSP TCP listener").Default("8554").Int()
argRtpPort := kingpin.Flag("rtp-port", "port of the RTP UDP listener").Default("8000").Int()
argRtcpPort := kingpin.Flag("rtcp-port", "port of the RTCP UDP listener").Default("8001").Int()
argReadTimeout := kingpin.Flag("read-timeout", "timeout of read operations").Default("5s").Duration()
argWriteTimeout := kingpin.Flag("write-timeout", "timeout of write operations").Default("5s").Duration()
argPublishUser := kingpin.Flag("publish-user", "optional username required to publish").Default("").String()
argPublishPass := kingpin.Flag("publish-pass", "optional password required to publish").Default("").String()
argPublishIps := kingpin.Flag("publish-ips", "comma-separated list of IPs or networks (x.x.x.x/24) that can publish").Default("").String()
argReadUser := kingpin.Flag("read-user", "optional username required to read").Default("").String()
argReadPass := kingpin.Flag("read-pass", "optional password required to read").Default("").String()
argReadIps := kingpin.Flag("read-ips", "comma-separated list of IPs or networks (x.x.x.x/24) that can read").Default("").String()
argPreScript := kingpin.Flag("pre-script", "optional script to run on client connect").Default("").String()
argPostScript := kingpin.Flag("post-script", "optional script to run on client disconnect").Default("").String()
argPprof := kingpin.Flag("pprof", "enable pprof on port 9999 to monitor performance").Default("false").Bool()
kingpin.MustParse(kingpin.CommandLine.Parse(sargs))
args := args{
version: *argVersion,
protocolsStr: *argProtocolsStr,
rtspPort: *argRtspPort,
rtpPort: *argRtpPort,
rtcpPort: *argRtcpPort,
readTimeout: *argReadTimeout,
writeTimeout: *argWriteTimeout,
publishUser: *argPublishUser,
publishPass: *argPublishPass,
publishIps: *argPublishIps,
readUser: *argReadUser,
readPass: *argReadPass,
readIps: *argReadIps,
preScript: *argPreScript,
postScript: *argPostScript,
pprof: *argPprof,
}
if args.version == true {
fmt.Println(Version)
os.Exit(0)
}
protocols := make(map[streamProtocol]struct{})
for _, proto := range strings.Split(args.protocolsStr, ",") {
switch proto {
case "udp":
protocols[_STREAM_PROTOCOL_UDP] = struct{}{}
case "tcp":
protocols[_STREAM_PROTOCOL_TCP] = struct{}{}
default:
return nil, fmt.Errorf("unsupported protocol: %s", proto)
}
}
if len(protocols) == 0 {
return nil, fmt.Errorf("no protocols provided")
}
if (args.rtpPort % 2) != 0 {
return nil, fmt.Errorf("rtp port must be even")
}
if args.rtcpPort != (args.rtpPort + 1) {
return nil, fmt.Errorf("rtcp and rtp ports must be consecutive")
}
if args.publishUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(args.publishUser) {
return nil, fmt.Errorf("publish username must be alphanumeric")
}
}
if args.publishPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(args.publishPass) {
return nil, fmt.Errorf("publish password must be alphanumeric")
}
}
publishIps, err := parseIpCidrList(args.publishIps)
if err != nil {
return nil, err
}
if args.readUser != "" && args.readPass == "" || args.readUser == "" && args.readPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
if args.readUser != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(args.readUser) {
return nil, fmt.Errorf("read username must be alphanumeric")
}
}
if args.readPass != "" {
if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(args.readPass) {
return nil, fmt.Errorf("read password must be alphanumeric")
}
}
if args.readUser != "" && args.readPass == "" || args.readUser == "" && args.readPass != "" {
return nil, fmt.Errorf("read username and password must be both filled")
}
readIps, err := parseIpCidrList(args.readIps)
if err != nil {
return nil, err
}
p := &program{
args: args,
protocols: protocols,
publishIps: publishIps,
readIps: readIps,
clients: make(map[*serverClient]struct{}),
publishers: make(map[string]*serverClient),
events: make(chan programEvent),
done: make(chan struct{}),
}
p.log("rtsp-simple-server %s", Version)
if args.pprof {
go func(mux *http.ServeMux) {
server := &http.Server{
Addr: ":9999",
Handler: mux,
}
p.log("pprof is available on :9999")
panic(server.ListenAndServe())
}(http.DefaultServeMux)
http.DefaultServeMux = http.NewServeMux()
}
p.udplRtp, err = newServerUdpListener(p, args.rtpPort, _TRACK_FLOW_RTP)
if err != nil {
return nil, err
}
p.udplRtcp, err = newServerUdpListener(p, args.rtcpPort, _TRACK_FLOW_RTCP)
if err != nil {
return nil, err
}
p.tcpl, err = newServerTcpListener(p)
if err != nil {
return nil, err
}
go p.udplRtp.run()
go p.udplRtcp.run()
go p.tcpl.run()
go p.run()
return p, nil
}
func (p *program) log(format string, args ...interface{}) {
log.Printf("[%d/%d/%d] "+format, append([]interface{}{len(p.clients),
p.publisherCount, p.receiverCount}, args...)...)
}
func (p *program) run() {
outer:
for rawEvt := range p.events {
switch evt := rawEvt.(type) {
case programEventClientNew:
c := newServerClient(p, evt.nconn)
p.clients[c] = struct{}{}
c.log("connected")
case programEventClientClose:
// already deleted
if _, ok := p.clients[evt.client]; !ok {
close(evt.done)
continue
}
delete(p.clients, evt.client)
if evt.client.path != "" {
if pub, ok := p.publishers[evt.client.path]; ok && pub == evt.client {
delete(p.publishers, evt.client.path)
// if the publisher has disconnected
// close all other connections that share the same path
for oc := range p.clients {
if oc.path == evt.client.path {
go oc.close()
}
}
}
}
switch evt.client.state {
case _CLIENT_STATE_PLAY:
p.receiverCount -= 1
case _CLIENT_STATE_RECORD:
p.publisherCount -= 1
}
evt.client.log("disconnected")
close(evt.done)
case programEventClientGetStreamSdp:
pub, ok := p.publishers[evt.path]
if !ok {
evt.res <- nil
continue
}
evt.res <- pub.streamSdpText
case programEventClientAnnounce:
_, ok := p.publishers[evt.path]
if ok {
evt.res <- fmt.Errorf("another client is already publishing on path '%s'", evt.path)
continue
}
evt.client.path = evt.path
evt.client.streamSdpText = evt.sdpText
evt.client.streamSdpParsed = evt.sdpParsed
evt.client.state = _CLIENT_STATE_ANNOUNCE
p.publishers[evt.path] = evt.client
evt.res <- nil
case programEventClientSetupPlay:
pub, ok := p.publishers[evt.path]
if !ok {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.path)
continue
}
if len(evt.client.streamTracks) >= len(pub.streamSdpParsed.Medias) {
evt.res <- fmt.Errorf("all the tracks have already been setup")
continue
}
evt.client.path = evt.path
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &track{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
evt.client.state = _CLIENT_STATE_PRE_PLAY
evt.res <- nil
case programEventClientSetupRecord:
evt.client.streamProtocol = evt.protocol
evt.client.streamTracks = append(evt.client.streamTracks, &track{
rtpPort: evt.rtpPort,
rtcpPort: evt.rtcpPort,
})
evt.client.state = _CLIENT_STATE_PRE_RECORD
evt.res <- nil
case programEventClientPlay1:
pub, ok := p.publishers[evt.client.path]
if !ok {
evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.client.path)
continue
}
if len(evt.client.streamTracks) != len(pub.streamSdpParsed.Medias) {
evt.res <- fmt.Errorf("not all tracks have been setup")
continue
}
evt.res <- nil
case programEventClientPlay2:
p.receiverCount += 1
evt.client.state = _CLIENT_STATE_PLAY
evt.res <- nil
case programEventClientPause:
p.receiverCount -= 1
evt.client.state = _CLIENT_STATE_PRE_PLAY
evt.res <- nil
case programEventClientRecord:
p.publisherCount += 1
evt.client.state = _CLIENT_STATE_RECORD
evt.res <- nil
case programEventFrameUdp:
// find publisher and track id from ip and port
pub, trackId := func() (*serverClient, int) {
for _, pub := range p.publishers {
if pub.streamProtocol != _STREAM_PROTOCOL_UDP ||
pub.state != _CLIENT_STATE_RECORD ||
!pub.ip().Equal(evt.addr.IP) {
continue
}
for i, t := range pub.streamTracks {
if evt.trackFlowType == _TRACK_FLOW_RTP {
if t.rtpPort == evt.addr.Port {
return pub, i
}
} else {
if t.rtcpPort == evt.addr.Port {
return pub, i
}
}
}
}
return nil, -1
}()
if pub == nil {
continue
}
pub.udpLastFrameTime = time.Now()
p.forwardTrack(pub.path, trackId, evt.trackFlowType, evt.buf)
case programEventFrameTcp:
p.forwardTrack(evt.path, evt.trackId, evt.trackFlowType, evt.buf)
case programEventTerminate:
break outer
}
}
go func() {
for rawEvt := range p.events {
switch evt := rawEvt.(type) {
case programEventClientClose:
close(evt.done)
case programEventClientGetStreamSdp:
evt.res <- nil
case programEventClientAnnounce:
evt.res <- fmt.Errorf("terminated")
case programEventClientSetupPlay:
evt.res <- fmt.Errorf("terminated")
case programEventClientSetupRecord:
evt.res <- fmt.Errorf("terminated")
case programEventClientPlay1:
evt.res <- fmt.Errorf("terminated")
case programEventClientPlay2:
evt.res <- fmt.Errorf("terminated")
case programEventClientPause:
evt.res <- fmt.Errorf("terminated")
case programEventClientRecord:
evt.res <- fmt.Errorf("terminated")
}
}
}()
p.tcpl.close()
p.udplRtcp.close()
p.udplRtp.close()
for c := range p.clients {
c.close()
}
close(p.events)
close(p.done)
}
func (p *program) close() {
p.events <- programEventTerminate{}
<-p.done
}
func (p *program) forwardTrack(path string, id int, trackFlowType trackFlowType, frame []byte) {
for c := range p.clients {
if c.path == path && c.state == _CLIENT_STATE_PLAY {
if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if trackFlowType == _TRACK_FLOW_RTP {
p.udplRtp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtpPort,
},
buf: frame,
}
} else {
p.udplRtcp.write <- &udpWrite{
addr: &net.UDPAddr{
IP: c.ip(),
Zone: c.zone(),
Port: c.streamTracks[id].rtcpPort,
},
buf: frame,
}
}
} else {
c.write <- &gortsplib.InterleavedFrame{
Channel: trackToInterleavedChannel(id, trackFlowType),
Content: frame,
}
}
}
}
}
func main() {
_, err := newProgram(os.Args[1:])
if err != nil {
log.Fatal("ERR: ", err)
}
select {}
}