implement reading with HLS

This commit is contained in:
aler9 2021-04-11 19:05:08 +02:00
parent ea6b616759
commit dee045f961
21 changed files with 1513 additions and 79 deletions

3
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210417174800-805d578b6c63
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
@ -18,3 +19,5 @@ require (
)
replace github.com/notedit/rtmp => github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927
replace github.com/asticode/go-astits => github.com/aler9/go-astits v0.0.0-20210405093936-0a65b1259eb7

5
go.sum
View File

@ -2,10 +2,14 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210405093936-0a65b1259eb7 h1:x55BTS9tsWtmMbHZxR7MoMfLJSdUG7rzshMS0Y7TZg0=
github.com/aler9/go-astits v0.0.0-20210405093936-0a65b1259eb7/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210417174800-805d578b6c63 h1:Ex7N/9GT4SQNHS49iJdQV8SiYJ7BXux3GCG3/z2KC6I=
github.com/aler9/gortsplib v0.0.0-20210417174800-805d578b6c63/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@ -28,6 +32,7 @@ github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g=
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

191
internal/aac/adts.go Normal file
View File

@ -0,0 +1,191 @@
package aac
import (
"fmt"
)
// ADTSPacket is an ADTS packet
type ADTSPacket struct {
SampleRate int
ChannelCount int
Frame []byte
}
// DecodeADTS decodes an ADTS stream into ADTS packets.
func DecodeADTS(byts []byte) ([]*ADTSPacket, error) {
// refs: https://wiki.multimedia.cx/index.php/ADTS
var ret []*ADTSPacket
for len(byts) > 0 {
syncWord := (uint16(byts[0]) << 4) | (uint16(byts[1]) >> 4)
if syncWord != 0xfff {
return nil, fmt.Errorf("invalid syncword")
}
protectionAbsent := byts[1] & 0x01
if protectionAbsent != 1 {
return nil, fmt.Errorf("ADTS with CRC is not supported")
}
pkt := &ADTSPacket{}
profile := (byts[2] >> 6)
if profile != 0 {
return nil, fmt.Errorf("only AAC-LC is supported")
}
sampleRateIndex := (byts[2] >> 2) & 0x0F
switch sampleRateIndex {
case 0:
pkt.SampleRate = 96000
case 1:
pkt.SampleRate = 88200
case 2:
pkt.SampleRate = 64000
case 3:
pkt.SampleRate = 48000
case 4:
pkt.SampleRate = 44100
case 5:
pkt.SampleRate = 32000
case 6:
pkt.SampleRate = 24000
case 7:
pkt.SampleRate = 22050
case 8:
pkt.SampleRate = 16000
case 9:
pkt.SampleRate = 12000
case 10:
pkt.SampleRate = 11025
case 11:
pkt.SampleRate = 8000
case 12:
pkt.SampleRate = 7350
default:
return nil, fmt.Errorf("invalid sample rate index: %d", sampleRateIndex)
}
channelConfig := ((byts[2] & 0x01) << 2) | ((byts[3] >> 6) & 0x03)
switch channelConfig {
case 1:
pkt.ChannelCount = 1
case 2:
pkt.ChannelCount = 2
case 3:
pkt.ChannelCount = 3
case 4:
pkt.ChannelCount = 4
case 5:
pkt.ChannelCount = 5
case 6:
pkt.ChannelCount = 6
case 7:
pkt.ChannelCount = 8
default:
return nil, fmt.Errorf("invalid channel configuration: %d", channelConfig)
}
frameLen := int(((uint16(byts[3])&0x03)<<11)|
(uint16(byts[4])<<3)|
((uint16(byts[5])>>5)&0x07)) - 7
fullness := ((uint16(byts[5]) & 0x1F) << 6) | ((uint16(byts[6]) >> 2) & 0x3F)
if fullness != 1800 {
return nil, fmt.Errorf("fullness not supported")
}
frameCount := byts[6] & 0x03
if frameCount != 0 {
return nil, fmt.Errorf("multiple frame count not supported")
}
if len(byts[7:]) < frameLen {
return nil, fmt.Errorf("invalid frame length")
}
pkt.Frame = byts[7 : 7+frameLen]
byts = byts[7+frameLen:]
ret = append(ret, pkt)
}
return ret, nil
}
// EncodeADTS encodes ADTS packets into an ADTS stream.
func EncodeADTS(pkts []*ADTSPacket) ([]byte, error) {
var ret []byte
for _, pkt := range pkts {
frameLen := len(pkt.Frame) + 7
fullness := 1800
var channelConf uint8
switch pkt.ChannelCount {
case 1:
channelConf = 1
case 2:
channelConf = 2
case 3:
channelConf = 3
case 4:
channelConf = 4
case 5:
channelConf = 5
case 6:
channelConf = 6
case 8:
channelConf = 7
default:
return nil, fmt.Errorf("invalid channel count: %v", pkt.ChannelCount)
}
var sampleRateIndex uint8
switch pkt.SampleRate {
case 96000:
sampleRateIndex = 0
case 88200:
sampleRateIndex = 1
case 64000:
sampleRateIndex = 2
case 48000:
sampleRateIndex = 3
case 44100:
sampleRateIndex = 4
case 32000:
sampleRateIndex = 5
case 24000:
sampleRateIndex = 6
case 22050:
sampleRateIndex = 7
case 16000:
sampleRateIndex = 8
case 12000:
sampleRateIndex = 9
case 11025:
sampleRateIndex = 10
case 8000:
sampleRateIndex = 11
case 7350:
sampleRateIndex = 12
default:
return nil, fmt.Errorf("invalid sample rate: %v", pkt.SampleRate)
}
header := make([]byte, 7)
header[0] = 0xFF
header[1] = 0xF1
header[2] = (sampleRateIndex << 2) | ((channelConf >> 2) & 0x01)
header[3] = (channelConf&0x03)<<6 | uint8((frameLen>>11)&0x03)
header[4] = uint8((frameLen >> 3) & 0xFF)
header[5] = uint8((frameLen&0x07)<<5 | ((fullness >> 6) & 0x1F))
header[6] = uint8((fullness & 0x3F) << 2)
ret = append(ret, header...)
ret = append(ret, pkt.Frame...)
}
return ret, nil
}

