diff --git a/libavcodec/frame_thread_encoder.c b/libavcodec/frame_thread_encoder.c index 9ca34e7ffb..bcd3c94f8b 100644 --- a/libavcodec/frame_thread_encoder.c +++ b/libavcodec/frame_thread_encoder.c @@ -32,13 +32,18 @@ #include "thread.h" #define MAX_THREADS 64 -#define BUFFER_SIZE (2*MAX_THREADS) +/* There can be as many as MAX_THREADS + 1 outstanding tasks. + * An additional + 1 is needed so that one can distinguish + * the case of zero and MAX_THREADS + 1 outstanding tasks modulo + * the number of buffers. */ +#define BUFFER_SIZE (MAX_THREADS + 2) typedef struct{ AVFrame *indata; AVPacket *outdata; int64_t return_code; unsigned index; + int finished; } Task; typedef struct{ @@ -49,8 +54,9 @@ typedef struct{ pthread_mutex_t task_fifo_mutex; pthread_cond_t task_fifo_cond; - Task finished_tasks[BUFFER_SIZE]; - pthread_mutex_t finished_task_mutex; + unsigned max_tasks; + Task tasks[BUFFER_SIZE]; + pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */ pthread_cond_t finished_task_cond; unsigned task_index; @@ -63,17 +69,13 @@ typedef struct{ static void * attribute_align_arg worker(void *v){ AVCodecContext *avctx = v; ThreadContext *c = avctx->internal->frame_thread_encoder; - AVPacket *pkt = NULL; while (!atomic_load(&c->exit)) { int got_packet = 0, ret; + AVPacket *pkt; AVFrame *frame; Task task; - if(!pkt) pkt = av_packet_alloc(); - if(!pkt) continue; - av_init_packet(pkt); - pthread_mutex_lock(&c->task_fifo_mutex); while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) { if (atomic_load(&c->exit)) { @@ -84,7 +86,12 @@ static void * attribute_align_arg worker(void *v){ } av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL); pthread_mutex_unlock(&c->task_fifo_mutex); + /* The main thread ensures that any two outstanding tasks have + * different indices, ergo each worker thread owns its element + * of c->tasks with the exception of finished, which is shared + * with the main thread and guarded by finished_task_mutex. */ frame = task.indata; + pkt = c->tasks[task.index].outdata; ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet); if(got_packet) { @@ -101,13 +108,12 @@ static void * attribute_align_arg worker(void *v){ pthread_mutex_unlock(&c->buffer_mutex); av_frame_free(&frame); pthread_mutex_lock(&c->finished_task_mutex); - c->finished_tasks[task.index].outdata = pkt; pkt = NULL; - c->finished_tasks[task.index].return_code = ret; + c->tasks[task.index].return_code = ret; + c->tasks[task.index].finished = 1; pthread_cond_signal(&c->finished_task_cond); pthread_mutex_unlock(&c->finished_task_mutex); } end: - av_free(pkt); pthread_mutex_lock(&c->buffer_mutex); avcodec_close(avctx); pthread_mutex_unlock(&c->buffer_mutex); @@ -194,6 +200,12 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, AVDictionary *options){ pthread_cond_init(&c->finished_task_cond, NULL); atomic_init(&c->exit, 0); + c->max_tasks = avctx->thread_count + 2; + for (unsigned i = 0; i < c->max_tasks; i++) { + if (!(c->tasks[i].outdata = av_packet_alloc())) + goto fail; + } + for(i=0; ithread_count ; i++){ AVDictionary *tmp = NULL; int ret; @@ -261,8 +273,8 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ av_frame_free(&task.indata); } - for (i=0; ifinished_tasks[i].outdata); + for (unsigned i = 0; i < c->max_tasks; i++) { + av_packet_free(&c->tasks[i].outdata); } pthread_mutex_destroy(&c->task_fifo_mutex); @@ -276,7 +288,7 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVFrame *frame, int *got_packet_ptr){ ThreadContext *c = avctx->internal->frame_thread_encoder; - Task task; + Task *outtask, task; int ret; av_assert1(!*got_packet_ptr); @@ -298,27 +310,28 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const AVF pthread_cond_signal(&c->task_fifo_cond); pthread_mutex_unlock(&c->task_fifo_mutex); - c->task_index = (c->task_index+1) % BUFFER_SIZE; + c->task_index = (c->task_index + 1) % c->max_tasks; } + outtask = &c->tasks[c->finished_task_index]; pthread_mutex_lock(&c->finished_task_mutex); if (c->task_index == c->finished_task_index || - (frame && !c->finished_tasks[c->finished_task_index].outdata && - (c->task_index - c->finished_task_index) % BUFFER_SIZE <= avctx->thread_count)) { + (frame && !outtask->finished && + (c->task_index - c->finished_task_index + c->max_tasks) % c->max_tasks <= avctx->thread_count)) { pthread_mutex_unlock(&c->finished_task_mutex); return 0; } - - while (!c->finished_tasks[c->finished_task_index].outdata) { + while (!outtask->finished) { pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex); } - task = c->finished_tasks[c->finished_task_index]; - *pkt = *(AVPacket*)(task.outdata); + /* We now own outtask completely: No worker thread touches it any more, + * because there is no outstanding task with this index. */ + outtask->finished = 0; + av_packet_move_ref(pkt, outtask->outdata); if(pkt->data) *got_packet_ptr = 1; - av_freep(&c->finished_tasks[c->finished_task_index].outdata); - c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE; + c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks; pthread_mutex_unlock(&c->finished_task_mutex); - return task.return_code; + return outtask->return_code; }