[FFmpeg-devel] [PATCH 49/49] fftools/ffmpeg: move each muxer to a separate thread

Anton Khirnov anton at khirnov.net
Mon Apr 4 14:30:37 EEST 2022


---
 fftools/ffmpeg.c     |  38 +++------
 fftools/ffmpeg.h     |   7 +-
 fftools/ffmpeg_mux.c | 197 +++++++++++++++++++++++++++++++++++--------
 3 files changed, 178 insertions(+), 64 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 9dfbc4216a..8ea27d3422 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1286,10 +1286,7 @@ static void finish_output_stream(OutputStream *ost)
     OutputFile *of = output_files[ost->file_index];
     ost->finished = ENCODER_FINISHED;
 
-    if (ost->sq_idx_mux >= 0)
-        sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
-    else
-        ost->finished |= MUXER_FINISHED;
+    output_packet(of, ost->pkt, ost, 1);
 }
 
 /**
@@ -3421,9 +3418,8 @@ static int need_output(void)
 
     for (i = 0; i < nb_output_streams; i++) {
         OutputStream *ost    = output_streams[i];
-        OutputFile *of       = output_files[ost->file_index];
 
-        if (ost->finished || of_finished(of))
+        if (ost->finished)
             continue;
 
         return 1;
@@ -4269,26 +4265,6 @@ static int transcode_step(void)
     return reap_filters(0);
 }
 
-static void flush_sync_queues_mux(void)
-{
-    /* mark all queue inputs as done */
-    for (int i = 0; i < nb_output_streams; i++) {
-        OutputStream   *ost = output_streams[i];
-        OutputFile      *of = output_files[ost->file_index];
-        if (ost->sq_idx_mux >= 0)
-            sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
-    }
-
-    /* encode all packets remaining in the sync queues */
-    for (int i = 0; i < nb_output_streams; i++) {
-        OutputStream   *ost = output_streams[i];
-        OutputFile      *of = output_files[ost->file_index];
-
-        if (!(ost->finished & MUXER_FINISHED))
-            output_packet(of, ost->pkt, ost, 1);
-    }
-}
-
 /*
  * The following code is the main loop of the file converter
  */
@@ -4310,6 +4286,12 @@ static int transcode(void)
 
     timer_start = av_gettime_relative();
 
+    for (i = 0; i < nb_output_files; i++) {
+        ret = of_thread_start(output_files[i]);
+        if (ret < 0)
+            goto fail;
+    }
+
     if ((ret = init_input_threads()) < 0)
         goto fail;
 
@@ -4346,7 +4328,9 @@ static int transcode(void)
         }
     }
     flush_encoders();
-    flush_sync_queues_mux();
+
+    for (i = 0; i < nb_output_files; i++)
+        of_thread_stop(output_files[i]);
 
     term_exit();
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 407342462f..c4a5c2a0a2 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -583,6 +583,8 @@ typedef struct OutputFile {
     const AVOutputFormat *format;
     const char           *url;
 
+    AVThreadMessageQueue *mux_queue;
+
     SyncQueue *sq_encode;
     SyncQueue *sq_mux;
 
@@ -697,11 +699,14 @@ int hwaccel_decode_init(AVCodecContext *avctx);
 
 int of_muxer_init(OutputFile *of, AVFormatContext *fc,
                   AVDictionary *opts, int64_t limit_filesize);
+
+int of_thread_start(OutputFile *of);
+void of_thread_stop(OutputFile *of);
+
 int of_write_trailer(OutputFile *of);
 void of_close(OutputFile **pof);
 
 int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof);
-int of_finished(OutputFile *of);
 int64_t of_filesize(OutputFile *of);
 AVChapter * const *
 of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 6ca9a51dd6..f99dd5ec3e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -16,17 +16,20 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include <stdatomic.h>
 #include <stdio.h>
 #include <string.h>
 
 #include "ffmpeg.h"
 #include "sync_queue.h"
+#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -46,18 +49,24 @@ typedef struct MuxStream {
     /* dts of the last packet sent to the muxer, in the stream timebase
      * used for making up missing dts values */
     int64_t last_mux_dts;