61
internal/aac/adts_test.go Normal file
View File

@ -0,0 +1,61 @@
package aac
import (
"testing"
"github.com/stretchr/testify/require"
)
var casesADTS = []struct {
name string
byts []byte
pkts []*ADTSPacket
}{
{
"single",
[]byte{0xff, 0xf1, 0xc, 0x80, 0x1, 0x3c, 0x20, 0xaa, 0xbb},
[]*ADTSPacket{
{
SampleRate: 48000,
ChannelCount: 2,
Frame: []byte{0xaa, 0xbb},
},
},
},
{
"multiple",
[]byte{0xff, 0xf1, 0x10, 0x40, 0x1, 0x3c, 0x20, 0xaa, 0xbb, 0xff, 0xf1, 0xc, 0x80, 0x1, 0x3c, 0x20, 0xcc, 0xdd},
[]*ADTSPacket{
{
SampleRate: 44100,
ChannelCount: 1,
Frame: []byte{0xaa, 0xbb},
},
{
SampleRate: 48000,
ChannelCount: 2,
Frame: []byte{0xcc, 0xdd},
},
},
},
}
func TestDecodeADTS(t *testing.T) {
for _, ca := range casesADTS {
t.Run(ca.name, func(t *testing.T) {
pkts, err := DecodeADTS(ca.byts)
require.NoError(t, err)
require.Equal(t, ca.pkts, pkts)
})
}
}
func TestEncodeADTS(t *testing.T) {
for _, ca := range casesADTS {
t.Run(ca.name, func(t *testing.T) {
byts, err := EncodeADTS(ca.pkts)
require.NoError(t, err)
require.Equal(t, ca.byts, byts)
})
}
}

View File

