rename publisher into source

This commit is contained in:
aler9 2020-10-13 20:00:40 +02:00
parent 2626867369
commit 810643ab30
5 changed files with 55 additions and 55 deletions

View File

@ -159,11 +159,11 @@ func (c *client) close() {
}
}
c.path.onPublisherSetNotReady()
c.path.onSourceSetNotReady()
}
if c.path != nil && c.path.publisher == c {
c.path.onPublisherRemove()
if c.path != nil && c.path.source == c {
c.path.onSourceRemove()
}
close(c.terminate)
@ -175,7 +175,7 @@ func (c *client) log(format string, args ...interface{}) {
c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
}
func (c *client) isPublisher() {}
func (c *client) isSource() {}
func (c *client) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
@ -642,7 +642,7 @@ func (c *client) handleRequest(req *base.Request) error {
return errRunTerminate
}
if len(c.streamTracks) >= c.path.publisherTrackCount {
if len(c.streamTracks) >= c.path.sourceTrackCount {
c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return errRunTerminate
}
@ -697,7 +697,7 @@ func (c *client) handleRequest(req *base.Request) error {
return errRunTerminate
}
if len(c.streamTracks) >= c.path.publisherTrackCount {
if len(c.streamTracks) >= c.path.sourceTrackCount {
c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("all the tracks have already been setup"))
return errRunTerminate
}
@ -781,7 +781,7 @@ func (c *client) handleRequest(req *base.Request) error {
return errRunTerminate
}
if len(c.streamTracks) != c.path.publisherTrackCount {
if len(c.streamTracks) != c.path.sourceTrackCount {
c.writeResError(cseq, base.StatusBadRequest, fmt.Errorf("not all tracks have been setup"))
return errRunTerminate
}

22
main.go
View File

@ -243,15 +243,15 @@ outer:
p.paths[req.pathName] = newPath(p, req.pathName, req.pathConf)
} else {
if path.publisher != nil {
if path.source != nil {
req.res <- fmt.Errorf("someone is already publishing on path '%s'", req.pathName)
continue
}
}
p.paths[req.pathName].publisher = req.client
p.paths[req.pathName].publisherTrackCount = req.trackCount
p.paths[req.pathName].publisherSdp = req.sdp
p.paths[req.pathName].source = req.client
p.paths[req.pathName].sourceTrackCount = req.trackCount
p.paths[req.pathName].sourceSdp = req.sdp
req.client.path = p.paths[req.pathName]
req.client.state = clientStatePreRecord
@ -259,12 +259,12 @@ outer:
case req := <-p.clientSetupPlay:
path, ok := p.paths[req.pathName]
if !ok || !path.publisherReady {
if !ok || !path.sourceReady {
req.res <- fmt.Errorf("no one is publishing on path '%s'", req.pathName)
continue
}
if req.trackId >= path.publisherTrackCount {
if req.trackId >= path.sourceTrackCount {
req.res <- fmt.Errorf("track %d does not exist", req.trackId)
continue
}
@ -300,19 +300,19 @@ outer:
}
}
client.path.onPublisherSetReady()
client.path.onSourceSetReady()
case s := <-p.sourceRtspReady:
s.path.onPublisherSetReady()
s.path.onSourceSetReady()
case s := <-p.sourceRtspNotReady:
s.path.onPublisherSetNotReady()
s.path.onSourceSetNotReady()
case s := <-p.sourceRtmpReady:
s.path.onPublisherSetReady()
s.path.onSourceSetReady()
case s := <-p.sourceRtmpNotReady:
s.path.onPublisherSetNotReady()
s.path.onSourceSetNotReady()
case <-p.terminate:
break outer

62
path.go
View File

@ -13,19 +13,19 @@ const (
onDemandCmdStopAfterDescribePeriod = 10 * time.Second
)
// a publisher can be a client, a sourceRtsp or a sourceRtmp
type publisher interface {
isPublisher()
// a source can be a client, a sourceRtsp or a sourceRtmp
type source interface {
isSource()
}
type path struct {
p *program
name string
conf *pathConf
publisher publisher
publisherReady bool
publisherTrackCount int
publisherSdp []byte
source source
sourceReady bool
sourceTrackCount int
sourceSdp []byte
lastDescribeReq time.Time
lastDescribeActivation time.Time
onInitCmd *externalCmd
@ -41,11 +41,11 @@ func newPath(p *program, name string, conf *pathConf) *path {
if strings.HasPrefix(conf.Source, "rtsp://") {
s := newSourceRtsp(p, pa)
pa.publisher = s
pa.source = s
} else if strings.HasPrefix(conf.Source, "rtmp://") {
s := newSourceRtmp(p, pa)
pa.publisher = s
pa.source = s
}
return pa
@ -56,10 +56,10 @@ func (pa *path) log(format string, args ...interface{}) {
}
func (pa *path) onInit() {
if source, ok := pa.publisher.(*sourceRtsp); ok {
if source, ok := pa.source.(*sourceRtsp); ok {
go source.run(source.state)
} else if source, ok := pa.publisher.(*sourceRtmp); ok {
} else if source, ok := pa.source.(*sourceRtmp); ok {
go source.run(source.state)
}
@ -75,11 +75,11 @@ func (pa *path) onInit() {
}
func (pa *path) onClose(wait bool) {
if source, ok := pa.publisher.(*sourceRtsp); ok {
if source, ok := pa.source.(*sourceRtsp); ok {
close(source.terminate)
<-source.done
} else if source, ok := pa.publisher.(*sourceRtmp); ok {
} else if source, ok := pa.source.(*sourceRtmp); ok {
close(source.terminate)
<-source.done
}
@ -131,7 +131,7 @@ func (pa *path) hasClientsWaitingDescribe() bool {
func (pa *path) hasClientReaders() bool {
for c := range pa.p.clients {
if c.path == pa && c != pa.publisher {
if c.path == pa && c != pa.source {
return true
}
}
@ -153,7 +153,7 @@ func (pa *path) onCheck() {
}
// stop on demand rtsp source if needed
if source, ok := pa.publisher.(*sourceRtsp); ok {
if source, ok := pa.source.(*sourceRtsp); ok {
if pa.conf.SourceOnDemand &&
source.state == sourceRtspStateRunning &&
!pa.hasClients() &&
@ -165,7 +165,7 @@ func (pa *path) onCheck() {
}
// stop on demand rtmp source if needed
} else if source, ok := pa.publisher.(*sourceRtmp); ok {
} else if source, ok := pa.source.(*sourceRtmp); ok {
if pa.conf.SourceOnDemand &&
source.state == sourceRtmpStateRunning &&
!pa.hasClients() &&
@ -188,28 +188,28 @@ func (pa *path) onCheck() {
// remove regular expression paths
if pa.conf.regexp != nil &&
pa.publisher == nil &&
pa.source == nil &&
!pa.hasClients() {
pa.onClose(false)
delete(pa.p.paths, pa.name)
}
}
func (pa *path) onPublisherRemove() {
pa.publisher = nil
func (pa *path) onSourceRemove() {
pa.source = nil
// close all clients that are reading or waiting for reading
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
c != pa.publisher {
c != pa.source {
c.close()
}
}
}
func (pa *path) onPublisherSetReady() {
pa.publisherReady = true
func (pa *path) onSourceSetReady() {
pa.sourceReady = true
// reply to all clients that are waiting for a description
for c := range pa.p.clients {
@ -217,19 +217,19 @@ func (pa *path) onPublisherSetReady() {
c.path == pa {
c.path = nil
c.state = clientStateInitial
c.describe <- describeRes{pa.publisherSdp, nil}
c.describe <- describeRes{pa.sourceSdp, nil}
}
}
}
func (pa *path) onPublisherSetNotReady() {
pa.publisherReady = false
func (pa *path) onSourceSetNotReady() {
pa.sourceReady = false
// close all clients that are reading or waiting for reading
for c := range pa.p.clients {
if c.path == pa &&
c.state != clientStateWaitDescription &&
c != pa.publisher {
c != pa.source {
c.close()
}
}
@ -239,7 +239,7 @@ func (pa *path) onDescribe(client *client) {
pa.lastDescribeReq = time.Now()
// publisher not found
if pa.publisher == nil {
if pa.source == nil {
// on demand command is available: put the client on hold
if pa.conf.RunOnDemand != "" {
if pa.onDemandCmd == nil { // start if needed
@ -262,9 +262,9 @@ func (pa *path) onDescribe(client *client) {
}
// publisher was found but is not ready: put the client on hold
} else if !pa.publisherReady {
} else if !pa.sourceReady {
// start rtsp source if needed
if source, ok := pa.publisher.(*sourceRtsp); ok {
if source, ok := pa.source.(*sourceRtsp); ok {
if source.state == sourceRtspStateStopped {
pa.log("starting on demand rtsp source")
pa.lastDescribeActivation = time.Now()
@ -274,7 +274,7 @@ func (pa *path) onDescribe(client *client) {
}
// start rtmp source if needed
} else if source, ok := pa.publisher.(*sourceRtmp); ok {
} else if source, ok := pa.source.(*sourceRtmp); ok {
if source.state == sourceRtmpStateStopped {
pa.log("starting on demand rtmp source")
pa.lastDescribeActivation = time.Now()
@ -289,6 +289,6 @@ func (pa *path) onDescribe(client *client) {
// publisher was found and is ready
} else {
client.describe <- describeRes{pa.publisherSdp, nil}
client.describe <- describeRes{pa.sourceSdp, nil}
}
}

View File

@ -59,7 +59,7 @@ func newSourceRtmp(p *program, path *path) *sourceRtmp {
return s
}
func (s *sourceRtmp) isPublisher() {}
func (s *sourceRtmp) isSource() {}
func (s *sourceRtmp) run(initialState sourceRtmpState) {
s.applyState(initialState)
@ -248,8 +248,8 @@ func (s *sourceRtmp) runInnerInner() bool {
return true
}
s.path.publisherSdp = tracks.Write()
s.path.publisherTrackCount = len(tracks)
s.path.sourceSdp = tracks.Write()
s.path.sourceTrackCount = len(tracks)
s.p.sourceRtmpReady <- s
s.path.log("rtmp source ready")

View File

@ -54,7 +54,7 @@ func newSourceRtsp(p *program, path *path) *sourceRtsp {
return s
}
func (s *sourceRtsp) isPublisher() {}
func (s *sourceRtsp) isSource() {}
func (s *sourceRtsp) run(initialState sourceRtspState) {
s.applyState(initialState)
@ -161,8 +161,8 @@ func (s *sourceRtsp) runInnerInner() bool {
}
// create a filtered SDP that is used by the server (not by the client)
s.path.publisherSdp = tracks.Write()
s.path.publisherTrackCount = len(tracks)
s.path.sourceSdp = tracks.Write()
s.path.sourceTrackCount = len(tracks)
s.tracks = tracks
if s.path.conf.sourceProtocolParsed == gortsplib.StreamProtocolUDP {