webrtc: restart ICE only on failed connection states (#3899)

* webrtc: Restart ICE only on failed connection states

* rename "connected" into "ready" since WebRTC can emit the "connected" state multiple times

---------

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
This commit is contained in:
Andres Uribe 2024-10-31 07:47:35 -04:00 committed by GitHub
parent 5ff4f90ff9
commit f8b366c604
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 35 additions and 34 deletions

View File

@ -85,8 +85,8 @@ type PeerConnection struct {
wr *webrtc.PeerConnection wr *webrtc.PeerConnection
stateChangeMutex sync.Mutex stateChangeMutex sync.Mutex
newLocalCandidate chan *webrtc.ICECandidateInit newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{} ready chan struct{}
disconnected chan struct{} failed chan struct{}
done chan struct{} done chan struct{}
gatheringDone chan struct{} gatheringDone chan struct{}
incomingTrack chan trackRecvPair incomingTrack chan trackRecvPair
@ -213,8 +213,8 @@ func (co *PeerConnection) Start() error {
} }
co.newLocalCandidate = make(chan *webrtc.ICECandidateInit) co.newLocalCandidate = make(chan *webrtc.ICECandidateInit)
co.connected = make(chan struct{}) co.ready = make(chan struct{})
co.disconnected = make(chan struct{}) co.failed = make(chan struct{})
co.done = make(chan struct{}) co.done = make(chan struct{})
co.gatheringDone = make(chan struct{}) co.gatheringDone = make(chan struct{})
co.incomingTrack = make(chan trackRecvPair) co.incomingTrack = make(chan trackRecvPair)
@ -268,10 +268,11 @@ func (co *PeerConnection) Start() error {
switch state { switch state {
case webrtc.PeerConnectionStateConnected: case webrtc.PeerConnectionStateConnected:
// for some reasons, PeerConnectionStateConnected can arrive twice. // PeerConnectionStateConnected can arrive twice, since state can
// https://github.com/bluenviron/mediamtx/issues/3813 // switch from "disconnected" to "connected".
// contrarily, we're interested into emitting "ready" once.
select { select {
case <-co.connected: case <-co.ready:
return return
default: default:
} }
@ -279,10 +280,10 @@ func (co *PeerConnection) Start() error {
co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v", co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.LocalCandidate(), co.RemoteCandidate()) co.LocalCandidate(), co.RemoteCandidate())
close(co.connected) close(co.ready)
case webrtc.PeerConnectionStateDisconnected: case webrtc.PeerConnectionStateFailed:
close(co.disconnected) close(co.failed)
case webrtc.PeerConnectionStateClosed: case webrtc.PeerConnectionStateClosed:
close(co.done) close(co.done)
@ -294,7 +295,7 @@ func (co *PeerConnection) Start() error {
v := i.ToJSON() v := i.ToJSON()
select { select {
case co.newLocalCandidate <- &v: case co.newLocalCandidate <- &v:
case <-co.connected: case <-co.ready:
case <-co.ctx.Done(): case <-co.ctx.Done():
} }
} else { } else {
@ -380,8 +381,8 @@ func (co *PeerConnection) waitGatheringDone(ctx context.Context) error {
} }
} }
// WaitUntilConnected waits until connection is established. // WaitUntilReady waits until connection is established.
func (co *PeerConnection) WaitUntilConnected( func (co *PeerConnection) WaitUntilReady(
ctx context.Context, ctx context.Context,
) error { ) error {
t := time.NewTimer(time.Duration(co.HandshakeTimeout)) t := time.NewTimer(time.Duration(co.HandshakeTimeout))
@ -393,7 +394,7 @@ outer:
case <-t.C: case <-t.C:
return fmt.Errorf("deadline exceeded while waiting connection") return fmt.Errorf("deadline exceeded while waiting connection")
case <-co.connected: case <-co.ready:
break outer break outer
case <-ctx.Done(): case <-ctx.Done():
@ -436,7 +437,7 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming
return co.incomingTracks, nil return co.incomingTracks, nil
} }
case <-co.Disconnected(): case <-co.Failed():
return nil, fmt.Errorf("peer connection closed") return nil, fmt.Errorf("peer connection closed")
case <-ctx.Done(): case <-ctx.Done():
@ -445,14 +446,14 @@ func (co *PeerConnection) GatherIncomingTracks(ctx context.Context) ([]*Incoming
} }
} }
// Connected returns when connected. // Ready returns when ready.
func (co *PeerConnection) Connected() <-chan struct{} { func (co *PeerConnection) Ready() <-chan struct{} {
return co.connected return co.ready
} }
// Disconnected returns when disconnected. // Failed returns when failed.
func (co *PeerConnection) Disconnected() <-chan struct{} { func (co *PeerConnection) Failed() <-chan struct{} {
return co.disconnected return co.failed
} }
// NewLocalCandidate returns when there's a new local candidate. // NewLocalCandidate returns when there's a new local candidate.