@ -0,0 +1,600 @@
package clienthls
import (
"bytes"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/h264"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/serverhls"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
const (
// an offset is needed to
// - avoid negative PTS values
// - avoid PTS < DTS during startup
ptsOffset = 2 * time.Second
segmentMinAUCount = 100
closeCheckPeriod = 1 * time.Second
closeAfterInactivity = 60 * time.Second
)
const index = `<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
#video {
width: 600px;
height: 600px;
background: black;
}
</style>
</head>
<body>
<script src="https://cdn.jsdelivr.net/npm/hls.js@1.0.0"></script>
<video id="video" muted controls></video>
<script>
const create = () => {
const video = document.getElementById('video');
const hls = new Hls({
progressive: false,
});
hls.on(Hls.Events.ERROR, (evt, data) => {
if (data.fatal) {
hls.destroy();
setTimeout(() => {
create();
}, 2000);
}
});
hls.loadSource('stream.m3u8');
hls.attachMedia(video);
video.play();
}
create();
</script>
</body>
</html>
`
func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
for _, item := range ips {
switch titem := item.(type) {
case net.IP:
if titem.Equal(ip) {
return true
}
case *net.IPNet:
if titem.Contains(ip) {
return true
}
}
}
return false
}
type trackIDPayloadPair struct {
trackID int
buf []byte
}
// PathMan is implemented by pathman.PathMan.
type PathMan interface {
OnClientSetupPlay(client.SetupPlayReq)
}
// Parent is implemented by clientman.ClientMan.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnClientClose(client.Client)
}
// Client is a HLS client.
type Client struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
readBufferCount int
wg *sync.WaitGroup
stats *stats.Stats
pathName string
pathMan PathMan
parent Parent
path client.Path
ringBuffer *ringbuffer.RingBuffer
tsQueue []*tsFile
tsByName map[string]*tsFile
tsDeleteCount int
tsMutex sync.Mutex
lastRequestTime int64
// in
request chan serverhls.Request
terminate chan struct{}
}
// New allocates a Client.
func New(
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
readBufferCount int,
wg *sync.WaitGroup,
stats *stats.Stats,
pathName string,
pathMan PathMan,
parent Parent) *Client {
c := &Client{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
readBufferCount: readBufferCount,
wg: wg,
stats: stats,
pathName: pathName,
pathMan: pathMan,
parent: parent,
lastRequestTime: time.Now().Unix(),
tsByName: make(map[string]*tsFile),
request: make(chan serverhls.Request),
terminate: make(chan struct{}),
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected (HLS)")
c.wg.Add(1)
go c.run()
return c
}
// Close closes a Client.
func (c *Client) Close() {
atomic.AddInt64(c.stats.CountClients, -1)
close(c.terminate)
}
// IsClient implements client.Client.
func (c *Client) IsClient() {}
// IsSource implements path.source.
func (c *Client) IsSource() {}
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client hls/%s] "+format, append([]interface{}{c.pathName}, args...)...)
}
// PathName returns the path name of the client.
func (c *Client) PathName() string {
return c.pathName
}
func (c *Client) run() {
defer c.wg.Done()
defer c.log(logger.Info, "disconnected")
var videoTrack *gortsplib.Track
var h264SPS []byte
var h264PPS []byte
var h264Decoder *rtph264.Decoder
var audioTrack *gortsplib.Track
var aacConfig rtpaac.MPEG4AudioConfig
var aacDecoder *rtpaac.Decoder
err := func() error {
pres := make(chan client.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, c.pathName, nil, pres}) //nolint:govet
res := <-pres
if res.Err != nil {
return res.Err
}
c.path = res.Path
for i, t := range res.Tracks {
if t.IsH264() {
if videoTrack != nil {
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
}
videoTrack = t
var err error
h264SPS, h264PPS, err = t.ExtractDataH264()
if err != nil {
return err
}
h264Decoder = rtph264.NewDecoder()
} else if t.IsAAC() {
if audioTrack != nil {
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
}
audioTrack = t
byts, err := t.ExtractDataAAC()
if err != nil {
return err
}
err = aacConfig.Decode(byts)
if err != nil {
return err
}
aacDecoder = rtpaac.NewDecoder(aacConfig.SampleRate)
}
}
if videoTrack == nil && audioTrack == nil {
return fmt.Errorf("unable to find a video or audio track")
}
return nil
}()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
go func() {
for req := range c.request {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}()
if c.path != nil {
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
}
c.parent.OnClientClose(c)
<-c.terminate
close(c.request)
return
}
curTSFile := newTSFile(videoTrack, audioTrack)
c.tsByName[curTSFile.Name()] = curTSFile
c.tsQueue = append(c.tsQueue, curTSFile)
defer func() {
curTSFile.Close()
}()
requestDone := make(chan struct{})
go c.runRequestHandler(requestDone)
defer func() {
close(c.request)
<-requestDone
}()
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
resc := make(chan client.PlayRes)
c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is reading from path '%s'", c.pathName)
writerDone := make(chan error)
go func() {
writerDone <- func() error {
startPCR := time.Now()
var videoBuf [][]byte
videoDTSEst := h264.NewDTSEstimator()
audioAUCount := 0
for {
data, ok := c.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
pair := data.(trackIDPayloadPair)
if videoTrack != nil && pair.trackID == videoTrack.ID {
nalus, pts, err := h264Decoder.Decode(pair.buf)
if err != nil {
if err != rtph264.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode video track: %v", err)
}
continue
}
for _, nalu := range nalus {
// remove SPS, PPS, AUD
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter:
continue
}
// add SPS and PPS before IDR
if typ == h264.NALUTypeIDR {
videoBuf = append(videoBuf, h264SPS)
videoBuf = append(videoBuf, h264PPS)
}
videoBuf = append(videoBuf, nalu)
}
// RTP marker means that all the NALUs with the same PTS have been received.
// send them together.
marker := (pair.buf[1] >> 7 & 0x1) > 0
if marker {
isIDR := func() bool {
for _, nalu := range videoBuf {
typ := h264.NALUType(nalu[0] & 0x1F)
if typ == h264.NALUTypeIDR {
return true
}
}
return false
}()
if isIDR {
if curTSFile.firstPacketWritten &&
time.Since(curTSFile.firstPacketWrittenTime) >= c.hlsSegmentDuration {
if curTSFile != nil {
curTSFile.Close()
}
curTSFile = newTSFile(videoTrack, audioTrack)
c.tsMutex.Lock()
c.tsByName[curTSFile.Name()] = curTSFile
c.tsQueue = append(c.tsQueue, curTSFile)
if len(c.tsQueue) > c.hlsSegmentCount {
delete(c.tsByName, c.tsQueue[0].Name())
c.tsQueue = c.tsQueue[1:]
c.tsDeleteCount++
}
c.tsMutex.Unlock()
}
} else {
if !curTSFile.firstPacketWritten {
continue
}
}
curTSFile.SetPCR(time.Since(startPCR))
err := curTSFile.WriteH264(
videoDTSEst.Feed(pts+ptsOffset),
pts+ptsOffset,
isIDR,
videoBuf)
if err != nil {
return err
}
videoBuf = nil
}
} else if audioTrack != nil && pair.trackID == audioTrack.ID {
aus, pts, err := aacDecoder.Decode(pair.buf)
if err != nil {
if err != rtpaac.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode audio track: %v", err)
}
continue
}
if videoTrack == nil {
if curTSFile.firstPacketWritten &&
(time.Since(curTSFile.firstPacketWrittenTime) >= c.hlsSegmentDuration &&
audioAUCount >= segmentMinAUCount) {
if curTSFile != nil {
curTSFile.Close()
}
audioAUCount = 0
curTSFile = newTSFile(videoTrack, audioTrack)
c.tsMutex.Lock()
c.tsByName[curTSFile.Name()] = curTSFile
c.tsQueue = append(c.tsQueue, curTSFile)
if len(c.tsQueue) > c.hlsSegmentCount {
delete(c.tsByName, c.tsQueue[0].Name())
c.tsQueue = c.tsQueue[1:]
c.tsDeleteCount++
}
c.tsMutex.Unlock()
}
} else {
if !curTSFile.firstPacketWritten {
continue
}
}
for i, au := range aus {
auPTS := pts + time.Duration(i)*1000*time.Second/time.Duration(aacConfig.SampleRate)
audioAUCount++
curTSFile.SetPCR(time.Since(startPCR))
err := curTSFile.WriteAAC(
aacConfig.SampleRate,
aacConfig.ChannelCount,
auPTS+ptsOffset,
au)
if err != nil {
return err
}
}
}
}
}()
}()
closeCheckTicker := time.NewTicker(closeCheckPeriod)
defer closeCheckTicker.Stop()
for {
select {
case <-closeCheckTicker.C:
t := time.Unix(atomic.LoadInt64(&c.lastRequestTime), 0)
if time.Since(t) >= closeAfterInactivity {
c.log(logger.Info, "closing due to inactivity")
c.ringBuffer.Close()
<-writerDone
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.parent.OnClientClose(c)
<-c.terminate
return
}
case err := <-writerDone:
c.log(logger.Info, "ERR: %s", err)
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.parent.OnClientClose(c)
<-c.terminate
return
case <-c.terminate:
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.ringBuffer.Close()
<-writerDone
return
}
}
}
func (c *Client) runRequestHandler(done chan struct{}) {
defer close(done)
for preq := range c.request {
req := preq
atomic.StoreInt64(&c.lastRequestTime, time.Now().Unix())
conf := c.path.Conf()
if conf.ReadIpsParsed != nil {
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
ip := net.ParseIP(tmp)
if !ipEqualOrInRange(ip, conf.ReadIpsParsed) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
continue
}
}
if conf.ReadUser != "" {
user, pass, ok := req.Req.BasicAuth()
if !ok || user != conf.ReadUser || pass != conf.ReadPass {
req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
continue
}
}
switch {
case req.Subpath == "stream.m3u8":
func() {
c.tsMutex.Lock()
defer c.tsMutex.Unlock()
if len(c.tsQueue) == 0 {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
return
}
cnt := "#EXTM3U\n"
cnt += "#EXT-X-VERSION:3\n"
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
cnt += "#EXT-X-TARGETDURATION:10\n"
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n"
for _, f := range c.tsQueue {
cnt += "#EXTINF:10,\n"
cnt += f.Name() + ".ts\n"
}
req.Res <- bytes.NewReader([]byte(cnt))
}()
case strings.HasSuffix(req.Subpath, ".ts"):
base := strings.TrimSuffix(req.Subpath, ".ts")
c.tsMutex.Lock()
f, ok := c.tsByName[base]
c.tsMutex.Unlock()
if !ok {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
continue
}
req.Res <- f.buf.NewReader()
case req.Subpath == "":
req.Res <- bytes.NewReader([]byte(index))
default:
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}
}
// OnRequest is called by clientman.ClientMan.
func (c *Client) OnRequest(req serverhls.Request) {
c.request <- req
}
// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod,
pathName string, ips []interface{},
user string, pass string, req interface{}) error {
return nil
}
// OnFrame implements path.Reader.
func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(trackIDPayloadPair{trackID, payload})
}
}

View File

@ -0,0 +1,76 @@
package clienthls
import (
"bytes"
"io"
"sync"
)
type multiAccessBufferReader struct {
m *multiAccessBuffer
readPos int
}
func (r *multiAccessBufferReader) Read(p []byte) (int, error) {
newReadPos := r.readPos + len(p)
curBuf, err := func() ([]byte, error) {
r.m.mutex.Lock()
defer r.m.mutex.Unlock()
if r.m.closed && r.readPos >= r.m.writePos {
return nil, io.EOF
}
for !r.m.closed && newReadPos >= r.m.writePos {
r.m.cond.Wait()
}
return r.m.buf.Bytes(), nil
}()
if err != nil {
return 0, err
}
n := copy(p, curBuf[r.readPos:])
r.readPos += n
return n, nil
}
type multiAccessBuffer struct {
buf bytes.Buffer
closed bool
writePos int
mutex sync.Mutex
cond *sync.Cond
}
func newMultiAccessBuffer() *multiAccessBuffer {
m := &multiAccessBuffer{}
m.cond = sync.NewCond(&m.mutex)
return m
}
func (m *multiAccessBuffer) Close() error {
m.mutex.Lock()
m.closed = true
m.mutex.Unlock()
m.cond.Broadcast()
return nil
}
func (m *multiAccessBuffer) Write(p []byte) (int, error) {
m.mutex.Lock()
n, _ := m.buf.Write(p)
m.writePos += n
m.mutex.Unlock()
m.cond.Broadcast()
return n, nil
}
func (m *multiAccessBuffer) NewReader() *multiAccessBufferReader {
return &multiAccessBufferReader{
m: m,
}
}

View File

@ -0,0 +1,27 @@
package clienthls
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMultiAccessBuffer(t *testing.T) {
m := newMultiAccessBuffer()
m.Write([]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08})
r := m.NewReader()
buf := make([]byte, 4)
n, err := r.Read(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n])
m.Close()
buf = make([]byte, 10)
n, err = r.Read(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n])
}

View File

@ -0,0 +1,160 @@
package clienthls
import (
"context"
"strconv"
"time"
"github.com/aler9/gortsplib"
"github.com/asticode/go-astits"
"github.com/aler9/rtsp-simple-server/internal/aac"
"github.com/aler9/rtsp-simple-server/internal/h264"
)
type tsFile struct {
name string
buf *multiAccessBuffer
mux *astits.Muxer
pcrTrackIsVideo bool
pcr time.Duration
firstPacketWritten bool
firstPacketWrittenTime time.Time
}
func newTSFile(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) *tsFile {
t := &tsFile{
buf: newMultiAccessBuffer(),
name: strconv.FormatInt(time.Now().Unix(), 10),
}
t.mux = astits.NewMuxer(context.Background(), t.buf)
if videoTrack != nil {
t.mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 256,
StreamType: astits.StreamTypeH264Video,
})
}
if audioTrack != nil {
t.mux.AddElementaryStream(astits.PMTElementaryStream{
ElementaryPID: 257,
StreamType: astits.StreamTypeAACAudio,
})
}
if videoTrack != nil {
t.pcrTrackIsVideo = true
t.mux.SetPCRPID(256)
} else {
t.pcrTrackIsVideo = false
t.mux.SetPCRPID(257)
}
return t
}
func (t *tsFile) Close() error {
return t.buf.Close()
}
func (t *tsFile) Name() string {
return t.name
}
func (t *tsFile) FirstPacketWritten() bool {
return t.firstPacketWritten
}
func (t *tsFile) FirstPacketWrittenTime() time.Time {
return t.firstPacketWrittenTime
}
func (t *tsFile) SetPCR(pcr time.Duration) {
t.pcr = pcr
}
func (t *tsFile) WriteH264(dts time.Duration, pts time.Duration, isIDR bool, nalus [][]byte) error {
if !t.firstPacketWritten {
t.firstPacketWritten = true
t.firstPacketWrittenTime = time.Now()
}
enc, err := h264.EncodeAnnexB(nalus)
if err != nil {
return err
}
af := &astits.PacketAdaptationField{
RandomAccessIndicator: isIDR,
}
if t.pcrTrackIsVideo {
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(t.pcr.Seconds() * 90000)}
}
_, err = t.mux.WriteData(&astits.MuxerData{
PID: 256,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorBothPresent,
DTS: &astits.ClockReference{Base: int64(dts.Seconds() * 90000)},
PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)},
},
StreamID: 224, // = video
},
Data: enc,
},
})
return err
}
func (t *tsFile) WriteAAC(sampleRate int, channelCount int, pts time.Duration, au []byte) error {
if !t.firstPacketWritten {
t.firstPacketWritten = true
t.firstPacketWrittenTime = time.Now()
}
adtsPkt, err := aac.EncodeADTS([]*aac.ADTSPacket{
{
SampleRate: sampleRate,
ChannelCount: channelCount,
Frame: au,
},
})
if err != nil {
return err
}
af := &astits.PacketAdaptationField{
RandomAccessIndicator: true,
}
if !t.pcrTrackIsVideo {
af.HasPCR = true
af.PCR = &astits.ClockReference{Base: int64(t.pcr.Seconds() * 90000)}
}
_, err = t.mux.WriteData(&astits.MuxerData{
PID: 257,
AdaptationField: af,
PES: &astits.PESData{
Header: &astits.PESHeader{
OptionalHeader: &astits.PESOptionalHeader{
MarkerBits: 2,
PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS,
PTS: &astits.ClockReference{Base: int64(pts.Seconds() * 90000)},
},
PacketLength: uint16(len(adtsPkt) + 8),
StreamID: 192, // = audio
},
Data: adtsPkt,
},
})
return err
}

