From f8b366c6048d771168285920e2c57f4497377b42 Mon Sep 17 00:00:00 2001 From: Andres Uribe Date: Thu, 31 Oct 2024 07:47:35 -0400 Subject: [PATCH] webrtc: restart ICE only on failed connection states (#3899) * webrtc: Restart ICE only on failed connection states * rename "connected" into "ready" since WebRTC can emit the "connected" state multiple times --------- Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com> --- internal/protocols/webrtc/peer_connection.go | 43 ++++++++++---------- internal/protocols/webrtc/to_stream_test.go | 6 +-- internal/protocols/whip/client.go | 6 +-- internal/servers/webrtc/publisher.js | 2 +- internal/servers/webrtc/reader.js | 2 +- internal/servers/webrtc/session.go | 8 ++-- internal/staticsources/webrtc/source_test.go | 2 +- 7 files changed, 35 insertions(+), 34 deletions(-) diff --git a/internal/protocols/webrtc/peer_connection.go b/internal/protocols/webrtc/peer_connection.go index e4e8c11a..6ac441e4 100644 --- a/internal/protocols/webrtc/peer_connection.go +++ b/internal/protocols/webrtc/peer_connection.go @@ -85,8 +85,8 @@ type PeerConnection struct { wr *webrtc.PeerConnection stateChangeMutex sync.Mutex newLocalCandidate chan *webrtc.ICECandidateInit - connected chan struct{} - disconnected chan struct{} + ready chan struct{} + failed chan struct{} done chan struct{} gatheringDone chan struct{} incomingTrack chan trackRecvPair @@ -213,8 +213,8 @@ func (co *PeerConnection) Start() error { } co.newLocalCandidate = make(chan *webrtc.ICECandidateInit) - co.connected = make(chan struct{}) - co.disconnected = make(chan struct{}) + co.ready = make(chan struct{}) + co.failed = make(chan struct{}) co.done = make(chan struct{}) co.gatheringDone = make(chan struct{}) co.incomingTrack = make(chan trackRecvPair) @@ -268,10 +268,11 @@ func (co *PeerConnection) Start() error { switch state { case webrtc.PeerConnectionStateConnected: - // for some reasons, PeerConnectionStateConnected can arrive twice. - // https://github.com/bluenviron/mediamtx/issues/3813 + // PeerConnectionStateConnected can arrive twice, since state can + // switch from "disconnected" to "connected". + // contrarily, we're interested into emitting "ready" once. select { - case <-co.connected: + case <-co.ready: return default: } @@ -279,10 +280,10 @@ func (co *PeerConnection) Start() error { co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", co.LocalCandidate(), co.RemoteCandidate()) - close(co.connected) + close(co.ready) - case webrtc.PeerConnectionStateDisconnected: - close(co.disconnected) + case webrtc.PeerConnectionStateFailed: + close(co.failed) case webrtc.PeerConnectionStateClosed: close(co.done) @@ -294,7 +295,7 @@ func (co *PeerConnection) Start() error { v := i.ToJSON() select { case co.newLocalCandidate <- &v: - case <-co.connected: + case <-co.ready: case <-co.ctx.Done(): } } else { @@ -380,8 +381,8 @@ func (co *PeerConnection) waitGatheringDone(ctx context.Context) error { } } -// WaitUntilConnected waits until connection is established. -func (co *PeerConnection) WaitUntilConnected( +// WaitUntilReady waits until connection is established. +func (co *PeerConnection) WaitUntilReady( ctx context.Context, ) error { t := time.NewTimer(time.Duration(co.HandshakeTimeout)) @@ -393,7 +394,7 @@ outer: case <-t.C: return fmt.Errorf("deadline exceeded while waiting connection") - case <-co.connected: + case <-co.ready: break outer case <-ctx.Done(): @@ -436,7 +437,7 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming return co.incomingTracks, nil } - case <-co.Disconnected(): + case <-co.Failed(): return nil, fmt.Errorf("peer connection closed") case <-ctx.Done(): @@ -445,14 +446,14 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming } } -// Connected returns when connected. -func (co *PeerConnection) Connected() <-chan struct{} { - return co.connected +// Ready returns when ready. +func (co *PeerConnection) Ready() <-chan struct{} { + return co.ready } -// Disconnected returns when disconnected. -func (co *PeerConnection) Disconnected() <-chan struct{} { - return co.disconnected +// Failed returns when failed. +func (co *PeerConnection) Failed() <-chan struct{} { + return co.failed } // NewLocalCandidate returns when there's a new local candidate. diff --git a/internal/protocols/webrtc/to_stream_test.go b/internal/protocols/webrtc/to_stream_test.go index 94a63a94..2c8befc0 100644 --- a/internal/protocols/webrtc/to_stream_test.go +++ b/internal/protocols/webrtc/to_stream_test.go @@ -375,16 +375,16 @@ func TestToStream(t *testing.T) { err2 := pc2.AddRemoteCandidate(cnd) require.NoError(t, err2) - case <-pc1.Connected(): + case <-pc1.Ready(): return } } }() - err = pc1.WaitUntilConnected(context.Background()) + err = pc1.WaitUntilReady(context.Background()) require.NoError(t, err) - err = pc2.WaitUntilConnected(context.Background()) + err = pc2.WaitUntilReady(context.Background()) require.NoError(t, err) err = pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{ diff --git a/internal/protocols/whip/client.go b/internal/protocols/whip/client.go index 1af6cdd4..824e88dc 100644 --- a/internal/protocols/whip/client.go +++ b/internal/protocols/whip/client.go @@ -100,7 +100,7 @@ outer: case <-c.pc.GatheringDone(): - case <-c.pc.Connected(): + case <-c.pc.Ready(): break outer case <-t.C: @@ -190,7 +190,7 @@ outer: case <-c.pc.GatheringDone(): - case <-c.pc.Connected(): + case <-c.pc.Ready(): break outer case <-t.C: @@ -230,7 +230,7 @@ func (c *Client) Close() error { // Wait waits for client errors. func (c *Client) Wait(ctx context.Context) error { select { - case <-c.pc.Disconnected(): + case <-c.pc.Failed(): return fmt.Errorf("peer connection closed") case <-ctx.Done(): diff --git a/internal/servers/webrtc/publisher.js b/internal/servers/webrtc/publisher.js index 33d0a235..6990f449 100644 --- a/internal/servers/webrtc/publisher.js +++ b/internal/servers/webrtc/publisher.js @@ -372,7 +372,7 @@ return; } - if (this.pc.iceConnectionState === 'disconnected') { + if (this.pc.iceConnectionState === 'failed') { this.handleError('peer connection closed'); } else if (this.pc.iceConnectionState === 'connected') { if (this.conf.onConnected !== undefined) { diff --git a/internal/servers/webrtc/reader.js b/internal/servers/webrtc/reader.js index bdb41f15..c7a49879 100644 --- a/internal/servers/webrtc/reader.js +++ b/internal/servers/webrtc/reader.js @@ -468,7 +468,7 @@ return; } - if (this.pc.iceConnectionState === 'disconnected') { + if (this.pc.iceConnectionState === 'failed') { this.handleError('peer connection closed'); } }; diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 9298282d..d8ebbd0d 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -193,7 +193,7 @@ func (s *session) runPublish() (int, error) { go s.readRemoteCandidates(pc) - err = pc.WaitUntilConnected(s.ctx) + err = pc.WaitUntilReady(s.ctx) if err != nil { return 0, err } @@ -226,7 +226,7 @@ func (s *session) runPublish() (int, error) { pc.StartReading() select { - case <-pc.Disconnected(): + case <-pc.Failed(): return 0, fmt.Errorf("peer connection closed") case <-s.ctx.Done(): @@ -300,7 +300,7 @@ func (s *session) runRead() (int, error) { go s.readRemoteCandidates(pc) - err = pc.WaitUntilConnected(s.ctx) + err = pc.WaitUntilReady(s.ctx) if err != nil { stream.RemoveReader(s) return 0, err @@ -327,7 +327,7 @@ func (s *session) runRead() (int, error) { defer stream.RemoveReader(s) select { - case <-pc.Disconnected(): + case <-pc.Failed(): return 0, fmt.Errorf("peer connection closed") case err := <-stream.ReaderError(s): diff --git a/internal/staticsources/webrtc/source_test.go b/internal/staticsources/webrtc/source_test.go index 96d2c140..7458fb72 100644 --- a/internal/staticsources/webrtc/source_test.go +++ b/internal/staticsources/webrtc/source_test.go @@ -81,7 +81,7 @@ func TestSource(t *testing.T) { w.Write([]byte(answer.SDP)) go func() { - err3 := pc.WaitUntilConnected(context.Background()) + err3 := pc.WaitUntilReady(context.Background()) require.NoError(t, err3) err3 = outgoingTracks[0].WriteRTP(&rtp.Packet{