[FFmpeg-devel] [PATCH] Improved the performance of 1 decode + N filter graphs and adaptive bitrate.
Shaofei Wang
shaofei.wang at intel.com
Wed Mar 27 00:07:21 EET 2019
It enables concurrency across MULTIPLE SIMPLE filter graphs, which brings about a
4%~20% improvement in some 1:N scenarios with CPU or GPU acceleration.
Below are some test cases and comparisons for reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)
For 1:N transcode by GPU acceleration with vaapi:
./ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
-hwaccel_output_format vaapi \
-i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
-vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
-vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null
test results:
2 encoders 5 encoders 10 encoders
Improved 6.1% 6.9% 5.5%
For 1:N transcode by GPU acceleration with QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
-i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
-vf "scale_qsv=1280:720:format=nv12" -c:v h264_qsv -f null /dev/null \
-vf "scale_qsv=720:480:format=nv12" -c:v h264_qsv -f null /dev/null
test results:
2 encoders 5 encoders 10 encoders
Improved 6% 4% 15%
For Intel GPU acceleration case, 1 decode to N scaling, by QSV:
./ffmpeg -hwaccel qsv -c:v h264_qsv \
-i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
-vf "scale_qsv=1280:720:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null \
-vf "scale_qsv=720:480:format=nv12,hwdownload" -pix_fmt nv12 -f null /dev/null
test results:
2 scale 5 scale 10 scale
Improved 12% 21% 21%
For CPU only 1 decode to N scaling:
./ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
-vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
-vf "scale=720:480" -pix_fmt nv12 -f null /dev/null
test results:
2 scale 5 scale 10 scale
Improved 25% 107% 148%
Signed-off-by: Wang, Shaofei <shaofei.wang at intel.com>
---
This patch only takes effect on pipelines with multiple SIMPLE filter graphs.
It passes FATE and refines the handling of a possible data race.
It was tested with AFL and does not introduce extra crashes/hangs.
fftools/ffmpeg.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++++------
fftools/ffmpeg.h | 13 +++++
2 files changed, 169 insertions(+), 16 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 544f1a1..5f6e712 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -164,7 +164,13 @@ static struct termios oldtty;
static int restore_tty;
#endif
+/* enable abr threads when there are multiple simple filter graphs */
+static int abr_threads_enabled = 0;
+
#if HAVE_THREADS
+pthread_mutex_t fg_config_mutex;
+pthread_mutex_t ost_init_mutex;
+
static void free_input_threads(void);
#endif
@@ -509,6 +515,17 @@ static void ffmpeg_cleanup(int ret)
}
av_fifo_freep(&fg->inputs[j]->ist->sub2video.sub_queue);
}
+#if HAVE_THREADS
+ if (abr_threads_enabled) {
+ av_frame_free(&fg->inputs[j]->input_frm);
+ pthread_mutex_lock(&fg->inputs[j]->process_mutex);
+ fg->inputs[j]->waited_frm = NULL;
+ fg->inputs[j]->t_end = 1;
+ pthread_cond_signal(&fg->inputs[j]->process_cond);
+ pthread_mutex_unlock(&fg->inputs[j]->process_mutex);
+ pthread_join(fg->inputs[j]->abr_thread, NULL);
+ }
+#endif
av_buffer_unref(&fg->inputs[j]->hw_frames_ctx);
av_freep(&fg->inputs[j]->name);
av_freep(&fg->inputs[j]);
@@ -1419,12 +1436,13 @@ static void finish_output_stream(OutputStream *ost)
*
* @return 0 for success, <0 for severe errors
*/
-static int reap_filters(int flush)
+static int reap_filters(int flush, InputFilter * ifilter)
{
AVFrame *filtered_frame = NULL;
int i;
- /* Reap all buffers present in the buffer sinks */
+ /* Reap all buffers present in the buffer sinks or just reap specified
+ * buffer which related with the filter graph who got ifilter as input*/
for (i = 0; i < nb_output_streams; i++) {
OutputStream *ost = output_streams[i];
OutputFile *of = output_files[ost->file_index];
@@ -1432,13 +1450,25 @@ static int reap_filters(int flush)
AVCodecContext *enc = ost->enc_ctx;
int ret = 0;
+ if (ifilter && abr_threads_enabled)
+ if (ost != ifilter->graph->outputs[0]->ost)
+ continue;
+
if (!ost->filter || !ost->filter->graph->graph)
continue;
filter = ost->filter->filter;
if (!ost->initialized) {
char error[1024] = "";
+#if HAVE_THREADS
+ if (abr_threads_enabled)
+ pthread_mutex_lock(&ost_init_mutex);
+#endif
ret = init_output_stream(ost, error, sizeof(error));
+#if HAVE_THREADS
+ if (abr_threads_enabled)
+ pthread_mutex_unlock(&ost_init_mutex);
+#endif
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n",
ost->file_index, ost->index, error);
@@ -2179,13 +2209,22 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
}
}
- ret = reap_filters(1);
+ ret = (HAVE_THREADS && abr_threads_enabled) ? reap_filters(1, ifilter) : reap_filters(1, NULL);
+
if (ret < 0 && ret != AVERROR_EOF) {
av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
return ret;
}
+#if HAVE_THREADS
+ if (abr_threads_enabled)
+ pthread_mutex_lock(&fg_config_mutex);
+#endif
ret = configure_filtergraph(fg);
+#if HAVE_THREADS
+ if (abr_threads_enabled)
+ pthread_mutex_unlock(&fg_config_mutex);
+#endif
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error reinitializing filters!\n");
return ret;
@@ -2252,29 +2291,98 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
return 0;
}
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg) /* Per-InputFilter worker thread entry point (started via pthread_create in transcode_init).
+ * arg is the InputFilter to serve. Each iteration waits for a frame posted in
+ * waited_frm, sends it into the filter, reaps the filtered output, records the
+ * result in t_error and then signals finish_cond. Returns arg when t_end is set. */
+{
+ InputFilter *fl = arg;
+ AVFrame *frm;
+ int ret;
+ while(1) {
+ pthread_mutex_lock(&fl->process_mutex);
+ while (fl->waited_frm == NULL && !fl->t_end) /* loop guards against wakeups with nothing to do */
+ pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+ pthread_mutex_unlock(&fl->process_mutex);
+
+ if (fl->t_end) break; /* NOTE(review): t_end is read here without process_mutex held, while ffmpeg_cleanup writes it under that mutex - confirm this is safe */
+
+ frm = fl->waited_frm; /* NOTE(review): also read outside the lock; send_frame_to_filters stores it under process_mutex */
+ pthread_mutex_lock(&fl->ifilter_mutex);
+ ret = ifilter_send_frame(fl, frm);
+ pthread_mutex_unlock(&fl->ifilter_mutex);
+ if (ret == AVERROR_EOF) /* EOF from the filter input is not reported as an error */
+ ret = 0;
+ else if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+ } else {
+ ret = reap_filters(1, fl); /* passing fl restricts reaping to this filter's graph output when abr threads are enabled */
+ }
+ fl->t_error = ret; /* send_frame_to_filters reads this after its wait on finish_cond completes */
+
+ pthread_mutex_lock(&fl->finish_mutex);
+ pthread_cond_signal(&fl->finish_cond);
+ fl->waited_frm = NULL; /* a NULL waited_frm is the condition send_frame_to_filters waits for under finish_mutex */
+ pthread_mutex_unlock(&fl->finish_mutex);
+ }
+ fl->waited_frm = NULL; /* termination path: clear the slot and signal finish_cond once more before exiting */
+ pthread_mutex_lock(&fl->finish_mutex);
+ pthread_cond_signal(&fl->finish_cond);
+ pthread_mutex_unlock(&fl->finish_mutex);
+ return fl; /* ffmpeg_cleanup joins with a NULL result pointer, so this value is not consumed there */
+}
+#endif
+
static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
{
- int i, ret;
+ int i, ret = 0;
AVFrame *f;
av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
+
for (i = 0; i < ist->nb_filters; i++) {
if (i < ist->nb_filters - 1) {
- f = ist->filter_frame;
+ f = (HAVE_THREADS && abr_threads_enabled) ? ist->filters[i]->input_frm : ist->filter_frame;
ret = av_frame_ref(f, decoded_frame);
if (ret < 0)
break;
} else
f = decoded_frame;
- ret = ifilter_send_frame(ist->filters[i], f);
- if (ret == AVERROR_EOF)
- ret = 0; /* ignore */
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR,
- "Failed to inject frame into filter network: %s\n", av_err2str(ret));
- break;
+ if (!HAVE_THREADS || !abr_threads_enabled) {
+ ret = ifilter_send_frame(ist->filters[i], f);
+ if (ret == AVERROR_EOF)
+ ret = 0; /* ignore */
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+ break;
+ }
+ }
+#if HAVE_THREADS
+ if (abr_threads_enabled) {
+ pthread_mutex_lock(&ist->filters[i]->process_mutex);
+ ist->filters[i]->waited_frm = f;
+ pthread_cond_signal(&ist->filters[i]->process_cond);
+ pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+ }
+#endif
+ }
+#if HAVE_THREADS
+ if (abr_threads_enabled && ret >= 0) {
+ for (i = 0; i < ist->nb_filters; i++) {
+ pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+ while(ist->filters[i]->waited_frm != NULL)
+ pthread_cond_wait(&ist->filters[i]->finish_cond,
+ &ist->filters[i]->finish_mutex);
+ pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+ }
+ for (i = 0; i < ist->nb_filters; i++) {
+ if (ist->filters[i]->t_error < 0) {
+ ret = ist->filters[i]->t_error;
+ break;
+ }
}
}
+#endif
return ret;
}
@@ -2334,7 +2442,6 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output,
(AVRational){1, avctx->sample_rate});
ist->nb_samples = decoded_frame->nb_samples;
err = send_frame_to_filters(ist, decoded_frame);
-
av_frame_unref(ist->filter_frame);
av_frame_unref(decoded_frame);
return err < 0 ? err : ret;
@@ -3737,6 +3844,39 @@ static int transcode_init(void)
}
}
+ if (nb_filtergraphs > 1 && filtergraph_is_simple(filtergraphs[0]))
+ abr_threads_enabled = 1;
+#if HAVE_THREADS
+ if (abr_threads_enabled) {
+ for (i = 0; i < nb_input_streams; i++) {
+ ist = input_streams[i];
+ for (j = 0; j < ist->nb_filters; j++) {
+ pthread_mutex_init(&ist->filters[j]->process_mutex, NULL);
+ pthread_mutex_init(&ist->filters[j]->finish_mutex, NULL);
+ pthread_cond_init(&ist->filters[j]->process_cond, NULL);
+ pthread_cond_init(&ist->filters[j]->finish_cond, NULL);
+ pthread_mutex_init(&ist->filters[j]->ifilter_mutex, NULL);
+ if (i == 0) {
+ pthread_mutex_init(&fg_config_mutex, NULL);
+ pthread_mutex_init(&ost_init_mutex, NULL);
+ }
+ ist->filters[j]->t_end = 0;
+ ist->filters[j]->t_error = 0;
+ ist->filters[j]->input_frm = av_frame_alloc();
+ if (!ist->filters[j]->input_frm)
+ return AVERROR(ENOMEM);
+
+ if ((ret = pthread_create(&ist->filters[j]->abr_thread, NULL, filter_pipeline,
+ ist->filters[j]))) {
+ av_log(NULL, AV_LOG_ERROR,
+ "abr pipeline pthread_create failed.\n");
+ return AVERROR(ret);
+ }
+ }
+ }
+ }
+#endif
+
dump_format:
/* dump the stream mapping */
av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -4537,10 +4677,10 @@ static int transcode_from_filter(FilterGraph *graph, InputStream **best_ist)
*best_ist = NULL;
ret = avfilter_graph_request_oldest(graph->graph);
if (ret >= 0)
- return reap_filters(0);
+ return reap_filters(0, NULL);
if (ret == AVERROR_EOF) {
- ret = reap_filters(1);
+ ret = reap_filters(1, NULL);
for (i = 0; i < graph->nb_outputs; i++)
close_output_stream(graph->outputs[i]->ost);
return ret;
@@ -4642,7 +4782,7 @@ static int transcode_step(void)
if (ret < 0)
return ret == AVERROR_EOF ? 0 : ret;
- return reap_filters(0);
+ return (HAVE_THREADS && abr_threads_enabled) ? ret : reap_filters(0, NULL);
}
/*
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..b1179f9 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,19 @@ typedef struct InputFilter {
AVBufferRef *hw_frames_ctx;
+ // for abr pipeline
+ AVFrame *waited_frm;
+ AVFrame *input_frm;
+#if HAVE_THREADS
+ pthread_t abr_thread;
+ pthread_cond_t process_cond;
+ pthread_cond_t finish_cond;
+ pthread_mutex_t process_mutex;
+ pthread_mutex_t finish_mutex;
+ pthread_mutex_t ifilter_mutex;
+ int t_end;
+ int t_error;
+#endif
int eof;
} InputFilter;
--
1.8.3.1
More information about the ffmpeg-devel
mailing list