[FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: convert to the scheduler
Anton Khirnov
anton at khirnov.net
Tue Sep 19 22:10:53 EEST 2023
---
fftools/ffmpeg.c | 26 ------
fftools/ffmpeg.h | 8 --
fftools/ffmpeg_mux.c | 184 ++++++++++++--------------------------
fftools/ffmpeg_mux.h | 14 ++-
fftools/ffmpeg_mux_init.c | 33 +++----
fftools/ffmpeg_opt.c | 6 +-
6 files changed, 80 insertions(+), 191 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 69e73b84ed..e82d88b3e0 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -806,38 +806,12 @@ int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
{
InputFile *f = input_files[ist->file_index];
- int64_t dts_est = AV_NOPTS_VALUE;
int ret = 0;
int eof_reached = 0;
if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
eof_reached = 1;
- if (pkt && pkt->opaque_ref) {
- DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
- dts_est = pd->dts_est;
- }
-
- if (f->recording_time != INT64_MAX) {
- int64_t start_time = 0;
- if (copy_ts) {
- start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
- start_time += start_at_zero ? 0 : f->start_time_effective;
- }
- if (dts_est >= f->recording_time + start_time)
- pkt = NULL;
- }
-
- for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
- OutputStream *ost = ist->outputs[oidx];
- if (ost->enc || (!pkt && no_eof))
- continue;
-
- ret = of_streamcopy(ost, pkt, dts_est);
- if (ret < 0)
- return ret;
- }
-
return !eof_reached;
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 66b5e398bc..e3aea75415 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -665,7 +665,6 @@ extern FilterGraph **filtergraphs;
extern int nb_filtergraphs;
extern char *vstats_filename;
-extern char *sdp_filename;
extern float dts_delta_threshold;
extern float dts_error_threshold;
@@ -813,13 +812,6 @@ void of_free(OutputFile **pof);
void of_enc_stats_close(void);
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
int64_t of_filesize(OutputFile *of);
int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 9628728d95..ba90079266 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -22,16 +22,13 @@
#include "ffmpeg.h"
#include "ffmpeg_mux.h"
-#include "objpool.h"
#include "sync_queue.h"
-#include "thread_queue.h"
#include "libavutil/fifo.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
#include "libavcodec/packet.h"
@@ -42,8 +39,6 @@ typedef struct MuxThreadContext {
AVPacket *pkt;
} MuxThreadContext;
-int want_sdp = 1;
-
static Muxer *mux_from_of(OutputFile *of)
{
return (Muxer*)of;
@@ -254,19 +249,22 @@ void *muxer_thread(void *arg)
OutputStream *ost;
int stream_idx, stream_eof = 0;
- ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+ ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+ stream_idx = mt.pkt->stream_index;
if (stream_idx < 0) {
av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
ret = 0;
break;
}
- ost = of->streams[stream_idx];
+ ost = of->streams[mux->sch_stream_idx[stream_idx]];
+ mt.pkt->stream_index = ost->index;
+
ret = sync_queue_process(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
av_packet_unref(mt.pkt);
if (ret == AVERROR_EOF) {
if (stream_eof) {
- tq_receive_finish(mux->tq, stream_idx);
+ sch_mux_receive_finish(mux->sch, of->index, stream_idx);
} else {
av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
ret = 0;
@@ -278,17 +276,19 @@ void *muxer_thread(void *arg)
}
}
+ // XXX move av_write_trailer() here?
+
finish:
mux_thread_uninit(&mt);
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_receive_finish(mux->tq, i);
+ sch_mux_receive_finish(mux->sch, of->index, -1);
av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
return (void*)(intptr_t)ret;
}
+#if 0
static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
{
int ret = 0;
@@ -296,7 +296,8 @@ static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
if (!pkt || ost->finished & MUXER_FINISHED)
goto finish;
- ret = tq_send(mux->tq, ost->index, pkt);
+ // XXX
+ //ret = tq_send(mux->tq, ost->index, pkt);
if (ret < 0)
goto finish;
@@ -306,8 +307,9 @@ finish:
if (pkt)
av_packet_unref(pkt);
+ // XXX
ost->finished |= MUXER_FINISHED;
- tq_send_finish(mux->tq, ost->index);
+ //tq_send_finish(mux->tq, ost->index);
return ret == AVERROR_EOF ? 0 : ret;
}
@@ -428,57 +430,48 @@ fail:
av_log(ost, AV_LOG_ERROR, "Error %s\n", err_msg);
return exit_on_error ? ret : 0;
}
+#endif
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt, int64_t dts)
{
OutputFile *of = output_files[ost->file_index];
MuxStream *ms = ms_from_ost(ost);
int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
int64_t ts_offset;
- AVPacket *opkt = ms->pkt;
- int ret;
-
- av_packet_unref(opkt);
if (of->recording_time != INT64_MAX &&
dts >= of->recording_time + start_time)
- pkt = NULL;
-
- // EOF: flush output bitstream filters.
- if (!pkt)
- return of_output_packet(of, ost, NULL);
+ return AVERROR_EOF;
if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
!ms->copy_initial_nonkeyframes)
- return 0;
+ return AVERROR(EAGAIN);
if (!ms->streamcopy_started) {
if (!ms->copy_prior_start &&
(pkt->pts == AV_NOPTS_VALUE ?
dts < ms->ts_copy_start :
pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
- return 0;
+ return AVERROR(EAGAIN);
if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
- return 0;
+ return AVERROR(EAGAIN);
}
- ret = av_packet_ref(opkt, pkt);
- if (ret < 0)
- return ret;
-
- ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+ ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
if (pkt->pts != AV_NOPTS_VALUE)
- opkt->pts -= ts_offset;
+ pkt->pts -= ts_offset;
if (pkt->dts == AV_NOPTS_VALUE) {
- opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+ pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
} else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
- opkt->pts = opkt->dts - ts_offset;
+ pkt->pts = pkt->dts - ts_offset;
}
- opkt->dts -= ts_offset;
+ pkt->dts -= ts_offset;
+ // XXX
+#if 0
{
int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
if (ret < 0) {
@@ -488,73 +481,42 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
return ret;
}
}
-
- ret = of_output_packet(of, ost, opkt);
- if (ret < 0)
- return ret;
+#endif
ms->streamcopy_started = 1;
return 0;
}
-static int thread_stop(Muxer *mux)
+int of_streamcopy_hook(void *opaque, void *item)
{
- void *ret;
+ OutputStream *ost = opaque;
+ const InputStream *ist = ost->ist;
+ const InputFile *f = input_files[ist->file_index];
- if (!mux || !mux->tq)
- return 0;
+ AVPacket *pkt = item;
+ int64_t dts_est = AV_NOPTS_VALUE;
- for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
- tq_send_finish(mux->tq, i);
+ // nothing for us to do about seek signals
+ if ((intptr_t)pkt->opaque == PKT_OPAQUE_SEEK)
+ return AVERROR(EAGAIN);
- pthread_join(mux->thread, &ret);
-
- tq_free(&mux->tq);
-
- return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
- AVFormatContext *fc = mux->fc;
- ObjPool *op;
- int ret;
-
- op = objpool_alloc_packets();
- if (!op)
- return AVERROR(ENOMEM);
-
- mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
- if (!mux->tq) {
- objpool_free(&op);
- return AVERROR(ENOMEM);
+ if (pkt->opaque_ref) {
+ DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
+ dts_est = pd->dts_est;
}
- ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
- if (ret) {
- tq_free(&mux->tq);
- return AVERROR(ret);
- }
-
- /* flush the muxing queues */
- for (int i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = mux->of.streams[i];
- MuxStream *ms = ms_from_ost(ost);
- AVPacket *pkt;
-
- while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
- ret = thread_submit_packet(mux, ost, pkt);
- if (pkt) {
- ms->muxing_queue_data_size -= pkt->size;
- av_packet_free(&pkt);
- }
- if (ret < 0)
- return ret;
+ if (f->recording_time != INT64_MAX) {
+ int64_t start_time = 0;
+ if (copy_ts) {
+ start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+ start_time += start_at_zero ? 0 : f->start_time_effective;
}
+ if (dts_est >= f->recording_time + start_time)
+ return AVERROR_EOF;
}
- return 0;
+ return of_streamcopy(ost, pkt, dts_est);
}
int print_sdp(const char *filename)
@@ -565,11 +527,6 @@ int print_sdp(const char *filename)
AVIOContext *sdp_pb;
AVFormatContext **avc;
- for (i = 0; i < nb_output_files; i++) {
- if (!mux_from_of(output_files[i])->header_written)
- return 0;
- }
-
avc = av_malloc_array(nb_output_files, sizeof(*avc));
if (!avc)
return AVERROR(ENOMEM);
@@ -604,25 +561,17 @@ int print_sdp(const char *filename)
avio_closep(&sdp_pb);
}
- // SDP successfully written, allow muxer threads to start
- ret = 1;
-
fail:
av_freep(&avc);
return ret;
}
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
{
+ Muxer *mux = arg;
OutputFile *of = &mux->of;
AVFormatContext *fc = mux->fc;
- int ret, i;
-
- for (i = 0; i < fc->nb_streams; i++) {
- OutputStream *ost = of->streams[i];
- if (!ost->initialized)
- return 0;
- }
+ int ret;
ret = avformat_write_header(fc, &mux->opts);
if (ret < 0) {
@@ -636,26 +585,6 @@ int mux_check_init(Muxer *mux)
av_dump_format(fc, of->index, fc->url, 1);
nb_output_dumped++;
- if (sdp_filename || want_sdp) {
- ret = print_sdp(sdp_filename);
- if (ret < 0) {
- av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
- return ret;
- } else if (ret == 1) {
- /* SDP is written only after all the muxers are ready, so now we
- * start ALL the threads */
- for (i = 0; i < nb_output_files; i++) {
- ret = thread_start(mux_from_of(output_files[i]));
- if (ret < 0)
- return ret;
- }
- }
- } else {
- ret = thread_start(mux_from_of(of));
- if (ret < 0)
- return ret;
- }
-
return 0;
}
@@ -711,9 +640,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost)
ost->st->time_base);
}
- ost->initialized = 1;
+ if (ms->sched_idx >= 0)
+ return sch_mux_stream_ready(mux->sch, of->index, ms->sched_idx);
- return mux_check_init(mux);
+ return 0;
}
static int check_written(OutputFile *of)
@@ -827,15 +757,13 @@ int of_write_trailer(OutputFile *of)
AVFormatContext *fc = mux->fc;
int ret, mux_result = 0;
- if (!mux->tq) {
+ if (!mux->header_written) {
av_log(mux, AV_LOG_ERROR,
"Nothing was written into output file, because "
"at least one of its streams received no packets.\n");
return AVERROR(EINVAL);
}
- mux_result = thread_stop(mux);
-
ret = av_write_trailer(fc);
if (ret < 0) {
av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -951,8 +879,6 @@ void of_free(OutputFile **pof)
return;
mux = mux_from_of(of);
- thread_stop(mux);
-
sq_free(&of->sq_encode);
sq_free(&mux->sq_mux);
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d5aba6db36..311b612018 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@
#include <stdint.h>
#include "ffmpeg_sched.h"
-#include "thread_queue.h"
#include "libavformat/avformat.h"
@@ -33,7 +32,6 @@
#include "libavutil/dict.h"
#include "libavutil/fifo.h"
-#include "libavutil/thread.h"
typedef struct MuxStream {
OutputStream ost;
@@ -104,9 +102,6 @@ typedef struct Muxer {
int *sch_stream_idx;
int nb_sch_stream_idx;
- pthread_t thread;
- ThreadQueue *tq;
-
AVDictionary *opts;
int thread_queue_size;
@@ -120,14 +115,15 @@ typedef struct Muxer {
AVPacket *sq_pkt;
} Muxer;
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
static MuxStream *ms_from_ost(OutputStream *ost)
{
return (MuxStream*)ost;
}
+/* Streamcopy hook passed to sch_connect(). XXX explain why a hook is needed
+ * (so that it is called in the sending thread rather than in the muxer). */
+int of_streamcopy_hook(void *opaque, void *item);
+
#endif /* FFTOOLS_FFMPEG_MUX_H */
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 9fb6a74393..122a7344e4 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -1375,7 +1375,7 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
MATCH_PER_STREAM_OPT(bitstream_filters, str, bsfs, oc, st);
if (bsfs && *bsfs) {
- ret = av_bsf_list_parse_str(bsfs, &ms->bsf_ctx);
+ ret = sch_add_mux_stream_bsf(mux->sch, mux->of.index, ost->index, bsfs);
if (ret < 0) {
av_log(ost, AV_LOG_ERROR, "Error parsing bitstream filter sequence '%s': %s\n", bsfs, av_err2str(ret));
return ret;
@@ -1486,7 +1486,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
return ret;
} else {
ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx),
- SCH_MSTREAM(ost->file_index, ms->sched_idx), NULL, NULL);
+ SCH_MSTREAM(ost->file_index, ms->sched_idx),
+ of_streamcopy_hook, ost);
if (ret < 0)
return ret;
}
@@ -1931,11 +1932,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
* - at least one encoded audio/video stream is frame-limited, since
* that has similar semantics to 'shortest'
* - at least one audio encoder requires constant frame sizes
+ *
+ * Note that encoding sync queues are handled in the scheduler, because
+ * different encoders run in different threads and need external
+ * synchronization, while muxer sync queues can be handled inside the muxer.
*/
if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
- of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
- if (!of->sq_encode)
- return AVERROR(ENOMEM);
+ int sq_idx, ret;
+
+ sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+ if (sq_idx < 0)
+ return sq_idx;
for (int i = 0; i < oc->nb_streams; i++) {
OutputStream *ost = of->streams[i];
@@ -1945,13 +1952,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
if (!IS_AV_ENC(ost, type))
continue;
- ost->sq_idx_encode = sq_add_stream(of->sq_encode,
- of->shortest || ms->max_frames < INT64_MAX);
- if (ost->sq_idx_encode < 0)
- return ost->sq_idx_encode;
-
- if (ms->max_frames != INT64_MAX)
- sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+ ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sched_idx_enc,
+ of->shortest || ms->max_frames < INT64_MAX,
+ ms->max_frames);
+ if (ret < 0)
+ return ret;
}
}
@@ -2704,8 +2709,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
av_strlcat(mux->log_name, "/", sizeof(mux->log_name));
av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
- if (strcmp(oc->oformat->name, "rtp"))
- want_sdp = 0;
of->format = oc->oformat;
if (recording_time != INT64_MAX)
@@ -2721,7 +2724,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
AVFMT_FLAG_BITEXACT);
}
- err = sch_add_mux(sch, muxer_thread, NULL, mux,
+ err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
!strcmp(oc->oformat->name, "rtp"));
if (err < 0)
return err;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d463306546..6177a96a4e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL};
HWDevice *filter_hw_device;
char *vstats_filename;
-char *sdp_filename;
float audio_drift_threshold = 0.1;
float dts_delta_threshold = 10;
@@ -580,9 +579,8 @@ fail:
static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
{
- av_free(sdp_filename);
- sdp_filename = av_strdup(arg);
- return 0;
+ Scheduler *sch = optctx;
+ return sch_sdp_filename(sch, arg);
}
#if CONFIG_VAAPI
--
2.40.1
More information about the ffmpeg-devel mailing list