initial commit

This commit is contained in:
aler9 2019-12-28 22:07:03 +01:00
parent e9edb62249
commit 2f3baafb54
17 changed files with 1465 additions and 1 deletions

2
.dockerignore Normal file
View File

@ -0,0 +1,2 @@
.git
/release

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/release

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 aler9
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

101
Makefile Normal file
View File

@ -0,0 +1,101 @@
.PHONY: $(shell ls)
BASE_IMAGE = amd64/golang:1.13-alpine3.10
help:
@echo "usage: make [action]"
@echo ""
@echo "available actions:"
@echo ""
@echo " mod-tidy run go mod tidy"
@echo " format format source files"
@echo " test run available tests"
@echo " run run app"
@echo ""
mod-tidy:
docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \
sh -c "apk add git && cd /s && go get && go mod tidy"
format:
docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \
sh -c "cd /s && find . -type f -name '*.go' | xargs gofmt -l -w -s"
define DOCKERFILE_TEST
FROM $(BASE_IMAGE)
RUN apk add --no-cache make git
WORKDIR /s
COPY go.mod go.sum ./
RUN go mod download
COPY . ./
endef
export DOCKERFILE_TEST
test:
echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp
docker run --rm -it \
--name temp \
temp \
make test-nodocker
IMAGES = $(shell echo test-images/*/ | xargs -n1 basename)
test-nodocker:
$(eval export CGO_ENABLED = 0)
go test -v ./rtsp
define DOCKERFILE_RUN
FROM $(BASE_IMAGE)
RUN apk add --no-cache git
WORKDIR /s
COPY go.mod go.sum ./
RUN go mod download
COPY . ./
RUN go build -o /out .
endef
export DOCKERFILE_RUN
run:
echo "$$DOCKERFILE_RUN" | docker build -q . -f - -t temp
docker run --rm -it \
--network=host \
--name temp \
temp \
/out
define DOCKERFILE_RELEASE
FROM $(BASE_IMAGE)
RUN apk add --no-cache zip make git tar
WORKDIR /s
COPY go.mod go.sum ./
RUN go mod download
COPY . ./
RUN make release-nodocker
endef
export DOCKERFILE_RELEASE
release:
echo "$$DOCKERFILE_RELEASE" | docker build . -f - -t temp \
&& docker run --rm -it -v $(PWD):/out \
temp sh -c "rm -rf /out/release && cp -r /s/release /out/"
release-nodocker:
$(eval VERSION := $(shell git describe --tags))
$(eval GOBUILD := go build -ldflags '-X "main.Version=$(VERSION)"')
rm -rf release && mkdir release
CGO_ENABLED=0 GOOS=windows GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server.exe
cd /tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(GOBUILD) -o /tmp/rtsp-simple-server
tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server

View File

@ -1,2 +1,58 @@
# rtsp-simple-server
RTSP video server written in Go
_rtsp-simple-server_ is a simple an ready-to-use RTSP video server, a program that allows multiple users to publish or read live video and audio streams. RTSP a standardized protocol that defines how to perform these operations with the help of a server, that both publishers and readers can contact in order to negotiate a streaming protocol and write or read data. The server is then responsible of linking the publisher stream with the readers.
This software was developed with the aim of simulating a live camera feed for debugging purposes, and therefore to use files instead of real streams. Another reason for the development was the deprecation of _FFserver_, the component of the FFmpeg project that allowed to create a RTSP server with _FFmpeg_ (but this server can be used with any software that supports RTSP).
It actually supports *one* publisher, while readers can be more than one.
## Installation
Precompiled binaries are available in the [release](https://github.com/aler9/rtsp-simple-server/releases) page. Just download and extract the executable.
## Usage
1. Start the server:
```
./rtsp-simple-server
```
2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want):
```
ffmpeg -re -stream_loop -1 -i file.ts -map 0:v:0 -c:v copy -f rtsp rtsp://localhost:8554/
```
3. Open the stream with VLC:
```
vlc rtsp://localhost:8554/
```
you can alternatively use GStreamer:
```
gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink
```
## Full command-line usage
```
usage: rtsp-simple-server [<flags>]
rtsp-simple-server
RTSP server.
Flags:
--help Show context-sensitive help (also try --help-long and --help-man).
--version print rtsp-simple-server version
--rtsp-port=8554 port of the RTSP TCP listener
--rtp-port=8000 port of the RTP UDP listener
--rtcp-port=8001 port of the RTCP UDP listener
```
## Links
Specifications
* https://tools.ietf.org/html/rfc7826

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module rtsp-server
go 1.13
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/stretchr/testify v1.4.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)

17
go.sum Normal file
View File

@ -0,0 +1,17 @@
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

116
main.go Normal file
View File

@ -0,0 +1,116 @@
package main
import (
"fmt"
"log"
"net"
"os"
"sync"
"gopkg.in/alecthomas/kingpin.v2"
)
var Version string = "v0.0.0"
type program struct {
rtspPort int
rtpPort int
rtcpPort int
mutex sync.Mutex
rtspl *rtspListener
rtpl *udpListener
rtcpl *udpListener
clients map[*rtspClient]struct{}
streamAuthor *rtspClient
streamSdp []byte
}
func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
p := &program{
rtspPort: rtspPort,
rtpPort: rtpPort,
rtcpPort: rtcpPort,
clients: make(map[*rtspClient]struct{}),
}
var err error
p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) {
p.mutex.Lock()
defer p.mutex.Unlock()
for c := range p.clients {
if c.state == "PLAY" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtpPort,
})
}
}
})
if err != nil {
return nil, err
}
p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) {
p.mutex.Lock()
defer p.mutex.Unlock()
for c := range p.clients {
if c.state == "PLAY" {
l.nconn.WriteTo(buf, &net.UDPAddr{
IP: c.IP,
Port: c.rtcpPort,
})
}
}
})
if err != nil {
return nil, err
}
p.rtspl, err = newRtspListener(p)
if err != nil {
return nil, err
}
return p, nil
}
func (p *program) run() {
var wg sync.WaitGroup
wg.Add(1)
go p.rtpl.run(wg)
wg.Add(1)
go p.rtcpl.run(wg)
wg.Add(1)
go p.rtspl.run(wg)
wg.Wait()
}
func main() {
kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" +
"RTSP server."
version := kingpin.Flag("version", "print rtsp-simple-server version").Bool()
rtspPort := kingpin.Flag("rtsp-port", "port of the RTSP TCP listener").Default("8554").Int()
rtpPort := kingpin.Flag("rtp-port", "port of the RTP UDP listener").Default("8000").Int()
rtcpPort := kingpin.Flag("rtcp-port", "port of the RTCP UDP listener").Default("8001").Int()
kingpin.Parse()
if *version == true {
fmt.Println("rtsp-simple-server " + Version)
os.Exit(0)
}
p, err := newProgram(*rtspPort, *rtpPort, *rtcpPort)
if err != nil {
log.Fatal("ERR:", err)
}
p.run()
}

39
rtsp/conn.go Normal file
View File

@ -0,0 +1,39 @@
package rtsp
import (
"net"
)
type Conn struct {
c net.Conn
}
func NewConn(c net.Conn) *Conn {
return &Conn{
c: c,
}
}
func (c *Conn) Close() error {
return c.c.Close()
}
func (c *Conn) RemoteAddr() net.Addr {
return c.c.RemoteAddr()
}
func (c *Conn) ReadRequest() (*Request, error) {
return requestDecode(c.c)
}
func (c *Conn) WriteRequest(req *Request) error {
return requestEncode(c.c, req)
}
func (c *Conn) ReadResponse() (*Response, error) {
return responseDecode(c.c)
}
func (c *Conn) WriteResponse(res *Response) error {
return responseEncode(c.c, res)
}

88
rtsp/request.go Normal file
View File

@ -0,0 +1,88 @@
package rtsp
import (
"bufio"
"fmt"
"io"
)
type Request struct {
Method string
Path string
Headers map[string]string
Content []byte
}
func requestDecode(r io.Reader) (*Request, error) {
rb := bufio.NewReader(r)
req := &Request{}
byts, err := readBytesLimited(rb, ' ', 255)
if err != nil {
return nil, err
}
req.Method = string(byts[:len(byts)-1])
if len(req.Method) == 0 {
return nil, fmt.Errorf("empty method")
}
byts, err = readBytesLimited(rb, ' ', 255)
if err != nil {
return nil, err
}
req.Path = string(byts[:len(byts)-1])
if len(req.Path) == 0 {
return nil, fmt.Errorf("empty path")
}
byts, err = readBytesLimited(rb, '\r', 255)
if err != nil {
return nil, err
}
proto := string(byts[:len(byts)-1])
if proto != _RTSP_PROTO {
return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto)
}
err = readByteEqual(rb, '\n')
if err != nil {
return nil, err
}
req.Headers, err = readHeaders(rb)
if err != nil {
return nil, err
}
req.Content, err = readContent(rb, req.Headers)
if err != nil {
return nil, err
}
return req, nil
}
func requestEncode(w io.Writer, req *Request) error {
wb := bufio.NewWriter(w)
_, err := wb.Write([]byte(req.Method + " " + req.Path + " " + _RTSP_PROTO + "\r\n"))
if err != nil {
return err
}
err = writeHeaders(wb, req.Headers)
if err != nil {
return err
}
err = writeContent(wb, req.Content)
if err != nil {
return err
}
return wb.Flush()
}

134
rtsp/request_test.go Normal file
View File

@ -0,0 +1,134 @@
package rtsp
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var casesRequest = []struct {
name string
byts []byte
req *Request
}{
{
"options",
[]byte("OPTIONS rtsp://example.com/media.mp4 RTSP/1.0\r\n" +
"CSeq: 1\r\n" +
"Proxy-Require: gzipped-messages\r\n" +
"Require: implicit-play\r\n" +
"\r\n"),
&Request{
Method: "OPTIONS",
Path: "rtsp://example.com/media.mp4",
Headers: map[string]string{
"CSeq": "1",
"Require": "implicit-play",
"Proxy-Require": "gzipped-messages",
},
},
},
{
"describe",
[]byte("DESCRIBE rtsp://example.com/media.mp4 RTSP/1.0\r\n" +
"CSeq: 2\r\n" +
"\r\n"),
&Request{
Method: "DESCRIBE",
Path: "rtsp://example.com/media.mp4",
Headers: map[string]string{
"CSeq": "2",
},
},
},
{
"announce",
[]byte("ANNOUNCE rtsp://example.com/media.mp4 RTSP/1.0\r\n" +
"CSeq: 7\r\n" +
"Content-Length: 306\r\n" +
"Content-Type: application/sdp\r\n" +
"Date: 23 Jan 1997 15:35:06 GMT\r\n" +
"Session: 12345678\r\n" +
"\r\n" +
"v=0\n" +
"o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" +
"s=SDP Seminar\n" +
"i=A Seminar on the session description protocol\n" +
"u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" +
"e=mjh@isi.edu (Mark Handley)\n" +
"c=IN IP4 224.2.17.12/127\n" +
"t=2873397496 2873404696\n" +
"a=recvonly\n" +
"m=audio 3456 RTP/AVP 0\n" +
"m=video 2232 RTP/AVP 31\n"),
&Request{
Method: "ANNOUNCE",
Path: "rtsp://example.com/media.mp4",
Headers: map[string]string{
"CSeq": "7",
"Date": "23 Jan 1997 15:35:06 GMT",
"Session": "12345678",
"Content-Type": "application/sdp",
"Content-Length": "306",
},
Content: []byte("v=0\n" +
"o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" +
"s=SDP Seminar\n" +
"i=A Seminar on the session description protocol\n" +
"u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" +
"e=mjh@isi.edu (Mark Handley)\n" +
"c=IN IP4 224.2.17.12/127\n" +
"t=2873397496 2873404696\n" +
"a=recvonly\n" +
"m=audio 3456 RTP/AVP 0\n" +
"m=video 2232 RTP/AVP 31\n",
),
},
},
{
"get_parameter",
[]byte("GET_PARAMETER rtsp://example.com/media.mp4 RTSP/1.0\r\n" +
"CSeq: 9\r\n" +
"Content-Length: 24\r\n" +
"Content-Type: text/parameters\r\n" +
"Session: 12345678\r\n" +
"\r\n" +
"packets_received\n" +
"jitter\n"),
&Request{
Method: "GET_PARAMETER",
Path: "rtsp://example.com/media.mp4",
Headers: map[string]string{
"CSeq": "9",
"Content-Type": "text/parameters",
"Session": "12345678",
"Content-Length": "24",
},
Content: []byte("packets_received\n" +
"jitter\n",
),
},
},
}
func TestRequestDecode(t *testing.T) {
for _, c := range casesRequest {
t.Run(c.name, func(t *testing.T) {
req, err := requestDecode(bytes.NewBuffer(c.byts))
require.NoError(t, err)
require.Equal(t, c.req, req)
})
}
}
func TestRequestEncode(t *testing.T) {
for _, c := range casesRequest {
t.Run(c.name, func(t *testing.T) {
var buf bytes.Buffer
err := requestEncode(&buf, c.req)
require.NoError(t, err)
require.Equal(t, c.byts, buf.Bytes())
})
}
}

95
rtsp/response.go Normal file
View File

@ -0,0 +1,95 @@
package rtsp
import (
"bufio"
"fmt"
"io"
"strconv"
)
type Response struct {
StatusCode int
Status string
Headers map[string]string
Content []byte
}
func responseDecode(r io.Reader) (*Response, error) {
rb := bufio.NewReader(r)
res := &Response{}
byts, err := readBytesLimited(rb, ' ', 255)
if err != nil {
return nil, err
}
proto := string(byts[:len(byts)-1])
if proto != _RTSP_PROTO {
return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto)
}
byts, err = readBytesLimited(rb, ' ', 4)
if err != nil {
return nil, err
}
statusCodeStr := string(byts[:len(byts)-1])
statusCode64, err := strconv.ParseInt(statusCodeStr, 10, 32)
res.StatusCode = int(statusCode64)
if err != nil {
return nil, fmt.Errorf("unable to parse status code")
}
byts, err = readBytesLimited(rb, '\r', 255)
if err != nil {
return nil, err
}
res.Status = string(byts[:len(byts)-1])
if len(res.Status) == 0 {
return nil, fmt.Errorf("empty status")
}
err = readByteEqual(rb, '\n')
if err != nil {
return nil, err
}
res.Headers, err = readHeaders(rb)
if err != nil {
return nil, err
}
res.Content, err = readContent(rb, res.Headers)
if err != nil {
return nil, err
}
return res, nil
}
func responseEncode(w io.Writer, res *Response) error {
wb := bufio.NewWriter(w)
_, err := wb.Write([]byte(_RTSP_PROTO + " " + strconv.FormatInt(int64(res.StatusCode), 10) + " " + res.Status + "\r\n"))
if err != nil {
return err
}
if len(res.Content) != 0 {
res.Headers["Content-Length"] = strconv.FormatInt(int64(len(res.Content)), 10)
}
err = writeHeaders(wb, res.Headers)
if err != nil {
return err
}
err = writeContent(wb, res.Content)
if err != nil {
return err
}
return wb.Flush()
}

105
rtsp/response_test.go Normal file
View File

@ -0,0 +1,105 @@
package rtsp
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
)
var casesResponse = []struct {
name string
byts []byte
res *Response
}{
{
"ok",
[]byte("RTSP/1.0 200 OK\r\n" +
"CSeq: 1\r\n" +
"Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n" +
"\r\n",
),
&Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": "1",
"Public": "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE",
},
},
},
{
"ok with content",
[]byte("RTSP/1.0 200 OK\r\n" +
"CSeq: 2\r\n" +
"Content-Base: rtsp://example.com/media.mp4\r\n" +
"Content-Length: 444\r\n" +
"Content-Type: application/sdp\r\n" +
"\r\n" +
"m=video 0 RTP/AVP 96\n" +
"a=control:streamid=0\n" +
"a=range:npt=0-7.741000\n" +
"a=length:npt=7.741000\n" +
"a=rtpmap:96 MP4V-ES/5544\n" +
"a=mimetype:string;\"video/MP4V-ES\"\n" +
"a=AvgBitRate:integer;304018\n" +
"a=StreamName:string;\"hinted video track\"\n" +
"m=audio 0 RTP/AVP 97\n" +
"a=control:streamid=1\n" +
"a=range:npt=0-7.712000\n" +
"a=length:npt=7.712000\n" +
"a=rtpmap:97 mpeg4-generic/32000/2\n" +
"a=mimetype:string;\"audio/mpeg4-generic\"\n" +
"a=AvgBitRate:integer;65790\n" +
"a=StreamName:string;\"hinted audio track\"\n",
),
&Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"Content-Base": "rtsp://example.com/media.mp4",
"Content-Length": "444",
"Content-Type": "application/sdp",
"CSeq": "2",
},
Content: []byte("m=video 0 RTP/AVP 96\n" +
"a=control:streamid=0\n" +
"a=range:npt=0-7.741000\n" +
"a=length:npt=7.741000\n" +
"a=rtpmap:96 MP4V-ES/5544\n" +
"a=mimetype:string;\"video/MP4V-ES\"\n" +
"a=AvgBitRate:integer;304018\n" +
"a=StreamName:string;\"hinted video track\"\n" +
"m=audio 0 RTP/AVP 97\n" +
"a=control:streamid=1\n" +
"a=range:npt=0-7.712000\n" +
"a=length:npt=7.712000\n" +
"a=rtpmap:97 mpeg4-generic/32000/2\n" +
"a=mimetype:string;\"audio/mpeg4-generic\"\n" +
"a=AvgBitRate:integer;65790\n" +
"a=StreamName:string;\"hinted audio track\"\n",
),
},
},
}
func TestResponseDecode(t *testing.T) {
for _, c := range casesResponse {
t.Run(c.name, func(t *testing.T) {
res, err := responseDecode(bytes.NewBuffer(c.byts))
require.NoError(t, err)
require.Equal(t, c.res, res)
})
}
}
func TestResponseEncode(t *testing.T) {
for _, c := range casesResponse {
t.Run(c.name, func(t *testing.T) {
var buf bytes.Buffer
err := responseEncode(&buf, c.res)
require.NoError(t, err)
require.Equal(t, c.byts, buf.Bytes())
})
}
}

161
rtsp/utils.go Normal file
View File

@ -0,0 +1,161 @@
package rtsp
import (
"bufio"
"fmt"
"io"
"sort"
"strconv"
)
const (
_RTSP_PROTO = "RTSP/1.0"
_MAX_HEADER_COUNT = 255
_MAX_HEADER_KEY_LENGTH = 255
_MAX_HEADER_VALUE_LENGTH = 255
_MAX_CONTENT_LENGTH = 4096
)
func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) {
for i := 1; i <= n; i++ {
byts, err := rb.Peek(i)
if err != nil {
return nil, err
}
if byts[len(byts)-1] == delim {
rb.Discard(len(byts))
return byts, nil
}
}
return nil, fmt.Errorf("buffer length exceeds %d", n)
}
func readByteEqual(rb *bufio.Reader, cmp byte) error {
byt, err := rb.ReadByte()
if err != nil {
return err
}
if byt != cmp {
return fmt.Errorf("expected '%c', got '%c'", cmp, byt)
}
return nil
}
func readHeaders(rb *bufio.Reader) (map[string]string, error) {
ret := make(map[string]string)
for {
byt, err := rb.ReadByte()
if err != nil {
return nil, err
}
if byt == '\r' {
err := readByteEqual(rb, '\n')
if err != nil {
return nil, err
}
break
}
if len(ret) >= _MAX_HEADER_COUNT {
return nil, fmt.Errorf("headers count exceeds %d", _MAX_HEADER_COUNT)
}
key := string([]byte{byt})
byts, err := readBytesLimited(rb, ':', _MAX_HEADER_KEY_LENGTH-1)
if err != nil {
return nil, err
}
key += string(byts[:len(byts)-1])
err = readByteEqual(rb, ' ')
if err != nil {
return nil, err
}
byts, err = readBytesLimited(rb, '\r', _MAX_HEADER_VALUE_LENGTH)
if err != nil {
return nil, err
}
val := string(byts[:len(byts)-1])
if len(val) == 0 {
return nil, fmt.Errorf("empty header value")
}
err = readByteEqual(rb, '\n')
if err != nil {
return nil, err
}
ret[key] = val
}
return ret, nil
}
func writeHeaders(wb *bufio.Writer, headers map[string]string) error {
// sort headers by key
// in order to obtain deterministic results
var keys []string
for key := range headers {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
_, err := wb.Write([]byte(key + ": " + headers[key] + "\r\n"))
if err != nil {
return err
}
}
_, err := wb.Write([]byte("\r\n"))
if err != nil {
return err
}
return nil
}
func readContent(rb *bufio.Reader, headers map[string]string) ([]byte, error) {
cls, ok := headers["Content-Length"]
if !ok {
return nil, nil
}
cl, err := strconv.ParseInt(cls, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid Content-Length")
}
if cl > _MAX_CONTENT_LENGTH {
return nil, fmt.Errorf("Content-Length exceeds %d", _MAX_CONTENT_LENGTH)
}
ret := make([]byte, cl)
n, err := io.ReadFull(rb, ret)
if err != nil && n != len(ret) {
return nil, err
}
return ret, nil
}
func writeContent(wb *bufio.Writer, content []byte) error {
if len(content) == 0 {
return nil
}
_, err := wb.Write(content)
if err != nil {
return err
}
return nil
}

414
rtsp_client.go Normal file
View File

@ -0,0 +1,414 @@
package main
import (
"fmt"
"io"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
"rtsp-server/rtsp"
)
type rtspClient struct {
p *program
rconn *rtsp.Conn
state string
IP net.IP
rtpPort int
rtcpPort int
}
func newRtspClient(p *program, rconn *rtsp.Conn) *rtspClient {
c := &rtspClient{
p: p,
rconn: rconn,
state: "STARTING",
}
c.p.mutex.Lock()
c.p.clients[c] = struct{}{}
c.p.mutex.Unlock()
return c
}
func (c *rtspClient) close() error {
delete(c.p.clients, c)
if c.p.streamAuthor == c {
c.p.streamAuthor = nil
c.p.streamSdp = nil
// if the streamer has disconnected
// close all other connections
for oc := range c.p.clients {
oc.close()
}
}
return c.rconn.Close()
}
func (c *rtspClient) log(format string, args ...interface{}) {
format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format
log.Printf(format, args...)
}
func (c *rtspClient) run(wg sync.WaitGroup) {
defer wg.Done()
defer c.log("disconnected")
defer func() {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
c.close()
}()
c.log("connected")
for {
req, err := c.rconn.ReadRequest()
if err != nil {
if err != io.EOF {
c.log("ERR: %s", err)
}
return
}
c.log(req.Method)
cseq, ok := req.Headers["CSeq"]
if !ok {
c.log("ERR: cseq missing")
return
}
ur, err := url.Parse(req.Path)
if err != nil {
c.log("ERR: unable to parse path '%s'", req.Path)
return
}
switch req.Method {
case "OPTIONS":
// do not check state, since OPTIONS can be requested
// in any state
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Public": strings.Join([]string{
"DESCRIBE",
"SETUP",
"PLAY",
"PAUSE",
"TEARDOWN",
}, ", "),
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
case "DESCRIBE":
if c.state != "STARTING" {
c.log("ERR: client is in state '%s'", c.state)
return
}
sdp, err := func() ([]byte, error) {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
if len(c.p.streamSdp) == 0 {
return nil, fmt.Errorf("no one is streaming")
}
return c.p.streamSdp, nil
}()
if err != nil {
c.log("ERR: %s", err)
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Content-Base": ur.String(),
"Content-Type": "application/sdp",
},
Content: sdp,
})
if err != nil {
c.log("ERR: %s", err)
return
}
case "SETUP":
transport, ok := req.Headers["Transport"]
if !ok {
c.log("ERR: transport header missing")
return
}
transports := strings.Split(transport, ";")
ok = func() bool {
for _, t := range transports {
if t == "unicast" {
return true
}
}
return false
}()
if !ok {
c.log("ERR: transport header does not contain unicast")
return
}
clientPort1, clientPort2 := func() (int, int) {
for _, t := range transports {
if !strings.HasPrefix(t, "client_port=") {
continue
}
t = t[len("client_port="):]
ports := strings.Split(t, "-")
if len(ports) != 2 {
return 0, 0
}
port1, err := strconv.ParseInt(ports[0], 10, 64)
if err != nil {
return 0, 0
}
port2, err := strconv.ParseInt(ports[1], 10, 64)
if err != nil {
return 0, 0
}
return int(port1), int(port2)
}
return 0, 0
}()
if clientPort1 == 0 || clientPort2 == 0 {
c.log("ERR: transport header does not have valid client ports (%s)", transport)
return
}
switch c.state {
// play
case "STARTING":
ok = func() bool {
for _, t := range transports {
if t == "RTP/AVP" {
return true
}
}
return false
}()
if !ok {
c.log("ERR: transport header does not contain RTP/AVP")
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_PLAY"
c.p.mutex.Unlock()
// record
case "ANNOUNCE":
ok = func() bool {
for _, t := range transports {
if t == "RTP/AVP/UDP" {
return true
}
}
return false
}()
if !ok {
c.log("ERR: transport header does not contain RTP/AVP/UDP")
return
}
ok = func() bool {
for _, t := range transports {
if t == "mode=record" {
return true
}
}
return false
}()
if !ok {
c.log("ERR: transport header does not contain mode=record")
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
"RTP/AVP",
"unicast",
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String())
c.IP = net.ParseIP(ipstr)
c.rtpPort = clientPort1
c.rtcpPort = clientPort2
c.state = "PRE_RECORD"
c.p.mutex.Unlock()
default:
c.log("ERR: client is in state '%s'", c.state)
return
}
case "PLAY":
if c.state != "PRE_PLAY" {
c.log("ERR: client is in state '%s'", c.state)
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.state = "PLAY"
c.p.mutex.Unlock()
case "RECORD":
if c.state != "PRE_RECORD" {
c.log("ERR: client is in state '%s'", c.state)
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.state = "RECORD"
c.p.mutex.Unlock()
case "ANNOUNCE":
if c.state != "STARTING" {
c.log("ERR: client is in state '%s'", c.state)
return
}
ct, ok := req.Headers["Content-Type"]
if !ok {
c.log("ERR: Content-Type header missing")
return
}
if ct != "application/sdp" {
c.log("ERR: unsupported Content-Type '%s'", ct)
return
}
err := func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
if c.p.streamAuthor != nil {
return fmt.Errorf("another client is already streaming")
}
c.p.streamAuthor = c
c.p.streamSdp = req.Content
return nil
}()
if err != nil {
c.log("ERR: %s", err)
return
}
err = c.rconn.WriteResponse(&rtsp.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
},
})
if err != nil {
c.log("ERR: %s", err)
return
}
c.p.mutex.Lock()
c.state = "ANNOUNCE"
c.p.mutex.Unlock()
case "TEARDOWN":
return
default:
c.log("ERR: method %s unhandled", req.Method)
}
}
}

51
rtsp_listener.go Normal file
View File

@ -0,0 +1,51 @@
package main
import (
"log"
"net"
"sync"
"rtsp-server/rtsp"
)
type rtspListener struct {
p *program
netl *net.TCPListener
}
func newRtspListener(p *program) (*rtspListener, error) {
netl, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: p.rtspPort,
})
if err != nil {
return nil, err
}
s := &rtspListener{
p: p,
netl: netl,
}
s.log("opened on :%d", p.rtspPort)
return s, nil
}
func (l *rtspListener) log(format string, args ...interface{}) {
log.Printf("[RTSP listener] "+format, args...)
}
func (l *rtspListener) run(wg sync.WaitGroup) {
defer wg.Done()
for {
conn, err := l.netl.AcceptTCP()
if err != nil {
break
}
rconn := rtsp.NewConn(conn)
rsc := newRtspClient(l.p, rconn)
wg.Add(1)
go rsc.run(wg)
}
}

53
udp_listener.go Normal file
View File

@ -0,0 +1,53 @@
package main
import (
"log"
"net"
"sync"
)
type udpListenerCb func(*udpListener, []byte)
type udpListener struct {
nconn *net.UDPConn
logPrefix string
cb udpListenerCb
}
func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) {
nconn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: port,
})
if err != nil {
return nil, err
}
l := &udpListener{
nconn: nconn,
logPrefix: logPrefix,
cb: cb,
}
l.log("opened on :%d", port)
return l, nil
}
func (l *udpListener) log(format string, args ...interface{}) {
log.Printf("["+l.logPrefix+" listener] "+format, args...)
}
func (l *udpListener) run(wg sync.WaitGroup) {
defer wg.Done()
buf := make([]byte, 2048) // UDP MTU is 1400
for {
n, _, err := l.nconn.ReadFromUDP(buf)
if err != nil {
l.log("ERR: %s", err)
break
}
l.cb(l, buf[:n])
}
}