[FFmpeg-devel] [PATCH 24/24] ffmpeg: switch to scheduler

Anton Khirnov anton at khirnov.net
Sat Nov 4 09:56:33 EET 2023


---
 fftools/ffmpeg.c | 236 +++++------------------------------------------
 fftools/ffmpeg.h |  10 +-
 2 files changed, 25 insertions(+), 221 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 5d1560b891..aae680f052 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -462,23 +462,13 @@ void update_benchmark(const char *fmt, ...)
     }
 }
 
-void close_output_stream(OutputStream *ost)
-{
-    OutputFile *of = output_files[ost->file_index];
-    ost->finished |= ENCODER_FINISHED;
-
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-}
-
-static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
+static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
 {
     AVBPrint buf, buf_script;
     int64_t total_size = of_filesize(output_files[0]);
     int vid;
     double bitrate;
     double speed;
-    int64_t pts = AV_NOPTS_VALUE;
     static int64_t last_time = -1;
     static int first_report = 1;
     uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
@@ -533,17 +523,13 @@ static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
 
             vid = 1;
         }
-        /* compute min output value */
-        if (ost->last_mux_dts != AV_NOPTS_VALUE) {
-            if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
-                pts = ost->last_mux_dts;
-            if (copy_ts) {
-                if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
-                    copy_ts_first_pts = pts;
-                if (copy_ts_first_pts != AV_NOPTS_VALUE)
-                    pts -= copy_ts_first_pts;
-            }
-        }
+    }
+
+    if (copy_ts) {
+        if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
+            copy_ts_first_pts = pts;
+        if (copy_ts_first_pts != AV_NOPTS_VALUE)
+            pts -= copy_ts_first_pts;
     }
 
     us    = FFABS64U(pts) % AV_TIME_BASE;
@@ -746,19 +732,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
     return 0;
 }
 
-/* pkt = NULL means EOF (needed to flush decoder buffers) */
-static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    InputFile *f = input_files[ist->file_index];
-    int ret = 0;
-    int eof_reached = 0;
-
-    if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
-        eof_reached = 1;
-
-    return !eof_reached;
-}
-
 static void print_stream_maps(void)
 {
     av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -835,41 +808,6 @@ static void print_stream_maps(void)
     }
 }
 
-/**
- * Select the output stream to process.
- *
- * @retval 0 an output stream was selected
- * @retval AVERROR(EAGAIN) need to wait until more input is available
- * @retval AVERROR_EOF no more streams need output
- */
-static int choose_output(OutputStream **post)
-{
-    int64_t opts_min = INT64_MAX;
-    OutputStream *ost_min = NULL;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        int64_t opts;
-
-        {
-            opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
-                   INT64_MIN : ost->last_mux_dts;
-        }
-
-        if (!ost->initialized && !ost->finished) {
-            ost_min = ost;
-            break;
-        }
-        if (!ost->finished && opts < opts_min) {
-            opts_min = opts;
-            ost_min  = ost;
-        }
-    }
-    if (!ost_min)
-        return AVERROR_EOF;
-    *post = ost_min;
-    return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
-}
-
 static void set_tty_echo(int on)
 {
 #if HAVE_TERMIOS_H
@@ -941,110 +879,21 @@ static int check_keyboard_interaction(int64_t cur_time)
     return 0;
 }
 
