mediamtx/internal/core/webrtc_server_test.go

236 lines
4.3 KiB
Go

package core
import (
"encoding/json"
"sync"
"testing"
"time"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/gorilla/websocket"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
)
type webRTCTestClient struct {
wc *websocket.Conn
pc *webrtc.PeerConnection
track chan *webrtc.TrackRemote
closed chan struct{}
}
func newWebRTCTestClient(addr string) (*webRTCTestClient, error) {
wc, res, err := websocket.DefaultDialer.Dial(addr, nil)
if err != nil {
return nil, err
}
defer res.Body.Close()
_, msg, err := wc.ReadMessage()
if err != nil {
wc.Close()
return nil, err
}
var iceServers []webrtc.ICEServer
err = json.Unmarshal(msg, &iceServers)
if err != nil {
wc.Close()
return nil, err
}
pc, err := newPeerConnection(webrtc.Configuration{
ICEServers: iceServers,
})
if err != nil {
wc.Close()
return nil, err
}
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
enc, _ := json.Marshal(i.ToJSON())
wc.WriteMessage(websocket.TextMessage, enc)
}
})
connected := make(chan struct{})
closed := make(chan struct{})
var stateChangeMutex sync.Mutex
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
select {
case <-closed:
return
default:
}
switch state {
case webrtc.PeerConnectionStateConnected:
close(connected)
case webrtc.PeerConnectionStateClosed:
close(closed)
}
})
track := make(chan *webrtc.TrackRemote, 1)
pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) {
track <- trak
})
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
localOffer, err := pc.CreateOffer(nil)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
enc, err := json.Marshal(localOffer)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = wc.WriteMessage(websocket.TextMessage, enc)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = pc.SetLocalDescription(localOffer)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
_, msg, err = wc.ReadMessage()
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
var remoteOffer webrtc.SessionDescription
err = json.Unmarshal(msg, &remoteOffer)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
err = pc.SetRemoteDescription(remoteOffer)
if err != nil {
wc.Close()
pc.Close()
return nil, err
}
go func() {
for {
_, msg, err := wc.ReadMessage()
if err != nil {
return
}
var candidate webrtc.ICECandidateInit
err = json.Unmarshal(msg, &candidate)
if err != nil {
return
}
pc.AddICECandidate(candidate)
}
}()
<-connected
return &webRTCTestClient{
wc: wc,
pc: pc,
track: track,
closed: closed,
}, nil
}
func (c *webRTCTestClient) close() {
c.pc.Close()
c.wc.Close()
<-c.closed
}
func TestWebRTCServer(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
}
v := gortsplib.TransportTCP
source := gortsplib.Client{
Transport: &v,
}
err := source.StartRecording("rtsp://localhost:8554/stream", media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c, err := newWebRTCTestClient("ws://localhost:8889/stream/ws")
require.NoError(t, err)
defer c.close()
time.Sleep(500 * time.Millisecond)
source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
trak := <-c.track
pkt, _, err := trak.ReadRTP()
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 102,
SequenceNumber: pkt.SequenceNumber,
Timestamp: pkt.Timestamp,
SSRC: pkt.SSRC,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, pkt)
}