diff --git a/internal/protocols/webrtc/whip_client.go b/internal/protocols/webrtc/whip_client.go index 61c076a4..933612c1 100644 --- a/internal/protocols/webrtc/whip_client.go +++ b/internal/protocols/webrtc/whip_client.go @@ -1,14 +1,17 @@ package webrtc import ( + "bytes" "context" "fmt" + "io" "net/http" "net/url" "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/sdp/v3" + "github.com/pion/webrtc/v3" "github.com/bluenviron/mediamtx/internal/logger" ) @@ -28,7 +31,7 @@ func (c *WHIPClient) Publish( videoTrack format.Format, audioTrack format.Format, ) ([]*OutgoingTrack, error) { - iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String()) + iceServers, err := c.optionsICEServers(ctx, c.URL.String()) if err != nil { return nil, err } @@ -64,7 +67,7 @@ func (c *WHIPClient) Publish( return nil, err } - res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer) + res, err := c.postOffer(ctx, c.URL.String(), offer) if err != nil { c.pc.Close() return nil, err @@ -78,7 +81,7 @@ func (c *WHIPClient) Publish( err = c.pc.SetAnswer(res.Answer) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -90,9 +93,9 @@ outer: for { select { case ca := <-c.pc.NewLocalCandidate(): - err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca) + err := c.patchCandidate(ctx, c.URL.String(), offer, res.ETag, ca) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -103,7 +106,7 @@ outer: break outer case <-t.C: - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, fmt.Errorf("deadline exceeded while waiting connection") } @@ -114,7 +117,7 @@ outer: // Read reads tracks. func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) { - iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String()) + iceServers, err := c.optionsICEServers(ctx, c.URL.String()) if err != nil { return nil, err } @@ -144,7 +147,7 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) { return nil, err } - res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer) + res, err := c.postOffer(ctx, c.URL.String(), offer) if err != nil { c.pc.Close() return nil, err @@ -159,7 +162,7 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) { var sdp sdp.SessionDescription err = sdp.Unmarshal([]byte(res.Answer.SDP)) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -167,14 +170,14 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) { // check that there are at most two tracks _, err = TrackCount(sdp.MediaDescriptions) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } err = c.pc.SetAnswer(res.Answer) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -186,9 +189,9 @@ outer: for { select { case ca := <-c.pc.NewLocalCandidate(): - err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca) + err := c.patchCandidate(ctx, c.URL.String(), offer, res.ETag, ca) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -199,7 +202,7 @@ outer: break outer case <-t.C: - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, fmt.Errorf("deadline exceeded while waiting connection") } @@ -207,7 +210,7 @@ outer: tracks, err := c.pc.GatherIncomingTracks(ctx, 0) if err != nil { - WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck + c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck c.pc.Close() return nil, err } @@ -217,7 +220,7 @@ outer: // Close closes the client. func (c *WHIPClient) Close() error { - err := WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) + err := c.deleteSession(context.Background(), c.URL.String()) c.pc.Close() return err } @@ -232,3 +235,142 @@ func (c *WHIPClient) Wait(ctx context.Context) error { return fmt.Errorf("terminated") } } + +func (c *WHIPClient) optionsICEServers( + ctx context.Context, + ur string, +) ([]webrtc.ICEServer, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodOptions, ur, nil) + if err != nil { + return nil, err + } + + res, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + return LinkHeaderUnmarshal(res.Header["Link"]) +} + +type whipPostOfferResponse struct { + Answer *webrtc.SessionDescription + Location string + ETag string +} + +func (c *WHIPClient) postOffer( + ctx context.Context, + ur string, + offer *webrtc.SessionDescription, +) (*whipPostOfferResponse, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP))) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/sdp") + + res, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + contentType := res.Header.Get("Content-Type") + if contentType != "application/sdp" { + return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType) + } + + acceptPatch := res.Header.Get("Accept-Patch") + if acceptPatch != "application/trickle-ice-sdpfrag" { + return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch) + } + + Location := res.Header.Get("Location") + + etag := res.Header.Get("ETag") + if etag == "" { + return nil, fmt.Errorf("ETag is missing") + } + + sdp, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + answer := &webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: string(sdp), + } + + return &whipPostOfferResponse{ + Answer: answer, + Location: Location, + ETag: etag, + }, nil +} + +func (c *WHIPClient) patchCandidate( + ctx context.Context, + ur string, + offer *webrtc.SessionDescription, + etag string, + candidate *webrtc.ICECandidateInit, +) error { + frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate}) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, ur, bytes.NewReader(frag)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag") + req.Header.Set("If-Match", etag) + + res, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusNoContent { + return fmt.Errorf("bad status code: %v", res.StatusCode) + } + + return nil +} + +func (c *WHIPClient) deleteSession( + ctx context.Context, + ur string, +) error { + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, ur, nil) + if err != nil { + return err + } + + res, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("bad status code: %v", res.StatusCode) + } + + return nil +} diff --git a/internal/protocols/webrtc/whip_delete_session.go b/internal/protocols/webrtc/whip_delete_session.go deleted file mode 100644 index 19277f59..00000000 --- a/internal/protocols/webrtc/whip_delete_session.go +++ /dev/null @@ -1,31 +0,0 @@ -package webrtc - -import ( - "context" - "fmt" - "net/http" -) - -// WHIPDeleteSession deletes a WHIP/WHEP session. -func WHIPDeleteSession( - ctx context.Context, - hc *http.Client, - ur string, -) error { - req, err := http.NewRequestWithContext(ctx, http.MethodDelete, ur, nil) - if err != nil { - return err - } - - res, err := hc.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - return fmt.Errorf("bad status code: %v", res.StatusCode) - } - - return nil -} diff --git a/internal/protocols/webrtc/whip_options_ice_servers.go b/internal/protocols/webrtc/whip_options_ice_servers.go deleted file mode 100644 index 15bdce94..00000000 --- a/internal/protocols/webrtc/whip_options_ice_servers.go +++ /dev/null @@ -1,33 +0,0 @@ -package webrtc - -import ( - "context" - "fmt" - "net/http" - - "github.com/pion/webrtc/v3" -) - -// WHIPOptionsICEServers sends a WHIP/WHEP request for ICE servers. -func WHIPOptionsICEServers( - ctx context.Context, - hc *http.Client, - ur string, -) ([]webrtc.ICEServer, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodOptions, ur, nil) - if err != nil { - return nil, err - } - - res, err := hc.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent { - return nil, fmt.Errorf("bad status code: %v", res.StatusCode) - } - - return LinkHeaderUnmarshal(res.Header["Link"]) -} diff --git a/internal/protocols/webrtc/whip_patch_candidate.go b/internal/protocols/webrtc/whip_patch_candidate.go deleted file mode 100644 index 975e8840..00000000 --- a/internal/protocols/webrtc/whip_patch_candidate.go +++ /dev/null @@ -1,45 +0,0 @@ -package webrtc - -import ( - "bytes" - "context" - "fmt" - "net/http" - - "github.com/pion/webrtc/v3" -) - -// WHIPPatchCandidate sends a WHIP/WHEP candidate. -func WHIPPatchCandidate( - ctx context.Context, - hc *http.Client, - ur string, - offer *webrtc.SessionDescription, - etag string, - candidate *webrtc.ICECandidateInit, -) error { - frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate}) - if err != nil { - return err - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPatch, ur, bytes.NewReader(frag)) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag") - req.Header.Set("If-Match", etag) - - res, err := hc.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusNoContent { - return fmt.Errorf("bad status code: %v", res.StatusCode) - } - - return nil -} diff --git a/internal/protocols/webrtc/whip_post_offer.go b/internal/protocols/webrtc/whip_post_offer.go deleted file mode 100644 index c32abd05..00000000 --- a/internal/protocols/webrtc/whip_post_offer.go +++ /dev/null @@ -1,76 +0,0 @@ -package webrtc - -import ( - "bytes" - "context" - "fmt" - "io" - "net/http" - - "github.com/pion/webrtc/v3" -) - -// WHIPPostOfferResponse is the response to a post offer. -type WHIPPostOfferResponse struct { - Answer *webrtc.SessionDescription - Location string - ETag string -} - -// PostOffer posts a WHIP/WHEP offer. -func PostOffer( - ctx context.Context, - hc *http.Client, - ur string, - offer *webrtc.SessionDescription, -) (*WHIPPostOfferResponse, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP))) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", "application/sdp") - - res, err := hc.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusCreated { - return nil, fmt.Errorf("bad status code: %v", res.StatusCode) - } - - contentType := res.Header.Get("Content-Type") - if contentType != "application/sdp" { - return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType) - } - - acceptPatch := res.Header.Get("Accept-Patch") - if acceptPatch != "application/trickle-ice-sdpfrag" { - return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch) - } - - Location := res.Header.Get("Location") - - etag := res.Header.Get("ETag") - if etag == "" { - return nil, fmt.Errorf("ETag is missing") - } - - sdp, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - - answer := &webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: string(sdp), - } - - return &WHIPPostOfferResponse{ - Answer: answer, - Location: Location, - ETag: etag, - }, nil -} diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 32767828..103a7316 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -207,8 +207,17 @@ func TestServerOptionsICEServer(t *testing.T) { defer tr.CloseIdleConnections() hc := &http.Client{Transport: tr} - iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, - "http://myuser:mypass@localhost:8886/nonexisting/whep") + req, err := http.NewRequest(http.MethodOptions, + "http://myuser:mypass@localhost:8886/nonexisting/whep", nil) + require.NoError(t, err) + + res, err := hc.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusNoContent, res.StatusCode) + + iceServers, err := webrtc.LinkHeaderUnmarshal(res.Header["Link"]) require.NoError(t, err) require.Equal(t, []pwebrtc.ICEServer{{