[FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
Steven Liu
lingjiujianke at gmail.com
Sun Aug 21 07:10:18 EEST 2022
Lukas Fellechner <Lukas.Fellechner at gmx.net> 于2022年8月21日周日 05:54写道:
>
> Trying with inline PATCH since attached file was not showing up...
>
> ---
>
> From: Lukas Fellechner <lukas.fellechner at gmx.net>
> Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
>
> Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
> ---
> libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 375 insertions(+), 46 deletions(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index 63bf7e96a5..69a6c2ba79 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
> #include "libavutil/opt.h"
> #include "libavutil/time.h"
> #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
> #include "internal.h"
> #include "avio_internal.h"
> #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
> int max_url_size;
> char *cenc_decryption_key;
>
> + int init_threads;
> +
> /* Flags for init section*/
> int is_init_section_common_video;
> int is_init_section_common_audio;
> @@ -1918,22 +1921,40 @@ fail:
> return ret;
> }
>
> -static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> +
> + ret = begin_open_demux_for_component(s, pls);
> + if (ret < 0)
> + return ret;
> +
> + ret = end_open_demux_for_component(s, pls);
> +
> + return ret;
> +}
> +
> +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> {
> int ret = 0;
> - int i;
>
> pls->parent = s;
> - pls->cur_seq_no = calc_cur_seg_no(s, pls);
> + pls->cur_seq_no = calc_cur_seg_no(s, pls);
>
> if (!pls->last_seq_no) {
> pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
> }
>
> ret = reopen_demux_for_component(s, pls);
> - if (ret < 0) {
> - goto fail;
> - }
> +
> + return ret;
> +}
> +
> +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> + int i;
> +
> for (i = 0; i < pls->ctx->nb_streams; i++) {
> AVStream *st = avformat_new_stream(s, NULL);
> AVStream *ist = pls->ctx->streams[i];
> @@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> }
> }
>
> +struct work_pool_data
> +{
> + AVFormatContext* ctx;
> + struct representation* pls;
> + struct representation* common_pls;
> + pthread_mutex_t* common_mutex;
> + pthread_cond_t* common_condition;
Should add #if HAVE_THREADS to check if the pthread supported.
> + int is_common;
> + int is_started;
> + int result;
> +};
> +
> +struct thread_data
> +{
> + pthread_t thread;
> + pthread_mutex_t* mutex;
> + struct work_pool_data* work_pool;
> + int work_pool_size;
> + int is_started;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> + int ret = 0;
> + int i;
> + struct thread_data* thread_data = (struct thread_data*)ptr;
> + struct work_pool_data* work_pool = NULL;
> + struct work_pool_data* data = NULL;
> + for (;;) {
> +
> + // get next work item
> + pthread_mutex_lock(thread_data->mutex);
> + data = NULL;
> + work_pool = thread_data->work_pool;
> + for (i = 0; i < thread_data->work_pool_size; i++) {
> + if (!work_pool->is_started) {
> + data = work_pool;
> + data->is_started = 1;
> + break;
> + }
> + work_pool++;
> + }
> + pthread_mutex_unlock(thread_data->mutex);
> +
> + if (!data) {
> + // no more work to do
> + return NULL;
> + }
> +
> + // if we are common section provider, init and signal
> + if (data->is_common) {
> + data->pls->parent = data->ctx;
> + ret = update_init_section(data->pls);
> + if (ret < 0) {
> + pthread_cond_signal(data->common_condition);
> + goto end;
> + }
> + else
> + ret = AVERROR(pthread_cond_signal(data->common_condition));
> + }
> +
> + // if we depend on common section provider, wait for signal and copy
> + if (data->common_pls) {
> + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> + if (ret < 0)
> + goto end;
> +
> + if (!data->common_pls->init_sec_buf) {
> + goto end;
> + ret = AVERROR(EFAULT);
> + }
> +
> + ret = copy_init_section(data->pls, data->common_pls);
> + if (ret < 0)
> + goto end;
> + }
> +
> + ret = begin_open_demux_for_component(data->ctx, data->pls);
> + if (ret < 0)
> + goto end;
> +
> + end:
> + data->result = ret;
> + }
> +
> +
> + return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
> + struct representation* pls, struct representation* common_pls,
> + struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
> + pthread_cond_t* common_condition)
> +{
> + init_data->ctx = ctx;
> + init_data->pls = pls;
> + init_data->pls->stream_index = stream_index;
> + init_data->common_condition = common_condition;
> + init_data->common_mutex = common_mutex;
> + init_data->result = -1;
> +
> + if (pls == common_pls) {
> + init_data->is_common = 1;
> + }
> + else if (common_pls) {
> + init_data->common_pls = common_pls;
> + }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> + int ret;
> +
> + thread_data->mutex = mutex;
> + thread_data->work_pool = work_pool;
> + thread_data->work_pool_size = work_pool_size;
> +
> + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> + if (ret == 0)
> + thread_data->is_started = 1;
> +
> + return ret;
> +}
> +
> static int dash_read_header(AVFormatContext *s)
> {
> DASHContext *c = s->priv_data;
> @@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s)
> av_dict_set(&c->avio_opts, "seekable", "0", 0);
> }
>
> - if(c->n_videos)
> + if (c->n_videos)
> c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
>
> - /* Open the demuxer for video and audio components if available */
> - for (i = 0; i < c->n_videos; i++) {
> - rep = c->videos[i];
> - if (i > 0 && c->is_init_section_common_video) {
> - ret = copy_init_section(rep, c->videos[0]);
> + if (c->n_audios)
> + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +
> + if (c->n_subtitles)
> + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +
> + int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> + int threads = FFMIN(nstreams, c->init_threads);
> +
> + if (threads > 1)
> + {
> + // alloc data
> + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> + if (!init_data)
> + return AVERROR(ENOMEM);
> +
> + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> + if (!thread_data)
> + return AVERROR(ENOMEM);
> +
> + // alloc mutex and conditions
> + pthread_mutex_t work_pool_mutex;
> +
> + pthread_mutex_t common_video_mutex;
> + pthread_cond_t common_video_cond;
> +
> + pthread_mutex_t common_audio_mutex;
> + pthread_cond_t common_audio_cond;
> +
> + pthread_mutex_t common_subtitle_mutex;
> + pthread_cond_t common_subtitle_cond;
> +
> + // init mutex and conditions
> + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
>
> - if(c->n_audios)
> - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
>
> - for (i = 0; i < c->n_audios; i++) {
> - rep = c->audios[i];
> - if (i > 0 && c->is_init_section_common_audio) {
> - ret = copy_init_section(rep, c->audios[0]);
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + // init work pool data
> + struct work_pool_data* current_data = init_data;
>
> - if (c->n_subtitles)
> - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> + for (i = 0; i < c->n_videos; i++) {
> + create_work_pool_data(s, stream_index, c->videos[i],
> + c->is_init_section_common_video ? c->videos[0] : NULL,
> + current_data, &common_video_mutex, &common_video_cond);
>
> - for (i = 0; i < c->n_subtitles; i++) {
> - rep = c->subtitles[i];
> - if (i > 0 && c->is_init_section_common_subtitle) {
> - ret = copy_init_section(rep, c->subtitles[0]);
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + create_work_pool_data(s, stream_index, c->audios[i],
> + c->is_init_section_common_audio ? c->audios[0] : NULL,
> + current_data, &common_audio_mutex, &common_audio_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + create_work_pool_data(s, stream_index, c->subtitles[i],
> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + // start threads
> + struct thread_data* current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + current_thread++;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> + cleanup:
> + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> + int initResult = ret;
> + int runResult = 0;
> + int cleanupResult = 0;
> +
> + // join threads
> + current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + if (current_thread->is_started) {
> + ret = AVERROR(pthread_join(current_thread->thread, NULL));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> + current_thread++;
> + }
> +
> + // finalize streams and collect results
> + current_data = init_data;
> + for (i = 0; i < nstreams; i++) {
> + if (current_data->result < 0) {
> + // thread ran into error: collect result
> + runResult = current_data->result;
> + }
> + else {
> + // thread success: create streams on AVFormatContext
> + ret = end_open_demux_for_component(s, current_data->pls);
> + if (ret < 0)
> + runResult = ret;
> + }
> + current_data++;
> + }
> +
> + // cleanup mutex and conditions
> + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + // return results if errors have occured in one of the phases
> + if (initResult < 0)
> + return initResult;
> +
> + if (runResult < 0)
> + return runResult;
> +
> + if (cleanupResult < 0)
> + return cleanupResult;
> }
> + else
> + {
> + /* Open the demuxer for video and audio components if available */
> + for (i = 0; i < c->n_videos; i++) {
> + rep = c->videos[i];
> + if (i > 0 && c->is_init_section_common_video) {
> + ret = copy_init_section(rep, c->videos[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
>
> - if (!stream_index)
> - return AVERROR_INVALIDDATA;
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + rep = c->audios[i];
> + if (i > 0 && c->is_init_section_common_audio) {
> + ret = copy_init_section(rep, c->audios[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + rep = c->subtitles[i];
> + if (i > 0 && c->is_init_section_common_subtitle) {
> + ret = copy_init_section(rep, c->subtitles[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + if (!stream_index)
> + return AVERROR_INVALIDDATA;
> + }
>
> /* Create a program */
> program = av_new_program(s, 0);
> @@ -2349,6 +2677,7 @@ static const AVOption dash_options[] = {
> {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> INT_MIN, INT_MAX, FLAGS},
> { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> {NULL}
> };
>
> --
> 2.31.1.windows.1
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
More information about the ffmpeg-devel
mailing list