[FFmpeg-devel] [PATCH 1/3] lavc/pthread_frame: separate child decoders from thread state

Anton Khirnov anton at khirnov.net
Wed Nov 13 15:06:56 EET 2024


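Introduce a ChildDecoder struct that owns each child AVCodecContext together
with its needs_close and hwaccel_threadsafe state, which was previously kept
in PerThreadContext.

This should have no functional effect on its own, but will be useful in the
following commits.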
---
 libavcodec/pthread_frame.c | 253 +++++++++++++++++++++++--------------
 1 file changed, 155 insertions(+), 98 deletions(-)

diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 1b1b96623f..78e6cf668b 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -59,12 +59,6 @@ enum {
     STATE_SETUP_FINISHED,
 };
 
-enum {
-    UNINITIALIZED,  ///< Thread has not been created, AVCodec->close mustn't be called
-    NEEDS_CLOSE,    ///< FFCodec->close needs to be called
-    INITIALIZED,    ///< Thread has been properly set up
-};
-
 typedef struct DecodedFrames {
     AVFrame  **f;
     size_t  nb_f;
@@ -82,7 +76,8 @@ typedef struct PerThreadContext {
     struct FrameThreadContext *parent;
 
     pthread_t      thread;
-    int            thread_init;
+    int            thread_started;
+
     unsigned       pthread_init_cnt;///< Number of successfully initialized mutexes/conditions
     pthread_cond_t input_cond;      ///< Used to wait for a new packet from the main thread.
     pthread_cond_t progress_cond;   ///< Used by child threads to wait for progress to change.
@@ -108,12 +103,6 @@ typedef struct PerThreadContext {
     int hwaccel_serializing;
     int async_serializing;
 
-    // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used;
-    // cannot check hwaccel caps directly, because
-    // worked threads clear hwaccel state for thread-unsafe hwaccels
-    // after each decode call
-    int hwaccel_threadsafe;
-
     atomic_int debug_threads;       ///< Set if the FF_DEBUG_THREADS option is set.
 
     /// The following two fields have the same semantics as the DecodeContext field
@@ -121,10 +110,32 @@ typedef struct PerThreadContext {
     enum AVPictureType initial_pict_type;
 } PerThreadContext;
 
+typedef struct ChildDecoder {
+    AVCodecContext             *ctx;
+    int                         needs_close;
+
+    // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used;
+    // cannot check hwaccel caps directly, because
+    // worker threads clear hwaccel state for thread-unsafe hwaccels
+    // after each decode call
+    int hwaccel_threadsafe;
+
+    struct FrameThreadContext  *parent;
+
+    // The worker thread this decoder is currently assigned to.
+    // May change between individual decode calls.
+    PerThreadContext           *thread;
+} ChildDecoder;
+
 /**
  * Context stored in the client AVCodecInternal thread_ctx.
  */
 typedef struct FrameThreadContext {
+    /**
+     * Decoder contexts that get assigned to frame threads.
+     */
+    ChildDecoder *decoders;
+
     PerThreadContext *threads;     ///< The contexts for each thread.
     PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
 
@@ -413,8 +424,8 @@ FF_ENABLE_DEPRECATION_WARNINGS
         if (codec->update_thread_context_for_user)
             err = codec->update_thread_context_for_user(dst, src);
     } else {
-        const PerThreadContext *p_src = src->internal->thread_ctx;
-        PerThreadContext       *p_dst = dst->internal->thread_ctx;
+        const ChildDecoder    *cd_src = src->internal->thread_ctx;
+        ChildDecoder          *cd_dst = dst->internal->thread_ctx;
 
         if (codec->update_thread_context) {
             err = codec->update_thread_context(dst, src);
@@ -423,16 +434,16 @@ FF_ENABLE_DEPRECATION_WARNINGS
         }
 
         // reset dst hwaccel state if needed
-        av_assert0(p_dst->hwaccel_threadsafe ||
+        av_assert0(cd_dst->hwaccel_threadsafe ||
                    (!dst->hwaccel && !dst->internal->hwaccel_priv_data));
-        if (p_dst->hwaccel_threadsafe &&
-            (!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
+        if (cd_dst->hwaccel_threadsafe &&
+            (!cd_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
             ff_hwaccel_uninit(dst);
-            p_dst->hwaccel_threadsafe = 0;
+            cd_dst->hwaccel_threadsafe = 0;
         }
 
         // propagate hwaccel state for threadsafe hwaccels
-        if (p_src->hwaccel_threadsafe) {
+        if (cd_src->hwaccel_threadsafe) {
             const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel);
             if (!dst->hwaccel) {
                 if (hwaccel->priv_data_size) {
@@ -455,7 +466,7 @@ FF_ENABLE_DEPRECATION_WARNINGS
                     return err;
                 }
             }
-            p_dst->hwaccel_threadsafe = 1;
+            cd_dst->hwaccel_threadsafe = 1;
         }
     }
 
@@ -503,6 +514,7 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
                          AVPacket *in_pkt)
 {
     FrameThreadContext *fctx = p->parent;
+    ChildDecoder         *cd = p->avctx->internal->thread_ctx;
     PerThreadContext *prev_thread = fctx->prev_thread;
     const AVCodec *codec = p->avctx->codec;
     int ret;
@@ -546,8 +558,8 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
     }
 
     /* transfer the stashed hwaccel state, if any */
-    av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe);
-    if (!p->hwaccel_threadsafe) {
+    av_assert0(!p->avctx->hwaccel || cd->hwaccel_threadsafe);
+    if (!cd->hwaccel_threadsafe) {
         FFSWAP(const AVHWAccel*, p->avctx->hwaccel,                     fctx->stash_hwaccel);
         FFSWAP(void*,            p->avctx->hwaccel_context,             fctx->stash_hwaccel_context);
         FFSWAP(void*,            p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
@@ -626,6 +638,7 @@ finish:
 
 void ff_thread_report_progress(ThreadFrame *f, int n, int field)
 {
+    ChildDecoder *cd;
     PerThreadContext *p;
     atomic_int *progress = f->progress ? f->progress->progress : NULL;
 
@@ -633,7 +646,8 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field)
         atomic_load_explicit(&progress[field], memory_order_relaxed) >= n)
         return;
 
-    p = f->owner[field]->internal->thread_ctx;
+    cd = f->owner[field]->internal->thread_ctx;
+    p  = cd->thread;
 
     if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
         av_log(f->owner[field], AV_LOG_DEBUG,
@@ -649,6 +663,7 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field)
 
 void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
 {
+    ChildDecoder *cd;
     PerThreadContext *p;
     atomic_int *progress = f->progress ? f->progress->progress : NULL;
 
@@ -656,7 +671,8 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
         atomic_load_explicit(&progress[field], memory_order_acquire) >= n)
         return;
 
-    p = f->owner[field]->internal->thread_ctx;
+    cd = f->owner[field]->internal->thread_ctx;
+    p  = cd->thread;
 
     if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
         av_log(f->owner[field], AV_LOG_DEBUG,
@@ -669,14 +685,16 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
 }
 
 void ff_thread_finish_setup(AVCodecContext *avctx) {
+    ChildDecoder *cd;
     PerThreadContext *p;
 
     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
 
-    p = avctx->internal->thread_ctx;
+    cd = avctx->internal->thread_ctx;
+    p  = cd->thread;
 
-    p->hwaccel_threadsafe = avctx->hwaccel &&
-                            (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
+    cd->hwaccel_threadsafe = avctx->hwaccel &&
+                             (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
 
     if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
         pthread_mutex_lock(&p->parent->hwaccel_mutex);
@@ -716,13 +734,14 @@ void ff_thread_finish_setup(AVCodecContext *avctx) {
 /// Waits for all threads to finish.
 static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
 {
-    int i;
-
     async_unlock(fctx);
 
-    for (i = 0; i < thread_count; i++) {
+    for (int i = 0; i < thread_count && fctx->threads; i++) {
         PerThreadContext *p = &fctx->threads[i];
 
+        if (!p->thread_started)
+            break;
+
         if (atomic_load(&p->state) != STATE_INPUT_READY) {
             pthread_mutex_lock(&p->progress_mutex);
             while (atomic_load(&p->state) != STATE_INPUT_READY)
@@ -750,24 +769,35 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
 {
     FrameThreadContext *fctx = avctx->internal->thread_ctx;
     const FFCodec *codec = ffcodec(avctx->codec);
-    int i;
 
     park_frame_worker_threads(fctx, thread_count);
 
-    for (i = 0; i < thread_count; i++) {
+    // clean up per-thread contexts
+    for (int i = 0; i < thread_count && fctx->threads; i++) {
         PerThreadContext *p = &fctx->threads[i];
-        AVCodecContext *ctx = p->avctx;
+
+        if (p->thread_started) {
+            pthread_mutex_lock(&p->mutex);
+            p->die = 1;
+            pthread_cond_signal(&p->input_cond);
+            pthread_mutex_unlock(&p->mutex);
+
+            pthread_join(p->thread, NULL);
+        }
+
+        ff_pthread_free(p, per_thread_offsets);
+        av_packet_free(&p->avpkt);
+        decoded_frames_free(&p->df);
+    }
+    av_freep(&fctx->threads);
+
+    // clean up child decoders
+    for (int i = 0; i < thread_count && fctx->decoders; i++) {
+        ChildDecoder    *cd = &fctx->decoders[i];
+        AVCodecContext *ctx = cd->ctx;
 
         if (ctx->internal) {
-            if (p->thread_init == INITIALIZED) {
-                pthread_mutex_lock(&p->mutex);
-                p->die = 1;
-                pthread_cond_signal(&p->input_cond);
-                pthread_mutex_unlock(&p->mutex);
-
-                pthread_join(p->thread, NULL);
-            }
-            if (codec->close && p->thread_init != UNINITIALIZED)
+            if (codec->close && cd->needs_close)
                 codec->close(ctx);
 
             /* When using a threadsafe hwaccel, this is where
@@ -790,18 +820,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
                                     &ctx->nb_decoded_side_data);
         }
 
-        decoded_frames_free(&p->df);
-
-        ff_pthread_free(p, per_thread_offsets);
-        av_packet_free(&p->avpkt);
-
-        av_freep(&p->avctx);
+        av_freep(&cd->ctx);
     }
+    av_freep(&fctx->decoders);
 
     decoded_frames_free(&fctx->df);
     av_packet_free(&fctx->next_pkt);
 
-    av_freep(&fctx->threads);
     ff_pthread_free(fctx, thread_ctx_offsets);
 
     /* if we have stashed hwaccel state, move it to the user-facing context,
@@ -814,22 +839,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
     av_freep(&avctx->internal->thread_ctx);
 }
 
-static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
-                               FrameThreadContext *fctx, AVCodecContext *avctx,
-                               const FFCodec *codec, int first)
+static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
+                                AVCodecContext *avctx,
+                                const FFCodec *codec, int first)
 {
     AVCodecContext *copy;
     int err;
 
-    p->initial_pict_type = AV_PICTURE_TYPE_NONE;
-    if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
-        p->intra_only_flag = AV_FRAME_FLAG_KEY;
-        if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
-            p->initial_pict_type = AV_PICTURE_TYPE_I;
-    }
-
-    atomic_init(&p->state, STATE_INPUT_READY);
-
     copy = av_memdup(avctx, sizeof(*avctx));
     if (!copy)
         return AVERROR(ENOMEM);
@@ -837,19 +853,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
     copy->decoded_side_data = NULL;
     copy->nb_decoded_side_data = 0;
 
-    /* From now on, this PerThreadContext will be cleaned up by
+    /* From now on, this ChildDecoder will be cleaned up by
      * ff_frame_thread_free in case of errors. */
-    (*threads_to_free)++;
+    (*decoders_to_free)++;
 
-    p->parent = fctx;
-    p->avctx  = copy;
+    cd->ctx = copy;
 
     copy->internal = ff_decode_internal_alloc();
     if (!copy->internal)
         return AVERROR(ENOMEM);
     ff_decode_internal_sync(copy, avctx);
-    copy->internal->thread_ctx = p;
     copy->internal->progress_frame_pool = avctx->internal->progress_frame_pool;
+    copy->internal->thread_ctx = cd;
 
     copy->delay = avctx->delay;
 
@@ -866,13 +881,6 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
         }
     }
 
-    err = ff_pthread_init(p, per_thread_offsets);
-    if (err < 0)
-        return err;
-
-    if (!(p->avpkt = av_packet_alloc()))
-        return AVERROR(ENOMEM);
-
     copy->internal->is_frame_mt = 1;
     if (!first)
         copy->internal->is_copy = 1;
@@ -889,11 +897,11 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
         err = codec->init(copy);
         if (err < 0) {
             if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP)
-                p->thread_init = NEEDS_CLOSE;
+                cd->needs_close = 1;
             return err;
         }
     }
-    p->thread_init = NEEDS_CLOSE;
+    cd->needs_close = 1;
 
     if (first) {
         update_context_from_thread(avctx, copy, 1);
@@ -908,12 +916,40 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
         }
     }
 
-    atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0);
+    return 0;
+}
 
-    err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
-    if (err < 0)
-        return err;
-    p->thread_init = INITIALIZED;
+static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
+                               AVCodecContext *avctx, AVCodecContext *child)
+{
+    int ret = 0;
+
+    p->initial_pict_type = AV_PICTURE_TYPE_NONE;
+    if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
+        p->intra_only_flag = AV_FRAME_FLAG_KEY;
+        if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
+            p->initial_pict_type = AV_PICTURE_TYPE_I;
+    }
+
+    atomic_init(&p->state, STATE_INPUT_READY);
+
+    p->parent = fctx;
+    p->avctx  = child;
+
+    ret = ff_pthread_init(p, per_thread_offsets);
+    if (ret < 0)
+        return ret;
+
+    p->avpkt = av_packet_alloc();
+    if (!p->avpkt)
+        return AVERROR(ENOMEM);
+
+    atomic_init(&p->debug_threads, (child->debug & FF_DEBUG_THREADS) != 0);
+
+    ret = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
+    if (ret < 0)
+        return ret;
+    p->thread_started = 1;
 
     return 0;
 }
@@ -923,7 +959,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     int thread_count = avctx->thread_count;
     const FFCodec *codec = ffcodec(avctx->codec);
     FrameThreadContext *fctx;
-    int err, i = 0;
+    int err, decoders_to_free = 0;
 
     if (!thread_count) {
         int nb_cpus = av_cpu_count();
@@ -959,17 +995,35 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     if (codec->p.type == AVMEDIA_TYPE_VIDEO)
         avctx->delay = avctx->thread_count - 1;
 
+    fctx->decoders = av_calloc(thread_count, sizeof(*fctx->decoders));
+    if (!fctx->decoders) {
+        err = AVERROR(ENOMEM);
+        goto error;
+    }
+
+    for (; decoders_to_free < thread_count; ) {
+        ChildDecoder *cd = &fctx->decoders[decoders_to_free];
+        int first = !decoders_to_free;
+
+        cd->parent = fctx;
+
+        err = dec_ctx_init(cd, &decoders_to_free, avctx, codec, first);
+        if (err < 0)
+            goto error;
+    }
+
     fctx->threads = av_calloc(thread_count, sizeof(*fctx->threads));
     if (!fctx->threads) {
         err = AVERROR(ENOMEM);
         goto error;
     }
 
-    for (; i < thread_count; ) {
-        PerThreadContext *p  = &fctx->threads[i];
-        int first = !i;
+    for (int i = 0; i < thread_count; i++) {
+        PerThreadContext *p = &fctx->threads[i];
 
-        err = init_thread(p, &i, fctx, avctx, codec, first);
+        fctx->decoders[i].thread = p;
+
+        err = init_thread(p, fctx, avctx, fctx->decoders[i].ctx);
         if (err < 0)
             goto error;
     }
@@ -977,13 +1031,12 @@ int ff_frame_thread_init(AVCodecContext *avctx)
     return 0;
 
 error:
-    ff_frame_thread_free(avctx, i);
+    ff_frame_thread_free(avctx, decoders_to_free);
     return err;
 }
 
 void ff_thread_flush(AVCodecContext *avctx)
 {
-    int i;
     FrameThreadContext *fctx = avctx->internal->thread_ctx;
 
     if (!fctx) return;
@@ -1000,23 +1053,24 @@ void ff_thread_flush(AVCodecContext *avctx)
     decoded_frames_flush(&fctx->df);
     fctx->result = 0;
 
-    for (i = 0; i < avctx->thread_count; i++) {
+    for (int i = 0; i < avctx->thread_count; i++) {
         PerThreadContext *p = &fctx->threads[i];
 
         decoded_frames_flush(&p->df);
         p->result = 0;
-
-        avcodec_flush_buffers(p->avctx);
     }
+
+    for (int i = 0; i < avctx->thread_count; i++)
+        avcodec_flush_buffers(fctx->decoders[i].ctx);
 }
 
 int ff_thread_can_start_frame(AVCodecContext *avctx)
 {
     if ((avctx->active_thread_type & FF_THREAD_FRAME) &&
         ffcodec(avctx->codec)->update_thread_context) {
-        PerThreadContext *p = avctx->internal->thread_ctx;
+        ChildDecoder *cd = avctx->internal->thread_ctx;
 
-        if (atomic_load(&p->state) != STATE_SETTING_UP)
+        if (atomic_load(&cd->thread->state) != STATE_SETTING_UP)
             return 0;
     }
 
@@ -1025,13 +1079,15 @@ int ff_thread_can_start_frame(AVCodecContext *avctx)
 
 static int thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags)
 {
+    ChildDecoder *cd;
     PerThreadContext *p;
     int err;
 
     if (!(avctx->active_thread_type & FF_THREAD_FRAME))
         return ff_get_buffer(avctx, f, flags);
 
-    p = avctx->internal->thread_ctx;
+    cd = avctx->internal->thread_ctx;
+    p  = cd->thread;
     if (atomic_load(&p->state) != STATE_SETTING_UP &&
         ffcodec(avctx->codec)->update_thread_context) {
         av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
@@ -1085,18 +1141,18 @@ void ff_thread_release_ext_buffer(ThreadFrame *f)
 
 enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
 {
-    PerThreadContext *p;
+    ChildDecoder *cd;
     const void *ref;
 
     if (!avctx->internal->is_copy)
         return avctx->active_thread_type & FF_THREAD_FRAME ?
                   FF_THREAD_IS_FIRST_THREAD : FF_THREAD_NO_FRAME_THREADING;
 
-    p = avctx->internal->thread_ctx;
+    cd = avctx->internal->thread_ctx;
 
     av_assert1(memcpy(&ref, (char*)avctx->priv_data + offset, sizeof(ref)) && ref == NULL);
 
-    memcpy(&ref, (const char*)p->parent->threads[0].avctx->priv_data + offset, sizeof(ref));
+    memcpy(&ref, (const char*)cd->parent->decoders[0].ctx->priv_data + offset, sizeof(ref));
     av_assert1(ref);
     ff_refstruct_replace((char*)avctx->priv_data + offset, ref);
 
@@ -1105,7 +1161,8 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
 
 int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
 {
-    PerThreadContext *p = avctx->internal->thread_ctx;
+    ChildDecoder *cd = avctx->internal->thread_ctx;
+    PerThreadContext *p = cd->thread;
 
     if (!AVPACKET_IS_EMPTY(p->avpkt)) {
         av_packet_move_ref(pkt, p->avpkt);
-- 
2.43.0



More information about the ffmpeg-devel mailing list