[FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner
lukas.fellechner at gmx.net
Sun Aug 21 22:26:36 EEST 2022
Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH manifests often containing between 10 and 20 streams, this can easily take up to half a minute. This patch adds an "init_threads" option that specifies the maximum number of threads to use. Multiple worker threads are spun up to open and probe the streams in parallel, which massively reduces initialization time.
---
libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++-----
1 file changed, 386 insertions(+), 46 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..7eca3e3415 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +153,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -1918,22 +1921,40 @@ fail:
return ret;
}
-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+/*
+ * Forward declarations: both halves are defined further down in this file,
+ * but open_demux_for_component() below already calls them. Without these
+ * prototypes the calls would be implicit declarations that conflict with
+ * the later static definitions.
+ */
+static int begin_open_demux_for_component(AVFormatContext *s, struct representation *pls);
+static int end_open_demux_for_component(AVFormatContext *s, struct representation *pls);
+
+/**
+ * Open the demuxer for one representation in a single, sequential step.
+ *
+ * Runs begin_open_demux_for_component() and, only if that did not return
+ * a negative value, end_open_demux_for_component().
+ *
+ * @param s   parent DASH format context
+ * @param pls representation to open
+ * @return the negative value returned by the first half if it failed,
+ *         otherwise the return value of the second half
+ */
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+    int ret = begin_open_demux_for_component(s, pls);
+    if (ret < 0)
+        return ret;
+
+    return end_open_demux_for_component(s, pls);
+}
+
+/**
+ * First half of opening the demuxer for one representation.
+ *
+ * Records s as the parent of pls, stores the current segment number
+ * obtained from calc_cur_seg_no(), fills in last_seq_no via
+ * calc_max_seg_no() when it is still zero, and finally calls
+ * reopen_demux_for_component(). The avformat_new_stream() calls on s
+ * live in end_open_demux_for_component().
+ *
+ * @param s   parent DASH format context
+ * @param pls representation to open
+ * @return the return value of reopen_demux_for_component()
+ */
+static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
{
int ret = 0;
- int i;
pls->parent = s;
- pls->cur_seq_no = calc_cur_seg_no(s, pls);
+ pls->cur_seq_no = calc_cur_seg_no(s, pls);
if (!pls->last_seq_no) {
pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
}
ret = reopen_demux_for_component(s, pls);
- if (ret < 0) {
- goto fail;
- }
+
+ return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+ int i;
+
for (i = 0; i < pls->ctx->nb_streams; i++) {
AVStream *st = avformat_new_stream(s, NULL);
AVStream *ist = pls->ctx->streams[i];
@@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+#if HAVE_THREADS
+
+/**
+ * One unit of work: open the demuxer for a single representation.
+ */
+struct work_pool_data
+{
+    AVFormatContext* ctx;              ///< parent DASH format context
+    struct representation* pls;        ///< representation this item opens
+    struct representation* common_pls; ///< provider of the shared init section, NULL if this item does not depend on one
+    pthread_mutex_t* common_mutex;     ///< protects is_common_done of the provider item
+    pthread_cond_t* common_condition;  ///< broadcast once the provider has finished its init section
+    int is_common;                     ///< non-zero if this item is the init section provider
+    int is_started;                    ///< non-zero once a worker has claimed this item (guarded by the pool mutex)
+    int is_common_done;                ///< provider only: non-zero once update_init_section() has returned (guarded by common_mutex)
+    int result;                        ///< value stored by the worker when it is done with this item
+};
+
+/**
+ * Per-thread bookkeeping for one worker.
+ */
+struct thread_data
+{
+    pthread_t thread;                  ///< thread handle
+    pthread_mutex_t* mutex;            ///< work pool mutex, serialises claiming of items
+    struct work_pool_data* work_pool;  ///< array of work items shared by all workers
+    int work_pool_size;                ///< number of entries in work_pool
+    int is_started;                    ///< non-zero if pthread_create() succeeded
+};
+
+/**
+ * Worker thread entry point.
+ *
+ * Repeatedly claims the first unstarted work item from the pool and runs
+ * begin_open_demux_for_component() on it, until no unstarted item is left.
+ *
+ * Items that provide a common init section load it first and then publish
+ * completion (success or failure) through is_common_done, using a
+ * broadcast so that every dependent wakes up. Items that depend on a
+ * provider wait, with common_mutex held and the flag as predicate, until
+ * the provider has published. This works regardless of whether the
+ * provider finished before or after the dependent started waiting.
+ *
+ * Items are claimed in array order, so this relies on the provider being
+ * placed before its dependents in the pool: it is then always claimed by
+ * some thread before any dependent can start waiting for it.
+ *
+ * @param ptr pointer to this worker's struct thread_data
+ * @return always NULL; per-item results are stored in work_pool_data.result
+ */
+static void *worker_thread(void *ptr)
+{
+    struct thread_data *thread_data = ptr;
+
+    for (;;) {
+        struct work_pool_data *data = NULL;
+        int ret = 0;
+        int i;
+
+        // claim the next unstarted work item
+        pthread_mutex_lock(thread_data->mutex);
+        for (i = 0; i < thread_data->work_pool_size; i++) {
+            if (!thread_data->work_pool[i].is_started) {
+                data = &thread_data->work_pool[i];
+                data->is_started = 1;
+                break;
+            }
+        }
+        pthread_mutex_unlock(thread_data->mutex);
+
+        if (!data) {
+            // no more work to do
+            return NULL;
+        }
+
+        // if we are the common section provider, init and publish completion
+        if (data->is_common) {
+            data->pls->parent = data->ctx;
+            ret = update_init_section(data->pls);
+
+            // publish even on failure, otherwise dependents would wait forever
+            pthread_mutex_lock(data->common_mutex);
+            data->is_common_done = 1;
+            pthread_cond_broadcast(data->common_condition);
+            pthread_mutex_unlock(data->common_mutex);
+
+            if (ret < 0)
+                goto end;
+        }
+
+        // if we depend on a common section provider, wait for it and copy
+        if (data->common_pls) {
+            struct work_pool_data *provider = NULL;
+
+            for (i = 0; i < thread_data->work_pool_size; i++) {
+                struct work_pool_data *candidate = &thread_data->work_pool[i];
+                if (candidate->is_common && candidate->pls == data->common_pls) {
+                    provider = candidate;
+                    break;
+                }
+            }
+            if (!provider) {
+                ret = AVERROR(EINVAL);
+                goto end;
+            }
+
+            // the mutex must be held around pthread_cond_wait(); the loop
+            // handles spurious wakeups and a provider that finished early
+            pthread_mutex_lock(data->common_mutex);
+            while (!provider->is_common_done) {
+                ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+                if (ret < 0)
+                    break;
+            }
+            pthread_mutex_unlock(data->common_mutex);
+            if (ret < 0)
+                goto end;
+
+            // provider failed or produced no init section
+            if (!data->common_pls->init_sec_buf) {
+                ret = AVERROR(EFAULT);
+                goto end;
+            }
+
+            ret = copy_init_section(data->pls, data->common_pls);
+            if (ret < 0)
+                goto end;
+        }
+
+        ret = begin_open_demux_for_component(data->ctx, data->pls);
+
+    end:
+        data->result = ret;
+    }
+}
+
+/**
+ * Populate one work pool slot for a representation.
+ *
+ * Stores the context, representation, mutex and condition pointers in
+ * init_data, writes stream_index into the representation and presets the
+ * result to -1. If pls is itself common_pls the slot is flagged as the
+ * common provider; otherwise, if common_pls is non-NULL, it is recorded
+ * as the slot's dependency. Fields not mentioned here are left untouched.
+ *
+ * @param ctx              parent DASH format context
+ * @param stream_index     index to assign to pls
+ * @param pls              representation handled by this slot
+ * @param common_pls       representation providing a shared init section, or NULL
+ * @param init_data        slot to fill in
+ * @param common_mutex     mutex paired with common_condition
+ * @param common_condition condition variable for the shared init section
+ */
+static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
+    struct representation* pls, struct representation* common_pls,
+    struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
+    pthread_cond_t* common_condition)
+{
+    pls->stream_index = stream_index;
+
+    init_data->ctx              = ctx;
+    init_data->pls              = pls;
+    init_data->common_mutex     = common_mutex;
+    init_data->common_condition = common_condition;
+    init_data->result           = -1;
+
+    if (common_pls == pls)
+        init_data->is_common = 1;
+    else if (common_pls != NULL)
+        init_data->common_pls = common_pls;
+}
+
+/**
+ * Attach a worker slot to the shared work pool and launch its thread.
+ *
+ * @param thread_data    worker slot to initialise; is_started is set to 1
+ *                       only when pthread_create() succeeds
+ * @param work_pool      array of work items shared by all workers
+ * @param work_pool_size number of entries in work_pool
+ * @param mutex          mutex the workers lock while claiming items
+ * @return 0 on success, otherwise AVERROR() of the pthread_create() error
+ */
+static int start_thread(struct thread_data *thread_data,
+    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+    int err;
+
+    thread_data->work_pool_size = work_pool_size;
+    thread_data->work_pool      = work_pool;
+    thread_data->mutex          = mutex;
+
+    err = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+    if (err != 0)
+        return err;
+
+    thread_data->is_started = 1;
+    return err;
+}
+
+#endif
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s)
av_dict_set(&c->avio_opts, "seekable", "0", 0);
}
- if(c->n_videos)
+ if (c->n_videos)
c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
- /* Open the demuxer for video and audio components if available */
- for (i = 0; i < c->n_videos; i++) {
- rep = c->videos[i];
- if (i > 0 && c->is_init_section_common_video) {
- ret = copy_init_section(rep, c->videos[0]);
+ if (c->n_audios)
+ c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+ if (c->n_subtitles)
+ c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
+ int threads = 0;
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+ threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+ if (threads > 1)
+ {
+#if HAVE_THREADS
+ // alloc data
+ struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+ if (!init_data)
+ return AVERROR(ENOMEM);
+
+ struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+ if (!thread_data)
+ return AVERROR(ENOMEM);
+
+ // alloc mutex and conditions
+ pthread_mutex_t work_pool_mutex;
+
+ pthread_mutex_t common_video_mutex;
+ pthread_cond_t common_video_cond;
+
+ pthread_mutex_t common_audio_mutex;
+ pthread_cond_t common_audio_cond;
+
+ pthread_mutex_t common_subtitle_mutex;
+ pthread_cond_t common_subtitle_cond;
+
+ // init mutex and conditions
+ ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
- if(c->n_audios)
- c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+ ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
- for (i = 0; i < c->n_audios; i++) {
- rep = c->audios[i];
- if (i > 0 && c->is_init_section_common_audio) {
- ret = copy_init_section(rep, c->audios[0]);
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ // init work pool data
+ struct work_pool_data* current_data = init_data;
- if (c->n_subtitles)
- c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ for (i = 0; i < c->n_videos; i++) {
+ create_work_pool_data(s, stream_index, c->videos[i],
+ c->is_init_section_common_video ? c->videos[0] : NULL,
+ current_data, &common_video_mutex, &common_video_cond);
- for (i = 0; i < c->n_subtitles; i++) {
- rep = c->subtitles[i];
- if (i > 0 && c->is_init_section_common_subtitle) {
- ret = copy_init_section(rep, c->subtitles[0]);
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ create_work_pool_data(s, stream_index, c->audios[i],
+ c->is_init_section_common_audio ? c->audios[0] : NULL,
+ current_data, &common_audio_mutex, &common_audio_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ create_work_pool_data(s, stream_index, c->subtitles[i],
+ c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+ current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ // start threads
+ struct thread_data* current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ current_thread++;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
+ cleanup:
+ // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+ int initResult = ret;
+ int runResult = 0;
+ int cleanupResult = 0;
+
+ // join threads
+ current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ if (current_thread->is_started) {
+ ret = AVERROR(pthread_join(current_thread->thread, NULL));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+ current_thread++;
+ }
+
+ // finalize streams and collect results
+ current_data = init_data;
+ for (i = 0; i < nstreams; i++) {
+ if (current_data->result < 0) {
+ // thread ran into error: collect result
+ runResult = current_data->result;
+ }
+ else {
+ // thread success: create streams on AVFormatContext
+ ret = end_open_demux_for_component(s, current_data->pls);
+ if (ret < 0)
+ runResult = ret;
+ }
+ current_data++;
+ }
+
+ // cleanup mutex and conditions
+ ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ // return results if errors have occurred in one of the phases
+ if (initResult < 0)
+ return initResult;
+
+ if (runResult < 0)
+ return runResult;
+
+ if (cleanupResult < 0)
+ return cleanupResult;
+
+#endif
}
+ else
+ {
+ /* Open the demuxer for video and audio components if available */
+ for (i = 0; i < c->n_videos; i++) {
+ rep = c->videos[i];
+ if (i > 0 && c->is_init_section_common_video) {
+ ret = copy_init_section(rep, c->videos[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- if (!stream_index)
- return AVERROR_INVALIDDATA;
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ rep = c->audios[i];
+ if (i > 0 && c->is_init_section_common_audio) {
+ ret = copy_init_section(rep, c->audios[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ rep = c->subtitles[i];
+ if (i > 0 && c->is_init_section_common_subtitle) {
+ ret = copy_init_section(rep, c->subtitles[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ if (!stream_index)
+ return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = {
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
{ "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
{NULL}
};
--
2.31.1.windows.1
More information about the ffmpeg-devel
mailing list