lavc/hevcdec: unbreak WPP/progress2 code

The "progress2" API in pthread_slice.c currently associates a progress
value with a thread rather than a job, relying on the broken assumption
that a job's thread number is equal to its job number modulo thread
count.

This removes this API entirely, and changes hevcdec to use a
ThreadProgress-based implementation that associates a
mutex/cond/progress value with every job.

Fixes races and deadlocks in hevdec with slice threading, e.g. some of
those mentioned in #11221.
This commit is contained in:
Anton Khirnov 2024-10-10 16:01:21 +02:00
parent c50f79a0dc
commit 79c47dfd25
5 changed files with 46 additions and 139 deletions

View File

@ -54,6 +54,7 @@
#include "progressframe.h"
#include "refstruct.h"
#include "thread.h"
#include "threadprogress.h"
static const uint8_t hevc_pel_weight[65] = { [2] = 0, [4] = 1, [6] = 2, [8] = 3, [12] = 4, [16] = 5, [24] = 6, [32] = 7, [48] = 8, [64] = 9 };
@ -2751,6 +2752,8 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
const uint8_t *data = s->data + s->sh.offset[ctb_row];
const size_t data_size = s->sh.size[ctb_row];
int progress = 0;
int ret;
if (ctb_row)
@ -2762,13 +2765,15 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
hls_decode_neighbour(lc, l, pps, sps, x_ctb, y_ctb, ctb_addr_ts);
ff_thread_await_progress2(s->avctx, ctb_row, thread, SHIFT_CTB_WPP);
if (ctb_row)
ff_thread_progress_await(&s->wpp_progress[ctb_row - 1],
progress + SHIFT_CTB_WPP + 1);
/* atomic_load's prototype requires a pointer to non-const atomic variable
* (due to implementations via mutexes, where reads involve writes).
* Of course, casting const away here is nevertheless safe. */
if (atomic_load((atomic_int*)&s->wpp_err)) {
ff_thread_report_progress2(s->avctx, ctb_row , thread, SHIFT_CTB_WPP);
ff_thread_progress_report(&s->wpp_progress[ctb_row], INT_MAX);
return 0;
}
@ -2792,19 +2797,19 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
ctb_addr_ts++;
ff_hevc_save_states(lc, pps, ctb_addr_ts);
ff_thread_report_progress2(s->avctx, ctb_row, thread, 1);
ff_thread_progress_report(&s->wpp_progress[ctb_row], ++progress);
ff_hevc_hls_filters(lc, l, pps, x_ctb, y_ctb, ctb_size);
if (!more_data && (x_ctb+ctb_size) < sps->width && ctb_row != s->sh.num_entry_point_offsets) {
/* Casting const away here is safe, because it is an atomic operation. */
atomic_store((atomic_int*)&s->wpp_err, 1);
ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
ff_thread_progress_report(&s->wpp_progress[ctb_row], INT_MAX);
return 0;
}
if ((x_ctb+ctb_size) >= sps->width && (y_ctb+ctb_size) >= sps->height ) {
ff_hevc_hls_filter(lc, l, pps, x_ctb, y_ctb, ctb_size);
ff_thread_report_progress2(s->avctx, ctb_row , thread, SHIFT_CTB_WPP);
ff_thread_progress_report(&s->wpp_progress[ctb_row], INT_MAX);
return ctb_addr_ts;
}
ctb_addr_rs = pps->ctb_addr_ts_to_rs[ctb_addr_ts];
@ -2814,17 +2819,43 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
break;
}
}
ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
ff_thread_progress_report(&s->wpp_progress[ctb_row], INT_MAX);
return 0;
error:
l->tab_slice_address[ctb_addr_rs] = -1;
/* Casting const away here is safe, because it is an atomic operation. */
atomic_store((atomic_int*)&s->wpp_err, 1);
ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
ff_thread_progress_report(&s->wpp_progress[ctb_row], INT_MAX);
return ret;
}
static int wpp_progress_init(HEVCContext *s, unsigned count)
{
if (s->nb_wpp_progress < count) {
void *tmp = av_realloc_array(s->wpp_progress, count,
sizeof(*s->wpp_progress));
if (!tmp)
return AVERROR(ENOMEM);
s->wpp_progress = tmp;
memset(s->wpp_progress + s->nb_wpp_progress, 0,
(count - s->nb_wpp_progress) * sizeof(*s->wpp_progress));
for (int i = s->nb_wpp_progress; i < count; i++) {
int ret = ff_thread_progress_init(&s->wpp_progress[i], 1);
if (ret < 0)
return ret;
s->nb_wpp_progress = i + 1;
}
}
for (int i = 0; i < count; i++)
ff_thread_progress_reset(&s->wpp_progress[i]);
return 0;
}
static int hls_slice_data_wpp(HEVCContext *s, const H2645NAL *nal)
{
const HEVCPPS *const pps = s->pps;
@ -2909,7 +2940,7 @@ static int hls_slice_data_wpp(HEVCContext *s, const H2645NAL *nal)
}
atomic_store(&s->wpp_err, 0);
res = ff_slice_thread_allocz_entries(s->avctx, s->sh.num_entry_point_offsets + 1);
res = wpp_progress_init(s, s->sh.num_entry_point_offsets + 1);
if (res < 0)
return res;
@ -3826,6 +3857,10 @@ static av_cold int hevc_decode_free(AVCodecContext *avctx)
ff_hevc_ps_uninit(&s->ps);
for (int i = 0; i < s->nb_wpp_progress; i++)
ff_thread_progress_destroy(&s->wpp_progress[i]);
av_freep(&s->wpp_progress);
av_freep(&s->sh.entry_point_offset);
av_freep(&s->sh.offset);
av_freep(&s->sh.size);
@ -3981,12 +4016,6 @@ static av_cold int hevc_decode_init(AVCodecContext *avctx)
HEVCContext *s = avctx->priv_data;
int ret;
if (avctx->active_thread_type & FF_THREAD_SLICE) {
ret = ff_slice_thread_init_progress(avctx);
if (ret < 0)
return ret;
}
ret = hevc_init_context(avctx);
if (ret < 0)
return ret;

