mirror of
https://git.ffmpeg.org/ffmpeg.git
synced 2025-01-15 11:51:33 +00:00
da5f7799a0
This matches a similar cap on the number of automatic threads in libavcodec/pthread_slice.c. On systems with lots of cores, this fixes a couple fate failures in 32 bit mode on such machines (where spawning a huge number of threads runs out of address space). Signed-off-by: Martin Storsjö <martin@martin.st>
260 lines
7.4 KiB
C
260 lines
7.4 KiB
C
/*
|
|
* This file is part of FFmpeg.
|
|
*
|
|
* FFmpeg is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* FFmpeg is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with FFmpeg; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#include <stdatomic.h>
|
|
#include "cpu.h"
|
|
#include "internal.h"
|
|
#include "slicethread.h"
|
|
#include "mem.h"
|
|
#include "thread.h"
|
|
#include "avassert.h"
|
|
|
|
#define MAX_AUTO_THREADS 16
|
|
|
|
#if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
|
|
|
|
typedef struct WorkerContext {
|
|
AVSliceThread *ctx;
|
|
pthread_mutex_t mutex;
|
|
pthread_cond_t cond;
|
|
pthread_t thread;
|
|
int done;
|
|
} WorkerContext;
|
|
|
|
struct AVSliceThread {
|
|
WorkerContext *workers;
|
|
int nb_threads;
|
|
int nb_active_threads;
|
|
int nb_jobs;
|
|
|
|
atomic_uint first_job;
|
|
atomic_uint current_job;
|
|
pthread_mutex_t done_mutex;
|
|
pthread_cond_t done_cond;
|
|
int done;
|
|
int finished;
|
|
|
|
void *priv;
|
|
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
|
|
void (*main_func)(void *priv);
|
|
};
|
|
|
|
static int run_jobs(AVSliceThread *ctx)
|
|
{
|
|
unsigned nb_jobs = ctx->nb_jobs;
|
|
unsigned nb_active_threads = ctx->nb_active_threads;
|
|
unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
|
|
unsigned current_job = first_job;
|
|
|
|
do {
|
|
ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
|
|
} while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
|
|
|
|
return current_job == nb_jobs + nb_active_threads - 1;
|
|
}
|
|
|
|
static void *attribute_align_arg thread_worker(void *v)
|
|
{
|
|
WorkerContext *w = v;
|
|
AVSliceThread *ctx = w->ctx;
|
|
|
|
pthread_mutex_lock(&w->mutex);
|
|
pthread_cond_signal(&w->cond);
|
|
|
|
while (1) {
|
|
w->done = 1;
|
|
while (w->done)
|
|
pthread_cond_wait(&w->cond, &w->mutex);
|
|
|
|
if (ctx->finished) {
|
|
pthread_mutex_unlock(&w->mutex);
|
|
return NULL;
|
|
}
|
|
|
|
if (run_jobs(ctx)) {
|
|
pthread_mutex_lock(&ctx->done_mutex);
|
|
ctx->done = 1;
|
|
pthread_cond_signal(&ctx->done_cond);
|
|
pthread_mutex_unlock(&ctx->done_mutex);
|
|
}
|
|
}
|
|
}
|
|
|
|
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
|
|
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
|
|
void (*main_func)(void *priv),
|
|
int nb_threads)
|
|
{
|
|
AVSliceThread *ctx;
|
|
int nb_workers, i;
|
|
|
|
av_assert0(nb_threads >= 0);
|
|
if (!nb_threads) {
|
|
int nb_cpus = av_cpu_count();
|
|
if (nb_cpus > 1)
|
|
nb_threads = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
|
|
else
|
|
nb_threads = 1;
|
|
}
|
|
|
|
nb_workers = nb_threads;
|
|
if (!main_func)
|
|
nb_workers--;
|
|
|
|
*pctx = ctx = av_mallocz(sizeof(*ctx));
|
|
if (!ctx)
|
|
return AVERROR(ENOMEM);
|
|
|
|
if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
|
|
av_freep(pctx);
|
|
return AVERROR(ENOMEM);
|
|
}
|
|
|
|
ctx->priv = priv;
|
|
ctx->worker_func = worker_func;
|
|
ctx->main_func = main_func;
|
|
ctx->nb_threads = nb_threads;
|
|
ctx->nb_active_threads = 0;
|
|
ctx->nb_jobs = 0;
|
|
ctx->finished = 0;
|
|
|
|
atomic_init(&ctx->first_job, 0);
|
|
atomic_init(&ctx->current_job, 0);
|
|
pthread_mutex_init(&ctx->done_mutex, NULL);
|
|
pthread_cond_init(&ctx->done_cond, NULL);
|
|
ctx->done = 0;
|
|
|
|
for (i = 0; i < nb_workers; i++) {
|
|
WorkerContext *w = &ctx->workers[i];
|
|
int ret;
|
|
w->ctx = ctx;
|
|
pthread_mutex_init(&w->mutex, NULL);
|
|
pthread_cond_init(&w->cond, NULL);
|
|
pthread_mutex_lock(&w->mutex);
|
|
w->done = 0;
|
|
|
|
if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
|
|
ctx->nb_threads = main_func ? i : i + 1;
|
|
pthread_mutex_unlock(&w->mutex);
|
|
pthread_cond_destroy(&w->cond);
|
|
pthread_mutex_destroy(&w->mutex);
|
|
avpriv_slicethread_free(pctx);
|
|
return AVERROR(ret);
|
|
}
|
|
|
|
while (!w->done)
|
|
pthread_cond_wait(&w->cond, &w->mutex);
|
|
pthread_mutex_unlock(&w->mutex);
|
|
}
|
|
|
|
return nb_threads;
|
|
}
|
|
|
|
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
|
|
{
|
|
int nb_workers, i, is_last = 0;
|
|
|
|
av_assert0(nb_jobs > 0);
|
|
ctx->nb_jobs = nb_jobs;
|
|
ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
|
|
atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
|
|
atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
|
|
nb_workers = ctx->nb_active_threads;
|
|
if (!ctx->main_func || !execute_main)
|
|
nb_workers--;
|
|
|
|
for (i = 0; i < nb_workers; i++) {
|
|
WorkerContext *w = &ctx->workers[i];
|
|
pthread_mutex_lock(&w->mutex);
|
|
w->done = 0;
|
|
pthread_cond_signal(&w->cond);
|
|
pthread_mutex_unlock(&w->mutex);
|
|
}
|
|
|
|
if (ctx->main_func && execute_main)
|
|
ctx->main_func(ctx->priv);
|
|
else
|
|
is_last = run_jobs(ctx);
|
|
|
|
if (!is_last) {
|
|
pthread_mutex_lock(&ctx->done_mutex);
|
|
while (!ctx->done)
|
|
pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
|
|
ctx->done = 0;
|
|
pthread_mutex_unlock(&ctx->done_mutex);
|
|
}
|
|
}
|
|
|
|
void avpriv_slicethread_free(AVSliceThread **pctx)
|
|
{
|
|
AVSliceThread *ctx;
|
|
int nb_workers, i;
|
|
|
|
if (!pctx || !*pctx)
|
|
return;
|
|
|
|
ctx = *pctx;
|
|
nb_workers = ctx->nb_threads;
|
|
if (!ctx->main_func)
|
|
nb_workers--;
|
|
|
|
ctx->finished = 1;
|
|
for (i = 0; i < nb_workers; i++) {
|
|
WorkerContext *w = &ctx->workers[i];
|
|
pthread_mutex_lock(&w->mutex);
|
|
w->done = 0;
|
|
pthread_cond_signal(&w->cond);
|
|
pthread_mutex_unlock(&w->mutex);
|
|
}
|
|
|
|
for (i = 0; i < nb_workers; i++) {
|
|
WorkerContext *w = &ctx->workers[i];
|
|
pthread_join(w->thread, NULL);
|
|
pthread_cond_destroy(&w->cond);
|
|
pthread_mutex_destroy(&w->mutex);
|
|
}
|
|
|
|
pthread_cond_destroy(&ctx->done_cond);
|
|
pthread_mutex_destroy(&ctx->done_mutex);
|
|
av_freep(&ctx->workers);
|
|
av_freep(pctx);
|
|
}
|
|
|
|
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
|
|
|
|
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
|
|
void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
|
|
void (*main_func)(void *priv),
|
|
int nb_threads)
|
|
{
|
|
*pctx = NULL;
|
|
return AVERROR(ENOSYS);
|
|
}
|
|
|
|
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
|
|
{
|
|
av_assert0(0);
|
|
}
|
|
|
|
void avpriv_slicethread_free(AVSliceThread **pctx)
|
|
{
|
|
av_assert0(!pctx || !*pctx);
|
|
}
|
|
|
|
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
|