fix MTX_CONN_TYPE value with RTSP connections (#3967) (#4075)

This commit is contained in:
Alessandro Ros 2024-12-25 17:42:19 +01:00 committed by GitHub
parent 8a808ac0f6
commit 28970b323e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 248 additions and 106 deletions

View File

@ -1787,9 +1787,9 @@ The server allows to specify commands that are executed when a certain event hap
# Command to run when a client connects to the server.
# This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available:
# * RTSP_PORT: RTSP server port
# * MTX_CONN_TYPE: connection type
# * MTX_CONN_ID: connection ID
# * RTSP_PORT: RTSP server port
runOnConnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID
# Restart the command if it exits.
runOnConnectRestart: no
@ -1911,10 +1911,10 @@ pathDefaults:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```
@ -1925,11 +1925,11 @@ pathDefaults:
# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH
```

View File

@ -213,7 +213,7 @@ webrtc_sessions_bytes_sent 0
go func() {
defer wg.Done()
u, err := url.Parse("rtmp://localhost:1936/rtmps_path")
u, err := url.Parse("rtmps://localhost:1936/rtmps_path")
require.NoError(t, err)
nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})

View File

@ -3,6 +3,7 @@ package core
import (
"bufio"
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
@ -28,60 +29,6 @@ import (
"github.com/bluenviron/mediamtx/internal/test"
)
var runOnDemandSampleScript = `
package main
import (
"os"
"os/signal"
"syscall"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
func main() {
if os.Getenv("MTX_PATH") != "ondemand" ||
os.Getenv("MTX_QUERY") != "param=value" ||
os.Getenv("G1") != "on" {
panic("environment not set")
}
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
PacketizationMode: 1,
}},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("MTX_PATH"),
&description.Session{Medias: []*description.Media{medi}})
if err != nil {
panic(err)
}
defer source.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
<-c
err = os.WriteFile("ON_DEMAND_FILE", []byte(""), 0644)
if err != nil {
panic(err)
}
}
`
type testServer struct {
onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error)
onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error)
@ -107,12 +54,6 @@ func TestPathRunOnDemand(t *testing.T) {
onDemand := filepath.Join(os.TempDir(), "on_demand")
onUnDemand := filepath.Join(os.TempDir(), "on_undemand")
srcFile := filepath.Join(os.TempDir(), "ondemand.go")
err := os.WriteFile(srcFile,
[]byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemand)), 0o644)
require.NoError(t, err)
defer os.Remove(srcFile)
for _, ca := range []string{"describe", "setup", "describe and setup"} {
t.Run(ca, func(t *testing.T) {
defer os.Remove(onDemand)
@ -123,9 +64,9 @@ func TestPathRunOnDemand(t *testing.T) {
"webrtc: no\n"+
"paths:\n"+
" '~^(on)demand$':\n"+
" runOnDemand: go run %s\n"+
" runOnDemand: sh -c \"ON_DEMAND=%s go run ./test_on_demand/main.go\"\n"+
" runOnDemandCloseAfter: 1s\n"+
" runOnUnDemand: touch %s\n", srcFile, onUnDemand))
" runOnUnDemand: touch %s\n", onDemand, onUnDemand))
require.Equal(t, true, ok)
defer p1.Close()
@ -209,7 +150,15 @@ func TestPathRunOnDemand(t *testing.T) {
}
func TestPathRunOnConnect(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt"} {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{"rtsp", "rtsps", "rtmp", "rtmps", "srt"} {
t.Run(ca, func(t *testing.T) {
onConnect := filepath.Join(os.TempDir(), "on_connect")
defer os.Remove(onConnect)
@ -217,18 +166,28 @@ func TestPathRunOnConnect(t *testing.T) {
onDisconnect := filepath.Join(os.TempDir(), "on_disconnect")
defer os.Remove(onDisconnect)
connType := ""
func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
"encryption: optional\n"+
"serverCert: "+serverCertFpath+"\n"+
"serverKey: "+serverKeyFpath+"\n"+
"rtmpEncryption: optional\n"+
"rtmpServerCert: "+serverCertFpath+"\n"+
"rtmpServerKey: "+serverKeyFpath+"\n"+
"paths:\n"+
" test:\n"+
"runOnConnect: touch %s\n"+
"runOnDisconnect: touch %s\n",
"runOnConnect: sh -c 'echo \"$MTX_CONN_TYPE $MTX_CONN_ID $RTSP_PORT\" > %s'\n"+
"runOnDisconnect: sh -c 'echo \"$MTX_CONN_TYPE $MTX_CONN_ID $RTSP_PORT\" > %s'\n",
onConnect, onDisconnect))
require.Equal(t, true, ok)
defer p.Close()
switch ca {
case "rtsp":
connType = "rtspConn"
c := gortsplib.Client{}
err := c.StartRecording(
@ -237,7 +196,20 @@ func TestPathRunOnConnect(t *testing.T) {
require.NoError(t, err)
defer c.Close()
case "rtsps":
connType = "rtspsConn"
c := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err := c.StartRecording(
"rtsps://localhost:8322/test",
&description.Session{Medias: []*description.Media{test.UniqueMediaH264()}})
require.NoError(t, err)
defer c.Close()
case "rtmp":
connType = "rtmpConn"
u, err := url.Parse("rtmp://127.0.0.1:1935/test")
require.NoError(t, err)
@ -248,7 +220,22 @@ func TestPathRunOnConnect(t *testing.T) {
_, err = rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
case "rtmps":
connType = "rtmpsConn"
u, err := url.Parse("rtmps://127.0.0.1:1936/test")
require.NoError(t, err)
nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
defer nconn.Close() //nolint:errcheck
_, err = rtmp.NewClientConn(nconn, u, true)
require.NoError(t, err)
case "srt":
connType = "srtConn"
conf := srt.DefaultConfig()
address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=publish:test")
require.NoError(t, err)
@ -264,11 +251,19 @@ func TestPathRunOnConnect(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()
_, err := os.Stat(onConnect)
byts, err := os.ReadFile(onConnect)
require.NoError(t, err)
fields := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, connType, fields[0])
require.NotEmpty(t, fields[1])
require.Equal(t, "8554", fields[2])
_, err = os.Stat(onDisconnect)
byts, err = os.ReadFile(onDisconnect)
require.NoError(t, err)
fields = strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, connType, fields[0])
require.NotEmpty(t, fields[1])
require.Equal(t, "8554", fields[2])
})
}
}
@ -285,9 +280,9 @@ func TestPathRunOnReady(t *testing.T) {
"hls: no\n"+
"webrtc: no\n"+
"paths:\n"+
" test:\n"+
" runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
" ~te(st):\n"+
" runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_SOURCE_TYPE $MTX_SOURCE_ID $RTSP_PORT $G1\" > %s'\n"+
" runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_SOURCE_TYPE $MTX_SOURCE_ID $RTSP_PORT $G1\" > %s'\n",
onReady, onNotReady))
require.Equal(t, true, ok)
defer p.Close()
@ -305,15 +300,35 @@ func TestPathRunOnReady(t *testing.T) {
byts, err := os.ReadFile(onReady)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
fields := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, "test", fields[0])
require.Equal(t, "query=value", fields[1])
require.Equal(t, "rtspSession", fields[2])
require.NotEmpty(t, fields[3])
require.Equal(t, "8554", fields[4])
require.Equal(t, "st", fields[5])
byts, err = os.ReadFile(onNotReady)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
fields = strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, "test", fields[0])
require.Equal(t, "query=value", fields[1])
require.Equal(t, "rtspSession", fields[2])
require.NotEmpty(t, fields[3])
require.Equal(t, "8554", fields[4])
require.Equal(t, "st", fields[5])
}
func TestPathRunOnRead(t *testing.T) {
for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} {
serverCertFpath, err := test.CreateTempFile(test.TLSCertPub)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
for _, ca := range []string{"rtsp", "rtsps", "rtmp", "rtmps", "srt", "webrtc"} {
t.Run(ca, func(t *testing.T) {
onRead := filepath.Join(os.TempDir(), "on_read")
defer os.Remove(onRead)
@ -323,10 +338,16 @@ func TestPathRunOnRead(t *testing.T) {
func() {
p, ok := newInstance(fmt.Sprintf(
"paths:\n"+
" test:\n"+
" runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+
" runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n",
"encryption: optional\n"+
"serverCert: "+serverCertFpath+"\n"+
"serverKey: "+serverKeyFpath+"\n"+
"rtmpEncryption: optional\n"+
"rtmpServerCert: "+serverCertFpath+"\n"+
"rtmpServerKey: "+serverKeyFpath+"\n"+
"paths:\n"+
" ~te(st):\n"+
" runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_READER_TYPE $MTX_READER_ID $RTSP_PORT $G1\" > %s'\n"+
" runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_READER_TYPE $MTX_READER_ID $RTSP_PORT $G1\" > %s'\n",
onRead, onUnread))
require.Equal(t, true, ok)
defer p.Close()
@ -361,6 +382,25 @@ func TestPathRunOnRead(t *testing.T) {
_, err = reader.Play(nil)
require.NoError(t, err)
case "rtsps":
reader := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
u, err := base.ParseURL("rtsps://127.0.0.1:8322/test?query=value")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer reader.Close()
desc, _, err := reader.Describe(u)
require.NoError(t, err)
err = reader.SetupAll(desc.BaseURL, desc.Medias)
require.NoError(t, err)
_, err = reader.Play(nil)
require.NoError(t, err)
case "rtmp":
u, err := url.Parse("rtmp://127.0.0.1:1935/test?query=value")
require.NoError(t, err)
@ -375,6 +415,20 @@ func TestPathRunOnRead(t *testing.T) {
_, err = rtmp.NewReader(conn)
require.NoError(t, err)
case "rtmps":
u, err := url.Parse("rtmps://127.0.0.1:1936/test?query=value")
require.NoError(t, err)
nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
defer nconn.Close() //nolint:errcheck
conn, err := rtmp.NewClientConn(nconn, u, false)
require.NoError(t, err)
_, err = rtmp.NewReader(conn)
require.NoError(t, err)
case "srt":
conf := srt.DefaultConfig()
address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=read:test:query=value")
@ -440,13 +494,42 @@ func TestPathRunOnRead(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}()
var readerType string
switch ca {
case "rtsp":
readerType = "rtspSession"
case "rtsps":
readerType = "rtspsSession"
case "rtmp":
readerType = "rtmpConn"
case "rtmps":
readerType = "rtmpsConn"
case "srt":
readerType = "srtConn"
case "webrtc":
readerType = "webrtcSession"
}
byts, err := os.ReadFile(onRead)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
fields := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, "test", fields[0])
require.Equal(t, "query=value", fields[1])
require.Equal(t, readerType, fields[2])
require.NotEmpty(t, fields[3])
require.Equal(t, "8554", fields[4])
require.Equal(t, "st", fields[5])
byts, err = os.ReadFile(onUnread)
require.NoError(t, err)
require.Equal(t, "test query=value\n", string(byts))
fields = strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, "test", fields[0])
require.Equal(t, "query=value", fields[1])
require.Equal(t, readerType, fields[2])
require.NotEmpty(t, fields[3])
require.Equal(t, "8554", fields[4])
require.Equal(t, "st", fields[5])
})
}
}
@ -463,14 +546,16 @@ func TestPathRunOnRecordSegment(t *testing.T) {
defer os.RemoveAll(recordDir)
func() {
p, ok := newInstance("record: yes\n" +
"recordPath: " + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" +
"paths:\n" +
" test:\n" +
" runOnRecordSegmentCreate: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH\" > " + onRecordSegmentCreate + "'\n" +
" runOnRecordSegmentComplete: " +
"sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION\" > " + onRecordSegmentComplete + "'\n")
p, ok := newInstance(fmt.Sprintf("record: yes\n"+
"recordPath: %s\n"+
"paths:\n"+
" test:\n"+
" runOnRecordSegmentCreate: sh -c 'echo \"$MTX_SEGMENT_PATH $RTSP_PORT\" > %s'\n"+
" runOnRecordSegmentComplete: sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION $RTSP_PORT\" > %s'\n",
filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
onRecordSegmentCreate,
onRecordSegmentComplete,
))
require.Equal(t, true, ok)
defer p.Close()
@ -504,13 +589,16 @@ func TestPathRunOnRecordSegment(t *testing.T) {
byts, err := os.ReadFile(onRecordSegmentCreate)
require.NoError(t, err)
require.Equal(t, true, strings.HasPrefix(string(byts), recordDir))
fields := strings.Split(string(byts[:len(byts)-1]), " ")
require.True(t, strings.HasPrefix(fields[0], recordDir))
require.Equal(t, "8554", fields[1])
byts, err = os.ReadFile(onRecordSegmentComplete)
require.NoError(t, err)
parts := strings.Split(string(byts[:len(byts)-1]), " ")
require.Equal(t, true, strings.HasPrefix(parts[0], recordDir))
require.Equal(t, "3", parts[1])
fields = strings.Split(string(byts[:len(byts)-1]), " ")
require.True(t, strings.HasPrefix(fields[0], recordDir))
require.Equal(t, "3", fields[1])
require.Equal(t, "8554", fields[2])
}
func TestPathMaxReaders(t *testing.T) {

View File

@ -0,0 +1,54 @@
// This is used for testing purposes.
package main
import (
"os"
"os/signal"
"syscall"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
func main() {
if os.Getenv("MTX_QUERY") != "param=value" {
panic("unexpected MTX_QUERY")
}
if os.Getenv("G1") != "on" {
panic("unexpected G1")
}
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
SPS: []byte{
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
PacketizationMode: 1,
}},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:"+os.Getenv("RTSP_PORT")+"/"+os.Getenv("MTX_PATH"),
&description.Session{Medias: []*description.Media{medi}})
if err != nil {
panic(err)
}
defer source.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
<-c
err = os.WriteFile(os.Getenv("ON_DEMAND"), []byte(""), 0o644)
if err != nil {
panic(err)
}
}

View File

@ -60,7 +60,7 @@ func (c *conn) initialize() {
if c.isTLS {
return "rtspsConn"
}
return "conn"
return "rtspConn"
}(),
ID: c.uuid.String(),
}

View File

@ -27,9 +27,9 @@ udpMaxPayloadSize: 1472
# Command to run when a client connects to the server.
# This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available:
# * RTSP_PORT: RTSP server port
# * MTX_CONN_TYPE: connection type
# * MTX_CONN_ID: connection ID
# * RTSP_PORT: RTSP server port
runOnConnect:
# Restart the command if it exits.
runOnConnectRestart: no
@ -638,11 +638,11 @@ pathDefaults:
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by publisher)
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SOURCE_TYPE: source type
# * MTX_SOURCE_ID: source ID
runOnReady:
# Restart the command if it exits.
runOnReadyRestart: no
@ -655,11 +655,11 @@ pathDefaults:
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_QUERY: query parameters (passed by reader)
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_READER_TYPE: reader type
# * MTX_READER_ID: reader ID
runOnRead:
# Restart the command if it exits.
runOnReadRestart: no
@ -670,20 +670,20 @@ pathDefaults:
# Command to run when a recording segment is created.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
runOnRecordSegmentCreate:
# Command to run when a recording segment is complete.
# The following environment variables are available:
# * MTX_PATH: path name
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
# * RTSP_PORT: RTSP server port
# * G1, G2, ...: regular expression groups, if path name is
# a regular expression.
# * MTX_SEGMENT_PATH: segment file path
# * MTX_SEGMENT_DURATION: segment duration
runOnRecordSegmentComplete:
###############################################