View File

@ -9,9 +9,11 @@ import (
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/clienthls"
"github.com/aler9/rtsp-simple-server/internal/clientrtmp"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/serverhls"
"github.com/aler9/rtsp-simple-server/internal/serverrtmp"
"github.com/aler9/rtsp-simple-server/internal/serverrtsp"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -32,6 +34,8 @@ type Parent interface {
// ClientManager is a client manager.
type ClientManager struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
rtspPort int
readTimeout time.Duration
writeTimeout time.Duration
@ -44,10 +48,12 @@ type ClientManager struct {
serverPlain *serverrtsp.Server
serverTLS *serverrtsp.Server
serverRTMP *serverrtmp.Server
serverHLS *serverhls.Server
parent Parent
clients map[client.Client]struct{}
wg sync.WaitGroup
clients map[client.Client]struct{}
clientsByHLSPath map[string]*clienthls.Client
wg sync.WaitGroup
// in
clientClose chan client.Client
@ -59,6 +65,8 @@ type ClientManager struct {
// New allocates a ClientManager.
func New(
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
rtspPort int,
readTimeout time.Duration,
writeTimeout time.Duration,
@ -71,9 +79,12 @@ func New(
serverPlain *serverrtsp.Server,
serverTLS *serverrtsp.Server,
serverRTMP *serverrtmp.Server,
serverHLS *serverhls.Server,
parent Parent) *ClientManager {
cm := &ClientManager{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
rtspPort: rtspPort,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
@ -86,8 +97,10 @@ func New(
serverPlain: serverPlain,
serverTLS: serverTLS,
serverRTMP: serverRTMP,
serverHLS: serverHLS,
parent: parent,
clients: make(map[client.Client]struct{}),
clientsByHLSPath: make(map[string]*clienthls.Client),
clientClose: make(chan client.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
@ -133,6 +146,13 @@ func (cm *ClientManager) run() {
return make(chan net.Conn)
}()
hlsRequest := func() chan serverhls.Request {
if cm.serverHLS != nil {
return cm.serverHLS.Request()
}
return make(chan serverhls.Request)
}()
outer:
for {
select {
@ -181,19 +201,34 @@ outer:
cm)
cm.clients[c] = struct{}{}
case req := <-hlsRequest:
c, ok := cm.clientsByHLSPath[req.Path]
if !ok {
c = clienthls.New(
cm.hlsSegmentCount,
cm.hlsSegmentDuration,
cm.readBufferCount,
&cm.wg,
cm.stats,
req.Path,
cm.pathMan,
cm)
cm.clients[c] = struct{}{}
cm.clientsByHLSPath[req.Path] = c
}
c.OnRequest(req)
case c := <-cm.pathMan.ClientClose():
if _, ok := cm.clients[c]; !ok {
continue
}
delete(cm.clients, c)
c.Close()
cm.onClientClose(c)
case c := <-cm.clientClose:
if _, ok := cm.clients[c]; !ok {
continue
}
delete(cm.clients, c)
c.Close()
cm.onClientClose(c)
case <-cm.terminate:
break outer
@ -222,6 +257,14 @@ outer:
close(cm.clientClose)
}
func (cm *ClientManager) onClientClose(c client.Client) {
delete(cm.clients, c)
if hc, ok := c.(*clienthls.Client); ok {
delete(cm.clientsByHLSPath, hc.PathName())
}
c.Close()
}
// OnClientClose is called by a client.
func (cm *ClientManager) OnClientClose(c client.Client) {
cm.clientClose <- c

View File

@ -191,14 +191,18 @@ func (c *Client) run() {
func (c *Client) runRead() {
var path client.Path
var tracks gortsplib.Tracks
var videoTrack *gortsplib.Track
var h264Decoder *rtph264.Decoder
var audioTrack *gortsplib.Track
var audioClockRate int
var aacDecoder *rtpaac.Decoder
err := func() error {
pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, resc}) //nolint:govet
res := <-resc
sres := make(chan client.SetupPlayRes)
c.pathMan.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, sres}) //nolint:govet
res := <-sres
if res.Err != nil {
if _, ok := res.Err.(client.ErrAuthCritical); ok {
@ -212,27 +216,8 @@ func (c *Client) runRead() {
}
path = res.Path
tracks = res.Tracks
return nil
}()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
c.conn.NetConn().Close()
c.parent.OnClientClose(c)
<-c.terminate
return
}
var videoTrack *gortsplib.Track
var h264Decoder *rtph264.Decoder
var audioTrack *gortsplib.Track
var audioClockRate int
var aacDecoder *rtpaac.Decoder
err = func() error {
for i, t := range tracks {
for i, t := range res.Tracks {
if t.IsH264() {
if videoTrack != nil {
return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1)
@ -259,30 +244,31 @@ func (c *Client) runRead() {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WriteMetadata(videoTrack, audioTrack)
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
resc := make(chan client.PlayRes)
path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is reading from path '%s'", path.Name())
return nil
}()
if err != nil {
c.conn.NetConn().Close()
c.log(logger.Info, "ERR: %v", err)
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
path = nil
if path != nil {
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
}
c.parent.OnClientClose(c)
<-c.terminate
return
}
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
pres := make(chan client.PlayRes)
path.OnClientPlay(client.PlayReq{c, pres}) //nolint:govet
<-pres
c.log(logger.Info, "is reading from path '%s'", path.Name())
// disable read deadline
c.conn.NetConn().SetReadDeadline(time.Time{})
@ -381,7 +367,6 @@ func (c *Client) runRead() {
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
path = nil
c.parent.OnClientClose(c)
<-c.terminate
@ -394,7 +379,6 @@ func (c *Client) runRead() {
c.ringBuffer.Close()
c.conn.NetConn().Close()
<-writerDone
path = nil
}
}

View File

@ -84,6 +84,12 @@ type Conf struct {
RTMPDisable bool `yaml:"rtmpDisable"`
RTMPPort int `yaml:"rtmpPort"`
// hls
HLSDisable bool `yaml:"hlsDisable"`
HLSPort int `yaml:"hlsPort"`
HLSSegmentCount int `yaml:"hlsSegmentCount"`
HLSSegmentDuration time.Duration `yaml:"hlsSegmentDuration"`
// path
Paths map[string]*PathConf `yaml:"paths"`
}
@ -234,6 +240,16 @@ func (conf *Conf) fillAndCheck() error {
conf.RTMPPort = 1935
}
if conf.HLSPort == 0 {
conf.HLSPort = 8888
}
if conf.HLSSegmentCount == 0 {
conf.HLSSegmentCount = 5
}
if conf.HLSSegmentDuration == 0 {
conf.HLSSegmentDuration = 1 * time.Second
}
if len(conf.Paths) == 0 {
conf.Paths = map[string]*PathConf{
"all": {},

View File

@ -80,6 +80,16 @@ type PathConf struct {
DisablePublisherOverride bool `yaml:"disablePublisherOverride"`
Fallback string `yaml:"fallback"`
// authentication
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`
PublishIpsParsed []interface{} `yaml:"-" json:"-"`
ReadUser string `yaml:"readUser"`
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
ReadIpsParsed []interface{} `yaml:"-" json:"-"`
// custom commands
RunOnInit string `yaml:"runOnInit"`
RunOnInitRestart bool `yaml:"runOnInitRestart"`
@ -91,16 +101,6 @@ type PathConf struct {
RunOnPublishRestart bool `yaml:"runOnPublishRestart"`
RunOnRead string `yaml:"runOnRead"`
RunOnReadRestart bool `yaml:"runOnReadRestart"`
// authentication
PublishUser string `yaml:"publishUser"`
PublishPass string `yaml:"publishPass"`
PublishIps []string `yaml:"publishIps"`
PublishIpsParsed []interface{} `yaml:"-" json:"-"`
ReadUser string `yaml:"readUser"`
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
ReadIpsParsed []interface{} `yaml:"-" json:"-"`
}
func (pconf *PathConf) fillAndCheck(name string) error {

View File

@ -0,0 +1,120 @@
package serverhls
import (
"context"
"io"
"net"
"net/http"
"strconv"
"strings"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// Request is an HTTP request received by the HLS server.
type Request struct {
Path string
Subpath string
Req *http.Request
W http.ResponseWriter
Res chan io.Reader
}
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
}
// Server is an HLS server.
type Server struct {
parent Parent
ln net.Listener
s *http.Server
// out
request chan Request
}
// New allocates a Server.
func New(
listenIP string,
port int,
parent Parent,
) (*Server, error) {
address := listenIP + ":" + strconv.FormatInt(int64(port), 10)
ln, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}
s := &Server{
parent: parent,
ln: ln,
request: make(chan Request),
}
s.s = &http.Server{
Handler: s,
}
s.log(logger.Info, "opened on "+address)
go s.s.Serve(s.ln)
return s, nil
}
func (s *Server) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[HLS listener] "+format, append([]interface{}{}, args...)...)
}
// Close closes all the server resources.
func (s *Server) Close() {
go func() {
for req := range s.request {
req.Res <- nil
}
}()
s.s.Shutdown(context.Background())
close(s.request)
}
// ServeHTTP implements http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
// remove leading prefix
path := r.URL.Path[1:]
if path == "" || path == "favicon.ico" {
w.WriteHeader(http.StatusNotFound)
return
}
parts := strings.SplitN(path, "/", 2)
if len(parts) < 2 {
w.Header().Add("Location", parts[0]+"/")
w.WriteHeader(http.StatusMovedPermanently)
return
}
cres := make(chan io.Reader)
s.request <- Request{
Path: parts[0],
Subpath: parts[1],
Req: r,
W: w,
Res: cres,
}
res := <-cres
if res != nil {
io.Copy(w, res)
}
}
// Request returns a channel to handle incoming HTTP requests.
func (s *Server) Request() chan Request {
return s.request
}

View File

@ -226,13 +226,13 @@ func (s *Source) runInner() bool {
outNALUs = append(outNALUs, nalu)
}
frames, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime)
pkts, err := h264Encoder.Encode(outNALUs, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding H264: %v", err)
}
for _, frame := range frames {
onFrame(videoTrack.ID, frame)
for _, pkt := range pkts {
onFrame(videoTrack.ID, pkt)
}
case av.AAC:
@ -240,13 +240,13 @@ func (s *Source) runInner() bool {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
}
frames, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime)
pkts, err := aacEncoder.Encode([][]byte{pkt.Data}, pkt.Time+pkt.CTime)
if err != nil {
return fmt.Errorf("ERR while encoding AAC: %v", err)
}
for _, frame := range frames {
onFrame(audioTrack.ID, frame)
for _, pkt := range pkts {
onFrame(audioTrack.ID, pkt)
}
default:

33
main.go
View File

@ -17,6 +17,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/pprof"
"github.com/aler9/rtsp-simple-server/internal/rlimit"
"github.com/aler9/rtsp-simple-server/internal/serverhls"
"github.com/aler9/rtsp-simple-server/internal/serverrtmp"
"github.com/aler9/rtsp-simple-server/internal/serverrtsp"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -35,6 +36,7 @@ type program struct {
serverRTSPPlain *serverrtsp.Server
serverRTSPTLS *serverrtsp.Server
serverRTMP *serverrtmp.Server
serverHLS *serverhls.Server
pathMan *pathman.PathManager
clientMan *clientman.ClientManager
confWatcher *confwatcher.ConfWatcher
@ -252,6 +254,18 @@ func (p *program) createResources(initial bool) error {
}
}
if !p.conf.HLSDisable {
if p.serverHLS == nil {
p.serverHLS, err = serverhls.New(
p.conf.ListenIP,
p.conf.HLSPort,
p)
if err != nil {
return err
}
}
}
if p.pathMan == nil {
p.pathMan = pathman.New(
p.conf.RTSPPort,
@ -267,6 +281,8 @@ func (p *program) createResources(initial bool) error {
if p.clientMan == nil {
p.clientMan = clientman.New(
p.conf.HLSSegmentCount,
p.conf.HLSSegmentDuration,
p.conf.RTSPPort,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
@ -279,6 +295,7 @@ func (p *program) createResources(initial bool) error {
p.serverRTSPPlain,
p.serverRTSPTLS,
p.serverRTMP,
p.serverHLS,
p)
}
@ -347,6 +364,14 @@ func (p *program) closeResources(newConf *conf.Conf) {
closeServerRTMP = true
}
closeServerHLS := false
if newConf == nil ||
newConf.HLSDisable != p.conf.HLSDisable ||
newConf.ListenIP != p.conf.ListenIP ||
newConf.HLSPort != p.conf.HLSPort {
closeServerHLS = true
}
closePathMan := false
if newConf == nil ||
newConf.RTSPPort != p.conf.RTSPPort ||
@ -365,7 +390,10 @@ func (p *program) closeResources(newConf *conf.Conf) {
closeServerPlain ||
closeServerTLS ||
closeServerRTMP ||
closeServerHLS ||
closePathMan ||
newConf.HLSSegmentCount != p.conf.HLSSegmentCount ||
newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration ||
newConf.RTSPPort != p.conf.RTSPPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
@ -391,6 +419,11 @@ func (p *program) closeResources(newConf *conf.Conf) {
p.pathMan = nil
}
if closeServerHLS && p.serverHLS != nil {
p.serverHLS.Close()
p.serverHLS = nil
}
if closeServerRTMP && p.serverRTMP != nil {
p.serverRTMP.Close()
p.serverRTMP = nil

71
main_clienthls_test.go Normal file
View File

@ -0,0 +1,71 @@
package main
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestClientHLSRead(t *testing.T) {
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://" + ownDockerIP + ":8888/teststream/stream.m3u8",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestClientHLSReadAuth(t *testing.T) {
p, ok := testProgram(
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n" +
" readIps: [172.17.0.0/16]\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://testuser:testpass@" + ownDockerIP + ":8888/teststream/stream.m3u8",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}

View File

@ -16,7 +16,7 @@ func TestClientRTMPPublish(t *testing.T) {
"video",
} {
t.Run(source, func(t *testing.T) {
p, ok := testProgram("")
p, ok := testProgram("hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
@ -48,7 +48,7 @@ func TestClientRTMPPublish(t *testing.T) {
}
func TestClientRTMPRead(t *testing.T) {
p, ok := testProgram("")
p, ok := testProgram("hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
@ -79,10 +79,12 @@ func TestClientRTMPRead(t *testing.T) {
func TestClientRTMPAuth(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: testpass\n")
" publishPass: testpass\n" +
" readIps: [172.17.0.0/16]\n")
require.Equal(t, true, ok)
defer p.close()
@ -112,10 +114,12 @@ func TestClientRTMPAuth(t *testing.T) {
t.Run("read", func(t *testing.T) {
p, ok := testProgram("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
" readPass: testpass\n" +
" readIps: [172.17.0.0/16]\n")
require.Equal(t, true, ok)
defer p.close()
@ -147,6 +151,7 @@ func TestClientRTMPAuth(t *testing.T) {
func TestClientRTMPAuthFail(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser2\n" +
@ -180,6 +185,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
t.Run("read", func(t *testing.T) {
p, ok := testProgram("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser2\n" +
@ -213,7 +219,7 @@ func TestClientRTMPAuthFail(t *testing.T) {
}
func TestClientRTMPRTPInfo(t *testing.T) {
p, ok := testProgram("")
p, ok := testProgram("hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()

View File

@ -56,6 +56,7 @@ func TestClientRTSPPublishRead(t *testing.T) {
port = "8554"
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"readTimeout: 20s\n")
require.Equal(t, true, ok)
defer p.close()
@ -73,6 +74,7 @@ func TestClientRTSPPublishRead(t *testing.T) {
defer os.Remove(serverKeyFpath)
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"readTimeout: 20s\n" +
"protocols: [tcp]\n" +
"encryption: yes\n" +
@ -152,6 +154,7 @@ func TestClientRTSPPublishRead(t *testing.T) {
func TestClientRTSPAuth(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
@ -192,6 +195,7 @@ func TestClientRTSPAuth(t *testing.T) {
} {
t.Run("read_"+soft, func(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
@ -239,6 +243,7 @@ func TestClientRTSPAuth(t *testing.T) {
t.Run("hashed", func(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: sha256:rl3rgi4NcZkpAEcacZnQ2VuOfJ0FxAqCRaKB/SwdZoQ=\n" +
@ -294,12 +299,12 @@ func TestClientRTSPAuthFail(t *testing.T) {
},
} {
t.Run(ca.name, func(t *testing.T) {
p, ok := testProgram(
"rtmpDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: testpass\n")
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
@ -352,12 +357,12 @@ func TestClientRTSPAuthFail(t *testing.T) {
},
} {
t.Run(ca.name, func(t *testing.T) {
p, ok := testProgram(
"rtmpDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
@ -391,6 +396,7 @@ func TestClientRTSPAuthFail(t *testing.T) {
func TestClientRTSPAuthIpFail(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n" +
" publishIps: [127.0.0.1/32]\n")
@ -417,6 +423,7 @@ func TestClientRTSPAutomaticProtocol(t *testing.T) {
} {
t.Run(source, func(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"protocols: [tcp]\n")
require.Equal(t, true, ok)
defer p.close()
@ -517,6 +524,7 @@ func TestClientRTSPPublisherOverride(t *testing.T) {
func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"readBufferSize: 4500\n")
require.Equal(t, true, ok)
defer p.close()
@ -562,6 +570,7 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
t.Run("proxy", func(t *testing.T) {
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"protocols: [tcp]\n" +
"readBufferSize: 4500\n")
require.Equal(t, true, ok)
@ -584,6 +593,7 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
defer source.Close()
p2, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"protocols: [tcp]\n" +
"readBufferSize: 4500\n" +
"rtspPort: 8555\n" +
@ -621,7 +631,8 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
}
func TestClientRTSPRTPInfo(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n")
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
@ -727,6 +738,7 @@ func TestClientRTSPRTPInfo(t *testing.T) {
func TestClientRTSPRedirect(t *testing.T) {
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" path1:\n" +
" source: redirect\n" +
@ -775,6 +787,7 @@ func TestClientRTSPFallback(t *testing.T) {
}()
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" path1:\n" +
" fallback: " + val + "\n" +
@ -827,6 +840,7 @@ wait
defer os.Remove(doneFile)
p1, ok := testProgram(fmt.Sprintf("rtmpDisable: yes\n"+
"hlsDisable: yes\n"+
"paths:\n"+
" all:\n"+
" runOnDemand: %s\n"+
@ -868,6 +882,7 @@ wait
defer os.Remove(doneFile)
p1, ok := testProgram(fmt.Sprintf("rtmpDisable: yes\n"+
"hlsDisable: yes\n"+
"paths:\n"+
" all:\n"+
" runOnDemand: %s\n"+
@ -934,6 +949,7 @@ wait
defer os.Remove(doneFile)
p1, ok := testProgram(fmt.Sprintf("rtmpDisable: yes\n"+
"hlsDisable: yes\n"+
"paths:\n"+
" all:\n"+
" runOnDemand: %s\n"+

View File

@ -35,7 +35,8 @@ func TestSourceRTMP(t *testing.T) {
time.Sleep(1 * time.Second)
p, ok := testProgram("paths:\n" +
p, ok := testProgram("hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtmp://" + cnt1.ip() + "/stream/test\n" +
" sourceOnDemand: yes\n")

View File

@ -25,6 +25,7 @@ func TestSourceRTSP(t *testing.T) {
switch source {
case "udp", "tcp":
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"rtspPort: 8555\n" +
"rtpPort: 8100\n" +
"rtcpPort: 8101\n" +
@ -48,6 +49,7 @@ func TestSourceRTSP(t *testing.T) {
defer cnt1.close()
p2, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://testuser:testpass@localhost:8555/teststream\n" +
@ -66,6 +68,7 @@ func TestSourceRTSP(t *testing.T) {
defer os.Remove(serverKeyFpath)
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"rtspPort: 8555\n" +
"rtpPort: 8100\n" +
"rtcpPort: 8101\n" +
@ -95,6 +98,7 @@ func TestSourceRTSP(t *testing.T) {
time.Sleep(1 * time.Second)
p2, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsps://testuser:testpass@localhost:8555/teststream\n" +
@ -282,6 +286,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
}()
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://localhost:8555/stream\n" +

View File

@ -81,6 +81,22 @@ rtmpDisable: no
# port of the RTMP listener.
rtmpPort: 1935
###############################################
# HLS options
# disable support for the HLS protocol.
hlsDisable: no
# port of the HLS listener.
hlsPort: 8888
# number of HLS segments to generate.
# increasing segments allows more buffering,
# decreasing segments decrease latency.
hlsSegmentCount: 3
# minimum duration of each segment.
# the real segment duration is also influenced by the interval of IDR frames
# (since the server edit the segment duration to include at least one IDR frame).
hlsSegmentDuration: 1s
###############################################
# Path options