+
+    /* data (a real or a flush packet) was received for this stream */
+    int got_data;
 } MuxStream;
 
 struct Muxer {
     AVFormatContext *fc;
 
+    pthread_t    thread;
+    ThreadQueue *tq;
+
     MuxStream *streams;
 
     AVDictionary *opts;
 
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
-    int64_t final_filesize;
+    atomic_int_least64_t last_filesize;
     int header_written;
 };
 
@@ -221,13 +230,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     return 0;
 }
 
+/* avio_size(pb), or avio_tell(pb) when that is <= 0; -1 without an AVIOContext */
+static int64_t filesize(AVIOContext *pb)
+{
+    int64_t size;
+
+    if (!pb)
+        return -1;
+
+    size = avio_size(pb);
+    // FIXME improve avio_size() so it works with non seekable output too
+    return size > 0 ? size : avio_tell(pb);
+}
+
 static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     MuxStream *ms = &of->mux->streams[ost->index];
     AVFormatContext *s = of->mux->fc;
     AVStream *st = ost->st;
+    int64_t fs;
     int ret;
 
+    fs = filesize(s->pb);
+    atomic_store(&of->mux->last_filesize, fs);
+    if (fs >= of->mux->limit_filesize)
+        return AVERROR_EOF;
+
     if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && video_sync_method == VSYNC_DROP) ||
         (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
         pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -333,8 +361,8 @@ static int check_write_header(OutputFile *of)
     int ret, i;
 
     for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = output_streams[of->ost_index + i];
-        if (!ost->initialized)
+        MuxStream *ms = &of->mux->streams[i];
+        if (!ms->got_data)
             return 0;
     }
 
@@ -378,12 +406,15 @@ static int check_write_header(OutputFile *of)
     return 0;
 }
 
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
+    Muxer    *mux = of->mux;
+    MuxStream *ms = &mux->streams[ost->index];
     int ret;
 
-    if (!of->mux->header_written) {
-        ret  = check_write_header(of);
+    ms->got_data = 1;
+    if (!mux->header_written) {
+        ret = check_write_header(of);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -391,34 +422,102 @@ int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
     }
 
     if (ost->sq_idx_mux >= 0) {
-        ret = sq_send(of->sq_mux, ost->sq_idx_mux,
-                      SQPKT(eof ? NULL: pkt));
+        int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
         if (ret < 0) {
-            av_packet_unref(pkt);
-            if (ret == AVERROR_EOF) {
-                ost->finished |= MUXER_FINISHED;
-                return 0;
-            } else
-                return ret;
+            if (pkt)
+                av_packet_unref(pkt);
+            return ret;
         }
 
         while (1) {
+            pkt = av_packet_alloc();
+            if (!pkt)
+                // XXX: handle allocation failure (return ENOMEM) instead of aborting
+                abort();
+
             ret = sq_receive(of->sq_mux, -1, SQPKT(pkt));
-            if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
-                return 0;
-            else if (ret < 0)
-                return ret;
+            if (ret < 0) {
+                av_packet_free(&pkt);
+                return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
+            }
 
             ret = submit_packet(of, pkt, output_streams[of->ost_index + ret]);
+            av_packet_free(&pkt);
             if (ret < 0)
                 return ret;
         }
-    } else if (!eof)
+    } else if (pkt)
         return submit_packet(of, pkt, ost);
 
     return 0;
 }
 
