move clienthls inside serverhls; remove clientman

This commit is contained in:
aler9 2021-04-27 19:04:05 +02:00
parent 22ba5f3f18
commit e3dcdf2204
7 changed files with 171 additions and 288 deletions

View File

@ -3,6 +3,7 @@ package clienthls
import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"strconv"
@ -20,7 +21,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/h264"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/serverhls"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -100,6 +100,15 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
return false
}
// Request is an HTTP request received by an HLS server.
type Request struct {
Path string
Subpath string
Req *http.Request
W http.ResponseWriter
Res chan io.Reader
}
type trackIDPayloadPair struct {
trackID int
buf []byte
@ -113,7 +122,7 @@ type PathMan interface {
// Parent is implemented by clientman.ClientMan.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnClientClose(client.Client)
OnClientClose(*Client)
}
// Client is a HLS client.
@ -136,7 +145,7 @@ type Client struct {
lastRequestTime int64
// in
request chan serverhls.Request
request chan Request
terminate chan struct{}
}
@ -162,12 +171,12 @@ func New(
parent: parent,
lastRequestTime: time.Now().Unix(),
tsByName: make(map[string]*tsFile),
request: make(chan serverhls.Request),
request: make(chan Request),
terminate: make(chan struct{}),
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected (HLS)")
c.log(logger.Info, "connected")
c.wg.Add(1)
go c.run()
@ -178,6 +187,7 @@ func New(
// Close closes a Client.
func (c *Client) Close() {
atomic.AddInt64(c.stats.CountClients, -1)
c.log(logger.Info, "disconnected")
close(c.terminate)
}
@ -193,7 +203,7 @@ func (c *Client) IsClient() {}
func (c *Client) IsSource() {}
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client hls/%s] "+format, append([]interface{}{c.pathName}, args...)...)
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.pathName}, args...)...)
}
// PathName returns the path name of the client.
@ -203,7 +213,6 @@ func (c *Client) PathName() string {
func (c *Client) run() {
defer c.wg.Done()
defer c.log(logger.Info, "disconnected")
var videoTrack *gortsplib.Track
var h264SPS []byte
@ -586,7 +595,7 @@ func (c *Client) runRequestHandler(done chan struct{}) {
}
// OnRequest is called by clientman.ClientMan.
func (c *Client) OnRequest(req serverhls.Request) {
func (c *Client) OnRequest(req Request) {
c.request <- req
}

View File

@ -1,175 +0,0 @@
package clientman
import (
"sync"
"time"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/clienthls"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/serverhls"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
// PathManager is implemented by pathman.PathManager.
type PathManager interface {
OnClientDescribe(client.DescribeReq)
OnClientAnnounce(client.AnnounceReq)
OnClientSetupPlay(client.SetupPlayReq)
}
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
}
// ClientManager is a client manager.
type ClientManager struct {
hlsSegmentCount int
hlsSegmentDuration time.Duration
rtspAddress string
readTimeout time.Duration
writeTimeout time.Duration
readBufferCount int
runOnConnect string
runOnConnectRestart bool
protocols map[base.StreamProtocol]struct{}
stats *stats.Stats
pathMan PathManager
serverHLS *serverhls.Server
parent Parent
clients map[client.Client]struct{}
clientsByHLSPath map[string]*clienthls.Client
wg sync.WaitGroup
// in
clientClose chan client.Client
terminate chan struct{}
// out
done chan struct{}
}
// New allocates a ClientManager.
func New(
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
rtspAddress string,
readTimeout time.Duration,
writeTimeout time.Duration,
readBufferCount int,
runOnConnect string,
runOnConnectRestart bool,
protocols map[base.StreamProtocol]struct{},
stats *stats.Stats,
pathMan PathManager,
serverHLS *serverhls.Server,
parent Parent) *ClientManager {
cm := &ClientManager{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
protocols: protocols,
stats: stats,
pathMan: pathMan,
serverHLS: serverHLS,
parent: parent,
clients: make(map[client.Client]struct{}),
clientsByHLSPath: make(map[string]*clienthls.Client),
clientClose: make(chan client.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go cm.run()
return cm
}
// Close closes a ClientManager.
func (cm *ClientManager) Close() {
close(cm.terminate)
<-cm.done
}
// Log is the main logging function.
func (cm *ClientManager) Log(level logger.Level, format string, args ...interface{}) {
cm.parent.Log(level, format, args...)
}
func (cm *ClientManager) run() {
defer close(cm.done)
hlsRequest := func() chan serverhls.Request {
if cm.serverHLS != nil {
return cm.serverHLS.Request()
}
return make(chan serverhls.Request)
}()
outer:
for {
select {
case req := <-hlsRequest:
c, ok := cm.clientsByHLSPath[req.Path]
if !ok {
c = clienthls.New(
cm.hlsSegmentCount,
cm.hlsSegmentDuration,
cm.readBufferCount,
&cm.wg,
cm.stats,
req.Path,
cm.pathMan,
cm)
cm.clients[c] = struct{}{}
cm.clientsByHLSPath[req.Path] = c
}
c.OnRequest(req)
case c := <-cm.clientClose:
if _, ok := cm.clients[c]; !ok {
continue
}
cm.onClientClose(c)
case <-cm.terminate:
break outer
}
}
go func() {
for range cm.clientClose {
}
}()
for c := range cm.clients {
c.Close()
}
cm.wg.Wait()
close(cm.clientClose)
}
func (cm *ClientManager) onClientClose(c client.Client) {
delete(cm.clients, c)
if hc, ok := c.(*clienthls.Client); ok {
delete(cm.clientsByHLSPath, hc.PathName())
}
c.Close()
}
// OnClientClose is called by a client.
func (cm *ClientManager) OnClientClose(c client.Client) {
cm.clientClose <- c
}

View File

@ -129,7 +129,7 @@ func New(
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected (RTMP)")
c.log(logger.Info, "connected")
c.wg.Add(1)
go c.run()
@ -140,6 +140,7 @@ func New(
// Close closes a Client.
func (c *Client) Close() {
atomic.AddInt64(c.stats.CountClients, -1)
c.log(logger.Info, "disconnected")
close(c.terminate)
}
@ -164,7 +165,6 @@ func (c *Client) ip() net.IP {
func (c *Client) run() {
defer c.wg.Done()
defer c.log(logger.Info, "disconnected")
if c.runOnConnect != "" {
_, port, _ := net.SplitHostPort(c.rtspAddress)

View File

@ -6,19 +6,15 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/aler9/rtsp-simple-server/internal/clienthls"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
// Request is an HTTP request received by the HLS server.
type Request struct {
Path string
Subpath string
Req *http.Request
W http.ResponseWriter
Res chan io.Reader
}
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
@ -26,18 +22,34 @@ type Parent interface {
// Server is an HLS server.
type Server struct {
parent Parent
hlsSegmentCount int
hlsSegmentDuration time.Duration
readBufferCount int
stats *stats.Stats
pathMan *pathman.PathManager
parent Parent
ln net.Listener
s *http.Server
ln net.Listener
wg sync.WaitGroup
clients map[string]*clienthls.Client
// in
request chan clienthls.Request
clientClose chan *clienthls.Client
terminate chan struct{}
// out
request chan Request
done chan struct{}
}
// New allocates a Server.
func New(
address string,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
readBufferCount int,
stats *stats.Stats,
pathMan *pathman.PathManager,
parent Parent,
) (*Server, error) {
@ -47,40 +59,104 @@ func New(
}
s := &Server{
parent: parent,
ln: ln,
request: make(chan Request),
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
readBufferCount: readBufferCount,
stats: stats,
pathMan: pathMan,
parent: parent,
ln: ln,
clients: make(map[string]*clienthls.Client),
request: make(chan clienthls.Request),
clientClose: make(chan *clienthls.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
s.s = &http.Server{
Handler: s,
}
s.Log(logger.Info, "listener opened on "+address)
s.log(logger.Info, "opened on "+address)
go s.s.Serve(s.ln)
go s.run()
return s, nil
}
func (s *Server) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[HLS listener] "+format, append([]interface{}{}, args...)...)
// Log is the main logging function.
func (s *Server) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[HLS] "+format, append([]interface{}{}, args...)...)
}
// Close closes all the server resources.
func (s *Server) Close() {
close(s.terminate)
<-s.done
}
func (s *Server) run() {
defer close(s.done)
hs := &http.Server{Handler: s}
go hs.Serve(s.ln)
outer:
for {
select {
case req := <-s.request:
c, ok := s.clients[req.Path]
if !ok {
c = clienthls.New(
s.hlsSegmentCount,
s.hlsSegmentDuration,
s.readBufferCount,
&s.wg,
s.stats,
req.Path,
s.pathMan,
s)
s.clients[req.Path] = c
}
c.OnRequest(req)
case c := <-s.clientClose:
if c2, ok := s.clients[c.PathName()]; !ok || c2 != c {
continue
}
s.doClientClose(c)
case <-s.terminate:
break outer
}
}
go func() {
for req := range s.request {
req.Res <- nil
for {
select {
case req, ok := <-s.request:
if !ok {
return
}
req.Res <- nil
case _, ok := <-s.clientClose:
if !ok {
return
}
}
}
}()
s.s.Shutdown(context.Background())
for _, c := range s.clients {
s.doClientClose(c)
}
hs.Shutdown(context.Background())
close(s.request)
close(s.clientClose)
}
// ServeHTTP implements http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
s.Log(logger.Info, "%s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
// remove leading prefix
path := r.URL.Path[1:]
@ -98,7 +174,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
cres := make(chan io.Reader)
s.request <- Request{
s.request <- clienthls.Request{
Path: parts[0],
Subpath: parts[1],
Req: r,
@ -125,7 +201,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
// Request returns a channel to handle incoming HTTP requests.
func (s *Server) Request() chan Request {
return s.request
func (s *Server) doClientClose(c *clienthls.Client) {
delete(s.clients, c.PathName())
c.Close()
}
// OnClientClose is called by a client.
func (s *Server) OnClientClose(c *clienthls.Client) {
s.clientClose <- c
}

View File

@ -97,7 +97,7 @@ func (s *Server) run() {
defer close(s.done)
s.wg.Add(1)
clientNew := make(chan net.Conn)
connNew := make(chan net.Conn)
acceptErr := make(chan error)
go func() {
defer s.wg.Done()
@ -108,7 +108,7 @@ func (s *Server) run() {
return err
}
clientNew <- conn
connNew <- conn
}
}()
}()
@ -120,7 +120,7 @@ outer:
s.Log(logger.Warn, "ERR: %s", err)
break outer
case nconn := <-clientNew:
case nconn := <-connNew:
c := clientrtmp.New(
s.rtspAddress,
s.readTimeout,
@ -154,7 +154,7 @@ outer:
return
}
case conn, ok := <-clientNew:
case conn, ok := <-connNew:
if !ok {
return
}
@ -177,7 +177,7 @@ outer:
s.wg.Wait()
close(acceptErr)
close(clientNew)
close(connNew)
close(s.clientClose)
}

View File

@ -141,7 +141,7 @@ func (s *Server) run() {
defer close(s.done)
s.wg.Add(1)
clientNew := make(chan *gortsplib.ServerConn)
connNew := make(chan *gortsplib.ServerConn)
acceptErr := make(chan error)
go func() {
defer s.wg.Done()
@ -152,7 +152,7 @@ func (s *Server) run() {
return err
}
clientNew <- conn
connNew <- conn
}
}()
}()
@ -164,7 +164,7 @@ outer:
s.Log(logger.Warn, "ERR: %s", err)
break outer
case conn := <-clientNew:
case conn := <-connNew:
c := clientrtsp.New(
s.isTLS,
s.rtspAddress,
@ -198,7 +198,7 @@ outer:
return
}
case conn, ok := <-clientNew:
case conn, ok := <-connNew:
if !ok {
return
}
@ -221,7 +221,7 @@ outer:
s.wg.Wait()
close(acceptErr)
close(clientNew)
close(connNew)
close(s.clientClose)
}

90
main.go
View File

@ -9,7 +9,6 @@ import (
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/aler9/rtsp-simple-server/internal/clientman"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/confwatcher"
"github.com/aler9/rtsp-simple-server/internal/logger"
@ -33,12 +32,11 @@ type program struct {
logger *logger.Logger
metrics *metrics.Metrics
pprof *pprof.PPROF
pathMan *pathman.PathManager
serverRTSPPlain *serverrtsp.Server
serverRTSPTLS *serverrtsp.Server
serverRTMP *serverrtmp.Server
serverHLS *serverhls.Server
pathMan *pathman.PathManager
clientMan *clientman.ClientManager
confWatcher *confwatcher.ConfWatcher
terminate chan struct{}
@ -191,17 +189,6 @@ func (p *program) createResources(initial bool) error {
}
}
if !p.conf.HLSDisable {
if p.serverHLS == nil {
p.serverHLS, err = serverhls.New(
p.conf.HLSAddress,
p)
if err != nil {
return err
}
}
}
if p.pathMan == nil {
p.pathMan = pathman.New(
p.conf.RTSPAddress,
@ -215,23 +202,6 @@ func (p *program) createResources(initial bool) error {
p)
}
if p.clientMan == nil {
p.clientMan = clientman.New(
p.conf.HLSSegmentCount,
p.conf.HLSSegmentDuration,
p.conf.RTSPAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.conf.ProtocolsParsed,
p.stats,
p.pathMan,
p.serverHLS,
p)
}
if !p.conf.RTSPDisable &&
(p.conf.EncryptionParsed == conf.EncryptionNo ||
p.conf.EncryptionParsed == conf.EncryptionOptional) {
@ -310,6 +280,22 @@ func (p *program) createResources(initial bool) error {
}
}
if !p.conf.HLSDisable {
if p.serverHLS == nil {
p.serverHLS, err = serverhls.New(
p.conf.HLSAddress,
p.conf.HLSSegmentCount,
p.conf.HLSSegmentDuration,
p.conf.ReadBufferCount,
p.stats,
p.pathMan,
p)
if err != nil {
return err
}
}
}
return nil
}
@ -342,14 +328,6 @@ func (p *program) closeResources(newConf *conf.Conf) {
closePPROF = true
}
closeServerHLS := false
if newConf == nil ||
newConf.HLSDisable != p.conf.HLSDisable ||
newConf.HLSAddress != p.conf.HLSAddress ||
closeStats {
closeServerHLS = true
}
closePathMan := false
if newConf == nil ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
@ -364,23 +342,6 @@ func (p *program) closeResources(newConf *conf.Conf) {
p.pathMan.OnProgramConfReload(newConf.Paths)
}
closeClientMan := false
if newConf == nil ||
closeServerHLS ||
closePathMan ||
newConf.HLSSegmentCount != p.conf.HLSSegmentCount ||
newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
closeStats {
closeClientMan = true
}
closeServerPlain := false
if newConf == nil ||
newConf.RTSPDisable != p.conf.RTSPDisable ||
@ -435,6 +396,18 @@ func (p *program) closeResources(newConf *conf.Conf) {
closeServerRTMP = true
}
closeServerHLS := false
if newConf == nil ||
newConf.HLSDisable != p.conf.HLSDisable ||
newConf.HLSAddress != p.conf.HLSAddress ||
newConf.HLSSegmentCount != p.conf.HLSSegmentCount ||
newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
closeStats ||
closePathMan {
closeServerHLS = true
}
if closeServerTLS && p.serverRTSPTLS != nil {
p.serverRTSPTLS.Close()
p.serverRTSPTLS = nil
@ -445,11 +418,6 @@ func (p *program) closeResources(newConf *conf.Conf) {
p.serverRTSPPlain = nil
}
if closeClientMan && p.clientMan != nil {
p.clientMan.Close()
p.clientMan = nil
}
if closePathMan && p.pathMan != nil {
p.pathMan.Close()
p.pathMan = nil