ffmpeg/libavutil/slicethread.c

282 lines
8.0 KiB
C

/*
* This file is part of FFmpeg.
*
* FFmpeg is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* FFmpeg is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with FFmpeg; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <stdatomic.h>
#include "cpu.h"
#include "internal.h"
#include "slicethread.h"
#include "mem.h"
#include "thread.h"
#include "avassert.h"
#define MAX_AUTO_THREADS 16
#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
typedef struct WorkerContext {
AVSliceThread *ctx;
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t thread;
int done;
} WorkerContext;
struct AVSliceThread {
WorkerContext *workers;
int nb_threads;
int nb_active_threads;
int nb_jobs;
atomic_uint first_job;
atomic_uint current_job;
pthread_mutex_t done_mutex;
pthread_cond_t done_cond;
int done;
int finished;
void *priv;
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
void (*main_func)(void *priv);
};
static int run_jobs(AVSliceThread *ctx)
{
unsigned nb_jobs = ctx->nb_jobs;
unsigned nb_active_threads = ctx->nb_active_threads;
unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
unsigned current_job = first_job;
do {
ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
} while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
return current_job == nb_jobs + nb_active_threads - 1;
}
static void *attribute_align_arg thread_worker(void *v)
{
WorkerContext *w = v;
AVSliceThread *ctx = w->ctx;
pthread_mutex_lock(&w->mutex);
pthread_cond_signal(&w->cond);
while (1) {
w->done = 1;
while (w->done)
pthread_cond_wait(&w->cond, &w->mutex);
if (ctx->finished) {
pthread_mutex_unlock(&w->mutex);
return NULL;
}
if (run_jobs(ctx)) {
pthread_mutex_lock(&ctx->done_mutex);
ctx->done = 1;
pthread_cond_signal(&ctx->done_cond);
pthread_mutex_unlock(&ctx->done_mutex);
}
}
}
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
void (*main_func)(void *priv),
int nb_threads)
{
AVSliceThread *ctx;
int nb_workers, i;
int ret;
av_assert0(nb_threads >= 0);
if (!nb_threads) {
int nb_cpus = av_cpu_count();
if (nb_cpus > 1)
nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
else
nb_threads = 1;
}
nb_workers = nb_threads;
if (!main_func)
nb_workers--;
*pctx = ctx = av_mallocz(sizeof(*ctx));
if (!ctx)
return AVERROR(ENOMEM);
if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
av_freep(pctx);
return AVERROR(ENOMEM);
}
ctx->priv = priv;
ctx->worker_func = worker_func;
ctx->main_func = main_func;
ctx->nb_threads = nb_threads;
ctx->nb_active_threads = 0;
ctx->nb_jobs = 0;
ctx->finished = 0;
atomic_init(&ctx->first_job, 0);
atomic_init(&ctx->current_job, 0);
ret = pthread_mutex_init(&ctx->done_mutex, NULL);
if (ret) {
av_freep(&ctx->workers);
av_freep(pctx);
return AVERROR(ret);
}
ret = pthread_cond_init(&ctx->done_cond, NULL);
if (ret) {
ctx->nb_threads = main_func ? 0 : 1;
avpriv_slicethread_free(pctx);
return AVERROR(ret);
}
ctx->done = 0;
for (i = 0; i < nb_workers; i++) {
WorkerContext *w = &ctx->workers[i];
int ret;
w->ctx = ctx;
ret = pthread_mutex_init(&w->mutex, NULL);
if (ret) {
ctx->nb_threads = main_func ? i : i + 1;
avpriv_slicethread_free(pctx);
return AVERROR(ret);
}
ret = pthread_cond_init(&w->cond, NULL);
if (ret) {
pthread_mutex_destroy(&w->mutex);
ctx->nb_threads = main_func ? i : i + 1;
avpriv_slicethread_free(pctx);
return AVERROR(ret);
}
pthread_mutex_lock(&w->mutex);
w->done = 0;
if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
ctx->nb_threads = main_func ? i : i + 1;
pthread_mutex_unlock(&w->mutex);
pthread_cond_destroy(&w->cond);
pthread_mutex_destroy(&w->mutex);
avpriv_slicethread_free(pctx);
return AVERROR(ret);
}
while (!w->done)
pthread_cond_wait(&w->cond, &w->mutex);
pthread_mutex_unlock(&w->mutex);
}
return nb_threads;
}
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
int nb_workers, i, is_last = 0;
av_assert0(nb_jobs > 0);
ctx->nb_jobs = nb_jobs;
ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
nb_workers = ctx->nb_active_threads;
if (!ctx->main_func || !execute_main)
nb_workers--;
for (i = 0; i < nb_workers; i++) {
WorkerContext *w = &ctx->workers[i];
pthread_mutex_lock(&w->mutex);
w->done = 0;
pthread_cond_signal(&w->cond);
pthread_mutex_unlock(&w->mutex);
}
if (ctx->main_func && execute_main)
ctx->main_func(ctx->priv);
else
is_last = run_jobs(ctx);
if (!is_last) {
pthread_mutex_lock(&ctx->done_mutex);
while (!ctx->done)
pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
ctx->done = 0;
pthread_mutex_unlock(&ctx->done_mutex);
}
}
void avpriv_slicethread_free(AVSliceThread **pctx)
{
AVSliceThread *ctx;
int nb_workers, i;
if (!pctx || !*pctx)
return;
ctx = *pctx;
nb_workers = ctx->nb_threads;
if (!ctx->main_func)
nb_workers--;
ctx->finished = 1;
for (i = 0; i < nb_workers; i++) {
WorkerContext *w = &ctx->workers[i];
pthread_mutex_lock(&w->mutex);
w->done = 0;
pthread_cond_signal(&w->cond);
pthread_mutex_unlock(&w->mutex);
}
for (i = 0; i < nb_workers; i++) {
WorkerContext *w = &ctx->workers[i];
pthread_join(w->thread, NULL);
pthread_cond_destroy(&w->cond);
pthread_mutex_destroy(&w->mutex);
}
pthread_cond_destroy(&ctx->done_cond);
pthread_mutex_destroy(&ctx->done_mutex);
av_freep(&ctx->workers);
av_freep(pctx);
}
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
void (*main_func)(void *priv),
int nb_threads)
{
*pctx = NULL;
return AVERROR(ENOSYS);
}
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
av_assert0(0);
}
void avpriv_slicethread_free(AVSliceThread **pctx)
{
av_assert0(!pctx || !*pctx);
}
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */