From b0ac164530dc9dcff8ef9e39f6398ae496247993 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 10 May 2020 16:23:57 +0200 Subject: [PATCH] add serverTcpListener.close() --- server-client.go | 46 ++++++++++++++++++++++++++-------------------- server-tcpl.go | 32 +++++++++++++++++++++++++++----- server-udpl.go | 2 +- 3 files changed, 54 insertions(+), 26 deletions(-) diff --git a/server-client.go b/server-client.go index 1df104c6..62edbf30 100644 --- a/server-client.go +++ b/server-client.go @@ -114,6 +114,7 @@ type serverClient struct { streamProtocol streamProtocol streamTracks []*track write chan *gortsplib.InterleavedFrame + done chan struct{} } func newServerClient(p *program, nconn net.Conn) *serverClient { @@ -126,12 +127,15 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { }), state: _CLIENT_STATE_STARTING, write: make(chan *gortsplib.InterleavedFrame), + done: make(chan struct{}), } c.p.tcpl.mutex.Lock() c.p.tcpl.clients[c] = struct{}{} c.p.tcpl.mutex.Unlock() + go c.run() + return c } @@ -176,24 +180,6 @@ func (c *serverClient) zone() string { } func (c *serverClient) run() { - defer func() { - if c.p.args.postScript != "" { - postScript := exec.Command(c.p.args.postScript) - err := postScript.Run() - if err != nil { - c.log("ERR: %s", err) - } - } - }() - - defer c.log("disconnected") - - defer func() { - c.p.tcpl.mutex.Lock() - defer c.p.tcpl.mutex.Unlock() - c.close() - }() - c.log("connected") if c.p.args.preScript != "" { @@ -210,14 +196,34 @@ func (c *serverClient) run() { if err != io.EOF { c.log("ERR: %s", err) } - return + break } ok := c.handleRequest(req) if !ok { - return + break } } + + func() { + c.p.tcpl.mutex.Lock() + defer c.p.tcpl.mutex.Unlock() + c.close() + }() + + c.log("disconnected") + + func() { + if c.p.args.postScript != "" { + postScript := exec.Command(c.p.args.postScript) + err := postScript.Run() + if err != nil { + c.log("ERR: %s", err) + } + } + }() + + close(c.done) } func (c *serverClient) writeResError(req *gortsplib.Request, code gortsplib.StatusCode, err error) { diff --git a/server-tcpl.go b/server-tcpl.go index cb73cb1f..623a19d8 100644 --- a/server-tcpl.go +++ b/server-tcpl.go @@ -14,6 +14,7 @@ type serverTcpListener struct { mutex sync.RWMutex clients map[*serverClient]struct{} publishers map[string]*serverClient + done chan struct{} } func newServerTcpListener(p *program) (*serverTcpListener, error) { @@ -24,15 +25,16 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) { return nil, err } - s := &serverTcpListener{ + l := &serverTcpListener{ p: p, nconn: nconn, clients: make(map[*serverClient]struct{}), publishers: make(map[string]*serverClient), + done: make(chan struct{}), } - s.log("opened on :%d", p.args.rtspPort) - return s, nil + l.log("opened on :%d", p.args.rtspPort) + return l, nil } func (l *serverTcpListener) log(format string, args ...interface{}) { @@ -46,9 +48,29 @@ func (l *serverTcpListener) run() { break } - rsc := newServerClient(l.p, nconn) - go rsc.run() + newServerClient(l.p, nconn) } + + // close clients + var doneChans []chan struct{} + func() { + l.mutex.Lock() + defer l.mutex.Unlock() + for c := range l.clients { + c.close() + doneChans = append(doneChans, c.done) + } + }() + for _, c := range doneChans { + <-c + } + + close(l.done) +} + +func (l *serverTcpListener) close() { + l.nconn.Close() + <-l.done } func (l *serverTcpListener) forwardTrack(path string, id int, flow trackFlow, frame []byte) { diff --git a/server-udpl.go b/server-udpl.go index 146130d2..cd1da2c5 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -102,7 +102,7 @@ func (l *serverUdpListener) run() { close(l.write) - l.done <- struct{}{} + close(l.done) } func (l *serverUdpListener) close() {