diff --git a/client.go b/client.go index 99633aaf..3e50a9a1 100644 --- a/client.go +++ b/client.go @@ -50,32 +50,21 @@ func makeUdpClientAddr(ip net.IP, port int) udpClientAddr { return ret } -type describeRes struct { - sdp []byte - err error -} - type clientTrack struct { rtpPort int rtcpPort int } -type clientEvent interface { - isServerClientEvent() +type describeRes struct { + sdp []byte + err error } -type clientEventFrameTcp struct { - frame *gortsplib.InterleavedFrame -} - -func (clientEventFrameTcp) isServerClientEvent() {} - type clientState int const ( clientStateInitial clientState = iota - clientStateWaitingDescription - clientStateAnnounce + clientStateWaitDescription clientStatePrePlay clientStatePlay clientStatePreRecord @@ -85,24 +74,24 @@ const ( func (cs clientState) String() string { switch cs { case clientStateInitial: - return "INITIAL" + return "Initial" - case clientStateAnnounce: - return "ANNOUNCE" + case clientStateWaitDescription: + return "WaitDescription" case clientStatePrePlay: - return "PRE_PLAY" + return "PrePlay" case clientStatePlay: - return "PLAY" + return "Play" case clientStatePreRecord: - return "PRE_RECORD" + return "PreRecord" case clientStateRecord: - return "RECORD" + return "Record" } - return "UNKNOWN" + return "Invalid" } type client struct { @@ -117,10 +106,13 @@ type client struct { streamProtocol gortsplib.StreamProtocol streamTracks map[int]*clientTrack rtcpReceivers []*gortsplib.RtcpReceiver + describeCSeq gortsplib.HeaderValue + describeUrl string - describeRes chan describeRes - events chan clientEvent // only if state = Play and gortsplib.StreamProtocol = TCP - done chan struct{} + describe chan describeRes + tcpFrame chan *gortsplib.InterleavedFrame + terminate chan struct{} + done chan struct{} } func newClient(p *program, nconn net.Conn) *client { @@ -133,6 +125,9 @@ func newClient(p *program, nconn net.Conn) *client { }), state: clientStateInitial, streamTracks: make(map[int]*clientTrack), + describe: make(chan describeRes), + tcpFrame: make(chan *gortsplib.InterleavedFrame), + terminate: make(chan struct{}), done: make(chan struct{}), } @@ -154,6 +149,11 @@ func (c *client) zone() string { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone } +var errRunTerminate = errors.New("terminate") +var errRunWaitDescription = errors.New("wait description") +var errRunPlay = errors.New("play") +var errRunRecord = errors.New("record") + func (c *client) run() { var onConnectCmd *exec.Cmd if c.p.conf.RunOnConnect != "" { @@ -166,34 +166,20 @@ func (c *client) run() { } } -outer: for { - req, err := c.conn.ReadRequest() - if err != nil { - if err != io.EOF { - c.log("ERR: %s", err) - } - break outer - } - - ok := c.handleRequest(req) - if !ok { - break outer + if !c.runInitial() { + break } } - done := make(chan struct{}) - c.p.events <- programEventClientClose{done, c} - <-done - - c.conn.NetConn().Close() // close socket in case it has not been closed yet - if onConnectCmd != nil { onConnectCmd.Process.Signal(os.Interrupt) onConnectCmd.Wait() } - close(c.done) // close() never blocks + close(c.describe) + close(c.tcpFrame) + close(c.done) } func (c *client) writeResError(cseq gortsplib.HeaderValue, code gortsplib.StatusCode, err error) { @@ -288,21 +274,21 @@ func (c *client) authenticate(ips []interface{}, user string, pass string, req * return nil } -func (c *client) handleRequest(req *gortsplib.Request) bool { +func (c *client) handleRequest(req *gortsplib.Request) error { c.log(string(req.Method)) cseq, ok := req.Header["CSeq"] if !ok || len(cseq) != 1 { c.writeResError(nil, gortsplib.StatusBadRequest, fmt.Errorf("cseq missing")) - return false + return errRunTerminate } - path := req.Url.Path - if len(path) < 1 || path[0] != '/' { + pathName := req.Url.Path + if len(pathName) < 1 || pathName[0] != '/' { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path must begin with a slash")) - return false + return errRunTerminate } - path = path[1:] // strip leading slash + pathName = pathName[1:] // strip leading slash switch req.Method { case gortsplib.OPTIONS: @@ -320,103 +306,91 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { }, ", ")}, }, }) - return true + return nil case gortsplib.DESCRIBE: if c.state != clientStateInitial { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) - return false + return errRunTerminate } - confp := c.p.findConfForPathName(path) + confp := c.p.findConfForPathName(pathName) if confp == nil { c.writeResError(cseq, gortsplib.StatusBadRequest, - fmt.Errorf("unable to find a valid configuration for path '%s'", path)) - return false + fmt.Errorf("unable to find a valid configuration for path '%s'", pathName)) + return errRunTerminate } err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req) if err != nil { if err == errAuthCritical { - return false + return errRunTerminate } - return true + return nil } - c.describeRes = make(chan describeRes) - c.p.events <- programEventClientDescribe{c, path} - describeRes := <-c.describeRes - if describeRes.err != nil { - c.writeResError(cseq, gortsplib.StatusNotFound, describeRes.err) - return false - } + c.p.events <- programEventClientDescribe{c, pathName} - c.conn.WriteResponse(&gortsplib.Response{ - StatusCode: gortsplib.StatusOK, - Header: gortsplib.Header{ - "CSeq": cseq, - "Content-Base": gortsplib.HeaderValue{req.Url.String() + "/"}, - "Content-Type": gortsplib.HeaderValue{"application/sdp"}, - }, - Content: describeRes.sdp, - }) - return true + c.describeCSeq = cseq + c.describeUrl = req.Url.String() + + return errRunWaitDescription case gortsplib.ANNOUNCE: if c.state != clientStateInitial { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStateInitial)) - return false + return errRunTerminate } - if len(path) == 0 { + if len(pathName) == 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("empty base path")) - return false + return errRunTerminate } - err := checkPathName(path) + err := checkPathName(pathName) if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid path name: %s (%s)", err, path)) - return false + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid path name: %s (%s)", err, pathName)) + return errRunTerminate } - confp := c.p.findConfForPathName(path) + confp := c.p.findConfForPathName(pathName) if confp == nil { c.writeResError(cseq, gortsplib.StatusBadRequest, - fmt.Errorf("unable to find a valid configuration for path '%s'", path)) - return false + fmt.Errorf("unable to find a valid configuration for path '%s'", pathName)) + return errRunTerminate } err = c.authenticate(confp.publishIpsParsed, confp.PublishUser, confp.PublishPass, req) if err != nil { if err == errAuthCritical { - return false + return errRunTerminate } - return true + return nil } ct, ok := req.Header["Content-Type"] if !ok || len(ct) != 1 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("Content-Type header missing")) - return false + return errRunTerminate } if ct[0] != "application/sdp" { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unsupported Content-Type '%s'", ct)) - return false + return errRunTerminate } sdpParsed := &sdp.SessionDescription{} err = sdpParsed.Unmarshal(req.Content) if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid SDP: %s", err)) - return false + return errRunTerminate } if len(sdpParsed.MediaDescriptions) == 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks defined")) - return false + return errRunTerminate } var tracks []*gortsplib.Track @@ -429,11 +403,11 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { sdpParsed, req.Content = sdpForServer(tracks) res := make(chan error) - c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed} + c.p.events <- programEventClientAnnounce{res, c, pathName, req.Content, sdpParsed} err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } c.conn.WriteResponse(&gortsplib.Response{ @@ -442,24 +416,24 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "CSeq": cseq, }, }) - return true + return nil case gortsplib.SETUP: th, err := gortsplib.ReadHeaderTransport(req.Header["Transport"]) if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header: %s", err)) - return false + return errRunTerminate } if _, ok := th["multicast"]; ok { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("multicast is not supported")) - return false + return errRunTerminate } - basePath, controlPath, err := splitPath(path) + basePath, controlPath, err := splitPath(pathName) if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } switch c.state { @@ -469,65 +443,55 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { if confp == nil { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unable to find a valid configuration for path '%s'", basePath)) - return false + return errRunTerminate } err := c.authenticate(confp.readIpsParsed, confp.ReadUser, confp.ReadPass, req) if err != nil { if err == errAuthCritical { - return false + return errRunTerminate } - return true + return nil } if c.path != nil && basePath != c.path.name { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) - return false + return errRunTerminate } if !strings.HasPrefix(controlPath, "trackID=") { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid control path (%s)", controlPath)) - return false + return errRunTerminate } tmp, err := strconv.ParseInt(controlPath[len("trackID="):], 10, 64) if err != nil || tmp < 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("invalid track id (%s)", controlPath)) - return false + return errRunTerminate } trackId := int(tmp) if _, ok := c.streamTracks[trackId]; ok { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("track %d has already been setup", trackId)) - return false + return errRunTerminate } // play via UDP - if func() bool { - _, ok := th["RTP/AVP"] - if ok { - return true - } - _, ok = th["RTP/AVP/UDP"] - if ok { - return true - } - return false - }() { + if th.IsUdp() { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok { c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) - return false + return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) - return false + return errRunTerminate } rtpPort, rtcpPort := th.Ports("client_port") if rtpPort == 0 || rtcpPort == 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"])) - return false + return errRunTerminate } res := make(chan error) @@ -535,7 +499,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } c.streamProtocol = gortsplib.StreamProtocolUdp @@ -557,18 +521,18 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "Session": gortsplib.HeaderValue{"12345678"}, }, }) - return true + return nil // play via TCP - } else if _, ok := th["RTP/AVP/TCP"]; ok { + } else if th.IsTcp() { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok { c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) - return false + return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) - return false + return errRunTerminate } res := make(chan error) @@ -576,7 +540,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { err = <-res if err != nil { c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } c.streamProtocol = gortsplib.StreamProtocolTcp @@ -599,65 +563,47 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "Session": gortsplib.HeaderValue{"12345678"}, }, }) - return true + return nil } else { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"])) - return false + return errRunTerminate } // record - case clientStateAnnounce, clientStatePreRecord: + case clientStatePreRecord: if strings.ToLower(th.Value("mode")) != "record" { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain mode=record")) - return false + return errRunTerminate } // after ANNOUNCE, c.path is already set if basePath != c.path.name { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, basePath)) - return false + return errRunTerminate } // record via UDP - if func() bool { - _, ok := th["RTP/AVP"] - if ok { - return true - } - _, ok = th["RTP/AVP/UDP"] - if ok { - return true - } - return false - }() { + if th.IsUdp() { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolUdp]; !ok { c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("UDP streaming is disabled")) - return false + return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) - return false + return errRunTerminate } rtpPort, rtcpPort := th.Ports("client_port") if rtpPort == 0 || rtcpPort == 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])) - return false + return errRunTerminate } if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) - return false - } - - res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c} - err := <-res - if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } c.streamProtocol = gortsplib.StreamProtocolUdp @@ -679,43 +625,35 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "Session": gortsplib.HeaderValue{"12345678"}, }, }) - return true + return nil // record via TCP - } else if _, ok := th["RTP/AVP/TCP"]; ok { + } else if th.IsTcp() { if _, ok := c.p.conf.protocolsParsed[gortsplib.StreamProtocolTcp]; !ok { c.writeResError(cseq, gortsplib.StatusUnsupportedTransport, fmt.Errorf("TCP streaming is disabled")) - return false + return errRunTerminate } if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) - return false + return errRunTerminate } interleaved := th.Value("interleaved") if interleaved == "" { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain the interleaved field")) - return false + return errRunTerminate } expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2) if interleaved != expInterleaved { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved)) - return false + return errRunTerminate } if len(c.streamTracks) >= len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("all the tracks have already been setup")) - return false - } - - res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c} - err := <-res - if err != nil { - c.writeResError(cseq, gortsplib.StatusBadRequest, err) - return false + return errRunTerminate } c.streamProtocol = gortsplib.StreamProtocolTcp @@ -736,36 +674,36 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { "Session": gortsplib.HeaderValue{"12345678"}, }, }) - return true + return nil } else { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", req.Header["Transport"])) - return false + return errRunTerminate } default: c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s'", c.state)) - return false + return errRunTerminate } case gortsplib.PLAY: if c.state != clientStatePrePlay { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePrePlay)) - return false + return errRunTerminate } // path can end with a slash, remove it - path = strings.TrimSuffix(path, "/") + pathName = strings.TrimSuffix(pathName, "/") - if path != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path)) - return false + if pathName != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) + return errRunTerminate } if len(c.streamTracks) == 0 { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("no tracks have been setup")) - return false + return errRunTerminate } // write response before setting state @@ -779,27 +717,26 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { }, }) - c.runPlay(path) - return false + return errRunPlay case gortsplib.RECORD: if c.state != clientStatePreRecord { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("client is in state '%s' instead of '%s'", c.state, clientStatePreRecord)) - return false + return errRunTerminate } // path can end with a slash, remove it - path = strings.TrimSuffix(path, "/") + pathName = strings.TrimSuffix(pathName, "/") - if path != c.path.name { - c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, path)) - return false + if pathName != c.path.name { + c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.name, pathName)) + return errRunTerminate } if len(c.streamTracks) != len(c.path.publisherSdpParsed.MediaDescriptions) { c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("not all tracks have been setup")) - return false + return errRunTerminate } c.conn.WriteResponse(&gortsplib.Response{ @@ -810,30 +747,93 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { }, }) - c.runRecord(path) - return false + return errRunRecord case gortsplib.TEARDOWN: // close connection silently - return false + return errRunTerminate default: c.writeResError(cseq, gortsplib.StatusBadRequest, fmt.Errorf("unhandled method '%s'", req.Method)) + return errRunTerminate + } +} + +func (c *client) runInitial() bool { + readDone := make(chan error) + go func() { + for { + req, err := c.conn.ReadRequest() + if err != nil { + readDone <- err + break + } + + err = c.handleRequest(req) + if err != nil { + readDone <- err + break + } + } + }() + + select { + case err := <-readDone: + switch err { + case errRunWaitDescription: + return c.runWaitDescription() + + case errRunPlay: + return c.runPlay() + + case errRunRecord: + return c.runRecord() + + default: + c.conn.Close() + if err != io.EOF && err != errRunTerminate { + c.log("ERR: %s", err) + } + c.p.events <- programEventClientClose{c} + <-c.terminate + return false + } + + case <-c.terminate: + c.conn.Close() + <-readDone return false } } -func (c *client) runPlay(path string) { - confp := c.p.findConfForPathName(path) +func (c *client) runWaitDescription() bool { + select { + case res := <-c.describe: + if res.err != nil { + c.writeResError(c.describeCSeq, gortsplib.StatusNotFound, res.err) + return true + } - if c.streamProtocol == gortsplib.StreamProtocolTcp { - c.events = make(chan clientEvent) + c.conn.WriteResponse(&gortsplib.Response{ + StatusCode: gortsplib.StatusOK, + Header: gortsplib.Header{ + "CSeq": c.describeCSeq, + "Content-Base": gortsplib.HeaderValue{c.describeUrl + "/"}, + "Content-Type": gortsplib.HeaderValue{"application/sdp"}, + }, + Content: res.sdp, + }) + return true + + case <-c.terminate: + c.conn.Close() + return false } +} +func (c *client) runPlay() bool { // start sending frames only after sending the response to the PLAY request - done := make(chan struct{}) - c.p.events <- programEventClientPlay{done, c} - <-done + c.p.events <- programEventClientPlay{c} c.log("is receiving on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { @@ -843,10 +843,10 @@ func (c *client) runPlay(path string) { }(), c.streamProtocol) var onReadCmd *exec.Cmd - if confp.RunOnRead != "" { - onReadCmd = exec.Command("/bin/sh", "-c", confp.RunOnRead) + if c.path.confp.RunOnRead != "" { + onReadCmd = exec.Command("/bin/sh", "-c", c.path.confp.RunOnRead) onReadCmd.Env = append(os.Environ(), - "RTSP_SERVER_PATH="+path, + "RTSP_SERVER_PATH="+c.path.name, ) onReadCmd.Stdout = os.Stdout onReadCmd.Stderr = os.Stderr @@ -857,101 +857,117 @@ func (c *client) runPlay(path string) { } if c.streamProtocol == gortsplib.StreamProtocolUdp { - for { - req, err := c.conn.ReadRequest() - if err != nil { - if err != io.EOF { - c.log("ERR: %s", err) - } - break - } - - ok := c.handleRequest(req) - if !ok { - break - } - } - - done := make(chan struct{}) - c.p.events <- programEventClientPlayStop{done, c} - <-done - + c.runPlayUdp() } else { - readDone := make(chan error) - go func() { - frame := &gortsplib.InterleavedFrame{} - readBuf := make([]byte, clientTcpReadBufferSize) - - for { - frame.Content = readBuf - frame.Content = frame.Content[:cap(frame.Content)] - - recv, err := c.conn.ReadFrameOrRequest(frame, false) - if err != nil { - readDone <- err - break - } - - switch recvt := recv.(type) { - case *gortsplib.InterleavedFrame: - // rtcp feedback is handled by gortsplib - - case *gortsplib.Request: - ok := c.handleRequest(recvt) - if !ok { - readDone <- nil - break - } - } - } - }() - - outer: - for { - select { - case err := <-readDone: - if err != nil && err != io.EOF { - c.log("ERR: %s", err) - } - break outer - - case rawEvt := <-c.events: - switch evt := rawEvt.(type) { - case clientEventFrameTcp: - c.conn.WriteFrame(evt.frame) - } - } - } - - go func() { - for range c.events { - } - }() - - done := make(chan struct{}) - c.p.events <- programEventClientPlayStop{done, c} - <-done - - close(c.events) + c.runPlayTcp() } if onReadCmd != nil { onReadCmd.Process.Signal(os.Interrupt) onReadCmd.Wait() } + + return false } -func (c *client) runRecord(path string) { - confp := c.p.findConfForPathName(path) +func (c *client) runPlayUdp() { + readDone := make(chan error) + go func() { + for { + req, err := c.conn.ReadRequest() + if err != nil { + readDone <- err + break + } + err = c.handleRequest(req) + if err != nil { + readDone <- err + break + } + } + }() + + select { + case err := <-readDone: + c.conn.Close() + if err != io.EOF && err != errRunTerminate { + c.log("ERR: %s", err) + } + c.p.events <- programEventClientClose{c} + <-c.terminate + return + + case <-c.terminate: + c.conn.Close() + <-readDone + return + } +} + +func (c *client) runPlayTcp() { + readDone := make(chan error) + go func() { + frame := &gortsplib.InterleavedFrame{} + readBuf := make([]byte, clientTcpReadBufferSize) + + for { + frame.Content = readBuf + frame.Content = frame.Content[:cap(frame.Content)] + + recv, err := c.conn.ReadFrameOrRequest(frame, false) + if err != nil { + readDone <- err + break + } + + switch recvt := recv.(type) { + case *gortsplib.InterleavedFrame: + // rtcp feedback is handled by gortsplib + + case *gortsplib.Request: + err := c.handleRequest(recvt) + if err != nil { + readDone <- err + break + } + } + } + }() + + for { + select { + case err := <-readDone: + c.conn.Close() + if err != io.EOF && err != errRunTerminate { + c.log("ERR: %s", err) + } + go func() { + for range c.tcpFrame { + } + }() + c.p.events <- programEventClientClose{c} + <-c.terminate + return + + case frame := <-c.tcpFrame: + c.conn.WriteFrame(frame) + + case <-c.terminate: + c.conn.Close() + <-readDone + return + } + } +} + +func (c *client) runRecord() bool { c.rtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) for trackId := range c.streamTracks { c.rtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() } - done := make(chan struct{}) - c.p.events <- programEventClientRecord{done, c} - <-done + c.p.events <- programEventClientRecord{c} c.log("is publishing on path '%s', %d %s via %s", c.path.name, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { @@ -961,10 +977,10 @@ func (c *client) runRecord(path string) { }(), c.streamProtocol) var onPublishCmd *exec.Cmd - if confp.RunOnPublish != "" { - onPublishCmd = exec.Command("/bin/sh", "-c", confp.RunOnPublish) + if c.path.confp.RunOnPublish != "" { + onPublishCmd = exec.Command("/bin/sh", "-c", c.path.confp.RunOnPublish) onPublishCmd.Env = append(os.Environ(), - "RTSP_SERVER_PATH="+path, + "RTSP_SERVER_PATH="+c.path.name, ) onPublishCmd.Stdout = os.Stdout onPublishCmd.Stderr = os.Stderr @@ -975,141 +991,160 @@ func (c *client) runRecord(path string) { } if c.streamProtocol == gortsplib.StreamProtocolUdp { - readDone := make(chan error) - go func() { - for { - req, err := c.conn.ReadRequest() - if err != nil { - readDone <- err - break - } - - ok := c.handleRequest(req) - if !ok { - readDone <- nil - break - } - } - }() - - checkStreamTicker := time.NewTicker(clientCheckStreamInterval) - receiverReportTicker := time.NewTicker(clientReceiverReportInterval) - - outer2: - for { - select { - case err := <-readDone: - if err != nil && err != io.EOF { - c.log("ERR: %s", err) - } - break outer2 - - case <-checkStreamTicker.C: - for trackId := range c.streamTracks { - if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout { - c.log("ERR: stream is dead") - c.conn.NetConn().Close() - <-readDone - break outer2 - } - } - - case <-receiverReportTicker.C: - for trackId := range c.streamTracks { - frame := c.rtcpReceivers[trackId].Report() - c.p.serverRtcp.writeChan <- &udpAddrBufPair{ - addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[trackId].rtcpPort, - }, - buf: frame, - } - } - } - } - - checkStreamTicker.Stop() - receiverReportTicker.Stop() - + c.runRecordUdp() } else { - frame := &gortsplib.InterleavedFrame{} - readBuf := newMultiBuffer(3, clientTcpReadBufferSize) - - readDone := make(chan error) - go func() { - for { - frame.Content = readBuf.next() - frame.Content = frame.Content[:cap(frame.Content)] - - recv, err := c.conn.ReadFrameOrRequest(frame, true) - if err != nil { - readDone <- err - break - } - - switch recvt := recv.(type) { - case *gortsplib.InterleavedFrame: - if frame.TrackId >= len(c.streamTracks) { - c.log("ERR: invalid track id '%d'", frame.TrackId) - readDone <- nil - break - } - - c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - c.p.events <- programEventClientFrameTcp{ - c.path, - frame.TrackId, - frame.StreamType, - frame.Content, - } - - case *gortsplib.Request: - ok := c.handleRequest(recvt) - if !ok { - readDone <- nil - break - } - } - } - }() - - receiverReportTicker := time.NewTicker(clientReceiverReportInterval) - - outer1: - for { - select { - case err := <-readDone: - if err != nil && err != io.EOF { - c.log("ERR: %s", err) - } - break outer1 - - case <-receiverReportTicker.C: - for trackId := range c.streamTracks { - frame := c.rtcpReceivers[trackId].Report() - c.conn.WriteFrame(&gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: gortsplib.StreamTypeRtcp, - Content: frame, - }) - } - } - } - - receiverReportTicker.Stop() - } - - done = make(chan struct{}) - c.p.events <- programEventClientRecordStop{done, c} - <-done - - for trackId := range c.streamTracks { - c.rtcpReceivers[trackId].Close() + c.runRecordTcp() } if onPublishCmd != nil { onPublishCmd.Process.Signal(os.Interrupt) onPublishCmd.Wait() } + + for trackId := range c.streamTracks { + c.rtcpReceivers[trackId].Close() + } + + return false +} + +func (c *client) runRecordUdp() { + readDone := make(chan error) + go func() { + for { + req, err := c.conn.ReadRequest() + if err != nil { + readDone <- err + break + } + + err = c.handleRequest(req) + if err != nil { + readDone <- err + break + } + } + }() + + checkStreamTicker := time.NewTicker(clientCheckStreamInterval) + defer checkStreamTicker.Stop() + + receiverReportTicker := time.NewTicker(clientReceiverReportInterval) + defer receiverReportTicker.Stop() + + for { + select { + case err := <-readDone: + c.conn.Close() + if err != io.EOF && err != errRunTerminate { + c.log("ERR: %s", err) + } + c.p.events <- programEventClientClose{c} + <-c.terminate + return + + case <-checkStreamTicker.C: + for trackId := range c.streamTracks { + if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.p.conf.ReadTimeout { + c.log("ERR: stream is dead") + c.conn.Close() + <-readDone + c.p.events <- programEventClientClose{c} + <-c.terminate + return + } + } + + case <-receiverReportTicker.C: + for trackId := range c.streamTracks { + frame := c.rtcpReceivers[trackId].Report() + c.p.serverRtcp.writeChan <- &udpAddrBufPair{ + addr: &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: c.streamTracks[trackId].rtcpPort, + }, + buf: frame, + } + } + + case <-c.terminate: + c.conn.Close() + <-readDone + return + } + } +} + +func (c *client) runRecordTcp() { + frame := &gortsplib.InterleavedFrame{} + readBuf := newMultiBuffer(3, clientTcpReadBufferSize) + + readDone := make(chan error) + go func() { + for { + frame.Content = readBuf.next() + frame.Content = frame.Content[:cap(frame.Content)] + + recv, err := c.conn.ReadFrameOrRequest(frame, true) + if err != nil { + readDone <- err + break + } + + switch recvt := recv.(type) { + case *gortsplib.InterleavedFrame: + if frame.TrackId >= len(c.streamTracks) { + readDone <- fmt.Errorf("invalid track id '%d'", frame.TrackId) + break + } + + c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + c.p.events <- programEventClientFrameTcp{ + c.path, + frame.TrackId, + frame.StreamType, + frame.Content, + } + + case *gortsplib.Request: + err := c.handleRequest(recvt) + if err != nil { + readDone <- err + break + } + } + } + }() + + receiverReportTicker := time.NewTicker(clientReceiverReportInterval) + defer receiverReportTicker.Stop() + + for { + select { + case err := <-readDone: + c.conn.Close() + if err != io.EOF && err != errRunTerminate { + c.log("ERR: %s", err) + } + c.p.events <- programEventClientClose{c} + <-c.terminate + return + + case <-receiverReportTicker.C: + for trackId := range c.streamTracks { + frame := c.rtcpReceivers[trackId].Report() + c.conn.WriteFrame(&gortsplib.InterleavedFrame{ + TrackId: trackId, + StreamType: gortsplib.StreamTypeRtcp, + Content: frame, + }) + } + + case <-c.terminate: + c.conn.Close() + <-readDone + return + } + } } diff --git a/go.mod b/go.mod index 54d0cf56..137987ff 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20200829164650-64af75a6b5f6 + github.com/aler9/gortsplib v0.0.0-20200831072723-57eae89cc552 github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 github.com/davecgh/go-spew v1.1.1 // indirect github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 857aff3b..f666db07 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20200829164650-64af75a6b5f6 h1:orZ3RyemnuPUk8K6Wiwga836KVKF64yQR4/gpA3xXu8= -github.com/aler9/gortsplib v0.0.0-20200829164650-64af75a6b5f6/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= +github.com/aler9/gortsplib v0.0.0-20200831072723-57eae89cc552 h1:iSF9Byglyx1yqa32kve+6V49wnfI1PB9ciSUBEUO0b0= +github.com/aler9/gortsplib v0.0.0-20200831072723-57eae89cc552/go.mod h1:kBMvjIdOHRjLdV+oT28JD72JUPpJuwxOc9u72GG8GpY= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436 h1:W0iNErWKvSAyJBNVx+qQoyFrWOFVgS6f/WEME/D3EZc= github.com/aler9/sdp/v3 v3.0.0-20200719093237-2c3d108a7436/go.mod h1:OnlEK3QI7YtM+ShZWtGajmOHLZ3bjU80AcIS5e34i1U= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= diff --git a/main.go b/main.go index 1ce59d33..09a051b0 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,6 @@ type programEventClientNew struct { func (programEventClientNew) isProgramEvent() {} type programEventClientClose struct { - done chan struct{} client *client } @@ -75,41 +74,18 @@ type programEventClientSetupPlay struct { func (programEventClientSetupPlay) isProgramEvent() {} -type programEventClientSetupRecord struct { - res chan error - client *client -} - -func (programEventClientSetupRecord) isProgramEvent() {} - type programEventClientPlay struct { - done chan struct{} client *client } func (programEventClientPlay) isProgramEvent() {} -type programEventClientPlayStop struct { - done chan struct{} - client *client -} - -func (programEventClientPlayStop) isProgramEvent() {} - type programEventClientRecord struct { - done chan struct{} client *client } func (programEventClientRecord) isProgramEvent() {} -type programEventClientRecordStop struct { - done chan struct{} - client *client -} - -func (programEventClientRecordStop) isProgramEvent() {} - type programEventClientFrameUdp struct { addr *net.UDPAddr streamType gortsplib.StreamType @@ -312,14 +288,10 @@ outer: c.log("connected") case programEventClientClose: - delete(p.clients, evt.client) - - if evt.client.path != nil && evt.client.path.publisher == evt.client { - evt.client.path.onPublisherRemove() + if _, ok := p.clients[evt.client]; !ok { + continue } - - evt.client.log("disconnected") - close(evt.done) + p.closeClient(evt.client) case programEventClientDescribe: // create path if not exist @@ -341,7 +313,12 @@ outer: } } - p.paths[evt.pathName].onPublisherNew(evt.client, evt.sdpText, evt.sdpParsed) + p.paths[evt.pathName].publisher = evt.client + p.paths[evt.pathName].publisherSdpText = evt.sdpText + p.paths[evt.pathName].publisherSdpParsed = evt.sdpParsed + + evt.client.path = p.paths[evt.pathName] + evt.client.state = clientStatePreRecord evt.res <- nil case programEventClientSetupPlay: @@ -360,19 +337,9 @@ outer: evt.client.state = clientStatePrePlay evt.res <- nil - case programEventClientSetupRecord: - evt.client.state = clientStatePreRecord - evt.res <- nil - case programEventClientPlay: p.readerCount += 1 evt.client.state = clientStatePlay - close(evt.done) - - case programEventClientPlayStop: - p.readerCount -= 1 - evt.client.state = clientStatePrePlay - close(evt.done) case programEventClientRecord: p.publisherCount += 1 @@ -397,22 +364,6 @@ outer: } evt.client.path.onPublisherSetReady() - close(evt.done) - - case programEventClientRecordStop: - p.publisherCount -= 1 - evt.client.state = clientStatePreRecord - if evt.client.streamProtocol == gortsplib.StreamProtocolUdp { - for _, track := range evt.client.streamTracks { - key := makeUdpClientAddr(evt.client.ip(), track.rtpPort) - delete(p.udpClientsByAddr, key) - - key = makeUdpClientAddr(evt.client.ip(), track.rtcpPort) - delete(p.udpClientsByAddr, key) - } - } - evt.client.path.onPublisherSetNotReady() - close(evt.done) case programEventClientFrameUdp: pub, ok := p.udpClientsByAddr[makeUdpClientAddr(evt.addr.IP, evt.addr.Port)] @@ -454,32 +405,11 @@ outer: case programEventMetrics: evt.res <- nil - case programEventClientClose: - close(evt.done) - - case programEventClientDescribe: - evt.client.describeRes <- describeRes{nil, fmt.Errorf("terminated")} - case programEventClientAnnounce: evt.res <- fmt.Errorf("terminated") case programEventClientSetupPlay: evt.res <- fmt.Errorf("terminated") - - case programEventClientSetupRecord: - evt.res <- fmt.Errorf("terminated") - - case programEventClientPlay: - close(evt.done) - - case programEventClientPlayStop: - close(evt.done) - - case programEventClientRecord: - close(evt.done) - - case programEventClientRecordStop: - close(evt.done) } } }() @@ -499,7 +429,7 @@ outer: } for c := range p.clients { - c.conn.NetConn().Close() + p.closeClient(c) <-c.done } @@ -536,6 +466,38 @@ func (p *program) findConfForPathName(name string) *confPath { return nil } +func (p *program) closeClient(client *client) { + delete(p.clients, client) + + switch client.state { + case clientStatePlay: + p.readerCount -= 1 + + case clientStateRecord: + p.publisherCount -= 1 + + if client.streamProtocol == gortsplib.StreamProtocolUdp { + for _, track := range client.streamTracks { + key := makeUdpClientAddr(client.ip(), track.rtpPort) + delete(p.udpClientsByAddr, key) + + key = makeUdpClientAddr(client.ip(), track.rtcpPort) + delete(p.udpClientsByAddr, key) + } + } + + client.path.onPublisherSetNotReady() + } + + if client.path != nil && client.path.publisher == client { + client.path.onPublisherRemove() + } + + close(client.terminate) + + client.log("disconnected") +} + func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.StreamType, frame []byte) { for c := range p.clients { if c.path != path || @@ -570,12 +532,10 @@ func (p *program) forwardFrame(path *path, trackId int, streamType gortsplib.Str } } else { - c.events <- clientEventFrameTcp{ - frame: &gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: streamType, - Content: frame, - }, + c.tcpFrame <- &gortsplib.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: frame, } } } diff --git a/path.go b/path.go index 4d62d866..9b876041 100644 --- a/path.go +++ b/path.go @@ -76,7 +76,7 @@ func (pa *path) onInit() { func (pa *path) onClose() { if pa.source != nil { - pa.source.events <- sourceEventTerminate{} + close(pa.source.terminate) <-pa.source.done } @@ -91,6 +91,18 @@ func (pa *path) onClose() { pa.onDemandCmd.Process.Signal(os.Interrupt) pa.onDemandCmd.Wait() } + + for c := range pa.p.clients { + if c.path == pa { + if c.state == clientStateWaitDescription { + c.path = nil + c.state = clientStateInitial + c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} + } else { + pa.p.closeClient(c) + } + } + } } func (pa *path) hasClients() bool { @@ -104,7 +116,7 @@ func (pa *path) hasClients() bool { func (pa *path) hasClientsWaitingDescribe() bool { for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && c.path == pa { + if c.state == clientStateWaitDescription && c.path == pa { return true } } @@ -125,11 +137,11 @@ func (pa *path) onCheck() { if pa.hasClientsWaitingDescribe() && time.Since(pa.lastDescribeActivation) >= describeTimeout { for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && + if c.state == clientStateWaitDescription && c.path == pa { c.path = nil c.state = clientStateInitial - c.describeRes <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} + c.describe <- describeRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} } } } @@ -142,7 +154,7 @@ func (pa *path) onCheck() { time.Since(pa.lastDescribeReq) >= sourceStopAfterDescribeSecs { pa.source.log("stopping since we're not requested anymore") pa.source.state = sourceStateStopped - pa.source.events <- sourceEventApplyState{pa.source.state} + pa.source.setState <- pa.source.state } // stop on demand command if needed @@ -164,17 +176,17 @@ func (pa *path) onCheck() { } } -func (pa *path) onPublisherNew(client *client, sdpText []byte, sdpParsed *sdp.SessionDescription) { - pa.publisher = client - pa.publisherSdpText = sdpText - pa.publisherSdpParsed = sdpParsed - - client.path = pa - client.state = clientStateAnnounce -} - func (pa *path) onPublisherRemove() { pa.publisher = nil + + // close all clients that are reading or waiting for reading + for c := range pa.p.clients { + if c.path == pa && + c.state != clientStateWaitDescription && + c != pa.publisher { + pa.p.closeClient(c) + } + } } func (pa *path) onPublisherSetReady() { @@ -182,11 +194,11 @@ func (pa *path) onPublisherSetReady() { // reply to all clients that are waiting for a description for c := range pa.p.clients { - if c.state == clientStateWaitingDescription && + if c.state == clientStateWaitDescription && c.path == pa { c.path = nil c.state = clientStateInitial - c.describeRes <- describeRes{pa.publisherSdpText, nil} + c.describe <- describeRes{pa.publisherSdpText, nil} } } } @@ -194,12 +206,12 @@ func (pa *path) onPublisherSetReady() { func (pa *path) onPublisherSetNotReady() { pa.publisherReady = false - // close all clients that are reading + // close all clients that are reading or waiting for reading for c := range pa.p.clients { - if c.state != clientStateWaitingDescription && - c != pa.publisher && - c.path == pa { - c.conn.NetConn().Close() + if c.path == pa && + c.state != clientStateWaitDescription && + c != pa.publisher { + pa.p.closeClient(c) } } } @@ -228,11 +240,11 @@ func (pa *path) onDescribe(client *client) { } client.path = pa - client.state = clientStateWaitingDescription + client.state = clientStateWaitDescription // no on-demand: reply with 404 } else { - client.describeRes <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)} + client.describe <- describeRes{nil, fmt.Errorf("no one is publishing on path '%s'", pa.name)} } // publisher was found but is not ready: put the client on hold @@ -241,14 +253,14 @@ func (pa *path) onDescribe(client *client) { pa.source.log("starting on demand") pa.lastDescribeActivation = time.Now() pa.source.state = sourceStateRunning - pa.source.events <- sourceEventApplyState{pa.source.state} + pa.source.setState <- pa.source.state } client.path = pa - client.state = clientStateWaitingDescription + client.state = clientStateWaitDescription // publisher was found and is ready } else { - client.describeRes <- describeRes{pa.publisherSdpText, nil} + client.describe <- describeRes{pa.publisherSdpText, nil} } } diff --git a/source.go b/source.go index 8b66231f..2116e877 100644 --- a/source.go +++ b/source.go @@ -23,20 +23,6 @@ const ( sourceStateRunning ) -type sourceEvent interface { - isSourceEvent() -} - -type sourceEventApplyState struct { - state sourceState -} - -func (sourceEventApplyState) isSourceEvent() {} - -type sourceEventTerminate struct{} - -func (sourceEventTerminate) isSourceEvent() {} - type source struct { p *program path *path @@ -44,17 +30,19 @@ type source struct { state sourceState tracks []*gortsplib.Track - events chan sourceEvent - done chan struct{} + setState chan sourceState + terminate chan struct{} + done chan struct{} } func newSource(p *program, path *path, confp *confPath) *source { s := &source{ - p: p, - path: path, - confp: confp, - events: make(chan sourceEvent), - done: make(chan struct{}), + p: p, + path: path, + confp: confp, + setState: make(chan sourceState), + terminate: make(chan struct{}), + done: make(chan struct{}), } if confp.SourceOnDemand { @@ -99,12 +87,12 @@ func (s *source) run() { applyState(s.state) outer: - for rawEvt := range s.events { - switch evt := rawEvt.(type) { - case sourceEventApplyState: - applyState(evt.state) + for { + select { + case state := <-s.setState: + applyState(state) - case sourceEventTerminate: + case <-s.terminate: break outer } } @@ -114,6 +102,7 @@ outer: <-doDone } + close(s.setState) close(s.done) }