[FFmpeg-devel] [PATCH 1/3] lavc/pthread_frame: separate child decoders from thread state
Anton Khirnov
anton at khirnov.net
Wed Nov 13 15:06:56 EET 2024
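Move the per-thread decoder contexts out of PerThreadContext into a new
ChildDecoder struct, together with the per-decoder needs_close and
hwaccel_threadsafe state. A child context's internal->thread_ctx now points
to its ChildDecoder, which references the worker thread it is currently
assigned to; decoders and threads are set up and torn down separately.

Should have no functional effect on its own, but will be useful in
following commits.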
---
libavcodec/pthread_frame.c | 253 +++++++++++++++++++++++--------------
1 file changed, 155 insertions(+), 98 deletions(-)
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 1b1b96623f..78e6cf668b 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -59,12 +59,6 @@ enum {
STATE_SETUP_FINISHED,
};
-enum {
- UNINITIALIZED, ///< Thread has not been created, AVCodec->close mustn't be called
- NEEDS_CLOSE, ///< FFCodec->close needs to be called
- INITIALIZED, ///< Thread has been properly set up
-};
-
typedef struct DecodedFrames {
AVFrame **f;
size_t nb_f;
@@ -82,7 +76,8 @@ typedef struct PerThreadContext {
struct FrameThreadContext *parent;
pthread_t thread;
- int thread_init;
+ int thread_started;
+
unsigned pthread_init_cnt;///< Number of successfully initialized mutexes/conditions
pthread_cond_t input_cond; ///< Used to wait for a new packet from the main thread.
pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change.
@@ -108,12 +103,6 @@ typedef struct PerThreadContext {
int hwaccel_serializing;
int async_serializing;
- // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used;
- // cannot check hwaccel caps directly, because
- // worked threads clear hwaccel state for thread-unsafe hwaccels
- // after each decode call
- int hwaccel_threadsafe;
-
atomic_int debug_threads; ///< Set if the FF_DEBUG_THREADS option is set.
/// The following two fields have the same semantics as the DecodeContext field
@@ -121,10 +110,32 @@ typedef struct PerThreadContext {
enum AVPictureType initial_pict_type;
} PerThreadContext;
+typedef struct ChildDecoder { // decoder state split out of PerThreadContext; ff_frame_thread_init() allocates one per thread
+ AVCodecContext *ctx; ///< child decoder context; ctx->internal->thread_ctx points back to this ChildDecoder
+ int needs_close; ///< set when FFCodec.close must be called on ctx at teardown (init succeeded, or failed with FF_CODEC_CAP_INIT_CLEANUP)
+
+ // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used;
+ // cannot check hwaccel caps directly, because
+ // worker threads clear hwaccel state for thread-unsafe hwaccels
+ // after each decode call
+ int hwaccel_threadsafe;
+
+ struct FrameThreadContext *parent; ///< owning frame-threading context
+
+ // The worker thread this decoder is currently assigned to.
+ // May change between individual decode calls.
+ PerThreadContext *thread;
+} ChildDecoder;
+
/**
* Context stored in the client AVCodecInternal thread_ctx.
*/
typedef struct FrameThreadContext {
+ /**
+ * Decoder contexts that get assigned to frame threads.
+ */
+ ChildDecoder *decoders;
+
PerThreadContext *threads; ///< The contexts for each thread.
PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on.
@@ -413,8 +424,8 @@ FF_ENABLE_DEPRECATION_WARNINGS
if (codec->update_thread_context_for_user)
err = codec->update_thread_context_for_user(dst, src);
} else {
- const PerThreadContext *p_src = src->internal->thread_ctx;
- PerThreadContext *p_dst = dst->internal->thread_ctx;
+ const ChildDecoder *cd_src = src->internal->thread_ctx;
+ ChildDecoder *cd_dst = dst->internal->thread_ctx;
if (codec->update_thread_context) {
err = codec->update_thread_context(dst, src);
@@ -423,16 +434,16 @@ FF_ENABLE_DEPRECATION_WARNINGS
}
// reset dst hwaccel state if needed
- av_assert0(p_dst->hwaccel_threadsafe ||
+ av_assert0(cd_dst->hwaccel_threadsafe ||
(!dst->hwaccel && !dst->internal->hwaccel_priv_data));
- if (p_dst->hwaccel_threadsafe &&
- (!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
+ if (cd_dst->hwaccel_threadsafe &&
+ (!cd_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) {
ff_hwaccel_uninit(dst);
- p_dst->hwaccel_threadsafe = 0;
+ cd_dst->hwaccel_threadsafe = 0;
}
// propagate hwaccel state for threadsafe hwaccels
- if (p_src->hwaccel_threadsafe) {
+ if (cd_src->hwaccel_threadsafe) {
const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel);
if (!dst->hwaccel) {
if (hwaccel->priv_data_size) {
@@ -455,7 +466,7 @@ FF_ENABLE_DEPRECATION_WARNINGS
return err;
}
}
- p_dst->hwaccel_threadsafe = 1;
+ cd_dst->hwaccel_threadsafe = 1;
}
}
@@ -503,6 +514,7 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
+ ChildDecoder *cd = p->avctx->internal->thread_ctx;
PerThreadContext *prev_thread = fctx->prev_thread;
const AVCodec *codec = p->avctx->codec;
int ret;
@@ -546,8 +558,8 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
}
/* transfer the stashed hwaccel state, if any */
- av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe);
- if (!p->hwaccel_threadsafe) {
+ av_assert0(!p->avctx->hwaccel || cd->hwaccel_threadsafe);
+ if (!cd->hwaccel_threadsafe) {
FFSWAP(const AVHWAccel*, p->avctx->hwaccel, fctx->stash_hwaccel);
FFSWAP(void*, p->avctx->hwaccel_context, fctx->stash_hwaccel_context);
FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
@@ -626,6 +638,7 @@ finish:
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
{
+ ChildDecoder *cd;
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
@@ -633,7 +646,8 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field)
atomic_load_explicit(&progress[field], memory_order_relaxed) >= n)
return;
- p = f->owner[field]->internal->thread_ctx;
+ cd = f->owner[field]->internal->thread_ctx;
+ p = cd->thread;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG,
@@ -649,6 +663,7 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field)
void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
{
+ ChildDecoder *cd;
PerThreadContext *p;
atomic_int *progress = f->progress ? f->progress->progress : NULL;
@@ -656,7 +671,8 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
atomic_load_explicit(&progress[field], memory_order_acquire) >= n)
return;
- p = f->owner[field]->internal->thread_ctx;
+ cd = f->owner[field]->internal->thread_ctx;
+ p = cd->thread;
if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed))
av_log(f->owner[field], AV_LOG_DEBUG,
@@ -669,14 +685,16 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field)
}
void ff_thread_finish_setup(AVCodecContext *avctx) {
+ ChildDecoder *cd;
PerThreadContext *p;
if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
- p = avctx->internal->thread_ctx;
+ cd = avctx->internal->thread_ctx;
+ p = cd->thread;
- p->hwaccel_threadsafe = avctx->hwaccel &&
- (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
+ cd->hwaccel_threadsafe = avctx->hwaccel &&
+ (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE);
if (hwaccel_serial(avctx) && !p->hwaccel_serializing) {
pthread_mutex_lock(&p->parent->hwaccel_mutex);
@@ -716,13 +734,14 @@ void ff_thread_finish_setup(AVCodecContext *avctx) {
/// Waits for all threads to finish.
static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
{
- int i;
-
async_unlock(fctx);
- for (i = 0; i < thread_count; i++) {
+ for (int i = 0; i < thread_count && fctx->threads; i++) {
PerThreadContext *p = &fctx->threads[i];
+ if (!p->thread_started)
+ break;
+
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
while (atomic_load(&p->state) != STATE_INPUT_READY)
@@ -750,24 +769,35 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
const FFCodec *codec = ffcodec(avctx->codec);
- int i;
park_frame_worker_threads(fctx, thread_count);
- for (i = 0; i < thread_count; i++) {
+ // clean up per-thread contexts
+ for (int i = 0; i < thread_count && fctx->threads; i++) {
PerThreadContext *p = &fctx->threads[i];
- AVCodecContext *ctx = p->avctx;
+
+ if (p->thread_started) {
+ pthread_mutex_lock(&p->mutex);
+ p->die = 1;
+ pthread_cond_signal(&p->input_cond);
+ pthread_mutex_unlock(&p->mutex);
+
+ pthread_join(p->thread, NULL);
+ }
+
+ ff_pthread_free(p, per_thread_offsets);
+ av_packet_free(&p->avpkt);
+ decoded_frames_free(&p->df);
+ }
+ av_freep(&fctx->threads);
+
+ // clean up child decoders
+ for (int i = 0; i < thread_count && fctx->decoders; i++) {
+ ChildDecoder *cd = &fctx->decoders[i];
+ AVCodecContext *ctx = cd->ctx;
if (ctx->internal) {
- if (p->thread_init == INITIALIZED) {
- pthread_mutex_lock(&p->mutex);
- p->die = 1;
- pthread_cond_signal(&p->input_cond);
- pthread_mutex_unlock(&p->mutex);
-
- pthread_join(p->thread, NULL);
- }
- if (codec->close && p->thread_init != UNINITIALIZED)
+ if (codec->close && cd->needs_close)
codec->close(ctx);
/* When using a threadsafe hwaccel, this is where
@@ -790,18 +820,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
&ctx->nb_decoded_side_data);
}
- decoded_frames_free(&p->df);
-
- ff_pthread_free(p, per_thread_offsets);
- av_packet_free(&p->avpkt);
-
- av_freep(&p->avctx);
+ av_freep(&cd->ctx);
}
+ av_freep(&fctx->decoders);
decoded_frames_free(&fctx->df);
av_packet_free(&fctx->next_pkt);
- av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
/* if we have stashed hwaccel state, move it to the user-facing context,
@@ -814,22 +839,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&avctx->internal->thread_ctx);
}
-static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
- FrameThreadContext *fctx, AVCodecContext *avctx,
- const FFCodec *codec, int first)
+static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free,
+ AVCodecContext *avctx,
+ const FFCodec *codec, int first)
{
AVCodecContext *copy;
int err;
- p->initial_pict_type = AV_PICTURE_TYPE_NONE;
- if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
- p->intra_only_flag = AV_FRAME_FLAG_KEY;
- if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
- p->initial_pict_type = AV_PICTURE_TYPE_I;
- }
-
- atomic_init(&p->state, STATE_INPUT_READY);
-
copy = av_memdup(avctx, sizeof(*avctx));
if (!copy)
return AVERROR(ENOMEM);
@@ -837,19 +853,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
copy->decoded_side_data = NULL;
copy->nb_decoded_side_data = 0;
- /* From now on, this PerThreadContext will be cleaned up by
+ /* From now on, this ChildDecoder will be cleaned up by
* ff_frame_thread_free in case of errors. */
- (*threads_to_free)++;
+ (*decoders_to_free)++;
- p->parent = fctx;
- p->avctx = copy;
+ cd->ctx = copy;
copy->internal = ff_decode_internal_alloc();
if (!copy->internal)
return AVERROR(ENOMEM);
ff_decode_internal_sync(copy, avctx);
- copy->internal->thread_ctx = p;
copy->internal->progress_frame_pool = avctx->internal->progress_frame_pool;
+ copy->internal->thread_ctx = cd;
copy->delay = avctx->delay;
@@ -866,13 +881,6 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
}
}
- err = ff_pthread_init(p, per_thread_offsets);
- if (err < 0)
- return err;
-
- if (!(p->avpkt = av_packet_alloc()))
- return AVERROR(ENOMEM);
-
copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
@@ -889,11 +897,11 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
err = codec->init(copy);
if (err < 0) {
if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP)
- p->thread_init = NEEDS_CLOSE;
+ cd->needs_close = 1;
return err;
}
}
- p->thread_init = NEEDS_CLOSE;
+ cd->needs_close = 1;
if (first) {
update_context_from_thread(avctx, copy, 1);
@@ -908,12 +916,40 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
}
}
- atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0);
+ return 0;
+}
- err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
- if (err < 0)
- return err;
- p->thread_init = INITIALIZED;
+static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx,
+ AVCodecContext *avctx, AVCodecContext *child)
+{ // set up worker thread p to run decoder context child; on error, ff_frame_thread_free() releases any partial state
+ int ret = 0;
+ // per-thread frame defaults derived from the codec's intra-only property
+ p->initial_pict_type = AV_PICTURE_TYPE_NONE;
+ if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) {
+ p->intra_only_flag = AV_FRAME_FLAG_KEY;
+ if (avctx->codec_type == AVMEDIA_TYPE_VIDEO)
+ p->initial_pict_type = AV_PICTURE_TYPE_I;
+ }
+ // STATE_INPUT_READY is the state park_frame_worker_threads() waits for; a new thread starts in it
+ atomic_init(&p->state, STATE_INPUT_READY);
+
+ p->parent = fctx;
+ p->avctx = child;
+
+ ret = ff_pthread_init(p, per_thread_offsets);
+ if (ret < 0)
+ return ret;
+
+ p->avpkt = av_packet_alloc();
+ if (!p->avpkt)
+ return AVERROR(ENOMEM);
+
+ atomic_init(&p->debug_threads, (child->debug & FF_DEBUG_THREADS) != 0);
+ // thread_started is set only after pthread_create() succeeds: ff_frame_thread_free() signals/joins only started threads
+ ret = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p));
+ if (ret < 0)
+ return ret;
+ p->thread_started = 1;
return 0;
}
@@ -923,7 +959,7 @@ int ff_frame_thread_init(AVCodecContext *avctx)
int thread_count = avctx->thread_count;
const FFCodec *codec = ffcodec(avctx->codec);
FrameThreadContext *fctx;
- int err, i = 0;
+ int err, decoders_to_free = 0;
if (!thread_count) {
int nb_cpus = av_cpu_count();
@@ -959,17 +995,35 @@ int ff_frame_thread_init(AVCodecContext *avctx)
if (codec->p.type == AVMEDIA_TYPE_VIDEO)
avctx->delay = avctx->thread_count - 1;
+ fctx->decoders = av_calloc(thread_count, sizeof(*fctx->decoders));
+ if (!fctx->decoders) {
+ err = AVERROR(ENOMEM);
+ goto error;
+ }
+
+ for (; decoders_to_free < thread_count; ) {
+ ChildDecoder *cd = &fctx->decoders[decoders_to_free];
+ int first = !decoders_to_free;
+
+ cd->parent = fctx;
+
+ err = dec_ctx_init(cd, &decoders_to_free, avctx, codec, first);
+ if (err < 0)
+ goto error;
+ }
+
fctx->threads = av_calloc(thread_count, sizeof(*fctx->threads));
if (!fctx->threads) {
err = AVERROR(ENOMEM);
goto error;
}
- for (; i < thread_count; ) {
- PerThreadContext *p = &fctx->threads[i];
- int first = !i;
+ for (int i = 0; i < thread_count; i++) {
+ PerThreadContext *p = &fctx->threads[i];
- err = init_thread(p, &i, fctx, avctx, codec, first);
+ fctx->decoders[i].thread = p;
+
+ err = init_thread(p, fctx, avctx, fctx->decoders[i].ctx);
if (err < 0)
goto error;
}
@@ -977,13 +1031,12 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return 0;
error:
- ff_frame_thread_free(avctx, i);
+ ff_frame_thread_free(avctx, decoders_to_free);
return err;
}
void ff_thread_flush(AVCodecContext *avctx)
{
- int i;
FrameThreadContext *fctx = avctx->internal->thread_ctx;
if (!fctx) return;
@@ -1000,23 +1053,24 @@ void ff_thread_flush(AVCodecContext *avctx)
decoded_frames_flush(&fctx->df);
fctx->result = 0;
- for (i = 0; i < avctx->thread_count; i++) {
+ for (int i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
decoded_frames_flush(&p->df);
p->result = 0;
-
- avcodec_flush_buffers(p->avctx);
}
+
+ for (int i = 0; i < avctx->thread_count; i++)
+ avcodec_flush_buffers(fctx->decoders[i].ctx);
}
int ff_thread_can_start_frame(AVCodecContext *avctx)
{
if ((avctx->active_thread_type & FF_THREAD_FRAME) &&
ffcodec(avctx->codec)->update_thread_context) {
- PerThreadContext *p = avctx->internal->thread_ctx;
+ ChildDecoder *cd = avctx->internal->thread_ctx;
- if (atomic_load(&p->state) != STATE_SETTING_UP)
+ if (atomic_load(&cd->thread->state) != STATE_SETTING_UP)
return 0;
}
@@ -1025,13 +1079,15 @@ int ff_thread_can_start_frame(AVCodecContext *avctx)
static int thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags)
{
+ ChildDecoder *cd;
PerThreadContext *p;
int err;
if (!(avctx->active_thread_type & FF_THREAD_FRAME))
return ff_get_buffer(avctx, f, flags);
- p = avctx->internal->thread_ctx;
+ cd = avctx->internal->thread_ctx;
+ p = cd->thread;
if (atomic_load(&p->state) != STATE_SETTING_UP &&
ffcodec(avctx->codec)->update_thread_context) {
av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
@@ -1085,18 +1141,18 @@ void ff_thread_release_ext_buffer(ThreadFrame *f)
enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
{
- PerThreadContext *p;
+ ChildDecoder *cd;
const void *ref;
if (!avctx->internal->is_copy)
return avctx->active_thread_type & FF_THREAD_FRAME ?
FF_THREAD_IS_FIRST_THREAD : FF_THREAD_NO_FRAME_THREADING;
- p = avctx->internal->thread_ctx;
+ cd = avctx->internal->thread_ctx;
av_assert1(memcpy(&ref, (char*)avctx->priv_data + offset, sizeof(ref)) && ref == NULL);
- memcpy(&ref, (const char*)p->parent->threads[0].avctx->priv_data + offset, sizeof(ref));
+ memcpy(&ref, (const char*)cd->parent->decoders[0].ctx->priv_data + offset, sizeof(ref));
av_assert1(ref);
ff_refstruct_replace((char*)avctx->priv_data + offset, ref);
@@ -1105,7 +1161,8 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset)
int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
{
- PerThreadContext *p = avctx->internal->thread_ctx;
+ ChildDecoder *cd = avctx->internal->thread_ctx;
+ PerThreadContext *p = cd->thread;
if (!AVPACKET_IS_EMPTY(p->avpkt)) {
av_packet_move_ref(pkt, p->avpkt);
--
2.43.0
More information about the ffmpeg-devel
mailing list