View File

@ -375,16 +375,16 @@ func TestToStream(t *testing.T) {
err2 := pc2.AddRemoteCandidate(cnd) err2 := pc2.AddRemoteCandidate(cnd)
require.NoError(t, err2) require.NoError(t, err2)
case <-pc1.Connected(): case <-pc1.Ready():
return return
} }
} }
}() }()
err = pc1.WaitUntilConnected(context.Background()) err = pc1.WaitUntilReady(context.Background())
require.NoError(t, err) require.NoError(t, err)
err = pc2.WaitUntilConnected(context.Background()) err = pc2.WaitUntilReady(context.Background())
require.NoError(t, err) require.NoError(t, err)
err = pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{ err = pc1.OutgoingTracks[0].WriteRTP(&rtp.Packet{

View File

@ -100,7 +100,7 @@ outer:
case <-c.pc.GatheringDone(): case <-c.pc.GatheringDone():
case <-c.pc.Connected(): case <-c.pc.Ready():
break outer break outer
case <-t.C: case <-t.C:
@ -190,7 +190,7 @@ outer:
case <-c.pc.GatheringDone(): case <-c.pc.GatheringDone():
case <-c.pc.Connected(): case <-c.pc.Ready():
break outer break outer
case <-t.C: case <-t.C:
@ -230,7 +230,7 @@ func (c *Client) Close() error {
// Wait waits for client errors. // Wait waits for client errors.
func (c *Client) Wait(ctx context.Context) error { func (c *Client) Wait(ctx context.Context) error {
select { select {
case <-c.pc.Disconnected(): case <-c.pc.Failed():
return fmt.Errorf("peer connection closed") return fmt.Errorf("peer connection closed")
case <-ctx.Done(): case <-ctx.Done():

View File

@ -372,7 +372,7 @@
return; return;
} }
if (this.pc.iceConnectionState === 'disconnected') { if (this.pc.iceConnectionState === 'failed') {
this.handleError('peer connection closed'); this.handleError('peer connection closed');
} else if (this.pc.iceConnectionState === 'connected') { } else if (this.pc.iceConnectionState === 'connected') {
if (this.conf.onConnected !== undefined) { if (this.conf.onConnected !== undefined) {

View File

@ -468,7 +468,7 @@
return; return;
} }
if (this.pc.iceConnectionState === 'disconnected') { if (this.pc.iceConnectionState === 'failed') {
this.handleError('peer connection closed'); this.handleError('peer connection closed');
} }
}; };

View File

@ -193,7 +193,7 @@ func (s *session) runPublish() (int, error) {
go s.readRemoteCandidates(pc) go s.readRemoteCandidates(pc)
err = pc.WaitUntilConnected(s.ctx) err = pc.WaitUntilReady(s.ctx)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -226,7 +226,7 @@ func (s *session) runPublish() (int, error) {
pc.StartReading() pc.StartReading()
select { select {
case <-pc.Disconnected(): case <-pc.Failed():
return 0, fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case <-s.ctx.Done(): case <-s.ctx.Done():
@ -300,7 +300,7 @@ func (s *session) runRead() (int, error) {
go s.readRemoteCandidates(pc) go s.readRemoteCandidates(pc)
err = pc.WaitUntilConnected(s.ctx) err = pc.WaitUntilReady(s.ctx)
if err != nil { if err != nil {
stream.RemoveReader(s) stream.RemoveReader(s)
return 0, err return 0, err
@ -327,7 +327,7 @@ func (s *session) runRead() (int, error) {
defer stream.RemoveReader(s) defer stream.RemoveReader(s)
select { select {
case <-pc.Disconnected(): case <-pc.Failed():
return 0, fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case err := <-stream.ReaderError(s): case err := <-stream.ReaderError(s):

View File

@ -81,7 +81,7 @@ func TestSource(t *testing.T) {
w.Write([]byte(answer.SDP)) w.Write([]byte(answer.SDP))
go func() { go func() {
err3 := pc.WaitUntilConnected(context.Background()) err3 := pc.WaitUntilReady(context.Background())
require.NoError(t, err3) require.NoError(t, err3)
err3 = outgoingTracks[0].WriteRTP(&rtp.Packet{ err3 = outgoingTracks[0].WriteRTP(&rtp.Packet{