mediamtx/internal/core/hls_server.go
2022-08-31 08:53:19 +02:00

415 lines
9.3 KiB
Go

package core
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httputil"
gopath "path"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/hls"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// nilWriter is a writer that discards everything it is given.
type nilWriter struct{}

// Write reports all of p as written and never fails; p is not stored.
func (nilWriter) Write(p []byte) (int, error) {
	written := len(p)
	return written, nil
}
// hlsServerAPIMuxersListItem is one entry of the muxer list built for the API.
// Both fields are exported and tagged for JSON encoding.
type hlsServerAPIMuxersListItem struct {
// presumably the time at which the muxer was created; the value is not
// filled in by this file — confirm against hlsMuxer
Created time.Time `json:"created"`
// presumably the time of the last request served by the muxer; the value
// is not filled in by this file — confirm against hlsMuxer
LastRequest time.Time `json:"lastRequest"`
}
// hlsServerAPIMuxersListData is the JSON payload of the muxer list.
// apiHLSMuxersList allocates it with an empty Items map and hands a pointer
// to every muxer through hlsServerAPIMuxersListSubReq.
type hlsServerAPIMuxersListData struct {
// NOTE(review): the key is presumably the muxer's path name; the entries
// are not written in this file — confirm against hlsMuxer
Items map[string]hlsServerAPIMuxersListItem `json:"items"`
}
// hlsServerAPIMuxersListRes is the result returned by apiHLSMuxersList.
type hlsServerAPIMuxersListRes struct {
// allocated by apiHLSMuxersList after run() has answered the request
data *hlsServerAPIMuxersListData
// copy of the server's muxer map, made by run()
muxers map[string]*hlsMuxer
// set (to "terminated") when the server context is done before the
// request could be delivered to run()
err error
}
// hlsServerAPIMuxersListReq is the request sent to run() through
// chAPIMuxerList in order to obtain the list of muxers.
type hlsServerAPIMuxersListReq struct {
// channel on which run() sends its answer; it is (re)created by
// apiHLSMuxersList, so callers do not need to set it
res chan hlsServerAPIMuxersListRes
}
// hlsServerAPIMuxersListSubReq is the request passed by apiHLSMuxersList to
// the apiHLSMuxersList method of every muxer.
type hlsServerAPIMuxersListSubReq struct {
// shared list payload; the same pointer is given to every muxer
data *hlsServerAPIMuxersListData
// left unset by this file; presumably created and used by the muxer to
// wait for its own goroutine — confirm against hlsMuxer
res chan struct{}
}
// hlsServerParent is what hlsServer requires from its owner: a logging sink.
// hlsServer.log forwards every message to Log after prefixing it with "[HLS] ".
type hlsServerParent interface {
// Log receives a level, a format string and the arguments for that format.
Log(logger.Level, string, ...interface{})
}
// hlsServer is the HLS server. It owns the TCP listener and the set of
// muxers; the muxers map is only touched by run() and by findOrCreateMuxer(),
// which run() calls. Other goroutines talk to run() through the channels
// listed at the end of the struct.
type hlsServer struct {
// configuration, copied from the arguments of newHLSServer
externalAuthenticationURL string
hlsAlwaysRemux bool
hlsVariant conf.HLSVariant
hlsSegmentCount int
hlsSegmentDuration conf.StringDuration
hlsPartDuration conf.StringDuration
hlsSegmentMaxSize conf.StringSize
hlsAllowOrigin string
hlsTrustedProxies conf.IPsOrCIDRs
readBufferCount int
// collaborators; metrics may be nil, every use is guarded by a nil check
pathManager *pathManager
metrics *metrics
parent hlsServerParent
// ctx is derived from the parent context given to newHLSServer;
// ctxCancel is called by close() and at the end of run()
ctx context.Context
ctxCancel func()
// wg tracks run(); its address is also handed to every muxer
wg sync.WaitGroup
ln net.Listener
// tlsConfig is nil when encryption is disabled
tlsConfig *tls.Config
// muxers is keyed by path name
muxers map[string]*hlsMuxer
// in: channels received on by run()
chPathSourceReady chan *path
chPathSourceNotReady chan *path
request chan *hlsMuxerRequest
chMuxerClose chan *hlsMuxer
chAPIMuxerList chan hlsServerAPIMuxersListReq
}
// newHLSServer allocates a HLS server and starts it.
//
// It opens a TCP listener on address, loads the TLS key pair when
// hlsEncryption is set, derives a cancellable context from parentCtx,
// registers the server with pathManager (and with metrics, when not nil) and
// finally launches run() in its own goroutine.
//
// An error is returned when the listener cannot be opened or when the key
// pair cannot be loaded; in the latter case the listener is closed before
// returning.
func newHLSServer(
	parentCtx context.Context,
	address string,
	externalAuthenticationURL string,
	hlsAlwaysRemux bool,
	hlsVariant conf.HLSVariant,
	hlsSegmentCount int,
	hlsSegmentDuration conf.StringDuration,
	hlsPartDuration conf.StringDuration,
	hlsSegmentMaxSize conf.StringSize,
	hlsAllowOrigin string,
	hlsEncryption bool,
	hlsServerKey string,
	hlsServerCert string,
	hlsTrustedProxies conf.IPsOrCIDRs,
	readBufferCount int,
	pathManager *pathManager,
	metrics *metrics,
	parent hlsServerParent,
) (*hlsServer, error) {
	ln, err := net.Listen("tcp", address)
	if err != nil {
		return nil, err
	}

	var tlsConfig *tls.Config
	if hlsEncryption {
		crt, err := tls.LoadX509KeyPair(hlsServerCert, hlsServerKey)
		if err != nil {
			// do not leak the listener opened above
			ln.Close()
			return nil, err
		}

		tlsConfig = &tls.Config{
			Certificates: []tls.Certificate{crt},
		}
	}

	ctx, ctxCancel := context.WithCancel(parentCtx)

	s := &hlsServer{
		externalAuthenticationURL: externalAuthenticationURL,
		hlsAlwaysRemux:            hlsAlwaysRemux,
		hlsVariant:                hlsVariant,
		hlsSegmentCount:           hlsSegmentCount,
		hlsSegmentDuration:        hlsSegmentDuration,
		hlsPartDuration:           hlsPartDuration,
		hlsSegmentMaxSize:         hlsSegmentMaxSize,
		hlsAllowOrigin:            hlsAllowOrigin,
		hlsTrustedProxies:         hlsTrustedProxies,
		readBufferCount:           readBufferCount,
		pathManager:               pathManager,
		parent:                    parent,
		metrics:                   metrics,
		ctx:                       ctx,
		ctxCancel:                 ctxCancel,
		ln:                        ln,
		tlsConfig:                 tlsConfig,
		muxers:                    make(map[string]*hlsMuxer),
		chPathSourceReady:         make(chan *path),
		chPathSourceNotReady:      make(chan *path),
		request:                   make(chan *hlsMuxerRequest),
		chMuxerClose:              make(chan *hlsMuxer),
		chAPIMuxerList:            make(chan hlsServerAPIMuxersListReq),
	}

	// address is passed as an argument rather than spliced into the format
	// string: a literal '%' in it (for instance an IPv6 zone such as
	// "[fe80::1%eth0]:8888") would otherwise be read as a formatting verb.
	s.log(logger.Info, "listener opened on %s", address)

	s.pathManager.hlsServerSet(s)

	if s.metrics != nil {
		s.metrics.hlsServerSet(s)
	}

	// balanced by the deferred Done() at the top of run()
	s.wg.Add(1)
	go s.run()

	return s, nil
}
// log is the logging function of the server. It forwards the message to the
// parent after prefixing the format string with "[HLS] "; format and args
// are printf-style, as used by the callers in this file.
func (s *hlsServer) log(level logger.Level, format string, args ...interface{}) {
	// nothing is prepended to args, so the slice is forwarded as-is instead
	// of being copied into a fresh one on every call
	s.parent.Log(level, "[HLS] "+format, args...)
}
// close stops the server and blocks until it has terminated.
func (s *hlsServer) close() {
s.log(logger.Info, "listener is closing")
// cancelling the context makes run() leave its event loop and shut the
// HTTP server down
s.ctxCancel()
// wait for run() to return; the same WaitGroup is handed to every muxer,
// so presumably this waits for the muxers too — confirm against hlsMuxer
s.wg.Wait()
}
// run is the main goroutine of the server. It starts the HTTP server on the
// listener, then serialises every event (source ready / not ready, incoming
// HTTP request, muxer closed, API list request) until the context is done,
// at which point it shuts the HTTP server down and unregisters the server
// from the path manager and from metrics.
func (s *hlsServer) run() {
	defer s.wg.Done()

	// no route is registered: every request ends up in onRequest
	router := gin.New()
	router.NoRoute(s.onRequest)

	proxies := make([]string, 0, len(s.hlsTrustedProxies))
	for _, entry := range s.hlsTrustedProxies {
		proxies = append(proxies, entry.String())
	}
	router.SetTrustedProxies(proxies)

	srv := &http.Server{
		Handler:   router,
		TLSConfig: s.tlsConfig,
		// errors of the HTTP server are discarded
		ErrorLog: log.New(&nilWriter{}, "", 0),
	}

	if s.tlsConfig == nil {
		go srv.Serve(s.ln)
	} else {
		// certificates come from TLSConfig, hence the empty file names
		go srv.ServeTLS(s.ln, "", "")
	}

	for terminated := false; !terminated; {
		select {
		case pa := <-s.chPathSourceReady:
			if s.hlsAlwaysRemux {
				s.findOrCreateMuxer(pa.Name(), "", nil)
			}

		case pa := <-s.chPathSourceNotReady:
			if s.hlsAlwaysRemux {
				if muxer, found := s.muxers[pa.Name()]; found {
					muxer.close()
					delete(s.muxers, pa.Name())
				}
			}

		case req := <-s.request:
			s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req)

		case closed := <-s.chMuxerClose:
			// act only when the closed muxer is still the one registered
			// under its path name
			if current, found := s.muxers[closed.PathName()]; found && current == closed {
				delete(s.muxers, closed.PathName())

				// a muxer created without a remote address is recreated
				// right away when always-remux is enabled
				if s.hlsAlwaysRemux && closed.remoteAddr == "" {
					s.findOrCreateMuxer(closed.PathName(), "", nil)
				}
			}

		case req := <-s.chAPIMuxerList:
			// hand out a copy, so that the caller never touches s.muxers
			snapshot := make(map[string]*hlsMuxer)
			for name, muxer := range s.muxers {
				snapshot[name] = muxer
			}

			req.res <- hlsServerAPIMuxersListRes{
				muxers: snapshot,
			}

		case <-s.ctx.Done():
			terminated = true
		}
	}

	s.ctxCancel()

	srv.Shutdown(context.Background())
	s.ln.Close() // in case Shutdown() is called before Serve()

	s.pathManager.hlsServerSet(nil)

	if s.metrics != nil {
		s.metrics.hlsServerSet(nil)
	}
}
// onRequest handles every HTTP request received by the server.
//
// The request and the response are dumped at debug level. CORS headers are
// always set. OPTIONS requests are answered immediately with 200, methods
// other than GET and OPTIONS with 404. For GET requests the URL path is
// split into a directory and, when the path ends with .m3u8, .ts or .mp4, a
// file name; a path without file name and without trailing slash is
// redirected to the same path followed by a slash. The request is then
// handed to run(), and the response produced by the muxer is written back.
func (s *hlsServer) onRequest(ctx *gin.Context) {
	s.log(logger.Debug, "[conn %v] %s %s", ctx.ClientIP(), ctx.Request.Method, ctx.Request.URL.Path)

	// the dump is only used for logging; on error an empty dump is logged
	byts, _ := httputil.DumpRequest(ctx.Request, true)
	s.log(logger.Debug, "[conn %v] [c->s] %s", ctx.ClientIP(), string(byts))

	// wrap the writer in order to be able to dump the response at the end
	logw := &httpLogWriter{ResponseWriter: ctx.Writer}
	ctx.Writer = logw

	ctx.Writer.Header().Set("Server", "rtsp-simple-server")
	ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.hlsAllowOrigin)
	ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true")

	switch ctx.Request.Method {
	case http.MethodGet:

	case http.MethodOptions:
		ctx.Writer.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
		ctx.Writer.Header().Set("Access-Control-Allow-Headers", ctx.Request.Header.Get("Access-Control-Request-Headers"))
		ctx.Writer.WriteHeader(http.StatusOK)
		return

	default:
		ctx.Writer.WriteHeader(http.StatusNotFound)
		return
	}

	// remove leading prefix. The path is empty when the request target is
	// an absolute URL without a path; slicing it unconditionally would
	// panic, so an empty path is left as it is and rejected below.
	pa := ctx.Request.URL.Path
	if pa != "" {
		pa = pa[1:]
	}

	switch pa {
	case "", "favicon.ico":
		ctx.Writer.WriteHeader(http.StatusNotFound)
		return
	}

	dir, fname := pa, ""
	if strings.HasSuffix(pa, ".m3u8") ||
		strings.HasSuffix(pa, ".ts") ||
		strings.HasSuffix(pa, ".mp4") {
		dir, fname = gopath.Dir(pa), gopath.Base(pa)
	}

	if fname == "" && !strings.HasSuffix(dir, "/") {
		// leading slashes are dropped from the target: a Location that
		// starts with "//" is a scheme-relative reference to another host,
		// which would let a request for "//host" redirect off-site.
		ctx.Writer.Header().Set("Location", "/"+strings.TrimLeft(dir, "/")+"/")
		ctx.Writer.WriteHeader(http.StatusMovedPermanently)
		return
	}

	dir = strings.TrimSuffix(dir, "/")

	cres := make(chan func() *hls.MuxerFileResponse)
	hreq := &hlsMuxerRequest{
		dir:  dir,
		file: fname,
		ctx:  ctx,
		res:  cres,
	}

	select {
	case s.request <- hreq:
		// the answer is a callback, invoked here, in the goroutine of the
		// HTTP handler
		cb := <-cres

		res := cb()

		for k, v := range res.Header {
			ctx.Writer.Header().Set(k, v)
		}
		ctx.Writer.WriteHeader(res.Status)

		if res.Body != nil {
			// best effort: a failed copy cannot be reported to the
			// client once the status has been written
			io.Copy(ctx.Writer, res.Body)
		}

	case <-s.ctx.Done():
		// the server is terminating: nothing is written
	}

	s.log(logger.Debug, "[conn %v] [s->c] %s", ctx.ClientIP(), logw.dump())
}
// findOrCreateMuxer returns the muxer registered under pathName, creating
// and registering a new one when there is none.
//
// When the muxer already exists and req is not nil, req is forwarded to it.
// When the muxer is created, req is given to newHLSMuxer instead.
func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *hlsMuxerRequest) *hlsMuxer {
	if existing, found := s.muxers[pathName]; found {
		if req != nil {
			existing.request(req)
		}
		return existing
	}

	created := newHLSMuxer(
		s.ctx,
		pathName,
		remoteAddr,
		s.externalAuthenticationURL,
		s.hlsVariant,
		s.hlsSegmentCount,
		s.hlsSegmentDuration,
		s.hlsPartDuration,
		s.hlsSegmentMaxSize,
		s.readBufferCount,
		req,
		&s.wg,
		pathName,
		s.pathManager,
		s)
	s.muxers[pathName] = created

	return created
}
// muxerClose is called by hlsMuxer.
// It notifies run() that c has been closed; the notification is dropped
// when the server context is done.
func (s *hlsServer) muxerClose(c *hlsMuxer) {
	select {
	case <-s.ctx.Done():
		// server is terminating, nobody is listening anymore

	case s.chMuxerClose <- c:
	}
}
// pathSourceReady is called by pathManager.
// It notifies run() that the source of pa is ready; the notification is
// dropped when the server context is done.
func (s *hlsServer) pathSourceReady(pa *path) {
	select {
	case <-s.ctx.Done():
		// server is terminating, nobody is listening anymore

	case s.chPathSourceReady <- pa:
	}
}
// pathSourceNotReady is called by pathManager.
// It notifies run() that the source of pa is no longer ready; the
// notification is dropped when the server context is done.
func (s *hlsServer) pathSourceNotReady(pa *path) {
	select {
	case <-s.ctx.Done():
		// server is terminating, nobody is listening anymore

	case s.chPathSourceNotReady <- pa:
	}
}
// apiHLSMuxersList is called by api.
// It asks run() for a copy of the muxer map, then calls apiHLSMuxersList on
// every muxer with a shared, freshly allocated list payload. The response
// channel of req is created here. When the server context is done before
// the request is delivered, the result carries the error "terminated".
func (s *hlsServer) apiHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
	req.res = make(chan hlsServerAPIMuxersListRes)

	select {
	case <-s.ctx.Done():
		return hlsServerAPIMuxersListRes{err: fmt.Errorf("terminated")}

	case s.chAPIMuxerList <- req:
	}

	// run() answers right after having received the request
	res := <-req.res

	list := &hlsServerAPIMuxersListData{
		Items: make(map[string]hlsServerAPIMuxersListItem),
	}
	res.data = list

	for _, muxer := range res.muxers {
		muxer.apiHLSMuxersList(hlsServerAPIMuxersListSubReq{data: list})
	}

	return res
}