[FFmpeg-devel] [PATCH v3 18/24] fftools/ffmpeg: add thread-aware transcode scheduling infrastructure

Anton Khirnov anton at khirnov.net
Sat Nov 11 17:21:16 EET 2023


See the comment block at the top of fftools/ffmpeg_sched.h for more
details on what this scheduler is for.

This commit adds the scheduling code itself, along with minimal
integration with the rest of the program:
* allocating and freeing the scheduler
* passing it throughout the call stack in order to register the
  individual components (demuxers/decoders/filtergraphs/encoders/muxers)
  with the scheduler

The scheduler is not actually used as of this commit, so it should not
result in any change in behavior. That will change in future commits.
---
v3:
Fix infinite loop with sync queues and -t.
Add forgotten AVClasses for SchFilterGraph and SchEnc.

v2:
keep setting OutputStream.{max_muxing_queue_size,muxing_queue_data_threshold}
until they are removed in 23/24
---
 fftools/Makefile          |    1 +
 fftools/ffmpeg.c          |   18 +-
 fftools/ffmpeg.h          |   24 +-
 fftools/ffmpeg_dec.c      |   10 +-
 fftools/ffmpeg_demux.c    |   46 +-
 fftools/ffmpeg_enc.c      |   13 +-
 fftools/ffmpeg_filter.c   |   37 +-
 fftools/ffmpeg_mux.c      |   17 +-
 fftools/ffmpeg_mux.h      |   11 +
 fftools/ffmpeg_mux_init.c |   85 +-
 fftools/ffmpeg_opt.c      |   22 +-
 fftools/ffmpeg_sched.c    | 2084 +++++++++++++++++++++++++++++++++++++
 fftools/ffmpeg_sched.h    |  461 ++++++++
 13 files changed, 2773 insertions(+), 56 deletions(-)
 create mode 100644 fftools/ffmpeg_sched.c
 create mode 100644 fftools/ffmpeg_sched.h

diff --git a/fftools/Makefile b/fftools/Makefile
index 56820e6bc8..d6a8913a7f 100644
--- a/fftools/Makefile
+++ b/fftools/Makefile
@@ -18,6 +18,7 @@ OBJS-ffmpeg +=                  \
     fftools/ffmpeg_mux.o        \
     fftools/ffmpeg_mux_init.o   \
     fftools/ffmpeg_opt.o        \
+    fftools/ffmpeg_sched.o      \
     fftools/objpool.o           \
     fftools/sync_queue.o        \
     fftools/thread_queue.o      \
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index f2293e0250..1a58bf98cf 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -99,6 +99,7 @@
 
 #include "cmdutils.h"
 #include "ffmpeg.h"
+#include "ffmpeg_sched.h"
 #include "ffmpeg_utils.h"
 #include "sync_queue.h"
 
@@ -1123,7 +1124,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
 /*
  * The following code is the main loop of the file converter
  */
-static int transcode(int *err_rate_exceeded)
+static int transcode(Scheduler *sch, int *err_rate_exceeded)
 {
     int ret = 0, i;
     InputStream *ist;
@@ -1261,6 +1262,8 @@ static int64_t getmaxrss(void)
 
 int main(int argc, char **argv)
 {
+    Scheduler *sch = NULL;
+
     int ret, err_rate_exceeded;
     BenchmarkTimeStamps ti;
 
@@ -1278,8 +1281,14 @@ int main(int argc, char **argv)
 
     show_banner(argc, argv, options);
 
+    sch = sch_alloc();
+    if (!sch) {
+        ret = AVERROR(ENOMEM);
+        goto finish;
+    }
+
     /* parse options and open all input/output files */
-    ret = ffmpeg_parse_options(argc, argv);
+    ret = ffmpeg_parse_options(argc, argv, sch);
     if (ret < 0)
         goto finish;
 
@@ -1297,7 +1306,7 @@ int main(int argc, char **argv)
     }
 
     current_time = ti = get_benchmark_time_stamps();
-    ret = transcode(&err_rate_exceeded);
+    ret = transcode(sch, &err_rate_exceeded);
     if (ret >= 0 && do_benchmark) {
         int64_t utime, stime, rtime;
         current_time = get_benchmark_time_stamps();
@@ -1317,5 +1326,8 @@ finish:
         ret = 0;
 
     ffmpeg_cleanup(ret);
+
+    sch_free(&sch);
+
     return ret;
 }
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index c954ed5ebf..5833f85ab5 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -27,6 +27,7 @@
 #include <signal.h>
 
 #include "cmdutils.h"
+#include "ffmpeg_sched.h"
 #include "sync_queue.h"
 
 #include "libavformat/avformat.h"
@@ -713,7 +714,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id
 int check_filter_outputs(void);
 int filtergraph_is_simple(const FilterGraph *fg);
 int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
-                            char *graph_desc);
+                            char *graph_desc,
+                            Scheduler *sch, unsigned sch_idx_enc);
 int init_complex_filtergraph(FilterGraph *fg);
 
 int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
@@ -736,7 +738,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
  */
 int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec);
 
-int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
+int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
+                     unsigned sched_idx_enc);
 
 /**
  * Create a new filtergraph in the global filtergraph list.
@@ -744,7 +747,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
  * @param graph_desc Graph description; an av_malloc()ed string, filtergraph
  *                   takes ownership of it.
  */
-int fg_create(FilterGraph **pfg, char *graph_desc);
+int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
 
 void fg_free(FilterGraph **pfg);
 
@@ -768,7 +771,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
  */
 int reap_filters(FilterGraph *fg, int flush);
 
