ffmpeg/libavformat/fifo.c
Andreas Rheinhardt 41ab25cb2d avformat/fifo_test: Move into tests/fifo_muxer.c
This muxer solely exists to test the fifo muxer via a dedicated
test tool in libavformat/tests/fifo_muxer.c. It fulfills no
other role and it is only designed with this role in mind.

The latter can be seen in two facts: The muxer uses printf
for logging and it simply presumes the packets' data to contain
a FailingMuxerPacketData (a struct duplicated in fifo_test.c
and tests/fifo_muxer.c.); in particular, it presumes packets
to have data at all, but this need not be true with side-data
only packets and a segfault can easily be triggered by e.g.
encoding flac (our native encoder sends a side-data only packet
with updated extradata at the end of encoding).

This patch fixes this by moving the test muxer into the fifo
test tool, making it inaccessible via the API (and actually
removing it from libavformat.so and libavformat.a).
While this muxer was accessible via e.g. av_guess_format(),
it was not really usable for an API user as FailingMuxerPacketData
was not public. Therefore this is not considered a breaking change.

In order to continue to use the test muxer in the test tool,
the ordinary fifo muxer had to be overridden: fifo_muxer.c
includes lavf/fifo.c but with FIFO_TEST defined which makes
it support the fifo_test muxer. This is possible because
test tools are always linked statically to their respective
library.

Reviewed-by: Stefano Sabatini <stefasab@gmail.com>
Signed-off-by: Andreas Rheinhardt <andreas.rheinhardt@outlook.com>
2024-03-12 22:39:54 +01:00

733 lines
22 KiB
C

