mediamtx/client.go

845 lines
19 KiB
Go
Raw Normal View History

2019-12-31 12:48:17 +00:00
package main
import (
"fmt"
"io"
"log"
"net"
"net/url"
"strings"
2020-01-20 21:24:55 +00:00
"time"
2019-12-31 12:48:17 +00:00
2020-01-20 09:21:05 +00:00
"github.com/aler9/gortsplib"
2019-12-31 12:48:17 +00:00
"gortc.io/sdp"
)
2020-01-20 21:24:55 +00:00
const (
_READ_TIMEOUT = 5 * time.Second
_WRITE_TIMEOUT = 5 * time.Second
)
2019-12-31 13:55:46 +00:00
func interleavedChannelToTrack(channel int) (int, trackFlow) {
2019-12-31 12:48:17 +00:00
if (channel % 2) == 0 {
2019-12-31 13:55:46 +00:00
return (channel / 2), _TRACK_FLOW_RTP
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
return ((channel - 1) / 2), _TRACK_FLOW_RTCP
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
func trackToInterleavedChannel(id int, flow trackFlow) int {
2019-12-31 12:48:17 +00:00
if flow == _TRACK_FLOW_RTP {
return id * 2
}
return (id * 2) + 1
}
type clientState int
const (
_CLIENT_STATE_STARTING clientState = iota
_CLIENT_STATE_ANNOUNCE
_CLIENT_STATE_PRE_PLAY
_CLIENT_STATE_PLAY
_CLIENT_STATE_PRE_RECORD
_CLIENT_STATE_RECORD
)
2019-12-31 12:48:17 +00:00
type client struct {
p *program
2020-01-20 15:44:02 +00:00
conn *gortsplib.Conn
state clientState
2019-12-31 12:48:17 +00:00
ip net.IP
2019-12-31 13:55:46 +00:00
path string
2019-12-31 12:48:17 +00:00
streamSdpText []byte // filled only if publisher
streamSdpParsed *sdp.Message // filled only if publisher
streamProtocol streamProtocol
streamTracks []*track
}
2019-12-31 13:55:46 +00:00
func newClient(p *program, nconn net.Conn) *client {
2019-12-31 12:48:17 +00:00
c := &client{
p: p,
2020-01-20 15:44:02 +00:00
conn: gortsplib.NewConn(nconn),
state: _CLIENT_STATE_STARTING,
2019-12-31 12:48:17 +00:00
}
c.p.mutex.Lock()
c.p.clients[c] = struct{}{}
c.p.mutex.Unlock()
return c
}
func (c *client) close() error {
// already deleted
if _, ok := c.p.clients[c]; !ok {
return nil
}
delete(c.p.clients, c)
2020-01-20 15:44:02 +00:00
c.conn.NetConn().Close()
2019-12-31 12:48:17 +00:00
2019-12-31 13:55:46 +00:00
if c.path != "" {
if pub, ok := c.p.publishers[c.path]; ok && pub == c {
delete(c.p.publishers, c.path)
2019-12-31 12:48:17 +00:00
2019-12-31 13:55:46 +00:00
// if the publisher has disconnected
// close all other connections that share the same path
for oc := range c.p.clients {
if oc.path == c.path {
oc.close()
}
}
2019-12-31 12:48:17 +00:00
}
}
return nil
}
func (c *client) log(format string, args ...interface{}) {
2020-01-20 15:44:02 +00:00
format = "[RTSP client " + c.conn.NetConn().RemoteAddr().String() + "] " + format
2019-12-31 12:48:17 +00:00
log.Printf(format, args...)
}
func (c *client) run() {
defer c.log("disconnected")
defer func() {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
c.close()
}()
2020-01-20 15:44:02 +00:00
ipstr, _, _ := net.SplitHostPort(c.conn.NetConn().RemoteAddr().String())
2019-12-31 12:48:17 +00:00
c.ip = net.ParseIP(ipstr)
c.log("connected")
for {
2020-01-20 15:44:02 +00:00
req, err := c.conn.ReadRequest()
2019-12-31 12:48:17 +00:00
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
return
}
2020-01-03 22:05:06 +00:00
ok := c.handleRequest(req)
if !ok {
2019-12-31 12:48:17 +00:00
return
2020-01-03 22:05:06 +00:00
}
}
}
2019-12-31 12:48:17 +00:00
2020-01-20 21:24:55 +00:00
func (c *client) writeResDeadline(res *gortsplib.Response) {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(_WRITE_TIMEOUT))
2020-01-20 15:44:02 +00:00
c.conn.WriteResponse(res)
2020-01-03 22:05:06 +00:00
}
2020-01-03 21:39:55 +00:00
2020-01-20 09:21:05 +00:00
func (c *client) writeResError(req *gortsplib.Request, err error) {
2020-01-03 22:05:06 +00:00
c.log("ERR: %s", err)
2019-12-31 12:48:17 +00:00
2020-01-03 22:05:06 +00:00
if cseq, ok := req.Headers["CSeq"]; ok {
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2020-01-03 22:05:06 +00:00
StatusCode: 400,
Status: "Bad Request",
Headers: map[string]string{
"CSeq": cseq,
},
})
} else {
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2020-01-03 22:05:06 +00:00
StatusCode: 400,
Status: "Bad Request",
})
2019-12-31 12:48:17 +00:00
}
}
2020-01-20 09:21:05 +00:00
func (c *client) handleRequest(req *gortsplib.Request) bool {
2020-01-03 22:05:06 +00:00
c.log(req.Method)
2019-12-31 12:48:17 +00:00
cseq, ok := req.Headers["CSeq"]
if !ok {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("cseq missing"))
return false
2019-12-31 12:48:17 +00:00
}
2020-01-03 21:39:55 +00:00
ur, err := url.Parse(req.Url)
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("unable to parse path '%s'", req.Url))
return false
2020-01-03 21:39:55 +00:00
}
path := func() string {
ret := ur.Path
2019-12-31 13:55:46 +00:00
// remove leading slash
2020-01-03 21:39:55 +00:00
if len(ret) > 1 {
ret = ret[1:]
2019-12-31 13:55:46 +00:00
}
// strip any subpath
2020-01-03 21:39:55 +00:00
if n := strings.Index(ret, "/"); n >= 0 {
ret = ret[:n]
2019-12-31 13:55:46 +00:00
}
2020-01-03 21:39:55 +00:00
return ret
2019-12-31 13:55:46 +00:00
}()
2019-12-31 12:48:17 +00:00
switch req.Method {
case "OPTIONS":
// do not check state, since OPTIONS can be requested
// in any state
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Public": strings.Join([]string{
"DESCRIBE",
"ANNOUNCE",
"SETUP",
"PLAY",
"PAUSE",
"RECORD",
"TEARDOWN",
}, ", "),
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
case "DESCRIBE":
if c.state != _CLIENT_STATE_STARTING {
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
sdp, err := func() ([]byte, error) {
c.p.mutex.RLock()
defer c.p.mutex.RUnlock()
2019-12-31 13:55:46 +00:00
pub, ok := c.p.publishers[path]
if !ok {
return nil, fmt.Errorf("no one is streaming on path '%s'", path)
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
return pub.streamSdpText, nil
2019-12-31 12:48:17 +00:00
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
2019-12-31 13:55:46 +00:00
"Content-Base": req.Url,
2019-12-31 12:48:17 +00:00
"Content-Type": "application/sdp",
},
Content: sdp,
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
case "ANNOUNCE":
if c.state != _CLIENT_STATE_STARTING {
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
ct, ok := req.Headers["Content-Type"]
if !ok {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("Content-Type header missing"))
return false
2019-12-31 12:48:17 +00:00
}
if ct != "application/sdp" {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("unsupported Content-Type '%s'", ct))
return false
2019-12-31 12:48:17 +00:00
}
sdpParsed, err := func() (*sdp.Message, error) {
s, err := sdp.DecodeSession(req.Content, nil)
if err != nil {
return nil, err
}
m := &sdp.Message{}
d := sdp.NewDecoder(s)
err = d.Decode(m)
if err != nil {
return nil, err
}
return m, nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("invalid SDP: %s", err))
return false
2019-12-31 12:48:17 +00:00
}
2020-01-03 21:39:55 +00:00
if c.p.publishKey != "" {
q, err := url.ParseQuery(ur.RawQuery)
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("unable to parse query"))
return false
2020-01-03 21:39:55 +00:00
}
key, ok := q["key"]
if !ok || len(key) != 1 || key[0] != c.p.publishKey {
2020-01-03 22:05:06 +00:00
// reply with 401 and exit
c.log("ERR: publish key wrong or missing")
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2020-01-03 22:05:06 +00:00
StatusCode: 401,
Status: "Unauthorized",
Headers: map[string]string{
"CSeq": req.Headers["CSeq"],
},
})
return false
2020-01-03 21:39:55 +00:00
}
}
2019-12-31 12:48:17 +00:00
err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
2019-12-31 13:55:46 +00:00
_, ok := c.p.publishers[path]
if ok {
return fmt.Errorf("another client is already publishing on path '%s'", path)
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
c.path = path
c.p.publishers[path] = c
2019-12-31 12:48:17 +00:00
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.state = _CLIENT_STATE_ANNOUNCE
2019-12-31 12:48:17 +00:00
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
case "SETUP":
2020-01-15 21:27:44 +00:00
transportStr, ok := req.Headers["Transport"]
2019-12-31 12:48:17 +00:00
if !ok {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("transport header missing"))
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 11:53:06 +00:00
th := gortsplib.NewTransportHeader(transportStr)
2019-12-31 12:48:17 +00:00
if _, ok := th["unicast"]; !ok {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("transport header does not contain unicast"))
return false
2019-12-31 12:48:17 +00:00
}
switch c.state {
// play
case _CLIENT_STATE_STARTING, _CLIENT_STATE_PRE_PLAY:
2019-12-31 12:48:17 +00:00
// play via UDP
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok {
c.log("ERR: udp streaming is disabled")
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
"CSeq": cseq,
},
})
return false
}
2020-01-20 11:53:06 +00:00
rtpPort, rtcpPort := th.GetPorts("client_port")
2019-12-31 12:48:17 +00:00
if rtpPort == 0 || rtcpPort == 0 {
2020-01-15 21:27:44 +00:00
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if c.path != "" && path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
2019-12-31 13:55:46 +00:00
pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
2019-12-31 12:48:17 +00:00
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client want to send tracks with different protocols")
}
2019-12-31 13:55:46 +00:00
if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
2019-12-31 12:48:17 +00:00
return fmt.Errorf("all the tracks have already been setup")
}
2019-12-31 13:55:46 +00:00
c.path = path
2019-12-31 12:48:17 +00:00
c.streamProtocol = _STREAM_PROTOCOL_UDP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
})
c.state = _CLIENT_STATE_PRE_PLAY
2019-12-31 12:48:17 +00:00
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/UDP",
2019-12-31 12:48:17 +00:00
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
2019-12-31 12:48:17 +00:00
}, ";"),
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
// play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok {
c.log("ERR: tcp streaming is disabled")
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
"CSeq": cseq,
},
})
return false
}
2019-12-31 13:55:46 +00:00
if c.path != "" && path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
2019-12-31 13:55:46 +00:00
pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
2019-12-31 12:48:17 +00:00
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client want to send tracks with different protocols")
}
2019-12-31 13:55:46 +00:00
if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
2019-12-31 12:48:17 +00:00
return fmt.Errorf("all the tracks have already been setup")
}
2019-12-31 13:55:46 +00:00
c.path = path
2019-12-31 12:48:17 +00:00
c.streamProtocol = _STREAM_PROTOCOL_TCP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: 0,
rtcpPort: 0,
})
c.state = _CLIENT_STATE_PRE_PLAY
2019-12-31 12:48:17 +00:00
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1)
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";"),
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
} else {
2020-01-15 21:27:44 +00:00
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
// record
case _CLIENT_STATE_ANNOUNCE, _CLIENT_STATE_PRE_RECORD:
2019-12-31 12:48:17 +00:00
if _, ok := th["mode=record"]; !ok {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("transport header does not contain mode=record"))
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
// record via UDP
2020-01-15 21:27:44 +00:00
if func() bool {
_, ok := th["RTP/AVP"]
if ok {
return true
}
_, ok = th["RTP/AVP/UDP"]
if ok {
return true
}
return false
}() {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok {
c.log("ERR: udp streaming is disabled")
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
"CSeq": cseq,
},
})
return false
}
2020-01-20 11:53:06 +00:00
rtpPort, rtcpPort := th.GetPorts("client_port")
2019-12-31 12:48:17 +00:00
if rtpPort == 0 || rtcpPort == 0 {
2020-01-15 21:27:44 +00:00
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client want to send tracks with different protocols")
}
if len(c.streamTracks) >= len(c.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup")
}
c.streamProtocol = _STREAM_PROTOCOL_UDP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: rtpPort,
rtcpPort: rtcpPort,
})
c.state = _CLIENT_STATE_PRE_RECORD
2019-12-31 12:48:17 +00:00
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/UDP",
2019-12-31 12:48:17 +00:00
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
}, ";"),
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
// record via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok {
c.log("ERR: tcp streaming is disabled")
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
"CSeq": cseq,
},
})
return false
}
2019-12-31 12:48:17 +00:00
var interleaved string
err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client want to send tracks with different protocols")
}
if len(c.streamTracks) >= len(c.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup")
}
2020-01-20 11:53:06 +00:00
interleaved = th.GetValue("interleaved")
2019-12-31 12:48:17 +00:00
if interleaved == "" {
return fmt.Errorf("transport header does not contain interleaved field")
}
expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2)
if interleaved != expInterleaved {
return fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved)
}
c.streamProtocol = _STREAM_PROTOCOL_TCP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: 0,
rtcpPort: 0,
})
c.state = _CLIENT_STATE_PRE_RECORD
2019-12-31 12:48:17 +00:00
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";"),
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
} else {
2020-01-15 21:27:44 +00:00
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
default:
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
case "PLAY":
if c.state != _CLIENT_STATE_PRE_PLAY {
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err := func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
2019-12-31 13:55:46 +00:00
pub, ok := c.p.publishers[c.path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", c.path)
}
if len(c.streamTracks) != len(pub.streamSdpParsed.Medias) {
2019-12-31 12:48:17 +00:00
return fmt.Errorf("not all tracks have been setup")
}
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-03 22:05:06 +00:00
// first write response, then set state
// otherwise, in case of TCP connections, RTP packets could be written
// before the response
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
c.p.mutex.Lock()
c.state = _CLIENT_STATE_PLAY
2020-01-03 22:05:06 +00:00
c.p.mutex.Unlock()
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP feedback, do not parse it, wait until connection closes
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
buf := make([]byte, 2048)
for {
2020-01-20 15:44:02 +00:00
_, err := c.conn.NetConn().Read(buf)
2020-01-03 22:05:06 +00:00
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
return false
}
}
}
return true
2019-12-31 12:48:17 +00:00
case "PAUSE":
if c.state != _CLIENT_STATE_PLAY {
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
c.log("paused")
c.p.mutex.Lock()
c.state = _CLIENT_STATE_PRE_PLAY
2019-12-31 12:48:17 +00:00
c.p.mutex.Unlock()
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
return true
2019-12-31 12:48:17 +00:00
case "RECORD":
if c.state != _CLIENT_STATE_PRE_RECORD {
c.writeResError(req, fmt.Errorf("client is in state '%d'", c.state))
2020-01-03 22:05:06 +00:00
return false
2019-12-31 12:48:17 +00:00
}
2019-12-31 13:55:46 +00:00
if path != c.path {
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("path has changed"))
return false
2019-12-31 13:55:46 +00:00
}
2019-12-31 12:48:17 +00:00
err := func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
if len(c.streamTracks) != len(c.streamSdpParsed.Medias) {
return fmt.Errorf("not all tracks have been setup")
}
return nil
}()
if err != nil {
2020-01-03 22:05:06 +00:00
c.writeResError(req, err)
return false
2019-12-31 12:48:17 +00:00
}
2020-01-20 21:24:55 +00:00
c.writeResDeadline(&gortsplib.Response{
2019-12-31 12:48:17 +00:00
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
},
2020-01-03 22:05:06 +00:00
})
c.p.mutex.Lock()
c.state = _CLIENT_STATE_RECORD
2020-01-03 22:05:06 +00:00
c.p.mutex.Unlock()
c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
return "tracks"
}(), c.streamProtocol)
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
buf := make([]byte, 2048)
for {
2020-01-20 21:24:55 +00:00
c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT))
2020-01-20 15:44:02 +00:00
channel, n, err := c.conn.ReadInterleavedFrame(buf)
2020-01-03 22:05:06 +00:00
if err != nil {
2020-01-20 21:24:55 +00:00
c.log("ERR: %s", err)
2020-01-03 22:05:06 +00:00
return false
}
trackId, trackFlow := interleavedChannelToTrack(channel)
if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId)
return false
}
c.p.mutex.RLock()
c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n])
c.p.mutex.RUnlock()
}
}
return true
2019-12-31 12:48:17 +00:00
case "TEARDOWN":
2020-01-03 22:05:06 +00:00
// close connection silently
return false
2019-12-31 12:48:17 +00:00
default:
2020-01-03 22:05:06 +00:00
c.writeResError(req, fmt.Errorf("unhandled method '%s'", req.Method))
return false
2019-12-31 12:48:17 +00:00
}
}