-static void reset_eagain(void)
-{
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
-        ost->unavailable = 0;
-}
-
-/*
- * Return
- * - 0 -- one packet was read and processed
- * - AVERROR(EAGAIN) -- no packets were available for selected file,
- *   this function should be called again
- * - AVERROR_EOF -- this function should not be called again
- */
-static int process_input(int file_index, AVPacket *pkt)
-{
-    InputFile *ifile = input_files[file_index];
-    InputStream *ist;
-    int ret, i;
-
-    ret = 0;
-
-    if (ret < 0) {
-        if (ret != AVERROR_EOF) {
-            av_log(ifile, AV_LOG_ERROR,
-                   "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
-            if (exit_on_error)
-                return ret;
-        }
-
-        for (i = 0; i < ifile->nb_streams; i++) {
-            ist = ifile->streams[i];
-            if (!ist->discard) {
-                ret = process_input_packet(ist, NULL, 0);
-                if (ret>0)
-                    return 0;
-                else if (ret < 0)
-                    return ret;
-            }
-        }
-
-        ifile->eof_reached = 1;
-        return AVERROR(EAGAIN);
-    }
-
-    reset_eagain();
-
-    ist = ifile->streams[pkt->stream_index];
-
-    ret = process_input_packet(ist, pkt, 0);
-
-    av_packet_unref(pkt);
-
-    return ret < 0 ? ret : 0;
-}
-
-/**
- * Run a single step of transcoding.
- *
- * @return  0 for success, <0 for error
- */
-static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
-{
-    InputStream  *ist = NULL;
-    int ret;
-
-    if (ost->filter) {
-        if (!ist)
-            return 0;
-    } else {
-        ist = ost->ist;
-        av_assert0(ist);
-    }
-
-    ret = process_input(ist->file_index, demux_pkt);
-    if (ret == AVERROR(EAGAIN)) {
-        return 0;
-    }
-
-    if (ret < 0)
-        return ret == AVERROR_EOF ? 0 : ret;
-
-    return 0;
-}
-
 /*
  * The following code is the main loop of the file converter
  */
-static int transcode(Scheduler *sch, int *err_rate_exceeded)
+static int transcode(Scheduler *sch)
 {
     int ret = 0, i;
-    InputStream *ist;
-    int64_t timer_start;
-    AVPacket *demux_pkt = NULL;
+    int64_t timer_start, transcode_ts = 0;
 
     print_stream_maps();
 
-    *err_rate_exceeded = 0;
     atomic_store(&transcode_init_done, 1);
 
-    demux_pkt = av_packet_alloc();
-    if (!demux_pkt) {
-        ret = AVERROR(ENOMEM);
-        goto fail;
-    }
+    ret = sch_start(sch);
+    if (ret < 0)
+        return ret;
 
     if (stdin_interaction) {
         av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
@@ -1052,8 +901,7 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
 
     timer_start = av_gettime_relative();
 
-    while (!received_sigterm) {
-        OutputStream *ost;
+    while (!sch_wait(sch, stats_period, &transcode_ts)) {
         int64_t cur_time= av_gettime_relative();
 
         /* if 'q' pressed, exits */
@@ -1061,48 +909,11 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
             if (check_keyboard_interaction(cur_time) < 0)
                 break;
 
-        ret = choose_output(&ost);
-        if (ret == AVERROR(EAGAIN)) {
-            reset_eagain();
-            av_usleep(10000);
-            ret = 0;
-            continue;
-        } else if (ret < 0) {
-            av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
-            ret = 0;
-            break;
-        }
-
-        ret = transcode_step(ost, demux_pkt);
-        if (ret < 0 && ret != AVERROR_EOF) {
-            av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
-            break;
-        }
-
         /* dump report by using the output first video and audio streams */
-        print_report(0, timer_start, cur_time);
+        print_report(0, timer_start, cur_time, transcode_ts);
     }
 
-    /* at the end of stream, we must flush the decoder buffers */
-    for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
-        float err_rate;
-
-        if (!input_files[ist->file_index]->eof_reached) {
-            int err = process_input_packet(ist, NULL, 0);
-            ret = err_merge(ret, err);
-        }
-
-        err_rate = (ist->frames_decoded || ist->decode_errors) ?
-                   ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
-        if (err_rate > max_error_rate) {
-            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
-                   err_rate, max_error_rate);
-            *err_rate_exceeded = 1;
-        } else if (err_rate)
-            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
-    }
-
-    term_exit();
+    ret = sch_stop(sch);
 
     /* write the trailer if needed */
     for (i = 0; i < nb_output_files; i++) {
@@ -1110,11 +921,10 @@ static int transcode(Scheduler *sch, int *err_rate_exceeded)
         ret = err_merge(ret, err);
     }
 
-    /* dump report by using the first video and audio streams */
-    print_report(1, timer_start, av_gettime_relative());
+    term_exit();
 
-fail:
-    av_packet_free(&demux_pkt);
+    /* dump report by using the first video and audio streams */
+    print_report(1, timer_start, av_gettime_relative(), transcode_ts);
 
     return ret;
 }
@@ -1167,7 +977,7 @@ int main(int argc, char **argv)
 {
     Scheduler *sch = NULL;
 
-    int ret, err_rate_exceeded;
+    int ret;
     BenchmarkTimeStamps ti;
 
     init_dynload();
@@ -1209,7 +1019,7 @@ int main(int argc, char **argv)
     }
 
     current_time = ti = get_benchmark_time_stamps();
-    ret = transcode(sch, &err_rate_exceeded);
+    ret = transcode(sch);
     if (ret >= 0 && do_benchmark) {
         int64_t utime, stime, rtime;
         current_time = get_benchmark_time_stamps();
@@ -1221,8 +1031,8 @@ int main(int argc, char **argv)
                utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
     }
 
-    ret = received_nb_signals ? 255 :
-          err_rate_exceeded   ?  69 : ret;
+    ret = received_nb_signals                 ? 255 :
+          (ret == FFMPEG_ERROR_RATE_EXCEEDED) ?  69 : ret;
 
 finish:
     if (ret == AVERROR_EXIT)
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index afc4496bd6..c726e80751 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -526,6 +526,8 @@ typedef struct OutputStream {
     InputStream *ist;
 
     AVStream *st;            /* stream in the output file */
+
+    // XXX: remove the last_mux_dts field below
     /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
     int64_t last_mux_dts;
 
@@ -572,13 +574,6 @@ typedef struct OutputStream {
     AVDictionary *sws_dict;
     AVDictionary *swr_opts;
     char *apad;
-    OSTFinished finished;        /* no more packets should be written for this stream */
-    int unavailable;                     /* true if the steram is unavailable (possibly temporarily) */
-
-    // init_output_stream() has been called for this stream
-    // The encoder and the bitstream filters have been initialized and the stream
-    // parameters are set in the AVStream.
-    int initialized;
 
     const char *attachment_filename;
 
@@ -810,7 +805,6 @@ InputStream *ist_iter(InputStream *prev);
  * pass NULL to start iteration */
 OutputStream *ost_iter(OutputStream *prev);
 
-void close_output_stream(OutputStream *ost);
 void update_benchmark(const char *fmt, ...);
 
 #define SPECIFIER_OPT_FMT_str  "%s"
-- 
2.42.0



More information about the ffmpeg-devel mailing list