From e0da916b8f5b079a4865eef7f64863f50785463d Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Wed, 24 Jan 2024 20:21:37 +0100 Subject: [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes Use 8 packets/frames by default rather than 1, which seems to provide better throughput. Allow -thread_queue_size to set the muxer queue size manually again. --- fftools/ffmpeg_mux.h | 2 -- fftools/ffmpeg_mux_init.c | 3 +-- fftools/ffmpeg_opt.c | 2 +- fftools/ffmpeg_sched.c | 15 ++++++++++----- fftools/ffmpeg_sched.h | 4 +++- tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat | 5 ----- 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index d0be8a51ea..e1b44142cf 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -94,8 +94,6 @@ typedef struct Muxer { AVDictionary *opts; - int thread_queue_size; - /* filesize limit expressed in bytes */ int64_t limit_filesize; atomic_int_least64_t last_filesize; diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 6b5e4f8b3c..8ada837555 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) of->start_time = o->start_time; of->shortest = o->shortest; - mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8; mux->limit_filesize = o->limit_filesize; av_dict_copy(&mux->opts, o->g->format_opts, 0); @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) } err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, - !strcmp(oc->oformat->name, "rtp")); + !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size); if (err < 0) return err; mux->sch = sch; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index 304c493dcf..7505b0cf90 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o) o->limit_filesize = INT64_MAX; o->chapters_input_file = INT_MAX; o->accurate_seek = 1; - o->thread_queue_size = -1; + o->thread_queue_size = 0; o->input_sync_ref = -1; o->find_stream_info = 1; o->shortest_buf_duration = 10.f; diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index c6ed2e21ff..d91968822f 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -218,6 +218,7 @@ typedef struct SchMux { */ atomic_int mux_started; ThreadQueue *queue; + unsigned queue_size; AVPacket *sub_heartbeat_pkt; } SchMux; @@ -360,6 +361,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si ThreadQueue *tq; ObjPool *op; + queue_size = queue_size > 0 ? queue_size : 8; + op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : objpool_alloc_frames(); if (!op) @@ -655,7 +658,7 @@ static const AVClass sch_mux_class = { }; int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), - void *arg, int sdp_auto) + void *arg, int sdp_auto, unsigned thread_queue_size) { const unsigned idx = sch->nb_mux; @@ -669,6 +672,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), mux = &sch->mux[idx]; mux->class = &sch_mux_class; mux->init = init; + mux->queue_size = thread_queue_size; task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg); @@ -775,7 +779,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, if (!dec->send_frame) return AVERROR(ENOMEM); - ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS); + ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS); if (ret < 0) return ret; @@ -815,7 +819,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx); - ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES); + ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES); if (ret < 0) return ret; @@ -863,7 +867,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, if (ret < 0) return ret; - ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES); + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES); if (ret < 0) return ret; @@ -1315,7 +1319,8 @@ int sch_start(Scheduler *sch) } } - ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS); + ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size, + QUEUE_PACKETS); if (ret < 0) return ret; diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h index 811146f6ed..95f9c1d4db 100644 --- a/fftools/ffmpeg_sched.h +++ b/fftools/ffmpeg_sched.h @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, * streams in the muxer. * @param ctx Muxer state; will be passed to func/init and used for logging. * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename(). + * @param thread_queue_size number of packets that can be buffered before + * sending to the muxer blocks * * @retval ">=0" Index of the newly-created muxer. * @retval "<0" Error code. */ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *), - void *ctx, int sdp_auto); + void *ctx, int sdp_auto, unsigned thread_queue_size); /** * Add a muxed stream for a previously added muxer. * diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat index bc9b833799..3a3ec96637 100644 --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat @@ -33,8 +33,3 @@ {\an7}( inaudible radio chatter ) >> Safety remains our numb -9 -00:00:03,704 --> 00:00:04,004 -{\an7}( inaudible radio chatter ) ->> Safety remains our number one -