[FFmpeg-devel] [PATCH 01/14] vvcdec: add thread executor

Nuo Mi nuomi2021 at gmail.com
Sun May 21 17:24:56 EEST 2023


Hi Rémi,
Thank you for the comments.

On Sun, May 21, 2023 at 9:15 PM Rémi Denis-Courmont <remi at remlab.net> wrote:

> Le sunnuntaina 21. toukokuuta 2023, 16.03.06 EEST Nuo Mi a écrit :
> > The executor design pattern was introduced by Java
> > <
> https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/util/conc
> > urrent/Executor.html>; it was also adopted by Python
> > <https://docs.python.org/3/library/concurrent.futures.html>
> > Compared to handcrafted thread pool management, it greatly simplifies the
> > thread code. ---
> >  libavcodec/Makefile           |   1 +
> >  libavcodec/vvc/Makefile       |   4 +
> >  libavcodec/vvc/vvc_executor.c | 193 ++++++++++++++++++++++++++++++++++
> >  libavcodec/vvc/vvc_executor.h |  73 +++++++++++++
> >  4 files changed, 271 insertions(+)
> >  create mode 100644 libavcodec/vvc/Makefile
> >  create mode 100644 libavcodec/vvc/vvc_executor.c
> >  create mode 100644 libavcodec/vvc/vvc_executor.h
> >
> > diff --git a/libavcodec/Makefile b/libavcodec/Makefile
> > index dab09f483a..b1fcbf71b2 100644
> > --- a/libavcodec/Makefile
> > +++ b/libavcodec/Makefile
> > @@ -62,6 +62,7 @@ OBJS = ac3_parser.o
>
> >              \ xiph.o
>
> >     \
> >
> >  # subsystems
> > +include $(SRC_PATH)/libavcodec/vvc/Makefile
> >  OBJS-$(CONFIG_AANDCTTABLES)            += aandcttab.o
> >  OBJS-$(CONFIG_AC3DSP)                  += ac3dsp.o ac3.o ac3tab.o
> >  OBJS-$(CONFIG_ADTS_HEADER)             += adts_header.o
> > mpeg4audio_sample_rates.o diff --git a/libavcodec/vvc/Makefile
> > b/libavcodec/vvc/Makefile
> > new file mode 100644
> > index 0000000000..c4b93e0389
> > --- /dev/null
> > +++ b/libavcodec/vvc/Makefile
> > @@ -0,0 +1,4 @@
> > +clean::
> > +     $(RM) $(CLEANSUFFIXES:%=libavcodec/vvc/%)
> > +
> > +OBJS-$(CONFIG_VVC_DECODER)          +=  vvc/vvc_executor.o
> > diff --git a/libavcodec/vvc/vvc_executor.c
> b/libavcodec/vvc/vvc_executor.c
> > new file mode 100644
> > index 0000000000..f2afdf79ae
> > --- /dev/null
> > +++ b/libavcodec/vvc/vvc_executor.c
> > @@ -0,0 +1,193 @@
> > +/*
> > + * VVC video Decoder
> > + *
> > + * Copyright (C) 2022 Nuo Mi
> > + *
> > + * This file is part of FFmpeg.
> > + *
> > + * FFmpeg is free software; you can redistribute it and/or
> > + * modify it under the terms of the GNU Lesser General Public
> > + * License as published by the Free Software Foundation; either
> > + * version 2.1 of the License, or (at your option) any later version.
> > + *
> > + * FFmpeg is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> > + * Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public
> > + * License along with FFmpeg; if not, write to the Free Software
> > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
> 02110-1301
> > USA + */
> > +#include "libavutil/avutil.h"
> > +#include "libavutil/thread.h"
> > +
> > +#include "vvc_executor.h"
>
> This does not seem specific to VVC in any way, so the naming (and choice
> of
> folder placement) is rather weird.
>
> > +
> > +typedef struct ThreadInfo {
> > +    int idx;
> > +    VVCExecutor *e;
> > +    pthread_t thread;
> > +} ThreadInfo;
> > +
> > +struct VVCExecutor {
> > +    VVCTaskCallbacks cb;
> > +    ThreadInfo *threads;
> > +    uint8_t *local_contexts;
>
> It seems odd and needless complex to separate this from the thread info.
> It
> looks like you could simply append a pointer or a flexible array to
> ThreadInfo
> instead.
>
 Do you mean in VVCTasklet? No, we do not want to expose ThreadInfo details
to users.


> > +    int thread_count;
> > +
> > +    pthread_mutex_t lock;
> > +    pthread_cond_t cond;
> > +    int die;
> > +    VVCTasklet *tasks;
> > +};
> > +
> > +static void remove_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    *prev  = t->next;
> > +    t->next = NULL;
> > +}
> > +
> > +static void add_task(VVCTasklet **prev, VVCTasklet *t)
> > +{
> > +    t->next = *prev;
> > +    *prev   = t;
> > +}
> > +
> > +static void *executor_worker_task(void *data)
> > +{
> > +    ThreadInfo *ti = (ThreadInfo*)data;
> > +    VVCExecutor *e = ti->e;
> > +    void *lc       = e->local_contexts + ti->idx *
> > e->cb.local_context_size;
> > +    VVCTasklet **prev;
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    while (1) {
> > +        VVCTasklet* t = NULL;
> > +        if (e->die) break;
> > +
> > +        for (prev = &e->tasks; *prev; prev = &(*prev)->next) {
> > +            if (cb->ready(*prev, cb->user_data)) {
> > +                t = *prev;
> > +                break;
> > +            }
> > +        }
> > +        if (t) {
> > +            //found one task
> > +            remove_task(prev, t);
> > +            pthread_mutex_unlock(&e->lock);
> > +            cb->run(t, lc, cb->user_data);
> > +            pthread_mutex_lock(&e->lock);
> > +        } else {
> > +            //no task in one loop
> > +            pthread_cond_wait(&e->cond, &e->lock);
> > +        }
> > +    }
> > +    pthread_mutex_unlock(&e->lock);
> > +    return NULL;
> > +}
> > +
> > +VVCExecutor* ff_vvc_executor_alloc(const VVCTaskCallbacks *cb, int
> > thread_count) +{
> > +    VVCExecutor *e;
> > +    int i, j, ret;
> > +    if (!cb || !cb->user_data || !cb->ready || !cb->run ||
> > !cb->priority_higher) +        return NULL;
> > +    e = av_calloc(1, sizeof(*e));
> > +    if (!e)
> > +        return NULL;
> > +    e->cb = *cb;
> > +
> > +    e->local_contexts = av_malloc(thread_count *
> e->cb.local_context_size);
> > +    if (!e->local_contexts)
> > +        goto free_executor;
> > +
> > +    e->threads = av_calloc(thread_count, sizeof(*e->threads));
> > +    if (!e->threads)
> > +        goto free_contexts;
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ti->e = e;
> > +        ti->idx = i;
> > +    }
> > +
> > +    ret = pthread_mutex_init(&e->lock, NULL);
> > +    if (ret)
> > +        goto free_threads;
> > +
> > +    ret = pthread_cond_init(&e->cond, NULL);
> > +    if (ret)
> > +        goto destroy_lock;
> > +
> > +    for (i = 0; i < thread_count; i++) {
> > +        ThreadInfo *ti = e->threads + i;
> > +        ret = pthread_create(&ti->thread, NULL, executor_worker_task,
> ti);
> > +        if (ret)
> > +            goto join_threads;
> > +    }
> > +    e->thread_count = thread_count;
> > +    return e;
> > +
> > +join_threads:
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +    for (j = 0; j < i; j++)
> > +        pthread_join(e->threads[j].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +destroy_lock:
> > +    pthread_mutex_destroy(&e->lock);
> > +free_threads:
> > +    av_free(e->threads);
> > +free_contexts:
> > +    av_free(e->local_contexts);
> > +free_executor:
> > +    free(e);
> > +    return NULL;
> > +}
> > +
> > +void ff_vvc_executor_free(VVCExecutor **executor)
> > +{
> > +    VVCExecutor *e;
> > +    if (!executor || !*executor)
> > +        return;
> > +    e = *executor;
> > +
> > +    //singal die
> > +    pthread_mutex_lock(&e->lock);
> > +    e->die = 1;
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +
> > +    for (int i = 0; i < e->thread_count; i++)
> > +        pthread_join(e->threads[i].thread, NULL);
> > +    pthread_cond_destroy(&e->cond);
> > +    pthread_mutex_destroy(&e->lock);
> > +
> > +    av_free(e->threads);
> > +    av_free(e->local_contexts);
> > +
> > +    av_freep(executor);
> > +}
> > +
> > +void ff_vvc_executor_execute(VVCExecutor *e, VVCTasklet *t)
> > +{
> > +    VVCTaskCallbacks *cb = &e->cb;
> > +    VVCTasklet **prev;
> > +
> > +    pthread_mutex_lock(&e->lock);
> > +    for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev
> =
> > &(*prev)->next) +        /* nothing */;
> > +    add_task(prev, t);
> > +    pthread_cond_signal(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
> > +}
> > +
> > +void ff_vvc_executor_wakeup(VVCExecutor *e)
> > +{
> > +    pthread_mutex_lock(&e->lock);
> > +    pthread_cond_broadcast(&e->cond);
> > +    pthread_mutex_unlock(&e->lock);
>
> Signaling a condition variable without changing any state makes no sense.
>
It's for error handling.

Imagine that all threads except the last one are waiting for ready jobs.
The last thread hits an error and reports it to the caller.
The caller then needs to wake up the waiting threads so they can flush the task list.


> > +}
>
> --
> Rémi Denis-Courmont
> http://www.remlab.net/
>
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
>


More information about the ffmpeg-devel mailing list