remove redundant stream check mechanism in case of tcp publishers

This commit is contained in:
aler9 2020-07-28 15:10:33 +02:00
parent 4c87cdaa9a
commit efa31937c9
4 changed files with 71 additions and 83 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.12
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb
github.com/aler9/gortsplib v0.0.0-20200728125613-6edf6a8f9e09
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436
github.com/stretchr/testify v1.6.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb h1:R9F835QLbnfLQrOoHZULCrASRC23287Lb6v5LpOt0TY=
github.com/aler9/gortsplib v0.0.0-20200719202520-de32b1f15ecb/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/gortsplib v0.0.0-20200728125613-6edf6a8f9e09 h1:Oqs9cVlb/cgeh/jDU/thamzvHESb3cjy04vgGXo4we0=
github.com/aler9/gortsplib v0.0.0-20200728125613-6edf6a8f9e09/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc=
github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

View File

@ -538,7 +538,7 @@ func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.St
func main() {
_, err := newProgram(os.Args[1:], os.Stdin)
if err != nil {
log.Fatal("ERR: ", err)
log.Fatal("ERR:", err)
}
select {}

View File

@ -917,85 +917,7 @@ func (c *serverClient) runRecord(path string) {
}
}
if c.streamProtocol == gortsplib.StreamProtocolTcp {
frame := &gortsplib.InterleavedFrame{}
readDone := make(chan error)
go func() {
for {
frame.Content = c.readBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
if frame.TrackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", frame.TrackId)
readDone <- nil
break
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
frame.TrackId,
frame.StreamType,
frame.Content,
}
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
checkStreamTicker := time.NewTicker(clientCheckStreamInterval)
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer1:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer1
case <-checkStreamTicker.C:
for trackId := range c.streamTracks {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout {
c.log("ERR: stream is dead")
c.conn.NetConn().Close()
<-readDone
break outer1
}
}
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
})
}
}
}
checkStreamTicker.Stop()
receiverReportTicker.Stop()
} else {
if c.streamProtocol == gortsplib.StreamProtocolUdp {
readDone := make(chan error)
go func() {
for {
@ -1052,6 +974,72 @@ func (c *serverClient) runRecord(path string) {
checkStreamTicker.Stop()
receiverReportTicker.Stop()
} else {
frame := &gortsplib.InterleavedFrame{}
readDone := make(chan error)
go func() {
for {
frame.Content = c.readBuf.swap()
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.conn.ReadFrameOrRequest(frame)
if err != nil {
readDone <- err
break
}
switch recvt := recv.(type) {
case *gortsplib.InterleavedFrame:
if frame.TrackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", frame.TrackId)
readDone <- nil
break
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
c.p.events <- programEventClientFrameTcp{
c.path,
frame.TrackId,
frame.StreamType,
frame.Content,
}
case *gortsplib.Request:
ok := c.handleRequest(recvt)
if !ok {
readDone <- nil
break
}
}
}
}()
receiverReportTicker := time.NewTicker(clientReceiverReportInterval)
outer1:
for {
select {
case err := <-readDone:
if err != nil && err != io.EOF {
c.log("ERR: %s", err)
}
break outer1
case <-receiverReportTicker.C:
for trackId := range c.streamTracks {
frame := c.rtcpReceivers[trackId].Report()
c.conn.WriteFrame(&gortsplib.InterleavedFrame{
TrackId: trackId,
StreamType: gortsplib.StreamTypeRtcp,
Content: frame,
})
}
}
}
receiverReportTicker.Stop()
}
done = make(chan struct{})