[FFmpeg-devel] [PATCH 3/3] lavc/pthread_frame: rework the logic for updating thread contexts

Anton Khirnov anton at khirnov.net
Wed Nov 13 15:06:58 EET 2024


Propagating decoder state between per-thread contexts with frame
threading currently works as follows:
0)  Every frame thread has its own "child" decoder context,
1)  Frame thread T0 decodes the frame header and updates its context
    accordingly. At most one frame thread can be in this stage at any
    given time.
2)  Frame thread T0 calls ff_thread_finish_setup() to indicate that
    header decoding is done.
3a) Frame thread T0 proceeds with decoding frame data.
3b) The main thread calls the decoder's update_thread_context()
    callback, transferring T0's state to the next thread T1.

Since 3a) and 3b) run concurrently, during 3a) T0 must not write to any
context variables accessed by update_thread_context(), otherwise a data
race occurs. This approach turns out to be highly fragile in practice,
as developers are either not aware of this constraint, or fail to keep
it in mind while modifying decoders.

This commit aims to eliminate the possibility of such races by changing
the logic in the folowing way:
* child decoders are no longer permanently bound to worker threads, but
  are instead assigned dynamically only while decoding a single frame; a
  different decoder may be assigned to the same thread for a later frame
* with N frame threads, N+1 child decoder contexts are allocated
  (instead of N, as before), so at any time at least one decoder is
  idle (unassigned)
* when a frame thread calls ff_thread_finish_setup(), its context state
  is immediately and synchronously transferred to the idle context
* the idle context is then assigned to the next frame thread, whose
  previous decoder now becomes idle

With this approach, improperly updating a decoder context after
ff_thread_finish_setup() transforms from a race into a deterministic
failure to propagate the relevant variables to following frame threads,
which
* should be much easier to debug
* is no longer UB
---
 libavcodec/pthread_frame.c | 142 +++++++++++++++++++++----------------
 1 file changed, 81 insertions(+), 61 deletions(-)

diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 8a52e418ad..1a5c5ea9ea 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -135,6 +135,7 @@ typedef struct FrameThreadContext {
      * Decoder contexts that get assigned to frame threads.
      */
     ChildDecoder *decoders;
+    unsigned   nb_decoders;
 
     PerThreadContext *threads;     ///< The contexts for each thread.
     PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
@@ -150,6 +151,8 @@ typedef struct FrameThreadContext {
     pthread_cond_t async_cond;
     int async_lock;
 
+    const FFCodec *codec;
+
     DecodedFrames df;
     int result;
 
@@ -158,8 +161,9 @@ typedef struct FrameThreadContext {
      */
     AVPacket *next_pkt;
 
-    int next_decoding;             ///< The next context to submit a packet to.
-    int next_finished;             ///< The next context to return output from.
+    int next_thread_submit;
+    int next_decoder;
+    int next_finished;             ///< The next thread to return output from.
 
     /* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
      * order to transfer its ownership to the next decoding thread without the
@@ -194,11 +198,11 @@ static void async_unlock(FrameThreadContext *fctx)
 
 static void thread_set_name(PerThreadContext *p)
 {
-    AVCodecContext *avctx = p->avctx;
-    int idx = p - p->parent->threads;
+    const FrameThreadContext *fctx = p->parent;
+    int idx = p - fctx->threads;
     char name[16];
 
-    snprintf(name, sizeof(name), "av:%.7s:df%d", avctx->codec->name, idx);
+    snprintf(name, sizeof(name), "av:%.7s:df%d", fctx->codec->p.name, idx);
 
     ff_thread_setname(name);
 }
@@ -259,13 +263,13 @@ static void decoded_frames_free(DecodedFrames *df)
 static attribute_align_arg void *frame_worker_thread(void *arg)
 {
     PerThreadContext *p = arg;
-    AVCodecContext *avctx = p->avctx;
-    const FFCodec *codec = ffcodec(avctx->codec);
+    const FFCodec *codec = p->parent->codec;
 
     thread_set_name(p);
 
     pthread_mutex_lock(&p->mutex);
     while (1) {
+        AVCodecContext *avctx;
         int ret;
 
         while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die)
@@ -273,6 +277,8 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
 
         if (p->die) break;
 
+        // a decoder context was assigned to us by the main thread
+        avctx = p->avctx;
         if (!codec->update_thread_context) {
             ret = ff_thread_finish_setup(avctx);
             if (ret < 0) {
@@ -524,16 +530,30 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
                          AVPacket *in_pkt)
 {
     FrameThreadContext *fctx = p->parent;
-    ChildDecoder         *cd = p->avctx->internal->thread_ctx;
     PerThreadContext *prev_thread = fctx->prev_thread;
-    const AVCodec *codec = p->avctx->codec;
+    ChildDecoder *cd;
     int ret;
 
+    if (prev_thread) {
+        if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
+            pthread_mutex_lock(&prev_thread->progress_mutex);
+            while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
+                pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
+            pthread_mutex_unlock(&prev_thread->progress_mutex);
+        }
+
+    }
+    cd = &fctx->decoders[fctx->next_decoder];
+    fctx->next_decoder = (fctx->next_decoder + 1) % (user_avctx->thread_count + 1);
+
     pthread_mutex_lock(&p->mutex);
 
     av_packet_unref(p->avpkt);
     av_packet_move_ref(p->avpkt, in_pkt);
 
+    p->avctx   = cd->ctx;
+    cd->thread = p;
+
     if (AVPACKET_IS_EMPTY(p->avpkt))
         p->avctx->internal->draining = 1;
 
@@ -546,27 +566,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
                           (p->avctx->debug & FF_DEBUG_THREADS) != 0,
                           memory_order_relaxed);
 
-    if (prev_thread) {
-        if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
-            pthread_mutex_lock(&prev_thread->progress_mutex);
-            while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
-                pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
-            pthread_mutex_unlock(&prev_thread->progress_mutex);
-        }
-
-        /* codecs without delay might not be prepared to be called repeatedly here during
-         * flushing (vp3/theora), and also don't need to be, since from this point on, they
-         * will always return EOF anyway */
-        if (!p->avctx->internal->draining ||
-            (codec->capabilities & AV_CODEC_CAP_DELAY)) {
-            ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
-            if (ret) {
-                pthread_mutex_unlock(&p->mutex);
-                return ret;
-            }
-        }
-    }
-
     /* transfer the stashed hwaccel state, if any */
     av_assert0(!p->avctx->hwaccel || cd->hwaccel_threadsafe);
     if (!cd->hwaccel_threadsafe) {
@@ -580,7 +579,7 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
     pthread_mutex_unlock(&p->mutex);
 
     fctx->prev_thread = p;
-    fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
+    fctx->next_thread_submit = (fctx->next_thread_submit + 1) % user_avctx->thread_count;
 
     return 0;
 }
@@ -596,6 +595,7 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
 
     /* submit packets to threads while there are no buffered results to return */
     while (!fctx->df.nb_f && !fctx->result) {
+        ChildDecoder *cd;
         PerThreadContext *p;
 
         /* get a packet to be submitted to the next thread */
@@ -604,17 +604,18 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
         if (ret < 0 && ret != AVERROR_EOF)
             goto finish;
 
-        ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+        ret = submit_packet(&fctx->threads[fctx->next_thread_submit], avctx,
                             fctx->next_pkt);
         if (ret < 0)
              goto finish;
 
         /* do not return any frames until all threads have something to do */
-        if (fctx->next_decoding != fctx->next_finished &&
+        if (fctx->next_thread_submit != fctx->next_finished &&
             !avctx->internal->draining)
             continue;
 
         p                   = &fctx->threads[fctx->next_finished];
+        cd                  = p->avctx->internal->thread_ctx;
         fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
 
         if (atomic_load(&p->state) != STATE_INPUT_READY) {
@@ -629,6 +630,9 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
         p->result    = 0;
         if (p->df.nb_f)
             FFSWAP(DecodedFrames, fctx->df, p->df);
+
+        p->avctx = NULL;
+        cd->thread = NULL;
     }
 
     /* a thread may return multiple frames AND an error
@@ -696,18 +700,30 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
 
 int ff_thread_finish_setup(AVCodecContext *avctx)
 {
+    FrameThreadContext *fctx;
     ChildDecoder *cd;
     PerThreadContext *p;
+    int ret = 0, err;
 
     if (!(avctx->active_thread_type & FF_THREAD_FRAME))
         return 0;
 
     cd = avctx->internal->thread_ctx;
     p  = cd->thread;
+    fctx = cd->parent;
 
     cd->hwaccel_threadsafe = avctx->hwaccel &&
                              (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
 
+    // transfer decoder state to an idle context that will then be submitted to
+    // the next worker thread
+    if (!avctx->internal->draining ||
+        (avctx->codec->capabilities & AV_CODEC_CAP_DELAY)) {
+        err = update_context_from_thread(fctx->decoders[fctx->next_decoder].ctx, avctx, 0);
+        if (err < 0)
+            ret = err;
+    }
+
     if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
         pthread_mutex_lock(&p->parent->hwaccel_mutex);
         p->hwaccel_serializing = 1;
@@ -742,7 +758,7 @@ int ff_thread_finish_setup(AVCodecContext *avctx)
     pthread_cond_broadcast(&p->progress_cond);
     pthread_mutex_unlock(&p->progress_mutex);
 
-    return 0;
+    return ret;
 }
 
 /// Waits for all threads to finish.
@@ -806,10 +822,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
     av_freep(&fctx->threads);
 
     // clean up child decoders
-    for (int i = 0; i < thread_count && fctx->decoders; i++) {
+    for (int i = 0; i < fctx->nb_decoders && fctx->decoders; i++) {
         ChildDecoder    *cd = &fctx->decoders[i];
         AVCodecContext *ctx = cd->ctx;
 
+        if (!ctx)
+            continue;
+
         if (ctx->internal) {
             if (codec->close && cd->needs_close)
                 codec->close(ctx);
@@ -853,8 +872,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
     av_freep(&avctx->internal->thread_ctx);
 }
 
-static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
-                                AVCodecContext *avctx,
+static av_cold int dec_ctx_init(ChildDecoder *cd, AVCodecContext *avctx,
                                 const FFCodec *codec, int first)
 {
     AVCodecContext *copy;
@@ -867,10 +885,6 @@ static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
     copy->decoded_side_data = NULL;
     copy->nb_decoded_side_data = 0;
 
-    /* From now on, this ChildDecoder will be cleaned up by
-     * ff_frame_thread_free in case of errors. */
-    (*decoders_to_free)++;
-
     cd->ctx = copy;
 
     copy->internal = ff_decode_internal_alloc();
@@ -934,7 +948,7 @@ static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
 }
 
 static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
-                               AVCodecContext *avctx, AVCodecContext *child)
+                               AVCodecContext *avctx)
 {
     int ret = 0;
 
@@ -948,7 +962,6 @@ static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
     atomic_init(&p->state, STATE_INPUT_READY);
 
     p->parent = fctx;
-    p->avctx  = child;
 
     ret = ff_pthread_init(p, per_thread_offsets);
     if (ret < 0)
@@ -958,7 +971,7 @@ static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
     if (!p->avpkt)
         return AVERROR(ENOMEM);
 
-    atomic_init(&p->debug_threads, (child->debug & FF_DEBUG_THREADS) != 0);
+    atomic_init(&p->debug_threads, (avctx->debug & FF_DEBUG_THREADS) != 0);
 
     ret = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
     if (ret < 0)
@@ -973,7 +986,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     int thread_count = avctx->thread_count;
     const FFCodec *codec = ffcodec(avctx->codec);
     FrameThreadContext *fctx;
-    int err, decoders_to_free = 0;
+    int err;
 
     if (!thread_count) {
         int nb_cpus = av_cpu_count();
@@ -1004,24 +1017,28 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     if (!fctx->next_pkt)
         return AVERROR(ENOMEM);
 
+    fctx->codec = codec;
+
     fctx->async_lock = 1;
 
     if (codec->p.type == AVMEDIA_TYPE_VIDEO)
         avctx->delay = avctx->thread_count - 1;
 
-    fctx->decoders = av_calloc(thread_count, sizeof(*fctx->decoders));
+    fctx->nb_decoders = thread_count + 1;
+    fctx->decoders = av_calloc(fctx->nb_decoders, sizeof(*fctx->decoders));
     if (!fctx->decoders) {
+        fctx->nb_decoders = 0;
         err = AVERROR(ENOMEM);
         goto error;
     }
 
-    for (; decoders_to_free < thread_count; ) {
-        ChildDecoder *cd = &fctx->decoders[decoders_to_free];
-        int first = !decoders_to_free;
+    for (int i = 0; i < fctx->nb_decoders; i++) {
+        ChildDecoder *cd = &fctx->decoders[i];
+        int first = !i;
 
         cd->parent = fctx;
 
-        err = dec_ctx_init(cd, &decoders_to_free, avctx, codec, first);
+        err = dec_ctx_init(cd, avctx, codec, first);
         if (err < 0)
             goto error;
     }
@@ -1035,9 +1052,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     for (int i = 0; i < thread_count; i++) {
         PerThreadContext *p = &fctx->threads[i];
 
-        fctx->decoders[i].thread = p;
-
-        err = init_thread(p, fctx, avctx, fctx->decoders[i].ctx);
+        err = init_thread(p, fctx, avctx);
         if (err < 0)
             goto error;
     }
@@ -1045,7 +1060,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     return 0;
 
 error:
-    ff_frame_thread_free(avctx, decoders_to_free);
+    ff_frame_thread_free(avctx, thread_count);
     return err;
 }
 
@@ -1056,12 +1071,12 @@ void ff_thread_flush(AVCodecContext *avctx)
     if (!fctx) return;
 
     park_frame_worker_threads(fctx, avctx->thread_count);
-    if (fctx->prev_thread) {
-        if (fctx->prev_thread != &fctx->threads[0])
-            update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0);
-    }
+    if (fctx->prev_thread &&
+        fctx->prev_thread->avctx != fctx->decoders[0].ctx)
+        update_context_from_thread(fctx->decoders[0].ctx, fctx->prev_thread->avctx, 0);
 
-    fctx->next_decoding = fctx->next_finished = 0;
+    fctx->next_thread_submit = fctx->next_finished = 0;
+    fctx->next_decoder = 0;
     fctx->prev_thread = NULL;
 
     decoded_frames_flush(&fctx->df);
@@ -1072,10 +1087,15 @@ void ff_thread_flush(AVCodecContext *avctx)
 
         decoded_frames_flush(&p->df);
         p->result = 0;
+        p->avctx = NULL;
     }
 
-    for (int i = 0; i < avctx->thread_count; i++)
-        avcodec_flush_buffers(fctx->decoders[i].ctx);
+    for (int i = 0; i < fctx->nb_decoders; i++) {
+        ChildDecoder *cd = &fctx->decoders[i];
+
+        cd->thread = NULL;
+        avcodec_flush_buffers(cd->ctx);
+    }
 }
 
 int ff_thread_can_start_frame(AVCodecContext *avctx)
-- 
2.43.0



More information about the ffmpeg-devel mailing list