fix race condition when reading tcp streams

This commit is contained in:
aler9 2020-09-06 16:34:18 +02:00
parent d71fd25916
commit 1065fa010b
3 changed files with 35 additions and 4 deletions

View File

@ -57,6 +57,11 @@ type clientFrameTCPReq struct {
buf []byte
}
type readRequestPair struct {
req *gortsplib.Request
res chan error
}
type udpClient struct {
client *client
trackId int
@ -983,6 +988,9 @@ func (c *client) runPlayUDP() {
}
func (c *client) runPlayTCP() {
readRequest := make(chan readRequestPair)
defer close(readRequest)
readDone := make(chan error)
go func() {
frame := &gortsplib.InterleavedFrame{}
@ -1003,7 +1011,9 @@ func (c *client) runPlayTCP() {
// rtcp feedback is handled by gortsplib
case *gortsplib.Request:
err := c.handleRequest(recvt)
res := make(chan error)
readRequest <- readRequestPair{recvt, res}
err := <-res
if err != nil {
readDone <- err
break
@ -1014,6 +1024,10 @@ func (c *client) runPlayTCP() {
for {
select {
// responses must be written in the same routine of frames
case req := <-readRequest:
req.res <- c.handleRequest(req.req)
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != errRunTerminate {
@ -1031,6 +1045,11 @@ func (c *client) runPlayTCP() {
c.conn.WriteFrame(frame)
case <-c.terminate:
go func() {
for req := range readRequest {
req.res <- fmt.Errorf("terminated")
}
}()
c.conn.Close()
<-readDone
return
@ -1173,6 +1192,9 @@ func (c *client) runRecordTCP() {
frame := &gortsplib.InterleavedFrame{}
readBuf := newMultiBuffer(3, clientTCPReadBufferSize)
readRequest := make(chan readRequestPair)
defer close(readRequest)
readDone := make(chan error)
go func() {
for {
@ -1215,6 +1237,10 @@ func (c *client) runRecordTCP() {
for {
select {
// responses must be written in the same routine of receiver reports
case req := <-readRequest:
req.res <- c.handleRequest(req.req)
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != errRunTerminate {
@ -1235,6 +1261,11 @@ func (c *client) runRecordTCP() {
}
case <-c.terminate:
go func() {
for req := range readRequest {
req.res <- fmt.Errorf("terminated")
}
}()
c.conn.Close()
<-readDone
return

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.12
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200906111848-a4d59365fbab
github.com/aler9/gortsplib v0.0.0-20200906125342-391bc2441f59
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/stretchr/testify v1.6.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200906111848-a4d59365fbab h1:S+A5atjNLtej/3gF4YFLBmIHXCnAefdFNEmc0SLJ6/g=
github.com/aler9/gortsplib v0.0.0-20200906111848-a4d59365fbab/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8=
github.com/aler9/gortsplib v0.0.0-20200906125342-391bc2441f59 h1:FT9kSPp7+cI9ClakRXnuDWpcVFgy9BxcZwvJGN8+WJ8=
github.com/aler9/gortsplib v0.0.0-20200906125342-391bc2441f59/go.mod h1:XybE/Zt1yFtnNEjNhAyg2w6VjD8aJ79GFfpSjkfLNd8=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd h1:s/l20rPNGiyjggMdkhsLu0aQ0K0OFcROUMBDu7fGT+I=
github.com/aler9/sdp-dirty/v3 v3.0.0-20200905103724-214b7cc25cfd/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=