mirror of https://github.com/mpv-player/mpv
misc: add a thread pool
To be used by the following commits.
This commit is contained in:
parent
7bab24b9b3
commit
ec3dd7164c
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
* This file is part of mpv.
|
||||
*
|
||||
* mpv is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2.1 of the License, or (at your option) any later version.
|
||||
*
|
||||
* mpv is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with mpv. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "common/common.h"
|
||||
|
||||
#include "thread_pool.h"
|
||||
|
||||
struct work {
|
||||
void (*fn)(void *ctx);
|
||||
void *fn_ctx;
|
||||
};
|
||||
|
||||
struct mp_thread_pool {
|
||||
pthread_t *threads;
|
||||
int num_threads;
|
||||
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t wakeup;
|
||||
|
||||
// --- the following fields are protected by lock
|
||||
bool terminate;
|
||||
struct work *work;
|
||||
int num_work;
|
||||
};
|
||||
|
||||
static void *worker_thread(void *arg)
|
||||
{
|
||||
struct mp_thread_pool *pool = arg;
|
||||
|
||||
pthread_mutex_lock(&pool->lock);
|
||||
while (1) {
|
||||
while (!pool->num_work && !pool->terminate)
|
||||
pthread_cond_wait(&pool->wakeup, &pool->lock);
|
||||
|
||||
if (!pool->num_work && pool->terminate)
|
||||
break;
|
||||
|
||||
assert(pool->num_work > 0);
|
||||
struct work work = pool->work[pool->num_work - 1];
|
||||
pool->num_work -= 1;
|
||||
|
||||
pthread_mutex_unlock(&pool->lock);
|
||||
work.fn(work.fn_ctx);
|
||||
pthread_mutex_lock(&pool->lock);
|
||||
}
|
||||
assert(pool->num_work == 0);
|
||||
pthread_mutex_unlock(&pool->lock);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void thread_pool_dtor(void *ctx)
|
||||
{
|
||||
struct mp_thread_pool *pool = ctx;
|
||||
|
||||
pthread_mutex_lock(&pool->lock);
|
||||
pool->terminate = true;
|
||||
pthread_cond_broadcast(&pool->wakeup);
|
||||
pthread_mutex_unlock(&pool->lock);
|
||||
|
||||
for (int n = 0; n < pool->num_threads; n++)
|
||||
pthread_join(pool->threads[n], NULL);
|
||||
|
||||
assert(pool->num_work == 0);
|
||||
pthread_cond_destroy(&pool->wakeup);
|
||||
pthread_mutex_destroy(&pool->lock);
|
||||
}
|
||||
|
||||
// Create a thread pool with the given number of worker threads. This can return
|
||||
// NULL if the worker threads could not be created. The thread pool can be
|
||||
// destroyed with talloc_free(pool), or indirectly with talloc_free(ta_parent).
|
||||
// If there are still work items on freeing, it will block until all work items
|
||||
// are done, and the threads terminate.
|
||||
struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads)
|
||||
{
|
||||
assert(threads > 0);
|
||||
|
||||
struct mp_thread_pool *pool = talloc_zero(ta_parent, struct mp_thread_pool);
|
||||
talloc_set_destructor(pool, thread_pool_dtor);
|
||||
|
||||
pthread_mutex_init(&pool->lock, NULL);
|
||||
pthread_cond_init(&pool->wakeup, NULL);
|
||||
|
||||
for (int n = 0; n < threads; n++) {
|
||||
pthread_t thread;
|
||||
if (pthread_create(&thread, NULL, worker_thread, pool)) {
|
||||
talloc_free(pool);
|
||||
return NULL;
|
||||
}
|
||||
MP_TARRAY_APPEND(pool, pool->threads, pool->num_threads, thread);
|
||||
}
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
// Queue a function to be run on a worker thread: fn(fn_ctx)
|
||||
// If no worker thread is currently available, it's appended to a list in memory
|
||||
// with unbounded size. This function always returns immediately.
|
||||
// Concurrent queue calls are allowed, as long as it does not overlap with
|
||||
// pool destruction.
|
||||
void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
|
||||
void *fn_ctx)
|
||||
{
|
||||
pthread_mutex_lock(&pool->lock);
|
||||
struct work work = {fn, fn_ctx};
|
||||
MP_TARRAY_INSERT_AT(pool, pool->work, pool->num_work, 0, work);
|
||||
pthread_cond_signal(&pool->wakeup);
|
||||
pthread_mutex_unlock(&pool->lock);
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
#ifndef MPV_MP_THREAD_POOL_H
|
||||
#define MPV_MP_THREAD_POOL_H
|
||||
|
||||
struct mp_thread_pool;
|
||||
|
||||
struct mp_thread_pool *mp_thread_pool_create(void *ta_parent, int threads);
|
||||
void mp_thread_pool_queue(struct mp_thread_pool *pool, void (*fn)(void *ctx),
|
||||
void *fn_ctx);
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue