[FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure
Anton Khirnov
anton at khirnov.net
Tue Sep 19 22:10:48 EEST 2023
See the comment block at the top of fftools/ffmpeg_sched.h for more
details on what this scheduler is for.
This commit adds the scheduling code itself, along with minimal
integration with the rest of the program:
* allocating and freeing the scheduler
* passing it throughout the call stack in order to register the
individual components (demuxers/decoders/filtergraphs/encoders/muxers)
with the scheduler
The scheduler is not actually used as of this commit, so it should not
result in any change in behavior. That will change in future commits.
---
fftools/Makefile | 1 +
fftools/ffmpeg.c | 18 +-
fftools/ffmpeg.h | 26 +-
fftools/ffmpeg_dec.c | 10 +-
fftools/ffmpeg_demux.c | 44 +-
fftools/ffmpeg_enc.c | 13 +-
fftools/ffmpeg_filter.c | 38 +-
fftools/ffmpeg_mux.c | 15 +-
fftools/ffmpeg_mux.h | 10 +
fftools/ffmpeg_mux_init.c | 70 +-
fftools/ffmpeg_opt.c | 22 +-
fftools/ffmpeg_sched.c | 1703 +++++++++++++++++++++++++++++++++++++
fftools/ffmpeg_sched.h | 414 +++++++++
13 files changed, 2332 insertions(+), 52 deletions(-)
create mode 100644 fftools/ffmpeg_sched.c
create mode 100644 fftools/ffmpeg_sched.h
diff --git a/fftools/Makefile b/fftools/Makefile
index 56820e6bc8..d6a8913a7f 100644
--- a/fftools/Makefile
+++ b/fftools/Makefile
@@ -18,6 +18,7 @@ OBJS-ffmpeg += \
fftools/ffmpeg_mux.o \
fftools/ffmpeg_mux_init.o \
fftools/ffmpeg_opt.o \
+ fftools/ffmpeg_sched.o \
fftools/objpool.o \
fftools/sync_queue.o \
fftools/thread_queue.o \
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index e084318864..995424ca93 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -99,6 +99,7 @@
#include "cmdutils.h"
#include "ffmpeg.h"
+#include "ffmpeg_sched.h"
#include "sync_queue.h"
const char program_name[] = "ffmpeg";
@@ -1155,7 +1156,7 @@ static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
/*
* The following code is the main loop of the file converter
*/
-static int transcode(int *err_rate_exceeded)
+static int transcode(Scheduler *sch, int *err_rate_exceeded)
{
int ret = 0, i;
InputStream *ist;
@@ -1293,6 +1294,8 @@ static int64_t getmaxrss(void)
int main(int argc, char **argv)
{
+ Scheduler *sch = NULL;
+
int ret, err_rate_exceeded;
BenchmarkTimeStamps ti;
@@ -1310,8 +1313,14 @@ int main(int argc, char **argv)
show_banner(argc, argv, options);
+ sch = sch_alloc();
+ if (!sch) {
+ ret = AVERROR(ENOMEM);
+ goto finish;
+ }
+
/* parse options and open all input/output files */
- ret = ffmpeg_parse_options(argc, argv);
+ ret = ffmpeg_parse_options(argc, argv, sch);
if (ret < 0)
goto finish;
@@ -1329,7 +1338,7 @@ int main(int argc, char **argv)
}
current_time = ti = get_benchmark_time_stamps();
- ret = transcode(&err_rate_exceeded);
+ ret = transcode(sch, &err_rate_exceeded);
if (ret >= 0 && do_benchmark) {
int64_t utime, stime, rtime;
current_time = get_benchmark_time_stamps();
@@ -1349,5 +1358,8 @@ finish:
ret = 0;
ffmpeg_cleanup(ret);
+
+ sch_free(&sch);
+
return ret;
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index a4fd825749..278216e5ff 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -27,6 +27,7 @@
#include <signal.h>
#include "cmdutils.h"
+#include "ffmpeg_sched.h"
#include "sync_queue.h"
#include "libavformat/avformat.h"
@@ -731,7 +732,8 @@ int parse_and_set_vsync(const char *arg, int *vsync_var, int file_idx, int st_id
int check_filter_outputs(void);
int filtergraph_is_simple(const FilterGraph *fg);
int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
- char *graph_desc);
+ char *graph_desc,
+ Scheduler *sch, unsigned sch_idx_enc);
int init_complex_filtergraph(FilterGraph *fg);
int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
@@ -754,7 +756,8 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
*/
int ifilter_parameters_from_dec(InputFilter *ifilter, const AVCodecContext *dec);
-int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
+int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
+ unsigned sched_idx_enc);
/**
* Create a new filtergraph in the global filtergraph list.
@@ -762,7 +765,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost);
* @param graph_desc Graph description; an av_malloc()ed string, filtergraph
* takes ownership of it.
*/
-int fg_create(FilterGraph **pfg, char *graph_desc);
+int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
void fg_free(FilterGraph **pfg);
@@ -786,7 +789,7 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
*/
int reap_filters(FilterGraph *fg, int flush);
-int ffmpeg_parse_options(int argc, char **argv);
+int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
void enc_stats_write(OutputStream *ost, EncStats *es,
const AVFrame *frame, const AVPacket *pkt,
@@ -809,7 +812,7 @@ AVBufferRef *hw_device_for_filter(void);
int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
-int dec_open(InputStream *ist);
+int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
void dec_free(Decoder **pdec);
/**
@@ -823,7 +826,8 @@ void dec_free(Decoder **pdec);
*/
int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-int enc_alloc(Encoder **penc, const AVCodec *codec);
+int enc_alloc(Encoder **penc, const AVCodec *codec,
+ Scheduler *sch, unsigned sch_idx);
void enc_free(Encoder **penc);
int enc_open(OutputStream *ost, const AVFrame *frame);
@@ -839,7 +843,7 @@ int enc_flush(void);
*/
int of_stream_init(OutputFile *of, OutputStream *ost);
int of_write_trailer(OutputFile *of);
-int of_open(const OptionsContext *o, const char *filename);
+int of_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void of_free(OutputFile **pof);
void of_enc_stats_close(void);
@@ -853,7 +857,7 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
int64_t of_filesize(OutputFile *of);
-int ifile_open(const OptionsContext *o, const char *filename);
+int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
void ifile_close(InputFile **f);
/**
@@ -961,4 +965,10 @@ static inline void frame_move(void *dst, void *src)
av_frame_move_ref(dst, src);
}
+void *muxer_thread(void *arg);
+void *decoder_thread(void *arg);
+void *encoder_thread(void *arg);
+
+int print_sdp(const char *filename);
+
#endif /* FFTOOLS_FFMPEG_H */
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 36163195ca..dc8d0374a3 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -51,6 +51,9 @@ struct Decoder {
AVFrame *sub_prev[2];
AVFrame *sub_heartbeat;
+ Scheduler *sch;
+ unsigned sch_idx;
+
pthread_t thread;
/**
* Queue for sending coded packets from the main thread to
@@ -667,7 +670,7 @@ fail:
return AVERROR(ENOMEM);
}
-static void *decoder_thread(void *arg)
+void *decoder_thread(void *arg)
{
InputStream *ist = arg;
InputFile *ifile = input_files[ist->file_index];
@@ -1048,7 +1051,7 @@ static int hw_device_setup_for_decode(InputStream *ist)
return 0;
}
-int dec_open(InputStream *ist)
+int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
{
Decoder *d;
const AVCodec *codec = ist->dec;
@@ -1066,6 +1069,9 @@ int dec_open(InputStream *ist)
return ret;
d = ist->decoder;
+ d->sch = sch;
+ d->sch_idx = sch_idx;
+
if (codec->type == AVMEDIA_TYPE_SUBTITLE && ist->fix_sub_duration) {
for (int i = 0; i < FF_ARRAY_ELEMS(d->sub_prev); i++) {
d->sub_prev[i] = av_frame_alloc();
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index ea74b45663..074546d517 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -20,6 +20,7 @@
#include <stdint.h>
#include "ffmpeg.h"
+#include "ffmpeg_sched.h"
#include "objpool.h"
#include "thread_queue.h"
@@ -59,6 +60,9 @@ typedef struct DemuxStream {
// name used for logging
char log_name[32];
+ int sch_idx_stream;
+ int sch_idx_dec;
+
double ts_scale;
int streamcopy_needed;
@@ -110,6 +114,7 @@ typedef struct Demuxer {
double readrate_initial_burst;
+ Scheduler *sch;
ThreadQueue *thread_queue;
int thread_queue_size;
pthread_t thread;
@@ -824,7 +829,9 @@ void ifile_close(InputFile **pf)
static int ist_use(InputStream *ist, int decoding_needed)
{
+ Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]);
DemuxStream *ds = ds_from_ist(ist);
+ int ret;
if (ist->user_set_discard == AVDISCARD_ALL) {
av_log(ist, AV_LOG_ERROR, "Cannot %s a disabled input stream\n",
@@ -832,13 +839,30 @@ static int ist_use(InputStream *ist, int decoding_needed)
return AVERROR(EINVAL);
}
+ if (ds->sch_idx_stream < 0) {
+ ret = sch_add_demux_stream(d->sch, d->f.index);
+ if (ret < 0)
+ return ret;
+ ds->sch_idx_stream = ret;
+ }
+
ist->discard = 0;
ist->st->discard = ist->user_set_discard;
ist->decoding_needed |= decoding_needed;
ds->streamcopy_needed |= !decoding_needed;
- if (decoding_needed && !avcodec_is_open(ist->dec_ctx)) {
- int ret = dec_open(ist);
+ if (decoding_needed && ds->sch_idx_dec < 0) {
+ ret = sch_add_dec(d->sch, decoder_thread, ist);
+ if (ret < 0)
+ return ret;
+ ds->sch_idx_dec = ret;
+
+ ret = sch_connect(d->sch, SCH_DSTREAM(d->f.index, ds->sch_idx_stream),
+ SCH_DEC(ds->sch_idx_dec), NULL, NULL);
+ if (ret < 0)
+ return ret;
+
+ ret = dec_open(ist, d->sch, ds->sch_idx_dec);
if (ret < 0)
return ret;
}
@@ -848,6 +872,7 @@ static int ist_use(InputStream *ist, int decoding_needed)
int ist_output_add(InputStream *ist, OutputStream *ost)
{
+ DemuxStream *ds = ds_from_ist(ist);
int ret;
ret = ist_use(ist, ost->enc ? DECODING_FOR_OST : 0);
@@ -860,11 +885,12 @@ int ist_output_add(InputStream *ist, OutputStream *ost)
ist->outputs[ist->nb_outputs - 1] = ost;
- return 0;
+ return ost->enc ? ds->sch_idx_dec : ds->sch_idx_stream;
}
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
{
+ DemuxStream *ds = ds_from_ist(ist);
int ret;
ret = ist_use(ist, is_simple ? DECODING_FOR_OST : DECODING_FOR_FILTER);
@@ -882,7 +908,7 @@ int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
if (ret < 0)
return ret;
- return 0;
+ return ds->sch_idx_dec;
}
static int choose_decoder(const OptionsContext *o, AVFormatContext *s, AVStream *st,
@@ -1009,6 +1035,9 @@ static DemuxStream *demux_stream_alloc(Demuxer *d, AVStream *st)
if (!ds)
return NULL;
+ ds->sch_idx_stream = -1;
+ ds->sch_idx_dec = -1;
+
ds->ist.st = st;
ds->ist.file_index = f->index;
ds->ist.index = st->index;
@@ -1339,7 +1368,7 @@ static Demuxer *demux_alloc(void)
return d;
}
-int ifile_open(const OptionsContext *o, const char *filename)
+int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
{
Demuxer *d;
InputFile *f;
@@ -1366,6 +1395,11 @@ int ifile_open(const OptionsContext *o, const char *filename)
f = &d->f;
+ ret = sch_add_demux(sch, input_thread, d);
+ if (ret < 0)
+ return ret;
+ d->sch = sch;
+
if (stop_time != INT64_MAX && recording_time != INT64_MAX) {
stop_time = INT64_MAX;
av_log(d, AV_LOG_WARNING, "-t and -to cannot be used together; using -t.\n");
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index ea542173c5..9bede78a1e 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -55,6 +55,9 @@ struct Encoder {
int opened;
int finished;
+ Scheduler *sch;
+ unsigned sch_idx;
+
pthread_t thread;
/**
* Queue for sending frames from the main thread to
@@ -112,7 +115,8 @@ void enc_free(Encoder **penc)
av_freep(penc);
}
-int enc_alloc(Encoder **penc, const AVCodec *codec)
+int enc_alloc(Encoder **penc, const AVCodec *codec,
+ Scheduler *sch, unsigned sch_idx)
{
Encoder *enc;
@@ -132,6 +136,9 @@ int enc_alloc(Encoder **penc, const AVCodec *codec)
if (!enc->pkt)
goto fail;
+ enc->sch = sch;
+ enc->sch_idx = sch_idx;
+
*penc = enc;
return 0;
@@ -216,8 +223,6 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost)
return 0;
}
-static void *encoder_thread(void *arg);
-
static int enc_thread_start(OutputStream *ost)
{
Encoder *e = ost->enc;
@@ -1028,7 +1033,7 @@ fail:
return AVERROR(ENOMEM);
}
-static void *encoder_thread(void *arg)
+void *encoder_thread(void *arg)
{
OutputStream *ost = arg;
OutputFile *of = output_files[ost->file_index];
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 04c4b4ea7b..e8e78f5454 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -72,6 +72,9 @@ typedef struct FilterGraphPriv {
// frame for sending output to the encoder
AVFrame *frame_enc;
+ Scheduler *sch;
+ unsigned sch_idx;
+
pthread_t thread;
/**
* Queue for sending frames from the main thread to the filtergraph. Has
@@ -742,14 +745,20 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
- int ret;
+ int ret, dec_idx;
av_assert0(!ifp->ist);
ifp->ist = ist;
ifp->type_src = ist->st->codecpar->codec_type;
- ret = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
+ dec_idx = ist_filter_add(ist, ifilter, filtergraph_is_simple(ifilter->graph));
+ if (dec_idx < 0)
+ return dec_idx;
+
+ ret = sch_connect(fgp->sch, SCH_DEC(dec_idx),
+ SCH_FILTER_IN(fgp->sch_idx, ifp->index),
+ NULL, NULL);
if (ret < 0)
return ret;
@@ -805,13 +814,15 @@ static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
return 0;
}
-int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
+int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
+ unsigned sched_idx_enc)
{
const OutputFile *of = output_files[ost->file_index];
OutputFilterPriv *ofp = ofp_from_ofilter(ofilter);
FilterGraph *fg = ofilter->graph;
FilterGraphPriv *fgp = fgp_from_fg(fg);
const AVCodec *c = ost->enc_ctx->codec;
+ int ret;
av_assert0(!ofilter->ost);
@@ -894,6 +905,11 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
break;
}
+ ret = sch_connect(fgp->sch, SCH_FILTER_OUT(fgp->sch_idx, ofp->index),
+ SCH_ENC(sched_idx_enc), NULL, NULL);
+ if (ret < 0)
+ return ret;
+
fgp->nb_outputs_bound++;
av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
@@ -1023,7 +1039,7 @@ static const AVClass fg_class = {
.category = AV_CLASS_CATEGORY_FILTER,
};
-int fg_create(FilterGraph **pfg, char *graph_desc)
+int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
{
FilterGraphPriv *fgp;
FilterGraph *fg;
@@ -1044,6 +1060,7 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
fg->index = nb_filtergraphs - 1;
fgp->graph_desc = graph_desc;
fgp->disable_conversions = !auto_conversion_filters;
+ fgp->sch = sch;
snprintf(fgp->log_name, sizeof(fgp->log_name), "fc#%d", fg->index);
@@ -1103,6 +1120,12 @@ int fg_create(FilterGraph **pfg, char *graph_desc)
goto fail;
}
+ ret = sch_add_filtergraph(sch, fg->nb_inputs, fg->nb_outputs,
+ filter_thread, fgp);
+ if (ret < 0)
+ goto fail;
+ fgp->sch_idx = ret;
+
fail:
avfilter_inout_free(&inputs);
avfilter_inout_free(&outputs);
@@ -1115,13 +1138,14 @@ fail:
}
int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
- char *graph_desc)
+ char *graph_desc,
+ Scheduler *sch, unsigned sched_idx_enc)
{
FilterGraph *fg;
FilterGraphPriv *fgp;
int ret;
- ret = fg_create(&fg, graph_desc);
+ ret = fg_create(&fg, graph_desc, sch);
if (ret < 0)
return ret;
fgp = fgp_from_fg(fg);
@@ -1147,7 +1171,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
if (ret < 0)
return ret;
- ret = ofilter_bind_ost(fg->outputs[0], ost);
+ ret = ofilter_bind_ost(fg->outputs[0], ost, sched_idx_enc);
if (ret < 0)
return ret;
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 033894ae86..9628728d95 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -235,7 +235,7 @@ fail:
return AVERROR(ENOMEM);
}
-static void *muxer_thread(void *arg)
+void *muxer_thread(void *arg)
{
Muxer *mux = arg;
OutputFile *of = &mux->of;
@@ -557,7 +557,7 @@ static int thread_start(Muxer *mux)
return 0;
}
-static int print_sdp(void)
+int print_sdp(const char *filename)
{
char sdp[16384];
int i;
@@ -590,19 +590,18 @@ static int print_sdp(void)
if (ret < 0)
goto fail;
- if (!sdp_filename) {
+ if (!filename) {
printf("SDP:\n%s\n", sdp);
fflush(stdout);
} else {
- ret = avio_open2(&sdp_pb, sdp_filename, AVIO_FLAG_WRITE, &int_cb, NULL);
+ ret = avio_open2(&sdp_pb, filename, AVIO_FLAG_WRITE, &int_cb, NULL);
if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", sdp_filename);
+ av_log(NULL, AV_LOG_ERROR, "Failed to open sdp file '%s'\n", filename);
goto fail;
}
avio_print(sdp_pb, sdp);
avio_closep(&sdp_pb);
- av_freep(&sdp_filename);
}
// SDP successfully written, allow muxer threads to start
@@ -638,7 +637,7 @@ int mux_check_init(Muxer *mux)
nb_output_dumped++;
if (sdp_filename || want_sdp) {
- ret = print_sdp();
+ ret = print_sdp(sdp_filename);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
return ret;
@@ -961,6 +960,8 @@ void of_free(OutputFile **pof)
ost_free(&of->streams[i]);
av_freep(&of->streams);
+ av_freep(&mux->sch_stream_idx);
+
av_dict_free(&mux->opts);
av_packet_free(&mux->sq_pkt);
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index a2bb4dfc7d..d5aba6db36 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -24,6 +24,7 @@
#include <stdatomic.h>
#include <stdint.h>
+#include "ffmpeg_sched.h"
#include "thread_queue.h"
#include "libavformat/avformat.h"
@@ -50,6 +51,9 @@ typedef struct MuxStream {
EncStats stats;
+ int sched_idx;
+ int sched_idx_enc;
+
int64_t max_frames;
/*
@@ -94,6 +98,12 @@ typedef struct Muxer {
AVFormatContext *fc;
+ Scheduler *sch;
+
+ // OutputStream indices indexed by scheduler stream indices
+ int *sch_stream_idx;
+ int nb_sch_stream_idx;
+
pthread_t thread;
ThreadQueue *tq;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index f35680e355..3380cbeb5c 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -23,6 +23,7 @@
#include "cmdutils.h"
#include "ffmpeg.h"
#include "ffmpeg_mux.h"
+#include "ffmpeg_sched.h"
#include "fopen_utf8.h"
#include "libavformat/avformat.h"
@@ -436,6 +437,9 @@ static MuxStream *mux_stream_alloc(Muxer *mux, enum AVMediaType type)
ms->ost.class = &output_stream_class;
+ ms->sched_idx = -1;
+ ms->sched_idx_enc = -1;
+
snprintf(ms->log_name, sizeof(ms->log_name), "%cost#%d:%d",
type_str ? *type_str : '?', mux->of.index, ms->ost.index);
@@ -1123,6 +1127,22 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ms)
return AVERROR(ENOMEM);
+ // only streams with sources (i.e. not attachments)
+ // are handled by the scheduler
+ if (ist || ofilter) {
+ ret = GROW_ARRAY(mux->sch_stream_idx, mux->nb_sch_stream_idx);
+ if (ret < 0)
+ return ret;
+
+ ret = sch_add_mux_stream(mux->sch, mux->of.index);
+ if (ret < 0)
+ return ret;
+
+ av_assert0(ret == mux->nb_sch_stream_idx - 1);
+ mux->sch_stream_idx[ret] = ms->ost.index;
+ ms->sched_idx = ret;
+ }
+
ost = &ms->ost;
if (o->streamid) {
@@ -1166,7 +1186,12 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
if (!ost->enc_ctx)
return AVERROR(ENOMEM);
- ret = enc_alloc(&ost->enc, enc);
+ ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+ if (ret < 0)
+ return ret;
+ ms->sched_idx_enc = ret;
+
+ ret = enc_alloc(&ost->enc, enc, mux->sch, ms->sched_idx_enc);
if (ret < 0)
return ret;
@@ -1421,23 +1446,48 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
(type == AVMEDIA_TYPE_VIDEO || type == AVMEDIA_TYPE_AUDIO)) {
if (ofilter) {
ost->filter = ofilter;
- ret = ofilter_bind_ost(ofilter, ost);
+ ret = ofilter_bind_ost(ofilter, ost, ms->sched_idx_enc);
if (ret < 0)
return ret;
} else {
- ret = init_simple_filtergraph(ost->ist, ost, filters);
+ ret = init_simple_filtergraph(ost->ist, ost, filters,
+ mux->sch, ms->sched_idx_enc);
if (ret < 0) {
av_log(ost, AV_LOG_ERROR,
"Error initializing a simple filtergraph\n");
return ret;
}
}
+
+ ret = sch_connect(mux->sch, SCH_ENC(ms->sched_idx_enc),
+ SCH_MSTREAM(ost->file_index, ms->sched_idx),
+ NULL, NULL);
+ if (ret < 0)
+ return ret;
} else if (ost->ist) {
- ret = ist_output_add(ost->ist, ost);
- if (ret < 0) {
+ int sched_idx = ist_output_add(ost->ist, ost);
+ if (sched_idx < 0) {
av_log(ost, AV_LOG_ERROR,
"Error binding an input stream\n");
- return ret;
+ return sched_idx;
+ }
+
+ if (ost->enc) {
+ ret = sch_connect(mux->sch, SCH_DEC(sched_idx), SCH_ENC(ms->sched_idx_enc),
+ NULL, NULL);
+ if (ret < 0)
+ return ret;
+
+ ret = sch_connect(mux->sch, SCH_ENC(ms->sched_idx_enc),
+ SCH_MSTREAM(ost->file_index, ms->sched_idx),
+ NULL, NULL);
+ if (ret < 0)
+ return ret;
+ } else {
+ ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx),
+ SCH_MSTREAM(ost->file_index, ms->sched_idx), NULL, NULL);
+ if (ret < 0)
+ return ret;
}
}
@@ -2617,7 +2667,7 @@ static Muxer *mux_alloc(void)
return mux;
}
-int of_open(const OptionsContext *o, const char *filename)
+int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
{
Muxer *mux;
AVFormatContext *oc;
@@ -2687,6 +2737,12 @@ int of_open(const OptionsContext *o, const char *filename)
AVFMT_FLAG_BITEXACT);
}
+ err = sch_add_mux(sch, muxer_thread, NULL, mux,
+ !strcmp(oc->oformat->name, "rtp"));
+ if (err < 0)
+ return err;
+ mux->sch = sch;
+
/* create all output streams for this file */
err = create_streams(mux, o);
if (err < 0)
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304471dd03..d463306546 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -28,6 +28,7 @@
#endif
#include "ffmpeg.h"
+#include "ffmpeg_sched.h"
#include "cmdutils.h"
#include "opt_common.h"
#include "sync_queue.h"
@@ -1157,20 +1158,22 @@ static int opt_audio_qscale(void *optctx, const char *opt, const char *arg)
static int opt_filter_complex(void *optctx, const char *opt, const char *arg)
{
+ Scheduler *sch = optctx;
char *graph_desc = av_strdup(arg);
if (!graph_desc)
return AVERROR(ENOMEM);
- return fg_create(NULL, graph_desc);
+ return fg_create(NULL, graph_desc, sch);
}
static int opt_filter_complex_script(void *optctx, const char *opt, const char *arg)
{
+ Scheduler *sch = optctx;
char *graph_desc = file_read(arg);
if (!graph_desc)
return AVERROR(EINVAL);
- return fg_create(NULL, graph_desc);
+ return fg_create(NULL, graph_desc, sch);
}
void show_help_default(const char *opt, const char *arg)
@@ -1262,8 +1265,9 @@ static const OptionGroupDef groups[] = {
[GROUP_INFILE] = { "input url", "i", OPT_INPUT },
};
-static int open_files(OptionGroupList *l, const char *inout,
- int (*open_file)(const OptionsContext*, const char*))
+static int open_files(OptionGroupList *l, const char *inout, Scheduler *sch,
+ int (*open_file)(const OptionsContext*, const char*,
+ Scheduler*))
{
int i, ret;
@@ -1283,7 +1287,7 @@ static int open_files(OptionGroupList *l, const char *inout,
}
av_log(NULL, AV_LOG_DEBUG, "Opening an %s file: %s.\n", inout, g->arg);
- ret = open_file(&o, g->arg);
+ ret = open_file(&o, g->arg, sch);
uninit_options(&o);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error opening %s file %s.\n",
@@ -1296,7 +1300,7 @@ static int open_files(OptionGroupList *l, const char *inout,
return 0;
}
-int ffmpeg_parse_options(int argc, char **argv)
+int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch)
{
OptionParseContext octx;
const char *errmsg = NULL;
@@ -1313,7 +1317,7 @@ int ffmpeg_parse_options(int argc, char **argv)
}
/* apply global options */
- ret = parse_optgroup(NULL, &octx.global_opts);
+ ret = parse_optgroup(sch, &octx.global_opts);
if (ret < 0) {
errmsg = "parsing global options";
goto fail;
@@ -1323,7 +1327,7 @@ int ffmpeg_parse_options(int argc, char **argv)
term_init();
/* open input files */
- ret = open_files(&octx.groups[GROUP_INFILE], "input", ifile_open);
+ ret = open_files(&octx.groups[GROUP_INFILE], "input", sch, ifile_open);
if (ret < 0) {
errmsg = "opening input files";
goto fail;
@@ -1337,7 +1341,7 @@ int ffmpeg_parse_options(int argc, char **argv)
}
/* open output files */
- ret = open_files(&octx.groups[GROUP_OUTFILE], "output", of_open);
+ ret = open_files(&octx.groups[GROUP_OUTFILE], "output", sch, of_open);
if (ret < 0) {
errmsg = "opening output files";
goto fail;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
new file mode 100644
index 0000000000..de7070906a
--- /dev/null
+++ b/fftools/ffmpeg_sched.c
@@ -0,0 +1,1703 @@
+/*
+ * Inter-thread scheduling/synchronization.
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdatomic.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "cmdutils.h"
+#include "ffmpeg_sched.h"
+#include "sync_queue.h"
+#include "thread_queue.h"
+
+// TODO: try to get rid of this
+#include "ffmpeg.h"
+
+#include "libavcodec/bsf.h"
+#include "libavcodec/packet.h"
+
+#include "libavutil/avassert.h"
+#include "libavutil/error.h"
+#include "libavutil/frame.h"
+#include "libavutil/mem.h"
+#include "libavutil/thread.h"
+
+// 100 ms
+// XXX: some other value? make this dynamic?
+#define SCHEDULE_TOLERANCE (100 * 1000)
+
+// Payload type of a ThreadQueue created by queue_alloc(): selects between the
+// packet and the frame object pool, and between pkt_move and frame_move.
+enum QueueType {
+ QUEUE_PACKETS,
+ QUEUE_FRAMES,
+};
+
+// A thread managed by the scheduler; started by task_start() and joined by
+// task_stop().
+typedef struct SchTask {
+ // thread entry point and its argument, handed to pthread_create()
+ SchThreadFunc func;
+ void *func_arg;
+
+ pthread_t thread;
+ // set to 1 after pthread_create() succeeds, reset to 0 after pthread_join();
+ // task_stop() returns immediately while this is 0
+ int thread_running;
+} SchTask;
+
+// Scheduler-side state of one decoder (an element of Scheduler.dec).
+typedef struct SchDec {
+ SchedulerNode src;
+ // dst and dst_finished are heap-allocated and released in sch_free();
+ // presumably parallel arrays with nb_dst entries each -- TODO confirm
+ SchedulerNode *dst;
+ uint8_t *dst_finished;
+ unsigned nb_dst;
+
+ SchTask task;
+ // Queue for receiving input packets, one stream.
+ ThreadQueue *queue;
+
+ // temporary storage used by sch_dec_send()
+ AVFrame *send_frame;
+} SchDec;
+
+// One sync queue shared by encoders (an element of Scheduler.sq_enc).
+// All members are released in sch_free().
+typedef struct SchSyncQueue {
+ SyncQueue *sq;
+ AVFrame *frame;
+ // NOTE(review): presumably serializes access to sq and frame -- confirm
+ pthread_mutex_t lock;
+
+ // heap-allocated; presumably nb_enc_idx encoder indices -- TODO confirm
+ unsigned *enc_idx;
+ unsigned nb_enc_idx;
+} SchSyncQueue;
+
+// Scheduler-side state of one encoder (an element of Scheduler.enc).
+typedef struct SchEnc {
+ SchedulerNode src;
+ SchedulerNode dst;
+
+ // [0] - index of the sync queue in Scheduler.sq_enc,
+ // [1] - index of this encoder in the sq
+ int sq_idx[2];
+
+ // NOTE(review): open_lock/open_cond are not destroyed in the part of
+ // sch_free() visible next to this definition; check where they are
+ // initialized and torn down
+ pthread_mutex_t open_lock;
+ pthread_cond_t open_cond;
+ int (*open_cb)(void *opaque, const AVFrame *frame);
+ int can_open;
+ int opened;
+
+ SchTask task;
+ // Queue for receiving input frames, one stream.
+ ThreadQueue *queue;
+} SchEnc;
+
+// Per-stream state of a demuxer (an element of SchDemux.streams).
+typedef struct SchDemuxStream {
+ // dst and dst_finished are heap-allocated and released in sch_free();
+ // presumably parallel arrays with nb_dst entries each -- TODO confirm
+ SchedulerNode *dst;
+ uint8_t *dst_finished;
+ unsigned nb_dst;
+} SchDemuxStream;
+
+// Scheduler-side state of one demuxer (an element of Scheduler.demux).
+typedef struct SchDemux {
+ SchDemuxStream *streams;
+ unsigned nb_streams;
+
+ SchTask task;
+
+ // sch_stop() takes demux_lock, sets terminate, stores 0 into can_demux and
+ // signals demux_cond before joining the demuxer thread
+ pthread_mutex_t demux_lock;
+ pthread_cond_t demux_cond;
+ atomic_int can_demux;
+ int terminate;
+
+ int finished;
+
+ // temporary storage used by sch_demux_send()
+ AVPacket *send_pkt;
+
+ // the following must not be accessed outside of schedule_update_locked()
+ int can_demux_prev;
+ int can_demux_next;
+} SchDemux;
+
+// Per-stream state of a muxer (an element of SchMux.streams).
+typedef struct SchMuxStream {
+ SchedulerNode src;
+ SchedulerNode src_sched;
+
+ // XXX
+ int (*hook)(void *opaque, void *item);
+ void *opaque;
+
+ // released in sch_free() with av_bsf_free() / av_packet_free()
+ AVBSFContext *bsf_ctx;
+ AVPacket *bsf_pkt;
+
+ ////////////////////////////////////////////////////////////
+ // The following are protected by Scheduler.schedule_lock //
+
+ /* dts of the last packet sent to this stream
+ in AV_TIME_BASE_Q */
+ int64_t last_dts;
+ int source_blocked;
+ // this stream no longer accepts input
+ int finished;
+ ////////////////////////////////////////////////////////////
+} SchMuxStream;
+
+// Scheduler-side state of one muxer (an element of Scheduler.mux),
+// created by sch_add_mux().
+typedef struct SchMux {
+ SchMuxStream *streams;
+ unsigned nb_streams;
+ unsigned nb_streams_ready;
+
+ // callback stored from the 'init' argument of sch_add_mux(); may be NULL
+ // (of_open() in this patch passes NULL)
+ int (*init)(void *arg);
+
+ SchTask task;
+ ThreadQueue *queue;
+} SchMux;
+
+// One input of a filtergraph (an element of SchFilterGraph.inputs).
+typedef struct SchFilterIn {
+ SchedulerNode src;
+ SchedulerNode src_sched;
+} SchFilterIn;
+
+// One output of a filtergraph (an element of SchFilterGraph.outputs).
+typedef struct SchFilterOut {
+ SchedulerNode dst;
+} SchFilterOut;
+
+// Scheduler-side state of one filtergraph (an element of Scheduler.filters).
+// inputs, outputs and queue are released in sch_free().
+typedef struct SchFilterGraph {
+ SchFilterIn *inputs;
+ unsigned nb_inputs;
+
+ SchFilterOut *outputs;
+ unsigned nb_outputs;
+
+ SchTask task;
+ ThreadQueue *queue;
+
+ // protected by schedule_lock
+ int best_input;
+} SchFilterGraph;
+
+// Top-level scheduler object: owns one array per kind of registered component
+// (demuxers, muxers, decoders, encoders, encoder sync queues, filtergraphs).
+// Allocated by sch_alloc(), released by sch_free().
+struct Scheduler {
+ SchDemux *demux;
+ unsigned nb_demux;
+
+ SchMux *mux;
+ unsigned nb_mux;
+
+ unsigned nb_mux_ready;
+ pthread_mutex_t mux_ready_lock;
+
+ unsigned nb_mux_done;
+ pthread_mutex_t mux_done_lock;
+ pthread_cond_t mux_done_cond;
+
+
+ SchDec *dec;
+ unsigned nb_dec;
+
+ SchEnc *enc;
+ unsigned nb_enc;
+
+ SchSyncQueue *sq_enc;
+ unsigned nb_sq_enc;
+
+ SchFilterGraph *filters;
+ unsigned nb_filters;
+
+ // private copy made by sch_sdp_filename(), freed in sch_free()
+ char *sdp_filename;
+ // set to 1 by sch_alloc(), then ANDed (bitwise) with the sdp_auto argument
+ // of every sch_add_mux() call
+ int sdp_auto;
+
+ int transcode_started;
+
+ // NOTE(review): sch_alloc() does not initialize this mutex and sch_free()
+ // does not destroy it; confirm it is set up and torn down elsewhere
+ pthread_mutex_t schedule_lock;
+};
+
+/**
+ * Create a ThreadQueue that carries either packets or frames.
+ *
+ * @param ptq        receives the new queue on success, untouched on failure
+ * @param nb_streams forwarded to tq_alloc()
+ * @param queue_size forwarded to tq_alloc()
+ * @param type       QUEUE_PACKETS selects the packet pool and pkt_move; any
+ *                   other value selects the frame pool and frame_move
+ * @return 0 on success, AVERROR(ENOMEM) when the object pool or the queue
+ *         cannot be allocated
+ */
+static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
+                       enum QueueType type)
+{
+    const int packets = (type == QUEUE_PACKETS);
+    ObjPool *pool = packets ? objpool_alloc_packets() : objpool_alloc_frames();
+    ThreadQueue *queue;
+
+    if (!pool)
+        return AVERROR(ENOMEM);
+
+    queue = tq_alloc(nb_streams, queue_size, pool,
+                     packets ? pkt_move : frame_move);
+    if (queue) {
+        *ptq = queue;
+        return 0;
+    }
+
+    // no queue was created, so the pool has to be released here
+    objpool_free(&pool);
+    return AVERROR(ENOMEM);
+}
+
+/**
+ * Join the thread of a task, if one is running.
+ *
+ * @return 0 when the task has no running thread; otherwise the value the
+ *         thread function returned, converted from pointer to int
+ */
+static int task_stop(SchTask *task)
+{
+    if (task->thread_running) {
+        void *exit_value;
+        int ret = pthread_join(task->thread, &exit_value);
+
+        av_assert0(ret == 0);
+        task->thread_running = 0;
+
+        return (intptr_t)exit_value;
+    }
+
+    return 0;
+}
+
+/**
+ * Launch the thread of a task; the task must not already be running.
+ *
+ * @return 0 on success, AVERROR(code) with the pthread_create() error code
+ *         on failure (the failure is also logged)
+ */
+static int task_start(SchTask *task)
+{
+    int err;
+
+    av_assert0(!task->thread_running);
+
+    err = pthread_create(&task->thread, NULL, task->func, task->func_arg);
+    if (!err) {
+        task->thread_running = 1;
+        return 0;
+    }
+
+    av_log(NULL, AV_LOG_ERROR, "pthread_create() failed: %s\n", strerror(err));
+    return AVERROR(err);
+}
+
+/**
+ * Join every thread owned by the scheduler, in the order demuxers, decoders,
+ * filtergraphs, encoders, muxers.
+ *
+ * @return the per-thread results combined with err_merge()
+ */
+int sch_stop(Scheduler *sch)
+{
+    int status = 0;
+    int err;
+    unsigned n;
+
+    // XXX ensure no threads can get stuck
+
+    // demuxers are told to terminate before being joined
+    for (n = 0; n < sch->nb_demux; n++) {
+        SchDemux *demux = &sch->demux[n];
+
+        pthread_mutex_lock(&demux->demux_lock);
+        demux->terminate = 1;
+        atomic_store(&demux->can_demux, 0);
+        pthread_cond_signal(&demux->demux_cond);
+        pthread_mutex_unlock(&demux->demux_lock);
+
+        err    = task_stop(&demux->task);
+        status = err_merge(status, err);
+    }
+
+    for (n = 0; n < sch->nb_dec; n++) {
+        // XXX should not be needed?
+        //tq_send_finish(dec->queue, 0);
+        err    = task_stop(&sch->dec[n].task);
+        status = err_merge(status, err);
+    }
+
+    for (n = 0; n < sch->nb_filters; n++) {
+        err    = task_stop(&sch->filters[n].task);
+        status = err_merge(status, err);
+    }
+
+    for (n = 0; n < sch->nb_enc; n++) {
+        err    = task_stop(&sch->enc[n].task);
+        status = err_merge(status, err);
+    }
+
+    for (n = 0; n < sch->nb_mux; n++) {
+        err    = task_stop(&sch->mux[n].task);
+        status = err_merge(status, err);
+    }
+
+    return status;
+}
+
+// Stop all scheduler threads, release everything reachable from the scheduler
+// and finally free the Scheduler itself through av_freep(psch).
+// Does nothing when *psch is NULL.
+//
+// NOTE(review): Scheduler.schedule_lock and SchEnc.open_lock/open_cond are not
+// destroyed here; confirm where (or whether) they are initialized.
+void sch_free(Scheduler **psch)
+{
+ Scheduler *sch = *psch;
+
+ if (!sch)
+ return;
+
+ // threads are joined first; the value returned by sch_stop() is discarded
+ sch_stop(sch);
+
+ // demuxers and their per-stream destination arrays
+ for (unsigned i = 0; i < sch->nb_demux; i++) {
+ SchDemux *d = &sch->demux[i];
+
+ for (unsigned j = 0; j < d->nb_streams; j++) {
+ SchDemuxStream *ds = &d->streams[j];
+ av_freep(&ds->dst);
+ av_freep(&ds->dst_finished);
+ }
+ av_freep(&d->streams);
+
+ av_packet_free(&d->send_pkt);
+
+ pthread_mutex_destroy(&d->demux_lock);
+ pthread_cond_destroy(&d->demux_cond);
+ }
+ av_freep(&sch->demux);
+
+ // muxers: per-stream bitstream filter state, then the stream array and queue
+ for (unsigned i = 0; i < sch->nb_mux; i++) {
+ SchMux *mux = &sch->mux[i];
+
+ for (unsigned j = 0; j < mux->nb_streams; j++) {
+ SchMuxStream *ms = &mux->streams[j];
+
+ av_bsf_free(&ms->bsf_ctx);
+ av_packet_free(&ms->bsf_pkt);
+ }
+
+ av_freep(&mux->streams);
+
+ tq_free(&mux->queue);
+ }
+ av_freep(&sch->mux);
+
+ // decoders
+ for (unsigned i = 0; i < sch->nb_dec; i++) {
+ SchDec *dec = &sch->dec[i];
+
+ tq_free(&dec->queue);
+
+ av_freep(&dec->dst);
+ av_freep(&dec->dst_finished);
+
+ av_frame_free(&dec->send_frame);
+ }
+ av_freep(&sch->dec);
+
+ // encoders: only the input queue is released per encoder
+ for (unsigned i = 0; i < sch->nb_enc; i++) {
+ SchEnc *enc = &sch->enc[i];
+
+ tq_free(&enc->queue);
+ }
+ av_freep(&sch->enc);
+
+ // encoder sync queues
+ for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
+ SchSyncQueue *sq = &sch->sq_enc[i];
+ sq_free(&sq->sq);
+ av_frame_free(&sq->frame);
+ pthread_mutex_destroy(&sq->lock);
+ av_freep(&sq->enc_idx);
+ }
+ av_freep(&sch->sq_enc);
+
+ // filtergraphs
+ for (unsigned i = 0; i < sch->nb_filters; i++) {
+ SchFilterGraph *fg = &sch->filters[i];
+
+ tq_free(&fg->queue);
+
+ av_freep(&fg->inputs);
+ av_freep(&fg->outputs);
+ }
+ av_freep(&sch->filters);
+
+ av_freep(&sch->sdp_filename);
+
+ // the synchronization objects created in sch_alloc()
+ pthread_mutex_destroy(&sch->mux_ready_lock);
+
+ pthread_mutex_destroy(&sch->mux_done_lock);
+ pthread_cond_destroy(&sch->mux_done_cond);
+
+ av_freep(psch);
+}
+
+Scheduler *sch_alloc(void)
+{
+ Scheduler *sch;
+ int ret;
+
+ sch = av_mallocz(sizeof(*sch));
+ if (!sch)
+ return NULL;
+
+ sch->sdp_auto = 1;
+
+ ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
+ if (ret)
+ goto fail;
+
+ ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
+ if (ret)
+ goto fail;
+
+ ret = pthread_cond_init(&sch->mux_done_cond, NULL);
+ if (ret)
+ goto fail;
+
+ return sch;
+fail:
+ sch_free(&sch);
+ return NULL;
+}
+
+/**
+ * Replace the stored SDP filename with a copy of sdp_filename.
+ *
+ * @return 0 on success, AVERROR(ENOMEM) when the copy could not be made
+ */
+int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
+{
+    // drop the previous name before storing the new copy
+    av_freep(&sch->sdp_filename);
+
+    sch->sdp_filename = av_strdup(sdp_filename);
+    if (!sch->sdp_filename)
+        return AVERROR(ENOMEM);
+
+    return 0;
+}
+
+/**
+ * Register a new muxer with the scheduler.
+ *
+ * @param func     task function of the muxer
+ * @param init     callback stored as the muxer's init function
+ * @param arg      argument stored for the task function
+ * @param sdp_auto AND-ed into the scheduler-wide sdp_auto flag
+ * @return index of the new muxer on success, a negative error code otherwise
+ */
+int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
+                void *arg, int sdp_auto)
+{
+    SchMux *new_mux;
+    int err = GROW_ARRAY(sch->mux, sch->nb_mux);
+
+    if (err < 0)
+        return err;
+
+    // the freshly added entry is the last one in the array
+    new_mux = &sch->mux[sch->nb_mux - 1];
+
+    new_mux->init          = init;
+    new_mux->task.func     = func;
+    new_mux->task.func_arg = arg;
+
+    sch->sdp_auto &= sdp_auto;
+
+    return new_mux - sch->mux;
+}
+
+/**
+ * Append a stream to the muxer with index mux_idx.
+ *
+ * @return index of the new stream within the muxer on success, a negative
+ *         error code otherwise
+ */
+int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
+{
+    SchMux *mux;
+    SchMuxStream *added;
+    int err;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    err = GROW_ARRAY(mux->streams, mux->nb_streams);
+    if (err < 0)
+        return err;
+
+    // no packet has been seen on the new stream yet
+    added           = &mux->streams[mux->nb_streams - 1];
+    added->last_dts = AV_NOPTS_VALUE;
+
+    return added - mux->streams;
+}
+
+/**
+ * Attach a bitstream filter chain, described by the string bsf, to a muxer
+ * stream. The stream must not have a filter chain yet.
+ *
+ * @return 0 on success, a negative error code otherwise
+ */
+int sch_add_mux_stream_bsf(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
+                           const char *bsf)
+{
+    SchMuxStream *ms;
+    int err;
+
+    av_assert0(mux_idx < sch->nb_mux && stream_idx < sch->mux[mux_idx].nb_streams);
+    ms = &sch->mux[mux_idx].streams[stream_idx];
+
+    av_assert0(!ms->bsf_ctx);
+
+    err = av_bsf_list_parse_str(bsf, &ms->bsf_ctx);
+    if (err >= 0) {
+        // packet used together with the filter chain
+        ms->bsf_pkt = av_packet_alloc();
+        err         = ms->bsf_pkt ? 0 : AVERROR(ENOMEM);
+    }
+
+    return err;
+}
+
+/**
+ * Register a new demuxer with the scheduler.
+ *
+ * @param func     task function of the demuxer
+ * @param func_arg argument stored for the task function
+ * @return index of the new demuxer on success, a negative error code
+ *         otherwise
+ */
+int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *func_arg)
+{
+    SchDemux *d;
+    int ret;
+
+    ret = GROW_ARRAY(sch->demux, sch->nb_demux);
+    if (ret < 0)
+        return ret;
+
+    d = &sch->demux[sch->nb_demux - 1];
+    d->task.func     = func;
+    d->task.func_arg = func_arg;
+
+    d->send_pkt = av_packet_alloc();
+    if (!d->send_pkt)
+        return AVERROR(ENOMEM);
+
+    // pthread functions report failure through their return value and do not
+    // set errno, so the returned code is what has to be converted
+    ret = pthread_mutex_init(&d->demux_lock, NULL);
+    if (ret)
+        return AVERROR(ret);
+
+    ret = pthread_cond_init(&d->demux_cond, NULL);
+    if (ret)
+        return AVERROR(ret);
+
+    return d - sch->demux;
+}
+
+/**
+ * Append a stream to the demuxer with index demux_idx.
+ *
+ * @return index of the new stream within the demuxer on success, a negative
+ *         error code otherwise
+ */
+int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
+{
+    SchDemux *demux;
+    int err;
+
+    av_assert0(demux_idx < sch->nb_demux);
+    demux = &sch->demux[demux_idx];
+
+    err = GROW_ARRAY(demux->streams, demux->nb_streams);
+    if (err < 0)
+        return err;
+
+    return demux->nb_streams - 1;
+}
+
+/**
+ * Register a new decoder with the scheduler.
+ *
+ * @param func     task function of the decoder
+ * @param func_arg argument stored for the task function
+ * @return index of the new decoder on success, a negative error code
+ *         otherwise
+ */
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *func_arg)
+{
+    SchDec *added;
+    int err = GROW_ARRAY(sch->dec, sch->nb_dec);
+
+    if (err < 0)
+        return err;
+
+    added                = &sch->dec[sch->nb_dec - 1];
+    added->task.func     = func;
+    added->task.func_arg = func_arg;
+
+    // scratch frame used when a decoded frame goes to several destinations
+    added->send_frame = av_frame_alloc();
+    if (!added->send_frame)
+        return AVERROR(ENOMEM);
+
+    // input queue carrying packets for this decoder
+    err = queue_alloc(&added->queue, 1, 1, QUEUE_PACKETS);
+    if (err < 0)
+        return err;
+
+    return added - sch->dec;
+}
+
+/**
+ * Register a new encoder with the scheduler.
+ *
+ * @param func     task function of the encoder
+ * @param func_arg argument stored for the task function
+ * @param open_cb  callback stored on the encoder; may be NULL
+ * @return index of the new encoder on success, a negative error code
+ *         otherwise
+ */
+int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *func_arg,
+                int (*open_cb)(void *opaque, const AVFrame *frame))
+{
+    SchEnc *added;
+    int err = GROW_ARRAY(sch->enc, sch->nb_enc);
+
+    if (err < 0)
+        return err;
+
+    added                = &sch->enc[sch->nb_enc - 1];
+    added->open_cb       = open_cb;
+    added->task.func     = func;
+    added->task.func_arg = func_arg;
+
+    err = pthread_mutex_init(&added->open_lock, NULL);
+    if (err)
+        return AVERROR(err);
+
+    err = pthread_cond_init(&added->open_cond, NULL);
+    if (err)
+        return AVERROR(err);
+
+    // negative indices: the encoder is not attached to a sync queue
+    added->sq_idx[0] = -1;
+    added->sq_idx[1] = -1;
+
+    // input queue carrying frames for this encoder
+    err = queue_alloc(&added->queue, 1, 1, QUEUE_FRAMES);
+    if (err < 0)
+        return err;
+
+    return added - sch->enc;
+}
+
+/**
+ * Register a new filtergraph with the scheduler.
+ *
+ * @param nb_inputs  number of filtergraph inputs, may be 0
+ * @param nb_outputs number of filtergraph outputs
+ * @param func       task function of the filtergraph
+ * @param func_arg   argument stored for the task function
+ * @return index of the new filtergraph on success, a negative error code
+ *         otherwise
+ */
+int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
+                        SchThreadFunc func, void *func_arg)
+{
+    SchFilterGraph *graph;
+    int err = GROW_ARRAY(sch->filters, sch->nb_filters);
+
+    if (err < 0)
+        return err;
+    graph = &sch->filters[sch->nb_filters - 1];
+
+    if (nb_inputs > 0) {
+        graph->inputs = av_calloc(nb_inputs, sizeof(*graph->inputs));
+        if (!graph->inputs)
+            return AVERROR(ENOMEM);
+        graph->nb_inputs = nb_inputs;
+    }
+
+    if (nb_outputs > 0) {
+        graph->outputs = av_calloc(nb_outputs, sizeof(*graph->outputs));
+        if (!graph->outputs)
+            return AVERROR(ENOMEM);
+        graph->nb_outputs = nb_outputs;
+    }
+
+    // start with the first input as the preferred one, if there is any
+    if (nb_inputs)
+        graph->best_input = 0;
+    else
+        graph->best_input = -1;
+
+    graph->task.func     = func;
+    graph->task.func_arg = func_arg;
+
+    // one queue stream per filtergraph input
+    err = queue_alloc(&graph->queue, graph->nb_inputs, 1, QUEUE_FRAMES);
+    if (err < 0)
+        return err;
+
+    return graph - sch->filters;
+}
+
+/**
+ * Add a sync queue for encoders.
+ *
+ * @param buf_size_us passed to sq_alloc() as the buffer size
+ * @param logctx      passed to sq_alloc() as the logging context
+ * @return index of the new sync queue on success, a negative error code
+ *         otherwise
+ */
+int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
+{
+    SchSyncQueue *added;
+    int err = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
+
+    if (err < 0)
+        return err;
+    added = &sch->sq_enc[sch->nb_sq_enc - 1];
+
+    added->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
+    if (!added->sq)
+        return AVERROR(ENOMEM);
+
+    // frame used to receive output from the sync queue
+    added->frame = av_frame_alloc();
+    if (!added->frame)
+        return AVERROR(ENOMEM);
+
+    err = pthread_mutex_init(&added->lock, NULL);
+    if (err)
+        return AVERROR(err);
+
+    return added - sch->sq_enc;
+}
+
+/**
+ * Attach the encoder enc_idx to the sync queue sq_idx.
+ *
+ * @param limiting   passed to sq_add_stream()
+ * @param max_frames frame limit applied to the new sync queue stream;
+ *                   INT64_MAX means no limit is set
+ * @return 0 on success, a negative error code otherwise
+ */
+int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
+                   int limiting, uint64_t max_frames)
+{
+    SchSyncQueue *sq;
+    SchEnc *enc;
+    int sq_stream;
+    int err;
+
+    av_assert0(sq_idx < sch->nb_sq_enc);
+    sq = &sch->sq_enc[sq_idx];
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    // remember which encoder is fed from the new sync queue stream
+    err = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
+    if (err < 0)
+        return err;
+    sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
+
+    sq_stream = sq_add_stream(sq->sq, limiting);
+    if (sq_stream < 0)
+        return sq_stream;
+
+    enc->sq_idx[0] = sq_idx;
+    enc->sq_idx[1] = sq_stream;
+
+    if (max_frames != INT64_MAX)
+        sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
+
+    return 0;
+}
+
+/**
+ * Connect the source node src to the destination node dst.
+ *
+ * The link is recorded on both ends. Accepted combinations are
+ * demuxer stream -> decoder or muxer stream, decoder -> filtergraph input or
+ * encoder, filtergraph output -> encoder and encoder -> muxer stream. Any
+ * other combination, an out-of-range node index, or an endpoint that holds a
+ * single peer and already has its type set triggers an assertion failure.
+ *
+ * @param hook   optional callback stored on the destination muxer stream;
+ *               only accepted for demuxer -> muxer connections
+ * @param opaque pointer stored on the muxer stream together with hook
+ * @return 0 on success, a negative error code when growing a destination
+ *         array fails
+ */
+int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst,
+ int (*hook)(void *opaque, void *item), void *opaque)
+{
+ int ret;
+
+ // XXX hack
+ if (hook)
+ av_assert0(src.type == SCH_NODE_TYPE_DEMUX && dst.type == SCH_NODE_TYPE_MUX);
+
+ switch (src.type) {
+ case SCH_NODE_TYPE_DEMUX: {
+ SchDemuxStream *ds;
+
+ av_assert0(src.idx < sch->nb_demux &&
+ src.idx_stream < sch->demux[src.idx].nb_streams);
+ ds = &sch->demux[src.idx].streams[src.idx_stream];
+
+ // a demuxer stream may feed several destinations
+ ret = GROW_ARRAY(ds->dst, ds->nb_dst);
+ if (ret < 0)
+ return ret;
+
+ ds->dst[ds->nb_dst - 1] = dst;
+
+ // demuxed packets go to decoding or streamcopy
+ switch (dst.type) {
+ case SCH_NODE_TYPE_DEC: {
+ SchDec *dec;
+
+ av_assert0(dst.idx < sch->nb_dec);
+ dec = &sch->dec[dst.idx];
+
+ // the decoder must not have a source yet
+ av_assert0(!dec->src.type);
+ dec->src = src;
+ break;
+ }
+ case SCH_NODE_TYPE_MUX: {
+ SchMuxStream *ms;
+
+ av_assert0(dst.idx < sch->nb_mux &&
+ dst.idx_stream < sch->mux[dst.idx].nb_streams);
+ ms = &sch->mux[dst.idx].streams[dst.idx_stream];
+
+ // the muxer stream must not have a source yet
+ av_assert0(!ms->src.type);
+ ms->src = src;
+
+ ms->hook = hook;
+ ms->opaque = opaque;
+
+ break;
+ }
+ default: av_assert0(0);
+ }
+
+ break;
+ }
+ case SCH_NODE_TYPE_DEC: {
+ SchDec *dec;
+
+ av_assert0(src.idx < sch->nb_dec);
+ dec = &sch->dec[src.idx];
+
+ // a decoder may feed several destinations
+ ret = GROW_ARRAY(dec->dst, dec->nb_dst);
+ if (ret < 0)
+ return ret;
+
+ dec->dst[dec->nb_dst - 1] = dst;
+
+ // decoded frames go to filters or encoding
+ switch (dst.type) {
+ case SCH_NODE_TYPE_FILTER_IN: {
+ SchFilterIn *fi;
+
+ av_assert0(dst.idx < sch->nb_filters &&
+ dst.idx_stream < sch->filters[dst.idx].nb_inputs);
+ fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
+
+ av_assert0(!fi->src.type);
+ fi->src = src;
+ break;
+ }
+ case SCH_NODE_TYPE_ENC: {
+ SchEnc *enc;
+
+ av_assert0(dst.idx < sch->nb_enc);
+ enc = &sch->enc[dst.idx];
+
+ av_assert0(!enc->src.type);
+ enc->src = src;
+ break;
+ }
+ default: av_assert0(0);
+ }
+
+ break;
+ }
+ case SCH_NODE_TYPE_FILTER_OUT: {
+ SchFilterOut *fo;
+ SchEnc *enc;
+
+ av_assert0(src.idx < sch->nb_filters &&
+ src.idx_stream < sch->filters[src.idx].nb_outputs);
+ // filtered frames go to encoding
+ av_assert0(dst.type == SCH_NODE_TYPE_ENC &&
+ dst.idx < sch->nb_enc);
+
+ fo = &sch->filters[src.idx].outputs[src.idx_stream];
+ enc = &sch->enc[dst.idx];
+
+ // a filtergraph output has exactly one destination
+ av_assert0(!fo->dst.type && !enc->src.type);
+ fo->dst = dst;
+ enc->src = src;
+
+ break;
+ }
+ case SCH_NODE_TYPE_ENC: {
+ SchEnc *enc;
+ SchMuxStream *ms;
+
+ av_assert0(src.idx < sch->nb_enc);
+ // encoding packets go to muxing
+ av_assert0(dst.type == SCH_NODE_TYPE_MUX &&
+ dst.idx < sch->nb_mux &&
+ dst.idx_stream < sch->mux[dst.idx].nb_streams);
+ enc = &sch->enc[src.idx];
+ ms = &sch->mux[dst.idx].streams[dst.idx_stream];
+
+ // an encoder has exactly one destination
+ av_assert0(!enc->dst.type && !ms->src.type);
+ enc->dst = dst;
+ ms->src = src;
+
+ break;
+ }
+ default: av_assert0(0);
+ }
+
+ return 0;
+}
+
+/**
+ * Run the muxer's init callback and start muxer task(s).
+ *
+ * When an SDP is to be written (sdp_filename is set or sdp_auto is nonzero),
+ * no task is started until every muxer has been initialized; the SDP is then
+ * printed and the tasks of all muxers are started together. Otherwise only
+ * the task of this muxer is started.
+ *
+ * NOTE(review): nb_mux_ready is modified here without taking a lock;
+ * sch_mux_stream_ready() calls this with mux_ready_lock held, sch_start()
+ * does not - confirm this is safe.
+ *
+ * @return 0 on success, a negative error code otherwise
+ */
+static int mux_init(Scheduler *sch, SchMux *mux)
+{
+ int ret;
+
+ ret = mux->init(mux->task.func_arg);
+ if (ret < 0)
+ return ret;
+
+ sch->nb_mux_ready++;
+
+ // XXX: test this
+ if (sch->sdp_filename || sch->sdp_auto) {
+ // wait for the remaining muxers before writing the SDP
+ if (sch->nb_mux_ready < sch->nb_mux)
+ return 0;
+
+ ret = print_sdp(sch->sdp_filename);
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
+ return ret;
+ }
+
+ /* SDP is written only after all the muxers are ready, so now we
+ * start ALL the threads */
+ for (int i = 0; i < sch->nb_mux; i++) {
+ ret = task_start(&sch->mux[i].task);
+ if (ret < 0)
+ return ret;
+ }
+ } else {
+ ret = task_start(&mux->task);
+ if (ret < 0)
+ return ret;
+ }
+
+ // XXX
+ // the block below is compiled out; it refers to names (fc, ost,
+ // thread_submit_packet) that are not defined in this function
+#if 0
+ /* flush the muxing queues */
+ for (int i = 0; i < fc->nb_streams; i++) {
+ OutputStream *ost = mux->of.streams[i];
+ MuxStream *ms = ms_from_ost(ost);
+ AVPacket *pkt;
+
+ while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
+ ret = thread_submit_packet(mux, ost, pkt);
+ if (pkt) {
+ ms->muxing_queue_data_size -= pkt->size;
+ av_packet_free(&pkt);
+ }
+ if (ret < 0)
+ return ret;
+ }
+ }
+#endif
+
+ return 0;
+}
+
+/**
+ * Find the smallest last_dts among all unfinished muxer streams.
+ *
+ * @return the smallest last_dts; AV_NOPTS_VALUE when some unfinished stream
+ *         has no last_dts yet or when there is no unfinished stream
+ */
+static int64_t trailing_dts(const Scheduler *sch)
+{
+    int64_t trailing = INT64_MAX;
+
+    for (unsigned m = 0; m < sch->nb_mux; m++) {
+        const SchMux *mux = &sch->mux[m];
+
+        for (int s = 0; s < mux->nb_streams; s++) {
+            const SchMuxStream *ms = &mux->streams[s];
+
+            if (ms->finished)
+                continue;
+
+            // a stream without a timestamp makes the result unknown
+            if (ms->last_dts == AV_NOPTS_VALUE)
+                return AV_NOPTS_VALUE;
+
+            if (ms->last_dts < trailing)
+                trailing = ms->last_dts;
+        }
+    }
+
+    if (trailing == INT64_MAX)
+        return AV_NOPTS_VALUE;
+
+    return trailing;
+}
+
+/**
+ * Recompute which demuxers are allowed to proceed and wake up those whose
+ * permission changed.
+ *
+ * A demuxer is allowed to run when it is the scheduling source of at least
+ * one unfinished muxer stream that is not too far ahead of the trailing
+ * stream. All callers visible in this file hold sch->schedule_lock around
+ * the call.
+ */
+static void schedule_update_locked(Scheduler *sch)
+{
+ int64_t dts = trailing_dts(sch);
+
+ // initialize our internal state
+ // XXX handle filtergraphs with no inputs here
+ for (unsigned i = 0; i < sch->nb_demux; i++) {
+ SchDemux *d = &sch->demux[i];
+ d->can_demux_prev = atomic_load(&d->can_demux);
+ d->can_demux_next = 0;
+ }
+
+ // figure out the sources that are allowed to proceed
+ for (unsigned i = 0; i < sch->nb_mux; i++) {
+ SchMux *mux = &sch->mux[i];
+
+ for (int j = 0; j < mux->nb_streams; j++) {
+ SchMuxStream *ms = &mux->streams[j];
+ SchDemux *d;
+
+ // unblock sources for output streams that are not finished
+ // and not too far ahead of the trailing stream
+ if (ms->finished)
+ continue;
+ // trailing dts unknown: only streams without a timestamp are fed
+ if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
+ continue;
+ if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
+ continue;
+
+ // for outputs fed from filtergraphs, consider that filtergraph's
+ // best_input information, in other cases there is a well-defined
+ // source demuxer
+ if (ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT) {
+ SchFilterGraph *fg = &sch->filters[ms->src_sched.idx];
+ SchFilterIn *fi;
+
+ // XXX handle filtergraphs with no inputs here
+ if (fg->best_input < 0)
+ continue;
+ fi = &fg->inputs[fg->best_input];
+
+ d = &sch->demux[fi->src_sched.idx];
+ } else
+ d = &sch->demux[ms->src_sched.idx];
+
+ d->can_demux_next = 1;
+ }
+ }
+
+ // publish the new state, signalling only demuxers whose state changed
+ for (unsigned i = 0; i < sch->nb_demux; i++) {
+ SchDemux *d = &sch->demux[i];
+
+ if (d->can_demux_prev == d->can_demux_next)
+ continue;
+
+ pthread_mutex_lock(&d->demux_lock);
+
+ atomic_store(&d->can_demux, d->can_demux_next);
+ pthread_cond_signal(&d->demux_cond);
+
+ pthread_mutex_unlock(&d->demux_lock);
+ }
+}
+
+/**
+ * Validate the connections between all registered components and start
+ * their tasks.
+ *
+ * Components are processed in the order muxers, encoders, filtergraphs,
+ * decoders, demuxers. On error the function returns immediately; tasks
+ * started before the failure are not stopped here.
+ *
+ * @return 0 on success, AVERROR(EINVAL) when a component is left
+ *         unconnected, another negative error code on allocation or task
+ *         start failure
+ */
+int sch_start(Scheduler *sch)
+{
+ int ret;
+
+ sch->transcode_started = 1;
+
+ // XXX add comprehensive logging
+
+ for (unsigned i = 0; i < sch->nb_mux; i++) {
+ SchMux *mux = &sch->mux[i];
+
+ for (unsigned j = 0; j < mux->nb_streams; j++) {
+ SchMuxStream *ms = &mux->streams[j];
+
+ // resolve the node used for scheduling decisions on this stream:
+ // a demuxer, or a filtergraph output for filtered streams
+ switch (ms->src.type) {
+ case SCH_NODE_TYPE_ENC: {
+ SchEnc *enc = &sch->enc[ms->src.idx];
+ if (enc->src.type == SCH_NODE_TYPE_DEC) {
+ ms->src_sched = sch->dec[enc->src.idx].src;
+ av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
+ } else {
+ ms->src_sched = enc->src;
+ av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+ }
+ break;
+ }
+ case SCH_NODE_TYPE_DEMUX:
+ ms->src_sched = ms->src;
+ break;
+ default:
+ av_log(NULL, AV_LOG_ERROR,
+ "Muxer stream #%u:%u not connected to a source\n", i, j);
+ return AVERROR(EINVAL);
+ }
+ }
+
+ // XXX should be special buffering queue
+ ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+ if (ret < 0)
+ return ret;
+
+ // muxers whose streams are all ready already can be initialized now;
+ // the others are initialized from sch_mux_stream_ready()
+ if (mux->nb_streams_ready == mux->nb_streams) {
+ ret = mux_init(sch, mux);
+ if (ret < 0)
+ return ret;
+ }
+ }
+
+ for (unsigned i = 0; i < sch->nb_enc; i++) {
+ SchEnc *enc = &sch->enc[i];
+
+ if (!enc->src.type) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Encoder %u not connected to a source\n", i);
+ return AVERROR(EINVAL);
+ }
+ if (!enc->dst.type) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Encoder %u not connected to a sink\n", i);
+ return AVERROR(EINVAL);
+ }
+
+ ret = task_start(&enc->task);
+ if (ret < 0)
+ return ret;
+ }
+
+ for (unsigned i = 0; i < sch->nb_filters; i++) {
+ SchFilterGraph *fg = &sch->filters[i];
+
+ for (unsigned j = 0; j < fg->nb_inputs; j++) {
+ SchFilterIn *fi = &fg->inputs[j];
+
+ if (!fi->src.type) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Filtergraph %u input %u not connected to a source\n", i, j);
+ return AVERROR(EINVAL);
+ }
+
+ // sch_connect() only links decoders to filtergraph inputs, so the
+ // scheduling source is the decoder's own source
+ fi->src_sched = sch->dec[fi->src.idx].src;
+ }
+
+ for (unsigned j = 0; j < fg->nb_outputs; j++) {
+ SchFilterOut *fo = &fg->outputs[j];
+
+ if (!fo->dst.type) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Filtergraph %u output %u not connected to a sink\n", i, j);
+ return AVERROR(EINVAL);
+ }
+ }
+
+ ret = task_start(&fg->task);
+ if (ret < 0)
+ return ret;
+ }
+
+ for (unsigned i = 0; i < sch->nb_dec; i++) {
+ SchDec *dec = &sch->dec[i];
+
+ if (!dec->src.type) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Decoder %u not connected to a source\n", i);
+ return AVERROR(EINVAL);
+ }
+ if (!dec->nb_dst) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Decoder %u not connected to any sink\n", i);
+ return AVERROR(EINVAL);
+ }
+
+ // one "finished" flag per destination
+ dec->dst_finished = av_calloc(dec->nb_dst, sizeof(*dec->dst_finished));
+ if (!dec->dst_finished)
+ return AVERROR(ENOMEM);
+
+ ret = task_start(&dec->task);
+ if (ret < 0)
+ return ret;
+ }
+
+ for (unsigned i = 0; i < sch->nb_demux; i++) {
+ SchDemux *d = &sch->demux[i];
+
+ // XXX: is this useful?
+ if (!d->nb_streams)
+ continue;
+
+ for (unsigned j = 0; j < d->nb_streams; j++) {
+ SchDemuxStream *ds = &d->streams[j];
+
+ if (!ds->nb_dst) {
+ av_log(NULL, AV_LOG_ERROR,
+ "Demuxer stream #%u:%u not connected to any sink\n", i, j);
+ return AVERROR(EINVAL);
+ }
+
+ // one "finished" flag per destination
+ ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
+ if (!ds->dst_finished)
+ return AVERROR(ENOMEM);
+ }
+
+ ret = task_start(&d->task);
+ if (ret < 0)
+ return ret;
+ }
+
+ // compute the initial schedule so that demuxers can start producing
+ pthread_mutex_lock(&sch->schedule_lock);
+ schedule_update_locked(sch);
+ pthread_mutex_unlock(&sch->schedule_lock);
+
+ return 0;
+}
+
+/**
+ * Wait until all muxers are done or the timeout expires.
+ *
+ * At most one wait is performed, so the function may also return early on a
+ * wakeup that does not complete the last muxer.
+ *
+ * @param timeout_us maximum time to wait, in microseconds
+ * @return 1 when all muxers are done, 0 otherwise
+ */
+int sch_wait(Scheduler *sch, uint64_t timeout_us)
+{
+    int ret;
+
+    // pthread_cond_timedwait() takes an absolute deadline, not a delay;
+    // av_gettime() is declared in libavutil/time.h
+    timeout_us += av_gettime();
+
+    pthread_mutex_lock(&sch->mux_done_lock);
+
+    if (sch->nb_mux_done < sch->nb_mux) {
+        struct timespec tv = { .tv_sec  =  timeout_us / 1000000,
+                               .tv_nsec = (timeout_us % 1000000) * 1000 };
+        pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
+    }
+
+    ret = sch->nb_mux_done == sch->nb_mux;
+
+    pthread_mutex_unlock(&sch->mux_done_lock);
+
+    return ret;
+}
+
+/**
+ * Mark the encoder as ready to be opened and wake up a thread waiting for
+ * that in sch_enc_open().
+ */
+static void enc_allow_open(SchEnc *enc)
+{
+    pthread_mutex_t *const lock = &enc->open_lock;
+
+    pthread_mutex_lock(lock);
+
+    // the flag is changed and signalled under the lock
+    enc->can_open = 1;
+    pthread_cond_signal(&enc->open_cond);
+
+    pthread_mutex_unlock(lock);
+}
+
+/**
+ * Open an encoder through its open callback, using the first frame that is
+ * about to be sent to it.
+ *
+ * Runs in the sending thread (filter/decoder) and blocks until the encoder
+ * has been flagged as ready to be opened.
+ *
+ * @return 0 on success, the negative error code of the callback otherwise
+ */
+static int sch_enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
+{
+    int frame_size;
+
+    pthread_mutex_lock(&enc->open_lock);
+
+    for (;;) {
+        if (enc->can_open)
+            break;
+        pthread_cond_wait(&enc->open_cond, &enc->open_lock);
+    }
+
+    pthread_mutex_unlock(&enc->open_lock);
+
+    frame_size = enc->open_cb(enc->task.func_arg, frame);
+    if (frame_size < 0)
+        return frame_size;
+
+    // a positive value is the audio frame size; it is forwarded to the sync
+    // queue, which must have been set up when the encoder was created
+    if (frame_size > 0) {
+        av_assert0(enc->sq_idx[0] >= 0);
+        sq_frame_samples(sch->sq_enc[enc->sq_idx[0]].sq, enc->sq_idx[1], frame_size);
+    }
+
+    return 0;
+}
+
+/**
+ * Queue a frame for the encoder task; a NULL frame marks the end of the
+ * encoder's input.
+ *
+ * @return result of tq_send() for a frame, 0 for the end-of-input marker
+ */
+static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
+{
+    if (!frame) {
+        tq_send_finish(enc->queue, 0);
+        return 0;
+    }
+
+    return tq_send(enc->queue, 0, frame);
+}
+
+/**
+ * Submit a frame to a sync queue and forward every frame the queue releases
+ * to the encoder it belongs to.
+ *
+ * The whole operation runs with the sync queue's lock held.
+ *
+ * @param frame      frame to submit
+ * @param stream_idx index of the sync queue stream the frame is sent on
+ * @return 0 when the queue has nothing more to output for now, a negative
+ *         error code otherwise
+ */
+static int send_to_sq(Scheduler *sch, SchSyncQueue *sq,
+ AVFrame *frame, unsigned stream_idx)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&sq->lock);
+
+ ret = sq_send(sq->sq, stream_idx, SQFRAME(frame));
+ if (ret < 0)
+ goto finish;
+
+ // drain everything the sync queue is willing to output
+ while (1) {
+ SchEnc *enc;
+
+ // TODO: the SQ API should be extended to allow returning EOF
+ // for individual streams
+ ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
+ if (ret == AVERROR(EAGAIN)) {
+ ret = 0;
+ goto finish;
+ } else if (ret < 0) {
+ // close all encoders fed from this sync queue
+ for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
+ int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
+
+ // if the sync queue error is EOF and closing the encoder
+ // produces a more serious error, make sure to pick the latter
+ ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
+ }
+ goto finish;
+ }
+
+ // a non-negative return value is the index of the stream the frame
+ // belongs to
+ enc = &sch->enc[sq->enc_idx[ret]];
+ ret = send_to_enc_thread(sch, enc, sq->frame);
+ av_frame_unref(sq->frame);
+ if (ret < 0) {
+ // the encoder accepts no more frames: end its sync queue stream
+ sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
+ goto finish;
+ }
+ }
+
+finish:
+ pthread_mutex_unlock(&sq->lock);
+
+ return ret;
+}
+
+/**
+ * Send a frame (or NULL to signal the end of input) towards an encoder,
+ * opening the encoder first when this is its first frame.
+ *
+ * @return 0 on success, a negative error code otherwise
+ */
+static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
+{
+    // the encoder is opened lazily, using the first real frame
+    if (enc->open_cb && frame && !enc->opened) {
+        int err = sch_enc_open(sch, enc, frame);
+        if (err < 0)
+            return err;
+        enc->opened = 1;
+    }
+
+    // encoders without a sync queue receive the frame directly
+    if (enc->sq_idx[0] < 0)
+        return send_to_enc_thread(sch, enc, frame);
+
+    return send_to_sq(sch, &sch->sq_enc[enc->sq_idx[0]], frame, enc->sq_idx[1]);
+}
+
+/**
+ * Queue a packet for a muxer stream, or mark the stream as finished when
+ * pkt is NULL, and update the schedule accordingly.
+ *
+ * @return 0 on success, the negative error code of tq_send() otherwise
+ */
+static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
+                       AVPacket *pkt)
+{
+    SchMuxStream *ms = &mux->streams[stream_idx];
+    int64_t dts = AV_NOPTS_VALUE;
+
+    if (!pkt) {
+        tq_send_finish(mux->queue, stream_idx);
+    } else {
+        int err;
+
+        // read the timestamp before the packet is handed to the queue
+        if (pkt->dts != AV_NOPTS_VALUE)
+            dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
+
+        err = tq_send(mux->queue, stream_idx, pkt);
+        if (err < 0)
+            return err;
+    }
+
+    // a packet without a timestamp does not affect scheduling
+    if (pkt && dts == AV_NOPTS_VALUE)
+        return 0;
+
+    // TODO: use atomics to check whether this changes trailing dts
+    // to avoid locking unnecessarily
+    pthread_mutex_lock(&sch->schedule_lock);
+
+    if (pkt)
+        ms->last_dts = dts;
+    else
+        ms->finished = 1;
+
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+
+    return 0;
+}
+
+/**
+ * Send a demuxed packet to a single destination (decoder or muxer stream).
+ *
+ * @param dst          destination node
+ * @param dst_finished flag tracking whether this destination is finished;
+ *                     set to 1 once EOF has been signalled to it
+ * @param pkt          packet to send, NULL to signal EOF
+ * @return 0 on success or when the hook asked to skip the packet,
+ *         AVERROR_EOF when the destination is (now) finished, another
+ *         negative error code on failure
+ */
+static int
+demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
+                         uint8_t *dst_finished, AVPacket *pkt)
+{
+    int ret;
+
+    if (*dst_finished)
+        return AVERROR_EOF;
+
+    if (!pkt)
+        goto finish;
+
+    if (dst.type == SCH_NODE_TYPE_MUX) {
+        SchMux        *mux = &sch->mux[dst.idx];
+        SchMuxStream   *ms = &mux->streams[dst.idx_stream];
+
+        // XXX check if this can be dropped
+        if (ms->hook) {
+            ret = ms->hook(ms->opaque, pkt);
+            if (ret == AVERROR_EOF)
+                goto finish;
+            else if (ret < 0)
+                return (ret == AVERROR(EAGAIN)) ? 0 : ret;
+        }
+
+        ret = send_to_mux(sch, mux, dst.idx_stream, pkt);
+    } else
+        ret = tq_send(sch->dec[dst.idx].queue, 0, pkt);
+
+    if (ret == AVERROR_EOF)
+        goto finish;
+
+    return ret;
+
+finish:
+    // EOF is signalled with a NULL packet; pkt may still be set when we get
+    // here because the hook or the destination reported EOF, and passing it
+    // on would send the packet again instead of finishing the stream
+    if (dst.type == SCH_NODE_TYPE_MUX)
+        send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
+    else
+        tq_send_finish(sch->dec[dst.idx].queue, 0);
+
+    *dst_finished = 1;
+    return AVERROR_EOF;
+}
+
+/**
+ * Send a packet (or EOF when pkt is NULL) to every destination of a demuxer
+ * stream.
+ *
+ * The packet is left unreferenced on return unless an error interrupts the
+ * loop.
+ *
+ * @return 0 when at least one destination still accepts data, AVERROR_EOF
+ *         when all destinations are finished, another negative error code
+ *         on failure
+ */
+static int demux_send_for_stream(Scheduler *sch, SchDemux *d,
+                                 SchDemuxStream *ds, AVPacket *pkt)
+{
+    unsigned nb_done = 0;
+
+    for (unsigned i = 0; i < ds->nb_dst; i++) {
+        AVPacket *to_send = pkt;
+        uint8_t *finished = &ds->dst_finished[i];
+
+        int ret;
+
+        // sending a packet consumes it, so every destination but the last
+        // gets a temporary reference. This is done for finished
+        // destinations too: the unref below would otherwise wipe the
+        // caller's packet before the remaining destinations have seen it.
+        if (pkt && i < ds->nb_dst - 1) {
+            to_send = d->send_pkt;
+
+            ret = av_packet_ref(to_send, pkt);
+            if (ret < 0)
+                return ret;
+        }
+
+        ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send);
+        if (to_send)
+            av_packet_unref(to_send);
+        if (ret == AVERROR_EOF)
+            nb_done++;
+        else if (ret < 0)
+            return ret;
+    }
+
+    // XXX sub2video_heartbeat()
+
+    return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
+}
+
+/**
+ * Called by a demuxer task to hand a demuxed packet to the scheduler.
+ *
+ * Blocks while the scheduling algorithm does not allow this demuxer to
+ * proceed. A NULL packet, or a termination request, finishes all streams of
+ * the demuxer.
+ *
+ * @return result of sending the packet to its stream's destinations;
+ *         AVERROR_EXIT when the demuxer is finished or was asked to
+ *         terminate; 0 after EOF was signalled with a NULL packet
+ */
+int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt)
+{
+    SchDemux *d;
+    int ret = 0, terminate = 0;
+
+    av_assert0(demux_idx < sch->nb_demux);
+    d = &sch->demux[demux_idx];
+
+    if (d->finished)
+        return AVERROR_EXIT;
+
+    // sleep until the scheduling algorithm allows us to proceed
+    if (!atomic_load(&d->can_demux)) {
+        pthread_mutex_lock(&d->demux_lock);
+
+        for (;;) {
+            if (atomic_load(&d->can_demux) || d->terminate)
+                break;
+            pthread_cond_wait(&d->demux_cond, &d->demux_lock);
+        }
+
+        terminate = d->terminate;
+
+        pthread_mutex_unlock(&d->demux_lock);
+    }
+
+    // regular case: forward the packet to the destinations of its stream
+    if (pkt && !terminate) {
+        av_assert0(pkt->stream_index < d->nb_streams);
+
+        return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt);
+    }
+
+    // EOF or termination: finish every stream of this demuxer
+    for (unsigned i = 0; i < d->nb_streams; i++) {
+        ret = demux_send_for_stream(sch, d, &d->streams[i], NULL);
+        av_assert0(ret >= 0 || ret == AVERROR_EOF);
+    }
+
+    pthread_mutex_lock(&sch->schedule_lock);
+
+    d->finished = 1;
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+
+    if (pkt)
+        return AVERROR_EXIT;
+
+    return 0;
+}
+
+/**
+ * Called by a muxer task to obtain the next packet from its queue.
+ *
+ * pkt->stream_index is set to the stream index reported by the queue.
+ *
+ * @return result of tq_receive()
+ */
+int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
+{
+    int stream_idx;
+    int ret;
+
+    av_assert0(mux_idx < sch->nb_mux);
+
+    ret = tq_receive(sch->mux[mux_idx].queue, &stream_idx, pkt);
+    pkt->stream_index = stream_idx;
+
+    return ret;
+}
+
+/**
+ * Called by a muxer task to signal that it accepts no more packets, either
+ * on one stream (stream_idx >= 0) or at all (stream_idx < 0).
+ *
+ * In the latter case the muxer is also counted as done and a thread waiting
+ * on mux_done_cond is signalled.
+ */
+void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, int stream_idx)
+{
+    const int whole_muxer = stream_idx < 0;
+    SchMux *mux;
+    unsigned first, end;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    pthread_mutex_lock(&sch->schedule_lock);
+
+    // range of streams to finish: a single one, or all of them
+    if (whole_muxer) {
+        first = 0;
+        end   = mux->nb_streams;
+    } else {
+        av_assert0(stream_idx < mux->nb_streams);
+        first = stream_idx;
+        end   = first + 1;
+    }
+
+    for (unsigned i = first; i < end; i++) {
+        tq_receive_finish(mux->queue, i);
+        mux->streams[i].finished = 1;
+    }
+
+    if (whole_muxer) {
+        // the muxer as a whole is done
+        pthread_mutex_lock(&sch->mux_done_lock);
+
+        av_assert0(sch->nb_mux_done < sch->nb_mux);
+        sch->nb_mux_done++;
+
+        pthread_cond_signal(&sch->mux_done_cond);
+
+        pthread_mutex_unlock(&sch->mux_done_lock);
+    }
+
+    schedule_update_locked(sch);
+
+    pthread_mutex_unlock(&sch->schedule_lock);
+}
+
+/**
+ * Signal that a muxer stream is ready. Once all streams of a muxer are ready
+ * and transcoding has started, the muxer is initialized.
+ *
+ * @return 0 on success, the negative error code of mux_init() otherwise
+ */
+int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
+{
+    SchMux *mux;
+    int ret = 0;
+
+    av_assert0(mux_idx < sch->nb_mux);
+    mux = &sch->mux[mux_idx];
+
+    av_assert0(stream_idx < mux->nb_streams);
+
+    pthread_mutex_lock(&sch->mux_ready_lock);
+
+    av_assert0(mux->nb_streams_ready < mux->nb_streams);
+    mux->nb_streams_ready++;
+
+    // this may be called during initialization - threads must not be
+    // started before sch_start() is called
+    if (mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+        ret = mux_init(sch, mux);
+
+    pthread_mutex_unlock(&sch->mux_ready_lock);
+
+    return ret;
+}
+
+/**
+ * Called by a decoder task to obtain the next packet from its input queue.
+ *
+ * @return result of tq_receive()
+ */
+int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
+{
+    int dummy;
+    int ret;
+
+    av_assert0(dec_idx < sch->nb_dec);
+
+    // the decoder queue has a single stream, its index is not needed
+    ret = tq_receive(sch->dec[dec_idx].queue, &dummy, pkt);
+    av_assert0(dummy <= 0);
+
+    return ret;
+}
+
+/**
+ * Queue a frame on a filtergraph input; a NULL frame marks the end of that
+ * input.
+ *
+ * @return result of tq_send() for a frame, 0 for the end-of-input marker
+ */
+static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
+                          unsigned in_idx, AVFrame *frame)
+{
+    if (!frame) {
+        tq_send_finish(fg->queue, in_idx);
+        return 0;
+    }
+
+    return tq_send(fg->queue, in_idx, frame);
+}
+
+/**
+ * Send a decoded frame to a single destination (filtergraph input or
+ * encoder).
+ *
+ * @param dst_finished flag tracking whether this destination is finished;
+ *                     set to 1 once EOF has been signalled to it
+ * @param frame        frame to send, NULL to signal EOF
+ * @return 0 on success, AVERROR_EOF when the destination is (now) finished,
+ *         another negative error code on failure
+ */
+static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
+                           uint8_t *dst_finished, AVFrame *frame)
+{
+    const int to_filter = dst.type == SCH_NODE_TYPE_FILTER_IN;
+
+    if (*dst_finished)
+        return AVERROR_EOF;
+
+    if (frame) {
+        int ret;
+
+        if (to_filter)
+            ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
+        else
+            ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
+
+        // anything but EOF is reported to the caller as is
+        if (ret != AVERROR_EOF)
+            return ret;
+    }
+
+    // EOF from either side: close the destination and remember that
+    if (to_filter)
+        send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
+    else
+        send_to_enc(sch, &sch->enc[dst.idx], NULL);
+
+    *dst_finished = 1;
+
+    return AVERROR_EOF;
+}
+
+/**
+ * Called by a decoder task to send a decoded frame (or EOF when frame is
+ * NULL) to all destinations of the decoder.
+ *
+ * @return 0 on success (always 0 for a NULL frame), AVERROR_EOF when all
+ *         destinations are finished, another negative error code on failure
+ */
+int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
+{
+    SchDec *dec;
+    int ret = 0;
+    unsigned nb_done = 0;
+
+    av_assert0(dec_idx < sch->nb_dec);
+    dec = &sch->dec[dec_idx];
+
+    for (unsigned i = 0; i < dec->nb_dst; i++) {
+        uint8_t *finished = &dec->dst_finished[i];
+
+        AVFrame *to_send = frame;
+
+        // sending a frame consumes it, so every destination but the last
+        // gets a temporary reference. This is done for finished
+        // destinations too: the unref below would otherwise wipe the
+        // caller's frame before the remaining destinations have seen it.
+        if (frame && i < dec->nb_dst - 1) {
+            to_send = dec->send_frame;
+
+            // frame may sometimes contain props only,
+            // e.g. to signal EOF timestamp
+            ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
+                                  av_frame_copy_props(to_send, frame);
+            if (ret < 0)
+                return ret;
+        }
+
+        ret = dec_send_to_dst(sch, dec->dst[i], finished, to_send);
+        if (to_send)
+            av_frame_unref(to_send);
+        if (ret == AVERROR_EOF) {
+            nb_done++;
+            ret = 0;
+            continue;
+        } else if (ret < 0)
+            goto finish;
+    }
+
+finish:
+    // close the decoder's input queue at the end
+    if (!frame) {
+        tq_receive_finish(dec->queue, 0);
+        return 0;
+    }
+
+    return ret < 0                  ? ret :
+           (nb_done == dec->nb_dst) ? AVERROR_EOF : 0;
+}
+
+/**
+ * Called by an encoder task to obtain the next frame from its input queue.
+ *
+ * The first call also flags the encoder as ready to be opened.
+ *
+ * @return result of tq_receive()
+ */
+int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
+{
+    SchEnc *enc;
+    int dummy;
+    int ret;
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    // let the sending thread open the encoder from now on
+    if (!enc->can_open)
+        enc_allow_open(enc);
+
+    // the encoder queue has a single stream, its index is not needed
+    ret = tq_receive(enc->queue, &dummy, frame);
+    av_assert0(dummy <= 0);
+
+    return ret;
+}
+
+/**
+ * Called by an encoder task to send an encoded packet (or EOF when pkt is
+ * NULL) to the muxer stream the encoder is connected to.
+ *
+ * A non-NULL packet is unreferenced before returning.
+ *
+ * @return result of send_to_mux()
+ */
+int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
+{
+    SchEnc *enc;
+    int ret;
+
+    av_assert0(enc_idx < sch->nb_enc);
+    enc = &sch->enc[enc_idx];
+
+    // XXX this is for cases when sch_enc_receive() was never called
+    // should it be here or in sch_stop()?
+    if (!enc->can_open) {
+        av_assert0(!pkt);
+        enc_allow_open(enc);
+    }
+
+    ret = send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt);
+
+    if (!pkt) {
+        // close the encoder's input queue at the end
+        tq_receive_finish(enc->queue, 0);
+    } else {
+        av_packet_unref(pkt);
+    }
+
+    return ret;
+}
+
+/**
+ * Called by a filtergraph task to obtain the next input frame.
+ *
+ * @param in_idx on entry, the input the filtergraph would like to read from
+ *               (negative is only allowed for graphs without inputs); on
+ *               return, set by tq_receive()
+ * @return result of tq_receive()
+ */
+int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
+                       int *in_idx, AVFrame *frame)
+{
+    SchFilterGraph *fg;
+    int ret;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    av_assert0((*in_idx >= 0 || !fg->nb_inputs) &&
+               *in_idx < (int)fg->nb_inputs);
+
+    // a changed preferred input affects which demuxers should run
+    if (fg->best_input != *in_idx) {
+        pthread_mutex_lock(&sch->schedule_lock);
+
+        fg->best_input = *in_idx;
+        schedule_update_locked(sch);
+
+        pthread_mutex_unlock(&sch->schedule_lock);
+    }
+
+    // XXX: handle graphs with no inputs
+
+    ret = tq_receive(fg->queue, in_idx, frame);
+    av_assert0(*in_idx < (int)fg->nb_inputs);
+
+    return ret;
+}
+
+/**
+ * Called by a filtergraph task to send a filtered frame to the encoder
+ * connected to one of its outputs.
+ *
+ * @param out_idx index of the filtergraph output; a negative value (frame
+ *                must then be NULL) closes all inputs of the filtergraph and
+ *                signals EOF to the encoders of all its outputs
+ * @param frame   frame to send, NULL to signal EOF; a non-NULL frame is
+ *                unreferenced before returning
+ * @return 0 on success, a negative error code otherwise
+ */
+int sch_filter_send(Scheduler *sch, unsigned fg_idx, int out_idx, AVFrame *frame)
+{
+    SchFilterGraph *fg;
+    // must start at 0: the loop below merges errors into it
+    int ret = 0;
+
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    if (out_idx < 0) {
+        av_assert0(!frame);
+
+        for (unsigned i = 0; i < fg->nb_inputs; i++)
+            tq_receive_finish(fg->queue, i);
+
+        // XXX update schedule here?
+
+        for (unsigned i = 0; i < fg->nb_outputs; i++) {
+            SchEnc *enc = &sch->enc[fg->outputs[i].dst.idx];
+            int err = send_to_enc(sch, enc, NULL);
+            ret = err_merge(ret, err);
+        }
+
+        return ret;
+    }
+
+    av_assert0(out_idx < fg->nb_outputs);
+    ret = send_to_enc(sch, &sch->enc[fg->outputs[out_idx].dst.idx], frame);
+
+    if (frame)
+        av_frame_unref(frame);
+
+    return ret;
+}
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
new file mode 100644
index 0000000000..d1f1a006b4
--- /dev/null
+++ b/fftools/ffmpeg_sched.h
@@ -0,0 +1,414 @@
+/*
+ * Inter-thread scheduling/synchronization.
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef FFTOOLS_FFMPEG_SCHED_H
+#define FFTOOLS_FFMPEG_SCHED_H
+
+/*
+ * This file contains the API for the transcode scheduler.
+ *
+ * Overall architecture of the transcoding process involves instances of the
+ * following components:
+ * - demuxers, each containing any number of streams; demuxed packets belonging
+ * to some stream are sent to any number of decoders (transcoding) or
+ * muxers (streamcopy);
+ * - decoders, which receive encoded packets from some demuxed stream, decode
+ * them, and send decoded frames to any number of filtergraph inputs
+ * (audio/video) or encoders (subtitles);
+ * - filtergraphs, which receive decoded frames from some decoder on every
+ * input, filter them, and send filtered frames from each output to some
+ * encoder; a generic filtergraph may have zero or more inputs (0 in case the
+ * filtergraph contains a lavfi source filter), and one or more outputs; the
+ * inputs and outputs need not have matching media types;
+ * - encoders, which receive decoded frames from some decoder (subtitles) or
+ * some filtergraph output (audio/video), encode them, and send encoded
+ * packets to some muxed stream;
+ * - muxers, each containing any number of muxed streams; each muxed stream
+ * receives encoded packets from some demuxed stream (streamcopy) or some
+ * encoder (transcoding); those packets are interleaved and written out by the
+ * muxer.
+ *
+ * There must be at least one muxer instance, otherwise the transcode produces
+ * no output and is meaningless. Beyond that, in a generic transcoding scenario
+ * there may be an arbitrary number of instances of any of the above
+ * components, interconnected in various ways.
+ *
+ * The code tries to keep all the output streams across all the muxers in sync
+ * (i.e. at the same DTS), which is accomplished by varying the rates at which
+ * packets are read from different demuxers. Note that the degree of control we
+ * have over synchronization is fundamentally limited - if some demuxed streams
+ * in the same input are interleaved at different rates than that at which they
+ * are to be muxed (e.g. because an input file is badly interleaved, or the user
+ * changed their speed by mismatching amounts), then there will be increasing
+ * amounts of buffering followed by eventual transcoding failure.
+ *
+ * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g.
+ * - encoding and muxing output from filtergraph(s) that have no inputs;
+ * - creating a file that contains nothing but attachments and/or metadata.
+ *
+ * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but
+ * this is unnecessary because the (a)split filter provides the same
+ * functionality.
+ *
+ * The scheduler, in the above model, is the master object that oversees and
+ * facilitates the transcoding process. The basic idea is that all instances
+ * of the abovementioned components communicate only with the scheduler and not
+ * with each other. The scheduler is then the single place containing the
+ * knowledge about the whole transcoding pipeline.
+ */
+
+typedef struct Scheduler Scheduler;
+
+enum SchedulerNodeType { /* kind of pipeline component a SchedulerNode refers to */
+ SCH_NODE_TYPE_NONE = 0, /* zero value, no component type */
+ SCH_NODE_TYPE_DEMUX, /* a demuxed stream, see SCH_DSTREAM() */
+ SCH_NODE_TYPE_MUX, /* a muxed stream, see SCH_MSTREAM() */
+ SCH_NODE_TYPE_DEC, /* a decoder, see SCH_DEC() */
+ SCH_NODE_TYPE_ENC, /* an encoder, see SCH_ENC() */
+ SCH_NODE_TYPE_FILTER_IN, /* a filtergraph input, see SCH_FILTER_IN() */
+ SCH_NODE_TYPE_FILTER_OUT, /* a filtergraph output, see SCH_FILTER_OUT() */
+};
+
+typedef struct SchedulerNode { /* identifies one endpoint; built by the SCH_*() macros */
+ enum SchedulerNodeType type; /* which kind of component this is */
+ unsigned idx; /* index of the demuxer/muxer/decoder/encoder/filtergraph */
+ unsigned idx_stream; /* stream, input or output index within idx; not set by SCH_DEC()/SCH_ENC() */
+} SchedulerNode;
+
+typedef void* (*SchThreadFunc)(void *arg);
+
+/* stream number `stream` of demuxer number `file` */
+#define SCH_DSTREAM(file, stream)                               \
+    (SchedulerNode){                                            \
+        .type       = SCH_NODE_TYPE_DEMUX,                      \
+        .idx        = (file),                                   \
+        .idx_stream = (stream),                                 \
+    }
+/* stream number `stream` of muxer number `file` */
+#define SCH_MSTREAM(file, stream)                               \
+    (SchedulerNode){                                            \
+        .type       = SCH_NODE_TYPE_MUX,                        \
+        .idx        = (file),                                   \
+        .idx_stream = (stream),                                 \
+    }
+/* decoder number `decoder` */
+#define SCH_DEC(decoder)                                        \
+    (SchedulerNode){                                            \
+        .type = SCH_NODE_TYPE_DEC,                              \
+        .idx  = (decoder),                                      \
+    }
+/* encoder number `encoder` */
+#define SCH_ENC(encoder)                                        \
+    (SchedulerNode){                                            \
+        .type = SCH_NODE_TYPE_ENC,                              \
+        .idx  = (encoder),                                      \
+    }
+/* input number `input` of filtergraph number `filter` */
+#define SCH_FILTER_IN(filter, input)                            \
+    (SchedulerNode){                                            \
+        .type       = SCH_NODE_TYPE_FILTER_IN,                  \
+        .idx        = (filter),                                 \
+        .idx_stream = (input),                                  \
+    }
+/* output number `output` of filtergraph number `filter` */
+#define SCH_FILTER_OUT(filter, output)                          \
+    (SchedulerNode){                                            \
+        .type       = SCH_NODE_TYPE_FILTER_OUT,                 \
+        .idx        = (filter),                                 \
+        .idx_stream = (output),                                 \
+    }
+
+Scheduler *sch_alloc(void);
+void sch_free(Scheduler **sch);
+
+int sch_start(Scheduler *sch);
+int sch_stop(Scheduler *sch);
+
+/**
+ * Wait until transcoding terminates or the specified timeout elapses.
+ *
+ * @param timeout_us Amount of time in microseconds after which this function
+ * will timeout.
+ *
+ * @retval 0 waiting timed out, transcoding is not finished
+ * @retval 1 transcoding is finished
+ */
+int sch_wait(Scheduler *sch, uint64_t timeout_us);
+
+/**
+ * Add a demuxer to the scheduler.
+ *
+ * @retval ">=0" Index of the newly-created demuxer.
+ * @retval "<0" Error code.
+ */
+int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *func_arg);
+/**
+ * Add a demuxed stream for a previously added demuxer.
+ *
+ * @param demux_idx index previously returned by sch_add_demux()
+ *
+ * @retval ">=0" Index of the newly-created demuxed stream.
+ * @retval "<0" Error code.
+ */
+int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);
+
+/**
+ * Add a decoder to the scheduler.
+ *
+ * @retval ">=0" Index of the newly-created decoder.
+ * @retval "<0" Error code.
+ */
+int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *func_arg);
+
+/**
+ * Add a filtergraph to the scheduler.
+ *
+ * @param nb_inputs Number of filtergraph inputs.
+ * @param nb_outputs Number of filtergraph outputs.
+ *
+ * @retval ">=0" Index of the newly-created filtergraph.
+ * @retval "<0" Error code.
+ */
+int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
+ SchThreadFunc func, void *func_arg);
+
+/**
+ * Add a muxer to the scheduler.
+ *
+ * Note that muxer thread startup is more complicated than for other components,
+ * because
+ * - muxer streams fed by audio/video encoders become initialized dynamically at
+ * runtime, after those encoders receive their first frame and initialize
+ * themselves, followed by calling sch_mux_stream_ready()
+ * - the header can be written after all the streams for a muxer are initialized
+ * - we may need to write an SDP, which must happen
+ * - AFTER all the headers are written
+ * - BEFORE any packets are written by any muxer
+ * - with all the muxers quiescent
+ * To avoid complicated muxer-thread synchronization dances, we postpone
+ * starting the muxer threads until after the SDP is written. The sequence of
+ * events is then as follows:
+ * - After sch_mux_stream_ready() is called for all the streams in a given muxer,
+ * the header for that muxer is written (care is taken that headers for
+ * different muxers are not written concurrently, since they write file
+ * information to stderr). If SDP is not wanted, the muxer thread then starts
+ * and muxing begins.
+ * - When SDP _is_ wanted, no muxer threads start until the header for the last
+ * muxer is written. After that, the SDP is written, after which all the muxer
+ * threads are started at once.
+ *
+ * In order for the above to work, the scheduler needs to be able to invoke
+ * just writing the header, which is the reason the init parameter exists.
+ *
+ * @param init Callback that is called to initialize the muxer and write the
+ * header. Called after sch_mux_stream_ready() is called for all the
+ * streams in the muxer.
+ * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ *
+ * @retval ">=0" Index of the newly-created muxer.
+ * @retval "<0" Error code.
+ */
+int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
+ void *arg, int sdp_auto);
+/**
+ * Add a muxed stream for a previously added muxer.
+ *
+ * @param mux_idx index previously returned by sch_add_mux()
+ *
+ * @retval ">=0" Index of the newly-created muxed stream.
+ * @retval "<0" Error code.
+ */
+int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx);
+
+/**
+ * Signal to the scheduler that the specified muxed stream is initialized and
+ * ready. Muxing is started once all the streams are ready.
+ */
+int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
+
+/**
+ * Set the file path for the SDP.
+ *
+ * The SDP is written when either of the following is true:
+ * - this function is called at least once
+ * - sdp_auto=1 is passed to EVERY call of sch_add_mux()
+ */
+int sch_sdp_filename(Scheduler *sch, const char *sdp_filename);
+
+/**
+ * Add an encoder to the scheduler.
+ *
+ * @param open_cb This callback, if specified, will be called when the first
+ * frame is obtained for this encoder. For audio encoders with a
+ * fixed frame size (which use a sync queue in the scheduler to
+ * rechunk frames), it must return that frame size on success.
+ * Otherwise (non-audio, variable frame size) it should return 0.
+ *
+ * @retval ">=0" Index of the newly-created encoder.
+ * @retval "<0" Error code.
+ */
+int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *func_arg,
+ int (*open_cb)(void *func_arg, const AVFrame *frame));
+
+/**
+ * Add a pre-encoding sync queue to the scheduler.
+ *
+ * @param buf_size_us Sync queue buffering size, passed to sq_alloc().
+ * @param logctx Logging context for the sync queue, passed to sq_alloc().
+ *
+ * @retval ">=0" Index of the newly-created sync queue.
+ * @retval "<0" Error code.
+ */
+int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx);
+int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
+ int limiting, uint64_t max_frames);
+
+/**
+ * XXX: TODO
+ */
+int sch_add_mux_stream_bsf(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
+ const char *bsf);
+
+int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst,
+ int (*hook)(void *opaque, void *item), void *opaque);
+
+/**
+ * Called by demuxing threads to send a demuxed packet or EOF to all its
+ * consumers. The stream is identified by the packet's stream_index field.
+ *
+ * Every demuxer task must call this function with pkt=NULL exactly once, right
+ * before it exits - this call always succeeds.
+ *
+ * @param demux_idx demuxer index
+ * @param pkt A demuxed packet to send or NULL to signal end of demuxing.
+ * If non-NULL, on success the packet is consumed and cleared
+ * by this function
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF all consumers for the stream are done
+ * @retval AVERROR_EXIT all consumers are done, should terminate demuxing
+ * @retval "another negative error code" other failure
+ */
+int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt);
+
+/**
+ * Called by decoders to receive a packet for decoding.
+ *
+ * @param dec_idx decoder index
+ * @param pkt Input packet will be written here on success. An empty packet
+ * signals that the decoder should be flushed, but more packets will
+ * follow (e.g. after seeking).
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF no more packets will arrive, should terminate decoding
+ * @retval "another negative error code" other failure
+ */
+int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt);
+
+/**
+ * Called by decoder tasks to send a decoded frame or EOF downstream.
+ *
+ * Every decoder task must call this function with frame=NULL exactly once, right
+ * before it exits - this call always succeeds.
+ *
+ * @param dec_idx Decoder index previously returned by sch_add_dec().
+ * @param frame Decoded frame or NULL to signal end of decoding.
+ * If non-NULL, on success the frame is consumed and cleared
+ * by this function
+ *
+ * @retval ">=0" success
+ * @retval AVERROR_EOF all consumers are done, should terminate decoding
+ * @retval "another negative error code" other failure
+ */
+int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame);
+
+/**
+ * Called by filtergraph tasks to obtain frames for filtering. Will wait for a
+ * frame to become available and return it in frame.
+ *
+ * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
+ * @param[in,out] in_idx On input contains the index of the input on which a frame
+ * is most desired. On output contains input index of the
+ * actually returned frame or EOF.
+ */
+int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
+ int *in_idx, AVFrame *frame);
+/**
+ * Called by filtergraph tasks to send a filtered frame or EOF to consumers.
+ *
+ * Every filter task must call this function with out_idx<0 exactly once,
+ * right before it exits - that call always succeeds.
+ *
+ * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
+ * @param out_idx Index of the output which produced the frame. When negative,
+ * this call signals EOF on all streams, i.e. the filtering
+ * task is being terminated - in that case frame must be NULL.
+ * @param frame The frame to send to consumers. When NULL, signals that no more
+ * frames will be produced for the specified output (or all outputs
+ * when out_idx<0). When non-NULL, the frame is always consumed and
+ * cleared by this function.
+ *
+ * @retval "non-negative value" success
+ * @retval AVERROR_EOF all consumers are done
+ * @retval "another negative error code" other failure
+ */
+int sch_filter_send(Scheduler *sch, unsigned fg_idx, int out_idx, AVFrame *frame);
+
+/**
+ * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
+ * to become available and return it in frame.
+ *
+ * @param enc_idx Encoder index previously returned by sch_add_enc().
+ * @param frame Newly-received frame will be stored here. Must be clean on
+ * entrance to this function.
+ *
+ * @retval 0 A frame was successfully delivered into frame.
+ * @retval AVERROR_EOF No more frames will be delivered, the encoder should
+ * flush everything and terminate.
+ *
+ */
+int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame);
+/**
+ * Called by encoder tasks to send encoded packets or EOF downstream.
+ * Will not block.
+ *
+ * Every encoder task must call this function with pkt=NULL exactly once,
+ * right before it exits - that call always succeeds.
+ *
+ * @param enc_idx Encoder index previously returned by sch_add_enc().
+ * @param pkt An encoded packet or NULL to signal EOF. When non-NULL, the
+ * packet will be consumed and cleared by this function on
+ * success.
+ *
+ * @retval 0 success
+ * @retval "<0" Error code.
+ */
+int sch_enc_send (Scheduler *sch, unsigned enc_idx, AVPacket *pkt);
+
+/**
+ * Called by muxer tasks to obtain packets for muxing. Will wait for a packet
+ * for any muxed stream to become available and return it in pkt.
+ *
+ * @param mux_idx Muxer index previously returned by sch_add_mux().
+ * @param pkt Newly-received packet will be stored here. Must be clean
+ * on entrance to this function.
+ *
+ * @retval 0 A packet was successfully delivered into pkt. Its stream_index
+ * corresponds to a stream index previously returned from
+ * sch_add_mux_stream().
+ * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that
+ * no more packets will be delivered for this stream index.
+ * Otherwise this indicates that no more packets will be
+ * delivered for any stream and the muxer should therefore
+ * flush everything and terminate.
+ */
+int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt);
+
+/**
+ * Called by muxer tasks to signal that a single stream or all streams will no
+ * longer accept input.
+ *
+ * Every muxer task must call this function with stream_idx=-1 exactly once,
+ * right before it exits.
+ *
+ * @param stream_idx A non-negative stream index to mark one stream as finished,
+ * -1 to mark all.
+ */
+void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, int stream_idx);
+
+#endif /* FFTOOLS_FFMPEG_SCHED_H */
--
2.40.1
More information about the ffmpeg-devel
mailing list