From 28970b323e023df30023ab6654b611a0ed4adc12 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Wed, 25 Dec 2024 17:42:19 +0100 Subject: [PATCH] fix MTX_CONN_TYPE value with RTSP connections (#3967) (#4075) --- README.md | 8 +- internal/core/metrics_test.go | 2 +- internal/core/path_test.go | 272 ++++++++++++++++++--------- internal/core/test_on_demand/main.go | 54 ++++++ internal/servers/rtsp/conn.go | 2 +- mediamtx.yml | 16 +- 6 files changed, 248 insertions(+), 106 deletions(-) create mode 100644 internal/core/test_on_demand/main.go diff --git a/README.md b/README.md index 5be829db..fdcd35e6 100644 --- a/README.md +++ b/README.md @@ -1787,9 +1787,9 @@ The server allows to specify commands that are executed when a certain event hap # Command to run when a client connects to the server. # This is terminated with SIGINT when a client disconnects from the server. # The following environment variables are available: -# * RTSP_PORT: RTSP server port # * MTX_CONN_TYPE: connection type # * MTX_CONN_ID: connection ID +# * RTSP_PORT: RTSP server port runOnConnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID # Restart the command if it exits. runOnConnectRestart: no @@ -1911,10 +1911,10 @@ pathDefaults: # Command to run when a recording segment is created. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_SEGMENT_PATH: segment file path runOnRecordSegmentCreate: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH ``` @@ -1925,11 +1925,11 @@ pathDefaults: # Command to run when a recording segment is complete. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path + # * MTX_SEGMENT_DURATION: segment duration # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_SEGMENT_PATH: segment file path - # * MTX_SEGMENT_DURATION: segment duration runOnRecordSegmentComplete: curl http://my-custom-server/webhook?path=$MTX_PATH&segment_path=$MTX_SEGMENT_PATH ``` diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 8fcd1ede..b061e522 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -213,7 +213,7 @@ webrtc_sessions_bytes_sent 0 go func() { defer wg.Done() - u, err := url.Parse("rtmp://localhost:1936/rtmps_path") + u, err := url.Parse("rtmps://localhost:1936/rtmps_path") require.NoError(t, err) nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 3a44cb6a..60f24046 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -3,6 +3,7 @@ package core import ( "bufio" "context" + "crypto/tls" "fmt" "net" "net/http" @@ -28,60 +29,6 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) -var runOnDemandSampleScript = ` -package main - -import ( - "os" - "os/signal" - "syscall" - "github.com/bluenviron/gortsplib/v4" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/format" -) - -func main() { - if os.Getenv("MTX_PATH") != "ondemand" || - os.Getenv("MTX_QUERY") != "param=value" || - os.Getenv("G1") != "on" { - panic("environment not set") - } - - medi := &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, - 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, - 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, - }, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, - PacketizationMode: 1, - }}, - } - - source := gortsplib.Client{} - - err := source.StartRecording( - "rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("MTX_PATH"), - &description.Session{Medias: []*description.Media{medi}}) - if err != nil { - panic(err) - } - defer source.Close() - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT) - <-c - - err = os.WriteFile("ON_DEMAND_FILE", []byte(""), 0644) - if err != nil { - panic(err) - } -} -` - type testServer struct { onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) @@ -107,12 +54,6 @@ func TestPathRunOnDemand(t *testing.T) { onDemand := filepath.Join(os.TempDir(), "on_demand") onUnDemand := filepath.Join(os.TempDir(), "on_undemand") - srcFile := filepath.Join(os.TempDir(), "ondemand.go") - err := os.WriteFile(srcFile, - []byte(strings.ReplaceAll(runOnDemandSampleScript, "ON_DEMAND_FILE", onDemand)), 0o644) - require.NoError(t, err) - defer os.Remove(srcFile) - for _, ca := range []string{"describe", "setup", "describe and setup"} { t.Run(ca, func(t *testing.T) { defer os.Remove(onDemand) @@ -123,9 +64,9 @@ func TestPathRunOnDemand(t *testing.T) { "webrtc: no\n"+ "paths:\n"+ " '~^(on)demand$':\n"+ - " runOnDemand: go run %s\n"+ + " runOnDemand: sh -c \"ON_DEMAND=%s go run ./test_on_demand/main.go\"\n"+ " runOnDemandCloseAfter: 1s\n"+ - " runOnUnDemand: touch %s\n", srcFile, onUnDemand)) + " runOnUnDemand: touch %s\n", onDemand, onUnDemand)) require.Equal(t, true, ok) defer p1.Close() @@ -209,7 +150,15 @@ func TestPathRunOnDemand(t *testing.T) { } func TestPathRunOnConnect(t *testing.T) { - for _, ca := range []string{"rtsp", "rtmp", "srt"} { + serverCertFpath, err := test.CreateTempFile(test.TLSCertPub) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + + for _, ca := range []string{"rtsp", "rtsps", "rtmp", "rtmps", "srt"} { t.Run(ca, func(t *testing.T) { onConnect := filepath.Join(os.TempDir(), "on_connect") defer os.Remove(onConnect) @@ -217,18 +166,28 @@ func TestPathRunOnConnect(t *testing.T) { onDisconnect := filepath.Join(os.TempDir(), "on_disconnect") defer os.Remove(onDisconnect) + connType := "" + func() { p, ok := newInstance(fmt.Sprintf( - "paths:\n"+ + "encryption: optional\n"+ + "serverCert: "+serverCertFpath+"\n"+ + "serverKey: "+serverKeyFpath+"\n"+ + "rtmpEncryption: optional\n"+ + "rtmpServerCert: "+serverCertFpath+"\n"+ + "rtmpServerKey: "+serverKeyFpath+"\n"+ + "paths:\n"+ " test:\n"+ - "runOnConnect: touch %s\n"+ - "runOnDisconnect: touch %s\n", + "runOnConnect: sh -c 'echo \"$MTX_CONN_TYPE $MTX_CONN_ID $RTSP_PORT\" > %s'\n"+ + "runOnDisconnect: sh -c 'echo \"$MTX_CONN_TYPE $MTX_CONN_ID $RTSP_PORT\" > %s'\n", onConnect, onDisconnect)) require.Equal(t, true, ok) defer p.Close() switch ca { case "rtsp": + connType = "rtspConn" + c := gortsplib.Client{} err := c.StartRecording( @@ -237,7 +196,20 @@ func TestPathRunOnConnect(t *testing.T) { require.NoError(t, err) defer c.Close() + case "rtsps": + connType = "rtspsConn" + + c := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} + + err := c.StartRecording( + "rtsps://localhost:8322/test", + &description.Session{Medias: []*description.Media{test.UniqueMediaH264()}}) + require.NoError(t, err) + defer c.Close() + case "rtmp": + connType = "rtmpConn" + u, err := url.Parse("rtmp://127.0.0.1:1935/test") require.NoError(t, err) @@ -248,7 +220,22 @@ func TestPathRunOnConnect(t *testing.T) { _, err = rtmp.NewClientConn(nconn, u, true) require.NoError(t, err) + case "rtmps": + connType = "rtmpsConn" + + u, err := url.Parse("rtmps://127.0.0.1:1936/test") + require.NoError(t, err) + + nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) + require.NoError(t, err) + defer nconn.Close() //nolint:errcheck + + _, err = rtmp.NewClientConn(nconn, u, true) + require.NoError(t, err) + case "srt": + connType = "srtConn" + conf := srt.DefaultConfig() address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=publish:test") require.NoError(t, err) @@ -264,11 +251,19 @@ func TestPathRunOnConnect(t *testing.T) { time.Sleep(500 * time.Millisecond) }() - _, err := os.Stat(onConnect) + byts, err := os.ReadFile(onConnect) require.NoError(t, err) + fields := strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, connType, fields[0]) + require.NotEmpty(t, fields[1]) + require.Equal(t, "8554", fields[2]) - _, err = os.Stat(onDisconnect) + byts, err = os.ReadFile(onDisconnect) require.NoError(t, err) + fields = strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, connType, fields[0]) + require.NotEmpty(t, fields[1]) + require.Equal(t, "8554", fields[2]) }) } } @@ -285,9 +280,9 @@ func TestPathRunOnReady(t *testing.T) { "hls: no\n"+ "webrtc: no\n"+ "paths:\n"+ - " test:\n"+ - " runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ - " runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", + " ~te(st):\n"+ + " runOnReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_SOURCE_TYPE $MTX_SOURCE_ID $RTSP_PORT $G1\" > %s'\n"+ + " runOnNotReady: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_SOURCE_TYPE $MTX_SOURCE_ID $RTSP_PORT $G1\" > %s'\n", onReady, onNotReady)) require.Equal(t, true, ok) defer p.Close() @@ -305,15 +300,35 @@ func TestPathRunOnReady(t *testing.T) { byts, err := os.ReadFile(onReady) require.NoError(t, err) - require.Equal(t, "test query=value\n", string(byts)) + fields := strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, "test", fields[0]) + require.Equal(t, "query=value", fields[1]) + require.Equal(t, "rtspSession", fields[2]) + require.NotEmpty(t, fields[3]) + require.Equal(t, "8554", fields[4]) + require.Equal(t, "st", fields[5]) byts, err = os.ReadFile(onNotReady) require.NoError(t, err) - require.Equal(t, "test query=value\n", string(byts)) + fields = strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, "test", fields[0]) + require.Equal(t, "query=value", fields[1]) + require.Equal(t, "rtspSession", fields[2]) + require.NotEmpty(t, fields[3]) + require.Equal(t, "8554", fields[4]) + require.Equal(t, "st", fields[5]) } func TestPathRunOnRead(t *testing.T) { - for _, ca := range []string{"rtsp", "rtmp", "srt", "webrtc"} { + serverCertFpath, err := test.CreateTempFile(test.TLSCertPub) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := test.CreateTempFile(test.TLSCertKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + + for _, ca := range []string{"rtsp", "rtsps", "rtmp", "rtmps", "srt", "webrtc"} { t.Run(ca, func(t *testing.T) { onRead := filepath.Join(os.TempDir(), "on_read") defer os.Remove(onRead) @@ -323,10 +338,16 @@ func TestPathRunOnRead(t *testing.T) { func() { p, ok := newInstance(fmt.Sprintf( - "paths:\n"+ - " test:\n"+ - " runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n"+ - " runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY\" > %s'\n", + "encryption: optional\n"+ + "serverCert: "+serverCertFpath+"\n"+ + "serverKey: "+serverKeyFpath+"\n"+ + "rtmpEncryption: optional\n"+ + "rtmpServerCert: "+serverCertFpath+"\n"+ + "rtmpServerKey: "+serverKeyFpath+"\n"+ + "paths:\n"+ + " ~te(st):\n"+ + " runOnRead: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_READER_TYPE $MTX_READER_ID $RTSP_PORT $G1\" > %s'\n"+ + " runOnUnread: sh -c 'echo \"$MTX_PATH $MTX_QUERY $MTX_READER_TYPE $MTX_READER_ID $RTSP_PORT $G1\" > %s'\n", onRead, onUnread)) require.Equal(t, true, ok) defer p.Close() @@ -361,6 +382,25 @@ func TestPathRunOnRead(t *testing.T) { _, err = reader.Play(nil) require.NoError(t, err) + case "rtsps": + reader := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}} + + u, err := base.ParseURL("rtsps://127.0.0.1:8322/test?query=value") + require.NoError(t, err) + + err = reader.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer reader.Close() + + desc, _, err := reader.Describe(u) + require.NoError(t, err) + + err = reader.SetupAll(desc.BaseURL, desc.Medias) + require.NoError(t, err) + + _, err = reader.Play(nil) + require.NoError(t, err) + case "rtmp": u, err := url.Parse("rtmp://127.0.0.1:1935/test?query=value") require.NoError(t, err) @@ -375,6 +415,20 @@ func TestPathRunOnRead(t *testing.T) { _, err = rtmp.NewReader(conn) require.NoError(t, err) + case "rtmps": + u, err := url.Parse("rtmps://127.0.0.1:1936/test?query=value") + require.NoError(t, err) + + nconn, err := tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) + require.NoError(t, err) + defer nconn.Close() //nolint:errcheck + + conn, err := rtmp.NewClientConn(nconn, u, false) + require.NoError(t, err) + + _, err = rtmp.NewReader(conn) + require.NoError(t, err) + case "srt": conf := srt.DefaultConfig() address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=read:test:query=value") @@ -440,13 +494,42 @@ func TestPathRunOnRead(t *testing.T) { time.Sleep(500 * time.Millisecond) }() + var readerType string + + switch ca { + case "rtsp": + readerType = "rtspSession" + case "rtsps": + readerType = "rtspsSession" + case "rtmp": + readerType = "rtmpConn" + case "rtmps": + readerType = "rtmpsConn" + case "srt": + readerType = "srtConn" + case "webrtc": + readerType = "webrtcSession" + } + byts, err := os.ReadFile(onRead) require.NoError(t, err) - require.Equal(t, "test query=value\n", string(byts)) + fields := strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, "test", fields[0]) + require.Equal(t, "query=value", fields[1]) + require.Equal(t, readerType, fields[2]) + require.NotEmpty(t, fields[3]) + require.Equal(t, "8554", fields[4]) + require.Equal(t, "st", fields[5]) byts, err = os.ReadFile(onUnread) require.NoError(t, err) - require.Equal(t, "test query=value\n", string(byts)) + fields = strings.Split(string(byts[:len(byts)-1]), " ") + require.Equal(t, "test", fields[0]) + require.Equal(t, "query=value", fields[1]) + require.Equal(t, readerType, fields[2]) + require.NotEmpty(t, fields[3]) + require.Equal(t, "8554", fields[4]) + require.Equal(t, "st", fields[5]) }) } } @@ -463,14 +546,16 @@ func TestPathRunOnRecordSegment(t *testing.T) { defer os.RemoveAll(recordDir) func() { - p, ok := newInstance("record: yes\n" + - "recordPath: " + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f") + "\n" + - "paths:\n" + - " test:\n" + - " runOnRecordSegmentCreate: " + - "sh -c 'echo \"$MTX_SEGMENT_PATH\" > " + onRecordSegmentCreate + "'\n" + - " runOnRecordSegmentComplete: " + - "sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION\" > " + onRecordSegmentComplete + "'\n") + p, ok := newInstance(fmt.Sprintf("record: yes\n"+ + "recordPath: %s\n"+ + "paths:\n"+ + " test:\n"+ + " runOnRecordSegmentCreate: sh -c 'echo \"$MTX_SEGMENT_PATH $RTSP_PORT\" > %s'\n"+ + " runOnRecordSegmentComplete: sh -c 'echo \"$MTX_SEGMENT_PATH $MTX_SEGMENT_DURATION $RTSP_PORT\" > %s'\n", + filepath.Join(recordDir, "%path/%Y-%m-%d_%H-%M-%S-%f"), + onRecordSegmentCreate, + onRecordSegmentComplete, + )) require.Equal(t, true, ok) defer p.Close() @@ -504,13 +589,16 @@ func TestPathRunOnRecordSegment(t *testing.T) { byts, err := os.ReadFile(onRecordSegmentCreate) require.NoError(t, err) - require.Equal(t, true, strings.HasPrefix(string(byts), recordDir)) + fields := strings.Split(string(byts[:len(byts)-1]), " ") + require.True(t, strings.HasPrefix(fields[0], recordDir)) + require.Equal(t, "8554", fields[1]) byts, err = os.ReadFile(onRecordSegmentComplete) require.NoError(t, err) - parts := strings.Split(string(byts[:len(byts)-1]), " ") - require.Equal(t, true, strings.HasPrefix(parts[0], recordDir)) - require.Equal(t, "3", parts[1]) + fields = strings.Split(string(byts[:len(byts)-1]), " ") + require.True(t, strings.HasPrefix(fields[0], recordDir)) + require.Equal(t, "3", fields[1]) + require.Equal(t, "8554", fields[2]) } func TestPathMaxReaders(t *testing.T) { diff --git a/internal/core/test_on_demand/main.go b/internal/core/test_on_demand/main.go new file mode 100644 index 00000000..c18b00f3 --- /dev/null +++ b/internal/core/test_on_demand/main.go @@ -0,0 +1,54 @@ +// This is used for testing purposes. +package main + +import ( + "os" + "os/signal" + "syscall" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" +) + +func main() { + if os.Getenv("MTX_QUERY") != "param=value" { + panic("unexpected MTX_QUERY") + } + if os.Getenv("G1") != "on" { + panic("unexpected G1") + } + + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + SPS: []byte{ + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + }, + PPS: []byte{0x01, 0x02, 0x03, 0x04}, + PacketizationMode: 1, + }}, + } + + source := gortsplib.Client{} + + err := source.StartRecording( + "rtsp://localhost:"+os.Getenv("RTSP_PORT")+"/"+os.Getenv("MTX_PATH"), + &description.Session{Medias: []*description.Media{medi}}) + if err != nil { + panic(err) + } + defer source.Close() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT) + <-c + + err = os.WriteFile(os.Getenv("ON_DEMAND"), []byte(""), 0o644) + if err != nil { + panic(err) + } +} diff --git a/internal/servers/rtsp/conn.go b/internal/servers/rtsp/conn.go index 290c3430..216083dd 100644 --- a/internal/servers/rtsp/conn.go +++ b/internal/servers/rtsp/conn.go @@ -60,7 +60,7 @@ func (c *conn) initialize() { if c.isTLS { return "rtspsConn" } - return "conn" + return "rtspConn" }(), ID: c.uuid.String(), } diff --git a/mediamtx.yml b/mediamtx.yml index b7339df5..2546d233 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -27,9 +27,9 @@ udpMaxPayloadSize: 1472 # Command to run when a client connects to the server. # This is terminated with SIGINT when a client disconnects from the server. # The following environment variables are available: -# * RTSP_PORT: RTSP server port # * MTX_CONN_TYPE: connection type # * MTX_CONN_ID: connection ID +# * RTSP_PORT: RTSP server port runOnConnect: # Restart the command if it exits. runOnConnectRestart: no @@ -638,11 +638,11 @@ pathDefaults: # The following environment variables are available: # * MTX_PATH: path name # * MTX_QUERY: query parameters (passed by publisher) + # * MTX_SOURCE_TYPE: source type + # * MTX_SOURCE_ID: source ID # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_SOURCE_TYPE: source type - # * MTX_SOURCE_ID: source ID runOnReady: # Restart the command if it exits. runOnReadyRestart: no @@ -655,11 +655,11 @@ pathDefaults: # The following environment variables are available: # * MTX_PATH: path name # * MTX_QUERY: query parameters (passed by reader) + # * MTX_READER_TYPE: reader type + # * MTX_READER_ID: reader ID # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_READER_TYPE: reader type - # * MTX_READER_ID: reader ID runOnRead: # Restart the command if it exits. runOnReadRestart: no @@ -670,20 +670,20 @@ pathDefaults: # Command to run when a recording segment is created. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_SEGMENT_PATH: segment file path runOnRecordSegmentCreate: # Command to run when a recording segment is complete. # The following environment variables are available: # * MTX_PATH: path name + # * MTX_SEGMENT_PATH: segment file path + # * MTX_SEGMENT_DURATION: segment duration # * RTSP_PORT: RTSP server port # * G1, G2, ...: regular expression groups, if path name is # a regular expression. - # * MTX_SEGMENT_PATH: segment file path - # * MTX_SEGMENT_DURATION: segment duration runOnRecordSegmentComplete: ###############################################