mirror of
https://github.com/bluenviron/mediamtx
synced 2025-01-03 21:42:26 +00:00
1070 lines
26 KiB
Go
1070 lines
26 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aler9/gortsplib"
|
|
"github.com/aler9/sdp/v3"
|
|
)
|
|
|
|
const (
|
|
clientCheckStreamInterval = 5 * time.Second
|
|
clientReceiverReportInterval = 10 * time.Second
|
|
clientTcpReadBufferSize = 128 * 1024
|
|
clientTcpWriteBufferSize = 128 * 1024
|
|
clientUdpReadBufferSize = 2048
|
|
clientUdpWriteBufferSize = 128 * 1024
|
|
)
|
|
|
|
type serverClientTrack struct {
|
|
rtpPort int
|
|
rtcpPort int
|
|
}
|
|
|
|
type serverClientEvent interface {
|
|
isServerClientEvent()
|
|
}
|
|
|
|
type serverClientEventFrameTcp struct {
|
|
frame *gortsplib.InterleavedFrame
|
|
}
|
|
|
|
func (serverClientEventFrameTcp) isServerClientEvent() {}
|
|
|
|
type serverClientState int
|
|
|
|
const (
|
|
clientStateStarting serverClientState = iota
|
|
clientStateAnnounce
|
|
clientStatePrePlay
|
|
clientStatePlay
|
|
clientStatePreRecord
|
|
clientStateRecord
|
|
)
|
|
|
|
func (cs serverClientState) String() string {
|
|
switch cs {
|
|
case clientStateStarting:
|
|
return "STARTING"
|
|
|
|
case clientStateAnnounce:
|
|
return "ANNOUNCE"
|
|
|
|
case clientStatePrePlay:
|
|
return "PRE_PLAY"
|
|
|
|
case clientStatePlay:
|
|
return "PLAY"
|
|
|
|
case clientStatePreRecord:
|
|
return "PRE_RECORD"
|
|
|
|
case clientStateRecord:
|
|
return "RECORD"
|
|
}
|
|
return "UNKNOWN"
|
|
}
|
|
|
|
type serverClient struct {
|
|
p *program
|
|
conn *gortsplib.ConnServer
|
|
state serverClientState
|
|
path string
|
|
authUser string
|
|
authPass string
|
|
authHelper *gortsplib.AuthServer
|
|
authFailures int
|
|
streamSdpText []byte // only if publisher
|
|
streamSdpParsed *sdp.SessionDescription // only if publisher
|
|
streamProtocol gortsplib.StreamProtocol
|
|
streamTracks []*serverClientTrack
|
|
rtcpReceivers []*gortsplib.RtcpReceiver
|
|
readBuf *doubleBuffer
|
|
writeBuf *doubleBuffer
|
|
|
|
events chan serverClientEvent // only if state = Play and gortsplib.StreamProtocol = TCP
|
|
done chan struct{}
|
|
}
|
|
|
|
func newServerClient(p *program, nconn net.Conn) *serverClient {
|
|
c := &serverClient{
|
|
p: p,
|
|
conn: gortsplib.NewConnServer(gortsplib.ConnServerConf{
|
|
Conn: nconn,
|
|
ReadTimeout: p.conf.ReadTimeout,
|
|
WriteTimeout: p.conf.WriteTimeout,
|
|
}),
|
|
state: clientStateStarting,
|
|
readBuf: newDoubleBuffer(clientTcpReadBufferSize),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
go c.run()
|
|
return c
|
|
}
|
|
|
|
func (c *serverClient) log(format string, args ...interface{}) {
|
|
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
|
|
}
|
|
|
|
func (c *serverClient) ip() net.IP {
|
|
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
|
|
}
|
|
|
|
func (c *serverClient) zone() string {
|
|
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone
|
|
}
|
|
|
|
func (c *serverClient) publisherIsReady() bool {
|
|
return c.state == clientStateRecord
|
|
}
|
|
|
|
func (c *serverClient) publisherSdpText() []byte {
|
|
return c.streamSdpText
|
|
}
|
|
|
|
func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription {
|
|
return c.streamSdpParsed
|
|
}
|
|
|
|
func (c *serverClient) run() {
|
|
var runOnConnectCmd *exec.Cmd
|
|
if c.p.conf.RunOnConnect != "" {
|
|
runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
|
|
runOnConnectCmd.Stdout = os.Stdout
|
|
runOnConnectCmd.Stderr = os.Stderr
|
|
err := runOnConnectCmd.Start()
|
|
if err != nil {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
}
|
|
|
|
outer:
|
|
for {
|
|
req, err := c.conn.ReadRequest()
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
break outer
|
|
}
|
|
|
|
ok := c.handleRequest(req)
|
|
if !ok {
|
|
break outer
|
|
}
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
c.p.events <- programEventClientClose{done, c}
|
|
<-done
|
|
|
|
c.conn.NetConn().Close() // close socket in case it has not been closed yet
|
|
|
|
if runOnConnectCmd != nil {
|
|
runOnConnectCmd.Process.Signal(os.Interrupt)
|
|
runOnConnectCmd.Wait()
|
|
}
|
|
|
|
close(c.done) // close() never blocks
|
|
}
|
|
|
|
func (c *serverClient) close() {
|
|
c.conn.NetConn().Close()
|
|
<-c.done
|
|
}
|
|
|
|
func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) {
|
|
c.log("ERR: %s", err)
|
|
|
|
header := gortsplib.Header{}
|
|
if cseq, ok := req.Header["CSeq"]; ok && len(cseq) == 1 {
|
|
header["CSeq"] = cseq
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: code,
|
|
Header: header,
|
|
})
|
|
}
|
|
|
|
func (c *serverClient) findConfForPath(path string) *ConfPath {
|
|
if pconf, ok := c.p.conf.Paths[path]; ok {
|
|
return pconf
|
|
}
|
|
|
|
if pconf, ok := c.p.conf.Paths["all"]; ok {
|
|
return pconf
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var errAuthCritical = errors.New("auth critical")
|
|
var errAuthNotCritical = errors.New("auth not critical")
|
|
|
|
func (c *serverClient) authenticate(ips []interface{}, user string, pass string, req *gortsplib.Request) error {
|
|
// validate ip
|
|
err := func() error {
|
|
if ips == nil {
|
|
return nil
|
|
}
|
|
|
|
ip := c.ip()
|
|
if !ipEqualOrInRange(ip, ips) {
|
|
c.log("ERR: ip '%s' not allowed", ip)
|
|
return errAuthCritical
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// validate credentials
|
|
err = func() error {
|
|
if user == "" {
|
|
return nil
|
|
}
|
|
|
|
// reset authHelper every time the credentials change
|
|
if c.authHelper == nil || c.authUser != user || c.authPass != pass {
|
|
c.authUser = user
|
|
c.authPass = pass
|
|
c.authHelper = gortsplib.NewAuthServer(user, pass, c.p.conf.authMethodsParsed)
|
|
}
|
|
|
|
err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.Url)
|
|
if err != nil {
|
|
c.authFailures += 1
|
|
|
|
// vlc with login prompt sends 4 requests:
|
|
// 1) without credentials
|
|
// 2) with password but without the username
|
|
// 3) without credentials
|
|
// 4) with password and username
|
|
// hence we must allow up to 3 failures
|
|
var retErr error
|
|
if c.authFailures > 3 {
|
|
c.log("ERR: unauthorized: %s", err)
|
|
retErr = errAuthCritical
|
|
|
|
} else if c.authFailures > 1 {
|
|
c.log("WARN: unauthorized: %s", err)
|
|
retErr = errAuthNotCritical
|
|
|
|
} else {
|
|
retErr = errAuthNotCritical
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusUnauthorized,
|
|
Header: gortsplib.Header{
|
|
"CSeq": req.Header["CSeq"],
|
|
"WWW-Authenticate": c.authHelper.GenerateHeader(),
|
|
},
|
|
})
|
|
|
|
return retErr
|
|
}
|
|
|
|
// reset authFailures after a successful login
|
|
c.authFailures = 0
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
|
|
c.log(string(req.Method))
|
|
|
|
cseq, ok := req.Header["CSeq"]
|
|
if !ok || len(cseq) != 1 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("cseq missing"))
|
|
return false
|
|
}
|
|
|
|
path := func() string {
|
|
ret := req.Url.Path
|
|
|
|
// remove leading slash
|
|
if len(ret) > 1 {
|
|
ret = ret[1:]
|
|
}
|
|
|
|
// strip any subpath
|
|
if n := strings.Index(ret, "/"); n >= 0 {
|
|
ret = ret[:n]
|
|
}
|
|
|
|
return ret
|
|
}()
|
|
|
|
switch req.Method {
|
|
case gortsplib.OPTIONS:
|
|
// do not check state, since OPTIONS can be requested
|
|
// in any state
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Public": gortsplib.HeaderValue{strings.Join([]string{
|
|
string(gortsplib.DESCRIBE),
|
|
string(gortsplib.ANNOUNCE),
|
|
string(gortsplib.SETUP),
|
|
string(gortsplib.PLAY),
|
|
string(gortsplib.RECORD),
|
|
string(gortsplib.TEARDOWN),
|
|
}, ", ")},
|
|
},
|
|
})
|
|
return true
|
|
|
|
case gortsplib.DESCRIBE:
|
|
if c.state != clientStateStarting {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
|
|
return false
|
|
}
|
|
|
|
pconf := c.findConfForPath(path)
|
|
if pconf == nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
|
|
return false
|
|
}
|
|
|
|
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
|
|
if err != nil {
|
|
if err == errAuthCritical {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
res := make(chan []byte)
|
|
c.p.events <- programEventClientDescribe{path, res}
|
|
sdp := <-res
|
|
if sdp == nil {
|
|
c.writeResError(req, gortsplib.StatusNotFound, fmt.Errorf("no one is publishing on path '%s'", path))
|
|
return false
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"},
|
|
"Content-Type": gortsplib.HeaderValue{"application/sdp"},
|
|
},
|
|
Content: sdp,
|
|
})
|
|
return true
|
|
|
|
case gortsplib.ANNOUNCE:
|
|
if c.state != clientStateStarting {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateStarting))
|
|
return false
|
|
}
|
|
|
|
if len(path) == 0 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path can't be empty"))
|
|
return false
|
|
}
|
|
|
|
pconf := c.findConfForPath(path)
|
|
if pconf == nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
|
|
return false
|
|
}
|
|
|
|
err := c.authenticate(pconf.publishIpsParsed, pconf.PublishUser, pconf.PublishPass, req)
|
|
if err != nil {
|
|
if err == errAuthCritical {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
ct, ok := req.Header["Content-Type"]
|
|
if !ok || len(ct) != 1 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("Content-Type header missing"))
|
|
return false
|
|
}
|
|
|
|
if ct[0] != "application/sdp" {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct))
|
|
return false
|
|
}
|
|
|
|
sdpParsed := &sdp.SessionDescription{}
|
|
err = sdpParsed.Unmarshal(req.Content)
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err))
|
|
return false
|
|
}
|
|
|
|
if len(sdpParsed.MediaDescriptions) == 0 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined"))
|
|
return false
|
|
}
|
|
|
|
var tracks []*gortsplib.Track
|
|
for i, media := range sdpParsed.MediaDescriptions {
|
|
tracks = append(tracks, &gortsplib.Track{
|
|
Id: i,
|
|
Media: media,
|
|
})
|
|
}
|
|
sdpParsed, req.Content = sdpForServer(tracks)
|
|
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientAnnounce{res, c, path}
|
|
err = <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
c.streamSdpText = req.Content
|
|
c.streamSdpParsed = sdpParsed
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
},
|
|
})
|
|
return true
|
|
|
|
case gortsplib.SETUP:
|
|
th, err := gortsplib.ReadHeaderTransport(req.Header["Transport"])
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header: %s", err))
|
|
return false
|
|
}
|
|
|
|
if _, ok := th["multicast"]; ok {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("multicast is not supported"))
|
|
return false
|
|
}
|
|
|
|
switch c.state {
|
|
// play
|
|
case clientStateStarting, clientStatePrePlay:
|
|
pconf := c.findConfForPath(path)
|
|
if pconf == nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("unable to find a valid configuration for path '%s'", path))
|
|
return false
|
|
}
|
|
|
|
err := c.authenticate(pconf.readIpsParsed, pconf.ReadUser, pconf.ReadPass, req)
|
|
if err != nil {
|
|
if err == errAuthCritical {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// play via UDP
|
|
if func() bool {
|
|
_, ok := th["RTP/AVP"]
|
|
if ok {
|
|
return true
|
|
}
|
|
_, ok = th["RTP/AVP/UDP"]
|
|
if ok {
|
|
return true
|
|
}
|
|
return false
|
|
}() {
|
|
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
|
|
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
|
|
return false
|
|
}
|
|
|
|
rtpPort, rtcpPort := th.Ports("client_port")
|
|
if rtpPort == 0 || rtcpPort == 0 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"]))
|
|
return false
|
|
}
|
|
|
|
if c.path != "" && path != c.path {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
|
|
return false
|
|
}
|
|
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
|
|
err = <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Transport": gortsplib.HeaderValue{strings.Join([]string{
|
|
"RTP/AVP/UDP",
|
|
"unicast",
|
|
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
|
|
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
|
|
}, ";")},
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
return true
|
|
|
|
// play via TCP
|
|
} else if _, ok := th["RTP/AVP/TCP"]; ok {
|
|
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
|
|
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
|
|
return false
|
|
}
|
|
|
|
if c.path != "" && path != c.path {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols"))
|
|
return false
|
|
}
|
|
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientSetupPlay{res, c, path, gortsplib.StreamProtocolTcp, 0, 0}
|
|
err = <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1)
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Transport": gortsplib.HeaderValue{strings.Join([]string{
|
|
"RTP/AVP/TCP",
|
|
"unicast",
|
|
fmt.Sprintf("interleaved=%s", interleaved),
|
|
}, ";")},
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
return true
|
|
|
|
} else {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
|
|
return false
|
|
}
|
|
|
|
// record
|
|
case clientStateAnnounce, clientStatePreRecord:
|
|
if strings.ToLower(th.Value("mode")) != "record" {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record"))
|
|
return false
|
|
}
|
|
|
|
// after ANNOUNCE, c.path is already set
|
|
if path != c.path {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
|
|
return false
|
|
}
|
|
|
|
// record via UDP
|
|
if func() bool {
|
|
_, ok := th["RTP/AVP"]
|
|
if ok {
|
|
return true
|
|
}
|
|
_, ok = th["RTP/AVP/UDP"]
|
|
if ok {
|
|
return true
|
|
}
|
|
return false
|
|
}() {
|
|
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok {
|
|
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled"))
|
|
return false
|
|
}
|
|
|
|
rtpPort, rtcpPort := th.Ports("client_port")
|
|
if rtpPort == 0 || rtcpPort == 0 {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"]))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
|
|
return false
|
|
}
|
|
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort}
|
|
err := <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Transport": gortsplib.HeaderValue{strings.Join([]string{
|
|
"RTP/AVP/UDP",
|
|
"unicast",
|
|
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
|
|
fmt.Sprintf("server_port=%d-%d", c.p.conf.RtpPort, c.p.conf.RtcpPort),
|
|
}, ";")},
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
return true
|
|
|
|
// record via TCP
|
|
} else if _, ok := th["RTP/AVP/TCP"]; ok {
|
|
if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok {
|
|
c.writeResError(req, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled"))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols"))
|
|
return false
|
|
}
|
|
|
|
interleaved := th.Value("interleaved")
|
|
if interleaved == "" {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field"))
|
|
return false
|
|
}
|
|
|
|
expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2)
|
|
if interleaved != expInterleaved {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) >= len(c.streamSdpParsed.MediaDescriptions) {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
|
|
return false
|
|
}
|
|
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolTcp, 0, 0}
|
|
err := <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Transport": gortsplib.HeaderValue{strings.Join([]string{
|
|
"RTP/AVP/TCP",
|
|
"unicast",
|
|
fmt.Sprintf("interleaved=%s", interleaved),
|
|
}, ";")},
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
return true
|
|
|
|
} else {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"]))
|
|
return false
|
|
}
|
|
|
|
default:
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state))
|
|
return false
|
|
}
|
|
|
|
case gortsplib.PLAY:
|
|
if c.state != clientStatePrePlay {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay))
|
|
return false
|
|
}
|
|
|
|
if path != c.path {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
|
|
return false
|
|
}
|
|
|
|
// check publisher existence
|
|
res := make(chan error)
|
|
c.p.events <- programEventClientPlay1{res, c}
|
|
err := <-res
|
|
if err != nil {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, err)
|
|
return false
|
|
}
|
|
|
|
// write response before setting state
|
|
// otherwise, in case of TCP connections, RTP packets could be sent
|
|
// before the response
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
|
|
c.runPlay(path)
|
|
return false
|
|
|
|
case gortsplib.RECORD:
|
|
if c.state != clientStatePreRecord {
|
|
c.writeResError(req, gortsplib.StatusBadRequest,
|
|
fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord))
|
|
return false
|
|
}
|
|
|
|
if path != c.path {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed"))
|
|
return false
|
|
}
|
|
|
|
if len(c.streamTracks) != len(c.streamSdpParsed.MediaDescriptions) {
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
|
|
return false
|
|
}
|
|
|
|
c.conn.WriteResponse(&gortsplib.Response{
|
|
StatusCode: gortsplib.StatusOK,
|
|
Header: gortsplib.Header{
|
|
"CSeq": cseq,
|
|
"Session": gortsplib.HeaderValue{"12345678"},
|
|
},
|
|
})
|
|
|
|
c.runRecord(path)
|
|
return false
|
|
|
|
case gortsplib.TEARDOWN:
|
|
// close connection silently
|
|
return false
|
|
|
|
default:
|
|
c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method))
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *serverClient) runPlay(path string) {
|
|
pconf := c.findConfForPath(path)
|
|
|
|
if c.streamProtocol == gortsplib.StreamProtocolTcp {
|
|
c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize)
|
|
c.events = make(chan serverClientEvent)
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
c.p.events <- programEventClientPlay2{done, c}
|
|
<-done
|
|
|
|
c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
|
|
if len(c.streamTracks) == 1 {
|
|
return "track"
|
|
}
|
|
return "tracks"
|
|
}(), c.streamProtocol)
|
|
|
|
var runOnReadCmd *exec.Cmd
|
|
if pconf.RunOnRead != "" {
|
|
runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead)
|
|
runOnReadCmd.Stdout = os.Stdout
|
|
runOnReadCmd.Stderr = os.Stderr
|
|
err := runOnReadCmd.Start()
|
|
if err != nil {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
}
|
|
|
|
if c.streamProtocol == gortsplib.StreamProtocolTcp {
|
|
readDone := make(chan error)
|
|
go func() {
|
|
buf := make([]byte, 2048)
|
|
|
|
for {
|
|
_, err := c.conn.NetConn().Read(buf)
|
|
if err != nil {
|
|
readDone <- err
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
|
|
outer:
|
|
for {
|
|
select {
|
|
case err := <-readDone:
|
|
if err != io.EOF {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
break outer
|
|
|
|
case rawEvt := <-c.events:
|
|
switch evt := rawEvt.(type) {
|
|
case serverClientEventFrameTcp:
|
|
c.conn.WriteFrame(evt.frame)
|
|
}
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
for range c.events {
|
|
}
|
|
}()
|
|
|
|
done := make(chan struct{})
|
|
c.p.events <- programEventClientPlayStop{done, c}
|
|
<-done
|
|
|
|
close(c.events)
|
|
|
|
} else {
|
|
for {
|
|
req, err := c.conn.ReadRequest()
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
break
|
|
}
|
|
|
|
ok := c.handleRequest(req)
|
|
if !ok {
|
|
break
|
|
}
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
c.p.events <- programEventClientPlayStop{done, c}
|
|
<-done
|
|
}
|
|
|
|
if runOnReadCmd != nil {
|
|
runOnReadCmd.Process.Signal(os.Interrupt)
|
|
runOnReadCmd.Wait()
|
|
}
|
|
}
|
|
|
|
func (c *serverClient) runRecord(path string) {
|
|
pconf := c.findConfForPath(path)
|
|
|
|
c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
|
|
for trackId := range c.streamTracks {
|
|
c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
c.p.events <- programEventClientRecord{done, c}
|
|
<-done
|
|
|
|
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
|
|
if len(c.streamTracks) == 1 {
|
|
return "track"
|
|
}
|
|
return "tracks"
|
|
}(), c.streamProtocol)
|
|
|
|
var runOnPublishCmd *exec.Cmd
|
|
if pconf.RunOnPublish != "" {
|
|
runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish)
|
|
runOnPublishCmd.Stdout = os.Stdout
|
|
runOnPublishCmd.Stderr = os.Stderr
|
|
err := runOnPublishCmd.Start()
|
|
if err != nil {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
}
|
|
|
|
if c.streamProtocol == gortsplib.StreamProtocolTcp {
|
|
frame := &gortsplib.InterleavedFrame{}
|
|
|
|
readDone := make(chan error)
|
|
go func() {
|
|
for {
|
|
frame.Content = c.readBuf.swap()
|
|
frame.Content = frame.Content[:cap(frame.Content)]
|
|
|
|
recv, err := c.conn.ReadFrameOrRequest(frame)
|
|
if err != nil {
|
|
readDone <- err
|
|
break
|
|
}
|
|
|
|
switch recvt := recv.(type) {
|
|
case *gortsplib.InterleavedFrame:
|
|
if frame.TrackId >= len(c.streamTracks) {
|
|
c.log("ERR: invalid track id '%d'", frame.TrackId)
|
|
readDone <- nil
|
|
break
|
|
}
|
|
|
|
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
|
|
c.p.events <- programEventClientFrameTcp{
|
|
c.path,
|
|
frame.TrackId,
|
|
frame.StreamType,
|
|
frame.Content,
|
|
}
|
|
|
|
case *gortsplib.Request:
|
|
ok := c.handleRequest(recvt)
|
|
if !ok {
|
|
readDone <- nil
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
|
|
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
|
|
|
|
outer1:
|
|
for {
|
|
select {
|
|
case err := <-readDone:
|
|
if err != nil && err != io.EOF {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
break outer1
|
|
|
|
case <-checkStreamTicker.C:
|
|
for trackId := range c.streamTracks {
|
|
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
|
|
c.log("ERR: stream is dead")
|
|
c.conn.NetConn().Close()
|
|
<-readDone
|
|
break outer1
|
|
}
|
|
}
|
|
|
|
case <-receiverReportTicker.C:
|
|
for trackId := range c.streamTracks {
|
|
frame := c.rtcpReceivers[trackId].Report()
|
|
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
|
|
TrackId: trackId,
|
|
StreamType: gortsplib.StreamTypeRtcp,
|
|
Content: frame,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
checkStreamTicker.Stop()
|
|
receiverReportTicker.Stop()
|
|
|
|
} else {
|
|
readDone := make(chan error)
|
|
go func() {
|
|
for {
|
|
req, err := c.conn.ReadRequest()
|
|
if err != nil {
|
|
readDone <- err
|
|
break
|
|
}
|
|
|
|
ok := c.handleRequest(req)
|
|
if !ok {
|
|
readDone <- nil
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
|
|
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
|
|
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
|
|
|
|
outer2:
|
|
for {
|
|
select {
|
|
case err := <-readDone:
|
|
if err != nil && err != io.EOF {
|
|
c.log("ERR: %s", err)
|
|
}
|
|
break outer2
|
|
|
|
case <-checkStreamTicker.C:
|
|
for trackId := range c.streamTracks {
|
|
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
|
|
c.log("ERR: stream is dead")
|
|
c.conn.NetConn().Close()
|
|
<-readDone
|
|
break outer2
|
|
}
|
|
}
|
|
|
|
case <-receiverReportTicker.C:
|
|
for trackId := range c.streamTracks {
|
|
frame := c.rtcpReceivers[trackId].Report()
|
|
c.p.rtcpl.writeChan <- &udpAddrBufPair{
|
|
addr: &net.UDPAddr{
|
|
IP: c.ip(),
|
|
Zone: c.zone(),
|
|
Port: c.streamTracks[trackId].rtcpPort,
|
|
},
|
|
buf: frame,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
checkStreamTicker.Stop()
|
|
receiverReportTicker.Stop()
|
|
}
|
|
|
|
done = make(chan struct{})
|
|
c.p.events <- programEventClientRecordStop{done, c}
|
|
<-done
|
|
|
|
for trackId := range c.streamTracks {
|
|
c.rtcpReceivers[trackId].Close()
|
|
}
|
|
|
|
if runOnPublishCmd != nil {
|
|
runOnPublishCmd.Process.Signal(os.Interrupt)
|
|
runOnPublishCmd.Wait()
|
|
}
|
|
}
|