2019-12-31 12:48:17 +00:00
package main
import (
2020-05-10 20:56:46 +00:00
"errors"
2019-12-31 12:48:17 +00:00
"fmt"
"io"
"log"
"net"
2020-02-15 17:25:29 +00:00
"os/exec"
2019-12-31 12:48:17 +00:00
"strings"
2020-01-20 09:21:05 +00:00
"github.com/aler9/gortsplib"
2019-12-31 12:48:17 +00:00
"gortc.io/sdp"
)
2020-01-26 11:41:26 +00:00
func interleavedChannelToTrack ( channel uint8 ) ( int , trackFlow ) {
2019-12-31 12:48:17 +00:00
if ( channel % 2 ) == 0 {
2020-01-26 11:41:26 +00:00
return int ( channel / 2 ) , _TRACK_FLOW_RTP
2019-12-31 12:48:17 +00:00
}
2020-01-26 11:41:26 +00:00
return int ( ( channel - 1 ) / 2 ) , _TRACK_FLOW_RTCP
2019-12-31 12:48:17 +00:00
}
2020-01-26 11:41:26 +00:00
func trackToInterleavedChannel ( id int , flow trackFlow ) uint8 {
2019-12-31 12:48:17 +00:00
if flow == _TRACK_FLOW_RTP {
2020-01-26 11:41:26 +00:00
return uint8 ( id * 2 )
2019-12-31 12:48:17 +00:00
}
2020-01-26 11:41:26 +00:00
return uint8 ( ( id * 2 ) + 1 )
2019-12-31 12:48:17 +00:00
}
2020-01-20 13:41:04 +00:00
// clientState identifies the step of the RTSP session a client has reached.
type clientState int