-int ffmpeg_parse_options(int argc, char **argv);
+int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
 
 void enc_stats_write(OutputStream *ost, EncStats *es,
                      const AVFrame *frame, const AVPacket *pkt,
@@ -791,7 +794,7 @@ AVBufferRef *hw_device_for_filter(void);
 
 int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
 
-int dec_open(InputStream *ist);
+int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
 void dec_free(Decoder **pdec);
 
 /**
@@ -805,7 +808,8 @@ void dec_free(Decoder **pdec);
  */
 int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
 
-int enc_alloc(Encoder **penc, const AVCodec *codec);
+int enc_alloc(Encoder **penc, const AVCodec *codec,
+              Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
 
 int enc_open(OutputStream *ost, const AVFrame *frame);
@@ -821,7 +825,7 @@ int enc_flush(void);
  */
 int of_stream_init(OutputFile *of, OutputStream *ost);
 int of_write_trailer(OutputFile *of);
-int of_open(const OptionsContext *o, const char *filename);
+int of_open(const OptionsContext *o, const char *filename, Scheduler *sch);
 void of_free(OutputFile **pof);
 
 void of_enc_stats_close(void);
@@ -835,7 +839,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
 
 int64_t of_filesize(OutputFile *of);
 
-int ifile_open(const OptionsContext *o, const char *filename);
+int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
 void ifile_close(InputFile **f);
 
 /**
@@ -920,4 +924,8 @@ extern const char * const opt_name_frame_rates[];
 extern const char * const opt_name_top_field_first[];
 #endif
 
+void *muxer_thread(void *arg);
+void *decoder_thread(void *arg);
+void *encoder_thread(void *arg);
+
 #endif /* FFTOOLS_FFMPEG_H */
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 798ddc25b3..53e14f061e 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -52,6 +52,9 @@ struct Decoder {
     AVFrame *sub_prev[2];
     AVFrame *sub_heartbeat;
 
+    Scheduler      *sch;
+    unsigned        sch_idx;
+
     pthread_t       thread;
     /**
      * Queue for sending coded packets from the main thread to
@@ -650,7 +653,7 @@ fail:
     return AVERROR(ENOMEM);
 }
 
-static void *decoder_thread(void *arg)
+void *decoder_thread(void *arg)
 {
     InputStream *ist = arg;
     InputFile *ifile = input_files[ist->file_index];
@@ -1022,7 +1025,7 @@ static int hw_device_setup_for_decode(InputStream *ist)
     return 0;
 }
 
-int dec_open(InputStream *ist)
+int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
 {
     Decoder *d;
     const AVCodec *codec = ist->dec;
@@ -1040,6 +1043,9 @@ int dec_open(InputStream *ist)
         return ret;
     d = ist->decoder;
 
+    d->sch     = sch;
+    d->sch_idx = sch_idx;
+
     if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) {
         for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) {
             d->sub_prev[i] = av_frame_alloc();
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 65a5e08ca5..2234dbe076 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include "ffmpeg.h"
+#include "ffmpeg_sched.h"
 #include "ffmpeg_utils.h"
 #include "objpool.h"
 #include "thread_queue.h"
@@ -60,6 +61,9 @@ typedef struct DemuxStream {
     // name used for logging
     char log_name[32];
 
+    int sch_idx_stream;
+    int sch_idx_dec;
+
     double ts_scale;
 
     int streamcopy_needed;
@@ -108,6 +112,7 @@ typedef struct Demuxer {
 
     double readrate_initial_burst;
 
+    Scheduler            *sch;
     ThreadQueue          *thread_queue;
     int                   thread_queue_size;
     pthread_t             thread;
@@ -780,7 +785,9 @@ void ifile_close(InputFile **pf)
 
 static int ist_use(InputStream *ist, int decoding_needed)
 {
+    Demuxer      *d = demuxer_from_ifile(input_files[ist->file_index]);
     DemuxStream *ds = ds_from_ist(ist);
+    int ret;
 
     if (ist->user_set_discard == AVDISCARD_ALL) {
         av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n",
@@ -788,13 +795,32 @@ static int ist_use(InputStream *ist, int decoding_needed)
         return AVERROR(EINVAL);
     }
 
+    if (ds->sch_idx_stream < 0) {
+        ret = sch_add_demux_stream(d->sch, d->f.index);
+        if (ret < 0)
+            return ret;
+        ds->sch_idx_stream = ret;
+    }
+
     ist->discard          = 0;
     ist->st->discard      = ist->user_set_discard;
     ist->decoding_needed |= decoding_needed;
     ds->streamcopy_needed |= !decoding_needed;
 
-    if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) {
-        int ret = dec_open(ist);
+    if (decoding_needed && ds->sch_idx_dec < 0) {
+        int is_audio = ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO;
+
+        ret = sch_add_dec(d->sch, decoder_thread, ist, d->loop && is_audio);
+        if (ret < 0)
+            return ret;
+        ds->sch_idx_dec = ret;
+
+        ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream),
+                                  SCH_DEC(ds->sch_idx_dec));
+        if (ret < 0)
+            return ret;
+
+        ret = dec_open(ist, d->sch, ds->sch_idx_dec);
         if (ret < 0)
             return ret;
     }
@@ -804,6 +830,7 @@ static int ist_use(InputStream *ist, int decoding_needed)
 
 int ist_output_add(InputStream *ist, OutputStream *ost)
 {
+    DemuxStream *ds = ds_from_ist(ist);
     int ret;
 
     ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0);
@@ -816,11 +843,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
 
     ist->outputs[ist->nb_outputs - 1] = ost;
 
-    return 0;
+    return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream;
 }
 
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
 {
+    DemuxStream *ds = ds_from_ist(ist);
     int ret;
 
     ret = ist_use(ist, is_simple ? DECODING_FOR_OST : DECODING_FOR_FILTER);
@@ -838,7 +866,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
     if (ret < 0)
         return ret;
 
-    return 0;
+    return ds->sch_idx_dec;
 }
 
 static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st,
@@ -970,6 +998,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st)
     if (!ds)
         return NULL;
 
+    ds->sch_idx_stream = -1;
+    ds->sch_idx_dec    = -1;
+
     ds->ist.st         = st;
     ds->ist.file_index = f->index;
     ds->ist.index      = st->index;
@@ -1295,7 +1326,7 @@ static Demuxer *demux_alloc(void)
     return d;
 }
 
-int ifile_open(const OptionsContext *o, const char *filename)
+int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
 {
     Demuxer   *d;
     InputFile *f;
@@ -1322,6 +1353,11 @@ int ifile_open(const OptionsContext *o, const char *filename)
 
     f = &d->f;
 
+    ret = sch_add_demux(sch, input_thread, d);
+    if (ret < 0)
+        return ret;
+    d->sch = sch;
+
     if (stop_time != INT64_MAX && recording_time != INT64_MAX) {
         stop_time = INT64_MAX;
         av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n");
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index f1c41272b0..fbfe592f20 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -56,6 +56,9 @@ struct Encoder {
     int opened;
     int finished;
 
+    Scheduler      *sch;
+    unsigned        sch_idx;
+
     pthread_t       thread;
     /**
      * Queue for sending frames from the main thread to
@@ -113,7 +116,8 @@ void enc_free(Encoder **penc)
     av_freep(penc);
 }
 
-int enc_alloc(Encoder **penc, const AVCodec *codec)
+int enc_alloc(Encoder **penc, const AVCodec *codec,
+              Scheduler *sch, unsigned sch_idx)
 {
     Encoder *enc;
 
@@ -133,6 +137,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec)
     if (!enc->pkt)
         goto fail;
 
+    enc->sch     = sch;
+    enc->sch_idx = sch_idx;
+
     *penc = enc;
 
     return 0;
@@ -217,8 +224,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
     return 0;
 }
 
-static void *encoder_thread(void *arg);
-
 static int enc_thread_start(OutputStream *ost)
 {
     Encoder *e = ost->enc;
@@ -994,7 +999,7 @@ fail:
     return AVERROR(ENOMEM);
 }
 
-static void *encoder_thread(void *arg)
+void *encoder_thread(void *arg)
 {
     OutputStream *ost = arg;
     OutputFile    *of = output_files[ost->file_index];
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 7933c220ca..92c582683d 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -65,6 +65,9 @@ typedef struct FilterGraphPriv {
     // frame for sending output to the encoder
     AVFrame *frame_enc;
 
+    Scheduler       *sch;
+    unsigned         sch_idx;
+
     pthread_t        thread;
     /**
      * Queue for sending frames from the main thread to the filtergraph. Has
@@ -735,14 +738,19 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
+    int ret, dec_idx;
 
     av_assert0(!ifp->ist);
 
     ifp->ist             = ist;
     ifp->type_src        = ist->st->codecpar->codec_type;
 
-    ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
+    dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
+    if (dec_idx < 0)
+        return dec_idx;
+
+    ret = sch_connect(fgp->sch, SCH_DEC(dec_idx),
+                                SCH_FILTER_IN(fgp->sch_idx, ifp->index));
     if (ret < 0)
         return ret;
 
@@ -798,13 +806,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
     return 0;
 }
 
-int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
+int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
+                     unsigned sched_idx_enc)
 {
     const OutputFile  *of = output_files[ost->file_index];
     OutputFilterPriv *ofp = ofp_from_ofilter(ofilter);
     FilterGraph  *fg = ofilter->graph;
     FilterGraphPriv *fgp = fgp_from_fg(fg);
     const AVCodec *c = ost->enc_ctx->codec;
+    int ret;
 
     av_assert0(!ofilter->ost);
 
@@ -887,6 +897,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
         break;
     }
 
+    ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index),
+                                SCH_ENC(sched_idx_enc));
+    if (ret < 0)
+        return ret;
+
     fgp->nb_outputs_bound++;
     av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
 
@@ -1016,7 +1031,7 @@ static const AVClass fg_class = {
     .category   = AV_CLASS_CATEGORY_FILTER,
 };
 
-int fg_create(FilterGraph **pfg, char *graph_desc)
+int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
 {
     FilterGraphPriv *fgp;
     FilterGraph      *fg;
@@ -1037,6 +1052,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
     fg->index      = nb_filtergraphs - 1;
     fgp->graph_desc = graph_desc;
     fgp->disable_conversions = !auto_conversion_filters;
+    fgp->sch                 = sch;
 
     snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index);
 
@@ -1104,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
         goto fail;
     }
 
+    ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs,
+                              filter_thread, fgp);
+    if (ret < 0)
+        goto fail;
+    fgp->sch_idx = ret;
+
 fail:
     avfilter_inout_free(&inputs);
     avfilter_inout_free(&outputs);
@@ -1116,13 +1138,14 @@ fail:
 }
 
 int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
-                            char *graph_desc)
+                            char *graph_desc,
+                            Scheduler *sch, unsigned sched_idx_enc)
 {
     FilterGraph *fg;
     FilterGraphPriv *fgp;
     int ret;
 
-    ret = fg_create(&fg, graph_desc);
+    ret = fg_create(&fg, graph_desc, sch);
     if (ret < 0)
         return ret;
     fgp = fgp_from_fg(fg);
@@ -1148,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
     if (ret < 0)
         return ret;
 
-    ret = ofilter_bind_ost(fg->outputs[0], ost);
+    ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc);
     if (ret < 0)
         return ret;
 
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index bc6ce33483..7dd8e8c848 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -297,7 +297,7 @@ fail:
     return AVERROR(ENOMEM);
 }
 
-static void *muxer_thread(void *arg)
+void *muxer_thread(void *arg)
 {
     Muxer     *mux = arg;
     OutputFile *of = &mux->of;
@@ -570,7 +570,9 @@ static int thread_start(Muxer *mux)
     return 0;
 }
 
-static int print_sdp(void)
+int print_sdp(const char *filename);
+
+int print_sdp(const char *filename)
 {
     char sdp[16384];
     int i;
@@ -603,19 +605,18 @@ static int print_sdp(void)
     if (ret < 0)
         goto fail;
 
-    if (!sdp_filename) {
+    if (!filename) {
         printf("SDP:\n%s\n", sdp);
         fflush(stdout);
     } else {
-        ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL);
+        ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL);
         if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename);
+            av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename);
             goto fail;
         }
 
         avio_print(sdp_pb, sdp);
         avio_closep(&sdp_pb);
-        av_freep(&sdp_filename);
     }
 
     // SDP successfully written, allow muxer threads to start
@@ -651,7 +652,7 @@ int mux_check_init(Muxer *mux)
     nb_output_dumped++;
 
     if (sdp_filename || want_sdp) {
-        ret = print_sdp();
+        ret = print_sdp(sdp_filename);
         if (ret < 0) {
             av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
             return ret;
@@ -974,6 +975,8 @@ void of_free(OutputFile **pof)
         ost_free(&of->streams[i]);
     av_freep(&of->streams);
 
+    av_freep(&mux->sch_stream_idx);
+
     av_dict_free(&mux->opts);
 
     av_packet_free(&mux->sq_pkt);
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index a2bb4dfc7d..aaf81eaa8d 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -24,6 +24,7 @@
 #include <stdatomic.h>
 #include <stdint.h>
 
+#include "ffmpeg_sched.h"
 #include "thread_queue.h"
 
 #include "libavformat/avformat.h"
@@ -50,6 +51,9 @@ typedef struct MuxStream {
 
     EncStats stats;
 
+    int sch_idx;
+    int sch_idx_enc;
+
     int64_t max_frames;
 
     /*
@@ -94,6 +98,13 @@ typedef struct Muxer {
 
     AVFormatContext *fc;
 
+    Scheduler   *sch;
+    unsigned     sch_idx;
+
+    // OutputStream indices indexed by scheduler stream indices
+    int         *sch_stream_idx;
+    int       nb_sch_stream_idx;
+
     pthread_t    thread;
     ThreadQueue *tq;
 
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index d5a10e92bd..922e5f85f2 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -23,6 +23,7 @@
 #include "cmdutils.h"
 #include "ffmpeg.h"
 #include "ffmpeg_mux.h"
+#include "ffmpeg_sched.h"
 #include "fopen_utf8.h"
 
 #include "libavformat/avformat.h"
@@ -435,6 +436,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type)
 
     ms->ost.class = &output_stream_class;
 
+    ms->sch_idx     = -1;
+    ms->sch_idx_enc = -1;
+
     snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d",
              type_str ? *type_str : '?', mux->of.index, ms->ost.index);
 
@@ -1126,6 +1130,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (!ms)
         return AVERROR(ENOMEM);
 
+    // only streams with sources (i.e. not attachments)
+    // are handled by the scheduler
+    if (ist || ofilter) {
+        ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx);
+        if (ret < 0)
+            return ret;
+
+        ret = sch_add_mux_stream(mux->sch, mux->sch_idx);
+        if (ret < 0)
+            return ret;
+
+        av_assert0(ret == mux->nb_sch_stream_idx - 1);
+        mux->sch_stream_idx[ret] = ms->ost.index;
+        ms->sch_idx              = ret;
+    }
+
     ost = &ms->ost;
 
     if (o->streamid) {
@@ -1169,7 +1189,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         if (!ost->enc_ctx)
             return AVERROR(ENOMEM);
 
-        ret = enc_alloc(&ost->enc, enc);
+        ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+        if (ret < 0)
+            return ret;
+        ms->sch_idx_enc = ret;
+
+        ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sch_idx_enc);
         if (ret < 0)
             return ret;
 
@@ -1379,11 +1404,19 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         ost->enc_ctx->global_quality = FF_QP2LAMBDA * qscale;
     }
 
-    ms->max_muxing_queue_size = 128;
-    MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, ms->max_muxing_queue_size, oc, st);
+    if (ms->sch_idx >= 0) {
+        int max_muxing_queue_size       = 128;
+        int muxing_queue_data_threshold = 50 * 1024 * 1024;
 
-    ms->muxing_queue_data_threshold = 50*1024*1024;
-    MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, ms->muxing_queue_data_threshold, oc, st);
+        MATCH_PER_STREAM_OPT(max_muxing_queue_size, i, max_muxing_queue_size, oc, st);
+        MATCH_PER_STREAM_OPT(muxing_queue_data_threshold, i, muxing_queue_data_threshold, oc, st);
+
+        sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
+                                 max_muxing_queue_size, muxing_queue_data_threshold);
+
+        ms->max_muxing_queue_size       = max_muxing_queue_size;
+        ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
+    }
 
     MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
                          oc, st);
@@ -1421,23 +1454,46 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         (type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) {
         if (ofilter) {
             ost->filter       = ofilter;
-            ret = ofilter_bind_ost(ofilter, ost);
+            ret = ofilter_bind_ost(ofilter, ost, ms->sch_idx_enc);
             if (ret < 0)
                 return ret;
         } else {
-            ret = init_simple_filtergraph(ost->ist, ost, filters);
+            ret = init_simple_filtergraph(ost->ist, ost, filters,
+                                          mux->sch, ms->sch_idx_enc);
             if (ret < 0) {
                 av_log(ost, AV_LOG_ERROR,
                        "Error initializing a simple filtergraph\n");
                 return ret;
             }
         }
+
+        ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc),
+                                    SCH_MSTREAM(mux->sch_idx, ms->sch_idx));
+        if (ret < 0)
+            return ret;
     } else if (ost->ist) {
-        ret = ist_output_add(ost->ist, ost);
-        if (ret < 0) {
+        int sched_idx = ist_output_add(ost->ist, ost);
+        if (sched_idx < 0) {
             av_log(ost, AV_LOG_ERROR,
                    "Error binding an input stream\n");
-            return ret;
+            return sched_idx;
+        }
+
+        if (ost->enc) {
+            ret = sch_connect(mux->sch, SCH_DEC(sched_idx),
+                                        SCH_ENC(ms->sch_idx_enc));
+            if (ret < 0)
+                return ret;
+
+            ret = sch_connect(mux->sch, SCH_ENC(ms->sch_idx_enc),
+                                        SCH_MSTREAM(mux->sch_idx, ms->sch_idx));
+            if (ret < 0)
+                return ret;
+        } else {
+            ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx),
+                                        SCH_MSTREAM(ost->file_index, ms->sch_idx));
+            if (ret < 0)
+                return ret;
         }
     }
 
@@ -2617,7 +2673,7 @@ static Muxer *mux_alloc(void)
     return mux;
 }
 
-int of_open(const OptionsContext *o, const char *filename)
+int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
 {
     Muxer *mux;
     AVFormatContext *oc;
@@ -2687,6 +2743,13 @@ int of_open(const OptionsContext *o, const char *filename)
                                            AVFMT_FLAG_BITEXACT);
     }
 
+    err = sch_add_mux(sch, muxer_thread, NULL, mux,
+                      !strcmp(oc->oformat->name, "rtp"));
+    if (err < 0)
+        return err;
+    mux->sch     = sch;
+    mux->sch_idx = err;
+
     /* create all output streams for this file */
     err = create_streams(mux, o);
     if (err < 0)
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index cd1aaabccc..e1680ebe0e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -28,6 +28,7 @@
 #endif
 
 #include "ffmpeg.h"
+#include "ffmpeg_sched.h"
 #include "cmdutils.h"
 #include "opt_common.h"
 #include "sync_queue.h"
@@ -1163,20 +1164,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg)
 
 static int opt_filter_complex(void *optctx, const char *opt, const char *arg)
 {
+    Scheduler *sch = optctx;
     char *graph_desc = av_strdup(arg);
     if (!graph_desc)
         return AVERROR(ENOMEM);
 
-    return fg_create(NULL, graph_desc);
+    return fg_create(NULL, graph_desc, sch);
 }
 
 static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg)
 {
+    Scheduler *sch = optctx;
     char *graph_desc = file_read(arg);
     if (!graph_desc)
         return AVERROR(EINVAL);
 
-    return fg_create(NULL, graph_desc);
+    return fg_create(NULL, graph_desc, sch);
 }
 
 void show_help_default(const char *opt, const char *arg)
@@ -1268,8 +1271,9 @@ static const OptionGroupDef groups[] = {
     [GROUP_INFILE]  = { "input url",   "i",  OPT_INPUT },
 };
 
-static int open_files(OptionGroupList *l, const char *inout,
-                      int (*open_file)(const OptionsContext*, const char*))
+static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch,
+                      int (*open_file)(const OptionsContext*, const char*,
+                                       Scheduler*))
 {
     int i, ret;
 
@@ -1289,7 +1293,7 @@ static int open_files(OptionGroupList *l, const char *inout,
         }
 
         av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg);
