mirror of
https://github.com/bluenviron/mediamtx
synced 2025-02-10 00:18:22 +00:00
converterhls: use contexts
This commit is contained in:
parent
baf2100ad6
commit
e450881446
@ -161,13 +161,13 @@ func (c *Conn) run() {
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
connErr := make(chan error)
|
||||
runErr := make(chan error)
|
||||
go func() {
|
||||
connErr <- c.runInner(ctx)
|
||||
runErr <- c.runInner(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-connErr:
|
||||
case err := <-runErr:
|
||||
cancel()
|
||||
|
||||
if err != io.EOF {
|
||||
@ -176,7 +176,7 @@ func (c *Conn) run() {
|
||||
|
||||
case <-c.terminate:
|
||||
cancel()
|
||||
<-connErr
|
||||
<-runErr
|
||||
}
|
||||
|
||||
if c.path != nil {
|
||||
|
@ -2,6 +2,7 @@ package converterhls
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@ -144,8 +145,9 @@ type Converter struct {
|
||||
lastRequestTime int64
|
||||
|
||||
// in
|
||||
request chan Request
|
||||
terminate chan struct{}
|
||||
request chan Request
|
||||
terminate chan struct{}
|
||||
parentTerminate chan struct{}
|
||||
}
|
||||
|
||||
// New allocates a Converter.
|
||||
@ -171,7 +173,8 @@ func New(
|
||||
lastRequestTime: time.Now().Unix(),
|
||||
tsByName: make(map[string]*tsFile),
|
||||
request: make(chan Request),
|
||||
terminate: make(chan struct{}),
|
||||
terminate: make(chan struct{}, 1),
|
||||
parentTerminate: make(chan struct{}),
|
||||
}
|
||||
|
||||
c.log(logger.Info, "opened")
|
||||
@ -185,12 +188,15 @@ func New(
|
||||
// ParentClose closes a Converter.
|
||||
func (c *Converter) ParentClose() {
|
||||
c.log(logger.Info, "closed")
|
||||
close(c.terminate)
|
||||
close(c.parentTerminate)
|
||||
}
|
||||
|
||||
// Close closes a Converter.
|
||||
func (c *Converter) Close() {
|
||||
c.parent.OnConverterClose(c)
|
||||
select {
|
||||
case c.terminate <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// IsReadPublisher implements readpublisher.ReadPublisher.
|
||||
@ -211,6 +217,57 @@ func (c *Converter) PathName() string {
|
||||
func (c *Converter) run() {
|
||||
defer c.wg.Done()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
runErr := make(chan error)
|
||||
go func() {
|
||||
runErr <- c.runInner(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-runErr:
|
||||
cancel()
|
||||
c.log(logger.Info, "ERR: %s", err)
|
||||
|
||||
case <-c.terminate:
|
||||
cancel()
|
||||
<-runErr
|
||||
}
|
||||
|
||||
go func() {
|
||||
for req := range c.request {
|
||||
req.W.WriteHeader(http.StatusInternalServerError)
|
||||
req.Res <- nil
|
||||
}
|
||||
}()
|
||||
|
||||
if c.path != nil {
|
||||
res := make(chan struct{})
|
||||
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
}
|
||||
|
||||
c.parent.OnConverterClose(c)
|
||||
<-c.parentTerminate
|
||||
|
||||
close(c.request)
|
||||
}
|
||||
|
||||
func (c *Converter) runInner(ctx context.Context) error {
|
||||
pres := make(chan readpublisher.SetupPlayRes)
|
||||
c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{
|
||||
Author: c,
|
||||
PathName: c.pathName,
|
||||
IP: nil,
|
||||
ValidateCredentials: nil,
|
||||
Res: pres,
|
||||
})
|
||||
res := <-pres
|
||||
|
||||
if res.Err != nil {
|
||||
return res.Err
|
||||
}
|
||||
|
||||
c.path = res.Path
|
||||
var videoTrack *gortsplib.Track
|
||||
var h264SPS []byte
|
||||
var h264PPS []byte
|
||||
@ -219,85 +276,43 @@ func (c *Converter) run() {
|
||||
var aacConfig rtpaac.MPEG4AudioConfig
|
||||
var aacDecoder *rtpaac.Decoder
|
||||
|
||||
err := func() error {
|
||||
pres := make(chan readpublisher.SetupPlayRes)
|
||||
c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{
|
||||
Author: c,
|
||||
PathName: c.pathName,
|
||||
IP: nil,
|
||||
ValidateCredentials: nil,
|
||||
Res: pres,
|
||||
})
|
||||
res := <-pres
|
||||
|
||||
if res.Err != nil {
|
||||
return res.Err
|
||||
}
|
||||
|
||||
c.path = res.Path
|
||||
|
||||
for i, t := range res.Tracks {
|
||||
if t.IsH264() {
|
||||
if videoTrack != nil {
|
||||
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
|
||||
}
|
||||
videoTrack = t
|
||||
|
||||
var err error
|
||||
h264SPS, h264PPS, err = t.ExtractDataH264()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h264Decoder = rtph264.NewDecoder()
|
||||
|
||||
} else if t.IsAAC() {
|
||||
if audioTrack != nil {
|
||||
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
|
||||
}
|
||||
audioTrack = t
|
||||
|
||||
byts, err := t.ExtractDataAAC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = aacConfig.Decode(byts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
aacDecoder = rtpaac.NewDecoder(aacConfig.SampleRate)
|
||||
for i, t := range res.Tracks {
|
||||
if t.IsH264() {
|
||||
if videoTrack != nil {
|
||||
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
|
||||
}
|
||||
}
|
||||
videoTrack = t
|
||||
|
||||
if videoTrack == nil && audioTrack == nil {
|
||||
return fmt.Errorf("unable to find a video or audio track")
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
c.log(logger.Info, "ERR: %s", err)
|
||||
|
||||
go func() {
|
||||
for req := range c.request {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
var err error
|
||||
h264SPS, h264PPS, err = t.ExtractDataH264()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
if c.path != nil {
|
||||
res := make(chan struct{})
|
||||
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
h264Decoder = rtph264.NewDecoder()
|
||||
|
||||
} else if t.IsAAC() {
|
||||
if audioTrack != nil {
|
||||
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
|
||||
}
|
||||
audioTrack = t
|
||||
|
||||
byts, err := t.ExtractDataAAC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = aacConfig.Decode(byts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
aacDecoder = rtpaac.NewDecoder(aacConfig.SampleRate)
|
||||
}
|
||||
}
|
||||
|
||||
c.parent.OnConverterClose(c)
|
||||
<-c.terminate
|
||||
|
||||
close(c.request)
|
||||
return
|
||||
if videoTrack == nil && audioTrack == nil {
|
||||
return fmt.Errorf("unable to find a video or audio track")
|
||||
}
|
||||
|
||||
curTSFile := newTSFile(videoTrack, audioTrack)
|
||||
@ -308,12 +323,13 @@ func (c *Converter) run() {
|
||||
curTSFile.Close()
|
||||
}()
|
||||
|
||||
requestDone := make(chan struct{})
|
||||
go c.runRequestHandler(requestDone)
|
||||
requestHandlerTerminate := make(chan struct{})
|
||||
requestHandlerDone := make(chan struct{})
|
||||
go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
|
||||
|
||||
defer func() {
|
||||
close(c.request)
|
||||
<-requestDone
|
||||
close(requestHandlerTerminate)
|
||||
<-requestHandlerDone
|
||||
}()
|
||||
|
||||
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
|
||||
@ -478,118 +494,102 @@ func (c *Converter) run() {
|
||||
case <-closeCheckTicker.C:
|
||||
t := time.Unix(atomic.LoadInt64(&c.lastRequestTime), 0)
|
||||
if time.Since(t) >= closeAfterInactivity {
|
||||
|
||||
c.ringBuffer.Close()
|
||||
<-writerDone
|
||||
|
||||
res := make(chan struct{})
|
||||
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
|
||||
c.parent.OnConverterClose(c)
|
||||
<-c.terminate
|
||||
return
|
||||
return fmt.Errorf("TODO")
|
||||
}
|
||||
|
||||
case err := <-writerDone:
|
||||
c.log(logger.Info, "ERR: %s", err)
|
||||
return err
|
||||
|
||||
res := make(chan struct{})
|
||||
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
|
||||
c.parent.OnConverterClose(c)
|
||||
<-c.terminate
|
||||
return
|
||||
|
||||
case <-c.terminate:
|
||||
res := make(chan struct{})
|
||||
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
|
||||
<-res
|
||||
|
||||
c.ringBuffer.Close()
|
||||
<-writerDone
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("TODO")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Converter) runRequestHandler(done chan struct{}) {
|
||||
func (c *Converter) runRequestHandler(terminate chan struct{}, done chan struct{}) {
|
||||
defer close(done)
|
||||
|
||||
for preq := range c.request {
|
||||
req := preq
|
||||
for {
|
||||
select {
|
||||
case <-terminate:
|
||||
return
|
||||
|
||||
atomic.StoreInt64(&c.lastRequestTime, time.Now().Unix())
|
||||
case preq := <-c.request:
|
||||
req := preq
|
||||
|
||||
conf := c.path.Conf()
|
||||
atomic.StoreInt64(&c.lastRequestTime, time.Now().Unix())
|
||||
|
||||
if conf.ReadIPsParsed != nil {
|
||||
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
|
||||
ip := net.ParseIP(tmp)
|
||||
if !ipEqualOrInRange(ip, conf.ReadIPsParsed) {
|
||||
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
|
||||
req.W.WriteHeader(http.StatusUnauthorized)
|
||||
req.Res <- nil
|
||||
continue
|
||||
conf := c.path.Conf()
|
||||
|
||||
if conf.ReadIPsParsed != nil {
|
||||
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
|
||||
ip := net.ParseIP(tmp)
|
||||
if !ipEqualOrInRange(ip, conf.ReadIPsParsed) {
|
||||
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
|
||||
req.W.WriteHeader(http.StatusUnauthorized)
|
||||
req.Res <- nil
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if conf.ReadUser != "" {
|
||||
user, pass, ok := req.Req.BasicAuth()
|
||||
if !ok || user != conf.ReadUser || pass != conf.ReadPass {
|
||||
req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`)
|
||||
req.W.WriteHeader(http.StatusUnauthorized)
|
||||
req.Res <- nil
|
||||
continue
|
||||
if conf.ReadUser != "" {
|
||||
user, pass, ok := req.Req.BasicAuth()
|
||||
if !ok || user != conf.ReadUser || pass != conf.ReadPass {
|
||||
req.W.Header().Set("WWW-Authenticate", `Basic realm="rtsp-simple-server"`)
|
||||
req.W.WriteHeader(http.StatusUnauthorized)
|
||||
req.Res <- nil
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case req.Subpath == "stream.m3u8":
|
||||
func() {
|
||||
switch {
|
||||
case req.Subpath == "stream.m3u8":
|
||||
func() {
|
||||
c.tsMutex.Lock()
|
||||
defer c.tsMutex.Unlock()
|
||||
|
||||
if len(c.tsQueue) == 0 {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
return
|
||||
}
|
||||
|
||||
cnt := "#EXTM3U\n"
|
||||
cnt += "#EXT-X-VERSION:3\n"
|
||||
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
||||
cnt += "#EXT-X-TARGETDURATION:10\n"
|
||||
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n"
|
||||
for _, f := range c.tsQueue {
|
||||
cnt += "#EXTINF:10,\n"
|
||||
cnt += f.Name() + ".ts\n"
|
||||
}
|
||||
req.Res <- bytes.NewReader([]byte(cnt))
|
||||
}()
|
||||
|
||||
case strings.HasSuffix(req.Subpath, ".ts"):
|
||||
base := strings.TrimSuffix(req.Subpath, ".ts")
|
||||
|
||||
c.tsMutex.Lock()
|
||||
defer c.tsMutex.Unlock()
|
||||
f, ok := c.tsByName[base]
|
||||
c.tsMutex.Unlock()
|
||||
|
||||
if len(c.tsQueue) == 0 {
|
||||
if !ok {
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
cnt := "#EXTM3U\n"
|
||||
cnt += "#EXT-X-VERSION:3\n"
|
||||
cnt += "#EXT-X-ALLOW-CACHE:NO\n"
|
||||
cnt += "#EXT-X-TARGETDURATION:10\n"
|
||||
cnt += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(c.tsDeleteCount), 10) + "\n"
|
||||
for _, f := range c.tsQueue {
|
||||
cnt += "#EXTINF:10,\n"
|
||||
cnt += f.Name() + ".ts\n"
|
||||
}
|
||||
req.Res <- bytes.NewReader([]byte(cnt))
|
||||
}()
|
||||
req.Res <- f.buf.NewReader()
|
||||
|
||||
case strings.HasSuffix(req.Subpath, ".ts"):
|
||||
base := strings.TrimSuffix(req.Subpath, ".ts")
|
||||
case req.Subpath == "":
|
||||
req.Res <- bytes.NewReader([]byte(index))
|
||||
|
||||
c.tsMutex.Lock()
|
||||
f, ok := c.tsByName[base]
|
||||
c.tsMutex.Unlock()
|
||||
|
||||
if !ok {
|
||||
default:
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
continue
|
||||
}
|
||||
|
||||
req.Res <- f.buf.NewReader()
|
||||
|
||||
case req.Subpath == "":
|
||||
req.Res <- bytes.NewReader([]byte(index))
|
||||
|
||||
default:
|
||||
req.W.WriteHeader(http.StatusNotFound)
|
||||
req.Res <- nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -112,9 +112,9 @@ func (s *Source) run() {
|
||||
func (s *Source) runInner() bool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan error)
|
||||
runErr := make(chan error)
|
||||
go func() {
|
||||
done <- func() error {
|
||||
runErr <- func() error {
|
||||
s.log(logger.Debug, "connecting")
|
||||
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, s.readTimeout)
|
||||
@ -261,14 +261,14 @@ func (s *Source) runInner() bool {
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
case err := <-runErr:
|
||||
cancel()
|
||||
s.log(logger.Info, "ERR: %s", err)
|
||||
return true
|
||||
|
||||
case <-s.terminate:
|
||||
cancel()
|
||||
<-done
|
||||
<-runErr
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user