// States a client can be in. A client starts in _CLIENT_STATE_STARTING and
// moves to the other states as its requests are accepted.
const (
	// no stateful request has been accepted yet
	_CLIENT_STATE_STARTING = clientState(iota)
	// ANNOUNCE accepted, the client is going to publish
	_CLIENT_STATE_ANNOUNCE
	// at least one track has been set up for reading
	_CLIENT_STATE_PRE_PLAY
	// PLAY accepted
	_CLIENT_STATE_PLAY
	// at least one track has been set up for publishing
	_CLIENT_STATE_PRE_RECORD
	// RECORD accepted
	_CLIENT_STATE_RECORD
)
2020-05-21 19:46:22 +00:00
// String returns the upper-case name of the state, or "UNKNOWN" when the
// value does not correspond to any declared state.
func (cs clientState) String() string {
	names := [...]string{
		_CLIENT_STATE_STARTING:   "STARTING",
		_CLIENT_STATE_ANNOUNCE:   "ANNOUNCE",
		_CLIENT_STATE_PRE_PLAY:   "PRE_PLAY",
		_CLIENT_STATE_PLAY:       "PLAY",
		_CLIENT_STATE_PRE_RECORD: "PRE_RECORD",
		_CLIENT_STATE_RECORD:     "RECORD",
	}

	if cs < 0 || int(cs) >= len(names) {
		return "UNKNOWN"
	}
	return names[cs]
}
2020-05-03 10:38:11 +00:00
type serverClient struct {
2019-12-31 12:48:17 +00:00
p * program
2020-01-26 11:41:26 +00:00
conn * gortsplib . ConnServer
2020-01-20 13:41:04 +00:00
state clientState
2019-12-31 13:55:46 +00:00
path string
2020-05-10 20:56:46 +00:00
publishAuth * gortsplib . AuthServer
readAuth * gortsplib . AuthServer
2019-12-31 12:48:17 +00:00
streamSdpText [ ] byte // filled only if publisher
streamSdpParsed * sdp . Message // filled only if publisher
streamProtocol streamProtocol
streamTracks [ ] * track
2020-05-10 13:38:01 +00:00
write chan * gortsplib . InterleavedFrame
2020-05-10 14:23:57 +00:00
done chan struct { }
2019-12-31 12:48:17 +00:00
}
2020-05-03 10:38:11 +00:00
func newServerClient ( p * program , nconn net . Conn ) * serverClient {
c := & serverClient {
2020-05-07 20:30:03 +00:00
p : p ,
conn : gortsplib . NewConnServer ( gortsplib . ConnServerConf {
NConn : nconn ,
2020-05-10 20:35:56 +00:00
ReadTimeout : p . args . readTimeout ,
WriteTimeout : p . args . writeTimeout ,
2020-05-07 20:30:03 +00:00
} ) ,
2020-05-10 13:38:01 +00:00
state : _CLIENT_STATE_STARTING ,
write : make ( chan * gortsplib . InterleavedFrame ) ,
2020-05-10 14:23:57 +00:00
done : make ( chan struct { } ) ,
2019-12-31 12:48:17 +00:00
}
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
c . p . tcpl . clients [ c ] = struct { } { }
c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:23:57 +00:00
go c . run ( )
2019-12-31 12:48:17 +00:00
return c
}
2020-05-03 10:38:11 +00:00
func ( c * serverClient ) close ( ) error {
2019-12-31 12:48:17 +00:00
// already deleted
2020-05-10 14:08:41 +00:00
if _ , ok := c . p . tcpl . clients [ c ] ; ! ok {
2019-12-31 12:48:17 +00:00
return nil
}
2020-05-10 14:08:41 +00:00
delete ( c . p . tcpl . clients , c )
2020-01-20 15:44:02 +00:00
c . conn . NetConn ( ) . Close ( )
2020-05-10 13:38:01 +00:00
close ( c . write )
2019-12-31 12:48:17 +00:00
2019-12-31 13:55:46 +00:00
if c . path != "" {
2020-05-10 14:08:41 +00:00
if pub , ok := c . p . tcpl . publishers [ c . path ] ; ok && pub == c {
delete ( c . p . tcpl . publishers , c . path )
2019-12-31 12:48:17 +00:00
2019-12-31 13:55:46 +00:00
// if the publisher has disconnected
// close all other connections that share the same path
2020-05-10 14:08:41 +00:00
for oc := range c . p . tcpl . clients {
2019-12-31 13:55:46 +00:00
if oc . path == c . path {
oc . close ( )
}
}
2019-12-31 12:48:17 +00:00
}
}
return nil
}
2020-05-03 10:38:11 +00:00
func ( c * serverClient ) log ( format string , args ... interface { } ) {
2020-05-03 17:24:54 +00:00
// keep remote address outside format, since it can contain %
log . Println ( "[RTSP client " + c . conn . NetConn ( ) . RemoteAddr ( ) . String ( ) + "] " +
fmt . Sprintf ( format , args ... ) )
2019-12-31 12:48:17 +00:00
}
2020-05-03 21:26:41 +00:00
// ip returns the IP address of the remote end of the connection.
// It panics if the remote address is not a *net.TCPAddr.
func (c *serverClient) ip() net.IP {
	remote := c.conn.NetConn().RemoteAddr()
	tcpAddr := remote.(*net.TCPAddr)
	return tcpAddr.IP
}
// zone returns the zone of the remote address of the connection.
// It panics if the remote address is not a *net.TCPAddr.
func (c *serverClient) zone() string {
	remote := c.conn.NetConn().RemoteAddr()
	tcpAddr := remote.(*net.TCPAddr)
	return tcpAddr.Zone
}
2020-05-03 10:38:11 +00:00
func ( c * serverClient ) run ( ) {
2019-12-31 12:48:17 +00:00
c . log ( "connected" )
2020-05-10 13:33:42 +00:00
if c . p . args . preScript != "" {
preScript := exec . Command ( c . p . args . preScript )
2020-02-16 12:04:43 +00:00
err := preScript . Run ( )
if err != nil {
c . log ( "ERR: %s" , err )
}
}
2020-02-15 17:25:29 +00:00
2019-12-31 12:48:17 +00:00
for {
2020-01-20 15:44:02 +00:00
req , err := c . conn . ReadRequest ( )
2019-12-31 12:48:17 +00:00
if err != nil {
if err != io . EOF {
c . log ( "ERR: %s" , err )
}
2020-05-10 14:23:57 +00:00
break
2019-12-31 12:48:17 +00:00
}
2020-01-03 22:05:06 +00:00
ok := c . handleRequest ( req )
if ! ok {
2020-05-10 14:23:57 +00:00
break
2020-01-03 22:05:06 +00:00
}
}
2020-05-10 14:23:57 +00:00
func ( ) {
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
c . close ( )
} ( )
c . log ( "disconnected" )
func ( ) {
if c . p . args . postScript != "" {
postScript := exec . Command ( c . p . args . postScript )
err := postScript . Run ( )
if err != nil {
c . log ( "ERR: %s" , err )
}
}
} ( )
close ( c . done )
2020-01-03 22:05:06 +00:00
}
2019-12-31 12:48:17 +00:00
2020-05-03 12:40:06 +00:00
func ( c * serverClient ) writeResError ( req * gortsplib . Request , code gortsplib . StatusCode , err error ) {
2020-01-03 22:05:06 +00:00
c . log ( "ERR: %s" , err )
2019-12-31 12:48:17 +00:00
2020-05-03 12:40:06 +00:00
header := gortsplib . Header { }
2020-01-26 11:41:26 +00:00
if cseq , ok := req . Header [ "CSeq" ] ; ok && len ( cseq ) == 1 {
2020-05-03 12:40:06 +00:00
header [ "CSeq" ] = [ ] string { cseq [ 0 ] }
2019-12-31 12:48:17 +00:00
}
2020-05-03 12:40:06 +00:00
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : code ,
Header : header ,
} )
2019-12-31 12:48:17 +00:00
}
2020-05-10 20:56:46 +00:00
// Results of validateAuth other than success.
var (
	// errAuthCritical means the connection must be closed.
	errAuthCritical = errors.New("auth critical")

	// errAuthNotCritical means a response has already been sent and the
	// connection can be kept open.
	errAuthNotCritical = errors.New("auth not critical")
)
2020-06-15 20:41:14 +00:00
func ( c * serverClient ) validateAuth ( req * gortsplib . Request , user string , pass string , auth * * gortsplib . AuthServer , ips [ ] interface { } ) error {
err := func ( ) error {
if ips == nil {
return nil
}
2020-05-10 20:56:46 +00:00
2020-06-15 20:41:14 +00:00
connIp := c . conn . NetConn ( ) . LocalAddr ( ) . ( * net . TCPAddr ) . IP
2020-05-10 20:56:46 +00:00
2020-06-15 20:41:14 +00:00
for _ , item := range ips {
switch titem := item . ( type ) {
case net . IP :
if titem . Equal ( connIp ) {
return nil
}
case * net . IPNet :
if titem . Contains ( connIp ) {
return nil
}
}
2020-05-10 20:56:46 +00:00
}
2020-06-15 20:41:14 +00:00
c . log ( "ERR: ip '%s' not allowed" , connIp )
return errAuthCritical
} ( )
if err != nil {
return err
}
err = func ( ) error {
if user == "" {
return nil
}
2020-05-10 20:56:46 +00:00
2020-06-15 20:41:14 +00:00
initialRequest := false
if * auth == nil {
initialRequest = true
* auth = gortsplib . NewAuthServer ( user , pass , nil )
2020-05-10 20:56:46 +00:00
}
2020-06-15 20:41:14 +00:00
err := ( * auth ) . ValidateHeader ( req . Header [ "Authorization" ] , req . Method , req . Url )
if err != nil {
if ! initialRequest {
c . log ( "ERR: unauthorized: %s" , err )
}
c . conn . WriteResponse ( & gortsplib . Response {
StatusCode : gortsplib . StatusUnauthorized ,
Header : gortsplib . Header {
"CSeq" : [ ] string { req . Header [ "CSeq" ] [ 0 ] } ,
"WWW-Authenticate" : ( * auth ) . GenerateHeader ( ) ,
} ,
} )
if ! initialRequest {
return errAuthCritical
}
return errAuthNotCritical
}
return nil
} ( )
if err != nil {
return err
2020-05-10 20:56:46 +00:00
}
return nil
}
2020-05-03 10:38:11 +00:00
func ( c * serverClient ) handleRequest ( req * gortsplib . Request ) bool {
2020-05-03 14:07:56 +00:00
c . log ( string ( req . Method ) )
2020-01-03 22:05:06 +00:00
2020-01-26 11:41:26 +00:00
cseq , ok := req . Header [ "CSeq" ]
if ! ok || len ( cseq ) != 1 {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "cseq missing" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-01-03 21:39:55 +00:00
path := func ( ) string {
2020-05-03 17:47:23 +00:00
ret := req . Url . Path
2019-12-31 13:55:46 +00:00
// remove leading slash
2020-01-03 21:39:55 +00:00
if len ( ret ) > 1 {
ret = ret [ 1 : ]
2019-12-31 13:55:46 +00:00
}
// strip any subpath
2020-01-03 21:39:55 +00:00
if n := strings . Index ( ret , "/" ) ; n >= 0 {
ret = ret [ : n ]
2019-12-31 13:55:46 +00:00
}
2020-01-03 21:39:55 +00:00
return ret
2019-12-31 13:55:46 +00:00
} ( )
2019-12-31 12:48:17 +00:00
switch req . Method {
2020-05-03 14:07:56 +00:00
case gortsplib . OPTIONS :
2019-12-31 12:48:17 +00:00
// do not check state, since OPTIONS can be requested
// in any state
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
2020-01-26 17:08:15 +00:00
"CSeq" : [ ] string { cseq [ 0 ] } ,
2020-01-26 11:41:26 +00:00
"Public" : [ ] string { strings . Join ( [ ] string {
2020-05-03 14:07:56 +00:00
string ( gortsplib . DESCRIBE ) ,
string ( gortsplib . ANNOUNCE ) ,
string ( gortsplib . SETUP ) ,
string ( gortsplib . PLAY ) ,
string ( gortsplib . PAUSE ) ,
string ( gortsplib . RECORD ) ,
string ( gortsplib . TEARDOWN ) ,
2020-01-26 11:41:26 +00:00
} , ", " ) } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . DESCRIBE :
2020-01-20 13:41:04 +00:00
if c . state != _CLIENT_STATE_STARTING {
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest ,
fmt . Errorf ( "client is in state '%s' instead of '%s'" , c . state , _CLIENT_STATE_STARTING ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-06-15 20:41:14 +00:00
err := c . validateAuth ( req , c . p . args . readUser , c . p . args . readPass , & c . readAuth , c . p . readIps )
2020-05-10 20:56:46 +00:00
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
2019-12-31 12:48:17 +00:00
sdp , err := func ( ) ( [ ] byte , error ) {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . RLock ( )
defer c . p . tcpl . mutex . RUnlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:08:41 +00:00
pub , ok := c . p . tcpl . publishers [ path ]
2019-12-31 13:55:46 +00:00
if ! ok {
return nil , fmt . Errorf ( "no one is streaming on path '%s'" , path )
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
return pub . streamSdpText , nil
2019-12-31 12:48:17 +00:00
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
2020-05-03 17:47:23 +00:00
"Content-Base" : [ ] string { req . Url . String ( ) } ,
2020-01-26 11:41:26 +00:00
"Content-Type" : [ ] string { "application/sdp" } ,
2019-12-31 12:48:17 +00:00
} ,
Content : sdp ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . ANNOUNCE :
2020-01-20 13:41:04 +00:00
if c . state != _CLIENT_STATE_STARTING {
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest ,
fmt . Errorf ( "client is in state '%s' instead of '%s'" , c . state , _CLIENT_STATE_STARTING ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-06-15 20:41:14 +00:00
err := c . validateAuth ( req , c . p . args . publishUser , c . p . args . publishPass , & c . publishAuth , c . p . publishIps )
2020-05-10 20:56:46 +00:00
if err != nil {
if err == errAuthCritical {
return false
2020-02-16 15:05:08 +00:00
}
2020-05-10 20:56:46 +00:00
return true
2020-02-16 15:05:08 +00:00
}
2020-01-26 11:41:26 +00:00
ct , ok := req . Header [ "Content-Type" ]
if ! ok || len ( ct ) != 1 {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "Content-Type header missing" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-01-26 11:41:26 +00:00
if ct [ 0 ] != "application/sdp" {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "unsupported Content-Type '%s'" , ct ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-17 14:49:21 +00:00
sdpParsed , err := gortsplib . SDPParse ( req . Content )
2019-12-31 12:48:17 +00:00
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "invalid SDP: %s" , err ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-17 14:49:21 +00:00
sdpParsed , req . Content = gortsplib . SDPFilter ( sdpParsed , req . Content )
2020-01-21 09:08:53 +00:00
2019-12-31 12:48:17 +00:00
err = func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:08:41 +00:00
_ , ok := c . p . tcpl . publishers [ path ]
2019-12-31 13:55:46 +00:00
if ok {
return fmt . Errorf ( "another client is already publishing on path '%s'" , path )
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
c . path = path
2020-05-10 14:08:41 +00:00
c . p . tcpl . publishers [ path ] = c
2019-12-31 12:48:17 +00:00
c . streamSdpText = req . Content
c . streamSdpParsed = sdpParsed
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_ANNOUNCE
2019-12-31 12:48:17 +00:00
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . SETUP :
2020-01-26 11:41:26 +00:00
tsRaw , ok := req . Header [ "Transport" ]
if ! ok || len ( tsRaw ) != 1 {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header missing" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-01-26 11:41:26 +00:00
th := gortsplib . ReadHeaderTransport ( tsRaw [ 0 ] )
2020-06-14 15:46:27 +00:00
if _ , ok := th [ "multicast" ] ; ok {
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "multicast is not supported" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
switch c . state {
// play
2020-01-20 13:41:04 +00:00
case _CLIENT_STATE_STARTING , _CLIENT_STATE_PRE_PLAY :
2020-06-15 20:41:14 +00:00
err := c . validateAuth ( req , c . p . args . readUser , c . p . args . readPass , & c . readAuth , c . p . readIps )
2020-05-10 20:56:46 +00:00
if err != nil {
if err == errAuthCritical {
return false
}
return true
}
2019-12-31 12:48:17 +00:00
// play via UDP
2020-01-14 20:19:25 +00:00
if func ( ) bool {
_ , ok := th [ "RTP/AVP" ]
if ok {
return true
}
_ , ok = th [ "RTP/AVP/UDP" ]
if ok {
return true
}
return false
} ( ) {
2020-01-15 21:48:51 +00:00
if _ , ok := c . p . protocols [ _STREAM_PROTOCOL_UDP ] ; ! ok {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusUnsupportedTransport , fmt . Errorf ( "UDP streaming is disabled" ) )
2020-01-15 21:48:51 +00:00
return false
}
2020-01-20 11:53:06 +00:00
rtpPort , rtcpPort := th . GetPorts ( "client_port" )
2019-12-31 12:48:17 +00:00
if rtpPort == 0 || rtcpPort == 0 {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header does not have valid client ports (%s)" , tsRaw [ 0 ] ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if c . path != "" && path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2020-05-03 17:47:23 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:08:41 +00:00
pub , ok := c . p . tcpl . publishers [ path ]
2019-12-31 13:55:46 +00:00
if ! ok {
return fmt . Errorf ( "no one is streaming on path '%s'" , path )
}
2019-12-31 12:48:17 +00:00
if len ( c . streamTracks ) > 0 && c . streamProtocol != _STREAM_PROTOCOL_UDP {
2020-05-03 10:28:46 +00:00
return fmt . Errorf ( "client wants to read tracks with different protocols" )
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if len ( c . streamTracks ) >= len ( pub . streamSdpParsed . Medias ) {
2019-12-31 12:48:17 +00:00
return fmt . Errorf ( "all the tracks have already been setup" )
}
2019-12-31 13:55:46 +00:00
c . path = path
2019-12-31 12:48:17 +00:00
c . streamProtocol = _STREAM_PROTOCOL_UDP
c . streamTracks = append ( c . streamTracks , & track {
rtpPort : rtpPort ,
rtcpPort : rtcpPort ,
} )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PRE_PLAY
2019-12-31 12:48:17 +00:00
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Transport" : [ ] string { strings . Join ( [ ] string {
2020-01-14 20:33:53 +00:00
"RTP/AVP/UDP" ,
2019-12-31 12:48:17 +00:00
"unicast" ,
fmt . Sprintf ( "client_port=%d-%d" , rtpPort , rtcpPort ) ,
2020-05-10 13:33:42 +00:00
fmt . Sprintf ( "server_port=%d-%d" , c . p . args . rtpPort , c . p . args . rtcpPort ) ,
2020-01-26 11:41:26 +00:00
} , ";" ) } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
// play via TCP
} else if _ , ok := th [ "RTP/AVP/TCP" ] ; ok {
2020-01-15 21:48:51 +00:00
if _ , ok := c . p . protocols [ _STREAM_PROTOCOL_TCP ] ; ! ok {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
2020-01-15 21:48:51 +00:00
return false
}
2019-12-31 13:55:46 +00:00
if c . path != "" && path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2020-05-03 17:47:23 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:08:41 +00:00
pub , ok := c . p . tcpl . publishers [ path ]
2019-12-31 13:55:46 +00:00
if ! ok {
return fmt . Errorf ( "no one is streaming on path '%s'" , path )
}
2019-12-31 12:48:17 +00:00
if len ( c . streamTracks ) > 0 && c . streamProtocol != _STREAM_PROTOCOL_TCP {
2020-05-03 10:28:46 +00:00
return fmt . Errorf ( "client wants to read tracks with different protocols" )
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if len ( c . streamTracks ) >= len ( pub . streamSdpParsed . Medias ) {
2019-12-31 12:48:17 +00:00
return fmt . Errorf ( "all the tracks have already been setup" )
}
2019-12-31 13:55:46 +00:00
c . path = path
2019-12-31 12:48:17 +00:00
c . streamProtocol = _STREAM_PROTOCOL_TCP
c . streamTracks = append ( c . streamTracks , & track {
rtpPort : 0 ,
rtcpPort : 0 ,
} )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PRE_PLAY
2019-12-31 12:48:17 +00:00
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
interleaved := fmt . Sprintf ( "%d-%d" , ( ( len ( c . streamTracks ) - 1 ) * 2 ) , ( ( len ( c . streamTracks ) - 1 ) * 2 ) + 1 )
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Transport" : [ ] string { strings . Join ( [ ] string {
2019-12-31 12:48:17 +00:00
"RTP/AVP/TCP" ,
"unicast" ,
fmt . Sprintf ( "interleaved=%s" , interleaved ) ,
2020-01-26 11:41:26 +00:00
} , ";" ) } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
} else {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)" , tsRaw [ 0 ] ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
// record
2020-01-20 13:41:04 +00:00
case _CLIENT_STATE_ANNOUNCE , _CLIENT_STATE_PRE_RECORD :
2019-12-31 12:48:17 +00:00
if _ , ok := th [ "mode=record" ] ; ! ok {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header does not contain mode=record" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
// record via UDP
2020-01-15 21:27:44 +00:00
if func ( ) bool {
_ , ok := th [ "RTP/AVP" ]
if ok {
return true
}
_ , ok = th [ "RTP/AVP/UDP" ]
if ok {
return true
}
return false
2020-01-15 21:48:51 +00:00
} ( ) {
if _ , ok := c . p . protocols [ _STREAM_PROTOCOL_UDP ] ; ! ok {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusUnsupportedTransport , fmt . Errorf ( "UDP streaming is disabled" ) )
2020-01-15 21:48:51 +00:00
return false
}
2020-01-20 11:53:06 +00:00
rtpPort , rtcpPort := th . GetPorts ( "client_port" )
2019-12-31 12:48:17 +00:00
if rtpPort == 0 || rtcpPort == 0 {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header does not have valid client ports (%s)" , tsRaw [ 0 ] ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 17:47:23 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
if len ( c . streamTracks ) > 0 && c . streamProtocol != _STREAM_PROTOCOL_UDP {
2020-05-03 10:28:46 +00:00
return fmt . Errorf ( "client wants to publish tracks with different protocols" )
2019-12-31 12:48:17 +00:00
}
if len ( c . streamTracks ) >= len ( c . streamSdpParsed . Medias ) {
return fmt . Errorf ( "all the tracks have already been setup" )
}
c . streamProtocol = _STREAM_PROTOCOL_UDP
c . streamTracks = append ( c . streamTracks , & track {
rtpPort : rtpPort ,
rtcpPort : rtcpPort ,
} )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PRE_RECORD
2019-12-31 12:48:17 +00:00
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Transport" : [ ] string { strings . Join ( [ ] string {
2020-01-14 20:33:53 +00:00
"RTP/AVP/UDP" ,
2019-12-31 12:48:17 +00:00
"unicast" ,
fmt . Sprintf ( "client_port=%d-%d" , rtpPort , rtcpPort ) ,
2020-05-10 13:33:42 +00:00
fmt . Sprintf ( "server_port=%d-%d" , c . p . args . rtpPort , c . p . args . rtcpPort ) ,
2020-01-26 11:41:26 +00:00
} , ";" ) } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
// record via TCP
} else if _ , ok := th [ "RTP/AVP/TCP" ] ; ok {
2020-01-15 21:48:51 +00:00
if _ , ok := c . p . protocols [ _STREAM_PROTOCOL_TCP ] ; ! ok {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
2020-01-15 21:48:51 +00:00
return false
}
2019-12-31 12:48:17 +00:00
var interleaved string
2020-05-03 17:47:23 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
if len ( c . streamTracks ) > 0 && c . streamProtocol != _STREAM_PROTOCOL_TCP {
2020-05-03 10:28:46 +00:00
return fmt . Errorf ( "client wants to publish tracks with different protocols" )
2019-12-31 12:48:17 +00:00
}
if len ( c . streamTracks ) >= len ( c . streamSdpParsed . Medias ) {
return fmt . Errorf ( "all the tracks have already been setup" )
}
2020-01-20 11:53:06 +00:00
interleaved = th . GetValue ( "interleaved" )
2019-12-31 12:48:17 +00:00
if interleaved == "" {
return fmt . Errorf ( "transport header does not contain interleaved field" )
}
expInterleaved := fmt . Sprintf ( "%d-%d" , 0 + len ( c . streamTracks ) * 2 , 1 + len ( c . streamTracks ) * 2 )
if interleaved != expInterleaved {
return fmt . Errorf ( "wrong interleaved value, expected '%s', got '%s'" , expInterleaved , interleaved )
}
c . streamProtocol = _STREAM_PROTOCOL_TCP
c . streamTracks = append ( c . streamTracks , & track {
rtpPort : 0 ,
rtcpPort : 0 ,
} )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PRE_RECORD
2019-12-31 12:48:17 +00:00
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Transport" : [ ] string { strings . Join ( [ ] string {
2019-12-31 12:48:17 +00:00
"RTP/AVP/TCP" ,
"unicast" ,
fmt . Sprintf ( "interleaved=%s" , interleaved ) ,
2020-01-26 11:41:26 +00:00
} , ";" ) } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
} else {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)" , tsRaw [ 0 ] ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
default :
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "client is in state '%s'" , c . state ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 14:07:56 +00:00
case gortsplib . PLAY :
2020-01-20 13:41:04 +00:00
if c . state != _CLIENT_STATE_PRE_PLAY {
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest ,
fmt . Errorf ( "client is in state '%s' instead of '%s'" , c . state , _CLIENT_STATE_PRE_PLAY ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-10 14:08:41 +00:00
pub , ok := c . p . tcpl . publishers [ c . path ]
2019-12-31 13:55:46 +00:00
if ! ok {
return fmt . Errorf ( "no one is streaming on path '%s'" , c . path )
}
if len ( c . streamTracks ) != len ( pub . streamSdpParsed . Medias ) {
2019-12-31 12:48:17 +00:00
return fmt . Errorf ( "not all tracks have been setup" )
}
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-01-03 22:05:06 +00:00
// first write response, then set state
// otherwise, in case of TCP connections, RTP packets could be written
// before the response
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
c . log ( "is receiving on path '%s', %d %s via %s" , c . path , len ( c . streamTracks ) , func ( ) string {
if len ( c . streamTracks ) == 1 {
return "track"
}
return "tracks"
} ( ) , c . streamProtocol )
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PLAY
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Unlock ( )
2020-01-03 22:05:06 +00:00
// when protocol is TCP, the RTSP connection becomes a RTP connection
if c . streamProtocol == _STREAM_PROTOCOL_TCP {
2020-01-26 17:08:15 +00:00
// write RTP frames sequentially
go func ( ) {
2020-05-10 13:38:01 +00:00
for frame := range c . write {
2020-01-26 17:08:15 +00:00
c . conn . WriteInterleavedFrame ( frame )
}
} ( )
// receive RTP feedback, do not parse it, wait until connection closes
2020-01-03 22:05:06 +00:00
buf := make ( [ ] byte , 2048 )
for {
2020-01-20 15:44:02 +00:00
_ , err := c . conn . NetConn ( ) . Read ( buf )
2020-01-03 22:05:06 +00:00
if err != nil {
if err != io . EOF {
c . log ( "ERR: %s" , err )
}
return false
}
}
}
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . PAUSE :
2020-01-20 13:41:04 +00:00
if c . state != _CLIENT_STATE_PLAY {
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest ,
fmt . Errorf ( "client is in state '%s' instead of '%s'" , c . state , _CLIENT_STATE_PLAY ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
c . log ( "paused" )
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_PRE_PLAY
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . RECORD :
2020-01-20 13:41:04 +00:00
if c . state != _CLIENT_STATE_PRE_RECORD {
2020-05-21 19:46:22 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest ,
fmt . Errorf ( "client is in state '%s' instead of '%s'" , c . state , _CLIENT_STATE_PRE_RECORD ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c . path {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "path has changed" ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err := func ( ) error {
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
defer c . p . tcpl . mutex . Unlock ( )
2019-12-31 12:48:17 +00:00
if len ( c . streamTracks ) != len ( c . streamSdpParsed . Medias ) {
return fmt . Errorf ( "not all tracks have been setup" )
}
return nil
} ( )
if err != nil {
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , err )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2020-05-03 13:22:41 +00:00
c . conn . WriteResponse ( & gortsplib . Response {
2020-05-03 12:40:06 +00:00
StatusCode : gortsplib . StatusOK ,
2020-01-26 11:41:26 +00:00
Header : gortsplib . Header {
"CSeq" : [ ] string { cseq [ 0 ] } ,
"Session" : [ ] string { "12345678" } ,
2019-12-31 12:48:17 +00:00
} ,
2020-01-03 22:05:06 +00:00
} )
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Lock ( )
2020-01-20 13:41:04 +00:00
c . state = _CLIENT_STATE_RECORD
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . Unlock ( )
2020-01-03 22:05:06 +00:00
c . log ( "is publishing on path '%s', %d %s via %s" , c . path , len ( c . streamTracks ) , func ( ) string {
if len ( c . streamTracks ) == 1 {
return "track"
}
return "tracks"
} ( ) , c . streamProtocol )
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it
if c . streamProtocol == _STREAM_PROTOCOL_TCP {
for {
2020-01-26 11:41:26 +00:00
frame , err := c . conn . ReadInterleavedFrame ( )
2020-01-03 22:05:06 +00:00
if err != nil {
2020-05-10 19:37:44 +00:00
if err != io . EOF {
c . log ( "ERR: %s" , err )
}
2020-01-03 22:05:06 +00:00
return false
}
2020-01-26 11:41:26 +00:00
trackId , trackFlow := interleavedChannelToTrack ( frame . Channel )
2020-01-03 22:05:06 +00:00
if trackId >= len ( c . streamTracks ) {
c . log ( "ERR: invalid track id '%d'" , trackId )
return false
}
2020-05-10 14:08:41 +00:00
c . p . tcpl . mutex . RLock ( )
c . p . tcpl . forwardTrack ( c . path , trackId , trackFlow , frame . Content )
c . p . tcpl . mutex . RUnlock ( )
2020-01-03 22:05:06 +00:00
}
}
return true
2019-12-31 12:48:17 +00:00
2020-05-03 14:07:56 +00:00
case gortsplib . TEARDOWN :
2020-01-03 22:05:06 +00:00
// close connection silently
return false
2019-12-31 12:48:17 +00:00
default :
2020-05-03 12:40:06 +00:00
c . writeResError ( req , gortsplib . StatusBadRequest , fmt . Errorf ( "unhandled method '%s'" , req . Method ) )
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
}