mediamtx/internal/protocols/whip/client.go

376 lines
7.7 KiB
Go

// Package whip contains a WHIP/WHEP client.
package whip
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"
"github.com/pion/sdp/v3"
pwebrtc "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/httpp"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
)
const (
handshakeTimeout = 10 * time.Second
trackGatherTimeout = 2 * time.Second
)
// Client is a WHIP client.
type Client struct {
HTTPClient *http.Client
URL *url.URL
Log logger.Writer
pc *webrtc.PeerConnection
patchIsSupported bool
}
// Publish publishes tracks.
func (c *Client) Publish(
ctx context.Context,
outgoingTracks []*webrtc.OutgoingTrack,
) error {
iceServers, err := c.optionsICEServers(ctx)
if err != nil {
return err
}
c.pc = &webrtc.PeerConnection{
ICEServers: iceServers,
HandshakeTimeout: conf.StringDuration(10 * time.Second),
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
OutgoingTracks: outgoingTracks,
Log: c.Log,
}
err = c.pc.Start()
if err != nil {
return err
}
offer, err := c.pc.CreatePartialOffer()
if err != nil {
c.pc.Close()
return err
}
res, err := c.postOffer(ctx, offer)
if err != nil {
c.pc.Close()
return err
}
c.URL, err = c.URL.Parse(res.Location)
if err != nil {
c.pc.Close()
return err
}
err = c.pc.SetAnswer(res.Answer)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return err
}
t := time.NewTimer(handshakeTimeout)
defer t.Stop()
outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := c.patchCandidate(ctx, offer, res.ETag, ca)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return err
}
case <-c.pc.GatheringDone():
case <-c.pc.Ready():
break outer
case <-t.C:
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return fmt.Errorf("deadline exceeded while waiting connection")
}
}
return nil
}
// Read reads tracks.
func (c *Client) Read(ctx context.Context) ([]*webrtc.IncomingTrack, error) {
iceServers, err := c.optionsICEServers(ctx)
if err != nil {
return nil, err
}
c.pc = &webrtc.PeerConnection{
ICEServers: iceServers,
HandshakeTimeout: conf.StringDuration(10 * time.Second),
TrackGatherTimeout: conf.StringDuration(2 * time.Second),
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: false,
Log: c.Log,
}
err = c.pc.Start()
if err != nil {
return nil, err
}
offer, err := c.pc.CreatePartialOffer()
if err != nil {
c.pc.Close()
return nil, err
}
res, err := c.postOffer(ctx, offer)
if err != nil {
c.pc.Close()
return nil, err
}
c.URL, err = c.URL.Parse(res.Location)
if err != nil {
c.pc.Close()
return nil, err
}
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(res.Answer.SDP))
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, err
}
err = webrtc.TracksAreValid(sdp.MediaDescriptions)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, err
}
err = c.pc.SetAnswer(res.Answer)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, err
}
t := time.NewTimer(handshakeTimeout)
defer t.Stop()
outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err = c.patchCandidate(ctx, offer, res.ETag, ca)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, err
}
case <-c.pc.GatheringDone():
case <-c.pc.Ready():
break outer
case <-t.C:
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, fmt.Errorf("deadline exceeded while waiting connection")
}
}
tracks, err := c.pc.GatherIncomingTracks(ctx)
if err != nil {
c.deleteSession(context.Background()) //nolint:errcheck
c.pc.Close()
return nil, err
}
return tracks, nil
}
// PeerConnection returns the underlying peer connection.
func (c *Client) PeerConnection() *webrtc.PeerConnection {
return c.pc
}
// StartReading starts reading all incoming tracks.
func (c *Client) StartReading() {
c.pc.StartReading()
}
// Close closes the client.
func (c *Client) Close() error {
err := c.deleteSession(context.Background())
c.pc.Close()
return err
}
// Wait waits for client errors.
func (c *Client) Wait(ctx context.Context) error {
select {
case <-c.pc.Failed():
return fmt.Errorf("peer connection closed")
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
func (c *Client) optionsICEServers(
ctx context.Context,
) ([]pwebrtc.ICEServer, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodOptions, c.URL.String(), nil)
if err != nil {
return nil, err
}
res, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return LinkHeaderUnmarshal(res.Header["Link"])
}
type whipPostOfferResponse struct {
Answer *pwebrtc.SessionDescription
Location string
ETag string
}
func (c *Client) postOffer(
ctx context.Context,
offer *pwebrtc.SessionDescription,
) (*whipPostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.URL.String(), bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/sdp")
res, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
contentType := httpp.ParseContentType(req.Header.Get("Content-Type"))
if contentType != "application/sdp" {
return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType)
}
c.patchIsSupported = (res.Header.Get("Accept-Patch") == "application/trickle-ice-sdpfrag")
Location := res.Header.Get("Location")
etag := res.Header.Get("ETag")
if etag == "" {
return nil, fmt.Errorf("ETag is missing")
}
sdp, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
answer := &pwebrtc.SessionDescription{
Type: pwebrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return &whipPostOfferResponse{
Answer: answer,
Location: Location,
ETag: etag,
}, nil
}
func (c *Client) patchCandidate(
ctx context.Context,
offer *pwebrtc.SessionDescription,
etag string,
candidate *pwebrtc.ICECandidateInit,
) error {
if !c.patchIsSupported {
return nil
}
frag, err := ICEFragmentMarshal(offer.SDP, []*pwebrtc.ICECandidateInit{candidate})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, c.URL.String(), bytes.NewReader(frag))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag")
req.Header.Set("If-Match", etag)
res, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}
func (c *Client) deleteSession(
ctx context.Context,
) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.URL.String(), nil)
if err != nil {
return err
}
res, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}