fftools/ffmpeg_sched: allow connecting encoder output to decoders

This commit is contained in:
Anton Khirnov 2024-02-29 18:50:02 +01:00
parent da17c4d24a
commit efab83c156
2 changed files with 181 additions and 39 deletions

View File

@ -1051,25 +1051,44 @@ int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
}
case SCH_NODE_TYPE_ENC: {
SchEnc *enc;
SchMuxStream *ms;
av_assert0(src.idx < sch->nb_enc);
// encoding packets go to muxing
av_assert0(dst.type == SCH_NODE_TYPE_MUX &&
dst.idx < sch->nb_mux &&
dst.idx_stream < sch->mux[dst.idx].nb_streams);
enc = &sch->enc[src.idx];
ms = &sch->mux[dst.idx].streams[dst.idx_stream];
av_assert0(!ms->src.type);
ret = GROW_ARRAY(enc->dst, enc->nb_dst);
if (ret < 0)
return ret;
enc->dst[enc->nb_dst - 1] = dst;
// encoding packets go to muxing or decoding
switch (dst.type) {
case SCH_NODE_TYPE_MUX: {
SchMuxStream *ms;
av_assert0(dst.idx < sch->nb_mux &&
dst.idx_stream < sch->mux[dst.idx].nb_streams);
ms = &sch->mux[dst.idx].streams[dst.idx_stream];
av_assert0(!ms->src.type);
ms->src = src;
break;
}
case SCH_NODE_TYPE_DEC: {
SchDec *dec;
av_assert0(dst.idx < sch->nb_dec);
dec = &sch->dec[dst.idx];
av_assert0(!dec->src.type);
dec->src = src;
break;
}
default: av_assert0(0);
}
break;
}
default: av_assert0(0);
@ -1217,6 +1236,31 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_
return 0;
}
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
{
while (1) {
SchFilterGraph *fg;
// fed directly by a demuxer (i.e. not through a filtergraph)
if (src.type == SCH_NODE_TYPE_DEMUX) {
sch->demux[src.idx].waiter.choked_next = 0;
return;
}
av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
fg = &sch->filters[src.idx];
// the filtergraph contains internal sources and
// requested to be scheduled directly
if (fg->best_input == fg->nb_inputs) {
fg->waiter.choked_next = 0;
return;
}
src = fg->inputs[fg->best_input].src_sched;
}
}
static void schedule_update_locked(Scheduler *sch)
{
int64_t dts;
@ -1245,7 +1289,6 @@ static void schedule_update_locked(Scheduler *sch)
for (unsigned j = 0; j < mux->nb_streams; j++) {
SchMuxStream *ms = &mux->streams[j];
SchDemux *d;
// unblock sources for output streams that are not finished
// and not too far ahead of the trailing stream
@ -1256,27 +1299,8 @@ static void schedule_update_locked(Scheduler *sch)
if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
continue;
// for outputs fed from filtergraphs, consider that filtergraph's
// best_input information, in other cases there is a well-defined
// source demuxer
if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) {
SchFilterGraph *fg = &sch->filters[ms->src_sched.idx];
SchFilterIn *fi;
// the filtergraph contains internal sources and
// requested to be scheduled directly
if (fg->best_input == fg->nb_inputs) {
fg->waiter.choked_next = 0;
have_unchoked = 1;
continue;
}
fi = &fg->inputs[fg->best_input];
d = &sch->demux[fi->src_sched.idx];
} else
d = &sch->demux[ms->src_sched.idx];
d->waiter.choked_next = 0;
// resolve the source to unchoke
unchoke_for_stream(sch, ms->src_sched);
have_unchoked = 1;
}
}
@ -1303,6 +1327,105 @@ static void schedule_update_locked(Scheduler *sch)
}
enum {
CYCLE_NODE_NEW = 0,
CYCLE_NODE_STARTED,
CYCLE_NODE_DONE,
};
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
uint8_t *filters_visited, SchedulerNode *filters_stack)
{
unsigned nb_filters_stack = 0;
memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
while (1) {
const SchFilterGraph *fg = &sch->filters[src.idx];
filters_visited[src.idx] = CYCLE_NODE_STARTED;
// descend into every input, depth first
if (src.idx_stream < fg->nb_inputs) {
const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
// connected to demuxer, no cycles possible
if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
continue;
// otherwise connected to another filtergraph
av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
// found a cycle
if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
return AVERROR(EINVAL);
// place current position on stack and descend
av_assert0(nb_filters_stack < sch->nb_filters);
filters_stack[nb_filters_stack++] = src;
src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
continue;
}
filters_visited[src.idx] = CYCLE_NODE_DONE;
// previous search finished,
if (nb_filters_stack) {
src = filters_stack[--nb_filters_stack];
continue;
}
return 0;
}
}
static int check_acyclic(Scheduler *sch)
{
uint8_t *filters_visited = NULL;
SchedulerNode *filters_stack = NULL;
int ret = 0;
if (!sch->nb_filters)
return 0;
filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
if (!filters_visited)
return AVERROR(ENOMEM);
filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
if (!filters_stack) {
ret = AVERROR(ENOMEM);
goto fail;
}
// trace the transcoding graph upstream from every output stream
// fed by a filtergraph
for (unsigned i = 0; i < sch->nb_mux; i++) {
SchMux *mux = &sch->mux[i];
for (unsigned j = 0; j < mux->nb_streams; j++) {
SchMuxStream *ms = &mux->streams[j];
SchedulerNode src = ms->src_sched;
if (src.type != SCH_NODE_TYPE_FILTER_OUT)
continue;
src.idx_stream = 0;
ret = check_acyclic_for_output(sch, src, filters_visited, filters_stack);
if (ret < 0) {
av_log(mux, AV_LOG_ERROR, "Transcoding graph has a cycle\n");
goto fail;
}
}
}
fail:
av_freep(&filters_visited);
av_freep(&filters_stack);
return ret;
}
static int start_prepare(Scheduler *sch)
{
int ret;
@ -1402,14 +1525,21 @@ static int start_prepare(Scheduler *sch)
for (unsigned j = 0; j < fg->nb_inputs; j++) {
SchFilterIn *fi = &fg->inputs[j];
SchDec *dec;
if (!fi->src.type) {
av_log(fg, AV_LOG_ERROR,
"Filtergraph input %u not connected to a source\n", j);
return AVERROR(EINVAL);
}
av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
dec = &sch->dec[fi->src.idx];
fi->src_sched = sch->dec[fi->src.idx].src;
switch (dec->src.type) {
case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
default: av_assert0(0);
}
}
for (unsigned j = 0; j < fg->nb_outputs; j++) {
@ -1423,6 +1553,11 @@ static int start_prepare(Scheduler *sch)
}
}
// Check that the transcoding graph has no cycles.
ret = check_acyclic(sch);
if (ret < 0)
return ret;
return 0;
}
@ -1575,6 +1710,8 @@ static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
SchMux *mux;
SchMuxStream *ms;
if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
continue;
mux = &sch->mux[enc->dst[i].idx];
ms = &mux->streams[enc->dst[i].idx_stream];
@ -2150,14 +2287,19 @@ static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
if (!pkt)
goto finish;
ret = send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt);
ret = (dst.type == SCH_NODE_TYPE_MUX) ?
send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
tq_send(sch->dec[dst.idx].queue, 0, pkt);
if (ret == AVERROR_EOF)
goto finish;
return ret;
finish:
if (dst.type == SCH_NODE_TYPE_MUX)
send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
else
tq_send_finish(sch->dec[dst.idx].queue, 0);
*dst_finished = 1;

View File

@ -35,9 +35,9 @@
* - demuxers, each containing any number of demuxed streams; demuxed packets
* belonging to some stream are sent to any number of decoders (transcoding)
* and/or muxers (streamcopy);
* - decoders, which receive encoded packets from some demuxed stream, decode
* them, and send decoded frames to any number of filtergraph inputs
* (audio/video) or encoders (subtitles);
* - decoders, which receive encoded packets from some demuxed stream or
* encoder, decode them, and send decoded frames to any number of filtergraph
* inputs (audio/video) or encoders (subtitles);
* - filtergraphs, each containing zero or more inputs (0 in case the
* filtergraph contains a lavfi source filter), and one or more outputs; the
* inputs and outputs need not have matching media types;
@ -45,7 +45,7 @@
* filtered frames from each output are sent to some encoder;
* - encoders, which receive decoded frames from some decoder (subtitles) or
* some filtergraph output (audio/video), encode them, and send encoded
* packets to some muxed stream;
* packets to any number of muxed streams or decoders;
* - muxers, each containing any number of muxed streams; each muxed stream
* receives encoded packets from some demuxed stream (streamcopy) or some
* encoder (transcoding); those packets are interleaved and written out by the