[FFmpeg-devel] [PATCH 7/7] libavformat: Add FIFO pseudo-muxer
sebechlebskyjan at gmail.com
sebechlebskyjan at gmail.com
Mon Jul 4 17:45:15 EEST 2016
From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
The fifo pseudo-muxer allows to separate encoder from the
actual output by using a first-in-first-out queue and
running actual muxer asynchronously in separate thread.
It can be configured to attempt transparent recovery
of output on failure.
Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
configure | 1 +
doc/muxers.texi | 80 ++++++
libavformat/Makefile | 1 +
libavformat/allformats.c | 1 +
libavformat/fifo.c | 666 +++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 749 insertions(+)
create mode 100644 libavformat/fifo.c
diff --git a/configure b/configure
index 126d0d6..b71c75f 100755
--- a/configure
+++ b/configure
@@ -2826,6 +2826,7 @@ dv_muxer_select="dvprofile"
dxa_demuxer_select="riffdec"
eac3_demuxer_select="ac3_parser"
f4v_muxer_select="mov_muxer"
+fifo_muxer_deps="pthreads"
flac_demuxer_select="flac_parser"
hds_muxer_select="flv_muxer"
hls_muxer_select="mpegts_muxer"
diff --git a/doc/muxers.texi b/doc/muxers.texi
index c2ca0ba..7ca14f6 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -1408,6 +1408,86 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
@end table
+ at section fifo
+
+The fifo pseudo-muxer allows to separate encoding from any other muxer by using
+first-in-first-out queue and running the actual muxer in a separate thread. This
+is especially useful in combination with the @ref{tee} muxer and output to
+several destinations with different reliability/writing speed/latency.
+
+The fifo muxer can operate in regular or non-blocking mode. This determines the
+behaviour in case the queue fills up. In regular mode the encoding is blocked
+until the muxer processes some of the packets from queue; in non-blocking mode
+the packets are thrown away rather than blocking the encoder (this might be
+useful in real-time streaming scenarios).
+
+ at table @option
+
+ at item fifo_format
+Specify the format name. Useful if it cannot be guessed from the
+output name suffix.
+
+ at item queue_size
+Specify size of the queue (number of packets). Default value is 60.
+
+ at item format_opts
+Specify format options for the underlying muxer. Muxer options can be specified
+as a list of @var{key}=@var{value} pairs separated by ':'.
+
+ at item block_on_overflow @var{bool}
+If set to 0 (false), non-blocking mode will be used and in case the queue fills
+up, packets will be dropped. By default this option is set to 1 (true), so in
+case of queue overflow the encoder will be blocked until the muxer processes
+some of the packets.
+
+ at item attempt_recovery @var{bool}
+If failure occurs, attempt to recover the output. This is especially useful
+when used with network output, allows to restart streaming transparently.
+By default this option set to 0 (false).
+
+ at item max_recovery_attempts
+Sets maximum number of successive unsucessful recovery attempts after which
+the output fails permanently. Unlimited if set to zero. Default value is 16.
+
+ at item recovery_wait_time @var{duration}
+Waiting time before the next recovery attempt after previous unsuccessfull
+recovery attempt. Default value is 5 seconds.
+
+s at item recovery_wait_streamtime @var{bool}
+If set to 0 (false), the real time is used when waiting for the recovery
+attempt (i.e. the recovery will be attempted after at least
+recovery_wait_time seconds).
+If set to 1 (true), the time of the processed stream is taken into account
+instead (i.e. the recovery will be attempted after at least recovery_wait_time
+seconds of the stream is omitted).
+By default, this option is set to 0 (false).
+
+ at item recover_any_error @var{bool}
+If set to 1 (true), recovery will be attempted regardless of type of the error
+causing the failure. By default this option is set to 0 (false) and in case of
+certain errors the recovery is not attempted even when @ref{attempt_recovery}
+is set to 1.
+
+ at item restart_with_keyframe @var{bool}
+Specify whether to wait for the keyframe after recovering from
+queue overflow or failure. This option is set to 0 (false) by default.
+
+ at end table
+
+ at subsection Examples
+
+ at itemize
+
+ at item
+Stream something to rtmp server using non-blocking mode and recover automatically
+in case failure happens (for example the network becomes unavailable for a moment).
+ at example
+ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
+ -block_on_overflow 0 -attempt_recovery 1 rtmp://example.com/live/stream_name
+ at end example
+
+ at end itemize
+
@section tee
The tee muxer can be used to write the same data to several files or any
diff --git a/libavformat/Makefile b/libavformat/Makefile
index c49f9de..42fb9be 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER) += ffmdec.o
OBJS-$(CONFIG_FFM_MUXER) += ffmenc.o
OBJS-$(CONFIG_FFMETADATA_DEMUXER) += ffmetadec.o
OBJS-$(CONFIG_FFMETADATA_MUXER) += ffmetaenc.o
+OBJS-$(CONFIG_FIFO_MUXER) += fifo.o
OBJS-$(CONFIG_FILMSTRIP_DEMUXER) += filmstripdec.o
OBJS-$(CONFIG_FILMSTRIP_MUXER) += filmstripenc.o
OBJS-$(CONFIG_FLAC_DEMUXER) += flacdec.o rawdec.o \
diff --git a/libavformat/allformats.c b/libavformat/allformats.c
index d490cc4..b21a3de 100644
--- a/libavformat/allformats.c
+++ b/libavformat/allformats.c
@@ -123,6 +123,7 @@ void av_register_all(void)
REGISTER_MUXER (F4V, f4v);
REGISTER_MUXDEMUX(FFM, ffm);
REGISTER_MUXDEMUX(FFMETADATA, ffmetadata);
+ REGISTER_MUXER (FIFO, fifo);
REGISTER_MUXDEMUX(FILMSTRIP, filmstrip);
REGISTER_MUXDEMUX(FLAC, flac);
REGISTER_DEMUXER (FLIC, flic);
diff --git a/libavformat/fifo.c b/libavformat/fifo.c
new file mode 100644
index 0000000..c0aa24b
--- /dev/null
+++ b/libavformat/fifo.c
@@ -0,0 +1,666 @@
+/*
+ * FIFO pseudo-muxer
+ * Copyright (c) 2016 Jan Sebechlebsky
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "libavutil/threadmessage.h"
+#include "avformat.h"
+#include "internal.h"
+#include "pthread.h"
+
+#define FIFO_DEFAULT_QUEUE_SIZE 60
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 16
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 second
+
+typedef struct FifoContext {
+ const AVClass *class;
+ AVFormatContext *avf;
+
+ char *format;
+ AVOutputFormat *oformat;
+
+ char *format_options_str;
+ AVDictionary *format_options;
+
+ int queue_size;
+ AVThreadMessageQueue *queue;
+ pthread_t writer_thread;
+
+ /* Return value of last write_trailer_call */
+ int write_trailer_ret;
+
+ /* Time to wait before next recovery attempt
+ * This can refer to the time in processed stream,
+ * or real time. */
+ int64_t recovery_wait_time;
+
+ /* Maximal number of unsuccessful successive recovery attempts */
+ int max_recovery_attempts;
+
+ /* Whether to attempt recovery from failure */
+ uint8_t attempt_recovery;
+
+ /* If >0 stream time will be used when waiting
+ * for the recovery attempt instead of real time */
+ uint8_t recovery_wait_streamtime;
+
+ /* If >0 recovery will be attempted regardless of error code
+ * (except AVERROR_EXIT, so exit request is never ignored) */
+ uint8_t recover_any_error;
+
+ pthread_mutex_t overflow_flag_lock;
+ /* Value > 0 signalizes queue overflow */
+ uint8_t overflow_flag;
+
+ /* Whether to block in case the queue is full. */
+ uint8_t block_on_overflow;
+
+ /* Whether to wait for keyframe when recovering
+ * from failure or queue overflow */
+ uint8_t restart_with_keyframe;
+
+} FifoContext;
+
+typedef struct FifoThreadContext {
+ AVFormatContext *avf;
+
+ /* Timestamp of last failure.
+ * This is either pts in case stream time is used,
+ * or microseconds as returned by av_getttime_relative() */
+ int64_t last_recovery_ts;
+
+ /* Number of current recovery process
+ * Value > 0 means we are in recovery process */
+ int recovery_nr;
+
+ /* If > 0 all frames will be dropped until keyframe is received */
+ uint8_t drop_until_keyframe;
+
+ /* Value > 0 means that the previous write_header call was successful
+ * so finalization by calling write_trailer and ff_io_close must be done
+ * before exiting / reinitialization of underlying muxer */
+ uint8_t header_written;
+} FifoThreadContext;
+
+typedef enum FifoMessageType {
+ FIFO_WRITE_HEADER,
+ FIFO_WRITE_PACKET,
+ FIFO_FLUSH_OUTPUT
+} FifoMessageType;
+
+typedef struct FifoMessage {
+ FifoMessageType type;
+ AVPacket pkt;
+} FifoMessage;
+
+static int fifo_thread_write_header(FifoThreadContext *ctx)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ AVFormatContext *avf2 = fifo->avf;
+ AVDictionary *format_options = NULL;
+ int ret, i;
+
+ ret = av_dict_copy(&format_options, fifo->format_options, 0);
+ if (ret < 0)
+ return ret;
+
+ ret = ff_format_output_open(avf2, avf->filename, &format_options);
+ if (ret < 0) {
+ av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
+ av_err2str(ret));
+ goto end;
+ }
+
+ for (i = 0;i < avf2->nb_streams; i++)
+ avf2->streams[i]->cur_dts = 0;
+
+ ret = avformat_write_header(avf2, &fifo->format_options);
+ if (!ret)
+ ctx->header_written = 1;
+
+ // Check for options unrecognized by underlying muxer
+ if (format_options) {
+ AVDictionaryEntry *entry = NULL;
+ while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
+ av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
+ ret = AVERROR(EINVAL);
+ }
+
+end:
+ av_dict_free(&format_options);
+ return ret;
+}
+
+static int fifo_thread_flush_output(FifoThreadContext *ctx)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ AVFormatContext *avf2 = fifo->avf;
+
+ return av_write_frame(avf2, NULL);
+}
+
+static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ AVFormatContext *avf2 = fifo->avf;
+ int ret;
+
+ if (ctx->drop_until_keyframe) {
+ if (pkt->flags & AV_PKT_FLAG_KEY) {
+ ctx->drop_until_keyframe = 0;
+ av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
+ } else {
+ av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
+ av_packet_unref(pkt);
+ return 0;
+ }
+ }
+
+ if (pkt->pts != AV_NOPTS_VALUE) {
+ int s_idx = pkt->stream_index;
+ AVRational src_tb = avf->streams[s_idx]->time_base;
+ AVRational dst_tb = avf2->streams[s_idx]->time_base;
+
+ pkt->pts = av_rescale_q(pkt->pts, src_tb, dst_tb);
+ pkt->dts = av_rescale_q(pkt->dts, src_tb, dst_tb);
+ pkt->duration = av_rescale_q(pkt->duration, src_tb, dst_tb);
+ }
+
+ ret = av_write_frame(avf2, pkt);
+ if (ret >= 0)
+ av_packet_unref(pkt);
+ return ret;
+}
+
+static int fifo_thread_write_trailer(FifoThreadContext *ctx)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ AVFormatContext *avf2 = fifo->avf;
+ int ret;
+
+ if (!ctx->header_written)
+ return 0;
+
+ ret = av_write_trailer(avf2);
+ ff_format_io_close(avf2, &avf2->pb);
+
+ return ret;
+}
+
+static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
+{
+ switch (msg->type) {
+ case FIFO_WRITE_HEADER:
+ return fifo_thread_write_header(ctx);
+ case FIFO_WRITE_PACKET:
+ return fifo_thread_write_packet(ctx, &msg->pkt);
+ case FIFO_FLUSH_OUTPUT:
+ return fifo_thread_flush_output(ctx);
+ }
+
+ return AVERROR(EINVAL);
+}
+
+static int is_recoverable(const FifoContext *fifo, int err_no) {
+ if (!fifo->attempt_recovery)
+ return 0;
+
+ if (fifo->recover_any_error)
+ return err_no != AVERROR_EXIT;
+
+ switch (err_no) {
+ case AVERROR(EINVAL):
+ case AVERROR(ENOSYS):
+ case AVERROR_EOF:
+ case AVERROR_EXIT:
+ case AVERROR_PATCHWELCOME:
+ return 0;
+ default:
+ return 1;
+ }
+}
+
+static void free_message(void *msg)
+{
+ FifoMessage *fifo_msg = msg;
+
+ if (fifo_msg->type == FIFO_WRITE_PACKET)
+ av_packet_unref(&fifo_msg->pkt);
+}
+
+static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
+ int err_no)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ int ret;
+
+ av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
+ av_err2str(err_no));
+
+ if (fifo->recovery_wait_streamtime) {
+ if (pkt->pts == AV_NOPTS_VALUE)
+ av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
+ " timestamp, recovery will be attempted immediately");
+ ctx->last_recovery_ts = pkt->pts;
+ } else
+ ctx->last_recovery_ts = av_gettime_relative();
+
+ if (fifo->max_recovery_attempts &&
+ ctx->recovery_nr >= fifo->max_recovery_attempts) {
+ av_log(avf, AV_LOG_ERROR,
+ "Maximal number of %d recovery attempts reached.\n",
+ fifo->max_recovery_attempts);
+ ret = err_no;
+ } else
+ ret = AVERROR(EAGAIN);
+
+ return ret;
+}
+
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ AVPacket *pkt = &msg->pkt;
+ int64_t time_since_recovery;
+ int ret;
+
+ if (!is_recoverable(fifo, err_no)) {
+ ret = err_no;
+ goto fail;
+ }
+
+ if (ctx->header_written) {
+ fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
+ ctx->header_written = 0;
+ }
+
+ if (!ctx->recovery_nr)
+ ctx->last_recovery_ts = 0;
+ else {
+ if (fifo->recovery_wait_streamtime) {
+ AVRational tb = avf->streams[pkt->stream_index]->time_base;
+ time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
+ tb, AV_TIME_BASE_Q);
+ } else {
+ time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+ }
+
+ if (time_since_recovery < fifo->recovery_wait_time)
+ return AVERROR(EAGAIN);
+ }
+
+ ctx->recovery_nr++;
+
+ if (fifo->max_recovery_attempts)
+ av_log(avf, AV_LOG_INFO, "Recovery attempt #%d/%d\n",
+ ctx->recovery_nr, fifo->max_recovery_attempts);
+ else
+ av_log(avf, AV_LOG_INFO, "Recovery attempt #%d\n",
+ ctx->recovery_nr);
+
+
+ if (msg->type != FIFO_WRITE_HEADER) {
+ ret = fifo_thread_write_header(ctx);
+ if (ret < 0)
+ return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+ }
+
+ if (fifo->restart_with_keyframe && !fifo->block_on_overflow)
+ ctx->drop_until_keyframe = 1;
+
+ ret = fifo_thread_dispatch_message(ctx, msg);
+ if (ret < 0) {
+ if (is_recoverable(fifo, ret))
+ return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+ else
+ goto fail;
+ } else {
+ av_log(avf, AV_LOG_INFO, "Recovery successful\n");
+ ctx->recovery_nr = 0;
+ }
+
+ return 0;
+
+fail:
+ free_message(msg);
+ return ret;
+}
+
+static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+ AVFormatContext *avf = ctx->avf;
+ FifoContext *fifo = avf->priv_data;
+ int ret;
+
+ do {
+ if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
+ int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+ int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
+ if (time_to_wait)
+ av_usleep(FFMIN(10000, time_to_wait));
+ }
+
+ ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
+ } while (ret == AVERROR(EAGAIN) && fifo->block_on_overflow);
+
+ if (ret == AVERROR(EAGAIN) && !fifo->block_on_overflow) {
+ if (msg->type == FIFO_WRITE_PACKET)
+ av_packet_unref(&msg->pkt);
+ ret = 0;
+ }
+
+ return ret;
+}
+
+static void *fifo_consumer_thread(void *data)
+{
+ AVFormatContext *avf = data;
+ FifoContext *fifo = avf->priv_data;
+ AVThreadMessageQueue *queue = fifo->queue;
+ FifoMessage msg;
+ int ret;
+
+ FifoThreadContext fifo_thread_ctx;
+ memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
+ fifo_thread_ctx.avf = avf;
+
+ while (1) {
+ uint8_t just_flushed = 0;
+
+ /* If the queue is full at the moment when fifo_write_packet
+ * attempts to insert new message (packet) to the queue,
+ * it sets the fifo->overflow_flag to 1 and drops packet.
+ * Here in consumer thread, the flag is checked and if it is
+ * set, the queue is flushed and flag cleared. */
+ pthread_mutex_lock(&fifo->overflow_flag_lock);
+ if (fifo->overflow_flag) {
+ av_thread_message_flush(queue);
+ if (fifo->restart_with_keyframe)
+ fifo_thread_ctx.drop_until_keyframe = 1;
+ fifo->overflow_flag = 0;
+ just_flushed = 1;
+ }
+ pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+ if (just_flushed)
+ av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+
+ ret = av_thread_message_queue_recv(queue, &msg, 0);
+ if (ret < 0) {
+ av_thread_message_queue_set_err_send(queue, ret);
+ break;
+ }
+
+ if (!fifo_thread_ctx.recovery_nr)
+ ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
+
+ if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
+ int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
+ if (rec_ret < 0) {
+ av_thread_message_queue_set_err_send(queue, rec_ret);
+ break;
+ }
+ }
+ }
+
+ fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
+
+ return NULL;
+}
+
+static int fifo_mux_init(AVFormatContext *avf)
+{
+ FifoContext *fifo = avf->priv_data;
+ AVFormatContext *avf2;
+ int ret = 0, i;
+
+ ret = avformat_alloc_output_context2(&avf2, fifo->oformat, NULL, NULL);
+ if (ret < 0) {
+ return ret;
+ }
+
+ fifo->avf = avf2;
+
+ avf2->interrupt_callback = avf->interrupt_callback;
+ avf2->max_delay = avf->max_delay;
+ ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
+ if (ret < 0)
+ return ret;
+ avf2->opaque = avf->opaque;
+ avf2->io_close = avf->io_close;
+ avf2->io_open = avf->io_open;
+ avf2->flags = avf->flags;
+
+ for (i = 0; i < avf->nb_streams; ++i) {
+ AVStream *st = avformat_new_stream(avf2, NULL);
+ if (!st)
+ return AVERROR(ENOMEM);
+
+ ret = ff_stream_encode_params_copy(st, avf->streams[i]);
+ if (ret < 0)
+ return ret;
+ }
+
+ return 0;
+}
+
+static int fifo_init(AVFormatContext *avf)
+{
+ FifoContext *fifo = avf->priv_data;
+ int ret = 0;
+
+ if (fifo->recovery_wait_streamtime && fifo->block_on_overflow) {
+ av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
+ " only when block_on_overflow is turned off\n");
+ return AVERROR(EINVAL);
+ }
+
+ if (fifo->format_options_str) {
+ ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
+ "=", ":", 0);
+ if (ret < 0) {
+ av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
+ fifo->format_options_str);
+ return ret;
+ }
+ }
+
+ fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
+ if (!fifo->oformat) {
+ ret = AVERROR_MUXER_NOT_FOUND;
+ return ret;
+ }
+
+ ret = fifo_mux_init(avf);
+ if (ret < 0)
+ return ret;
+
+ ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
+ sizeof(FifoMessage));
+ if (!ret)
+ av_thread_message_queue_set_free_func(fifo->queue, free_message);
+
+ ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
+ if (ret < 0)
+ return AVERROR(ret);
+
+ return 0;
+}
+
+static int fifo_write_header(AVFormatContext *avf)
+{
+ FifoContext * fifo = avf->priv_data;
+ FifoMessage message = {.type = FIFO_WRITE_HEADER};
+ int ret;
+
+ ret = av_thread_message_queue_send(fifo->queue, &message, 0);
+ if (ret < 0)
+ return ret;
+
+ ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
+ if (ret) {
+ av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
+ av_err2str(AVERROR(ret)));
+ ret = AVERROR(ret);
+ }
+
+ return 0;
+}
+
+static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
+{
+ FifoContext *fifo = avf->priv_data;
+ FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
+ int ret;
+
+ if (pkt) {
+ av_init_packet(&msg.pkt);
+ ret = av_packet_ref(&msg.pkt,pkt);
+ if (ret < 0)
+ return ret;
+ }
+
+ ret = av_thread_message_queue_send(fifo->queue, &msg,
+ fifo->block_on_overflow ?
+ 0 : AV_THREAD_MESSAGE_NONBLOCK);
+ if (ret == AVERROR(EAGAIN)) {
+ uint8_t overflow_set = 0;
+
+ /* Queue is full, set fifo->overflow_flag to 1
+ * to let consumer thread know the queue should
+ * be flushed. */
+ pthread_mutex_lock(&fifo->overflow_flag_lock);
+ if (!fifo->overflow_flag)
+ fifo->overflow_flag = overflow_set = 1;
+ pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+ if (overflow_set)
+ av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
+ ret = 0;
+ goto fail;
+ } else if (ret < 0) {
+ goto fail;
+ }
+
+ return ret;
+fail:
+ if (pkt)
+ av_packet_unref(&msg.pkt);
+ return ret;
+}
+
+static int fifo_write_trailer(AVFormatContext *avf)
+{
+ FifoContext *fifo= avf->priv_data;
+ int ret;
+
+ av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+
+ ret = pthread_join( fifo->writer_thread, NULL);
+ if (ret < 0) {
+ av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+ av_err2str(AVERROR(ret)));
+ return AVERROR(ret);
+ }
+
+ ret = fifo->write_trailer_ret;
+ return ret;
+}
+
+static void fifo_deinit(AVFormatContext *avf)
+{
+ FifoContext *fifo = avf->priv_data;
+
+ if (fifo->format_options)
+ av_dict_free(&fifo->format_options);
+
+ if (avf)
+ avformat_free_context(fifo->avf);
+
+ if (fifo->queue) {
+ av_thread_message_flush(fifo->queue);
+ av_thread_message_queue_free(&fifo->queue);
+ }
+
+ pthread_mutex_destroy(&fifo->overflow_flag_lock);
+}
+
+#define OFFSET(x) offsetof(FifoContext, x)
+static const AVOption options[] = {
+ {"fifo_format", "Target muxer", OFFSET(format),
+ AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"queue_size", "Size of fifo queue", OFFSET(queue_size),
+ AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options_str),
+ AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"block_on_overflow", "Block output on FIFO queue overflow until queue frees up", OFFSET(block_on_overflow),
+ AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
+ AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
+ AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
+ AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
+ AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
+ OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
+ AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
+
+ {NULL},
+};
+
+static const AVClass fifo_muxer_class = {
+ .class_name = "Fifo muxer",
+ .item_name = av_default_item_name,
+ .option = options,
+ .version = LIBAVUTIL_VERSION_INT,
+};
+
+AVOutputFormat ff_fifo_muxer = {
+ .name = "fifo",
+ .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
+ .priv_data_size = sizeof(FifoContext),
+ .init = fifo_init,
+ .write_header = fifo_write_header,
+ .write_packet = fifo_write_packet,
+ .write_trailer = fifo_write_trailer,
+ .deinit = fifo_deinit,
+ .priv_class = &fifo_muxer_class,
+ .flags = AVFMT_NOFILE,
+};
+
--
1.9.1
More information about the ffmpeg-devel
mailing list