/*
* FIFO pseudo-muxer
* Copyright (c) 2016 Jan Sebechlebsky
*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <stdatomic.h>
#include "libavutil/avassert.h"
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/thread.h"
#include "libavutil/threadmessage.h"
#include "avformat.h"
#include "internal.h"
#include "mux.h"
#define FIFO_DEFAULT_QUEUE_SIZE 60
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
typedef struct FifoContext {
const AVClass *class;
AVFormatContext *avf;
char *format;
AVDictionary *format_options;
int queue_size;
AVThreadMessageQueue *queue;
pthread_t writer_thread;
/* Return value of last write_trailer_call */
int write_trailer_ret;
/* Time to wait before next recovery attempt
* This can refer to the time in processed stream,
* or real time. */
int64_t recovery_wait_time;
/* Maximal number of unsuccessful successive recovery attempts */
int max_recovery_attempts;
/* Whether to attempt recovery from failure */
int attempt_recovery;
/* If >0 stream time will be used when waiting
* for the recovery attempt instead of real time */
int recovery_wait_streamtime;
/* If >0 recovery will be attempted regardless of error code
* (except AVERROR_EXIT, so exit request is never ignored) */
int recover_any_error;
/* Whether to drop packets in case the queue is full. */
int drop_pkts_on_overflow;
/* Whether to wait for keyframe when recovering
* from failure or queue overflow */
int restart_with_keyframe;
pthread_mutex_t overflow_flag_lock;
int overflow_flag_lock_initialized;
/* Value > 0 signals queue overflow */
volatile uint8_t overflow_flag;
atomic_int_least64_t queue_duration;
int64_t last_sent_dts;
int64_t timeshift;
} FifoContext;
typedef struct FifoThreadContext {
AVFormatContext *avf;
/* Timestamp of last failure.
* This is either pts in case stream time is used,
* or microseconds as returned by av_gettime_relative() */
int64_t last_recovery_ts;
/* Number of current recovery process
* Value > 0 means we are in recovery process */
int recovery_nr;
/* If > 0 all frames will be dropped until keyframe is received */
uint8_t drop_until_keyframe;
/* Value > 0 means that the previous write_header call was successful
* so finalization by calling write_trailer and ff_io_close must be done
* before exiting / reinitialization of underlying muxer */
uint8_t header_written;
int64_t last_received_dts;
} FifoThreadContext;
typedef enum FifoMessageType {
FIFO_NOOP,
FIFO_WRITE_HEADER,
FIFO_WRITE_PACKET,
FIFO_FLUSH_OUTPUT
} FifoMessageType;
typedef struct FifoMessage {
FifoMessageType type;
AVPacket pkt;
} FifoMessage;
static int fifo_thread_write_header(FifoThreadContext *ctx)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
AVFormatContext *avf2 = fifo->avf;
AVDictionary *format_options = NULL;
int ret, i;
ret = av_dict_copy(&format_options, fifo->format_options, 0);
if (ret < 0)
goto end;
ret = ff_format_output_open(avf2, avf->url, &format_options);
if (ret < 0) {
av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
av_err2str(ret));
goto end;
}
for (i = 0;i < avf2->nb_streams; i++)
ffstream(avf2->streams[i])->cur_dts = 0;
ret = avformat_write_header(avf2, &format_options);
if (!ret)
ctx->header_written = 1;
// Check for options unrecognized by underlying muxer
if (format_options) {
const AVDictionaryEntry *entry = NULL;
while ((entry = av_dict_iterate(format_options, entry)))
av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
ret = AVERROR(EINVAL);
}
end:
av_dict_free(&format_options);
return ret;
}
static int fifo_thread_flush_output(FifoThreadContext *ctx)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
AVFormatContext *avf2 = fifo->avf;
return av_write_frame(avf2, NULL);
}
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
AVStream *st = avf->streams[pkt->stream_index];
int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
*last_dts = dts;
return duration;
}
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
AVFormatContext *avf2 = fifo->avf;
AVRational src_tb, dst_tb;
int ret, s_idx;
int64_t orig_pts, orig_dts, orig_duration;
if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
if (ctx->drop_until_keyframe) {
if (pkt->flags & AV_PKT_FLAG_KEY) {
ctx->drop_until_keyframe = 0;
av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
} else {
av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
av_packet_unref(pkt);
return 0;
}
}
orig_pts = pkt->pts;
orig_dts = pkt->dts;
orig_duration = pkt->duration;
s_idx = pkt->stream_index;
src_tb = avf->streams[s_idx]->time_base;
dst_tb = avf2->streams[s_idx]->time_base;
av_packet_rescale_ts(pkt, src_tb, dst_tb);
ret = av_write_frame(avf2, pkt);
if (ret >= 0) {
av_packet_unref(pkt);
} else {
// avoid scaling twice
pkt->pts = orig_pts;
pkt->dts = orig_dts;
pkt->duration = orig_duration;
}
return ret;
}
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
AVFormatContext *avf2 = fifo->avf;
int ret;
if (!ctx->header_written)
return 0;
ret = av_write_trailer(avf2);
ff_format_io_close(avf2, &avf2->pb);
return ret;
}
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
{
int ret = AVERROR(EINVAL);
if (msg->type == FIFO_NOOP)
return 0;
if (!ctx->header_written) {
ret = fifo_thread_write_header(ctx);
if (ret < 0)
return ret;
}
switch(msg->type) {
case FIFO_WRITE_HEADER:
av_assert0(ret >= 0);
return ret;
case FIFO_WRITE_PACKET:
return fifo_thread_write_packet(ctx, &msg->pkt);
case FIFO_FLUSH_OUTPUT:
return fifo_thread_flush_output(ctx);
}
av_assert0(0);
return AVERROR(EINVAL);
}
static int is_recoverable(const FifoContext *fifo, int err_no) {
if (!fifo->attempt_recovery)
return 0;
if (fifo->recover_any_error)
return err_no != AVERROR_EXIT;
switch (err_no) {
case AVERROR(EINVAL):
case AVERROR(ENOSYS):
case AVERROR_EOF:
case AVERROR_EXIT:
case AVERROR_PATCHWELCOME:
return 0;
default:
return 1;
}
}
static void free_message(void *msg)
{
FifoMessage *fifo_msg = msg;
if (fifo_msg->type == FIFO_WRITE_PACKET)
av_packet_unref(&fifo_msg->pkt);
}
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
int err_no)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
int ret;
av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
av_err2str(err_no));
if (fifo->recovery_wait_streamtime) {
if (pkt->pts == AV_NOPTS_VALUE)
av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
" timestamp, recovery will be attempted immediately");
ctx->last_recovery_ts = pkt->pts;
} else {
ctx->last_recovery_ts = av_gettime_relative();
}
if (fifo->max_recovery_attempts &&
ctx->recovery_nr >= fifo->max_recovery_attempts) {
av_log(avf, AV_LOG_ERROR,
"Maximal number of %d recovery attempts reached.\n",
fifo->max_recovery_attempts);
ret = err_no;
} else {
ret = AVERROR(EAGAIN);
}
return ret;
}
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
AVPacket *pkt = &msg->pkt;
int64_t time_since_recovery;
int ret;
if (!is_recoverable(fifo, err_no)) {
ret = err_no;
goto fail;
}
if (ctx->header_written) {
fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
ctx->header_written = 0;
}
if (!ctx->recovery_nr) {
ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
AV_NOPTS_VALUE : 0;
} else {
if (fifo->recovery_wait_streamtime) {
if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
AVRational tb = avf->streams[pkt->stream_index]->time_base;
time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
tb, AV_TIME_BASE_Q);
} else {
/* Enforce recovery immediately */
time_since_recovery = fifo->recovery_wait_time;
}
} else {
time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
}
if (time_since_recovery < fifo->recovery_wait_time)
return AVERROR(EAGAIN);
}
ctx->recovery_nr++;
if (fifo->max_recovery_attempts) {
av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
ctx->recovery_nr, fifo->max_recovery_attempts);
} else {
av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
ctx->recovery_nr);
}
if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
ctx->drop_until_keyframe = 1;
ret = fifo_thread_dispatch_message(ctx, msg);
if (ret < 0) {
if (is_recoverable(fifo, ret)) {
return fifo_thread_process_recovery_failure(ctx, pkt, ret);
} else {
goto fail;
}
} else {
av_log(avf, AV_LOG_INFO, "Recovery successful\n");
ctx->recovery_nr = 0;
}
return 0;
fail:
free_message(msg);
return ret;
}
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
{
AVFormatContext *avf = ctx->avf;
FifoContext *fifo = avf->priv_data;
int ret;
do {
if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
if (time_to_wait)
av_usleep(FFMIN(10000, time_to_wait));
}
ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
} while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
if (msg->type == FIFO_WRITE_PACKET)
av_packet_unref(&msg->pkt);
ret = 0;
}
return ret;
}
static void *fifo_consumer_thread(void *data)
{
AVFormatContext *avf = data;
FifoContext *fifo = avf->priv_data;
AVThreadMessageQueue *queue = fifo->queue;
FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
int ret;
FifoThreadContext fifo_thread_ctx;
memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
fifo_thread_ctx.avf = avf;
fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
ff_thread_setname("fifo-consumer");
while (1) {
uint8_t just_flushed = 0;
if (!fifo_thread_ctx.recovery_nr)
ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
if (rec_ret < 0) {
av_thread_message_queue_set_err_send(queue, rec_ret);
break;
}
}
/* If the queue is full at the moment when fifo_write_packet
* attempts to insert new message (packet) to the queue,
* it sets the fifo->overflow_flag to 1 and drops packet.
* Here in consumer thread, the flag is checked and if it is
* set, the queue is flushed and flag cleared. */
pthread_mutex_lock(&fifo->overflow_flag_lock);
if (fifo->overflow_flag) {
av_thread_message_flush(queue);
if (fifo->restart_with_keyframe)
fifo_thread_ctx.drop_until_keyframe = 1;
fifo->overflow_flag = 0;
just_flushed = 1;
}
pthread_mutex_unlock(&fifo->overflow_flag_lock);
if (just_flushed)
av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
if (fifo->timeshift)
while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
av_usleep(10000);
ret = av_thread_message_queue_recv(queue, &msg, 0);
if (ret < 0) {
av_thread_message_queue_set_err_send(queue, ret);
break;
}
}
fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
return NULL;
}
static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
const char *filename)
{
FifoContext *fifo = avf->priv_data;
AVFormatContext *avf2;
int ret = 0, i;
ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
if (ret < 0)
return ret;
fifo->avf = avf2;
avf2->interrupt_callback = avf->interrupt_callback;
avf2->max_delay = avf->max_delay;
ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
if (ret < 0)
return ret;
avf2->opaque = avf->opaque;
avf2->io_close2 = avf->io_close2;
avf2->io_open = avf->io_open;
avf2->flags = avf->flags;
for (i = 0; i < avf->nb_streams; ++i) {
AVStream *st = ff_stream_clone(avf2, avf->streams[i]);
if (!st)
return AVERROR(ENOMEM);
}
return 0;
}
static int fifo_init(AVFormatContext *avf)
{
FifoContext *fifo = avf->priv_data;
const AVOutputFormat *oformat;
int ret = 0;
if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
" only when drop_pkts_on_overflow is also turned on\n");
return AVERROR(EINVAL);
}
atomic_init(&fifo->queue_duration, 0);
fifo->last_sent_dts = AV_NOPTS_VALUE;
#ifdef FIFO_TEST
/* This exists for the fifo_muxer test tool. */
if (fifo->format && !strcmp(fifo->format, "fifo_test")) {
extern const FFOutputFormat ff_fifo_test_muxer;
oformat = &ff_fifo_test_muxer.p;
} else
#endif
oformat = av_guess_format(fifo->format, avf->url, NULL);
if (!oformat) {
ret = AVERROR_MUXER_NOT_FOUND;
return ret;
}
ret = fifo_mux_init(avf, oformat, avf->url);
if (ret < 0)
return ret;
ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
sizeof(FifoMessage));
if (ret < 0)
return ret;
av_thread_message_queue_set_free_func(fifo->queue, free_message);
ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
if (ret < 0)
return AVERROR(ret);
fifo->overflow_flag_lock_initialized = 1;
return 0;
}
static int fifo_write_header(AVFormatContext *avf)
{
FifoContext * fifo = avf->priv_data;
int ret;
ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
if (ret) {
av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
av_err2str(AVERROR(ret)));
ret = AVERROR(ret);
}
return ret;
}
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
{
FifoContext *fifo = avf->priv_data;
FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
int ret;
if (pkt) {
ret = av_packet_ref(&msg.pkt,pkt);
if (ret < 0)
return ret;
}
ret = av_thread_message_queue_send(fifo->queue, &msg,
fifo->drop_pkts_on_overflow ?
AV_THREAD_MESSAGE_NONBLOCK : 0);
if (ret == AVERROR(EAGAIN)) {
uint8_t overflow_set = 0;
/* Queue is full, set fifo->overflow_flag to 1
* to let consumer thread know the queue should
* be flushed. */
pthread_mutex_lock(&fifo->overflow_flag_lock);
if (!fifo->overflow_flag)
fifo->overflow_flag = overflow_set = 1;
pthread_mutex_unlock(&fifo->overflow_flag_lock);
if (overflow_set)
av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
ret = 0;
goto fail;
} else if (ret < 0) {
goto fail;
}
if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE)
atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
return ret;
fail:
if (pkt)
av_packet_unref(&msg.pkt);
return ret;
}
static int fifo_write_trailer(AVFormatContext *avf)
{
FifoContext *fifo= avf->priv_data;
int ret;
av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
if (fifo->timeshift) {
int64_t now = av_gettime_relative();
int64_t elapsed = 0;
FifoMessage msg = {FIFO_NOOP};
do {
int64_t delay = av_gettime_relative() - now;
if (delay < 0) { // Discontinuity?
delay = 10000;
now = av_gettime_relative();
} else {
now += delay;
}
atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
elapsed += delay;
if (elapsed > fifo->timeshift)
break;
av_usleep(10000);
ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
} while (ret >= 0 || ret == AVERROR(EAGAIN));
atomic_store(&fifo->queue_duration, INT64_MAX);
}
ret = pthread_join(fifo->writer_thread, NULL);
if (ret < 0) {
av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
av_err2str(AVERROR(ret)));
return AVERROR(ret);
}
ret = fifo->write_trailer_ret;
return ret;
}
static void fifo_deinit(AVFormatContext *avf)
{
FifoContext *fifo = avf->priv_data;
avformat_free_context(fifo->avf);
av_thread_message_queue_free(&fifo->queue);
if (fifo->overflow_flag_lock_initialized)
pthread_mutex_destroy(&fifo->overflow_flag_lock);
}
#define OFFSET(x) offsetof(FifoContext, x)
static const AVOption options[] = {
{"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"fifo_format", "Target muxer", OFFSET(format),
AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
{"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
{"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
{"queue_size", "Size of fifo queue", OFFSET(queue_size),
AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
{"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
{"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
{"timeshift", "Delay fifo output", OFFSET(timeshift),
AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
{NULL},
};
static const AVClass fifo_muxer_class = {
.class_name = "Fifo muxer",
.item_name = av_default_item_name,
.option = options,
.version = LIBAVUTIL_VERSION_INT,
};
const FFOutputFormat ff_fifo_muxer = {
.p.name = "fifo",
.p.long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
.p.priv_class = &fifo_muxer_class,
#if FF_API_ALLOW_FLUSH
.p.flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH | AVFMT_TS_NEGATIVE,
#else
.p.flags = AVFMT_NOFILE | AVFMT_TS_NEGATIVE,
#endif
.priv_data_size = sizeof(FifoContext),
.init = fifo_init,
.write_header = fifo_write_header,
.write_packet = fifo_write_packet,
.write_trailer = fifo_write_trailer,
.deinit = fifo_deinit,
.flags_internal = FF_FMT_ALLOW_FLUSH,
};