View File

@ -540,6 +540,9 @@ typedef struct HEVCContext {
/** The target for the common_cabac_state of the local contexts. */
HEVCCABACState cabac;
struct ThreadProgress *wpp_progress;
unsigned nb_wpp_progress;
atomic_int wpp_err;
const uint8_t *data;

View File

@ -41,11 +41,6 @@ typedef int (action_func)(AVCodecContext *c, void *arg);
typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
typedef int (main_func)(AVCodecContext *c);
typedef struct Progress {
pthread_cond_t cond;
pthread_mutex_t mutex;
} Progress;
typedef struct SliceThreadContext {
AVSliceThread *thread;
action_func *func;
@ -54,11 +49,6 @@ typedef struct SliceThreadContext {
void *args;
int *rets;
int job_size;
int *entries;
int entries_count;
int thread_count;
Progress *progress;
} SliceThreadContext;
static void main_function(void *priv) {
@ -82,18 +72,9 @@ static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb
void ff_slice_thread_free(AVCodecContext *avctx)
{
SliceThreadContext *c = avctx->internal->thread_ctx;
int i;
avpriv_slicethread_free(&c->thread);
for (i = 0; i < c->thread_count; i++) {
Progress *const progress = &c->progress[i];
pthread_mutex_destroy(&progress->mutex);
pthread_cond_destroy(&progress->cond);
}
av_freep(&c->entries);
av_freep(&c->progress);
av_freep(&avctx->internal->thread_ctx);
}
@ -175,86 +156,3 @@ int ff_slice_thread_init(AVCodecContext *avctx)
avctx->execute2 = thread_execute2;
return 0;
}
int av_cold ff_slice_thread_init_progress(AVCodecContext *avctx)
{
SliceThreadContext *const p = avctx->internal->thread_ctx;
int err, i = 0, thread_count = avctx->thread_count;
p->progress = av_calloc(thread_count, sizeof(*p->progress));
if (!p->progress) {
err = AVERROR(ENOMEM);
goto fail;
}
for (; i < thread_count; i++) {
Progress *const progress = &p->progress[i];
err = pthread_mutex_init(&progress->mutex, NULL);
if (err) {
err = AVERROR(err);
goto fail;
}
err = pthread_cond_init (&progress->cond, NULL);
if (err) {
err = AVERROR(err);
pthread_mutex_destroy(&progress->mutex);
goto fail;
}
}
err = 0;
fail:
p->thread_count = i;
return err;
}
void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
{
SliceThreadContext *p = avctx->internal->thread_ctx;
Progress *const progress = &p->progress[thread];
int *entries = p->entries;
pthread_mutex_lock(&progress->mutex);
entries[field] +=n;
pthread_cond_signal(&progress->cond);
pthread_mutex_unlock(&progress->mutex);
}
void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
{
SliceThreadContext *p = avctx->internal->thread_ctx;
Progress *progress;
int *entries = p->entries;
if (!entries || !field) return;
thread = thread ? thread - 1 : p->thread_count - 1;
progress = &p->progress[thread];
pthread_mutex_lock(&progress->mutex);
while ((entries[field - 1] - entries[field]) < shift){
pthread_cond_wait(&progress->cond, &progress->mutex);
}
pthread_mutex_unlock(&progress->mutex);
}
int ff_slice_thread_allocz_entries(AVCodecContext *avctx, int count)
{
if (avctx->active_thread_type & FF_THREAD_SLICE) {
SliceThreadContext *p = avctx->internal->thread_ctx;
if (p->entries_count == count) {
memset(p->entries, 0, p->entries_count * sizeof(*p->entries));
return 0;
}
av_freep(&p->entries);
p->entries = av_calloc(count, sizeof(*p->entries));
if (!p->entries) {
p->entries_count = 0;
return AVERROR(ENOMEM);
}
p->entries_count = count;
}
return 0;
}

View File

@ -56,10 +56,6 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr),
int (*main_func)(AVCodecContext *c), void *arg, int *ret, int job_count);
int ff_slice_thread_allocz_entries(AVCodecContext *avctx, int count);
int ff_slice_thread_init_progress(AVCodecContext *avctx);
void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n);
void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift);
enum ThreadingStatus {
FF_THREAD_IS_COPY,

View File

@ -913,25 +913,6 @@ int ff_thread_can_start_frame(AVCodecContext *avctx)
{
return 1;
}
int ff_slice_thread_init_progress(AVCodecContext *avctx)
{
return 0;
}
int ff_slice_thread_allocz_entries(AVCodecContext *avctx, int count)
{
return 0;
}
void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
{
}
void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
{
}
#endif
const uint8_t *avpriv_find_start_code(const uint8_t *restrict p,