[FFmpeg-devel] [PATCH 1/2] Audio Video Filtering using threads & semaphores

Manjunath Siddaiah msiddaiah at rgbnetworks.com
Thu Jul 26 18:28:42 CEST 2012


Thanks for reviewing my changes.
Here are my responses to your questions.

1. As far as I understand, you intend to run the whole filtering process in a different thread from the ffmpeg command-line tool. Is that right?
Answer: yes

2. Some comments about what the variables mean, and their relationships (who is locking what) would be useful.
Answer: I will add appropriate comments to the variables.

3. It looks like this function is used as argument to pthread_create. The prototype is wrong, and gratuitously so since the return value is meaningless. Did your compiler not produce a warning for that?
Answer: I will check; if there is a warning, I will change the prototype.
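For reference, here is a minimal sketch of the start-routine type that pthread_create() expects, `void *(*)(void *)`. The names `WorkerArg`, `worker_main` and `run_worker` are hypothetical and only illustrate the signature. Passing a function declared `int f(void *)` directly, as the patch does with avfilter_frame(), draws an incompatible-pointer-type warning from GCC, and calling a function through a mismatched pointer type is undefined behavior in C.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical argument block passed to the thread. */
typedef struct WorkerArg {
    int iterations; /* units of work to perform */
    int done;       /* written by the worker; read only after pthread_join() */
} WorkerArg;

/* Correct start-routine type for pthread_create(): void *(*)(void *).
 * No cast is needed, and the return value can simply be NULL. */
static void *worker_main(void *opaque)
{
    WorkerArg *arg = opaque;
    for (int i = 0; i < arg->iterations; i++)
        arg->done++;
    return NULL;
}

/* Start the worker and wait for it; returns 0 or a pthread error code. */
static int run_worker(WorkerArg *arg)
{
    pthread_t tid;
    int err = pthread_create(&tid, NULL, worker_main, arg);
    if (err)
        return err;
    return pthread_join(tid, NULL);
}
```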

4. ost->avfilter_thread_alive is shared: theoretically it must be protected by a mutex.
Answer: Yes, theoretically it should be protected. But the FFmpeg main thread uses this variable only at the start and at the end of transcoding; the rest of the time it is owned entirely by the filter thread.
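Even if the main thread touches the flag only at start-up and shutdown, its write at shutdown races with the filter thread's unlocked read in the `while` condition, which POSIX does not allow and C11 treats as undefined behavior. A minimal sketch of the alternative, assuming a hypothetical `Worker` struct and helper names: the flag is read and written only under the mutex that already guards the wake-up condition, so the worker re-checks it safely after every wakeup and the shutdown request is not lost.

```c
#include <pthread.h>

/* Hypothetical worker state; every int below is guarded by 'lock'. */
typedef struct Worker {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int alive;     /* cleared by the main thread to request shutdown */
    int pending;   /* set by the main thread when there is work */
    int processed; /* number of work items the worker has handled */
} Worker;

static void *worker_main(void *opaque)
{
    Worker *w = opaque;
    pthread_mutex_lock(&w->lock);
    for (;;) {
        /* Both flags are re-checked under the lock after every wakeup. */
        while (w->alive && !w->pending)
            pthread_cond_wait(&w->cond, &w->lock);
        if (!w->pending)
            break; /* woken because 'alive' was cleared and no work is left */
        w->pending = 0;
        w->processed++; /* real work would run with the lock dropped */
    }
    pthread_mutex_unlock(&w->lock);
    return NULL;
}

/* Main thread: hand one item to the worker. */
static void worker_submit(Worker *w)
{
    pthread_mutex_lock(&w->lock);
    w->pending = 1;
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->lock);
}

/* Main thread: clear 'alive' under the same mutex, wake the worker, join. */
static int worker_stop(Worker *w, pthread_t tid)
{
    pthread_mutex_lock(&w->lock);
    w->alive = 0;
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->lock);
    return pthread_join(tid, NULL);
}
```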
   
5. It looks wrong: you are doing the expensive stuff (avfilter_graph_request_oldest is what triggers most of the work) while holding the lock, and you release the lock just before you alter the variable with the same name as the lock.
Answer: One thing that is wrong here is that the variable is altered after the mutex is unlocked; I will change this. Yes, avfilter_graph_request_oldest is the expensive part; that is why it runs in a separate thread to produce the filtered frame.
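The reviewer's concern is that the mutex stays locked for the whole avfilter_graph_request_oldest() call, so the main thread blocks whenever it tries to signal the filter thread. A minimal sketch of the usual pattern, with hypothetical names (`Handoff`, and `expensive_request()` standing in for avfilter_graph_request_oldest()): consume the request flag while holding the lock, drop the lock for the expensive call, then re-acquire it to publish the result. It illustrates lock discipline only; it does not make libavfilter itself thread-safe.

```c
#include <pthread.h>

/* Hypothetical stand-in for avfilter_graph_request_oldest(): imagine an
 * expensive call that must not run while the mutex is held. */
static int expensive_request(int input)
{
    return input * 2;
}

/* Hypothetical request/response slot shared by two threads; all ints are
 * guarded by 'lock'. One condition variable serves both directions,
 * hence pthread_cond_broadcast(). */
typedef struct Handoff {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int request_ready; /* main -> worker: 'input' is valid */
    int result_ready;  /* worker -> main: 'result' is valid */
    int stop;          /* main -> worker: leave the loop */
    int input, result;
} Handoff;

static void *worker_main(void *opaque)
{
    Handoff *h = opaque;
    pthread_mutex_lock(&h->lock);
    while (!h->stop) {
        if (!h->request_ready) {
            pthread_cond_wait(&h->cond, &h->lock);
            continue; /* re-check 'stop' and the flag after any wakeup */
        }
        int input = h->input;
        h->request_ready = 0; /* consume the flag while still locked */
        pthread_mutex_unlock(&h->lock);

        int result = expensive_request(input); /* lock NOT held here */

        pthread_mutex_lock(&h->lock);
        h->result = result;
        h->result_ready = 1;
        pthread_cond_broadcast(&h->cond);
    }
    pthread_mutex_unlock(&h->lock);
    return NULL;
}

/* Main thread: post one request and wait for its result. */
static int handoff_call(Handoff *h, int input)
{
    pthread_mutex_lock(&h->lock);
    h->input = input;
    h->request_ready = 1;
    pthread_cond_broadcast(&h->cond);
    while (!h->result_ready)
        pthread_cond_wait(&h->cond, &h->lock);
    h->result_ready = 0;
    int result = h->result;
    pthread_mutex_unlock(&h->lock);
    return result;
}

/* Main thread: ask the worker to exit and join it. */
static int handoff_stop(Handoff *h, pthread_t tid)
{
    pthread_mutex_lock(&h->lock);
    h->stop = 1;
    pthread_cond_broadcast(&h->cond);
    pthread_mutex_unlock(&h->lock);
    return pthread_join(tid, NULL);
}
```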

6. Rewriting the whole body of poll_filters() seems wrong: one of the versions will unavoidably bitrot, i.e. not get the bugfixes that the other version will receive.
Answer: I can merge my changes into a single function. While developing the algorithm, I kept the original version to help me understand the code.
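One way to keep a single poll_filters() body is to confine the #if to a small helper, so that any fix to the shared loop reaches both builds. A minimal sketch, assuming a hypothetical `USE_FILTER_THREADS` switch (playing the role of CONFIG_AV_FILTER_THREADS) and a hypothetical `request_frame_now()` in place of the real request call. The threaded variant of the helper is only declared here, since its body would live next to the worker-thread code.

```c
#include <errno.h>

#ifndef USE_FILTER_THREADS
#define USE_FILTER_THREADS 0 /* hypothetical build switch */
#endif

#if USE_FILTER_THREADS
/* Threaded build: defined next to the worker-thread code (not shown). It
 * would wait for the stream's worker and return the result it stored. */
int get_request_result(int stream);
#else
/* Hypothetical stand-in for the real request: stream 0 has a frame ready,
 * the others report EAGAIN. */
static int request_frame_now(int stream)
{
    return stream == 0 ? 0 : -EAGAIN;
}

/* Unthreaded build: issue the request synchronously. */
static int get_request_result(int stream)
{
    return request_frame_now(stream);
}
#endif

/* One loop body shared by both builds, so a bug fix here reaches both. */
static int poll_outputs(int nb_streams, int *nb_success)
{
    int ret_all = 0;
    *nb_success = 0;
    for (int i = 0; i < nb_streams; i++) {
        int ret = get_request_result(i);
        if (ret == 0)
            (*nb_success)++;
        else if (ret != -EAGAIN)
            ret_all = ret; /* remember the last real error */
    }
    return ret_all;
}
```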

7. I must say, I am quite confused by your design, I am not sure what thread is responsible for what. As far as I understand, the operations on buffersrc and buffersink stay in the main thread while the actual processing, triggered by avfilter_graph_request_oldest, is done in a separate thread.
But I am also confused since you seem to be creating a thread for each output stream, while several output streams can belong to the same filter graph.
Answer: The ffmpeg main thread handles decoding and encoding, and the filtering thread handles filtering the frames. I do not understand your second statement. As far as I know, each output stream has its own distinct filter graph.
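To illustrate the reviewer's point: with -filter_complex, one filter graph can feed several output streams (for example through a split filter), so one thread per output stream can leave two threads driving the same graph. A minimal sketch with hypothetical, simplified `Graph` and `Stream` types that collects each distinct graph once. The patch already has `filtergraphs[]` and `nb_filtergraphs`, so iterating over those directly would be the simpler route in ffmpeg.c; a counter incremented per output stream, like `graph_id`, only matches the graph index when the mapping is one-to-one.

```c
#include <stddef.h>

/* Hypothetical, simplified types: several output streams may point at the
 * same filter graph. */
typedef struct Graph { int id; } Graph;
typedef struct Stream { Graph *graph; } Stream;

/* Collect each distinct graph once, so exactly one worker thread would be
 * started per graph rather than one per output stream.
 * Returns the number of distinct graphs written to 'out' (capacity 'max'). */
static size_t unique_graphs(const Stream *streams, size_t nb_streams,
                            Graph **out, size_t max)
{
    size_t n = 0;
    for (size_t i = 0; i < nb_streams; i++) {
        Graph *g = streams[i].graph;
        size_t j;
        if (!g)
            continue; /* stream without filtering */
        for (j = 0; j < n; j++)
            if (out[j] == g)
                break;
        if (j == n && n < max)
            out[n++] = g; /* first time this graph is seen */
    }
    return n;
}
```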

8. You must remember that libavfilter as a whole (and buffersrc and buffersink in particular) is not thread-safe, so having operations on buffersrc and buffersink in one thread and requests on another is not safe.
I believe you could achieve something much simpler if you try to make buffersrc and buffersink thread-safe and able to synchronize. Something like that would probably work:
	av_buffersrc_set_thread_sync(buffersrc, sync_object);
where sync_object is more or less a pair (mutex,condition). For buffersink, it would be slightly more tricky because the return value of request_frame must be serialized together with the bufref themselves, but that is doable.
If this is done, then the changes in ffmpeg should amount only to starting a thread per filter graph that repeatedly calls avfilter_graph_request_oldest.
Answer: I have tested it thoroughly and have not encountered any problems.
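A minimal sketch of the source side of the reviewer's proposal, assuming hypothetical names throughout: `SyncObject` is the (mutex, condition) pair, `SrcQueue` is a thread-safe input queue guarded by it, and integers stand in for frame references. av_buffersrc_set_thread_sync() is only a proposal in this thread and is not used here; the loop in `graph_thread()` takes the place of a per-graph thread that repeatedly calls avfilter_graph_request_oldest().

```c
#include <pthread.h>

#define QUEUE_CAP 4

/* Hypothetical "sync object": the (mutex, condition) pair from the review.
 * This is not an existing libavfilter API. */
typedef struct SyncObject {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
} SyncObject;

/* Hypothetical thread-safe input queue; ints stand in for frame refs. */
typedef struct SrcQueue {
    SyncObject *sync;
    int items[QUEUE_CAP];
    int head, count;
    int eof; /* set once the producer has no more frames */
} SrcQueue;

/* Main (decoding) thread: enqueue a frame, blocking while the queue is full. */
static void src_push(SrcQueue *q, int frame)
{
    pthread_mutex_lock(&q->sync->lock);
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->sync->cond, &q->sync->lock);
    q->items[(q->head + q->count) % QUEUE_CAP] = frame;
    q->count++;
    pthread_cond_broadcast(&q->sync->cond);
    pthread_mutex_unlock(&q->sync->lock);
}

/* Main thread: signal end of input. */
static void src_close(SrcQueue *q)
{
    pthread_mutex_lock(&q->sync->lock);
    q->eof = 1;
    pthread_cond_broadcast(&q->sync->cond);
    pthread_mutex_unlock(&q->sync->lock);
}

/* Filter thread: dequeue the oldest frame; returns 0, or -1 once the queue
 * is empty and closed. */
static int src_pop(SrcQueue *q, int *frame)
{
    int ret = 0;
    pthread_mutex_lock(&q->sync->lock);
    while (q->count == 0 && !q->eof)
        pthread_cond_wait(&q->sync->cond, &q->sync->lock);
    if (q->count == 0) {
        ret = -1;
    } else {
        *frame = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAP;
        q->count--;
        pthread_cond_broadcast(&q->sync->cond); /* wake a blocked producer */
    }
    pthread_mutex_unlock(&q->sync->lock);
    return ret;
}

/* One thread per filter graph; summing stands in for running the graph. */
typedef struct GraphWorker {
    SrcQueue *in;
    long processed_sum; /* written by the worker; read after pthread_join() */
} GraphWorker;

static void *graph_thread(void *opaque)
{
    GraphWorker *w = opaque;
    int frame;
    while (src_pop(w->in, &frame) == 0)
        w->processed_sum += frame;
    return NULL;
}
```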

9. Of course, this is only advice based on the ideas I had during the time I thought about this patch. You are free to disregard it entirely.
All I can say is that this patch is too complex for me to understand, so I cannot take the responsibility to approve it.
Answer: With the above responses in mind, please revisit the patch and let me know your further comments.



-----Original Message-----
From: ffmpeg-devel-bounces at ffmpeg.org [mailto:ffmpeg-devel-bounces at ffmpeg.org] On Behalf Of Nicolas George
Sent: Thursday, July 26, 2012 10:01 AM
To: FFmpeg development discussions and patches
Subject: Re: [FFmpeg-devel] [PATCH 1/2] Audio Video Filtering using threads & semaphores

On tridi 3 Messidor, year CCXX, Manjunath Siddaiah wrote:
> Ok, semaphores are replaced by POSIX mutexes and condition variables.
> This time the only changes are in ffmpeg.c; there are no configure file
> changes, and the rest remains the same.

Thanks for the patch. I am afraid that with all the time that has passed, it will need to be rebased and adapted. In particular, the logic of the poll_filters() function has been altered. Sorry for the delay.

Here are my comments.

As far as I understand, you intend to run the whole filtering process in a different thread from the ffmpeg command-line tool. Is that right?

> diff --git a/ffmpeg.c b/ffmpeg.c
> old mode 100644
> new mode 100755
> index 17fe6e5..bcf23ca
> --- a/ffmpeg.c
> +++ b/ffmpeg.c
> @@ -327,6 +327,18 @@ typedef struct OutputStream {
>      int copy_initial_nonkeyframes;
>  
>      int keep_pix_fmt;
> +#if CONFIG_AV_FILTER_THREADS
> +    int avfilter_thread_alive;
> +    pthread_t avfilter_thread;
> +    pthread_mutex_t mutex_avfilter;
> +    pthread_cond_t cond_avfilter;
> +    int flag_avfilter;
> +    pthread_mutex_t mutex_encoder;
> +    pthread_cond_t cond_encoder;
> +    int flag_encoder;
> +    int graph_id;
> +    int avfilter_ret;
> +#endif

Some comments about what the variables mean, and their relationships (who is locking what) would be useful.

>  } OutputStream;
>  
>  
> @@ -1894,6 +1906,144 @@ static void do_video_stats(AVFormatContext *os, OutputStream *ost,
>      }
>  }
>  
> +#if CONFIG_AV_FILTER_THREADS
> +static int avfilter_frame(void *p)

It looks like this function is used as argument to pthread_create. The prototype is wrong, and gratuitously so since the return value is meaningless. Did your compiler not produce a warning for that?

> +{
> +    OutputStream *ost = (OutputStream *)p;
> +    while (ost->avfilter_thread_alive) {

ost->avfilter_thread_alive is shared: theoretically it must be protected by a mutex.

> +        pthread_mutex_lock(&ost->mutex_avfilter);
> +        while (!ost->flag_avfilter)
> +            pthread_cond_wait(&ost->cond_avfilter, &ost->mutex_avfilter);
> +
> +        ost->avfilter_ret = avfilter_graph_request_oldest(filtergraphs[ost->graph_id]->graph);
> +        pthread_mutex_unlock(&ost->mutex_avfilter);
> +        ost->flag_avfilter = 0;

It looks wrong: you are doing the expensive stuff (avfilter_graph_request_oldest is what triggers most of the work) while holding the lock, and you release the lock just before you alter the variable with the same name as the lock.

> +        
> +        pthread_mutex_lock(&ost->mutex_encoder);
> +        ost->flag_encoder = 1;
> +        pthread_cond_signal(&ost->cond_encoder);
> +        pthread_mutex_unlock(&ost->mutex_encoder);
> +    }
> +    return 0;
> +}
> +/* check for new output on any of the filtergraphs */
> +static int poll_filters(InputStream *ist)
> +{
> +    AVFilterBufferRef *picref;
> +    AVFrame *filtered_frame = NULL;
> +    int i, ret, ret_all;
> +    unsigned nb_success, nb_eof;
> +    int64_t frame_pts;
> +
> +    if (ist->st->codec->codec_type != AVMEDIA_TYPE_AUDIO && ist->st->codec->codec_type != AVMEDIA_TYPE_VIDEO)
> +        return 0;
> +
> +    ret_all = nb_success = nb_eof = 0;
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ist->st->codec->codec_type == ost->st->codec->codec_type) {
> +            pthread_mutex_lock(&ost->mutex_encoder);
> +            while (!ost->flag_encoder)
> +                pthread_cond_wait(&ost->cond_encoder, &ost->mutex_encoder);
> +            pthread_mutex_unlock(&ost->mutex_encoder);
> +            ost->flag_encoder = 0;
> +            if (!ost->avfilter_ret) {
> +                nb_success++;
> +            } else if (ost->avfilter_ret == AVERROR_EOF) {
> +                nb_eof++;
> +            } else if (ost->avfilter_ret != AVERROR(EAGAIN)) {
> +                char buf[256];
> +                av_strerror(ost->avfilter_ret, buf, sizeof(buf));
> +                av_log(NULL, AV_LOG_WARNING,
> +                       "Error in request_frame(): %s\n", buf);
> +                ret_all = ost->avfilter_ret;
> +            }
> +        }
> +    }
> +    if (!nb_success)
> +        return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +
> +    /* Reap all buffers present in the buffer sinks */
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        OutputFile    *of = output_files[ost->file_index];
> +        int ret = 0;
> +
> +        if (!ost->filter)
> +            continue;
> +
> +        if (!ost->filtered_frame && !(ost->filtered_frame = avcodec_alloc_frame())) {
> +            return AVERROR(ENOMEM);
> +        } else
> +            avcodec_get_frame_defaults(ost->filtered_frame);
> +        filtered_frame = ost->filtered_frame;
> +
> +        while (!ost->is_past_recording_time) {
> +            if (ost->enc->type == AVMEDIA_TYPE_AUDIO &&
> +                !(ost->enc->capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
> +                ret = av_buffersink_read_samples(ost->filter->filter, &picref,
> +                                                ost->st->codec->frame_size);
> +            else
> +#ifdef SINKA
> +                ret = av_buffersink_read(ost->filter->filter, &picref);
> +#else
> +                ret = av_buffersink_get_buffer_ref(ost->filter->filter, &picref,
> +                                                   AV_BUFFERSINK_FLAG_NO_REQUEST);
> +#endif
> +            if (ret < 0) {
> +                if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
> +                    char buf[256];
> +                    av_strerror(ret, buf, sizeof(buf));
> +                    av_log(NULL, AV_LOG_WARNING,
> +                           "Error in av_buffersink_get_buffer_ref(): %s\n", buf);
> +                }
> +                break;
> +            }
> +            frame_pts = AV_NOPTS_VALUE;
> +            if (picref->pts != AV_NOPTS_VALUE) {
> +                filtered_frame->pts = frame_pts = av_rescale_q(picref->pts,
> +                                                ost->filter->filter->inputs[0]->time_base,
> +                                                ost->st->codec->time_base) -
> +                                    av_rescale_q(of->start_time,
> +                                                AV_TIME_BASE_Q,
> +                                                ost->st->codec->time_base);
> +
> +                if (of->start_time && filtered_frame->pts < 0) {
> +                    avfilter_unref_buffer(picref);
> +                    continue;
> +                }
> +            }
> +            //if (ost->source_index >= 0)
> +            //    *filtered_frame= *input_streams[ost->source_index]->decoded_frame; //for me_threshold
> +
> +
> +            switch (ost->filter->filter->inputs[0]->type) {
> +            case AVMEDIA_TYPE_VIDEO:
> +                avfilter_fill_frame_from_video_buffer_ref(filtered_frame, picref);
> +                filtered_frame->pts = frame_pts;
> +                if (!ost->frame_aspect_ratio)
> +                    ost->st->codec->sample_aspect_ratio = picref->video->sample_aspect_ratio;
> +
> +                do_video_out(of->ctx, ost, filtered_frame,
> +                             same_quant ? ost->last_quality :
> +                                          ost->st->codec->global_quality);
> +                break;
> +            case AVMEDIA_TYPE_AUDIO:
> +                avfilter_copy_buf_props(filtered_frame, picref);
> +                filtered_frame->pts = frame_pts;
> +                do_audio_out(of->ctx, ost, filtered_frame);
> +                break;
> +            default:
> +                // TODO support subtitle filters
> +                av_assert0(0);
> +            }
> +
> +            avfilter_unref_buffer(picref);
> +        }
> +    }
> +    return nb_eof == ist->nb_filters ? AVERROR_EOF : ret_all;
> +}

Rewriting the whole body of poll_filters() seems wrong: one of the versions will unavoidably bitrot, i.e. not get the bugfixes that the other version will receive.

> +#else
>  /* check for new output on any of the filtergraphs */
>  static int poll_filters(void)
>  {
> @@ -2004,7 +2154,7 @@ static int poll_filters(void)
>      }
>      return nb_eof == nb_filtergraphs ? AVERROR_EOF : ret_all;
>  }
> -
> +#endif
>  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
>  {
>      char buf[1024];
> @@ -2351,6 +2501,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
>      }
>  
>      if (!*got_output) {
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>          /* no audio frame */
>          if (!pkt->size)
>              for (i = 0; i < ist->nb_filters; i++)
> @@ -2424,6 +2585,17 @@ static int decode_audio(InputStream *ist, AVPacket *pkt, int *got_output)
>  
>      for (i = 0; i < ist->nb_filters; i++)
>          av_buffersrc_add_frame(ist->filters[i]->filter, decoded_frame, 0);
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>  
>      return ret;
>  }
> @@ -2454,6 +2626,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
>      quality = same_quant ? decoded_frame->quality : 0;
>      if (!*got_output) {
>          /* no picture yet */
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>          if (!pkt->size)
>              for (i = 0; i < ist->nb_filters; i++)
>                  av_buffersrc_add_ref(ist->filters[i]->filter, NULL, AV_BUFFERSRC_FLAG_NO_COPY);
> @@ -2532,7 +2715,17 @@ static int decode_video(InputStream *ist, AVPacket *pkt, int *got_output)
>          }
>  
>      }
> -
> +#if CONFIG_AV_FILTER_THREADS
> +    for (i = 0; i < nb_output_streams; i++) {
> +        OutputStream *ost = output_streams[i];
> +        if (ost->filter && ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
> +            pthread_mutex_lock(&ost->mutex_avfilter);
> +            ost->flag_avfilter = 1;
> +            pthread_cond_signal(&ost->cond_avfilter);
> +            pthread_mutex_unlock(&ost->mutex_avfilter);
> +        }
> +    }
> +#endif
>      av_free(buffer_to_free);
>      return ret;
>  }
> @@ -2791,7 +2984,9 @@ static int transcode_init(void)
>      for (i = 0; i < nb_filtergraphs; i++)
>          if ((ret = avfilter_graph_config(filtergraphs[i]->graph, NULL)) < 0)
>              return ret;
> -
> +#if CONFIG_AV_FILTER_THREADS
> +        int graph_id = 0;
> +#endif
>      /* for each output stream, we compute the right encoding parameters */
>      for (i = 0; i < nb_output_streams; i++) {
>          ost = output_streams[i];
> @@ -2954,6 +3149,34 @@ static int transcode_init(void)
>                          av_log(NULL, AV_LOG_FATAL, "Error opening filters!\n");
>                          exit(1);
>                      }
> +#if CONFIG_AV_FILTER_THREADS
> +                    if (pthread_mutex_init(&ost->mutex_avfilter, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for avfilter\n");
> +                        exit(1);
> +                    }
> +                    if (pthread_cond_init(&ost->cond_avfilter, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> +                        exit(1);
> +                    }
> +                    ost->flag_avfilter = 0;
> +
> +                    if (pthread_mutex_init(&ost->mutex_encoder, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing mutex for encoder\n");
> +                        exit(1);
> +                    }
> +                    if (pthread_cond_init(&ost->cond_encoder, NULL)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in initializing condition for avfilter\n");
> +                        exit(1);
> +                    }
> +                    ost->flag_encoder = 0;
> +
> +                    ost->avfilter_thread_alive = 1;
> +                    if (pthread_create(&ost->avfilter_thread, NULL, avfilter_frame, (void *)ost)) {
> +                        av_log(NULL, AV_LOG_FATAL, "Unsuccessful in creating thread ost->stream_thread\n");
> +                        exit(1);
> +                    }
> +                    ost->graph_id = graph_id++;
> +#endif
>              }
>  
>              switch (codec->codec_type) {
> @@ -3611,7 +3834,11 @@ static int transcode(void)
>  
>          // fprintf(stderr,"read #%d.%d size=%d\n", ist->file_index, ist->st->index, pkt.size);
>          if ((ret = output_packet(ist, &pkt)) < 0 ||
> +#if CONFIG_AV_FILTER_THREADS
> +            ((ret = poll_filters(ist)) < 0 && ret != AVERROR_EOF)) { 
> +#else
>              ((ret = poll_filters()) < 0 && ret != AVERROR_EOF)) {
> +#endif
>              char buf[128];
>              av_strerror(ret, buf, sizeof(buf));
>              av_log(NULL, AV_LOG_ERROR, "Error while decoding stream #%d:%d: %s\n",
> @@ -3638,8 +3865,13 @@ static int transcode(void)
>          if (!input_files[ist->file_index]->eof_reached && ist->decoding_needed) {
>              output_packet(ist, NULL);
>          }
> +#if CONFIG_AV_FILTER_THREADS
> +        poll_filters(ist);
> +#endif
>      }
> +#if !(CONFIG_AV_FILTER_THREADS)
>      poll_filters();
> +#endif
>      flush_encoders();
>  
>      term_exit();
> @@ -3659,6 +3891,19 @@ static int transcode(void)
>          if (ost->encoding_needed) {
>              av_freep(&ost->st->codec->stats_in);
>              avcodec_close(ost->st->codec);
> +#if CONFIG_AV_FILTER_THREADS
> +            if (ost->filter && (ost->st->codec->codec_type == AVMEDIA_TYPE_VIDEO || ost->st->codec->codec_type == AVMEDIA_TYPE_AUDIO)) {
> +                int ret;
> +                ost->avfilter_thread_alive = 0;
> +                pthread_mutex_lock(&ost->mutex_avfilter);
> +                ost->flag_avfilter = 1;
> +                pthread_cond_signal(&ost->cond_avfilter);
> +                pthread_mutex_unlock(&ost->mutex_avfilter);
> +                ret = pthread_join(ost->avfilter_thread, NULL);
> +                if (ret)
> +                    av_log(NULL, AV_LOG_FATAL, "Error %d in Joining thread of %d th stream\n", ret, i);
> +            }
> +#endif
>          }
>      }
>  

I must say, I am quite confused by your design, I am not sure what thread is responsible for what. As far as I understand, the operations on buffersrc and buffersink stay in the main thread while the actual processing, triggered by avfilter_graph_request_oldest, is done in a separate thread.
But I am also confused since you seem to be creating a thread for each output stream, while several output streams can belong to the same filter graph.

You must remember that libavfilter as a whole (and buffersrc and buffersink in particular) is not thread-safe, so having operations on buffersrc and buffersink in one thread and requests on another is not safe.

I believe you could achieve something much simpler if you try to make buffersrc and buffersink thread-safe and able to synchronize. Something like that would probably work:

	av_buffersrc_set_thread_sync(buffersrc, sync_object);

where sync_object is more or less a pair (mutex,condition). For buffersink, it would be slightly more tricky because the return value of request_frame must be serialized together with the bufref themselves, but that is doable.

If this is done, then the changes in ffmpeg should amount only to starting a thread per filter graph that repeatedly calls avfilter_graph_request_oldest.
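For the sink side, the review notes that the return value of request_frame must be serialized together with the buffer references. A minimal sketch of that idea, assuming hypothetical names (`SinkQueue`, `SinkEntry`, and `SINK_EOF` as a stand-in for AVERROR_EOF): the filter thread publishes each (return value, buffer) pair as one queue entry, so the main thread sees frames and end-of-stream in the order they were produced, and the EOF entry stays queued so later reads keep returning it. Integers stand in for buffer references.

```c
#include <errno.h>
#include <pthread.h>

#define SINK_CAP 8
#define SINK_EOF (-1000) /* hypothetical stand-in for AVERROR_EOF */

/* The return value travels with the buffer it belongs to. */
typedef struct SinkEntry {
    int ret;   /* 0: 'frame' is valid; negative: error or SINK_EOF */
    int frame; /* stand-in for a buffer reference */
} SinkEntry;

typedef struct SinkQueue {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    SinkEntry entries[SINK_CAP];
    int head, count;
} SinkQueue;

/* Filter thread: publish one (ret, frame) pair as a single entry. */
static void sink_put(SinkQueue *q, int ret, int frame)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == SINK_CAP)
        pthread_cond_wait(&q->cond, &q->lock);
    SinkEntry *e = &q->entries[(q->head + q->count) % SINK_CAP];
    e->ret = ret;
    e->frame = frame;
    q->count++;
    pthread_cond_broadcast(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/* Main thread, non-blocking: -EAGAIN if nothing is queued yet. */
static int sink_get(SinkQueue *q, int *frame)
{
    int ret;
    pthread_mutex_lock(&q->lock);
    if (q->count == 0) {
        ret = -EAGAIN;
    } else {
        SinkEntry e = q->entries[q->head];
        if (e.ret != SINK_EOF) { /* EOF stays queued: later reads see it too */
            q->head = (q->head + 1) % SINK_CAP;
            q->count--;
            pthread_cond_broadcast(&q->cond); /* room for the filter thread */
        }
        if (e.ret == 0)
            *frame = e.frame;
        ret = e.ret;
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}
```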

Of course, this is only advice based on the ideas I had during the time I thought about this patch. You are free to disregard it entirely.
All I can say is that this patch is too complex for me to understand, so I cannot take the responsibility to approve it.

Regards,

--
  Nicolas George

