2019-12-28 21:07:03 +00:00
package main
import (
"fmt"
2020-06-27 15:23:43 +00:00
"io"
2019-12-28 21:07:03 +00:00
"log"
2020-06-15 20:41:14 +00:00
"net"
2019-12-28 21:07:03 +00:00
"os"
2020-07-29 21:30:42 +00:00
"time"
2019-12-28 21:07:03 +00:00
2020-07-12 10:34:35 +00:00
"github.com/aler9/gortsplib"
2019-12-28 21:07:03 +00:00
"gopkg.in/alecthomas/kingpin.v2"
)
2020-06-26 09:00:10 +00:00
// Version is the program's version string. It is printed by the --version
// flag and included in the startup log line.
// NOTE(review): the "v0.0.0" default is presumably overridden at build time
// (e.g. through linker flags) — confirm against the build scripts.
var Version = "v0.0.0"
2019-12-28 21:07:03 +00:00
2020-08-30 13:27:03 +00:00
// checkPathPeriod is the interval at which the main loop calls onCheck on
// every registered path.
const checkPathPeriod = 5 * time.Second
2020-07-30 17:21:36 +00:00
// logDestination identifies a sink that log lines can be written to.
type logDestination int

// Available log destinations.
const (
	// logDestinationStdout selects the standard logger.
	logDestinationStdout = logDestination(iota)

	// logDestinationFile selects the log file.
	logDestinationFile
)
2020-05-10 13:33:42 +00:00
type program struct {
2020-08-05 12:59:38 +00:00
conf * conf
logFile * os . File
metrics * metrics
2020-08-30 12:10:05 +00:00
pprof * pprof
2020-08-30 12:15:00 +00:00
paths map [ string ] * path
2020-08-05 12:59:38 +00:00
serverRtp * serverUdp
serverRtcp * serverUdp
2020-08-30 12:15:00 +00:00
serverRtsp * serverTcp
2020-08-05 12:59:38 +00:00
clients map [ * client ] struct { }
udpClientsByAddr map [ udpClientAddr ] * udpClient
publisherCount int
readerCount int
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
metricsGather chan metricsGatherReq
clientNew chan net . Conn
clientClose chan * client
clientDescribe chan clientDescribeReq
clientAnnounce chan clientAnnounceReq
clientSetupPlay chan clientSetupPlayReq
clientPlay chan * client
clientRecord chan * client
clientFrameUdp chan clientFrameUdpReq
clientFrameTcp chan clientFrameTcpReq
sourceReady chan * source
sourceNotReady chan * source
sourceFrame chan sourceFrameReq
terminate chan struct { }
done chan struct { }
2020-05-10 13:33:42 +00:00
}
2020-02-16 15:05:08 +00:00
2020-07-20 09:42:56 +00:00
func newProgram ( args [ ] string , stdin io . Reader ) ( * program , error ) {
2020-06-27 19:22:50 +00:00
k := kingpin . New ( "rtsp-simple-server" ,
"rtsp-simple-server " + Version + "\n\nRTSP server." )
2020-05-11 07:31:56 +00:00
2020-06-27 19:22:50 +00:00
argVersion := k . Flag ( "version" , "print version" ) . Bool ( )
2020-07-13 15:27:04 +00:00
argConfPath := k . Arg ( "confpath" , "path to a config file. The default is rtsp-simple-server.yml. Use 'stdin' to read config from stdin" ) . Default ( "rtsp-simple-server.yml" ) . String ( )
2020-05-11 07:31:56 +00:00
2020-07-20 09:42:56 +00:00
kingpin . MustParse ( k . Parse ( args ) )
2020-05-11 07:31:56 +00:00
2020-06-27 15:23:43 +00:00
if * argVersion == true {
2020-05-11 07:39:49 +00:00
fmt . Println ( Version )
2020-05-10 19:32:40 +00:00
os . Exit ( 0 )
2020-01-26 11:58:56 +00:00
}
2020-06-27 15:23:43 +00:00
conf , err := loadConf ( * argConfPath , stdin )
if err != nil {
return nil , err
}
2020-06-30 13:12:39 +00:00
p := & program {
2020-08-05 12:59:38 +00:00
conf : conf ,
2020-08-30 12:15:00 +00:00
paths : make ( map [ string ] * path ) ,
2020-08-05 12:59:38 +00:00
clients : make ( map [ * client ] struct { } ) ,
udpClientsByAddr : make ( map [ udpClientAddr ] * udpClient ) ,
2020-08-31 22:01:17 +00:00
metricsGather : make ( chan metricsGatherReq ) ,
clientNew : make ( chan net . Conn ) ,
clientClose : make ( chan * client ) ,
clientDescribe : make ( chan clientDescribeReq ) ,
clientAnnounce : make ( chan clientAnnounceReq ) ,
clientSetupPlay : make ( chan clientSetupPlayReq ) ,
clientPlay : make ( chan * client ) ,
clientRecord : make ( chan * client ) ,
clientFrameUdp : make ( chan clientFrameUdpReq ) ,
clientFrameTcp : make ( chan clientFrameTcpReq ) ,
sourceReady : make ( chan * source ) ,
sourceNotReady : make ( chan * source ) ,
sourceFrame : make ( chan sourceFrameReq ) ,
terminate : make ( chan struct { } ) ,
2020-08-05 12:59:38 +00:00
done : make ( chan struct { } ) ,
2020-06-30 13:12:39 +00:00
}
2020-07-30 17:21:36 +00:00
if _ , ok := p . conf . logDestinationsParsed [ logDestinationFile ] ; ok {
p . logFile , err = os . OpenFile ( p . conf . LogFile , os . O_APPEND | os . O_CREATE | os . O_WRONLY , 0644 )
if err != nil {
2020-08-30 11:44:15 +00:00
return nil , err
2020-07-30 17:21:36 +00:00
}
}
p . log ( "rtsp-simple-server %s" , Version )
2020-07-30 15:30:50 +00:00
if conf . Metrics {
2020-08-30 11:44:15 +00:00
p . metrics , err = newMetrics ( p )
if err != nil {
return nil , err
}
2020-07-30 15:30:50 +00:00
}
2020-06-27 15:23:43 +00:00
if conf . Pprof {
2020-08-30 12:10:05 +00:00
p . pprof , err = newPprof ( p )
if err != nil {
return nil , err
}
2020-06-27 13:42:54 +00:00
}
2020-08-30 12:15:00 +00:00
for name , confp := range conf . Paths {
if name == "all" {
continue
}
p . paths [ name ] = newPath ( p , name , confp , true )
}
2020-08-17 16:22:14 +00:00
if _ , ok := conf . protocolsParsed [ gortsplib . StreamProtocolUdp ] ; ok {
2020-08-29 17:48:41 +00:00
p . serverRtp , err = newServerUdp ( p , conf . RtpPort , gortsplib . StreamTypeRtp )
2020-08-17 16:22:14 +00:00
if err != nil {
return nil , err
}
2019-12-28 21:07:03 +00:00
2020-08-29 17:48:41 +00:00
p . serverRtcp , err = newServerUdp ( p , conf . RtcpPort , gortsplib . StreamTypeRtcp )
2020-08-17 16:22:14 +00:00
if err != nil {
return nil , err
}
2019-12-28 21:07:03 +00:00
}
2020-08-29 17:48:41 +00:00
p . serverRtsp , err = newServerTcp ( p )
if err != nil {
return nil , err
}
2020-06-27 11:38:35 +00:00
go p . run ( )
2019-12-28 21:07:03 +00:00
2020-05-10 13:33:42 +00:00
return p , nil
2019-12-28 21:07:03 +00:00
}
2020-06-27 12:18:16 +00:00
func ( p * program ) log ( format string , args ... interface { } ) {
2020-07-30 17:21:36 +00:00
line := fmt . Sprintf ( "[%d/%d/%d] " + format , append ( [ ] interface { } { len ( p . clients ) ,
2020-07-20 09:42:56 +00:00
p . publisherCount , p . readerCount } , args ... ) ... )
2020-07-30 17:21:36 +00:00
if _ , ok := p . conf . logDestinationsParsed [ logDestinationStdout ] ; ok {
log . Println ( line )
}
2020-08-30 13:27:03 +00:00
2020-07-30 17:21:36 +00:00
if _ , ok := p . conf . logDestinationsParsed [ logDestinationFile ] ; ok {
p . logFile . WriteString ( line + "\n" )
}
2020-06-27 12:18:16 +00:00
}
2020-06-27 11:38:35 +00:00
func ( p * program ) run ( ) {
2020-08-30 12:27:26 +00:00
if p . metrics != nil {
go p . metrics . run ( )
}
if p . pprof != nil {
go p . pprof . run ( )
}
if p . serverRtp != nil {
go p . serverRtp . run ( )
}
if p . serverRtcp != nil {
go p . serverRtcp . run ( )
}
go p . serverRtsp . run ( )
for _ , p := range p . paths {
p . onInit ( )
}
2020-08-30 13:27:03 +00:00
checkPathsTicker := time . NewTicker ( checkPathPeriod )
2020-07-29 21:30:42 +00:00
defer checkPathsTicker . Stop ( )
2020-06-27 11:38:35 +00:00
outer :
2020-07-29 21:30:42 +00:00
for {
select {
case <- checkPathsTicker . C :
for _ , path := range p . paths {
2020-08-30 11:18:43 +00:00
path . onCheck ( )
2020-06-27 11:38:35 +00:00
}
2020-08-31 22:01:17 +00:00
case req := <- p . metricsGather :
req . res <- & metricsData {
clientCount : len ( p . clients ) ,
publisherCount : p . publisherCount ,
readerCount : p . readerCount ,
}
2020-07-30 15:30:50 +00:00
2020-08-31 22:01:17 +00:00
case conn := <- p . clientNew :
c := newClient ( p , conn )
p . clients [ c ] = struct { } { }
c . log ( "connected" )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case client := <- p . clientClose :
if _ , ok := p . clients [ client ] ; ! ok {
continue
}
client . close ( )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientDescribe :
// create path if not exist
if _ , ok := p . paths [ req . pathName ] ; ! ok {
p . paths [ req . pathName ] = newPath ( p , req . pathName , p . findConfForPathName ( req . pathName ) , false )
}
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
p . paths [ req . pathName ] . onDescribe ( req . client )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientAnnounce :
// create path if not exist
if path , ok := p . paths [ req . pathName ] ; ! ok {
p . paths [ req . pathName ] = newPath ( p , req . pathName , p . findConfForPathName ( req . pathName ) , false )
2020-08-30 13:27:03 +00:00
2020-08-31 22:01:17 +00:00
} else {
if path . publisher != nil {
req . res <- fmt . Errorf ( "someone is already publishing on path '%s'" , req . pathName )
continue
2020-07-29 21:30:42 +00:00
}
2020-08-31 22:01:17 +00:00
}
2020-06-30 13:12:39 +00:00
2020-08-31 22:01:17 +00:00
p . paths [ req . pathName ] . publisher = req . client
p . paths [ req . pathName ] . publisherSdpText = req . sdpText
p . paths [ req . pathName ] . publisherSdpParsed = req . sdpParsed
2020-08-31 13:31:37 +00:00
2020-08-31 22:01:17 +00:00
req . client . path = p . paths [ req . pathName ]
req . client . state = clientStatePreRecord
req . res <- nil
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientSetupPlay :
path , ok := p . paths [ req . pathName ]
if ! ok || ! path . publisherReady {
req . res <- fmt . Errorf ( "no one is publishing on path '%s'" , req . pathName )
continue
}
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
if req . trackId >= len ( path . publisherSdpParsed . MediaDescriptions ) {
req . res <- fmt . Errorf ( "track %d does not exist" , req . trackId )
continue
}
req . client . path = path
req . client . state = clientStatePrePlay
req . res <- nil
case client := <- p . clientPlay :
p . readerCount += 1
client . state = clientStatePlay
case client := <- p . clientRecord :
p . publisherCount += 1
client . state = clientStateRecord
if client . streamProtocol == gortsplib . StreamProtocolUdp {
for trackId , track := range client . streamTracks {
key := makeUdpClientAddr ( client . ip ( ) , track . rtpPort )
p . udpClientsByAddr [ key ] = & udpClient {
client : client ,
trackId : trackId ,
streamType : gortsplib . StreamTypeRtp ,
}
2020-06-30 13:12:39 +00:00
2020-08-31 22:01:17 +00:00
key = makeUdpClientAddr ( client . ip ( ) , track . rtcpPort )
p . udpClientsByAddr [ key ] = & udpClient {
client : client ,
trackId : trackId ,
streamType : gortsplib . StreamTypeRtcp ,
2020-08-05 12:59:38 +00:00
}
2020-08-05 08:18:38 +00:00
}
2020-08-31 22:01:17 +00:00
}
2020-08-05 12:59:38 +00:00
2020-08-31 22:01:17 +00:00
client . path . onPublisherSetReady ( )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientFrameUdp :
pub , ok := p . udpClientsByAddr [ makeUdpClientAddr ( req . addr . IP , req . addr . Port ) ]
if ! ok {
continue
}
2020-08-05 12:59:38 +00:00
2020-08-31 22:01:17 +00:00
// client sent RTP on RTCP port or vice-versa
if pub . streamType != req . streamType {
continue
}
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
pub . client . rtcpReceivers [ pub . trackId ] . OnFrame ( req . streamType , req . buf )
p . forwardFrame ( pub . client . path , pub . trackId , req . streamType , req . buf )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientFrameTcp :
p . forwardFrame ( req . path , req . trackId , req . streamType , req . buf )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case source := <- p . sourceReady :
2020-08-31 22:13:27 +00:00
source . path . log ( "source ready" )
2020-08-31 22:01:17 +00:00
source . path . onPublisherSetReady ( )
2020-06-30 13:12:39 +00:00
2020-08-31 22:01:17 +00:00
case source := <- p . sourceNotReady :
2020-08-31 22:13:27 +00:00
source . path . log ( "source not ready" )
2020-08-31 22:01:17 +00:00
source . path . onPublisherSetNotReady ( )
2020-06-30 13:12:39 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . sourceFrame :
p . forwardFrame ( req . source . path , req . trackId , req . streamType , req . buf )
2020-06-30 13:12:39 +00:00
2020-08-31 22:01:17 +00:00
case <- p . terminate :
break outer
2020-06-27 11:38:35 +00:00
}
}
go func ( ) {
2020-08-31 22:01:17 +00:00
for {
select {
case req , ok := <- p . metricsGather :
if ! ok {
return
}
req . res <- nil
case <- p . clientNew :
case <- p . clientClose :
case <- p . clientDescribe :
case req := <- p . clientAnnounce :
req . res <- fmt . Errorf ( "terminated" )
2020-07-30 15:30:50 +00:00
2020-08-31 22:01:17 +00:00
case req := <- p . clientSetupPlay :
req . res <- fmt . Errorf ( "terminated" )
2020-06-27 11:38:35 +00:00
2020-08-31 22:01:17 +00:00
case <- p . clientPlay :
case <- p . clientRecord :
case <- p . clientFrameUdp :
case <- p . clientFrameTcp :
case <- p . sourceReady :
case <- p . sourceNotReady :
case <- p . sourceFrame :
2020-06-27 11:38:35 +00:00
}
}
} ( )
2020-08-30 11:31:46 +00:00
for _ , p := range p . paths {
2020-09-03 14:02:58 +00:00
p . onClose ( true )
2020-08-30 11:31:46 +00:00
}
2020-08-29 17:48:41 +00:00
p . serverRtsp . close ( )
2020-08-30 12:27:26 +00:00
if p . serverRtcp != nil {
2020-08-17 16:22:14 +00:00
p . serverRtcp . close ( )
2020-08-30 12:27:26 +00:00
}
if p . serverRtp != nil {
2020-08-17 16:22:14 +00:00
p . serverRtp . close ( )
}
2020-06-27 11:38:35 +00:00
for c := range p . clients {
2020-08-31 20:29:30 +00:00
c . close ( )
2020-07-29 21:30:42 +00:00
<- c . done
2020-06-27 11:38:35 +00:00
}
2020-07-30 15:30:50 +00:00
if p . metrics != nil {
p . metrics . close ( )
}
2020-08-30 12:27:26 +00:00
if p . pprof != nil {
p . pprof . close ( )
}
2020-07-30 17:21:36 +00:00
if p . logFile != nil {
p . logFile . Close ( )
}
2020-08-31 22:01:17 +00:00
close ( p . metricsGather )
close ( p . clientNew )
close ( p . clientClose )
close ( p . clientDescribe )
close ( p . clientAnnounce )
close ( p . clientSetupPlay )
close ( p . clientPlay )
close ( p . clientRecord )
close ( p . clientFrameUdp )
close ( p . clientFrameTcp )
close ( p . sourceReady )
close ( p . sourceNotReady )
close ( p . sourceFrame )
2020-06-27 11:38:35 +00:00
close ( p . done )
}
func ( p * program ) close ( ) {
2020-08-31 22:01:17 +00:00
close ( p . terminate )
2020-06-27 11:38:35 +00:00
<- p . done
}
2020-08-30 11:20:18 +00:00
func ( p * program ) findConfForPathName ( name string ) * confPath {
2020-08-05 09:49:36 +00:00
if confp , ok := p . conf . Paths [ name ] ; ok {
2020-07-30 11:31:18 +00:00
return confp
2020-07-29 21:30:42 +00:00
}
2020-07-30 11:31:18 +00:00
if confp , ok := p . conf . Paths [ "all" ] ; ok {
return confp
2020-07-29 21:30:42 +00:00
}
return nil
}
2020-08-30 13:51:28 +00:00
func ( p * program ) forwardFrame ( path * path , trackId int , streamType gortsplib . StreamType , frame [ ] byte ) {
2020-07-30 11:31:18 +00:00
for c := range p . clients {
2020-08-30 13:51:28 +00:00
if c . path != path ||
2020-08-03 15:35:34 +00:00
c . state != clientStatePlay {
continue
}
2020-06-27 11:38:35 +00:00
2020-08-03 15:35:34 +00:00
track , ok := c . streamTracks [ trackId ]
if ! ok {
continue
}
if c . streamProtocol == gortsplib . StreamProtocolUdp {
if streamType == gortsplib . StreamTypeRtp {
2020-09-02 08:59:02 +00:00
p . serverRtp . write ( frame , & net . UDPAddr {
IP : c . ip ( ) ,
Zone : c . zone ( ) ,
Port : track . rtpPort ,
2020-08-03 15:35:34 +00:00
} )
2020-09-02 08:59:02 +00:00
2020-06-27 11:38:35 +00:00
} else {
2020-09-02 08:59:02 +00:00
p . serverRtcp . write ( frame , & net . UDPAddr {
IP : c . ip ( ) ,
Zone : c . zone ( ) ,
Port : track . rtcpPort ,
2020-08-03 15:35:34 +00:00
} )
}
} else {
2020-08-31 13:31:37 +00:00
c . tcpFrame <- & gortsplib . InterleavedFrame {
TrackId : trackId ,
StreamType : streamType ,
Content : frame ,
2020-06-27 11:38:35 +00:00
}
}
}
2020-05-10 14:33:20 +00:00
}
2019-12-28 21:07:03 +00:00
func main ( ) {
2020-06-27 15:23:43 +00:00
_ , err := newProgram ( os . Args [ 1 : ] , os . Stdin )
2019-12-28 21:07:03 +00:00
if err != nil {
2020-08-30 12:10:05 +00:00
log . Fatal ( "ERR: " , err )
2019-12-28 21:07:03 +00:00
}
2020-06-24 20:42:39 +00:00
select { }
2019-12-28 21:07:03 +00:00
}