webrtc: unexport WHIP primitives (#3233)

This commit is contained in:
Alessandro Ros 2024-04-13 11:08:30 +02:00 committed by GitHub
parent a6cc52f0b2
commit a18bebfa58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 169 additions and 203 deletions

View File

@ -1,14 +1,17 @@
package webrtc
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
@ -28,7 +31,7 @@ func (c *WHIPClient) Publish(
videoTrack format.Format,
audioTrack format.Format,
) ([]*OutgoingTrack, error) {
iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String())
iceServers, err := c.optionsICEServers(ctx, c.URL.String())
if err != nil {
return nil, err
}
@ -64,7 +67,7 @@ func (c *WHIPClient) Publish(
return nil, err
}
res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer)
res, err := c.postOffer(ctx, c.URL.String(), offer)
if err != nil {
c.pc.Close()
return nil, err
@ -78,7 +81,7 @@ func (c *WHIPClient) Publish(
err = c.pc.SetAnswer(res.Answer)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -90,9 +93,9 @@ outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
err := c.patchCandidate(ctx, c.URL.String(), offer, res.ETag, ca)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -103,7 +106,7 @@ outer:
break outer
case <-t.C:
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, fmt.Errorf("deadline exceeded while waiting connection")
}
@ -114,7 +117,7 @@ outer:
// Read reads tracks.
func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) {
iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String())
iceServers, err := c.optionsICEServers(ctx, c.URL.String())
if err != nil {
return nil, err
}
@ -144,7 +147,7 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) {
return nil, err
}
res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer)
res, err := c.postOffer(ctx, c.URL.String(), offer)
if err != nil {
c.pc.Close()
return nil, err
@ -159,7 +162,7 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) {
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(res.Answer.SDP))
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -167,14 +170,14 @@ func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) {
// check that there are at most two tracks
_, err = TrackCount(sdp.MediaDescriptions)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
err = c.pc.SetAnswer(res.Answer)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -186,9 +189,9 @@ outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(ctx, c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
err := c.patchCandidate(ctx, c.URL.String(), offer, res.ETag, ca)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -199,7 +202,7 @@ outer:
break outer
case <-t.C:
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, fmt.Errorf("deadline exceeded while waiting connection")
}
@ -207,7 +210,7 @@ outer:
tracks, err := c.pc.GatherIncomingTracks(ctx, 0)
if err != nil {
WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String()) //nolint:errcheck
c.deleteSession(context.Background(), c.URL.String()) //nolint:errcheck
c.pc.Close()
return nil, err
}
@ -217,7 +220,7 @@ outer:
// Close closes the client.
func (c *WHIPClient) Close() error {
err := WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String())
err := c.deleteSession(context.Background(), c.URL.String())
c.pc.Close()
return err
}
@ -232,3 +235,142 @@ func (c *WHIPClient) Wait(ctx context.Context) error {
return fmt.Errorf("terminated")
}
}
func (c *WHIPClient) optionsICEServers(
ctx context.Context,
ur string,
) ([]webrtc.ICEServer, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodOptions, ur, nil)
if err != nil {
return nil, err
}
res, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return LinkHeaderUnmarshal(res.Header["Link"])
}
type whipPostOfferResponse struct {
Answer *webrtc.SessionDescription
Location string
ETag string
}
func (c *WHIPClient) postOffer(
ctx context.Context,
ur string,
offer *webrtc.SessionDescription,
) (*whipPostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/sdp")
res, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
contentType := res.Header.Get("Content-Type")
if contentType != "application/sdp" {
return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType)
}
acceptPatch := res.Header.Get("Accept-Patch")
if acceptPatch != "application/trickle-ice-sdpfrag" {
return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch)
}
Location := res.Header.Get("Location")
etag := res.Header.Get("ETag")
if etag == "" {
return nil, fmt.Errorf("ETag is missing")
}
sdp, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return &whipPostOfferResponse{
Answer: answer,
Location: Location,
ETag: etag,
}, nil
}
func (c *WHIPClient) patchCandidate(
ctx context.Context,
ur string,
offer *webrtc.SessionDescription,
etag string,
candidate *webrtc.ICECandidateInit,
) error {
frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, ur, bytes.NewReader(frag))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag")
req.Header.Set("If-Match", etag)
res, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}
func (c *WHIPClient) deleteSession(
ctx context.Context,
ur string,
) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, ur, nil)
if err != nil {
return err
}
res, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}

View File

@ -1,31 +0,0 @@
package webrtc
import (
"context"
"fmt"
"net/http"
)
// WHIPDeleteSession deletes a WHIP/WHEP session.
func WHIPDeleteSession(
ctx context.Context,
hc *http.Client,
ur string,
) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, ur, nil)
if err != nil {
return err
}
res, err := hc.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}

View File

@ -1,33 +0,0 @@
package webrtc
import (
"context"
"fmt"
"net/http"
"github.com/pion/webrtc/v3"
)
// WHIPOptionsICEServers sends a WHIP/WHEP request for ICE servers.
func WHIPOptionsICEServers(
ctx context.Context,
hc *http.Client,
ur string,
) ([]webrtc.ICEServer, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodOptions, ur, nil)
if err != nil {
return nil, err
}
res, err := hc.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return LinkHeaderUnmarshal(res.Header["Link"])
}

View File

@ -1,45 +0,0 @@
package webrtc
import (
"bytes"
"context"
"fmt"
"net/http"
"github.com/pion/webrtc/v3"
)
// WHIPPatchCandidate sends a WHIP/WHEP candidate.
func WHIPPatchCandidate(
ctx context.Context,
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
etag string,
candidate *webrtc.ICECandidateInit,
) error {
frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, ur, bytes.NewReader(frag))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag")
req.Header.Set("If-Match", etag)
res, err := hc.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}

View File

@ -1,76 +0,0 @@
package webrtc
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"github.com/pion/webrtc/v3"
)
// WHIPPostOfferResponse is the response to a post offer.
type WHIPPostOfferResponse struct {
Answer *webrtc.SessionDescription
Location string
ETag string
}
// PostOffer posts a WHIP/WHEP offer.
func PostOffer(
ctx context.Context,
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
) (*WHIPPostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/sdp")
res, err := hc.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
contentType := res.Header.Get("Content-Type")
if contentType != "application/sdp" {
return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType)
}
acceptPatch := res.Header.Get("Accept-Patch")
if acceptPatch != "application/trickle-ice-sdpfrag" {
return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch)
}
Location := res.Header.Get("Location")
etag := res.Header.Get("ETag")
if etag == "" {
return nil, fmt.Errorf("ETag is missing")
}
sdp, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return &WHIPPostOfferResponse{
Answer: answer,
Location: Location,
ETag: etag,
}, nil
}

View File

@ -207,8 +207,17 @@ func TestServerOptionsICEServer(t *testing.T) {
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc,
"http://myuser:mypass@localhost:8886/nonexisting/whep")
req, err := http.NewRequest(http.MethodOptions,
"http://myuser:mypass@localhost:8886/nonexisting/whep", nil)
require.NoError(t, err)
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNoContent, res.StatusCode)
iceServers, err := webrtc.LinkHeaderUnmarshal(res.Header["Link"])
require.NoError(t, err)
require.Equal(t, []pwebrtc.ICEServer{{