[FFmpeg-devel] [PATCH 1/2] lavc/pthread_slice: unbreak WPP/progress2 code
Anton Khirnov
anton at khirnov.net
Thu Oct 10 18:28:13 EEST 2024
It currently associates a progress value with a thread rather than a
job, relying on the broken assumption that a job's thread number is
equal to its job number modulo thread count.
This changes the API to associate a mutex/cond/progress value with every
job. Since job count may change dynamically,
the ff_slice_thread_init_progress() function - previously called in
decoder init - is eliminated as there is nothing left for it to do.
The per-frame (and previously horribly-misnamed)
ff_slice_thread_allocz_entries() takes over the name and the
mutex/condvar-initializing role of ff_slice_thread_init_progress().
await/report_progress2() lose their thread argument, as it is no longer
used. The misnamed 'field' argument is renamed to 'job', as it is the
job whose progress value we are interested in. The functions also no
longer perform nontrivial manipulation of the progress target/value, as
that hardcodes caller/codec-specific semantics into what should be a
generic API.
Fixes races and deadlocks in hevcdec with slice threading, e.g. some of
those mentioned in #11221.
---
libavcodec/hevc/hevcdec.c | 26 ++++----
libavcodec/pthread_slice.c | 125 ++++++++++++++++++-------------------
libavcodec/thread.h | 8 +--
3 files changed, 76 insertions(+), 83 deletions(-)
diff --git a/libavcodec/hevc/hevcdec.c b/libavcodec/hevc/hevcdec.c
index 0dc24f82f8..1f80bbe8ab 100644
--- a/libavcodec/hevc/hevcdec.c
+++ b/libavcodec/hevc/hevcdec.c
@@ -2751,6 +2751,8 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
const uint8_t *data = s->data + s->sh.offset[ctb_row];
const size_t data_size = s->sh.size[ctb_row];
+ int progress = 0;
+
int ret;
if (ctb_row)
@@ -2762,13 +2764,15 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
hls_decode_neighbour(lc, l, pps, sps, x_ctb, y_ctb, ctb_addr_ts);
- ff_thread_await_progress2(s->avctx, ctb_row, thread, SHIFT_CTB_WPP);
+ if (ctb_row)
+ ff_thread_await_progress2(s->avctx, ctb_row - 1,
+ progress + SHIFT_CTB_WPP + 1);
/* atomic_load's prototype requires a pointer to non-const atomic variable
* (due to implementations via mutexes, where reads involve writes).
* Of course, casting const away here is nevertheless safe. */
if (atomic_load((atomic_int*)&s->wpp_err)) {
- ff_thread_report_progress2(s->avctx, ctb_row , thread, SHIFT_CTB_WPP);
+ ff_thread_report_progress2(s->avctx, ctb_row, INT_MAX);
return 0;
}
@@ -2792,19 +2796,19 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
ctb_addr_ts++;
ff_hevc_save_states(lc, pps, ctb_addr_ts);
- ff_thread_report_progress2(s->avctx, ctb_row, thread, 1);
+ ff_thread_report_progress2(s->avctx, ctb_row, ++progress);
ff_hevc_hls_filters(lc, l, pps, x_ctb, y_ctb, ctb_size);
if (!more_data && (x_ctb+ctb_size) < sps->width && ctb_row != s->sh.num_entry_point_offsets) {
/* Casting const away here is safe, because it is an atomic operation. */
atomic_store((atomic_int*)&s->wpp_err, 1);
- ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
+ ff_thread_report_progress2(s->avctx, ctb_row, INT_MAX);
return 0;
}
if ((x_ctb+ctb_size) >= sps->width && (y_ctb+ctb_size) >= sps->height ) {
ff_hevc_hls_filter(lc, l, pps, x_ctb, y_ctb, ctb_size);
- ff_thread_report_progress2(s->avctx, ctb_row , thread, SHIFT_CTB_WPP);
+ ff_thread_report_progress2(s->avctx, ctb_row, INT_MAX);
return ctb_addr_ts;
}
ctb_addr_rs = pps->ctb_addr_ts_to_rs[ctb_addr_ts];
@@ -2814,14 +2818,14 @@ static int hls_decode_entry_wpp(AVCodecContext *avctx, void *hevc_lclist,
break;
}
}
- ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
+ ff_thread_report_progress2(s->avctx, ctb_row, INT_MAX);
return 0;
error:
l->tab_slice_address[ctb_addr_rs] = -1;
/* Casting const away here is safe, because it is an atomic operation. */
atomic_store((atomic_int*)&s->wpp_err, 1);
- ff_thread_report_progress2(s->avctx, ctb_row ,thread, SHIFT_CTB_WPP);
+ ff_thread_report_progress2(s->avctx, ctb_row, INT_MAX);
return ret;
}
@@ -2909,7 +2913,7 @@ static int hls_slice_data_wpp(HEVCContext *s, const H2645NAL *nal)
}
atomic_store(&s->wpp_err, 0);
- res = ff_slice_thread_allocz_entries(s->avctx, s->sh.num_entry_point_offsets + 1);
+ res = ff_slice_thread_init_progress(s->avctx, s->sh.num_entry_point_offsets + 1);
if (res < 0)
return res;
@@ -3981,12 +3985,6 @@ static av_cold int hevc_decode_init(AVCodecContext *avctx)
HEVCContext *s = avctx->priv_data;
int ret;
- if (avctx->active_thread_type & FF_THREAD_SLICE) {
- ret = ff_slice_thread_init_progress(avctx);
- if (ret < 0)
- return ret;
- }
-
ret = hevc_init_context(avctx);
if (ret < 0)
return ret;
diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
index a4d31c6f4d..93e452e99f 100644
--- a/libavcodec/pthread_slice.c
+++ b/libavcodec/pthread_slice.c
@@ -44,6 +44,9 @@ typedef int (main_func)(AVCodecContext *c);
typedef struct Progress {
pthread_cond_t cond;
pthread_mutex_t mutex;
+ int initialized;
+
+ int progress;
} Progress;
typedef struct SliceThreadContext {
@@ -55,10 +58,9 @@ typedef struct SliceThreadContext {
int *rets;
int job_size;
- int *entries;
- int entries_count;
- int thread_count;
Progress *progress;
+ int nb_progress;
+ int nb_progress_allocated;
} SliceThreadContext;
static void main_function(void *priv) {
@@ -82,17 +84,19 @@ static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb
void ff_slice_thread_free(AVCodecContext *avctx)
{
SliceThreadContext *c = avctx->internal->thread_ctx;
- int i;
avpriv_slicethread_free(&c->thread);
- for (i = 0; i < c->thread_count; i++) {
+ for (int i = 0; i < c->nb_progress_allocated; i++) {
Progress *const progress = &c->progress[i];
+
+ if (!progress->initialized)
+ continue;
+
pthread_mutex_destroy(&progress->mutex);
pthread_cond_destroy(&progress->cond);
}
- av_freep(&c->entries);
av_freep(&c->progress);
av_freep(&avctx->internal->thread_ctx);
}
@@ -176,85 +180,76 @@ int ff_slice_thread_init(AVCodecContext *avctx)
return 0;
}
-int av_cold ff_slice_thread_init_progress(AVCodecContext *avctx)
-{
- SliceThreadContext *const p = avctx->internal->thread_ctx;
- int err, i = 0, thread_count = avctx->thread_count;
-
- p->progress = av_calloc(thread_count, sizeof(*p->progress));
- if (!p->progress) {
- err = AVERROR(ENOMEM);
- goto fail;
- }
-
- for (; i < thread_count; i++) {
- Progress *const progress = &p->progress[i];
- err = pthread_mutex_init(&progress->mutex, NULL);
- if (err) {
- err = AVERROR(err);
- goto fail;
- }
- err = pthread_cond_init (&progress->cond, NULL);
- if (err) {
- err = AVERROR(err);
- pthread_mutex_destroy(&progress->mutex);
- goto fail;
- }
- }
- err = 0;
-fail:
- p->thread_count = i;
- return err;
-}
-
-void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
+void ff_thread_report_progress2(AVCodecContext *avctx, int job, int val)
{
SliceThreadContext *p = avctx->internal->thread_ctx;
- Progress *const progress = &p->progress[thread];
- int *entries = p->entries;
+ Progress *const progress = &p->progress[job];
pthread_mutex_lock(&progress->mutex);
- entries[field] +=n;
+ progress->progress = FFMAX(val, progress->progress);
pthread_cond_signal(&progress->cond);
pthread_mutex_unlock(&progress->mutex);
}
-void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
+void ff_thread_await_progress2(AVCodecContext *avctx, int job, int val)
{
SliceThreadContext *p = avctx->internal->thread_ctx;
- Progress *progress;
- int *entries = p->entries;
-
- if (!entries || !field) return;
-
- thread = thread ? thread - 1 : p->thread_count - 1;
- progress = &p->progress[thread];
+ Progress *progress = &p->progress[job];
pthread_mutex_lock(&progress->mutex);
- while ((entries[field - 1] - entries[field]) < shift){
+
+ while (progress->progress < val)
pthread_cond_wait(&progress->cond, &progress->mutex);
- }
+
pthread_mutex_unlock(&progress->mutex);
}
-int ff_slice_thread_allocz_entries(AVCodecContext *avctx, int count)
+static int progress_init(Progress *progress)
{
- if (avctx->active_thread_type & FF_THREAD_SLICE) {
- SliceThreadContext *p = avctx->internal->thread_ctx;
+ int ret;
- if (p->entries_count == count) {
- memset(p->entries, 0, p->entries_count * sizeof(*p->entries));
- return 0;
- }
- av_freep(&p->entries);
+ ret = pthread_mutex_init(&progress->mutex, NULL);
+ if (ret)
+ return AVERROR(ret);
- p->entries = av_calloc(count, sizeof(*p->entries));
- if (!p->entries) {
- p->entries_count = 0;
- return AVERROR(ENOMEM);
- }
- p->entries_count = count;
+ ret = pthread_cond_init (&progress->cond, NULL);
+ if (ret) {
+ pthread_mutex_destroy(&progress->mutex);
+ return AVERROR(ret);
}
+ progress->initialized = 1;
+
+ return 0;
+}
+
+int ff_slice_thread_init_progress(AVCodecContext *avctx, int count)
+{
+ SliceThreadContext *c = avctx->internal->thread_ctx;
+
+ if (!(avctx->active_thread_type & FF_THREAD_SLICE))
+ return 0;
+
+ if (c->nb_progress_allocated < count) {
+ void *tmp = av_realloc_array(c->progress, count, sizeof(*c->progress));
+ if (!tmp)
+ return AVERROR(ENOMEM);
+
+ c->progress = tmp;
+ memset(c->progress + c->nb_progress_allocated, 0,
+ (count - c->nb_progress_allocated) * sizeof(*c->progress));
+
+ for (int i = c->nb_progress_allocated; i < count; i++) {
+ int ret = progress_init(&c->progress[i]);
+ if (ret < 0)
+ return ret;
+ c->nb_progress_allocated = i + 1;
+ }
+ }
+ c->nb_progress = count;
+
+ for (int i = 0; i < c->nb_progress; i++)
+ c->progress[i].progress = 0;
+
return 0;
}
diff --git a/libavcodec/thread.h b/libavcodec/thread.h
index 47c00a0ed2..db3ec0b98c 100644
--- a/libavcodec/thread.h
+++ b/libavcodec/thread.h
@@ -56,10 +56,10 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr),
int (*main_func)(AVCodecContext *c), void *arg, int *ret, int job_count);
-int ff_slice_thread_allocz_entries(AVCodecContext *avctx, int count);
-int ff_slice_thread_init_progress(AVCodecContext *avctx);
-void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n);
-void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift);
+
+int ff_slice_thread_init_progress(AVCodecContext *avctx, int job_count);
+void ff_thread_report_progress2(AVCodecContext *avctx, int job, int val);
+void ff_thread_await_progress2(AVCodecContext *avctx, int job, int val);
enum ThreadingStatus {
FF_THREAD_IS_COPY,
--
2.43.0
More information about the ffmpeg-devel
mailing list