readd seqnum to RTP-Info (#233)

This commit is contained in:
aler9 2021-03-23 21:31:43 +01:00
parent 83e51e2bf8
commit d338e04df7
8 changed files with 89 additions and 89 deletions

2
go.mod
View File

@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027
github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum
View File

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027 h1:cJkd74/wqKvjAUmvIoBElY12m+R2I0Pzk0UR14xyT0c=
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc h1:uxCYfD2G2vlGMjxB3sGP++PJP1sJFd6ABl2TTV0/r9g=
github.com/aler9/gortsplib v0.0.0-20210323202043-7e9e266054fc/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

View File

@ -104,7 +104,7 @@ type RemoveReq struct {
// PlayRes is a play response.
type PlayRes struct {
TrackStartingPoints []streamproc.TrackStartingPoint
TrackInfos []streamproc.TrackInfo
}
// PlayReq is a play request.

View File

@ -326,8 +326,8 @@ func (c *Client) run() {
// add RTP-Info
var ri headers.RTPInfo
for trackID, v := range res.TrackStartingPoints {
if !v.Filled {
for trackID, ti := range res.TrackInfos {
if !ti.Initialized {
continue
}
@ -345,13 +345,14 @@ func (c *Client) run() {
u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(trackID), 10))
clockRate, _ := track.ClockRate()
ts := uint32(uint64(time.Since(v.NTPTime).Seconds()*float64(clockRate)) +
uint64(v.RTPTime))
ts := uint32(uint64(time.Since(ti.NTPTime).Seconds()*float64(clockRate)) +
uint64(ti.RTPTime))
lsn := ti.LastSequenceNumber
ri = append(ri, &headers.RTPInfoEntry{
URL: u,
SequenceNumber: 0,
Timestamp: ts,
SequenceNumber: &lsn,
Timestamp: &ts,
})
}
if len(ri) > 0 {

View File

@ -77,7 +77,6 @@ type Path struct {
setupPlayRequests []client.SetupPlayReq
source source.Source
sourceTracks gortsplib.Tracks
sourceTrackStartingPoints []streamproc.TrackStartingPoint
sp *streamproc.StreamProc
readers *readersMap
onDemandCmd *externalcmd.Cmd
@ -101,7 +100,6 @@ type Path struct {
clientRecord chan client.RecordReq
clientPause chan client.PauseReq
clientRemove chan client.RemoveReq
spSetStartingPoint chan streamproc.SetStartingPointReq
terminate chan struct{}
}
@ -146,7 +144,6 @@ func New(
clientRecord: make(chan client.RecordReq),
clientPause: make(chan client.PauseReq),
clientRemove: make(chan client.RemoveReq),
spSetStartingPoint: make(chan streamproc.SetStartingPointReq),
terminate: make(chan struct{}),
}
@ -228,7 +225,6 @@ outer:
case req := <-pa.extSourceSetReady:
pa.sourceTracks = req.Tracks
pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(req.Tracks))
pa.sp = streamproc.New(pa, len(req.Tracks))
pa.onSourceSetReady()
req.Res <- source.ExtSetReadyRes{SP: pa.sp}
@ -269,9 +265,6 @@ outer:
pa.clientsWg.Done()
close(req.Res)
case req := <-pa.spSetStartingPoint:
pa.onSPSetStartingPoint(req)
case <-pa.terminate:
pa.exhaustChannels()
break outer
@ -330,7 +323,6 @@ outer:
close(pa.clientRecord)
close(pa.clientPause)
close(pa.clientRemove)
close(pa.spSetStartingPoint)
}
func (pa *Path) exhaustChannels() {
@ -397,12 +389,6 @@ func (pa *Path) exhaustChannels() {
pa.clientsWg.Done()
close(req.Res)
case _, ok := <-pa.spSetStartingPoint:
if !ok {
return
}
}
}
}()
@ -657,13 +643,7 @@ func (pa *Path) onClientPlay(req client.PlayReq) {
pa.clients[req.Client] = clientStatePlay
pa.readers.add(req.Client)
// clone
cl := make([]streamproc.TrackStartingPoint, len(pa.sourceTrackStartingPoints))
for k, v := range pa.sourceTrackStartingPoints {
cl[k] = v
}
req.Res <- client.PlayRes{cl} // nolint:govet
req.Res <- client.PlayRes{TrackInfos: pa.sp.TrackInfos()}
}
func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
@ -713,7 +693,6 @@ func (pa *Path) onClientRecord(req client.RecordReq) {
pa.clients[req.Client] = clientStateRecord
pa.onSourceSetReady()
pa.sourceTrackStartingPoints = make([]streamproc.TrackStartingPoint, len(pa.sourceTracks))
pa.sp = streamproc.New(pa, len(pa.sourceTracks))
req.Res <- client.RecordRes{SP: pa.sp, Err: nil}
@ -740,14 +719,6 @@ func (pa *Path) onClientPause(req client.PauseReq) {
close(req.Res)
}
func (pa *Path) onSPSetStartingPoint(req streamproc.SetStartingPointReq) {
if req.SP != pa.sp {
return
}
pa.sourceTrackStartingPoints[req.TrackID] = req.StartingPoint
}
func (pa *Path) scheduleSourceClose() {
if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil {
return
@ -855,11 +826,6 @@ func (pa *Path) OnClientPause(req client.PauseReq) {
pa.clientPause <- req
}
// OnSPSetStartingPoint is called by streamproc.StreamProc.
func (pa *Path) OnSPSetStartingPoint(req streamproc.SetStartingPointReq) {
pa.spSetStartingPoint <- req
}
// OnSPFrame is called by streamproc.StreamProc.
func (pa *Path) OnSPFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.readers.forwardFrame(trackID, streamType, payload)

View File

@ -1,65 +1,83 @@
package streamproc
import (
"encoding/binary"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
)
// TrackStartingPoint is the starting point of a track.
type TrackStartingPoint struct {
Filled bool // used to avoid mutexes
RTPTime uint32
NTPTime time.Time
}
// Path is implemented by path.path.
type Path interface {
OnSPSetStartingPoint(SetStartingPointReq)
OnSPFrame(int, gortsplib.StreamType, []byte)
}
// SetStartingPointReq is a set starting point request.
type SetStartingPointReq struct {
SP *StreamProc
TrackID int
StartingPoint TrackStartingPoint
// TrackInfo contains infos about a track.
type TrackInfo struct {
Initialized bool
LastSequenceNumber uint16
RTPTime uint32
NTPTime time.Time
}
type trackInfo struct {
initialized bool
lastSequenceNumber uint32
timeMutex sync.Mutex
rtpTime uint32
ntpTime time.Time
}
// StreamProc is a stream processor, an intermediate layer between a source and a path.
type StreamProc struct {
path Path
startingPoints []TrackStartingPoint
path Path
trackInfos []trackInfo
}
// New allocates a StreamProc.
func New(path Path, tracksLen int) *StreamProc {
return &StreamProc{
path: path,
startingPoints: make([]TrackStartingPoint, tracksLen),
path: path,
trackInfos: make([]trackInfo, tracksLen),
}
}
// TrackInfos returns infos about the tracks of the stream.
func (sp *StreamProc) TrackInfos() []TrackInfo {
ret := make([]TrackInfo, len(sp.trackInfos))
for trackID := range sp.trackInfos {
sp.trackInfos[trackID].timeMutex.Lock()
ret[trackID] = TrackInfo{
Initialized: sp.trackInfos[trackID].initialized,
LastSequenceNumber: uint16(atomic.LoadUint32(&sp.trackInfos[trackID].lastSequenceNumber)),
RTPTime: sp.trackInfos[trackID].rtpTime,
NTPTime: sp.trackInfos[trackID].ntpTime,
}
sp.trackInfos[trackID].timeMutex.Unlock()
}
return ret
}
// OnFrame processes a frame.
func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP &&
!sp.startingPoints[trackID].Filled {
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
return
if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 {
// store last sequence number
sequenceNumber := binary.BigEndian.Uint16(payload[2 : 2+2])
atomic.StoreUint32(&sp.trackInfos[trackID].lastSequenceNumber, uint32(sequenceNumber))
// store time mapping
if !sp.trackInfos[trackID].initialized {
timestamp := binary.BigEndian.Uint32(payload[4 : 4+4])
sp.trackInfos[trackID].timeMutex.Lock()
sp.trackInfos[trackID].initialized = true
sp.trackInfos[trackID].rtpTime = timestamp
sp.trackInfos[trackID].ntpTime = time.Now()
sp.trackInfos[trackID].timeMutex.Unlock()
}
sp.startingPoints[trackID].Filled = true
sp.startingPoints[trackID].RTPTime = pkt.Timestamp
sp.startingPoints[trackID].NTPTime = time.Now()
sp.path.OnSPSetStartingPoint(SetStartingPointReq{
SP: sp,
TrackID: trackID,
StartingPoint: sp.startingPoints[trackID],
})
}
sp.path.OnSPFrame(trackID, streamType, payload)

View File

@ -670,8 +670,11 @@ func TestClientRTSPRTPInfo(t *testing.T) {
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
},
SequenceNumber: 0,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
}, dest.RTPInfo())
}()
@ -705,8 +708,11 @@ func TestClientRTSPRTPInfo(t *testing.T) {
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
},
SequenceNumber: 0,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: &base.URL{
@ -714,8 +720,11 @@ func TestClientRTSPRTPInfo(t *testing.T) {
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
},
SequenceNumber: 0,
Timestamp: (*dest.RTPInfo())[1].Timestamp,
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
}()

View File

@ -301,8 +301,11 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=0",
},
SequenceNumber: 0,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: &base.URL{
@ -310,8 +313,11 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=1",
},
SequenceNumber: 0,
Timestamp: (*dest.RTPInfo())[1].Timestamp,
SequenceNumber: func() *uint16 {
v := uint16(34254)
return &v
}(),
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
}