update gohlslib (#1684)

This commit is contained in:
Alessandro Ros 2023-04-11 22:01:41 +02:00 committed by GitHub
parent 9571afd715
commit f3a728b918
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 206 additions and 103 deletions

2
go.mod
View File

@ -6,7 +6,7 @@ require (
code.cloudfoundry.org/bytefmt v0.0.0
github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.1.1
github.com/bluenviron/gohlslib v0.2.0
github.com/bluenviron/gortsplib/v3 v3.2.0
github.com/bluenviron/mediacommon v0.2.0
github.com/fsnotify/fsnotify v1.6.0

4
go.sum
View File

@ -10,8 +10,8 @@ github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflx
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng=
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.1.1 h1:rUwMsILN8Wkxx9NxZYKx3CdDZIIuMpfVNe7IXLxe26A=
github.com/bluenviron/gohlslib v0.1.1/go.mod h1:jPRjSDyELabTOcVXkI/H3U3/sCMQZzpBL+8+GAlquPE=
github.com/bluenviron/gohlslib v0.2.0 h1:EqUyOGyPBhwM10IfkZ1saG4TYK0+b90J3KKzsc9HzUQ=
github.com/bluenviron/gohlslib v0.2.0/go.mod h1:jPRjSDyELabTOcVXkI/H3U3/sCMQZzpBL+8+GAlquPE=
github.com/bluenviron/gortsplib/v3 v3.2.0 h1:Hnc29HzpU2RiX6WOrFHYHB6FuMh7AzOO9D0CwiA8wyI=
github.com/bluenviron/gortsplib/v3 v3.2.0/go.mod h1:gAN1zD0tywu09WKNdHfXWU17VBJUVchXnhc4s+2H9Sc=
github.com/bluenviron/mediacommon v0.2.0 h1:XEuIr8FA5bfzjsQhrITd6ILgN9JCl0e0Cu8IVFEp5Hk=

View File

@ -37,16 +37,22 @@ const (
//go:embed hls_index.html
var hlsIndex []byte
type hlsMuxerResponse struct {
muxer *hlsMuxer
cb func() *gohlslib.MuxerFileResponse
type responseWriterWithCounter struct {
http.ResponseWriter
bytesSent *uint64
}
func (w *responseWriterWithCounter) Write(p []byte) (int, error) {
n, err := w.ResponseWriter.Write(p)
atomic.AddUint64(w.bytesSent, uint64(n))
return n, err
}
type hlsMuxerRequest struct {
path string
file string
ctx *gin.Context
res chan *hlsMuxerResponse
path string
file string
clientIP string
res chan *hlsMuxer
}
type hlsMuxerPathManager interface {
@ -200,10 +206,7 @@ func (m *hlsMuxer) run() {
req.res <- nil
case isReady:
req.res <- &hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
req.res <- m
default:
m.requests = append(m.requests, req)
@ -220,10 +223,7 @@ func (m *hlsMuxer) run() {
case <-innerReady:
isReady = true
for _, req := range m.requests {
req.res <- &hlsMuxerResponse{
muxer: m,
cb: m.handleRequest(req),
}
req.res <- m
}
m.requests = nil
@ -546,52 +546,33 @@ func (m *hlsMuxer) runWriter() error {
}
}
func (m *hlsMuxer) handleRequest(req *hlsMuxerRequest) func() *gohlslib.MuxerFileResponse {
func (m *hlsMuxer) handleRequest(ctx *gin.Context) {
atomic.StoreInt64(m.lastRequestTime, time.Now().UnixNano())
err := m.authenticate(req.ctx)
w := &responseWriterWithCounter{
ResponseWriter: ctx.Writer,
bytesSent: m.bytesSent,
}
err := m.authenticate(ctx)
if err != nil {
if terr, ok := err.(pathErrAuthCritical); ok {
m.log(logger.Info, "authentication error: %s", terr.message)
return func() *gohlslib.MuxerFileResponse {
return &gohlslib.MuxerFileResponse{
Status: http.StatusUnauthorized,
Header: map[string]string{
"WWW-Authenticate": `Basic realm="mediamtx"`,
},
}
}
}
return func() *gohlslib.MuxerFileResponse {
return &gohlslib.MuxerFileResponse{
Status: http.StatusUnauthorized,
Header: map[string]string{
"WWW-Authenticate": `Basic realm="mediamtx"`,
},
}
}
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
w.WriteHeader(http.StatusUnauthorized)
return
}
if req.file == "" {
return func() *gohlslib.MuxerFileResponse {
return &gohlslib.MuxerFileResponse{
Status: http.StatusOK,
Header: map[string]string{
"Content-Type": `text/html`,
},
Body: io.NopCloser(bytes.NewReader(hlsIndex)),
}
}
if ctx.Request.URL.Path == "" {
ctx.Header("Content-Type", `text/html`)
w.WriteHeader(http.StatusOK)
io.Copy(w, bytes.NewReader(hlsIndex))
return
}
return func() *gohlslib.MuxerFileResponse {
return m.muxer.File(
req.file,
req.ctx.Query("_HLS_msn"),
req.ctx.Query("_HLS_part"),
req.ctx.Query("_HLS_skip"))
}
m.muxer.Handle(w, ctx.Request)
}
func (m *hlsMuxer) authenticate(ctx *gin.Context) error {
@ -651,10 +632,6 @@ func (m *hlsMuxer) authenticate(ctx *gin.Context) error {
return nil
}
func (m *hlsMuxer) addSentBytes(n uint64) {
atomic.AddUint64(m.bytesSent, n)
}
// processRequest is called by hlsserver.Server (forwarded from ServeHTTP).
func (m *hlsMuxer) processRequest(req *hlsMuxerRequest) {
select {

View File

@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
@ -224,7 +223,7 @@ outer:
req.res <- nil
default:
r := s.createMuxer(req.path, req.ctx.ClientIP())
r := s.createMuxer(req.path, req.clientIP)
r.processRequest(req)
}
@ -310,29 +309,18 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
dir = strings.TrimSuffix(dir, "/")
hreq := &hlsMuxerRequest{
path: dir,
file: fname,
ctx: ctx,
res: make(chan *hlsMuxerResponse),
path: dir,
file: fname,
clientIP: ctx.ClientIP(),
res: make(chan *hlsMuxer),
}
select {
case s.request <- hreq:
res1 := <-hreq.res
if res1 != nil {
res := res1.cb()
for k, v := range res.Header {
ctx.Writer.Header().Set(k, v)
}
ctx.Writer.WriteHeader(res.Status)
if res.Body != nil {
defer res.Body.Close()
n, _ := io.Copy(ctx.Writer, res.Body)
res1.muxer.addSentBytes(uint64(n))
}
muxer := <-hreq.res
if muxer != nil {
ctx.Request.URL.Path = fname
muxer.handleRequest(ctx)
}
case <-s.ctx.Done():

View File

@ -3,11 +3,18 @@ package core
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"testing"
"time"
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/gin-gonic/gin"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@ -79,6 +86,20 @@ func (ts *testHTTPAuthenticator) onAuth(ctx *gin.Context) {
}
}
func httpPullFile(u string) ([]byte, error) {
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return io.ReadAll(res.Body)
}
func TestHLSServerNotFound(t *testing.T) {
p, ok := newInstance("")
require.Equal(t, true, ok)
@ -92,3 +113,111 @@ func TestHLSServerNotFound(t *testing.T) {
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
}
func TestHLSServer(t *testing.T) {
p, ok := newInstance("hlsAlwaysRemux: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
medi := &media.Media{
Type: media.TypeVideo,
Formats: []formats.Format{&formats.H264{
PayloadTyp: 96,
PacketizationMode: 1,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
}},
}
v := gortsplib.TransportTCP
source := gortsplib.Client{
Transport: &v,
}
err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
time.Sleep(500 * time.Millisecond)
for i := 0; i < 2; i++ {
source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123 + uint16(i),
Timestamp: 45343 + uint32(i*90000),
SSRC: 563423,
},
Payload: []byte{
0x05, 0x02, 0x03, 0x04, // IDR
},
})
}
cnt, err := httpPullFile("http://localhost:8888/stream/index.m3u8")
require.NoError(t, err)
require.Equal(t, "#EXTM3U\n"+
"#EXT-X-VERSION:9\n"+
"#EXT-X-INDEPENDENT-SEGMENTS\n"+
"\n"+
"#EXT-X-STREAM-INF:BANDWIDTH=1256,AVERAGE-BANDWIDTH=1256,"+
"CODECS=\"avc1.42c028\",RESOLUTION=1920x1080,FRAME-RATE=30.000\n"+
"stream.m3u8\n", string(cnt))
cnt, err = httpPullFile("http://localhost:8888/stream/stream.m3u8")
require.NoError(t, err)
require.Regexp(t, "#EXTM3U\n"+
"#EXT-X-VERSION:9\n"+
"#EXT-X-TARGETDURATION:1\n"+
"#EXT-X-SERVER-CONTROL:CAN-BLOCK-RELOAD=YES,PART-HOLD-BACK=2.50000,CAN-SKIP-UNTIL=6\n"+
"#EXT-X-PART-INF:PART-TARGET=1\n"+
"#EXT-X-MEDIA-SEQUENCE:1\n"+
"#EXT-X-MAP:URI=\"init.mp4\"\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-GAP\n"+
"#EXTINF:1\\.00000,\n"+
"gap.mp4\n"+
"#EXT-X-PROGRAM-DATE-TIME:.+?Z\n"+
"#EXT-X-PART:DURATION=1\\.00000,URI=\"part0.mp4\",INDEPENDENT=YES\n"+
"#EXTINF:1\\.00000,\n"+
"seg7.mp4\n"+
"#EXT-X-PRELOAD-HINT:TYPE=PART,URI=\"part1.mp4\"\n", string(cnt))
/*trak := <-c.track
pkt, _, err := trak.ReadRTP()
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 102,
SequenceNumber: pkt.SequenceNumber,
Timestamp: pkt.Timestamp,
SSRC: pkt.SSRC,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, pkt)*/
}

View File

@ -2,6 +2,12 @@ package core
import (
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"fmt"
"net/http"
"strings"
"time"
"github.com/bluenviron/gohlslib"
@ -46,9 +52,33 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
}()
var tlsConfig *tls.Config
if cnf.SourceFingerprint != "" {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
VerifyConnection: func(cs tls.ConnectionState) error {
h := sha256.New()
h.Write(cs.PeerCertificates[0].Raw)
hstr := hex.EncodeToString(h.Sum(nil))
fingerprintLower := strings.ToLower(cnf.SourceFingerprint)
if hstr != fingerprintLower {
return fmt.Errorf("server fingerprint do not match: expected %s, got %s",
fingerprintLower, hstr)
}
return nil
},
}
}
c := &gohlslib.Client{
URI: cnf.Source,
Fingerprint: cnf.SourceFingerprint,
URI: cnf.Source,
HTTPClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
},
Log: func(level gohlslib.LogLevel, format string, args ...interface{}) {
s.Log(logger.Level(level), format, args...)
},

View File

@ -2,8 +2,6 @@ package core
import (
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"net/url"
@ -18,25 +16,6 @@ import (
"github.com/aler9/mediamtx/internal/rtmp"
)
func pullMetrics() ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, "http://localhost:9998/metrics", nil)
if err != nil {
return nil, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return io.ReadAll(res.Body)
}
func TestMetrics(t *testing.T) {
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
@ -57,7 +36,7 @@ func TestMetrics(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
bo, err := pullMetrics()
bo, err := httpPullFile("http://localhost:9998/metrics")
require.NoError(t, err)
require.Equal(t, `paths 0
@ -129,7 +108,7 @@ webrtc_conns_bytes_sent 0
require.Equal(t, 200, res.StatusCode)
}()
bo, err = pullMetrics()
bo, err = httpPullFile("http://localhost:9998/metrics")
require.NoError(t, err)
require.Regexp(t,