diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 18cbab01..a77157a7 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -389,6 +389,8 @@ components: state: type: string enum: [idle, read, publish] + path: + type: string bytesReceived: type: integer format: int64 @@ -408,6 +410,8 @@ components: state: type: string enum: [idle, read, publish] + path: + type: string bytesReceived: type: integer format: int64 @@ -496,6 +500,8 @@ components: state: type: string enum: [read, publish] + path: + type: string bytesReceived: type: integer format: int64 diff --git a/internal/core/api_defs.go b/internal/core/api_defs.go index bcebf590..04d97d6d 100644 --- a/internal/core/api_defs.go +++ b/internal/core/api_defs.go @@ -57,6 +57,7 @@ type apiRTMPConn struct { Created time.Time `json:"created"` RemoteAddr string `json:"remoteAddr"` State string `json:"state"` + Path string `json:"path"` BytesReceived uint64 `json:"bytesReceived"` BytesSent uint64 `json:"bytesSent"` } @@ -72,6 +73,7 @@ type apiRTSPSession struct { Created time.Time `json:"created"` RemoteAddr string `json:"remoteAddr"` State string `json:"state"` + Path string `json:"path"` BytesReceived uint64 `json:"bytesReceived"` BytesSent uint64 `json:"bytesSent"` } @@ -90,6 +92,7 @@ type apiWebRTCSession struct { LocalCandidate string `json:"localCandidate"` RemoteCandidate string `json:"remoteCandidate"` State string `json:"state"` + Path string `json:"path"` BytesReceived uint64 `json:"bytesReceived"` BytesSent uint64 `json:"bytesSent"` } diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 871ed30f..f5957e88 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -685,25 +685,33 @@ func TestAPIProtocolList(t *testing.T) { pa = "rtmpsconns" } + type item struct { + State string `json:"state"` + Path string `json:"path"` + } + var out struct { - ItemCount int `json:"itemCount"` - Items []struct { - State string `json:"state"` - } `json:"items"` + ItemCount int `json:"itemCount"` + Items []item `json:"items"` } httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out) if ca != "rtsp conns" && ca != "rtsps conns" { - require.Equal(t, "publish", out.Items[0].State) + require.Equal(t, item{ + State: "publish", + Path: "mypath", + }, out.Items[0]) } case "hls": + type item struct { + Created string `json:"created"` + LastRequest string `json:"lastRequest"` + } + var out struct { - ItemCount int `json:"itemCount"` - Items []struct { - Created string `json:"created"` - LastRequest string `json:"lastRequest"` - } `json:"items"` + ItemCount int `json:"itemCount"` + Items []item `json:"items"` } httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/hlsmuxers/list", nil, &out) @@ -713,13 +721,9 @@ func TestAPIProtocolList(t *testing.T) { case "webrtc": type item struct { - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - PeerConnectionEstablished bool `json:"peerConnectionEstablished"` - LocalCandidate string `json:"localCandidate"` - RemoteCandidate string `json:"remoteCandidate"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` + PeerConnectionEstablished bool `json:"peerConnectionEstablished"` + State string `json:"state"` + Path string `json:"path"` } var out struct { @@ -728,7 +732,11 @@ func TestAPIProtocolList(t *testing.T) { } httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out) - require.Equal(t, true, out.Items[0].PeerConnectionEstablished) + require.Equal(t, item{ + PeerConnectionEstablished: true, + State: "read", + Path: "mypath", + }, out.Items[0]) } }) } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 2cfaf4a7..5e978f0c 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -210,12 +210,13 @@ type rtmpConn struct { pathManager rtmpConnPathManager parent rtmpConnParent - ctx context.Context - ctxCancel func() - uuid uuid.UUID - created time.Time - state rtmpConnState - stateMutex sync.Mutex + ctx context.Context + ctxCancel func() + uuid uuid.UUID + created time.Time + mutex sync.Mutex + state rtmpConnState + pathName string } func newRTMPConn( @@ -279,12 +280,6 @@ func (c *rtmpConn) ip() net.IP { return c.nconn.RemoteAddr().(*net.TCPAddr).IP } -func (c *rtmpConn) safeState() rtmpConnState { - c.stateMutex.Lock() - defer c.stateMutex.Unlock() - return c.state -} - func (c *rtmpConn) run() { defer c.wg.Done() @@ -380,9 +375,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { defer res.path.readerRemove(pathReaderRemoveReq{author: c}) - c.stateMutex.Lock() + c.mutex.Lock() c.state = rtmpConnStateRead - c.stateMutex.Unlock() + c.pathName = pathName + c.mutex.Unlock() ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) go func() { @@ -794,9 +790,10 @@ func (c *rtmpConn) runPublish(u *url.URL) error { defer res.path.publisherRemove(pathPublisherRemoveReq{author: c}) - c.stateMutex.Lock() + c.mutex.Lock() c.state = rtmpConnStatePublish - c.stateMutex.Unlock() + c.pathName = pathName + c.mutex.Unlock() videoFormat, audioFormat, err := c.conn.ReadTracks() if err != nil { @@ -892,12 +889,15 @@ func (c *rtmpConn) apiSourceDescribe() pathAPISourceOrReader { } func (c *rtmpConn) apiItem() *apiRTMPConn { + c.mutex.Lock() + defer c.mutex.Unlock() + return &apiRTMPConn{ ID: c.uuid, Created: c.created, RemoteAddr: c.remoteAddr().String(), State: func() string { - switch c.safeState() { + switch c.state { case rtmpConnStateRead: return "read" @@ -906,6 +906,7 @@ func (c *rtmpConn) apiItem() *apiRTMPConn { } return "idle" }(), + Path: c.pathName, BytesReceived: c.conn.BytesReceived(), BytesSent: c.conn.BytesSent(), } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index aa8496e5..d9438106 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -37,13 +37,14 @@ type rtspSession struct { pathManager rtspSessionPathManager parent rtspSessionParent - uuid uuid.UUID - created time.Time - path *path - stream *stream - state gortsplib.ServerSessionState - stateMutex sync.Mutex - onReadCmd *externalcmd.Cmd // read + uuid uuid.UUID + created time.Time + path *path + stream *stream + onReadCmd *externalcmd.Cmd // read + mutex sync.Mutex + state gortsplib.ServerSessionState + pathName string } func newRTSPSession( @@ -77,12 +78,6 @@ func (s *rtspSession) close() { s.session.Close() } -func (s *rtspSession) safeState() gortsplib.ServerSessionState { - s.stateMutex.Lock() - defer s.stateMutex.Unlock() - return s.state -} - func (s *rtspSession) remoteAddr() net.Addr { return s.author.NetConn().RemoteAddr() } @@ -157,9 +152,10 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno s.path = res.path - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStatePreRecord - s.stateMutex.Unlock() + s.pathName = ctx.Path + s.mutex.Unlock() return &base.Response{ StatusCode: base.StatusOK, @@ -242,9 +238,10 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt s.path = res.path s.stream = res.stream - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStatePrePlay - s.stateMutex.Unlock() + s.pathName = ctx.Path + s.mutex.Unlock() return &base.Response{ StatusCode: base.StatusOK, @@ -281,9 +278,9 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons }) } - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStatePlay - s.stateMutex.Unlock() + s.mutex.Unlock() } return &base.Response{ @@ -323,9 +320,9 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } } - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStateRecord - s.stateMutex.Unlock() + s.mutex.Unlock() return &base.Response{ StatusCode: base.StatusOK, @@ -341,16 +338,16 @@ func (s *rtspSession) onPause(_ *gortsplib.ServerHandlerOnPauseCtx) (*base.Respo s.onReadCmd.Close() } - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStatePrePlay - s.stateMutex.Unlock() + s.mutex.Unlock() case gortsplib.ServerSessionStateRecord: s.path.publisherStop(pathPublisherStopReq{author: s}) - s.stateMutex.Lock() + s.mutex.Lock() s.state = gortsplib.ServerSessionStatePreRecord - s.stateMutex.Unlock() + s.mutex.Unlock() } return &base.Response{ @@ -387,12 +384,15 @@ func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx } func (s *rtspSession) apiItem() *apiRTSPSession { + s.mutex.Lock() + defer s.mutex.Unlock() + return &apiRTSPSession{ ID: s.uuid, Created: s.created, RemoteAddr: s.remoteAddr().String(), State: func() string { - switch s.safeState() { + switch s.state { case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay: return "read" @@ -403,6 +403,7 @@ func (s *rtspSession) apiItem() *apiRTSPSession { } return "idle" }(), + Path: s.pathName, BytesReceived: s.session.BytesReceived(), BytesSent: s.session.BytesSent(), } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index 053a4fbb..d13af765 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -126,9 +126,9 @@ type webRTCSession struct { created time.Time uuid uuid.UUID secret uuid.UUID - pcMutex sync.RWMutex - pc *peerConnection answerSent bool + mutex sync.RWMutex + pc *peerConnection chAddRemoteCandidates chan webRTCSessionAddCandidatesReq } @@ -180,12 +180,6 @@ func (s *webRTCSession) close() { s.ctxCancel() } -func (s *webRTCSession) safePC() *peerConnection { - s.pcMutex.RLock() - defer s.pcMutex.RUnlock() - return s.pc -} - func (s *webRTCSession) run() { defer s.wg.Done() @@ -491,9 +485,9 @@ outer: } } - s.pcMutex.Lock() + s.mutex.Lock() s.pc = pc - s.pcMutex.Unlock() + s.mutex.Unlock() return nil } @@ -542,19 +536,21 @@ func (s *webRTCSession) apiReaderDescribe() pathAPISourceOrReader { } func (s *webRTCSession) apiItem() *apiWebRTCSession { + s.mutex.RLock() + defer s.mutex.RUnlock() + peerConnectionEstablished := false localCandidate := "" remoteCandidate := "" bytesReceived := uint64(0) bytesSent := uint64(0) - pc := s.safePC() - if pc != nil { + if s.pc != nil { peerConnectionEstablished = true - localCandidate = pc.localCandidate() - remoteCandidate = pc.remoteCandidate() - bytesReceived = pc.bytesReceived() - bytesSent = pc.bytesSent() + localCandidate = s.pc.localCandidate() + remoteCandidate = s.pc.remoteCandidate() + bytesReceived = s.pc.bytesReceived() + bytesSent = s.pc.bytesSent() } return &apiWebRTCSession{ @@ -570,6 +566,7 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession { } return "read" }(), + Path: s.req.pathName, BytesReceived: bytesReceived, BytesSent: bytesSent, }