-        ret = open_file(&o, g->arg);
+        ret = open_file(&o, g->arg, sch);
         uninit_options(&o);
         if (ret < 0) {
             av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n",
@@ -1302,7 +1306,7 @@ static int open_files(OptionGroupList *l, const char *inout,
     return 0;
 }
 
-int ffmpeg_parse_options(int argc, char **argv)
+int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch)
 {
     OptionParseContext octx;
     const char *errmsg = NULL;
@@ -1319,7 +1323,7 @@ int ffmpeg_parse_options(int argc, char **argv)
     }
 
     /* apply global options */
-    ret = parse_optgroup(NULL, &octx.global_opts);
+    ret = parse_optgroup(sch, &octx.global_opts);
     if (ret < 0) {
         errmsg = "parsing global options";
         goto fail;
@@ -1329,7 +1333,7 @@ int ffmpeg_parse_options(int argc, char **argv)
     term_init();
 
     /* open input files */
-    ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open);
+    ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open);
     if (ret < 0) {
         errmsg = "opening input files";
         goto fail;
@@ -1343,7 +1347,7 @@ int ffmpeg_parse_options(int argc, char **argv)
     }
 
     /* open output files */
-    ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open);
+    ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open);
     if (ret < 0) {
         errmsg = "opening output files";
         goto fail;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
new file mode 100644
index 0000000000..1e5bcc2c73
--- /dev/null
+++ b/fftools/ffmpeg_sched.c
@@ -0,0 +1,2084 @@
+/*
+ * Inter-thread scheduling/synchronization.
+ * Copyright (c) 2023 Anton Khirnov
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdatomic.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "cmdutils.h"
+#include "ffmpeg_sched.h"
+#include "ffmpeg_utils.h"
+#include "sync_queue.h"
+#include "thread_queue.h"
+
+#include "libavcodec/packet.h"
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/fifo.h"
+#include "libavutil/frame.h"
+#include "libavutil/mem.h"
+#include "libavutil/thread.h"
+#include "libavutil/threadmessage.h"
+#include "libavutil/time.h"
+
+// 100 ms
+// FIXME: some other value? make this dynamic?
+#define SCHEDULE_TOLERANCE (100 * 1000)
+
+enum QueueType {
+    QUEUE_PACKETS,
+    QUEUE_FRAMES,
+};
+
+typedef struct SchWaiter {
+    pthread_mutex_t     lock;
+    pthread_cond_t      cond;
+    atomic_int          choked;
+
+    // the following are internal state of schedule_update_locked() and must not
+    // be accessed outside of it
+    int                 choked_prev;
+    int                 choked_next;
+} SchWaiter;
+
+typedef struct SchTask {
+    Scheduler          *parent;
+    SchedulerNode       node;
+
+    SchThreadFunc       func;
+    void               *func_arg;
+
+    pthread_t           thread;
+    int                 thread_running;
+} SchTask;
+
+typedef struct SchDec {
+    const AVClass      *class;
+
+    SchedulerNode       src;
+    SchedulerNode      *dst;
+    uint8_t            *dst_finished;
+    unsigned         nb_dst;
+
+    SchTask             task;
+    // Queue for receiving input packets, one stream.
+    ThreadQueue        *queue;
+
+    // Queue for sending post-flush end timestamps back to the source
+    AVThreadMessageQueue *queue_end_ts;
+    int                 expect_end_ts;
+
+    // temporary storage used by sch_dec_send()
+    AVFrame            *send_frame;
+} SchDec;
+
+typedef struct SchSyncQueue {
+    SyncQueue          *sq;
+    AVFrame            *frame;
+    pthread_mutex_t     lock;
+
+    unsigned           *enc_idx;
+    unsigned         nb_enc_idx;
+} SchSyncQueue;
+
+typedef struct SchEnc {
+    const AVClass      *class;
+
+    SchedulerNode       src;
+    SchedulerNode       dst;
+
+    // [0] - index of the sync queue in Scheduler.sq_enc,
+    // [1] - index of this encoder in the sq
+    int                 sq_idx[2];
+
+    /* Opening encoders is somewhat nontrivial due to their interaction with
+     * sync queues, which are (among other things) responsible for maintaining
+     * constant audio frame size, when it is required by the encoder.
+     *
+     * Opening the encoder requires stream parameters, obtained from the first
+     * frame. However, that frame cannot be properly chunked by the sync queue
+     * without knowing the required frame size, which is only available after
+     * opening the encoder.
+     *
+     * This apparent circular dependency is resolved in the following way:
+     * - the caller creating the encoder gives us a callback which opens the
+     *   encoder and returns the required frame size (if any)
+     * - when the first frame is sent to the encoder, the sending thread
+     *      - calls this callback, opening the encoder
+     *      - passes the returned frame size to the sync queue
+     */
+    int               (*open_cb)(void *opaque, const AVFrame *frame);
+    int                 opened;
+
+    SchTask             task;
+    // Queue for receiving input frames, one stream.
+    ThreadQueue        *queue;
+    // tq_send() to queue returned EOF
+    int                 in_finished;
+} SchEnc;
+
+typedef struct SchDemuxStream {
+    SchedulerNode      *dst;
+    uint8_t            *dst_finished;
+    unsigned         nb_dst;
+} SchDemuxStream;
+
+typedef struct SchDemux {
+    const AVClass      *class;
+
+    SchDemuxStream     *streams;
+    unsigned         nb_streams;
+
+    SchTask             task;
+    SchWaiter           waiter;
+
+    // temporary storage used by sch_demux_send()
+    AVPacket           *send_pkt;
+} SchDemux;
+
+typedef struct PreMuxQueue {
+    /**
+     * Queue for buffering the packets before the muxer task can be started.
+     */
+    AVFifo         *fifo;
+    /**
+     * Maximum number of packets in fifo.
+     */
+    int             max_packets;
+    /*
+     * The size of the AVPackets' buffers in queue.
+     * Updated when a packet is either pushed or pulled from the queue.
+     */
+    size_t          data_size;
+    /* Threshold after which max_packets will be in effect */
+    size_t          data_threshold;
+} PreMuxQueue;
+
+typedef struct SchMuxStream {
+    SchedulerNode       src;
+    SchedulerNode       src_sched;
+
+    PreMuxQueue         pre_mux_queue;
+
+    ////////////////////////////////////////////////////////////
+    // The following are protected by Scheduler.schedule_lock //
+
+    /* dts of the last packet sent to this stream
+       in AV_TIME_BASE_Q */
+    int64_t             last_dts;
+    // this stream no longer accepts input
+    int                 finished;
+    ////////////////////////////////////////////////////////////
+} SchMuxStream;
+
+typedef struct SchMux {
+    const AVClass      *class;
+
+    SchMuxStream       *streams;
+    unsigned         nb_streams;
+    unsigned         nb_streams_ready;
+
+    int               (*init)(void *arg);
+
+    SchTask             task;
+    /**
+     * Set to 1 after starting the muxer task and flushing the
+     * pre-muxing queues.
+     * Set either before any tasks have started, or with
+     * Scheduler.mux_ready_lock held.
+     */
+    atomic_int          mux_started;
+    ThreadQueue        *queue;
+} SchMux;
+
+typedef struct SchFilterIn {
+    SchedulerNode       src;
+    SchedulerNode       src_sched;
+    int                 send_finished;
+} SchFilterIn;
+
+typedef struct SchFilterOut {
+    SchedulerNode       dst;
+} SchFilterOut;
+
+typedef struct SchFilterGraph {
+    const AVClass      *class;
+
+    SchFilterIn        *inputs;
+    unsigned         nb_inputs;
+    unsigned         nb_inputs_finished;
+
+    SchFilterOut       *outputs;
+    unsigned         nb_outputs;
+
+    SchTask             task;
+    // input queue, nb_inputs+1 streams
+    // last stream is control
+    ThreadQueue        *queue;
+    SchWaiter           waiter;
+
+    // protected by schedule_lock
+    unsigned            best_input;
+} SchFilterGraph;
+
+struct Scheduler {
+    const AVClass      *class;
+
+    SchDemux           *demux;
+    unsigned         nb_demux;
+
+    SchMux             *mux;
+    unsigned         nb_mux;
+
+    unsigned         nb_mux_ready;
+    pthread_mutex_t     mux_ready_lock;
+
+    unsigned         nb_mux_done;
+    pthread_mutex_t     mux_done_lock;
+    pthread_cond_t      mux_done_cond;
+
+
+    SchDec             *dec;
+    unsigned         nb_dec;
+
+    SchEnc             *enc;
+    unsigned         nb_enc;
+
+    SchSyncQueue       *sq_enc;
+    unsigned         nb_sq_enc;
+
+    SchFilterGraph     *filters;
+    unsigned         nb_filters;
+
+    char               *sdp_filename;
+    int                 sdp_auto;
+
+    int                 transcode_started;
+    atomic_int          terminate;
+    atomic_int          task_failed;
+
+    pthread_mutex_t     schedule_lock;
+
+    atomic_int_least64_t last_dts;
+};
+
+/**
+ * Wait until this task is allowed to proceed.
+ *
+ * @retval 0 the caller should proceed
+ * @retval 1 the caller should terminate
+ */
+static int waiter_wait(Scheduler *sch, SchWaiter *w)
+{
+    int terminate;
+
+    if (!atomic_load(&w->choked))
+        return 0;
+
+    pthread_mutex_lock(&w->lock);
+
+    while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
+        pthread_cond_wait(&w->cond, &w->lock);
+
+    terminate = atomic_load(&sch->terminate);
+
+    pthread_mutex_unlock(&w->lock);
+
+    return terminate;
+}
+
+static void waiter_set(SchWaiter *w, int choked)
+{
+    pthread_mutex_lock(&w->lock);
+
+    atomic_store(&w->choked, choked);
+    pthread_cond_signal(&w->cond);
+
+    pthread_mutex_unlock(&w->lock);
+}
+
+static int waiter_init(SchWaiter *w)
+{
+    int ret;
+
+    atomic_init(&w->choked, 0);
+
+    ret = pthread_mutex_init(&w->lock, NULL);
+    if (ret)
+        return AVERROR(errno);
+
+    ret = pthread_cond_init(&w->cond, NULL);
+    if (ret)
+        return AVERROR(errno);
+
+    return 0;
+}
+
+static void waiter_uninit(SchWaiter *w)
+{
+    pthread_mutex_destroy(&w->lock);
+    pthread_cond_destroy(&w->cond);
+}
+
+static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
+                       enum QueueType type)
+{
+    ThreadQueue *tq;
+    ObjPool *op;
+
+    op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
+                                   objpool_alloc_frames();
+    if (!op)
+        return AVERROR(ENOMEM);
+
+    tq = tq_alloc(nb_streams, queue_size, op,
+                  (type == QUEUE_PACKETS) ? pkt_move : frame_move);
+    if (!tq) {
+        objpool_free(&op);
+        return AVERROR(ENOMEM);
+    }
+
+    *ptq = tq;
+    return 0;
+}
+
+static void *task_wrapper(void *arg);
+
+static int task_stop(SchTask *task)
+{
+    int ret;
+    void *thread_ret;
+
+    if (!task->thread_running)
+        return 0;
+
+    ret = pthread_join(task->thread, &thread_ret);
+    av_assert0(ret == 0);
+
+    task->thread_running = 0;
+
+    return (intptr_t)thread_ret;
+}
+
+static int task_start(SchTask *task)
+{
+    int ret;
+
+    av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
+
+    av_assert0(!task->thread_running);
+
+    ret = pthread_create(&task->thread, NULL, task_wrapper, task);
+    if (ret) {
+        av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
+               strerror(ret));
+        return AVERROR(ret);
+    }
+
+    task->thread_running = 1;
+    return 0;
+}
+
+static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
+                      SchThreadFunc func, void *func_arg)
+{
+    task->parent    = sch;
+
+    task->node.type = type;
+    task->node.idx  = idx;
+
+    task->func      = func;
+    task->func_arg  = func_arg;
+}
+
+int sch_stop(Scheduler *sch)
+{
+    int ret = 0, err;
+
+    atomic_store(&sch->terminate, 1);
+
+    for (unsigned type = 0; type < 2; type++)
+        for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
+            SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
+            waiter_set(w, 1);
+        }
+
+    for (unsigned i = 0; i < sch->nb_demux; i++) {
+        SchDemux *d = &sch->demux[i];
+
+        err = task_stop(&d->task);
+        ret = err_merge(ret, err);
+    }
+
+    for (unsigned i = 0; i < sch->nb_dec; i++) {
+        SchDec *dec = &sch->dec[i];
+
+        err = task_stop(&dec->task);
+        ret = err_merge(ret, err);
+    }
+
+    for (unsigned i = 0; i < sch->nb_filters; i++) {
+        SchFilterGraph *fg = &sch->filters[i];
+
+        err = task_stop(&fg->task);
+        ret = err_merge(ret, err);
+    }
+
+    for (unsigned i = 0; i < sch->nb_enc; i++) {
+        SchEnc *enc = &sch->enc[i];
+
+        err = task_stop(&enc->task);
+        ret = err_merge(ret, err);
+    }
+
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        SchMux *mux = &sch->mux[i];
+
+        err = task_stop(&mux->task);
+        ret = err_merge(ret, err);
+    }
+
+    return ret;
+}
+
+void sch_free(Scheduler **psch)
+{
+    Scheduler *sch = *psch;
+
+    if (!sch)
+        return;
+
+    sch_stop(sch);
+
+    for (unsigned i = 0; i < sch->nb_demux; i++) {
+        SchDemux *d = &sch->demux[i];
+
+        for (unsigned j = 0; j < d->nb_streams; j++) {
+            SchDemuxStream *ds = &d->streams[j];
+            av_freep(&ds->dst);
+            av_freep(&ds->dst_finished);
+        }
+        av_freep(&d->streams);
+
+        av_packet_free(&d->send_pkt);
+
+        waiter_uninit(&d->waiter);
+    }
+    av_freep(&sch->demux);
+
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        SchMux *mux = &sch->mux[i];
+
+        for (unsigned j = 0; j < mux->nb_streams; j++) {
+            SchMuxStream *ms = &mux->streams[j];
+
+            if (ms->pre_mux_queue.fifo) {
+                AVPacket *pkt;
+                while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
+                    av_packet_free(&pkt);
+                av_fifo_freep2(&ms->pre_mux_queue.fifo);
+            }
+        }
+
+        av_freep(&mux->streams);
+
+        tq_free(&mux->queue);
+    }
+    av_freep(&sch->mux);
+
+    for (unsigned i = 0; i < sch->nb_dec; i++) {
+        SchDec *dec = &sch->dec[i];
+
+        tq_free(&dec->queue);
+
+        av_thread_message_queue_free(&dec->queue_end_ts);
+
+        av_freep(&dec->dst);
+        av_freep(&dec->dst_finished);
+
+        av_frame_free(&dec->send_frame);
+    }
+    av_freep(&sch->dec);
+
+    for (unsigned i = 0; i < sch->nb_enc; i++) {
+        SchEnc *enc = &sch->enc[i];
+
+        tq_free(&enc->queue);
+    }
+    av_freep(&sch->enc);
+
+    for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
+        SchSyncQueue *sq = &sch->sq_enc[i];
+        sq_free(&sq->sq);
+        av_frame_free(&sq->frame);
+        pthread_mutex_destroy(&sq->lock);
+        av_freep(&sq->enc_idx);
+    }
+    av_freep(&sch->sq_enc);
+
+    for (unsigned i = 0; i < sch->nb_filters; i++) {
+        SchFilterGraph *fg = &sch->filters[i];
+
+        tq_free(&fg->queue);
+
+        av_freep(&fg->inputs);
+        av_freep(&fg->outputs);
+
+        waiter_uninit(&fg->waiter);
+    }
+    av_freep(&sch->filters);
+
+    av_freep(&sch->sdp_filename);
+
+    pthread_mutex_destroy(&sch->mux_ready_lock);
+
+    pthread_mutex_destroy(&sch->mux_done_lock);
+    pthread_cond_destroy(&sch->mux_done_cond);
+
+    av_freep(psch);
+}
+
+static const AVClass scheduler_class = {
+    .class_name = "Scheduler",
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+Scheduler *sch_alloc(void)
+{
+    Scheduler *sch;
+    int ret;
+
+    sch = av_mallocz(sizeof(*sch));
+    if (!sch)
+        return NULL;
+
+    sch->class    = &scheduler_class;
+    sch->sdp_auto = 1;
+
+    ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
+    if (ret)
+        goto fail;
+
+    ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
+    if (ret)
+        goto fail;
+
+    ret = pthread_cond_init(&sch->mux_done_cond, NULL);
+    if (ret)
+        goto fail;
+
+    return sch;
+fail:
+    sch_free(&sch);
+    return NULL;
+}
+
+int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
+{
+    av_freep(&sch->sdp_filename);
+    sch->sdp_filename = av_strdup(sdp_filename);
+    return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
+}
+
+static const AVClass sch_mux_class = {
+    .class_name                = "SchMux",
+    .version                   = LIBAVUTIL_VERSION_INT,
+    .parent_log_context_offset = offsetof(SchMux, task.func_arg),
+};
+
+int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
+                void *arg, int sdp_auto)
+{
+    const unsigned idx = sch->nb_mux;
+
+    SchMux *mux;
+    int ret;
+
+    ret = GROW_ARRAY(sch->mux, sch->nb_mux);
+    if (ret < 0)
+        return ret;
+
+    mux             = &sch->mux[idx];
+    mux->class      = &sch_mux_class;
+    mux->init       = init;
+
+    task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
+
+    sch->sdp_auto &= sdp_auto;
+
+    return idx;
+}
+
+int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
+{
+    SchMux       *mux;
+    SchMuxStream *ms;
+    unsigned      stream_idx;
+    int ret;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    ret = GROW_ARRAY(mux->streams, mux->nb_streams);
+    if (ret < 0)
+        return ret;
+    stream_idx = mux->nb_streams - 1;
+
+    ms = &mux->streams[stream_idx];
+
+    ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
+    if (!ms->pre_mux_queue.fifo)
+        return AVERROR(ENOMEM);
+
+    ms->last_dts = AV_NOPTS_VALUE;
+
+    return stream_idx;
+}
+
+static const AVClass sch_demux_class = {
+    .class_name                = "SchDemux",
+    .version                   = LIBAVUTIL_VERSION_INT,
+    .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
+};
+
+int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
+{
+    const unsigned idx = sch->nb_demux;
+
+    SchDemux *d;
+    int ret;
+
+    ret = GROW_ARRAY(sch->demux, sch->nb_demux);
+    if (ret < 0)
+        return ret;
+
+    d = &sch->demux[idx];
+
+    task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
+
+    d->class    = &sch_demux_class;
+    d->send_pkt = av_packet_alloc();
+    if (!d->send_pkt)
+        return AVERROR(ENOMEM);
+
+    ret = waiter_init(&d->waiter);
+    if (ret < 0)
+        return ret;
+
+    return idx;
+}
+
+int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
+{
+    SchDemux *d;
+    int ret;
+
+    av_assert0(demux_idx < sch->nb_demux);
+    d = &sch->demux[demux_idx];
+
+    ret = GROW_ARRAY(d->streams, d->nb_streams);
+    return ret < 0 ? ret : d->nb_streams - 1;
+}
+
+static const AVClass sch_dec_class = {
+    .class_name                = "SchDec",
+    .version                   = LIBAVUTIL_VERSION_INT,
+    .parent_log_context_offset = offsetof(SchDec, task.func_arg),
+};
+
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
+                int send_end_ts)
+{
+    const unsigned idx = sch->nb_dec;
+
+    SchDec *dec;
+    int ret;
+
+    ret = GROW_ARRAY(sch->dec, sch->nb_dec);
+    if (ret < 0)
+        return ret;
+
+    dec = &sch->dec[idx];
+
+    task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
+
+    dec->class      = &sch_dec_class;
+    dec->send_frame = av_frame_alloc();
+    if (!dec->send_frame)
+        return AVERROR(ENOMEM);
+
+    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+    if (ret < 0)
+        return ret;
+
+    if (send_end_ts) {
+        ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
+        if (ret < 0)
+            return ret;
+    }
+
+    return idx;
+}
+
+static const AVClass sch_enc_class = {
+    .class_name                = "SchEnc",
+    .version                   = LIBAVUTIL_VERSION_INT,
+    .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
+};
+
+int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
+                int (*open_cb)(void *opaque, const AVFrame *frame))
+{
+    const unsigned idx = sch->nb_enc;
+
+    SchEnc *enc;
+    int ret;
+
+    ret = GROW_ARRAY(sch->enc, sch->nb_enc);
+    if (ret < 0)
+        return ret;
+
+    enc             = &sch->enc[idx];
+
+    enc->class      = &sch_enc_class;
+    enc->open_cb    = open_cb;
+    enc->sq_idx[0]  = -1;
+    enc->sq_idx[1]  = -1;
+
+    task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
+
+    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+    if (ret < 0)
+        return ret;
+
+    return idx;
+}
+
+static const AVClass sch_fg_class = {
+    .class_name                = "SchFilterGraph",
+    .version                   = LIBAVUTIL_VERSION_INT,
+    .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
+};
+
+int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
+                        SchThreadFunc func, void *ctx)
+{
+    const unsigned idx = sch->nb_filters;
+
+    SchFilterGraph *fg;
+    int ret;
+
+    ret = GROW_ARRAY(sch->filters, sch->nb_filters);
+    if (ret < 0)
+        return ret;
+    fg = &sch->filters[idx];
+
+    fg->class = &sch_fg_class;
+
+    task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
+
+    if (nb_inputs) {
+        fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
+        if (!fg->inputs)
+            return AVERROR(ENOMEM);
+        fg->nb_inputs = nb_inputs;
+    }
+
+    if (nb_outputs) {
+        fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
+        if (!fg->outputs)
+            return AVERROR(ENOMEM);
+        fg->nb_outputs = nb_outputs;
+    }
+
+    ret = waiter_init(&fg->waiter);
+    if (ret < 0)
+        return ret;
+
+    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+    if (ret < 0)
+        return ret;
+
+    return idx;
+}
+
+int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
+{
+    SchSyncQueue *sq;
+    int ret;
+
+    ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
+    if (ret < 0)
+        return ret;
+    sq = &sch->sq_enc[sch->nb_sq_enc - 1];
+
+    sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
+    if (!sq->sq)
+        return AVERROR(ENOMEM);
+
+    sq->frame = av_frame_alloc();
+    if (!sq->frame)
+        return AVERROR(ENOMEM);
+
+    ret = pthread_mutex_init(&sq->lock, NULL);
+    if (ret)
+        return AVERROR(ret);
+
+    return sq - sch->sq_enc;
+}
+
+int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
+                   int limiting, uint64_t max_frames)
+{
+    SchSyncQueue *sq;
+    SchEnc *enc;
+    int ret;
+
+    av_assert0(sq_idx < sch->nb_sq_enc);
+    sq = &sch->sq_enc[sq_idx];
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
+    if (ret < 0)
+        return ret;
+    sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
+
+    ret = sq_add_stream(sq->sq, limiting);
+    if (ret < 0)
+        return ret;
+
+    enc->sq_idx[0] = sq_idx;
+    enc->sq_idx[1] = ret;
+
+    if (max_frames != INT64_MAX)
+        sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
+
+    return 0;
+}
+
+int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
+{
+    int ret;
+
+    switch (src.type) {
+    case SCH_NODE_TYPE_DEMUX: {
+        SchDemuxStream *ds;
+
+        av_assert0(src.idx < sch->nb_demux &&
+                   src.idx_stream < sch->demux[src.idx].nb_streams);
+        ds = &sch->demux[src.idx].streams[src.idx_stream];
+
+        ret = GROW_ARRAY(ds->dst, ds->nb_dst);
+        if (ret < 0)
+            return ret;
+
+        ds->dst[ds->nb_dst - 1] = dst;
+
+        // demuxed packets go to decoding or streamcopy
+        switch (dst.type) {
+        case SCH_NODE_TYPE_DEC: {
+            SchDec *dec;
+
+            av_assert0(dst.idx < sch->nb_dec);
+            dec = &sch->dec[dst.idx];
+
+            av_assert0(!dec->src.type);
+            dec->src = src;
+            break;
+            }
+        case SCH_NODE_TYPE_MUX: {
+            SchMuxStream *ms;
+
+            av_assert0(dst.idx < sch->nb_mux &&
+                       dst.idx_stream < sch->mux[dst.idx].nb_streams);
+            ms = &sch->mux[dst.idx].streams[dst.idx_stream];
+
+            av_assert0(!ms->src.type);
+            ms->src = src;
+
+            break;
+            }
+        default: av_assert0(0);
+        }
+
+        break;
+        }
+    case SCH_NODE_TYPE_DEC: {
+        SchDec *dec;
+
+        av_assert0(src.idx < sch->nb_dec);
+        dec = &sch->dec[src.idx];
+
+        ret = GROW_ARRAY(dec->dst, dec->nb_dst);
+        if (ret < 0)
+            return ret;
+
+        dec->dst[dec->nb_dst - 1] = dst;
+
+        // decoded frames go to filters or encoding
+        switch (dst.type) {
+        case SCH_NODE_TYPE_FILTER_IN: {
+            SchFilterIn *fi;
+
+            av_assert0(dst.idx < sch->nb_filters &&
+                       dst.idx_stream < sch->filters[dst.idx].nb_inputs);
+            fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
+
+            av_assert0(!fi->src.type);
+            fi->src = src;
+            break;
+            }
+        case SCH_NODE_TYPE_ENC: {
+            SchEnc *enc;
+
+            av_assert0(dst.idx < sch->nb_enc);
+            enc = &sch->enc[dst.idx];
+
+            av_assert0(!enc->src.type);
+            enc->src = src;
+            break;
+            }
+        default: av_assert0(0);
+        }
+
+        break;
+        }
+    case SCH_NODE_TYPE_FILTER_OUT: {
+        SchFilterOut *fo;
+        SchEnc      *enc;
+
+        av_assert0(src.idx < sch->nb_filters &&
+                   src.idx_stream < sch->filters[src.idx].nb_outputs);
+        // filtered frames go to encoding
+        av_assert0(dst.type == SCH_NODE_TYPE_ENC &&
+                   dst.idx < sch->nb_enc);
+
+        fo  = &sch->filters[src.idx].outputs[src.idx_stream];
+        enc = &sch->enc[dst.idx];
+
+        av_assert0(!fo->dst.type && !enc->src.type);
+        fo->dst  = dst;
+        enc->src = src;
+
+        break;
+        }
+    case SCH_NODE_TYPE_ENC: {
+        SchEnc       *enc;
+        SchMuxStream *ms;
+
+        av_assert0(src.idx < sch->nb_enc);
+        // encoding packets go to muxing
+        av_assert0(dst.type == SCH_NODE_TYPE_MUX &&
+                   dst.idx < sch->nb_mux &&
+                   dst.idx_stream < sch->mux[dst.idx].nb_streams);
+        enc = &sch->enc[src.idx];
+        ms  = &sch->mux[dst.idx].streams[dst.idx_stream];
+
+        av_assert0(!enc->dst.type && !ms->src.type);
+        enc->dst = dst;
+        ms->src  = src;
+
+        break;
+        }
+    default: av_assert0(0);
+    }
+
+    return 0;
+}
+
+static int mux_task_start(SchMux *mux)
+{
+    int ret = 0;
+
+    ret = task_start(&mux->task);
+    if (ret < 0)
+        return ret;
+
+    /* flush the pre-muxing queues */
+    for (unsigned i = 0; i < mux->nb_streams; i++) {
+        SchMuxStream *ms = &mux->streams[i];
+        AVPacket *pkt;
+        int finished = 0;
+
+        while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0) {
+            if (pkt) {
+                if (!finished)
+                    ret = tq_send(mux->queue, i, pkt);
+                av_packet_free(&pkt);
+                if (ret == AVERROR_EOF)
+                    finished = 1;
+                else if (ret < 0)
+                    return ret;
+            } else
+                tq_send_finish(mux->queue, i);
+        }
+    }
+
+    atomic_store(&mux->mux_started, 1);
+
+    return 0;
+}
+
+int print_sdp(const char *filename);
+
+static int mux_init(Scheduler *sch, SchMux *mux)
+{
+    int ret;
+
+    ret = mux->init(mux->task.func_arg);
+    if (ret < 0)
+        return ret;
+
+    sch->nb_mux_ready++;
+
+    if (sch->sdp_filename || sch->sdp_auto) {
+        if (sch->nb_mux_ready < sch->nb_mux)
+            return 0;
+
+        ret = print_sdp(sch->sdp_filename);
+        if (ret < 0) {
+            av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
+            return ret;
+        }
+
+        /* SDP is written only after all the muxers are ready, so now we
+         * start ALL the threads */
+        for (unsigned i = 0; i < sch->nb_mux; i++) {
+            ret = mux_task_start(&sch->mux[i]);
+            if (ret < 0)
+                return ret;
+        }
+    } else {
+        ret = mux_task_start(mux);
+        if (ret < 0)
+            return ret;
+    }
+
+    return 0;
+}
+
+void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
+                              size_t data_threshold, int max_packets)
+{
+    SchMux       *mux;
+    SchMuxStream *ms;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    av_assert0(stream_idx < mux->nb_streams);
+    ms = &mux->streams[stream_idx];
+
+    ms->pre_mux_queue.max_packets    = max_packets;
+    ms->pre_mux_queue.data_threshold = data_threshold;
+}
+
+int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
+{
+    SchMux *mux;
+    int ret = 0;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    av_assert0(stream_idx < mux->nb_streams);
+
+    pthread_mutex_lock(&sch->mux_ready_lock);
+
+    av_assert0(mux->nb_streams_ready < mux->nb_streams);
+
+    // this may be called during initialization - do not start
+    // threads before sch_start() is called
+    if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+        ret = mux_init(sch, mux);
+
+    pthread_mutex_unlock(&sch->mux_ready_lock);
+
+    return ret;
+}
+
+static int64_t trailing_dts(const Scheduler *sch)
+{
+    int64_t min_dts = INT64_MAX;
+
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        const SchMux *mux = &sch->mux[i];
+
+        for (unsigned j = 0; j < mux->nb_streams; j++) {
+            const SchMuxStream *ms = &mux->streams[j];
+
+            if (ms->finished)
+                continue;
+            if (ms->last_dts == AV_NOPTS_VALUE)
+                return AV_NOPTS_VALUE;
+
+            min_dts = FFMIN(min_dts, ms->last_dts);
+        }
+    }
+
+    return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
+}
+
+static void schedule_update_locked(Scheduler *sch)
+{
+    int64_t dts;
+
+    // on termination request all waiters are choked,
+    // we are not to unchoke them
+    if (atomic_load(&sch->terminate))
+        return;
+
+    dts = trailing_dts(sch);
+
+    atomic_store(&sch->last_dts, dts);
+
+    // initialize our internal state
+    for (unsigned type = 0; type < 2; type++)
+        for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
+            SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
+            w->choked_prev = atomic_load(&w->choked);
+            w->choked_next = 1;
+        }
+
+    // figure out the sources that are allowed to proceed
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        SchMux *mux = &sch->mux[i];
+
+        for (unsigned j = 0; j < mux->nb_streams; j++) {
+            SchMuxStream *ms = &mux->streams[j];
+            SchDemux *d;
+
+            // unblock sources for output streams that are not finished
+            // and not too far ahead of the trailing stream
+            if (ms->finished)
+                continue;
+            if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
+                continue;
+            if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
+                continue;
+
+            // for outputs fed from filtergraphs, consider that filtergraph's
+            // best_input information, in other cases there is a well-defined
+            // source demuxer
+            if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) {
+                SchFilterGraph *fg = &sch->filters[ms->src_sched.idx];
+                SchFilterIn *fi;
+
+                // the filtergraph contains internal sources and
+                // requested to be scheduled directly
+                if (fg->best_input == fg->nb_inputs) {
+                    fg->waiter.choked_next = 0;
+                    continue;
+                }
+
+                fi = &fg->inputs[fg->best_input];
+                d  = &sch->demux[fi->src_sched.idx];
+            } else
+                d = &sch->demux[ms->src_sched.idx];
+
+            d->waiter.choked_next = 0;
+        }
+    }
+
+    for (unsigned type = 0; type < 2; type++)
+        for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
+            SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
+            if (w->choked_prev != w->choked_next)
+                waiter_set(w, w->choked_next);
+        }
+
+}
+
+int sch_start(Scheduler *sch)
+{
+    int ret;
+
+    sch->transcode_started = 1;
+
+    for (unsigned i = 0; i < sch->nb_mux; i++) {
+        SchMux *mux = &sch->mux[i];
+
+        for (unsigned j = 0; j < mux->nb_streams; j++) {
+            SchMuxStream *ms = &mux->streams[j];
+
+            switch (ms->src.type) {
+            case SCH_NODE_TYPE_ENC: {
+                SchEnc *enc = &sch->enc[ms->src.idx];
+                if (enc->src.type == SCH_NODE_TYPE_DEC) {
+                    ms->src_sched = sch->dec[enc->src.idx].src;
+                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
+                } else {
+                    ms->src_sched = enc->src;
+                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+                }
+                break;
+                }
+            case SCH_NODE_TYPE_DEMUX:
+                ms->src_sched = ms->src;
+                break;
+            default:
+                av_log(mux, AV_LOG_ERROR,
+                       "Muxer stream #%u not connected to a source\n", j);
+                return AVERROR(EINVAL);
+            }
+        }
+
+        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+        if (ret < 0)
+            return ret;
+
+        if (mux->nb_streams_ready == mux->nb_streams) {
+            ret = mux_init(sch, mux);
+            if (ret < 0)
+                return ret;
+        }
+    }
+
+    for (unsigned i = 0; i < sch->nb_enc; i++) {
+        SchEnc *enc = &sch->enc[i];
+
+        if (!enc->src.type) {
+            av_log(enc, AV_LOG_ERROR,
+                   "Encoder not connected to a source\n");
+            return AVERROR(EINVAL);
+        }
+        if (!enc->dst.type) {
+            av_log(enc, AV_LOG_ERROR,
+                   "Encoder not connected to a sink\n");
+            return AVERROR(EINVAL);
+        }
+
+        ret = task_start(&enc->task);
+        if (ret < 0)
+            return ret;
+    }
+
+    for (unsigned i = 0; i < sch->nb_filters; i++) {
+        SchFilterGraph *fg = &sch->filters[i];
+
+        for (unsigned j = 0; j < fg->nb_inputs; j++) {
+            SchFilterIn *fi = &fg->inputs[j];
+
+            if (!fi->src.type) {
+                av_log(fg, AV_LOG_ERROR,
+                       "Filtergraph input %u not connected to a source\n", j);
+                return AVERROR(EINVAL);
+            }
+
+            fi->src_sched = sch->dec[fi->src.idx].src;
+        }
+
+        for (unsigned j = 0; j < fg->nb_outputs; j++) {
+            SchFilterOut *fo = &fg->outputs[j];
+
+            if (!fo->dst.type) {
+                av_log(fg, AV_LOG_ERROR,
+                       "Filtergraph %u output %u not connected to a sink\n", i, j);
+                return AVERROR(EINVAL);
+            }
+        }
+
+        ret = task_start(&fg->task);
+        if (ret < 0)
+            return ret;
+    }
+
+    for (unsigned i = 0; i < sch->nb_dec; i++) {
+        SchDec *dec = &sch->dec[i];
+
+        if (!dec->src.type) {
+            av_log(dec, AV_LOG_ERROR,
+                   "Decoder not connected to a source\n");
+            return AVERROR(EINVAL);
+        }
+        if (!dec->nb_dst) {
+            av_log(dec, AV_LOG_ERROR,
+                   "Decoder not connected to any sink\n");
+            return AVERROR(EINVAL);
+        }
+
+        dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
+        if (!dec->dst_finished)
+            return AVERROR(ENOMEM);
+
+        ret = task_start(&dec->task);
+        if (ret < 0)
+            return ret;
+    }
+
+    for (unsigned i = 0; i < sch->nb_demux; i++) {
+        SchDemux *d = &sch->demux[i];
+
+        if (!d->nb_streams)
+            continue;
+
+        for (unsigned j = 0; j < d->nb_streams; j++) {
+            SchDemuxStream *ds = &d->streams[j];
+
+            if (!ds->nb_dst) {
+                av_log(d, AV_LOG_ERROR,
+                       "Demuxer stream %u not connected to any sink\n", j);
+                return AVERROR(EINVAL);
+            }
+
+            ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
+            if (!ds->dst_finished)
+                return AVERROR(ENOMEM);
+        }
+
+        ret = task_start(&d->task);
+        if (ret < 0)
+            return ret;
+    }
+
+    pthread_mutex_lock(&sch->schedule_lock);
+    schedule_update_locked(sch);
+    pthread_mutex_unlock(&sch->schedule_lock);
+
+    return 0;
+}
+
+int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
+{
+    int ret;
+
+    // convert delay to absolute timestamp
+    timeout_us += av_gettime();
+
+    pthread_mutex_lock(&sch->mux_done_lock);
+
+    if (sch->nb_mux_done < sch->nb_mux) {
+        struct timespec tv = { .tv_sec  =  timeout_us / 1000000,
+                               .tv_nsec = (timeout_us % 1000000) * 1000 };
+        pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
+    }
+
+    ret = sch->nb_mux_done == sch->nb_mux;
+
+    pthread_mutex_unlock(&sch->mux_done_lock);
+
+    *transcode_ts = atomic_load(&sch->last_dts);
+
+    // abort transcoding if any task failed
+    ret |= atomic_load(&sch->task_failed);
+
+    return ret;
+}
+
+static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
+{
+    int ret;
+
+    ret = enc->open_cb(enc->task.func_arg, frame);
+    if (ret < 0)
+        return ret;
+
+    // ret>0 signals audio frame size, which means sync queue should
+    // have been enabled during encoder creation
+    if (ret > 0) {
+        av_assert0(enc->sq_idx[0] >= 0);
+        sq_frame_samples(sch->sq_enc[enc->sq_idx[0]].sq, enc->sq_idx[1], ret);
+    }
+
+    return 0;
+}
+
+static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
+{
+    int ret;
+
+    if (!frame) {
+        tq_send_finish(enc->queue, 0);
+        return 0;
+    }
+
+    if (enc->in_finished)
+        return AVERROR_EOF;
+
+    ret = tq_send(enc->queue, 0, frame);
+    if (ret < 0)
+        enc->in_finished = 1;
+
+    return ret;
+}
+
+static int send_to_sq(Scheduler *sch, SchSyncQueue *sq,
+                      AVFrame *frame, unsigned stream_idx)
+{
+    int ret = 0;
+
+    pthread_mutex_lock(&sq->lock);
+
+    ret = sq_send(sq->sq, stream_idx, SQFRAME(frame));
+    if (ret < 0)
+        goto finish;
+
+    while (1) {
+        SchEnc *enc;
+
+        // TODO: the SQ API should be extended to allow returning EOF
+        // for individual streams
+        ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
+        if (ret == AVERROR(EAGAIN)) {
+            ret = 0;
+            goto finish;
+        } else if (ret < 0) {
+            // close all encoders fed from this sync queue
+            for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
+                int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
+
+                // if the sync queue error is EOF and closing the encoder
+                // produces a more serious error, make sure to pick the latter
+                ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
+            }
+            goto finish;
+        }
+
+        enc = &sch->enc[sq->enc_idx[ret]];
+        ret = send_to_enc_thread(sch, enc, sq->frame);
+        if (ret < 0) {
+            av_assert0(ret == AVERROR_EOF);
+            av_frame_unref(sq->frame);
+            sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
+            continue;
+        }
+    }
+
+finish:
+    pthread_mutex_unlock(&sq->lock);
+
+    return ret;
+}
+
+static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
+{
+    if (enc->open_cb && frame && !enc->opened) {
+        int ret = enc_open(sch, enc, frame);
+        if (ret < 0)
+            return ret;
+        enc->opened = 1;
+
+        // discard empty frames that only carry encoder init parameters
+        if (!frame->buf[0]) {
+            av_frame_unref(frame);
+            return 0;
+        }
+    }
+
+    return (enc->sq_idx[0] >= 0) ?
+           send_to_sq(sch, &sch->sq_enc[enc->sq_idx[0]], frame, enc->sq_idx[1]) :
+           send_to_enc_thread(sch, enc, frame);
+}
+
+static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
+{
+    PreMuxQueue *q = &ms->pre_mux_queue;
+    AVPacket *tmp_pkt = NULL;
+    int ret;
+
+    if (!av_fifo_can_write(q->fifo)) {
+        size_t     packets = av_fifo_can_read(q->fifo);
+        size_t    pkt_size = pkt ? pkt->size : 0;
+        int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
+        size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
+        size_t new_size = FFMIN(2 * packets, max_packets);
+
+        if (new_size <= packets) {
+            av_log(mux, AV_LOG_ERROR,
+                   "Too many packets buffered for output stream.\n");
+            return AVERROR(ENOSPC);
+        }
+        ret = av_fifo_grow2(q->fifo, new_size - packets);
+        if (ret < 0)
+            return ret;
+    }
+
+    if (pkt) {
+        tmp_pkt = av_packet_alloc();
+        if (!tmp_pkt)
+            return AVERROR(ENOMEM);
+
+        av_packet_move_ref(tmp_pkt, pkt);
+        q->data_size += tmp_pkt->size;
+    }
+    av_fifo_write(q->fifo, &tmp_pkt, 1);
+
+    return 0;
+}
+
+static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
+                       AVPacket *pkt)
+{
+    SchMuxStream *ms = &mux->streams[stream_idx];
+    int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE)                    ?
+                  av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q) :
+                  AV_NOPTS_VALUE;
+
+    // queue the packet if the muxer cannot be started yet
+    if (!atomic_load(&mux->mux_started)) {
+        int queued = 0;
+
+        // the muxer could have started between the above atomic check and
+        // locking the mutex, then this block falls through to normal send path
+        pthread_mutex_lock(&sch->mux_ready_lock);
+
+        if (!atomic_load(&mux->mux_started)) {
+            int ret = mux_queue_packet(mux, ms, pkt);
+            queued = ret < 0 ? ret : 1;
+        }
+
+        pthread_mutex_unlock(&sch->mux_ready_lock);
+
+        if (queued < 0)
+            return queued;
+        else if (queued)
+            goto update_schedule;
+    }
+
+    if (pkt) {
+        int ret = tq_send(mux->queue, stream_idx, pkt);
+        if (ret < 0)
+            return ret;
+    } else
+        tq_send_finish(mux->queue, stream_idx);
+
+update_schedule:
+    // TODO: use atomics to check whether this changes trailing dts
+    // to avoid locking unnecesarily
+    if (dts != AV_NOPTS_VALUE || !pkt) {
+        pthread_mutex_lock(&sch->schedule_lock);
+
+        if (pkt) ms->last_dts = dts;
+        else     ms->finished = 1;
+
+        schedule_update_locked(sch);
+
+        pthread_mutex_unlock(&sch->schedule_lock);
+    }
+
+    return 0;
+}
+
+static int
+demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
+                         uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
+{
+    int ret;
+
+    if (*dst_finished)
+        return AVERROR_EOF;
+
+    if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
+        (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
+        av_packet_unref(pkt);
+        pkt = NULL;
+    }
+
+    if (!pkt)
+        goto finish;
+
+    ret = (dst.type == SCH_NODE_TYPE_MUX) ?
+          send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
+          tq_send(sch->dec[dst.idx].queue, 0, pkt);
+    if (ret == AVERROR_EOF)
+        goto finish;
+
+    return ret;
+
+finish:
+    if (dst.type == SCH_NODE_TYPE_MUX)
+        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
+    else
+        tq_send_finish(sch->dec[dst.idx].queue, 0);
+
+    *dst_finished = 1;
+    return AVERROR_EOF;
+}
+
+static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
+                                 AVPacket *pkt, unsigned flags)
+{
+    unsigned nb_done = 0;
+
+    for (unsigned i = 0; i < ds->nb_dst; i++) {
+        AVPacket *to_send = pkt;
+        uint8_t *finished = &ds->dst_finished[i];
+
+        int ret;
+
+        // sending a packet consumes it, so make a temporary reference if needed
+        if (pkt && i < ds->nb_dst - 1) {
+            to_send = d->send_pkt;
+
+            ret = av_packet_ref(to_send, pkt);
+            if (ret < 0)
+                return ret;
+        }
+
+        ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
+        if (to_send)
+            av_packet_unref(to_send);
+        if (ret == AVERROR_EOF)
+            nb_done++;
+        else if (ret < 0)
+            return ret;
+    }
+
+    return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
+}
+
+static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
+{
+    Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
+
+    av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
+
+    for (unsigned i = 0; i < d->nb_streams; i++) {
+        SchDemuxStream *ds = &d->streams[i];
+
+        for (unsigned j = 0; j < ds->nb_dst; j++) {
+            const SchedulerNode *dst = &ds->dst[j];
+            SchDec *dec;
+            int ret;
+
+            if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
+                continue;
+
+            dec = &sch->dec[dst->idx];
+
+            ret = tq_send(dec->queue, 0, pkt);
+            if (ret < 0)
+                return ret;
+
+            if (dec->queue_end_ts) {
+                Timestamp ts;
+                ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
+                if (ret < 0)
+                    return ret;
+
+                if (max_end_ts.ts == AV_NOPTS_VALUE ||
+                    (ts.ts != AV_NOPTS_VALUE &&
+                     av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
+                    max_end_ts = ts;
+
+            }
+        }
+    }
+
+    pkt->pts       = max_end_ts.ts;
+    pkt->time_base = max_end_ts.tb;
+
+    return 0;
+}
+
+int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
+                   unsigned flags)
+{
+    SchDemux *d;
+    int terminate;
+
+    av_assert0(demux_idx < sch->nb_demux);
+    d = &sch->demux[demux_idx];
+
+    terminate = waiter_wait(sch, &d->waiter);
+    if (terminate)
+        return AVERROR_EXIT;
+
+    // flush the downstreams after seek
+    if (pkt->stream_index == -1)
+        return demux_flush(sch, d, pkt);
+
+    av_assert0(pkt->stream_index < d->nb_streams);
+
+    return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
+}
+
+static int demux_done(Scheduler *sch, unsigned demux_idx)
+{
+    SchDemux *d = &sch->demux[demux_idx];
+    int ret = 0;
+
+    for (unsigned i = 0; i < d->nb_streams; i++) {
+        int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
+        if (err != AVERROR_EOF)
+            ret = err_merge(ret, err);
+    }
+
+    pthread_mutex_lock(&sch->schedule_lock);
+
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+
+    return ret;
+}
+
+int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
+{
+    SchMux *mux;
+    int ret, stream_idx;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    ret = tq_receive(mux->queue, &stream_idx, pkt);
+    pkt->stream_index = stream_idx;
+    return ret;
+}
+
+void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
+{
+    SchMux *mux;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    av_assert0(stream_idx < mux->nb_streams);
+    tq_receive_finish(mux->queue, stream_idx);
+
+    pthread_mutex_lock(&sch->schedule_lock);
+    mux->streams[stream_idx].finished = 1;
+
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+}
+
+static int mux_done(Scheduler *sch, unsigned mux_idx)
+{
+    SchMux *mux = &sch->mux[mux_idx];
+
+    pthread_mutex_lock(&sch->schedule_lock);
+
+    for (unsigned i = 0; i < mux->nb_streams; i++) {
+        tq_receive_finish(mux->queue, i);
+        mux->streams[i].finished = 1;
+    }
+
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+
+    pthread_mutex_lock(&sch->mux_done_lock);
+
+    av_assert0(sch->nb_mux_done < sch->nb_mux);
+    sch->nb_mux_done++;
+
+    pthread_cond_signal(&sch->mux_done_cond);
+
+    pthread_mutex_unlock(&sch->mux_done_lock);
+
+    return 0;
+}
+
+int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
+{
+    SchDec *dec;
+    int ret, dummy;
+
+    av_assert0(dec_idx < sch->nb_dec);
+    dec = &sch->dec[dec_idx];
+
+    // the decoder should have given us post-flush end timestamp in pkt
+    if (dec->expect_end_ts) {
+        Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
+        ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
+        if (ret < 0)
+            return ret;
+
+        dec->expect_end_ts = 0;
+    }
+
+    ret = tq_receive(dec->queue, &dummy, pkt);
+    av_assert0(dummy <= 0);
+
+    // got a flush packet, on the next call to this function the decoder
+    // will give us post-flush end timestamp
+    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
+        dec->expect_end_ts = 1;
+
+    return ret;
+}
+
+static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
+                          unsigned in_idx, AVFrame *frame)
+{
+    if (frame)
+        return tq_send(fg->queue, in_idx, frame);
+
+    if (!fg->inputs[in_idx].send_finished) {
+        fg->inputs[in_idx].send_finished = 1;
+        tq_send_finish(fg->queue, in_idx);
+
+        // close the control stream when all actual inputs are done
+        if (++fg->nb_inputs_finished == fg->nb_inputs)
+            tq_send_finish(fg->queue, fg->nb_inputs);
+    }
+    return 0;
+}
+
+static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
+                           uint8_t *dst_finished, AVFrame *frame)
+{
+    int ret;
+
+    if (*dst_finished)
+        return AVERROR_EOF;
+
+    if (!frame)
+        goto finish;
+
+    ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
+          send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
+          send_to_enc(sch, &sch->enc[dst.idx], frame);
+    if (ret == AVERROR_EOF)
+        goto finish;
+
+    return ret;
+
+finish:
+    if (dst.type == SCH_NODE_TYPE_FILTER_IN)
+        send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
+    else
+        send_to_enc(sch, &sch->enc[dst.idx], NULL);
+
+    *dst_finished = 1;
+
+    return AVERROR_EOF;
+}
+
+int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
+{
+    SchDec *dec;
+    int ret = 0;
+    unsigned nb_done = 0;
+
+    av_assert0(dec_idx < sch->nb_dec);
+    dec = &sch->dec[dec_idx];
+
+    for (unsigned i = 0; i < dec->nb_dst; i++) {
+        uint8_t *finished = &dec->dst_finished[i];
+        AVFrame *to_send  = frame;
+
+        // sending a frame consumes it, so make a temporary reference if needed
+        if (i < dec->nb_dst - 1) {
+            to_send = dec->send_frame;
+
+            // frame may sometimes contain props only,
+            // e.g. to signal EOF timestamp
+            ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
+                                  av_frame_copy_props(to_send, frame);
+            if (ret < 0)
+                return ret;
+        }
+
+        ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
+        if (ret < 0) {
+            av_frame_unref(to_send);
+            if (ret == AVERROR_EOF) {
+                nb_done++;
+                ret = 0;
+                continue;
+            }
+            goto finish;
+        }
+    }
+
+finish:
+    return ret < 0                  ? ret :
+           (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
+}
+
+static int dec_done(Scheduler *sch, unsigned dec_idx)
+{
+    SchDec *dec = &sch->dec[dec_idx];
+    int ret = 0;
+
+    tq_receive_finish(dec->queue, 0);
+
+    // make sure our source does not get stuck waiting for end timestamps
+    // that will never arrive
+    if (dec->queue_end_ts)
+        av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
+
+    for (unsigned i = 0; i < dec->nb_dst; i++) {
+        int err = dec_send_to_dst(sch, dec->dst[i], &dec->dst_finished[i], NULL);
+        if (err < 0 && err != AVERROR_EOF)
+            ret = err_merge(ret, err);
+    }
+
+    return ret;
+}
+
+int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
+{
+    SchEnc *enc;
+    int ret, dummy;
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    ret = tq_receive(enc->queue, &dummy, frame);
+    av_assert0(dummy <= 0);
+
+    return ret;
+}
+
+int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
+{
+    SchEnc *enc;
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt);
+}
+
+static int enc_done(Scheduler *sch, unsigned enc_idx)
+{
+    SchEnc *enc = &sch->enc[enc_idx];
+
+    tq_receive_finish(enc->queue, 0);
+
+    return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, NULL);
+}
+
+int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
+                       unsigned *in_idx, AVFrame *frame)
+{
+    SchFilterGraph *fg;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    av_assert0(*in_idx <= fg->nb_inputs);
+
+    // update scheduling to account for desired input stream, if it changed
+    //
+    // this check needs no locking because only the filtering thread
+    // updates this value
+    if (*in_idx != fg->best_input) {
+        pthread_mutex_lock(&sch->schedule_lock);
+
+        fg->best_input = *in_idx;
+        schedule_update_locked(sch);
+
+        pthread_mutex_unlock(&sch->schedule_lock);
+    }
+
+    if (*in_idx == fg->nb_inputs) {
+        int terminate = waiter_wait(sch, &fg->waiter);
+        return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
+    }
+
+    while (1) {
+        int ret, idx;
+
+        ret = tq_receive(fg->queue, &idx, frame);
+        if (idx < 0)
+            return AVERROR_EOF;
+        else if (ret >= 0) {
+            *in_idx = idx;
+            return 0;
+        }
+
+        // disregard EOFs for specific streams - they should always be
+        // preceded by an EOF frame
+    }
+}
+
+int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
+{
+    SchFilterGraph *fg;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    av_assert0(out_idx < fg->nb_outputs);
+    return send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame);
+}
+
+static int filter_done(Scheduler *sch, unsigned fg_idx)
+{
+    SchFilterGraph *fg = &sch->filters[fg_idx];
+    int ret = 0;
+
+    for (unsigned i = 0; i <= fg->nb_inputs; i++)
+        tq_receive_finish(fg->queue, i);
+
+    for (unsigned i = 0; i < fg->nb_outputs; i++) {
+        SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx];
+        int err = send_to_enc(sch, enc, NULL);
+        if (err < 0 && err != AVERROR_EOF)
+            ret = err_merge(ret, err);
+    }
+
+    return ret;
+}
+
+int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
+{
+    SchFilterGraph *fg;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    return send_to_filter(sch, fg, fg->nb_inputs, frame);
+}
+
+static void *task_wrapper(void *arg)
+{
+    SchTask  *task = arg;
+    Scheduler *sch = task->parent;
+    int ret;
+    int err = 0;
+
+    ret = (intptr_t)task->func(task->func_arg);
+    if (ret < 0)
+        av_log(task->func_arg, AV_LOG_ERROR,
+               "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
+
+    switch (task->node.type) {
+    case SCH_NODE_TYPE_DEMUX:       err = demux_done (sch, task->node.idx); break;
+    case SCH_NODE_TYPE_MUX:         err = mux_done   (sch, task->node.idx); break;
+    case SCH_NODE_TYPE_DEC:         err = dec_done   (sch, task->node.idx); break;
+    case SCH_NODE_TYPE_ENC:         err = enc_done   (sch, task->node.idx); break;
+    case SCH_NODE_TYPE_FILTER_IN:   err = filter_done(sch, task->node.idx); break;
+    default: av_assert0(0);
+    }
+
+    ret = err_merge(ret, err);
+
+    // EOF is considered normal termination
+    if (ret == AVERROR_EOF)
+        ret = 0;
+    if (ret < 0)
+        atomic_store(&sch->task_failed, 1);
+
+    av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
+           "Terminating thread with return code %d (%s)\n", ret,
+           ret < 0 ? av_err2str(ret) : "success");
+
+    return (void*)(intptr_t)ret;
+}
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
new file mode 100644
index 0000000000..bba1f07b7b
--- /dev/null
+++ b/fftools/ffmpeg_sched.h
@@ -0,0 +1,461 @@
+/*
+ * Inter-thread scheduling/synchronization.
+ * Copyright (c) 2023 Anton Khirnov
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_FFMPEG_SCHED_H
+#define FFTOOLS_FFMPEG_SCHED_H
+
+#include <stddef.h>
+#include <stdint.h>
+
+/*
+ * This file contains the API for the transcode scheduler.
+ *
+ * Overall architecture of the transcoding process involves instances of the
+ * following components:
+ * - demuxers, each containing any number of demuxed streams; demuxed packets
+ *   belonging to some stream are sent to any number of decoders (transcoding)
+ *   and/or muxers (streamcopy);
+ * - decoders, which receive encoded packets from some demuxed stream, decode
+ *   them, and send decoded frames to any number of filtergraph inputs
+ *   (audio/video) or encoders (subtitles);
+ * - filtergraphs, each containing zero or more inputs (0 in case the
+ *   filtergraph contains a lavfi source filter), and one or more outputs; the
+ *   inputs and outputs need not have matching media types;
+ *   each filtergraph input receives decoded frames from some decoder;
+ *   filtered frames from each output are sent to some encoder;
+ * - encoders, which receive decoded frames from some decoder (subtitles) or
+ *   some filtergraph output (audio/video), encode them, and send encoded
+ *   packets to some muxed stream;
+ * - muxers, each containing any number of muxed streams; each muxed stream
+ *   receives encoded packets from some demuxed stream (streamcopy) or some
+ *   encoder (transcoding); those packets are interleaved and written out by the
+ *   muxer.
+ *
+ * There must be at least one muxer instance, otherwise the transcode produces
+ * no output and is meaningless. Otherwise, in a generic transcoding scenario
+ * there may be arbitrary number of instances of any of the above components,
+ * interconnected in various ways.
+ *
+ * The code tries to keep all the output streams across all the muxers in sync
+ * (i.e. at the same DTS), which is accomplished by varying the rates at which
+ * packets are read from different demuxers and lavfi sources. Note that the
+ * degree of control we have over synchronization is fundamentally limited - if
+ * some demuxed streams in the same input are interleaved at different rates
+ * than that at which they are to be muxed (e.g. because an input file is badly
+ * interleaved, or the user changed their speed by mismatching amounts), then
+ * there will be increasing amounts of buffering followed by eventual
+ * transcoding failure.
+ *
+ * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g.
+ * - encoding and muxing output from filtergraph(s) that have no inputs;
+ * - creating a file that contains nothing but attachments and/or metadata.
+ *
+ * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but
+ * this is unnecessary because the (a)split filter provides the same
+ * functionality.
+ *
+ * The scheduler, in the above model, is the master object that oversees and
+ * facilitates the transcoding process. The basic idea is that all instances
+ * of the abovementioned components communicate only with the scheduler and not
+ * with each other. The scheduler is then the single place containing the
+ * knowledge about the whole transcoding pipeline.
+ */
+
+struct AVFrame;
+struct AVPacket;
+
+typedef struct Scheduler Scheduler;
+
+enum SchedulerNodeType {
+    SCH_NODE_TYPE_NONE = 0,
+    SCH_NODE_TYPE_DEMUX,
+    SCH_NODE_TYPE_MUX,
+    SCH_NODE_TYPE_DEC,
+    SCH_NODE_TYPE_ENC,
+    SCH_NODE_TYPE_FILTER_IN,
+    SCH_NODE_TYPE_FILTER_OUT,
+};
+
+typedef struct SchedulerNode {
+    enum SchedulerNodeType  type;
+    unsigned                idx;
+    unsigned                idx_stream;
+} SchedulerNode;
+
+typedef void* (*SchThreadFunc)(void *arg);
+
+#define SCH_DSTREAM(file, stream)                           \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX,           \
+                     .idx = file, .idx_stream = stream }
+#define SCH_MSTREAM(file, stream)                           \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_MUX,             \
+                     .idx = file, .idx_stream = stream }
+#define SCH_DEC(decoder)                                    \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_DEC,             \
+                    .idx = decoder }
+#define SCH_ENC(encoder)                                    \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_ENC,             \
+                    .idx = encoder }
+#define SCH_FILTER_IN(filter, input)                        \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN,       \
+                     .idx = filter, .idx_stream = input }
+#define SCH_FILTER_OUT(filter, output)                      \
+    (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT,      \
+                     .idx = filter, .idx_stream = output }
+
+Scheduler *sch_alloc(void);
+void sch_free(Scheduler **sch);
+
+int sch_start(Scheduler *sch);
+int sch_stop(Scheduler *sch);
+
+/**
+ * Wait until transcoding terminates or the specified timeout elapses.
+ *
+ * @param timeout_us Amount of time in microseconds after which this function
+ *                   will timeout.
+ * @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for
+ *                     informational purposes only.
+ *
+ * @retval 0 waiting timed out, transcoding is not finished
+ * @retval 1 transcoding is finished
+ */
+int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts);
+
+/**
+ * Add a demuxer to the scheduler.
+ *
+ * @param func Function executed as the demuxer task.
+ * @param ctx Demuxer state; will be passed to func and used for logging.
+ *
+ * @retval ">=0" Index of the newly-created demuxer.
+ * @retval "<0"  Error code.
+ */
+int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx);
+/**
+ * Add a demuxed stream for a previously added demuxer.
+ *
+ * @param demux_idx index previously returned by sch_add_demux()
+ *
+ * @retval ">=0" Index of the newly-created demuxed stream.
+ * @retval "<0"  Error code.
+ */
+int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);
+
+/**
+ * Add a decoder to the scheduler.
+ *
+ * @param func Function executed as the decoder task.
+ * @param ctx Decoder state; will be passed to func and used for logging.
+ * @param send_end_ts The decoder will return an end timestamp after flush packets
+ *                    are delivered to it. See documentation for
+ *                    sch_dec_receive() for more details.
+ *
+ * @retval ">=0" Index of the newly-created decoder.
+ * @retval "<0"  Error code.
+ */
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
+                int send_end_ts);
+
+/**
+ * Add a filtergraph to the scheduler.
+ *
+ * @param nb_inputs Number of filtergraph inputs.
+ * @param nb_outputs number of filtergraph outputs
+ * @param func Function executed as the filtering task.
+ * @param ctx Filter state; will be passed to func and used for logging.
+ *
+ * @retval ">=0" Index of the newly-created filtergraph.
+ * @retval "<0"  Error code.
+ */
+int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
+                        SchThreadFunc func, void *ctx);
+
+/**
+ * Add a muxer to the scheduler.
+ *
+ * Note that muxer thread startup is more complicated than for other components,
+ * because
+ * - muxer streams fed by audio/video encoders become initialized dynamically at
+ *   runtime, after those encoders receive their first frame and initialize
+ *   themselves, followed by calling sch_mux_stream_ready()
+ * - the header can be written after all the streams for a muxer are initialized
+ * - we may need to write an SDP, which must happen
+ *      - AFTER all the headers are written
+ *      - BEFORE any packets are written by any muxer
+ *      - with all the muxers quiescent
+ * To avoid complicated muxer-thread synchronization dances, we postpone
+ * starting the muxer threads until after the SDP is written. The sequence of
+ * events is then as follows:
+ * - After sch_mux_stream_ready() is called for all the streams in a given muxer,
+ *   the header for that muxer is written (care is taken that headers for
+ *   different muxers are not written concurrently, since they write file
+ *   information to stderr). If SDP is not wanted, the muxer thread then starts
+ *   and muxing begins.
+ * - When SDP _is_ wanted, no muxer threads start until the header for the last
+ *   muxer is written. After that, the SDP is written, after which all the muxer
+ *   threads are started at once.
+ *
+ * In order for the above to work, the scheduler needs to be able to invoke
+ * just writing the header, which is the reason the init parameter exists.
+ *
+ * @param func Function executed as the muxing task.
+ * @param init Callback that is called to initialize the muxer and write the
+ *             header. Called after sch_mux_stream_ready() is called for all the
+ *             streams in the muxer.
+ * @param ctx Muxer state; will be passed to func/init and used for logging.
+ * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ *
+ * @retval ">=0" Index of the newly-created muxer.
+ * @retval "<0"  Error code.
+ */
+int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
+                void *ctx, int sdp_auto);
+/**
+ * Add a muxed stream for a previously added muxer.
+ *
+ * @param mux_idx index previously returned by sch_add_mux()
+ *
+ * @retval ">=0" Index of the newly-created muxed stream.
+ * @retval "<0"  Error code.
+ */
+int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx);
+
+/**
+ * Configure limits on packet buffering performed before the muxer task is
+ * started.
+ *
+ * @param mux_idx index previously returned by sch_add_mux()
+ * @param stream_idx_idx index previously returned by sch_add_mux_stream()
+ * @param data_threshold Total size of the buffered packets' data after which
+ *                       max_packets applies.
+ * @param max_packets maximum Maximum number of buffered packets after
+ *                            data_threshold is reached.
+ */
+void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
+                              size_t data_threshold, int max_packets);
+
+/**
+ * Signal to the scheduler that the specified muxed stream is initialized and
+ * ready. Muxing is started once all the streams are ready.
+ */
+int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
+
+/**
+ * Set the file path for the SDP.
+ *
+ * The SDP is written when either of the following is true:
+ * - this function is called at least once
+ * - sdp_auto=1 is passed to EVERY call of sch_add_mux()
+ */
+int sch_sdp_filename(Scheduler *sch, const char *sdp_filename);
+
+/**
+ * Add an encoder to the scheduler.
+ *
+ * @param func Function executed as the encoding task.
+ * @param ctx Encoder state; will be passed to func and used for logging.
+ * @param open_cb This callback, if specified, will be called when the first
+ *                frame is obtained for this encoder. For audio encoders with a
+ *                fixed frame size (which use a sync queue in the scheduler to
+ *                rechunk frames), it must return that frame size on success.
+ *                Otherwise (non-audio, variable frame size) it should return 0.
+ *
+ * @retval ">=0" Index of the newly-created encoder.
+ * @retval "<0"  Error code.
+ */
+int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
+                int (*open_cb)(void *func_arg, const struct AVFrame *frame));
+
+/**
+ * Add an pre-encoding sync queue to the scheduler.
+ *
+ * @param buf_size_us Sync queue buffering size, passed to sq_alloc().
+ * @param logctx Logging context for the sync queue. passed to sq_alloc().
+ *
+ * @retval ">=0" Index of the newly-created sync queue.
+ * @retval "<0"  Error code.
+ */
+int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx);
+int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
+                   int limiting, uint64_t max_frames);
+
+int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst);
+
+enum DemuxSendFlags {
+    /**
+     * Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations
+     * send normally to other types.
+     */
+    DEMUX_SEND_STREAMCOPY_EOF = (1 << 0),
+};
+
+/**
+ * Called by demuxer tasks to communicate with their downstreams. The following
+ * may be sent:
+ * - a demuxed packet for the stream identified by pkt->stream_index;
+ * - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an
+ *   empty packet with stream_index=-1.
+ *
+ * @param demux_idx demuxer index
+ * @param pkt A demuxed packet to send.
+ *            When flushing (i.e. pkt->stream_index=-1 on entry to this
+ *            function), on successful return pkt->pts/pkt->time_base will be
+ *            set to the maximum end timestamp of any decoded audio stream, or
+ *            AV_NOPTS_VALUE if no decoded audio streams are present.
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF all consumers for the stream are done
+ * @retval AVERROR_EXIT all consumers are done, should terminate demuxing
+ * @retval "anoter negative error code" other failure
+ */
+int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt,
+                   unsigned flags);
+
+/**
+ * Called by decoder tasks to receive a packet for decoding.
+ *
+ * @param dec_idx decoder index
+ * @param pkt Input packet will be written here on success.
+ *
+ *            An empty packet signals that the decoder should be flushed, but
+ *            more packets will follow (e.g. after seeking). When a decoder
+ *            created with send_end_ts=1 receives a flush packet, it must write
+ *            the end timestamp of the stream after flushing to
+ *            pkt->pts/time_base on the next call to this function (if any).
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF no more packets will arrive, should terminate decoding
+ * @retval "another negative error code" other failure
+ */
+int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt);
+
+/**
+ * Called by decoder tasks to send a decoded frame downstream.
+ *
+ * @param dec_idx Decoder index previously returned by sch_add_dec().
+ * @param frame Decoded frame; on success it is consumed and cleared by this
+ *              function
+ *
+ * @retval ">=0" success
+ * @retval AVERROR_EOF all consumers are done, should terminate decoding
+ * @retval "another negative error code" other failure
+ */
+int sch_dec_send(Scheduler *sch, unsigned dec_idx, struct AVFrame *frame);
+
+/**
+ * Called by filtergraph tasks to obtain frames for filtering. Will wait for a
+ * frame to become available and return it in frame.
+ *
+ * Filtergraphs that contain lavfi sources and do not currently require new
+ * input frames should call this function as a means of rate control - then
+ * in_idx should be set equal to nb_inputs on entry to this function.
+ *
+ * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
+ * @param[in,out] in_idx On input contains the index of the input on which a frame
+ *                       is most desired. May be set to nb_inputs to signal that
+ *                       the filtergraph does not need more input currently.
+ *
+ *                       On success, will be replaced with the input index of
+ *                       the actually returned frame or EOF timestamp.
+ *
+ * @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx
+ *               contains the index of the input it belongs to.
+ * @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should
+ *                         resume filtering. May only be returned when
+ *                         in_idx=nb_inputs on entry to this function.
+ * @retval AVERROR_EOF No more frames will arrive, should terminate filtering.
+ */
+int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
+                       unsigned *in_idx, struct AVFrame *frame);
+
+/**
+ * Called by filtergraph tasks to send a filtered frame or EOF to consumers.
+ *
+ * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
+ * @param out_idx Index of the output which produced the frame.
+ * @param frame The frame to send to consumers. When NULL, signals that no more
+ *              frames will be produced for the specified output. When non-NULL,
+ *              the frame is consumed and cleared by this function on success.
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF all consumers are done
+ * @retval "anoter negative error code" other failure
+ */
+int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx,
+                    struct AVFrame *frame);
+
+int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
+
+/**
+ * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
+ * to become available and return it in frame.
+ *
+ * @param enc_idx Encoder index previously returned by sch_add_enc().
+ * @param frame   Newly-received frame will be stored here on success. Must be
+ *                clean on entrance to this function.
+ *
+ * @retval 0 A frame was successfully delivered into frame.
+ * @retval AVERROR_EOF No more frames will be delivered, the encoder should
+ *                     flush everything and terminate.
+ *
+ */
+int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame);
+
+/**
+ * Called by encoder tasks to send encoded packets downstream.
+ *
+ * @param enc_idx Encoder index previously returned by sch_add_enc().
+ * @param pkt     An encoded packet; it will be consumed and cleared by this
+ *                function on success.
+ *
+ * @retval 0     success
+ * @retval "<0"  Error code.
+ */
+int sch_enc_send   (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt);
+
+/**
+ * Called by muxer tasks to obtain packets for muxing. Will wait for a packet
+ * for any muxed stream to become available and return it in pkt.
+ *
+ * @param mux_idx Muxer index previously returned by sch_add_mux().
+ * @param pkt     Newly-received packet will be stored here on success. Must be
+ *                clean on entrance to this function.
+ *
+ * @retval 0 A packet was successfully delivered into pkt. Its stream_index
+ *           corresponds to a stream index previously returned from
+ *           sch_add_mux_stream().
+ * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that
+ *                     no more packets will be delivered for this stream index.
+ *                     Otherwise this indicates that no more packets will be
+ *                     delivered for any stream and the muxer should therefore
+ *                     flush everything and terminate.
+ */
+int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt);
+
+/**
+ * Called by muxer tasks to signal that a stream will no longer accept input.
+ *
+ * @param stream_idx Stream index previously returned from sch_add_mux_stream().
+ */
+void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
+
+#endif /* FFTOOLS_FFMPEG_SCHED_H */
-- 
2.42.0



More information about the ffmpeg-devel mailing list