[FFmpeg-devel] [PATCH 1/2] Audio Video Filtering using threads & semaphores
Nicolas George
nicolas.george at normalesup.org
Thu Jul 26 17:00:47 CEST 2012
Le tridi 3 messidor, an CCXX, Manjunath Siddaiah a écrit :
> Ok, semaphores are replaced by POSIX mutexes and condition variables.
> This time only changes are in ffmpeg.c and
> no configure file changes and remains the same.
Thanks for the patch. I am afraid that with all the time that has passed, it
will need to be rebased and adapted. In particular, the logic of the
poll_filter function has been altered. Sorry for the delay.
Here are my comments.
As far as I understand, you intend to run the whole filtering process in a
different thread from the ffmpeg command-line tool. Is that right?
> diff --git a/ffmpeg.c b/ffmpeg.c
> old mode 100644
> new mode 100755
> index 17fe6e5..bcf23ca
> --- a/ffmpeg.c
> +++ b/ffmpeg.c
> @@ -327,6 +327,18 @@ typedef struct OutputStream {
> int copy_initial_nonkeyframes;
>
> int keep_pix_fmt;
> +#if CONFIG_AV_FILTER_THREADS
> + int avfilter_thread_alive;
> + pthread_t avfilter_thread;
> + pthread_mutex_t mutex_avfilter;
> + pthread_cond_t cond_avfilter;
> + int flag_avfilter;
> + pthread_mutex_t mutex_encoder;
> + pthread_cond_t cond_encoder;
> + int flag_encoder;
> + int graph_id;
> + int avfilter_ret;
> +#endif
Some comments about what the variables mean, and their relationships (who is
locking what) would be useful.
> } OutputStream;
>
>
> @@ -1894,6 +1906,144 @@ static void do_video_stats(AVFormatContext *os, OutputStream *ost,
> }
> }
>
> +#if CONFIG_AV_FILTER_THREADS
> +static int avfilter_frame(void *p)
It looks like this function is used as argument to pthread_create. The
prototype is wrong, and gratuitously so since the return value is
meaningless. Did your compiler not produce a warning for that?
> +{
> + OutputStream *ost = (OutputStream *)p;
> + while (ost->avfilter_thread_alive) {
ost->avfilter_thread_alive is shared: theoretically it must be protected by
a mutex.
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + while (!ost->flag_avfilter)
> + pthread_cond_wait(&ost->cond_avfilter, &ost->mutex_avfilter);
> +
> + ost->avfilter_ret = avfilter_graph_request_oldest(filtergraphs[ost->graph_id]->graph);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 0;
It looks wrong: you are doing the expensive stuff
(avfilter_graph_request_oldest is what triggers most of the work) while
holding the lock, and you release the lock just before you alter the
variable with the same name as the lock.
> +
> + pthread_mutex_lock(&ost->mutex_encoder);
> + ost->flag_encoder = 1;
> + pthread_cond_signal(&ost->cond_encoder);
> + pthread_mutex_unlock(&ost->mutex_encoder);
> + }
> + return 0;
> +}
> +/* check for new output on any of the filtergraphs */
> +static int poll_filters(InputStream *ist)
> +{
> + AVFilterBufferRef *picref;
> + AVFrame *filtered_frame = NULL;
> + int i, ret, ret_all;
> + unsigned nb_success, nb_eof;
> + int64_t frame_pts;
> +
> + if (ist->st->codec->codec_type != AVMEDIA_TYPE_AUDIO && ist->st->codec->codec_type != AVMEDIA_TYPE_VIDEO)
> + return 0;
> +
> + ret_all = nb_success = nb_eof = 0;
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + if (ist->st->codec->codec_type == ost->st->codec->codec_type) {
> + pthread_mutex_lock(&ost->mutex_encoder);
> + while (!ost->flag_encoder)
> + pthread_cond_wait(&ost->cond_encoder, &ost->mutex_encoder);
> + pthread_mutex_unlock(&ost->mutex_encoder);
> + ost->flag_encoder = 0;
> + if (!ost->avfilter_ret) {
> + nb_success++;
> + } else if (ost->avfilter_ret == AVERROR_EOF) {
> + nb_eof++;
> + } else if (ost->avfilter_ret != AVERROR(EAGAIN)) {
> + char buf[256];
> + av_strerror(ost->avfilter_ret, buf, sizeof(buf));
> + av_log(NULL, AV_LOG_WARNING,
> + "Error in request_frame(): %s\n", buf);
> + ret_all = ost->avfilter_ret;
> + }
> + }
> + }
> + if (!nb_success)
> + return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +
> + /* Reap all buffers present in the buffer sinks */
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + OutputFile *of = output_files[ost->file_index];
> + int ret = 0;
> +
> + if (!ost->filter)
> + continue;
> +
> + if (!ost->filtered_frame && !(ost->filtered_frame = avcodec_alloc_frame())) {
> + return AVERROR(ENOMEM);
> + } else
> + avcodec_get_frame_defaults(ost->filtered_frame);
> + filtered_frame = ost->filtered_frame;
> +
> + while (!ost->is_past_recording_time) {
> + if (ost->enc->type == AVMEDIA_TYPE_AUDIO &&
> + !(ost->enc->capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
> + ret = av_buffersink_read_samples(ost->filter->filter, &picref,
> + ost->st->codec->frame_size);
> + else
> +#ifdef SINKA
> + ret = av_buffersink_read(ost->filter->filter, &picref);
> +#else
> + ret = av_buffersink_get_buffer_ref(ost->filter->filter, &picref,
> + AV_BUFFERSINK_FLAG_NO_REQUEST);
> +#endif
> + if (ret < 0) {
> + if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
> + char buf[256];
> + av_strerror(ret, buf, sizeof(buf));
> + av_log(NULL, AV_LOG_WARNING,
> + "Error in av_buffersink_get_buffer_ref(): %s\n", buf);
> + }
> + break;
> + }
> + frame_pts = AV_NOPTS_VALUE;
> + if (picref->pts != AV_NOPTS_VALUE) {
> + filtered_frame->pts = frame_pts = av_rescale_q(picref->pts,
> + ost->filter->filter->inputs[0]->time_base,
> + ost->st->codec->time_base) -
> + av_rescale_q(of->start_time,
> + AV_TIME_BASE_Q,
> + ost->st->codec->time_base);
> +
> + if (of->start_time && filtered_frame->pts < 0) {
> + avfilter_unref_buffer(picref);
> + continue;
> + }
> + }
> + //if (ost->source_index >= 0)
> + // *filtered_frame= *input_streams[ost->source_index]->decoded_frame; //for me_threshold
> +
> +
> + switch (ost->filter->filter->inputs[0]->type) {
> + case AVMEDIA_TYPE_VIDEO:
> + avfilter_fill_frame_from_video_buffer_ref(filtered_frame, picref);
> + filtered_frame->pts = frame_pts;
> + if (!ost->frame_aspect_ratio)
> + ost->st->codec->sample_aspect_ratio = picref->video->sample_aspect_ratio;
> +
> + do_video_out(of->ctx, ost, filtered_frame,
> + same_quant ? ost->last_quality :
> + ost->st->codec->global_quality);
> + break;
> + case AVMEDIA_TYPE_AUDIO:
> + avfilter_copy_buf_props(filtered_frame, picref);
> + filtered_frame->pts = frame_pts;
> + do_audio_out(of->ctx, ost, filtered_frame);
> + break;
> + default:
> + // TODO support subtitle filters
> + av_assert0(0);
> + }
> +
> + avfilter_unref_buffer(picref);
> + }
> + }
> + return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +}
Rewriting the whole body of poll_filters() seems wrong: one of the versions
will unavoidably bitrot, i.e. not get the bugfixes that the other version
will receive.
> +#else
> /* check for new output on any of the filtergraphs */
> static int poll_filters(void)
> {
> @@ -2004,7 +2154,7 @@ static int poll_filters(void)
> }
> return nb_eof == nb_filtergraphs ? AVERROR_EOF : ret_all;
> }
> -
> +#endif
> static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
> {
> char buf[1024];
> @@ -2351,6 +2501,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
> }
>
> if (!*got_output) {
> +#if CONFIG_AV_FILTER_THREADS
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 1;
> + pthread_cond_signal(&ost->cond_avfilter);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + }
> + }
> +#endif
> /* no audio frame */
> if (!pkt->size)
> for (i = 0; i < ist->nb_filters; i++)
> @@ -2424,6 +2585,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
>
> for (i = 0; i < ist->nb_filters; i++)
> av_buffersrc_add_frame(ist->filters[i]->filter, decoded_frame, 0);
> +#if CONFIG_AV_FILTER_THREADS
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 1;
> + pthread_cond_signal(&ost->cond_avfilter);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + }
> + }
> +#endif
>
> return ret;
> }
> @@ -2454,6 +2626,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
> quality = same_quant ? decoded_frame->quality : 0;
> if (!*got_output) {
> /* no picture yet */
> +#if CONFIG_AV_FILTER_THREADS
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 1;
> + pthread_cond_signal(&ost->cond_avfilter);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + }
> + }
> +#endif
> if (!pkt->size)
> for (i = 0; i < ist->nb_filters; i++)
> av_buffersrc_add_ref(ist->filters[i]->filter, NULL, AV_BUFFERSRC_FLAG_NO_COPY);
> @@ -2532,7 +2715,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
> }
>
> }
> -
> +#if CONFIG_AV_FILTER_THREADS
> + for (i = 0; i < nb_output_streams; i++) {
> + OutputStream *ost = output_streams[i];
> + if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 1;
> + pthread_cond_signal(&ost->cond_avfilter);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + }
> + }
> +#endif
> av_free(buffer_to_free);
> return ret;
> }
> @@ -2791,7 +2984,9 @@ static int transcode_init(void)
> for (i = 0; i < nb_filtergraphs; i++)
> if ((ret = avfilter_graph_config(filtergraphs[i]->graph, NULL)) < 0)
> return ret;
> -
> +#if CONFIG_AV_FILTER_THREADS
> + int graph_id = 0;
> +#endif
> /* for each output stream, we compute the right encoding parameters */
> for (i = 0; i < nb_output_streams; i++) {
> ost = output_streams[i];
> @@ -2954,6 +3149,34 @@ static int transcode_init(void)
> av_log(NULL, AV_LOG_FATAL, "Error opening filters!\n");
> exit(1);
> }
> +#if CONFIG_AV_FILTER_THREADS
> + if (pthread_mutex_init(&ost->mutex_avfilter, NULL)) {
> + av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for avfilter\n");
> + exit(1);
> + }
> + if (pthread_cond_init(&ost->cond_avfilter, NULL)) {
> + av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> + exit(1);
> + }
> + ost->flag_avfilter = 0;
> +
> + if (pthread_mutex_init(&ost->mutex_encoder, NULL)) {
> + av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for encoder\n");
> + exit(1);
> + }
> + if (pthread_cond_init(&ost->cond_encoder, NULL)) {
> + av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> + exit(1);
> + }
> + ost->flag_encoder = 0;
> +
> + ost->avfilter_thread_alive = 1;
> + if (pthread_create(&ost->avfilter_thread, NULL, avfilter_frame, (void *)ost)) {
> + av_log(NULL, AV_LOG_FATAL, "Unsuccessful in creating thread ost->stream_thread\n");
> + exit(1);
> + }
> + ost->graph_id = graph_id++;
> +#endif
> }
>
> switch (codec->codec_type) {
> @@ -3611,7 +3834,11 @@ static int transcode(void)
>
> // fprintf(stderr,"read #%d.%d size=%d\n", ist->file_index, ist->st->index, pkt.size);
> if ((ret = output_packet(ist, &pkt)) < 0 ||
> +#if CONFIG_AV_FILTER_THREADS
> + ((ret = poll_filters(ist)) < 0 && ret != AVERROR_EOF)) {
> +#else
> ((ret = poll_filters()) < 0 && ret != AVERROR_EOF)) {
> +#endif
> char buf[128];
> av_strerror(ret, buf, sizeof(buf));
> av_log(NULL, AV_LOG_ERROR, "Error while decoding stream #%d:%d: %s\n",
> @@ -3638,8 +3865,13 @@ static int transcode(void)
> if (!input_files[ist->file_index]->eof_reached && ist->decoding_needed) {
> output_packet(ist, NULL);
> }
> +#if CONFIG_AV_FILTER_THREADS
> + poll_filters(ist);
> +#endif
> }
> +#if !(CONFIG_AV_FILTER_THREADS)
> poll_filters();
> +#endif
> flush_encoders();
>
> term_exit();
> @@ -3659,6 +3891,19 @@ static int transcode(void)
> if (ost->encoding_needed) {
> av_freep(&ost->st->codec->stats_in);
> avcodec_close(ost->st->codec);
> +#if CONFIG_AV_FILTER_THREADS
> + if (ost->filter && (ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO || ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO)) {
> + int ret;
> + ost->avfilter_thread_alive = 0;
> + pthread_mutex_lock(&ost->mutex_avfilter);
> + ost->flag_avfilter = 1;
> + pthread_cond_signal(&ost->cond_avfilter);
> + pthread_mutex_unlock(&ost->mutex_avfilter);
> + ret = pthread_join(ost->avfilter_thread, NULL);
> + if (ret)
> + av_log(NULL, AV_LOG_FATAL, "Error %d in Joining thread of %d th stream\n", ret, i);
> + }
> +#endif
> }
> }
>
I must say, I am quite confused by your design, I am not sure what thread is
responsible for what. As far as I understand, the operations on buffersrc
and buffersink stay in the main thread while the actual processing,
triggered by avfilter_graph_request_oldest, is done in a separate thread.
But I am also confused since you seem to be creating a thread for each
output stream, while several output streams can belong to the same filter
graph.
You must remember that libavfilter as a whole (and buffersrc and buffersink
in particular) is not thread-safe, so having operations on buffersrc and
buffersink in one thread and requests on another is not safe.
I believe you could achieve something much simpler if you try to make
buffersrc and buffersink thread-safe and able to synchronize. Something like
that would probably work:
av_buffersrc_set_thread_sync(buffersrc, sync_object);
where sync_object is more or less a pair (mutex,condition). For buffersink,
it would be slightly more tricky because the return value of request_frame
must be serialized together with the bufref themselves, but that is doable.
If this is done, then the changes in ffmpeg should amount only to starting a
thread per filter graph that repeatedly calls avfilter_graph_request_oldest.
Of course, this is only advice based on the ideas I had during the time I
thought about this patch. You are entirely free to disregard it entirely.
All I can say is that this patch is too complex for me to understand, so I
can not take the responsibility to approve it.
Regards,
--
Nicolas George
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 198 bytes
Desc: Digital signature
URL: <http://ffmpeg.org/pipermail/ffmpeg-devel/attachments/20120726/84c38df6/attachment.asc>
More information about the ffmpeg-devel
mailing list