+/* Muxer thread: mux packets received from mux->tq; arg is the OutputFile. */
+static void *muxer_thread(void *arg)
+{
+    OutputFile *of = arg;
+    Muxer     *mux = of->mux;
+
+    while (1) {
+        OutputStream *ost;
+        AVPacket *pkt = NULL;
+        int stream_idx, ret;
+
+        ret = tq_receive(mux->tq, &stream_idx, &pkt);
+        if (stream_idx < 0) {
+            av_log(NULL, AV_LOG_DEBUG,
+                   "All streams finished for output file #%d\n", of->index);
+            break;
+        }
+
+        ost = output_streams[of->ost_index + stream_idx];
+        ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+        av_packet_free(&pkt);
+        if (ret == AVERROR_EOF)
+            tq_receive_finish(mux->tq, stream_idx);
+        else if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Error muxing a packet for output file #%d\n", of->index);
+            break;
+        }
+    }
+
+    /* flush every stream of this file (index i), not just the first one */
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++) {
+        sync_queue_process(of, output_streams[of->ost_index + i], NULL);
+        tq_receive_finish(mux->tq, i);
+    }
+    av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index);
+    return NULL;
+}
+
+/* Hand pkt for ost over to the muxer thread of of, or mark ost done if eof. */
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+{
+    ThreadQueue *tq = of->mux->tq;
+    AVPacket *queued;
+    int err;
+
+    if (eof) {
+        tq_send_finish(tq, ost->index);
+        return 0;
+    }
+
+    queued = av_packet_alloc();
+    if (!queued) {
+        av_packet_unref(pkt);
+        return AVERROR(ENOMEM);
+    }
+    av_packet_move_ref(queued, pkt);
+
+    err = tq_send(tq, ost->index, &queued);
+    if (err >= 0)
+        return err;
+    av_packet_free(&queued);
+    ost->finished |= MUXER_FINISHED;
+    return err == AVERROR_EOF ? 0 : err;
+}
+
 int of_write_trailer(OutputFile *of)
 {
     AVFormatContext *fc = of->mux->fc;
@@ -438,7 +537,7 @@ int of_write_trailer(OutputFile *of)
         return ret;
     }
 
-    of->mux->final_filesize = of_filesize(of);
+    of->mux->last_filesize = filesize(fc->pb);
 
     if (!(of->format->flags & AVFMT_NOFILE)) {
         ret = avio_closep(&fc->pb);
@@ -487,6 +586,9 @@ static void mux_free(Muxer **pmux)
     av_freep(&mux->streams);
     av_dict_free(&mux->opts);
 
+    if (mux->tq) {
+    }
+
     fc_close(&mux->fc);
 
     av_freep(pmux);
@@ -558,30 +660,53 @@ fail:
     return ret;
 }
 
-int of_finished(OutputFile *of)
+int64_t of_filesize(OutputFile *of)
 {
-    return of_filesize(of) >= of->mux->limit_filesize;
+    return atomic_load(&of->mux->last_filesize);
 }
 
-int64_t of_filesize(OutputFile *of)
+AVChapter * const *
+of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
 {
-    AVIOContext *pb = of->mux->fc->pb;
-    int64_t ret = -1;
+    *nb_chapters = of->mux->fc->nb_chapters;
+    return of->mux->fc->chapters;
+}
 
-    if (of->mux->final_filesize)
-        ret = of->mux->final_filesize;
-    else if (pb) {
-        ret = avio_size(pb);
-        if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
-            ret = avio_tell(pb);
+static void pkt_free(void *pkt) /* free callback handed to tq_alloc() */
+{
+    av_packet_free((AVPacket**)&pkt); /* pkt is treated as an AVPacket* */
+}
+
+int of_thread_start(OutputFile *of)
+{
+    Muxer *mux = of->mux;
+    int ret;
+
+    mux->tq = tq_alloc(mux->fc->nb_streams, 8, sizeof(AVPacket*),
+                       pkt_free);
+    if (!mux->tq)
+        return AVERROR(ENOMEM);
+
+    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+    if (ret) {
+        tq_free(&mux->tq);
+        return AVERROR(ret);
     }
 
-    return ret;
+    return 0;
 }
 
-AVChapter * const *
-of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
+void of_thread_stop(OutputFile *of)
 {
-    *nb_chapters = of->mux->fc->nb_chapters;
-    return of->mux->fc->chapters;
+    Muxer *mux = of->mux;
+
+    if (!mux || !mux->tq)
+        return;
+
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+        tq_send_finish(mux->tq, i);
+
+    pthread_join(mux->thread, NULL);
+
+    tq_free(&mux->tq);
 }
-- 
2.34.1



More information about the ffmpeg-devel mailing list