[FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization

Steven Liu lingjiujianke at gmail.com
Wed Aug 31 10:25:41 EEST 2022


Andreas Rheinhardt <andreas.rheinhardt at outlook.com> 于2022年8月31日周三 10:54写道:
>
> Lukas Fellechner:
> > This patch adds an "init-threads" option, specifying the max
> > number of threads to use. Multiple worker threads are spun up
> > to massively bring down init times.
> > ---
> >  libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 350 insertions(+), 1 deletion(-)
> >
> > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> > index e82da45e43..20f2557ea3 100644
> > --- a/libavformat/dashdec.c
> > +++ b/libavformat/dashdec.c
> > @@ -24,6 +24,7 @@
> >  #include "libavutil/opt.h"
> >  #include "libavutil/time.h"
> >  #include "libavutil/parseutils.h"
> > +#include "libavutil/thread.h"
> >  #include "internal.h"
> >  #include "avio_internal.h"
> >  #include "dash.h"
> > @@ -152,6 +153,8 @@ typedef struct DASHContext {
> >      int max_url_size;
> >      char *cenc_decryption_key;
> >
> > +    int init_threads;
> > +
> >      /* Flags for init section*/
> >      int is_init_section_common_video;
> >      int is_init_section_common_audio;
> > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> >      }
> >  }
> >
> > +#if HAVE_THREADS
> > +
> > +struct work_pool_data
> > +{
> > +    AVFormatContext *ctx;
> > +    struct representation *pls;
> > +    struct representation *common_pls;
> > +    pthread_mutex_t *common_mutex;
> > +    pthread_cond_t *common_condition;
> > +    int is_common;
> > +    int is_started;
> > +    int result;
> > +};
> > +
> > +struct thread_data
>
> This is against our naming conventions: CamelCase for struct tags and
> typedefs, lowercase names with underscore for variable names.
>
> > +{
> > +    pthread_t thread;
> > +    pthread_mutex_t *mutex;
> > +    struct work_pool_data *work_pool;
> > +    int work_pool_size;
> > +    int is_started;
> > +    int has_error;
> > +};
> > +
> > +static void *worker_thread(void *ptr)
> > +{
> > +    int ret = 0;
> > +    int i;
> > +    struct thread_data *thread_data = (struct thread_data*)ptr;
> > +    struct work_pool_data *work_pool = NULL;
> > +    struct work_pool_data *data = NULL;
> > +    for (;;) {
> > +
> > +        // get next work item unless there was an error
> > +        pthread_mutex_lock(thread_data->mutex);
> > +        data = NULL;
> > +        if (!thread_data->has_error) {
> > +            work_pool = thread_data->work_pool;
> > +            for (i = 0; i < thread_data->work_pool_size; i++) {
> > +                if (!work_pool->is_started) {
> > +                    data = work_pool;
> > +                    data->is_started = 1;
> > +                    break;
> > +                }
> > +                work_pool++;
> > +            }
> > +        }
> > +        pthread_mutex_unlock(thread_data->mutex);
> > +
> > +        if (!data) {
> > +            // no more work to do
> > +            return NULL;
> > +        }
> > +
> > +        // if we are common section provider, init and signal
> > +        if (data->is_common) {
> > +            data->pls->parent = data->ctx;
> > +            ret = update_init_section(data->pls);
> > +            if (ret < 0) {
> > +                pthread_cond_signal(data->common_condition);
> > +                goto end;
> > +            }
> > +            else
> > +                ret = AVERROR(pthread_cond_signal(data->common_condition));
> > +        }
> > +
> > +        // if we depend on common section provider, wait for signal and copy
> > +        if (data->common_pls) {
> > +            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> > +            if (ret < 0)
> > +                goto end;
> > +
> > +            if (!data->common_pls->init_sec_buf) {
> > +                goto end;
> > +                ret = AVERROR(EFAULT);
> > +            }
> > +
> > +            ret = copy_init_section(data->pls, data->common_pls);
> > +            if (ret < 0)
> > +                goto end;
> > +        }
> > +
> > +        ret = begin_open_demux_for_component(data->ctx, data->pls);
> > +        if (ret < 0)
> > +            goto end;
> > +
> > +    end:
> > +        data->result = ret;
> > +
> > +        // notify error to other threads and exit
> > +        if (ret < 0) {
> > +            pthread_mutex_lock(thread_data->mutex);
> > +            thread_data->has_error = 1;
> > +            pthread_mutex_unlock(thread_data->mutex);
> > +            return NULL;
> > +        }
> > +    }
> > +
> > +
> > +    return NULL;
> > +}
> > +
> > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
> > +    struct representation *pls, struct representation *common_pls,
> > +    struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
> > +    pthread_cond_t *common_condition)
> > +{
> > +    init_data->ctx = ctx;
> > +    init_data->pls = pls;
> > +    init_data->pls->stream_index = stream_index;
> > +    init_data->common_condition = common_condition;
> > +    init_data->common_mutex = common_mutex;
> > +    init_data->result = -1;
> > +
> > +    if (pls == common_pls) {
> > +        init_data->is_common = 1;
> > +    }
> > +    else if (common_pls) {
> > +        init_data->common_pls = common_pls;
> > +    }
> > +}
> > +
> > +static int start_thread(struct thread_data *thread_data,
> > +    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> > +{
> > +    int ret;
> > +
> > +    thread_data->mutex = mutex;
> > +    thread_data->work_pool = work_pool;
> > +    thread_data->work_pool_size = work_pool_size;
> > +
> > +    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> > +    if (ret == 0)
> > +        thread_data->is_started = 1;
> > +
> > +    return ret;
> > +}
> > +
> > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> > +{
> > +    DASHContext *c = s->priv_data;
> > +    int ret = 0;
> > +    int stream_index = 0;
> > +    int i;
>
> We allow "for (int i = 0;"
>
> > +
> > +    // alloc data
> > +    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> > +    if (!init_data)
> > +        return AVERROR(ENOMEM);
> > +
> > +    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> > +    if (!thread_data)
> > +        return AVERROR(ENOMEM);
>
> 1. init_data leaks here on error.
> 2. In fact, it seems to me that both init_data and thread_data are
> nowhere freed.
>
> > +
> > +    // alloc mutex and conditions
> > +    pthread_mutex_t work_pool_mutex;
> > +
> > +    pthread_mutex_t common_video_mutex;
> > +    pthread_cond_t common_video_cond;
> > +
> > +    pthread_mutex_t common_audio_mutex;
> > +    pthread_cond_t common_audio_cond;
> > +
> > +    pthread_mutex_t common_subtitle_mutex;
> > +    pthread_cond_t common_subtitle_cond;
> > +
> > +    // init mutex and conditions
> > +    ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> > +    if (ret < 0)
> > +        goto cleanup;
> > +
> > +    if (c->is_init_section_common_video) {
> > +        ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    if (c->is_init_section_common_audio) {
> > +        ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    if (c->is_init_section_common_subtitle) {
> > +        ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> > +        if (ret < 0)
> > +            goto cleanup;
> > +    }
> > +
> > +    // init work pool data
> > +    struct work_pool_data* current_data = init_data;
> > +
> > +    for (i = 0; i < c->n_videos; i++) {
> > +        create_work_pool_data(s, stream_index, c->videos[i],
> > +            c->is_init_section_common_video ? c->videos[0] : NULL,
> > +            current_data, &common_video_mutex, &common_video_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_audios; i++) {
> > +        create_work_pool_data(s, stream_index, c->audios[i],
> > +            c->is_init_section_common_audio ? c->audios[0] : NULL,
> > +            current_data, &common_audio_mutex, &common_audio_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
> > +
> > +    for (i = 0; i < c->n_subtitles; i++) {
> > +        create_work_pool_data(s, stream_index, c->subtitles[i],
> > +            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> > +            current_data, &common_subtitle_mutex, &common_subtitle_cond);
> > +
> > +        stream_index++;
> > +        current_data++;
> > +    }
>
> This is very repetitive.
>
> > +
> > +    // start threads
> > +    struct thread_data *current_thread = thread_data;
> > +    for (i = 0; i < threads; i++) {
> > +        ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> > +        if (ret < 0)
> > +            goto cleanup;
> > +
> > +        current_thread++;
> > +    }
> > +
> > +cleanup:
> > +    // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> > +    int initResult = ret;
> > +    int runResult = 0;
> > +    int cleanupResult = 0;
> > +
> > +    // join threads
> > +    current_thread = thread_data;
> > +    for (i = 0; i < threads; i++) {
> > +        if (current_thread->is_started) {
> > +            ret = AVERROR(pthread_join(current_thread->thread, NULL));
> > +            if (ret < 0)
> > +                cleanupResult = ret;
> > +        }
> > +        current_thread++;
> > +    }
> > +
> > +    // finalize streams and collect results
> > +    current_data = init_data;
> > +    for (i = 0; i < nstreams; i++) {
> > +        if (current_data->result < 0) {
> > +            // thread ran into error: collect result and break
> > +            runResult = current_data->result;
> > +            break;
> > +        }
> > +        else {
> > +            // thread success: create streams on AVFormatContext
> > +            ret = end_open_demux_for_component(s, current_data->pls);
> > +            if (ret < 0)
> > +                runResult = ret;
> > +        }
> > +        current_data++;
> > +    }
> > +
> > +    // cleanup mutex and conditions
> > +    ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> > +    if (ret < 0)
> > +        cleanupResult = ret;
> > +
> > +    if (c->is_init_section_common_video) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    if (c->is_init_section_common_audio) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    if (c->is_init_section_common_subtitle) {
> > +        ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +
> > +        ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> > +        if (ret < 0)
> > +            cleanupResult = ret;
> > +    }
> > +
> > +    // return results if errors have occured in one of the phases
> > +    if (initResult < 0)
> > +        return initResult;
> > +
> > +    if (runResult < 0)
> > +        return runResult;
> > +
> > +    if (cleanupResult < 0)
> > +        return cleanupResult;
> > +
> > +    return 0;
> > +}
> > +
> > +#endif
> > +
> >  static int dash_read_header(AVFormatContext *s)
> >  {
> >      DASHContext *c = s->priv_data;
> > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
> >      if (c->n_subtitles)
> >          c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> >
> > +    int threads = 0;
> > +    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> > +
> > +#if HAVE_THREADS
> > +    threads = FFMIN(nstreams, c->init_threads);
> > +#endif
> > +
> > +    if (threads > 1)
> > +    {
> > +#if HAVE_THREADS
> > +        ret = init_streams_multithreaded(s, nstreams, threads);
> > +        if (ret < 0)
> > +            return ret;
> > +#endif
> > +    }
> > +    else
> > +    {
> >      /* Open the demuxer for video and audio components if available */
> >      for (i = 0; i < c->n_videos; i++) {
> >          rep = c->videos[i];
> > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
> >
> >      if (!stream_index)
> >          return AVERROR_INVALIDDATA;
> > +    }
> >
> >      /* Create a program */
> >      program = av_new_program(s, 0);
> > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
> >          OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
> >          {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> >          INT_MIN, INT_MAX, FLAGS},
> > -    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > +    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
> > +        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > +    { "init_threads", "Number of threads to use for initializing the DASH stream",
> > +        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> >      {NULL}
> >  };
> >
> > --
> > 2.31.1.windows.1
> >
>
> 1. We actually have an API to process multiple tasks by different
> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
I saw that usually be used in avfilters for slice multi-thread, or i
misunderstand something?

> 2. In case initialization of one of the conditions/mutexes fails, you
> are nevertheless destroying them; you are even destroying completely
> uninitialized mutexes. This is undefined behaviour. Checking the result
> of it does not fix this.
>
> - Andreas


Thanks
Steven


More information about the ffmpeg-devel mailing list