From 58fe1cfe77d18347b4a6f4cd19964c487db2a403 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 5 Nov 2020 12:30:25 +0100 Subject: [PATCH] add docs --- README.md | 2 +- internal/client/client.go | 40 +++++++++++++-------- internal/clientman/clientman.go | 9 +++++ internal/conf/conf.go | 2 ++ internal/conf/pathconf.go | 2 ++ internal/conf/utils.go | 1 + internal/confenv/confenv.go | 1 + internal/confwatcher/confwatcher.go | 4 +++ internal/externalcmd/externalcmd.go | 4 +++ internal/loghandler/loghandler.go | 9 +++++ internal/metrics/metrics.go | 4 +++ internal/path/path.go | 56 ++++++++++++++++++++--------- internal/path/readersmap.go | 10 +++--- internal/pathman/pathman.go | 12 +++++++ internal/pprof/pprof.go | 6 ++++ internal/servertcp/server.go | 5 +++ internal/serverudp/server.go | 9 +++++ internal/sourcertmp/source.go | 6 ++++ internal/sourcertsp/source.go | 6 ++++ internal/stats/stats.go | 2 ++ internal/syslog/syslog_unix.go | 3 ++ internal/syslog/syslog_win.go | 3 ++ main.go | 1 + 23 files changed, 160 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 646542d5..d8146b0c 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/aler9/rtsp-simple-server)](https://goreportcard.com/report/github.com/aler9/rtsp-simple-server) [![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server) -_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish and read live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both readers and publishers and relays the publisher streams to the readers. +_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish, read and proxy live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both publishers and readers and relays the publisher's streams to the readers. Features: * Read and publish live streams with UDP and TCP diff --git a/internal/client/client.go b/internal/client/client.go index 2795ab69..06aa778e 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -78,6 +78,7 @@ func (cs state) String() string { return "Invalid" } +// Path is implemented by path.Path. type Path interface { Name() string SourceTrackCount() int @@ -88,6 +89,7 @@ type Path interface { OnFrame(int, gortsplib.StreamType, []byte) } +// Parent is implemented by clientman.ClientMan. type Parent interface { Log(string, ...interface{}) OnClientClose(*Client) @@ -96,6 +98,7 @@ type Parent interface { OnClientSetupPlay(*Client, string, int, *base.Request) (Path, error) } +// Client is a RTSP client. type Client struct { rtspPort int readTimeout time.Duration @@ -128,6 +131,7 @@ type Client struct { terminate chan struct{} } +// New allocates a Client. func New( rtspPort int, readTimeout time.Duration, @@ -174,11 +178,13 @@ func New( return c } +// Close closes a Client. func (c *Client) Close() { atomic.AddInt64(c.stats.CountClients, -1) close(c.terminate) } +// IsSource implementes path.source. func (c *Client) IsSource() {} func (c *Client) log(format string, args ...interface{}) { @@ -240,22 +246,23 @@ func (c *Client) writeResError(cseq base.HeaderValue, code base.StatusCode, err }) } -type ErrAuthNotCritical struct { +type errAuthNotCritical struct { *base.Response } -func (ErrAuthNotCritical) Error() string { +func (errAuthNotCritical) Error() string { return "auth not critical" } -type ErrAuthCritical struct { +type errAuthCritical struct { *base.Response } -func (ErrAuthCritical) Error() string { +func (errAuthCritical) Error() string { return "auth critical" } +// Authenticate performs an authentication. func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error { // validate ip if ips != nil { @@ -264,7 +271,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ if !ipEqualOrInRange(ip, ips) { c.log("ERR: ip '%s' not allowed", ip) - return ErrAuthCritical{&base.Response{ + return errAuthCritical{&base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ "CSeq": req.Header["CSeq"], @@ -296,7 +303,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ if c.authFailures > 3 { c.log("ERR: unauthorized: %s", err) - return ErrAuthCritical{&base.Response{ + return errAuthCritical{&base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ "CSeq": req.Header["CSeq"], @@ -309,7 +316,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ c.log("WARN: unauthorized: %s", err) } - return ErrAuthNotCritical{&base.Response{ + return errAuthNotCritical{&base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ "CSeq": req.Header["CSeq"], @@ -382,11 +389,11 @@ func (c *Client) handleRequest(req *base.Request) error { path, err := c.parent.OnClientDescribe(c, basePath, req) if err != nil { switch terr := err.(type) { - case ErrAuthNotCritical: + case errAuthNotCritical: c.conn.WriteResponse(terr.Response) return nil - case ErrAuthCritical: + case errAuthCritical: c.conn.WriteResponse(terr.Response) return errRunTerminate @@ -441,11 +448,11 @@ func (c *Client) handleRequest(req *base.Request) error { path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req) if err != nil { switch terr := err.(type) { - case ErrAuthNotCritical: + case errAuthNotCritical: c.conn.WriteResponse(terr.Response) return nil - case ErrAuthCritical: + case errAuthCritical: c.conn.WriteResponse(terr.Response) return errRunTerminate @@ -534,11 +541,11 @@ func (c *Client) handleRequest(req *base.Request) error { path, err := c.parent.OnClientSetupPlay(c, basePath, trackId, req) if err != nil { switch terr := err.(type) { - case ErrAuthNotCritical: + case errAuthNotCritical: c.conn.WriteResponse(terr.Response) return nil - case ErrAuthCritical: + case errAuthCritical: c.conn.WriteResponse(terr.Response) return errRunTerminate @@ -592,11 +599,11 @@ func (c *Client) handleRequest(req *base.Request) error { path, err := c.parent.OnClientSetupPlay(c, basePath, trackId, req) if err != nil { switch terr := err.(type) { - case ErrAuthNotCritical: + case errAuthNotCritical: c.conn.WriteResponse(terr.Response) return nil - case ErrAuthCritical: + case errAuthCritical: c.conn.WriteResponse(terr.Response) return errRunTerminate @@ -1288,6 +1295,7 @@ func (c *Client) runRecordTCP() { } } +// OnUdpPublisherFrame implements serverudp.Publisher. func (c *Client) OnUdpPublisherFrame(trackId int, streamType base.StreamType, buf []byte) { atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix()) @@ -1295,6 +1303,7 @@ func (c *Client) OnUdpPublisherFrame(trackId int, streamType base.StreamType, bu c.path.OnFrame(trackId, streamType, buf) } +// OnReaderFrame implements path.Reader. func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []byte) { track, ok := c.streamTracks[trackId] if !ok { @@ -1326,6 +1335,7 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by } } +// OnPathDescribeData is called by path.Path. func (c *Client) OnPathDescribeData(sdp []byte, redirect string, err error) { c.describeData <- describeData{sdp, redirect, err} } diff --git a/internal/clientman/clientman.go b/internal/clientman/clientman.go index 55707d9a..163227c8 100644 --- a/internal/clientman/clientman.go +++ b/internal/clientman/clientman.go @@ -15,10 +15,12 @@ import ( "github.com/aler9/rtsp-simple-server/internal/stats" ) +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } +// ClientManager is a client.Client manager. type ClientManager struct { rtspPort int readTimeout time.Duration @@ -44,6 +46,7 @@ type ClientManager struct { done chan struct{} } +// New allocates a ClientManager. func New( rtspPort int, readTimeout time.Duration, @@ -81,11 +84,13 @@ func New( return cm } +// Close closes a ClientManager. func (cm *ClientManager) Close() { close(cm.terminate) <-cm.done } +// Log is the main logging function. func (cm *ClientManager) Log(format string, args ...interface{}) { cm.parent.Log(format, args...) } @@ -137,18 +142,22 @@ outer: close(cm.clientClose) } +// OnClientClose is called by client.Client. func (cm *ClientManager) OnClientClose(c *client.Client) { cm.clientClose <- c } +// OnClientDescribe is called by client.Client. func (cm *ClientManager) OnClientDescribe(c *client.Client, pathName string, req *base.Request) (client.Path, error) { return cm.pathMan.OnClientDescribe(c, pathName, req) } +// OnClientAnnounce is called by client.Client. func (cm *ClientManager) OnClientAnnounce(c *client.Client, pathName string, tracks gortsplib.Tracks, req *base.Request) (client.Path, error) { return cm.pathMan.OnClientAnnounce(c, pathName, tracks, req) } +// OnClientSetupPlay is called by client.Client. func (cm *ClientManager) OnClientSetupPlay(c *client.Client, pathName string, trackId int, req *base.Request) (client.Path, error) { return cm.pathMan.OnClientSetupPlay(c, pathName, trackId, req) } diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 560f4024..38ef44e2 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -13,6 +13,7 @@ import ( "github.com/aler9/rtsp-simple-server/internal/loghandler" ) +// Conf is the program configuration. type Conf struct { Protocols []string `yaml:"protocols"` ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"` @@ -143,6 +144,7 @@ func (conf *Conf) fillAndCheck() error { return nil } +// Load loads a Conf. func Load(fpath string) (*Conf, error) { conf := &Conf{} diff --git a/internal/conf/pathconf.go b/internal/conf/pathconf.go index 31afd0ea..12eb3e1f 100644 --- a/internal/conf/pathconf.go +++ b/internal/conf/pathconf.go @@ -16,6 +16,7 @@ var reUserPass = regexp.MustCompile("^[a-zA-Z0-9!\\$\\(\\)\\*\\+\\.;<=>\\[\\]\\^ const userPassSupportedChars = "A-Z,0-9,!,$,(,),*,+,.,;,<,=,>,[,],^,_,-,{,}" +// PathConf is a path configuration. type PathConf struct { Regexp *regexp.Regexp `yaml:"-" json:"-"` Source string `yaml:"source"` @@ -224,6 +225,7 @@ func (pconf *PathConf) fillAndCheck(name string) error { return nil } +// Equal checks whether two PathConfs are equal. func (pconf *PathConf) Equal(other *PathConf) bool { a, _ := json.Marshal(pconf) b, _ := json.Marshal(pconf) diff --git a/internal/conf/utils.go b/internal/conf/utils.go index 9e01ecd3..f3953fb9 100644 --- a/internal/conf/utils.go +++ b/internal/conf/utils.go @@ -8,6 +8,7 @@ import ( var rePathName = regexp.MustCompile("^[0-9a-zA-Z_\\-/]+$") +// CheckPathName check if a path name is valid. func CheckPathName(name string) error { if name == "" { return fmt.Errorf("cannot be empty") diff --git a/internal/confenv/confenv.go b/internal/confenv/confenv.go index 91907e9b..b55483d0 100644 --- a/internal/confenv/confenv.go +++ b/internal/confenv/confenv.go @@ -125,6 +125,7 @@ func load(env map[string]string, envKey string, rv reflect.Value) error { return fmt.Errorf("unsupported type: %v", rt) } +// Load fills a structure with data from the environment. func Load(envKey string, v interface{}) error { env := make(map[string]string) for _, kv := range os.Environ() { diff --git a/internal/confwatcher/confwatcher.go b/internal/confwatcher/confwatcher.go index f9c2f189..d47ae8e3 100644 --- a/internal/confwatcher/confwatcher.go +++ b/internal/confwatcher/confwatcher.go @@ -7,6 +7,7 @@ import ( "github.com/fsnotify/fsnotify" ) +// ConfWatcher is a configuration file watcher. type ConfWatcher struct { inner *fsnotify.Watcher @@ -15,6 +16,7 @@ type ConfWatcher struct { done chan struct{} } +// New allocates a ConfWatcher. func New(confPath string) (*ConfWatcher, error) { inner, err := fsnotify.NewWatcher() if err != nil { @@ -39,6 +41,7 @@ func New(confPath string) (*ConfWatcher, error) { return w, nil } +// Close closes a ConfWatcher. func (w *ConfWatcher) Close() { go func() { for range w.signal { @@ -69,6 +72,7 @@ outer: close(w.signal) } +// Watch returns when the configuration file has changed. func (w *ConfWatcher) Watch() chan struct{} { return w.signal } diff --git a/internal/externalcmd/externalcmd.go b/internal/externalcmd/externalcmd.go index 3fe8ce72..88bd5678 100644 --- a/internal/externalcmd/externalcmd.go +++ b/internal/externalcmd/externalcmd.go @@ -12,11 +12,13 @@ const ( retryPause = 5 * time.Second ) +// Environment is a ExternalCmd environment. type Environment struct { Path string Port string } +// ExternalCmd is an external command. type ExternalCmd struct { cmdstr string restart bool @@ -29,6 +31,7 @@ type ExternalCmd struct { done chan struct{} } +// New allocates an ExternalCmd. func New(cmdstr string, restart bool, env Environment) *ExternalCmd { e := &ExternalCmd{ cmdstr: cmdstr, @@ -42,6 +45,7 @@ func New(cmdstr string, restart bool, env Environment) *ExternalCmd { return e } +// Close closes an ExternalCmd. func (e *ExternalCmd) Close() { close(e.terminate) <-e.done diff --git a/internal/loghandler/loghandler.go b/internal/loghandler/loghandler.go index e756589a..12d54946 100644 --- a/internal/loghandler/loghandler.go +++ b/internal/loghandler/loghandler.go @@ -8,11 +8,17 @@ import ( "github.com/aler9/rtsp-simple-server/internal/syslog" ) +// Destination is a log destination. type Destination int const ( + // DestinationStdout writes logs to the standard output. DestinationStdout Destination = iota + + // DestinationFile writes logs to a file. DestinationFile + + // DestinationSyslog writes logs to the system logger. DestinationSyslog ) @@ -22,6 +28,7 @@ func (f writeFunc) Write(p []byte) (int, error) { return f(p) } +// LogHandler is a log handler. type LogHandler struct { destinations map[Destination]struct{} @@ -29,6 +36,7 @@ type LogHandler struct { syslog io.WriteCloser } +// New allocates a log handler. func New(destinations map[Destination]struct{}, filePath string) (*LogHandler, error) { lh := &LogHandler{ destinations: destinations, @@ -57,6 +65,7 @@ func New(destinations map[Destination]struct{}, filePath string) (*LogHandler, e return lh, nil } +// Close closes a log handler. func (lh *LogHandler) Close() { if lh.file != nil { lh.file.Close() diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 56d24564..bf010c1f 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -16,10 +16,12 @@ const ( address = ":9998" ) +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } +// Metrics is a metrics exporter. type Metrics struct { stats *stats.Stats @@ -28,6 +30,7 @@ type Metrics struct { server *http.Server } +// New allocates a metrics. func New(stats *stats.Stats, parent Parent) (*Metrics, error) { listener, err := net.Listen("tcp", address) if err != nil { @@ -52,6 +55,7 @@ func New(stats *stats.Stats, parent Parent) (*Metrics, error) { return m, nil } +// Close closes a Metrics. func (m *Metrics) Close() { m.server.Shutdown(context.Background()) } diff --git a/internal/path/path.go b/internal/path/path.go index b2fe4f30..fcdee065 100644 --- a/internal/path/path.go +++ b/internal/path/path.go @@ -25,6 +25,7 @@ func newEmptyTimer() *time.Timer { return t } +// Parent is implemented by pathman.PathMan. type Parent interface { Log(string, ...interface{}) OnPathClose(*Path) @@ -53,11 +54,13 @@ type sourceRedirect struct{} func (*sourceRedirect) IsSource() {} +// ClientDescribeRes is a client describe response. type ClientDescribeRes struct { Path client.Path Err error } +// ClientDescribeReq is a client describe request. type ClientDescribeReq struct { Res chan ClientDescribeRes Client *client.Client @@ -65,11 +68,13 @@ type ClientDescribeReq struct { Req *base.Request } +// ClientAnnounceRes is a client announce response. type ClientAnnounceRes struct { Path client.Path Err error } +// ClientAnnounceReq is a client announce request. type ClientAnnounceReq struct { Res chan ClientAnnounceRes Client *client.Client @@ -78,11 +83,13 @@ type ClientAnnounceReq struct { Req *base.Request } +// ClientSetupPlayRes is a setup/play response. type ClientSetupPlayRes struct { Path client.Path Err error } +// ClientSetupPlayReq is a setup/play request. type ClientSetupPlayReq struct { Res chan ClientSetupPlayRes Client *client.Client @@ -125,6 +132,7 @@ const ( sourceStateReady ) +// Path is a path. type Path struct { rtspPort int readTimeout time.Duration @@ -166,6 +174,7 @@ type Path struct { terminate chan struct{} } +// New allocates a Path. func New( rtspPort int, readTimeout time.Duration, @@ -209,10 +218,12 @@ func New( return pa } +// Close closes a path. func (pa *Path) Close() { close(pa.terminate) } +// Log is the main logging function. func (pa *Path) Log(format string, args ...interface{}) { pa.parent.Log("[path "+pa.name+"] "+format, args...) } @@ -747,62 +758,75 @@ func (pa *Path) scheduleClose() { pa.closeTimerStarted = true } +// ConfName returns the configuration name of this path. +func (pa *Path) ConfName() string { + return pa.confName +} + +// Conf returns the configuration of this path. +func (pa *Path) Conf() *conf.PathConf { + return pa.conf +} + +// Name returns the name of this path. +func (pa *Path) Name() string { + return pa.name +} + +// SourceTrackCount returns the number of tracks of the source this path. +func (pa *Path) SourceTrackCount() int { + return pa.sourceTrackCount +} + +// OnSourceSetReady is called by a source. func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) { pa.sourceSdp = tracks.Write() pa.sourceTrackCount = len(tracks) pa.sourceSetReady <- struct{}{} } +// OnSourceSetNotReady is called by a source. func (pa *Path) OnSourceSetNotReady() { pa.sourceSetNotReady <- struct{}{} } -func (pa *Path) ConfName() string { - return pa.confName -} - -func (pa *Path) Conf() *conf.PathConf { - return pa.conf -} - -func (pa *Path) Name() string { - return pa.name -} - -func (pa *Path) SourceTrackCount() int { - return pa.sourceTrackCount -} - +// OnPathManDescribe is called by pathman.PathMan. func (pa *Path) OnPathManDescribe(req ClientDescribeReq) { pa.clientDescribe <- req } +// OnPathManSetupPlay is called by pathman.PathMan. func (pa *Path) OnPathManSetupPlay(req ClientSetupPlayReq) { pa.clientSetupPlay <- req } +// OnPathManAnnounce is called by pathman.PathMan. func (pa *Path) OnPathManAnnounce(req ClientAnnounceReq) { pa.clientAnnounce <- req } +// OnClientRemove is called by client.Client. func (pa *Path) OnClientRemove(c *client.Client) { res := make(chan struct{}) pa.clientRemove <- clientRemoveReq{res, c} <-res } +// OnClientPlay is called by client.Client. func (pa *Path) OnClientPlay(c *client.Client) { res := make(chan struct{}) pa.clientPlay <- clientPlayReq{res, c} <-res } +// OnClientRecord is called by client.Client. func (pa *Path) OnClientRecord(c *client.Client) { res := make(chan struct{}) pa.clientRecord <- clientRecordReq{res, c} <-res } +// OnFrame is called by a source or by a client.Client. func (pa *Path) OnFrame(trackId int, streamType gortsplib.StreamType, buf []byte) { pa.readers.forwardFrame(trackId, streamType, buf) } diff --git a/internal/path/readersmap.go b/internal/path/readersmap.go index acdf5767..0ae7d9a2 100644 --- a/internal/path/readersmap.go +++ b/internal/path/readersmap.go @@ -7,29 +7,29 @@ import ( "github.com/aler9/gortsplib/base" ) -type Reader interface { +type reader interface { OnReaderFrame(int, base.StreamType, []byte) } type readersMap struct { mutex sync.RWMutex - ma map[Reader]struct{} + ma map[reader]struct{} } func newReadersMap() *readersMap { return &readersMap{ - ma: make(map[Reader]struct{}), + ma: make(map[reader]struct{}), } } -func (m *readersMap) add(reader Reader) { +func (m *readersMap) add(reader reader) { m.mutex.Lock() defer m.mutex.Unlock() m.ma[reader] = struct{}{} } -func (m *readersMap) remove(reader Reader) { +func (m *readersMap) remove(reader reader) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/internal/pathman/pathman.go b/internal/pathman/pathman.go index 4371f312..25e6b223 100644 --- a/internal/pathman/pathman.go +++ b/internal/pathman/pathman.go @@ -15,10 +15,12 @@ import ( "github.com/aler9/rtsp-simple-server/internal/stats" ) +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } +// PathManager is a path.Path manager. type PathManager struct { rtspPort int readTimeout time.Duration @@ -44,6 +46,7 @@ type PathManager struct { done chan struct{} } +// New allocates a PathManager. func New( rtspPort int, readTimeout time.Duration, @@ -78,6 +81,7 @@ func New( return pm } +// Close closes a PathManager. func (pm *PathManager) Close() { go func() { for range pm.clientClose { @@ -87,6 +91,7 @@ func (pm *PathManager) Close() { <-pm.done } +// Log is the main logging function. func (pm *PathManager) Log(format string, args ...interface{}) { pm.parent.Log(format, args...) } @@ -283,18 +288,22 @@ func (pm *PathManager) findPathConf(name string) (string, *conf.PathConf, error) return "", nil, fmt.Errorf("unable to find a valid configuration for path '%s'", name) } +// OnProgramConfReload is called by program. func (pm *PathManager) OnProgramConfReload(pathConfs map[string]*conf.PathConf) { pm.confReload <- pathConfs } +// OnPathClose is called by path.Path. func (pm *PathManager) OnPathClose(pa *path.Path) { pm.pathClose <- pa } +// OnPathClientClose is called by path.Path. func (pm *PathManager) OnPathClientClose(c *client.Client) { pm.clientClose <- c } +// OnClientDescribe is called by client.Client. func (pm *PathManager) OnClientDescribe(c *client.Client, pathName string, req *base.Request) (client.Path, error) { res := make(chan path.ClientDescribeRes) pm.clientDescribe <- path.ClientDescribeReq{res, c, pathName, req} @@ -302,6 +311,7 @@ func (pm *PathManager) OnClientDescribe(c *client.Client, pathName string, req * return re.Path, re.Err } +// OnClientAnnounce is called by client.Client. func (pm *PathManager) OnClientAnnounce(c *client.Client, pathName string, tracks gortsplib.Tracks, req *base.Request) (client.Path, error) { res := make(chan path.ClientAnnounceRes) pm.clientAnnounce <- path.ClientAnnounceReq{res, c, pathName, tracks, req} @@ -309,6 +319,7 @@ func (pm *PathManager) OnClientAnnounce(c *client.Client, pathName string, track return re.Path, re.Err } +// OnClientSetupPlay is called by client.Client. func (pm *PathManager) OnClientSetupPlay(c *client.Client, pathName string, trackId int, req *base.Request) (client.Path, error) { res := make(chan path.ClientSetupPlayRes) pm.clientSetupPlay <- path.ClientSetupPlayReq{res, c, pathName, trackId, req} @@ -316,6 +327,7 @@ func (pm *PathManager) OnClientSetupPlay(c *client.Client, pathName string, trac return re.Path, re.Err } +// ClientClose is called by client.Client. func (pm *PathManager) ClientClose() chan *client.Client { return pm.clientClose } diff --git a/internal/pprof/pprof.go b/internal/pprof/pprof.go index 8275e304..ecb87f10 100644 --- a/internal/pprof/pprof.go +++ b/internal/pprof/pprof.go @@ -4,6 +4,8 @@ import ( "context" "net" "net/http" + + // start pprof _ "net/http/pprof" ) @@ -11,15 +13,18 @@ const ( address = ":9999" ) +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } +// Pprof is a performance metrics exporter. type Pprof struct { listener net.Listener server *http.Server } +// New allocates a Pprof. func New(parent Parent) (*Pprof, error) { listener, err := net.Listen("tcp", address) if err != nil { @@ -40,6 +45,7 @@ func New(parent Parent) (*Pprof, error) { return pp, nil } +// Close closes a Pprof. func (pp *Pprof) Close() { pp.server.Shutdown(context.Background()) } diff --git a/internal/servertcp/server.go b/internal/servertcp/server.go index 0e9c366f..6926cb80 100644 --- a/internal/servertcp/server.go +++ b/internal/servertcp/server.go @@ -4,10 +4,12 @@ import ( "net" ) +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } +// Server is a RTSP TCP server. type Server struct { parent Parent @@ -18,6 +20,7 @@ type Server struct { done chan struct{} } +// New allocates a Server. func New(port int, parent Parent) (*Server, error) { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ Port: port, @@ -39,6 +42,7 @@ func New(port int, parent Parent) (*Server, error) { return s, nil } +// Close closes a Server. func (s *Server) Close() { go func() { for co := range s.accept { @@ -64,6 +68,7 @@ func (s *Server) run() { close(s.accept) } +// Accept returns a channel to accept incoming connections. func (s *Server) Accept() <-chan net.Conn { return s.accept } diff --git a/internal/serverudp/server.go b/internal/serverudp/server.go index bc6bf0f7..702fd73e 100644 --- a/internal/serverudp/server.go +++ b/internal/serverudp/server.go @@ -14,6 +14,7 @@ const ( readBufferSize = 2048 ) +// Publisher is implemented by client.Client. type Publisher interface { OnUdpPublisherFrame(int, base.StreamType, []byte) } @@ -28,6 +29,7 @@ type bufAddrPair struct { addr *net.UDPAddr } +// Parent is implemented by program. type Parent interface { Log(string, ...interface{}) } @@ -48,6 +50,7 @@ func (p *publisherAddr) fill(ip net.IP, port int) { } } +// Server is a RTSP UDP server. type Server struct { writeTimeout time.Duration streamType gortsplib.StreamType @@ -64,6 +67,7 @@ type Server struct { done chan struct{} } +// New allocates a Server. func New(writeTimeout time.Duration, port int, streamType gortsplib.StreamType, @@ -98,6 +102,7 @@ func New(writeTimeout time.Duration, return s, nil } +// Close closes a Server. func (s *Server) Close() { s.pc.Close() <-s.done @@ -142,14 +147,17 @@ func (s *Server) run() { <-writeDone } +// Port returns the server local port. func (s *Server) Port() int { return s.pc.LocalAddr().(*net.UDPAddr).Port } +// Write writes a UDP packet. func (s *Server) Write(data []byte, addr *net.UDPAddr) { s.write <- bufAddrPair{data, addr} } +// AddPublisher adds a publisher. func (s *Server) AddPublisher(ip net.IP, port int, publisher Publisher, trackId int) { s.publishersMutex.Lock() defer s.publishersMutex.Unlock() @@ -163,6 +171,7 @@ func (s *Server) AddPublisher(ip net.IP, port int, publisher Publisher, trackId } } +// RemovePublisher removes a publisher. func (s *Server) RemovePublisher(ip net.IP, port int, publisher Publisher) { s.publishersMutex.Lock() defer s.publishersMutex.Unlock() diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index a085d9a9..b5aee9b9 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -21,6 +21,7 @@ const ( retryPause = 5 * time.Second ) +// Parent is implemeneted by path.Path. type Parent interface { Log(string, ...interface{}) OnSourceSetReady(gortsplib.Tracks) @@ -28,6 +29,7 @@ type Parent interface { OnFrame(int, gortsplib.StreamType, []byte) } +// Source is a RTMP source. type Source struct { ur string state bool @@ -39,6 +41,7 @@ type Source struct { terminate chan struct{} } +// New allocates a Source. func New(ur string, wg *sync.WaitGroup, stats *stats.Stats, @@ -59,14 +62,17 @@ func New(ur string, return s } +// Close closes a Source. func (s *Source) Close() { atomic.AddInt64(s.stats.CountSourcesRtmpRunning, -1) s.parent.Log("rtmp source stopped") close(s.terminate) } +// IsSource implements path.source. func (s *Source) IsSource() {} +// IsSourceExternal implements path.sourceExternal. func (s *Source) IsSourceExternal() {} func (s *Source) run() { diff --git a/internal/sourcertsp/source.go b/internal/sourcertsp/source.go index a427e1e6..b4c3ce5b 100644 --- a/internal/sourcertsp/source.go +++ b/internal/sourcertsp/source.go @@ -14,6 +14,7 @@ const ( retryPause = 5 * time.Second ) +// Parent is implemented by path.Path. type Parent interface { Log(string, ...interface{}) OnSourceSetReady(gortsplib.Tracks) @@ -21,6 +22,7 @@ type Parent interface { OnFrame(int, gortsplib.StreamType, []byte) } +// Source is a RTSP source. type Source struct { ur string proto gortsplib.StreamProtocol @@ -37,6 +39,7 @@ type Source struct { done chan struct{} } +// New allocates a Source. func New(ur string, proto gortsplib.StreamProtocol, readTimeout time.Duration, @@ -63,14 +66,17 @@ func New(ur string, return s } +// Close closes a Source. func (s *Source) Close() { atomic.AddInt64(s.stats.CountSourcesRtsp, -1) s.parent.Log("rtsp source stopped") close(s.terminate) } +// IsSource implements path.source. func (s *Source) IsSource() {} +// IsSourceExternal implements path.sourceExternal. func (s *Source) IsSourceExternal() {} func (s *Source) run() { diff --git a/internal/stats/stats.go b/internal/stats/stats.go index 24422e5c..6491ae0c 100644 --- a/internal/stats/stats.go +++ b/internal/stats/stats.go @@ -5,6 +5,7 @@ func ptrInt64() *int64 { return &v } +// Stats contains statistics. type Stats struct { // use pointers to avoid a crash on 32bit platforms // https://github.com/golang/go/issues/9959 @@ -17,6 +18,7 @@ type Stats struct { CountSourcesRtmpRunning *int64 } +// New allocates a Stats. func New() *Stats { return &Stats{ CountClients: ptrInt64(), diff --git a/internal/syslog/syslog_unix.go b/internal/syslog/syslog_unix.go index 87f1184c..1c5c8440 100644 --- a/internal/syslog/syslog_unix.go +++ b/internal/syslog/syslog_unix.go @@ -11,6 +11,7 @@ type syslog struct { inner *native.Writer } +// New allocates a io.WriteCloser that writes to the system log. func New(prefix string) (io.WriteCloser, error) { inner, err := native.New(native.LOG_INFO|native.LOG_DAEMON, prefix) if err != nil { @@ -22,10 +23,12 @@ func New(prefix string) (io.WriteCloser, error) { }, nil } +// Close implements io.WriteCloser. func (ls *syslog) Close() error { return ls.inner.Close() } +// Write implements io.WriteCloser. func (ls *syslog) Write(p []byte) (int, error) { return ls.inner.Write(p) } diff --git a/internal/syslog/syslog_win.go b/internal/syslog/syslog_win.go index 0e2929d1..7f23156b 100644 --- a/internal/syslog/syslog_win.go +++ b/internal/syslog/syslog_win.go @@ -10,14 +10,17 @@ import ( type syslog struct { } +// New allocates a io.WriteCloser that writes to the system log. func New(prefix string) (io.WriteCloser, error) { return nil, fmt.Errorf("not implemented on windows") } +// Close implements io.WriteCloser. func (ls *syslog) Close() error { return nil } +// Write implements io.WriteCloser. func (ls *syslog) Write(p []byte) (int, error) { return 0, nil } diff --git a/main.go b/main.go index 1f5b9108..da4ecefc 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "github.com/aler9/rtsp-simple-server/internal/stats" ) +// Version can be overridden by build flags. var Version = "v0.0.0" type program struct {