[FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes

Andreas Rheinhardt andreas.rheinhardt at outlook.com
Wed Jan 24 21:35:48 EET 2024


Anton Khirnov:
> Use 8 packets/frames by default rather than 1, which seems to provide
> better throughput.
> 
> Allow -thread_queue_size to set the muxer queue size manually again.
> ---
>  fftools/ffmpeg_mux.h                             |  2 --
>  fftools/ffmpeg_mux_init.c                        |  3 +--
>  fftools/ffmpeg_opt.c                             |  2 +-
>  fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
>  fftools/ffmpeg_sched.h                           |  4 +++-
>  tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
>  6 files changed, 15 insertions(+), 16 deletions(-)
> 
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index d0be8a51ea..e1b44142cf 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -94,8 +94,6 @@ typedef struct Muxer {
>  
>      AVDictionary *opts;
>  
> -    int thread_queue_size;
> -
>      /* filesize limit expressed in bytes */
>      int64_t limit_filesize;
>      atomic_int_least64_t last_filesize;
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 6b5e4f8b3c..8ada837555 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      of->start_time     = o->start_time;
>      of->shortest       = o->shortest;
>  
> -    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
>      mux->limit_filesize    = o->limit_filesize;
>      av_dict_copy(&mux->opts, o->g->format_opts, 0);
>  
> @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      }
>  
>      err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
> -                      !strcmp(oc->oformat->name, "rtp"));
> +                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
>      if (err < 0)
>          return err;
>      mux->sch     = sch;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index 304c493dcf..7505b0cf90 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
>      o->limit_filesize = INT64_MAX;
>      o->chapters_input_file = INT_MAX;
>      o->accurate_seek  = 1;
> -    o->thread_queue_size = -1;
> +    o->thread_queue_size = 0;
>      o->input_sync_ref = -1;
>      o->find_stream_info = 1;
>      o->shortest_buf_duration = 10.f;
> diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
> index 4fc5a33941..62a40c6057 100644
> --- a/fftools/ffmpeg_sched.c
> +++ b/fftools/ffmpeg_sched.c
> @@ -218,6 +218,7 @@ typedef struct SchMux {
>       */
>      atomic_int          mux_started;
>      ThreadQueue        *queue;
> +    unsigned            queue_size;
>  
>      AVPacket           *sub_heartbeat_pkt;
>  } SchMux;
> @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
>      ThreadQueue *tq;
>      ObjPool *op;
>  
> +    queue_size = queue_size > 0 ? queue_size : 8;
> +
>      op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
>                                     objpool_alloc_frames();
>      if (!op)
> @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
>  };
>  
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *arg, int sdp_auto)
> +                void *arg, int sdp_auto, unsigned thread_queue_size)
>  {
>      const unsigned idx = sch->nb_mux;
>  
> @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
>      mux             = &sch->mux[idx];
>      mux->class      = &sch_mux_class;
>      mux->init       = init;
> +    mux->queue_size = thread_queue_size;
>  
>      task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
>  
> @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
>      if (!dec->send_frame)
>          return AVERROR(ENOMEM);
>  
> -    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
> +    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
>      if (ret < 0)
>          return ret;
>  
> @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
>  
>      task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
>  
> -    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>      if (ret < 0)
>          return ret;
>  
> -    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
>              }
>          }
>  
> -        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
> +        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
> +                          QUEUE_PACKETS);
>          if (ret < 0)
>              return ret;
>  
> diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
> index b167d8d158..d12affa69d 100644
> --- a/fftools/ffmpeg_sched.h
> +++ b/fftools/ffmpeg_sched.h
> @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>   *             streams in the muxer.
>   * @param ctx Muxer state; will be passed to func/init and used for logging.
>   * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
> + * @param thread_queue_size number of packets that can be buffered before
> + *                          sending to the muxer blocks
>   *
>   * @retval ">=0" Index of the newly-created muxer.
>   * @retval "<0"  Error code.
>   */
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *ctx, int sdp_auto);
> +                void *ctx, int sdp_auto, unsigned thread_queue_size);
>  /**
>   * Add a muxed stream for a previously added muxer.
>   *
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index bc9b833799..3a3ec96637 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -33,8 +33,3 @@
>  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety remains our numb</font>
>  
> -9
> -00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> ->> Safety remains our number one</font>
> -

Why does the output of this test change?

- Andreas




More information about the ffmpeg-devel mailing list