From 34dbcfb50867f8865f91dfa5b6a90630982c4a7e Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 18 Feb 2024 22:15:08 +0100 Subject: [PATCH] move WebRTC tests into internal/servers/webrtc (#3043) --- internal/core/webrtc_server_test.go | 365 ---------------------- internal/servers/webrtc/http_server.go | 2 +- internal/servers/webrtc/server.go | 9 +- internal/servers/webrtc/server_test.go | 410 +++++++++++++++++++++++++ internal/servers/webrtc/session.go | 10 +- 5 files changed, 424 insertions(+), 372 deletions(-) delete mode 100644 internal/core/webrtc_server_test.go create mode 100644 internal/servers/webrtc/server_test.go diff --git a/internal/core/webrtc_server_test.go b/internal/core/webrtc_server_test.go deleted file mode 100644 index 9b952047..00000000 --- a/internal/core/webrtc_server_test.go +++ /dev/null @@ -1,365 +0,0 @@ -package core - -import ( - "bytes" - "context" - "net/http" - "net/url" - "testing" - "time" - - "github.com/bluenviron/gortsplib/v4" - "github.com/bluenviron/gortsplib/v4/pkg/base" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/pion/rtp" - pwebrtc "github.com/pion/webrtc/v3" - "github.com/stretchr/testify/require" - - "github.com/bluenviron/mediamtx/internal/protocols/webrtc" - "github.com/bluenviron/mediamtx/internal/test" -) - -func TestWebRTCPages(t *testing.T) { - p, ok := newInstance("paths:\n" + - " all:\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - for _, path := range []string{"/stream", "/stream/publish", "/publish"} { - func() { - req, err := http.NewRequest(http.MethodGet, "http://localhost:8889"+path, nil) - require.NoError(t, err) - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusOK, res.StatusCode) - }() - } -} - -func TestWebRTCRead(t *testing.T) { - for _, auth := range []string{ - "none", - "internal", - "external", - } { - t.Run("auth_"+auth, func(t *testing.T) { - var conf string - - switch auth { - case "none": - conf = "paths:\n" + - " all_others:\n" - - case "internal": - conf = "paths:\n" + - " all_others:\n" + - " readUser: myuser\n" + - " readPass: mypass\n" - - case "external": - conf = "externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - var a *testHTTPAuthenticator - if auth == "external" { - a = newTestHTTPAuthenticator(t, "rtsp", "publish") - } - - medi := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - }}, - } - - v := gortsplib.TransportTCP - source := gortsplib.Client{ - Transport: &v, - } - err := source.StartRecording( - "rtsp://testpublisher:testpass@localhost:8554/teststream?param=value", - &description.Session{Medias: []*description.Media{medi}}) - require.NoError(t, err) - defer source.Close() - - if auth == "external" { - a.close() - a = newTestHTTPAuthenticator(t, "webrtc", "read") - defer a.close() - } - - hc := &http.Client{Transport: &http.Transport{}} - - user := "" - pass := "" - - switch auth { - case "internal": - user = "myuser" - pass = "mypass" - - case "external": - user = "testreader" - pass = "testpass" - } - - ur := "http://" - if user != "" { - ur += user + ":" + pass + "@" - } - ur += "localhost:8889/teststream/whep?param=value" - - go func() { - time.Sleep(500 * time.Millisecond) - - err := source.WritePacketRTP(medi, &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 123, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{5, 3}, - }) - require.NoError(t, err) - }() - - u, err := url.Parse(ur) - require.NoError(t, err) - - c := &webrtc.WHIPClient{ - HTTPClient: hc, - URL: u, - Log: test.NilLogger{}, - } - - tracks, err := c.Read(context.Background()) - require.NoError(t, err) - defer checkClose(t, c.Close) - - pkt, err := tracks[0].ReadRTP() - require.NoError(t, err) - require.Equal(t, &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 100, - SequenceNumber: pkt.SequenceNumber, - Timestamp: pkt.Timestamp, - SSRC: pkt.SSRC, - CSRC: []uint32{}, - }, - Payload: []byte{5, 3}, - }, pkt) - }) - } -} - -func TestWebRTCReadNotFound(t *testing.T) { - p, ok := newInstance("paths:\n" + - " all_others:\n") - require.Equal(t, true, ok) - defer p.Close() - - hc := &http.Client{Transport: &http.Transport{}} - - iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, "http://localhost:8889/stream/whep") - require.NoError(t, err) - - pc, err := pwebrtc.NewPeerConnection(pwebrtc.Configuration{ - ICEServers: iceServers, - }) - require.NoError(t, err) - defer pc.Close() //nolint:errcheck - - _, err = pc.AddTransceiverFromKind(pwebrtc.RTPCodecTypeVideo) - require.NoError(t, err) - - offer, err := pc.CreateOffer(nil) - require.NoError(t, err) - - req, err := http.NewRequest(http.MethodPost, "http://localhost:8889/stream/whep", bytes.NewReader([]byte(offer.SDP))) - require.NoError(t, err) - - req.Header.Set("Content-Type", "application/sdp") - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNotFound, res.StatusCode) -} - -func TestWebRTCPublish(t *testing.T) { - for _, auth := range []string{ - "none", - "internal", - "external", - } { - t.Run("auth_"+auth, func(t *testing.T) { - var conf string - - switch auth { - case "none": - conf = "paths:\n" + - " all_others:\n" - - case "internal": - conf = "paths:\n" + - " all_others:\n" + - " publishUser: myuser\n" + - " publishPass: mypass\n" - - case "external": - conf = "externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - var a *testHTTPAuthenticator - if auth == "external" { - a = newTestHTTPAuthenticator(t, "webrtc", "publish") - } - - hc := &http.Client{Transport: &http.Transport{}} - - // preflight requests must always work, without authentication - func() { - req, err := http.NewRequest(http.MethodOptions, "http://localhost:8889/teststream/whip", nil) - require.NoError(t, err) - - req.Header.Set("Access-Control-Request-Method", "OPTIONS") - - res, err := hc.Do(req) - require.NoError(t, err) - defer res.Body.Close() - - require.Equal(t, http.StatusNoContent, res.StatusCode) - - if auth != "none" { - _, ok := res.Header["Link"] - require.Equal(t, false, ok) - } - }() - - user := "" - pass := "" - - switch auth { - case "internal": - user = "myuser" - pass = "mypass" - - case "external": - user = "testpublisher" - pass = "testpass" - } - - ur := "http://" - if user != "" { - ur += user + ":" + pass + "@" - } - ur += "localhost:8889/teststream/whip?param=value" - - su, err := url.Parse(ur) - require.NoError(t, err) - - s := &webrtc.WHIPClient{ - HTTPClient: hc, - URL: su, - Log: test.NilLogger{}, - } - - tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil) - require.NoError(t, err) - defer checkClose(t, s.Close) - - err = tracks[0].WriteRTP(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 123, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{1}, - }) - require.NoError(t, err) - - time.Sleep(200 * time.Millisecond) - - if auth == "external" { - a.close() - a = newTestHTTPAuthenticator(t, "rtsp", "read") - defer a.close() - } - - c := gortsplib.Client{ - OnDecodeError: func(err error) { - panic(err) - }, - } - - u, err := base.ParseURL("rtsp://testreader:testpass@127.0.0.1:8554/teststream?param=value") - require.NoError(t, err) - - err = c.Start(u.Scheme, u.Host) - require.NoError(t, err) - defer c.Close() - - desc, _, err := c.Describe(u) - require.NoError(t, err) - - var forma *format.H264 - medi := desc.FindFormat(&forma) - - _, err = c.Setup(desc.BaseURL, medi, 0, 0) - require.NoError(t, err) - - received := make(chan struct{}) - - c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - require.Equal(t, []byte{3}, pkt.Payload) - close(received) - }) - - _, err = c.Play(nil) - require.NoError(t, err) - - err = tracks[0].WriteRTP(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 124, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{3}, - }) - require.NoError(t, err) - - <-received - }) - } -} diff --git a/internal/servers/webrtc/http_server.go b/internal/servers/webrtc/http_server.go index fdd48761..de82cfe5 100644 --- a/internal/servers/webrtc/http_server.go +++ b/internal/servers/webrtc/http_server.go @@ -58,7 +58,7 @@ type httpServer struct { allowOrigin string trustedProxies conf.IPsOrCIDRs readTimeout conf.StringDuration - pathManager defs.PathManager + pathManager serverPathManager parent *Server inner *httpp.WrappedServer diff --git a/internal/servers/webrtc/server.go b/internal/servers/webrtc/server.go index 5748d82b..cb99ec8a 100644 --- a/internal/servers/webrtc/server.go +++ b/internal/servers/webrtc/server.go @@ -26,6 +26,7 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/webrtc" "github.com/bluenviron/mediamtx/internal/restrictnetwork" + "github.com/bluenviron/mediamtx/internal/stream" ) const ( @@ -164,6 +165,12 @@ type webRTCDeleteSessionReq struct { res chan webRTCDeleteSessionRes } +type serverPathManager interface { + FindPathConf(req defs.PathFindPathConfReq) (*conf.Path, error) + AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) + AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) +} + type serverParent interface { logger.Writer } @@ -185,7 +192,7 @@ type Server struct { AdditionalHosts []string ICEServers []conf.WebRTCICEServer ExternalCmdPool *externalcmd.Pool - PathManager defs.PathManager + PathManager serverPathManager Parent serverParent ctx context.Context diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go new file mode 100644 index 00000000..e15f92dc --- /dev/null +++ b/internal/servers/webrtc/server_test.go @@ -0,0 +1,410 @@ +package webrtc + +import ( + "bytes" + "context" + "net/http" + "net/url" + "testing" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/externalcmd" + "github.com/bluenviron/mediamtx/internal/protocols/webrtc" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/pion/rtp" + pwebrtc "github.com/pion/webrtc/v3" + "github.com/stretchr/testify/require" +) + +func checkClose(t *testing.T, closeFunc func() error) { + require.NoError(t, closeFunc()) +} + +type dummyPath struct { + stream *stream.Stream + streamCreated chan struct{} +} + +func (p *dummyPath) Name() string { + return "teststream" +} + +func (p *dummyPath) SafeConf() *conf.Path { + return &conf.Path{} +} + +func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { + return externalcmd.Environment{} +} + +func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { + var err error + p.stream, err = stream.New( + 1460, + req.Desc, + true, + test.NilLogger{}, + ) + if err != nil { + return nil, err + } + close(p.streamCreated) + return p.stream, nil +} + +func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) { +} + +func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) { +} + +func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) { +} + +type dummyPathManager struct { + path *dummyPath +} + +func (pm *dummyPathManager) FindPathConf(_ defs.PathFindPathConfReq) (*conf.Path, error) { + return &conf.Path{}, nil +} + +func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) { + return pm.path, nil +} + +func (pm *dummyPathManager) AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + if req.AccessRequest.Name == "nonexisting" { + return nil, nil, &defs.PathNoOnePublishingError{} + } + return pm.path, pm.path.stream, nil +} + +func TestServerStaticPages(t *testing.T) { + s := &Server{ + Address: "127.0.0.1:8889", + Encryption: false, + ServerKey: "", + ServerCert: "", + AllowOrigin: "", + TrustedProxies: conf.IPsOrCIDRs{}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + LocalUDPAddress: "127.0.0.1:8887", + LocalTCPAddress: "127.0.0.1:8887", + IPsFromInterfaces: true, + IPsFromInterfacesList: []string{}, + AdditionalHosts: []string{}, + ICEServers: []conf.WebRTCICEServer{}, + ExternalCmdPool: nil, + PathManager: &dummyPathManager{}, + Parent: test.NilLogger{}, + } + err := s.Initialize() + require.NoError(t, err) + defer s.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + for _, path := range []string{"/stream", "/stream/publish", "/publish"} { + func() { + req, err := http.NewRequest(http.MethodGet, "http://localhost:8889"+path, nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + }() + } +} + +func TestServerPublish(t *testing.T) { + path := &dummyPath{ + streamCreated: make(chan struct{}), + } + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:8889", + Encryption: false, + ServerKey: "", + ServerCert: "", + AllowOrigin: "", + TrustedProxies: conf.IPsOrCIDRs{}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + LocalUDPAddress: "127.0.0.1:8887", + LocalTCPAddress: "127.0.0.1:8887", + IPsFromInterfaces: true, + IPsFromInterfacesList: []string{}, + AdditionalHosts: []string{}, + ICEServers: []conf.WebRTCICEServer{}, + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: test.NilLogger{}, + } + err := s.Initialize() + require.NoError(t, err) + defer s.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + // preflight requests must always work, without authentication + func() { + req, err := http.NewRequest(http.MethodOptions, "http://localhost:8889/teststream/whip", nil) + require.NoError(t, err) + + req.Header.Set("Access-Control-Request-Method", "OPTIONS") + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNoContent, res.StatusCode) + + _, ok := res.Header["Link"] + require.Equal(t, false, ok) + }() + + ur := "http://" + ur += "localhost:8889/teststream/whip?param=value" + + su, err := url.Parse(ur) + require.NoError(t, err) + + wc := &webrtc.WHIPClient{ + HTTPClient: hc, + URL: su, + Log: test.NilLogger{}, + } + + tracks, err := wc.Publish(context.Background(), test.FormatH264, nil) + require.NoError(t, err) + defer checkClose(t, wc.Close) + + err = tracks[0].WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1}, + }) + require.NoError(t, err) + + <-path.streamCreated + + aw := asyncwriter.New(512, &test.NilLogger{}) + + recv := make(chan struct{}) + + path.stream.AddReader(aw, + path.stream.Desc().Medias[0], + path.stream.Desc().Medias[0].Formats[0], + func(u unit.Unit) error { + require.Equal(t, [][]byte{ + {2}, + }, u.(*unit.H264).AU) + close(recv) + return nil + }) + + err = tracks[0].WriteRTP(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 124, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{2}, + }) + require.NoError(t, err) + + aw.Start() + <-recv + aw.Stop() +} + +func TestServerRead(t *testing.T) { + testMediaH264 := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatH264}, + } + + /*testMediaAAC := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatMPEG4Audio}, + }*/ + + desc := &description.Session{Medias: []*description.Media{testMediaH264}} + + stream, err := stream.New( + 1460, + desc, + true, + test.NilLogger{}, + ) + require.NoError(t, err) + + path := &dummyPath{stream: stream} + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:8889", + Encryption: false, + ServerKey: "", + ServerCert: "", + AllowOrigin: "", + TrustedProxies: conf.IPsOrCIDRs{}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + LocalUDPAddress: "127.0.0.1:8887", + LocalTCPAddress: "127.0.0.1:8887", + IPsFromInterfaces: true, + IPsFromInterfacesList: []string{}, + AdditionalHosts: []string{}, + ICEServers: []conf.WebRTCICEServer{}, + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: test.NilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + ur := "http://" + ur += "localhost:8889/teststream/whep?param=value" + + u, err := url.Parse(ur) + require.NoError(t, err) + + hc := &http.Client{Transport: &http.Transport{}} + + wc := &webrtc.WHIPClient{ + HTTPClient: hc, + URL: u, + Log: test.NilLogger{}, + } + + writerDone := make(chan struct{}) + defer func() { <-writerDone }() + + writerTerminate := make(chan struct{}) + defer close(writerTerminate) + + go func() { + defer close(writerDone) + for { + select { + case <-time.After(100 * time.Millisecond): + case <-writerTerminate: + return + } + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ + Base: unit.Base{ + NTP: time.Time{}, + }, + AU: [][]byte{ + {5, 1}, + }, + }) + } + }() + + tracks, err := wc.Read(context.Background()) + require.NoError(t, err) + defer checkClose(t, wc.Close) + + pkt, err := tracks[0].ReadRTP() + require.NoError(t, err) + require.Equal(t, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 100, + SequenceNumber: pkt.SequenceNumber, + Timestamp: pkt.Timestamp, + SSRC: pkt.SSRC, + CSRC: []uint32{}, + }, + Payload: []byte{ + 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, + 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, + 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, + 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, + 0x07, 0x08, 0x00, 0x02, 0x05, 0x01, + }, + }, pkt) +} + +func TestServerReadNotFound(t *testing.T) { + pathManager := &dummyPathManager{} + + s := &Server{ + Address: "127.0.0.1:8889", + Encryption: false, + ServerKey: "", + ServerCert: "", + AllowOrigin: "", + TrustedProxies: conf.IPsOrCIDRs{}, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + LocalUDPAddress: "127.0.0.1:8887", + LocalTCPAddress: "127.0.0.1:8887", + IPsFromInterfaces: true, + IPsFromInterfacesList: []string{}, + AdditionalHosts: []string{}, + ICEServers: []conf.WebRTCICEServer{}, + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: test.NilLogger{}, + } + err := s.Initialize() + require.NoError(t, err) + defer s.Close() + + hc := &http.Client{Transport: &http.Transport{}} + + iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, "http://localhost:8889/nonexisting/whep") + require.NoError(t, err) + + pc, err := pwebrtc.NewPeerConnection(pwebrtc.Configuration{ + ICEServers: iceServers, + }) + require.NoError(t, err) + defer pc.Close() //nolint:errcheck + + _, err = pc.AddTransceiverFromKind(pwebrtc.RTPCodecTypeVideo) + require.NoError(t, err) + + offer, err := pc.CreateOffer(nil) + require.NoError(t, err) + + req, err := http.NewRequest(http.MethodPost, + "http://localhost:8889/nonexisting/whep", bytes.NewReader([]byte(offer.SDP))) + require.NoError(t, err) + + req.Header.Set("Content-Type", "application/sdp") + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNotFound, res.StatusCode) +} diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 620d4c3f..525235dc 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "net/http" - "strings" "sync" "time" @@ -283,7 +282,7 @@ type session struct { req webRTCNewSessionReq wg *sync.WaitGroup externalCmdPool *externalcmd.Pool - pathManager defs.PathManager + pathManager serverPathManager parent *Server ctx context.Context @@ -511,14 +510,15 @@ func (s *session) runRead() (int, error) { }, }) if err != nil { - var terr defs.AuthenticationError - if errors.As(err, &terr) { + var terr1 defs.AuthenticationError + if errors.As(err, &terr1) { // wait some seconds to mitigate brute force attacks <-time.After(pauseAfterAuthError) return http.StatusUnauthorized, err } - if strings.HasPrefix(err.Error(), "no one is publishing") { + var terr2 *defs.PathNoOnePublishingError + if errors.As(err, &terr2) { return http.StatusNotFound, err }