[FFmpeg-devel] [PATCH 16/21] fftools/ffmpeg_dec: move decoding to a separate thread
Anton Khirnov
anton at khirnov.net
Wed Jun 14 19:49:03 EEST 2023
This is only a preparatory step to a fully threaded architecture and
does not yet make decoding truly parallel - the main thread will
currently submit a packet and wait until it has been fully processed by
the decoding thread before moving on. Decoder behavior as observed by
the rest of the program should remain unchanged. That will change in
future commits after encoders and filters are moved to threads and a
thread-aware scheduler is added.
---
fftools/ffmpeg.c | 63 +++++---
fftools/ffmpeg.h | 11 ++
fftools/ffmpeg_dec.c | 365 ++++++++++++++++++++++++++++++++++++++-----
fftools/ffmpeg_mux.c | 5 -
4 files changed, 382 insertions(+), 62 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 45e71ed626..4e6205e3cb 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -728,6 +728,46 @@ cleanup:
return ret;
}
+static void subtitle_free(void *opaque, uint8_t *data)
+{
+ AVSubtitle *sub = (AVSubtitle*)data;
+ avsubtitle_free(sub);
+ av_free(sub);
+}
+
+int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
+{
+ AVBufferRef *buf;
+ AVSubtitle *sub;
+ int ret;
+
+ if (copy) {
+ sub = av_mallocz(sizeof(*sub));
+ ret = sub ? copy_av_subtitle(sub, subtitle) : AVERROR(ENOMEM);
+ if (ret < 0) {
+ av_freep(&sub);
+ return ret;
+ }
+ } else {
+ sub = av_memdup(subtitle, sizeof(*subtitle));
+ if (!sub)
+ return AVERROR(ENOMEM);
+ memset(subtitle, 0, sizeof(*subtitle));
+ }
+
+ buf = av_buffer_create((uint8_t*)sub, sizeof(*sub),
+ subtitle_free, NULL, 0);
+ if (!buf) {
+ avsubtitle_free(sub);
+ av_freep(&sub);
+ return AVERROR(ENOMEM);
+ }
+
+ frame->buf[0] = buf;
+
+ return 0;
+}
+
static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
{
int ret = AVERROR_BUG;
@@ -1038,30 +1078,11 @@ static void decode_flush(InputFile *ifile)
{
for (int i = 0; i < ifile->nb_streams; i++) {
InputStream *ist = ifile->streams[i];
- int ret;
- if (ist->discard)
+ if (ist->discard || !ist->decoding_needed)
continue;
- do {
- ret = process_input_packet(ist, NULL, 1);
- } while (ret > 0);
-
- if (ist->decoding_needed) {
- /* report last frame duration to the demuxer thread */
- if (ist->par->codec_type == AVMEDIA_TYPE_AUDIO) {
- LastFrameDuration dur;
-
- dur.stream_idx = i;
- dur.duration = av_rescale_q(ist->nb_samples,
- (AVRational){ 1, ist->dec_ctx->sample_rate},
- ist->st->time_base);
-
- av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
- }
-
- avcodec_flush_buffers(ist->dec_ctx);
- }
+ dec_packet(ist, NULL, 1);
}
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index abc1a21d73..5d60da085b 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -730,6 +730,7 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost,
int init_complex_filtergraph(FilterGraph *fg);
int copy_av_subtitle(AVSubtitle *dst, const AVSubtitle *src);
+int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy);
/**
* Get our axiliary frame data attached to the frame, allocating it
@@ -941,4 +942,14 @@ extern const char * const opt_name_codec_tags[];
extern const char * const opt_name_frame_rates[];
extern const char * const opt_name_top_field_first[];
+static inline void pkt_move(void *dst, void *src)
+{
+ av_packet_move_ref(dst, src);
+}
+
+static inline void frame_move(void *dst, void *src)
+{
+ av_frame_move_ref(dst, src);
+}
+
#endif /* FFTOOLS_FFMPEG_H */
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index e6c6e22b04..996e7318d9 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -30,6 +30,7 @@
#include "libavfilter/buffersrc.h"
#include "ffmpeg.h"
+#include "thread_queue.h"
struct Decoder {
AVFrame *frame;
@@ -45,8 +46,50 @@ struct Decoder {
AVRational last_frame_tb;
int64_t last_filter_in_rescale_delta;
int last_frame_sample_rate;
+
+ pthread_t thread;
+ /**
+ * Queue for sending coded packets from the main thread to
+ * the decoder thread.
+ *
+ * An empty packet is sent to flush the decoder without terminating
+ * decoding.
+ */
+ ThreadQueue *queue_in;
+ /**
+ * Queue for sending decoded frames from the decoder thread
+ * to the main thread.
+ *
+ * An empty frame is sent to signal that a single packet has been fully
+ * processed.
+ */
+ ThreadQueue *queue_out;
};
+// data that is local to the decoder thread and not visible outside of it
+typedef struct DecThreadContext {
+ AVFrame *frame;
+ AVPacket *pkt;
+} DecThreadContext;
+
+static int dec_thread_stop(Decoder *d)
+{
+ void *ret;
+
+ if (!d->queue_in)
+ return 0;
+
+ tq_send_finish(d->queue_in, 0);
+ tq_receive_finish(d->queue_out, 0);
+
+ pthread_join(d->thread, &ret);
+
+ tq_free(&d->queue_in);
+ tq_free(&d->queue_out);
+
+ return (intptr_t)ret;
+}
+
void dec_free(Decoder **pdec)
{
Decoder *dec = *pdec;
@@ -54,6 +97,8 @@ void dec_free(Decoder **pdec)
if (!dec)
return;
+ dec_thread_stop(dec);
+
av_frame_free(&dec->frame);
av_packet_free(&dec->pkt);
@@ -383,8 +428,10 @@ out:
return ret;
}
-static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
+static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
+ AVFrame *frame)
{
+ Decoder *d = ist->decoder;
AVPacket *flush_pkt = NULL;
AVSubtitle subtitle;
int got_output;
@@ -403,20 +450,30 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt)
if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Error decoding subtitles: %s\n",
av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
ist->decode_errors++;
+ return exit_on_error ? ret : 0;
}
- if (ret < 0 || !got_output) {
- if (!pkt)
- sub2video_flush(ist);
- return ret < 0 ? ret : AVERROR_EOF;
- }
+ if (!got_output)
+ return pkt ? 0 : AVERROR_EOF;
ist->frames_decoded++;
- return process_subtitle(ist, &subtitle);
+ // XXX the queue for transferring data back to the main thread runs
+ // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
+ // inside the frame
+ // eventually, subtitles should be switched to use AVFrames natively
+ ret = subtitle_wrap_frame(frame, &subtitle, 0);
+ if (ret < 0) {
+ avsubtitle_free(&subtitle);
+ return ret;
+ }
+
+ ret = tq_send(d->queue_out, 0, frame);
+ if (ret < 0)
+ av_frame_unref(frame);
+
+ return ret;
}
static int send_filter_eof(InputStream *ist)
@@ -434,7 +491,7 @@ static int send_filter_eof(InputStream *ist)
return 0;
}
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
+static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame)
{
Decoder *d = ist->decoder;
AVCodecContext *dec = ist->dec_ctx;
@@ -442,7 +499,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
int ret;
if (dec->codec_type == AVMEDIA_TYPE_SUBTITLE)
- return transcode_subtitles(ist, pkt);
+ return transcode_subtitles(ist, pkt, frame);
// With fate-indeo3-2, we're getting 0-sized packets before EOF for some
// reason. This seems like a semi-critical bug. Don't trigger EOF, and
@@ -457,23 +514,25 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret == AVERROR(EAGAIN)) {
av_log(ist, AV_LOG_FATAL, "A decoder returned an unexpected error code. "
"This is a bug, please report it.\n");
- exit_program(1);
+ return AVERROR_BUG;
}
av_log(ist, AV_LOG_ERROR, "Error submitting %s to decoder: %s\n",
pkt ? "packet" : "EOF", av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
- if (ret != AVERROR_EOF)
+ if (ret != AVERROR_EOF) {
ist->decode_errors++;
+ if (!exit_on_error)
+ ret = 0;
+ }
return ret;
}
while (1) {
- AVFrame *frame = d->frame;
FrameData *fd;
+ av_frame_unref(frame);
+
update_benchmark(NULL);
ret = avcodec_receive_frame(dec, frame);
update_benchmark("decode_%s %d.%d", type_desc,
@@ -483,30 +542,22 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
av_assert0(pkt); // should never happen during flushing
return 0;
} else if (ret == AVERROR_EOF) {
- /* after flushing, send an EOF on all the filter inputs attached to the stream */
- /* except when looping we need to flush but not to send an EOF */
- if (!no_eof) {
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- exit_program(1);
- }
- }
-
- return AVERROR_EOF;
+ return ret;
} else if (ret < 0) {
av_log(ist, AV_LOG_ERROR, "Decoding error: %s\n", av_err2str(ret));
- if (exit_on_error)
- exit_program(1);
ist->decode_errors++;
- return ret;
+
+ if (exit_on_error)
+ return ret;
+
+ continue;
}
if (frame->decode_error_flags || (frame->flags & AV_FRAME_FLAG_CORRUPT)) {
av_log(ist, exit_on_error ? AV_LOG_FATAL : AV_LOG_WARNING,
"corrupt decoded frame\n");
if (exit_on_error)
- exit_program(1);
+ return AVERROR_INVALIDDATA;
}
@@ -514,7 +565,7 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
fd = frame_data(frame);
if (!fd) {
av_frame_unref(frame);
- report_and_exit(AVERROR(ENOMEM));
+ return AVERROR(ENOMEM);
}
fd->pts = frame->pts;
fd->tb = dec->pkt_timebase;
@@ -533,19 +584,254 @@ int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
if (ret < 0) {
av_log(NULL, AV_LOG_FATAL, "Error while processing the decoded "
"data for stream #%d:%d\n", ist->file_index, ist->index);
- exit_program(1);
+ return ret;
}
}
ist->frames_decoded++;
- ret = send_frame_to_filters(ist, frame);
- av_frame_unref(frame);
+ ret = tq_send(d->queue_out, 0, frame);
if (ret < 0)
- exit_program(1);
+ return ret;
}
}
+static void dec_thread_set_name(const InputStream *ist)
+{
+ char name[16];
+ snprintf(name, sizeof(name), "dec%d:%d:%s", ist->file_index, ist->index,
+ ist->dec_ctx->codec->name);
+ ff_thread_setname(name);
+}
+
+static void dec_thread_uninit(DecThreadContext *dt)
+{
+ av_packet_free(&dt->pkt);
+ av_frame_free(&dt->frame);
+
+ memset(dt, 0, sizeof(*dt));
+}
+
+static int dec_thread_init(DecThreadContext *dt)
+{
+ memset(dt, 0, sizeof(*dt));
+
+ dt->frame = av_frame_alloc();
+ if (!dt->frame)
+ goto fail;
+
+ dt->pkt = av_packet_alloc();
+ if (!dt->pkt)
+ goto fail;
+
+ return 0;
+
+fail:
+ dec_thread_uninit(dt);
+ return AVERROR(ENOMEM);
+}
+
+static void *decoder_thread(void *arg)
+{
+ InputStream *ist = arg;
+ InputFile *ifile = input_files[ist->file_index];
+ Decoder *d = ist->decoder;
+ DecThreadContext dt;
+ int ret = 0, input_status = 0;
+
+ ret = dec_thread_init(&dt);
+ if (ret < 0)
+ goto finish;
+
+ dec_thread_set_name(ist);
+
+ while (!input_status) {
+ int dummy, flush_buffers;
+
+ input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
+ flush_buffers = input_status >= 0 && !dt.pkt->buf;
+ if (!dt.pkt->buf)
+ av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
+ flush_buffers ? "flush" : "EOF");
+
+ ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+
+ av_packet_unref(dt.pkt);
+ av_frame_unref(dt.frame);
+
+ if (ret == AVERROR_EOF) {
+ av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
+ flush_buffers ? "resetting" : "finishing");
+
+ if (!flush_buffers)
+ break;
+
+ /* report last frame duration to the demuxer thread */
+ if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
+ LastFrameDuration dur;
+
+ dur.stream_idx = ist->index;
+ dur.duration = av_rescale_q(ist->nb_samples,
+ (AVRational){ 1, ist->dec_ctx->sample_rate},
+ ist->st->time_base);
+
+ av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
+ }
+
+ avcodec_flush_buffers(ist->dec_ctx);
+ } else if (ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Error processing packet in decoder: %s\n",
+ av_err2str(ret));
+ break;
+ }
+
+ // signal to the consumer thread that the entire packet was processed
+ ret = tq_send(d->queue_out, 0, dt.frame);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ break;
+ }
+ }
+
+ // EOF is normal thread termination
+ if (ret == AVERROR_EOF)
+ ret = 0;
+
+finish:
+ tq_receive_finish(d->queue_in, 0);
+ tq_send_finish (d->queue_out, 0);
+
+ // make sure the demuxer does not get stuck waiting for audio durations
+ // that will never arrive
+ if (ifile->audio_duration_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
+ av_thread_message_queue_set_err_recv(ifile->audio_duration_queue, AVERROR_EOF);
+
+ dec_thread_uninit(&dt);
+
+ av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
+
+ return (void*)(intptr_t)ret;
+}
+
+static int dec_thread_start(InputStream *ist)
+{
+ Decoder *d = ist->decoder;
+ ObjPool *op;
+ int ret = 0;
+
+ op = objpool_alloc_packets();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ d->queue_in = tq_alloc(1, 1, op, pkt_move);
+ if (!d->queue_in) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ op = objpool_alloc_frames();
+ if (!op)
+ goto fail;
+
+ d->queue_out = tq_alloc(1, 4, op, frame_move);
+ if (!d->queue_out) {
+ objpool_free(&op);
+ goto fail;
+ }
+
+ ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
+ if (ret) {
+ ret = AVERROR(ret);
+ av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
+ av_err2str(ret));
+ goto fail;
+ }
+
+ return 0;
+fail:
+ if (ret >= 0)
+ ret = AVERROR(ENOMEM);
+
+ tq_free(&d->queue_in);
+ tq_free(&d->queue_out);
+ return ret;
+}
+
+int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
+{
+ Decoder *d = ist->decoder;
+ int ret = 0, thread_ret;
+
+ // thread already joined
+ if (!d->queue_in)
+ return AVERROR_EOF;
+
+ // send the packet/flush request/EOF to the decoder thread
+ if (pkt || no_eof) {
+ av_packet_unref(d->pkt);
+
+ if (pkt) {
+ ret = av_packet_ref(d->pkt, pkt);
+ if (ret < 0)
+ goto finish;
+ }
+
+ ret = tq_send(d->queue_in, 0, d->pkt);
+ if (ret < 0)
+ goto finish;
+ } else
+ tq_send_finish(d->queue_in, 0);
+
+ // retrieve all decoded data for the packet
+ while (1) {
+ int dummy;
+
+ ret = tq_receive(d->queue_out, &dummy, d->frame);
+ if (ret < 0)
+ goto finish;
+
+ // packet fully processed
+ if (!d->frame->buf[0])
+ return 0;
+
+ // process the decoded frame
+ if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
+ AVSubtitle *sub = (AVSubtitle*)d->frame->buf[0]->data;
+ ret = process_subtitle(ist, sub);
+ } else {
+ ret = send_frame_to_filters(ist, d->frame);
+ }
+ av_frame_unref(d->frame);
+ if (ret < 0)
+ goto finish;
+ }
+
+finish:
+ thread_ret = dec_thread_stop(d);
+ if (thread_ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
+ av_err2str(thread_ret));
+ ret = err_merge(ret, thread_ret);
+ }
+ // non-EOF errors here are all fatal
+ if (ret < 0 && ret != AVERROR_EOF)
+ report_and_exit(ret);
+
+ // signal EOF to our downstreams
+ if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE)
+ sub2video_flush(ist);
+ else {
+ ret = send_filter_eof(ist);
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
+ exit_program(1);
+ }
+ }
+
+ return AVERROR_EOF;
+}
+
static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
{
InputStream *ist = s->opaque;
@@ -781,5 +1067,12 @@ int dec_open(InputStream *ist)
}
assert_avoptions(ist->decoder_opts);
+ ret = dec_thread_start(ist);
+ if (ret < 0) {
+ av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
+ av_err2str(ret));
+ return ret;
+ }
+
return 0;
}
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 66b2324bb3..026796f7e6 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -475,11 +475,6 @@ static int thread_stop(Muxer *mux)
return (int)(intptr_t)ret;
}
-static void pkt_move(void *dst, void *src)
-{
- av_packet_move_ref(dst, src);
-}
-
static int thread_start(Muxer *mux)
{
AVFormatContext *fc = mux->fc;
--
2.40.1
More information about the ffmpeg-devel
mailing list