[FFmpeg-devel] [PATCH 7/7] libavformat: Add FIFO pseudo-muxer

sebechlebskyjan at gmail.com sebechlebskyjan at gmail.com
Mon Jul 4 17:45:15 EEST 2016


From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>

The fifo pseudo-muxer allows separating the encoder from the
actual output by using a first-in-first-out queue and
running the actual muxer asynchronously in a separate thread.

It can be configured to attempt transparent recovery
of output on failure.

Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
---
 configure                |   1 +
 doc/muxers.texi          |  80 ++++++
 libavformat/Makefile     |   1 +
 libavformat/allformats.c |   1 +
 libavformat/fifo.c       | 666 +++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 749 insertions(+)
 create mode 100644 libavformat/fifo.c

diff --git a/configure b/configure
index 126d0d6..b71c75f 100755
--- a/configure
+++ b/configure
@@ -2826,6 +2826,7 @@ dv_muxer_select="dvprofile"
 dxa_demuxer_select="riffdec"
 eac3_demuxer_select="ac3_parser"
 f4v_muxer_select="mov_muxer"
+fifo_muxer_deps="pthreads"
 flac_demuxer_select="flac_parser"
 hds_muxer_select="flv_muxer"
 hls_muxer_select="mpegts_muxer"
diff --git a/doc/muxers.texi b/doc/muxers.texi
index c2ca0ba..7ca14f6 100644
--- a/doc/muxers.texi
+++ b/doc/muxers.texi
@@ -1408,6 +1408,86 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
 
 @end table
 
+@section fifo
+
+The fifo pseudo-muxer allows separating encoding from any other muxer by using
+a first-in-first-out queue and running the actual muxer in a separate thread. This
+is especially useful in combination with the @ref{tee} muxer and output to
+several destinations with different reliability/writing speed/latency.
+
+The fifo muxer can operate in regular or non-blocking mode. This determines the
+behaviour in case the queue fills up. In regular mode the encoding is blocked
+until the muxer processes some of the packets from queue; in non-blocking mode
+the packets are thrown away rather than blocking the encoder (this might be
+useful in real-time streaming scenarios).
+
+@table @option
+
+@item fifo_format
+Specify the format name. Useful if it cannot be guessed from the
+output name suffix.
+
+@item queue_size
+Specify size of the queue (number of packets). Default value is 60.
+
+@item format_opts
+Specify format options for the underlying muxer. Muxer options can be specified
+as a list of @var{key}=@var{value} pairs separated by ':'.
+
+@item block_on_overflow @var{bool}
+If set to 0 (false), non-blocking mode will be used and in case the queue fills
+up, packets will be dropped. By default this option is set to 1 (true), so in
+case of queue overflow the encoder will be blocked until the muxer processes
+some of the packets.
+
+@item attempt_recovery @var{bool}
+If failure occurs, attempt to recover the output. This is especially useful
+when used with network output, as it allows streaming to restart transparently.
+By default this option is set to 0 (false).
+
+@item max_recovery_attempts
+Sets maximum number of successive unsuccessful recovery attempts after which
+the output fails permanently. Unlimited if set to zero. Default value is 16.
+
+@item recovery_wait_time @var{duration}
+Waiting time before the next recovery attempt after a previous unsuccessful
+recovery attempt. Default value is 5 seconds.
+
+@item recovery_wait_streamtime @var{bool}
+If set to 0 (false), the real time is used when waiting for the recovery
+attempt (i.e. the recovery will be attempted after at least
+recovery_wait_time seconds).
+If set to 1 (true), the time of the processed stream is taken into account
+instead (i.e. the recovery will be attempted after at least recovery_wait_time
+seconds of the stream have been omitted).
+By default, this option is set to 0 (false).
+
+@item recover_any_error @var{bool}
+If set to 1 (true), recovery will be attempted regardless of type of the error
+causing the failure. By default this option is set to 0 (false) and in case of
+certain errors the recovery is not attempted even when @var{attempt_recovery}
+is set to 1.
+
+@item restart_with_keyframe @var{bool}
+Specify whether to wait for the keyframe after recovering from
+queue overflow or failure. This option is set to 0 (false) by default.
+
+@end table
+
+@subsection Examples
+
+@itemize
+
+@item
+Stream something to rtmp server using non-blocking mode and recover automatically
+in case a failure happens (for example the network becomes unavailable for a moment).
+@example
+ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
+  -block_on_overflow 0 -attempt_recovery 1 rtmp://example.com/live/stream_name
+@end example
+
+@end itemize
+
 @section tee
 
 The tee muxer can be used to write the same data to several files or any
diff --git a/libavformat/Makefile b/libavformat/Makefile
index c49f9de..42fb9be 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -162,6 +162,7 @@ OBJS-$(CONFIG_FFM_DEMUXER)               += ffmdec.o
 OBJS-$(CONFIG_FFM_MUXER)                 += ffmenc.o
 OBJS-$(CONFIG_FFMETADATA_DEMUXER)        += ffmetadec.o
 OBJS-$(CONFIG_FFMETADATA_MUXER)          += ffmetaenc.o
+OBJS-$(CONFIG_FIFO_MUXER)                += fifo.o
 OBJS-$(CONFIG_FILMSTRIP_DEMUXER)         += filmstripdec.o
 OBJS-$(CONFIG_FILMSTRIP_MUXER)           += filmstripenc.o
 OBJS-$(CONFIG_FLAC_DEMUXER)              += flacdec.o rawdec.o \
diff --git a/libavformat/allformats.c b/libavformat/allformats.c
index d490cc4..b21a3de 100644
--- a/libavformat/allformats.c
+++ b/libavformat/allformats.c
@@ -123,6 +123,7 @@ void av_register_all(void)
     REGISTER_MUXER   (F4V,              f4v);
     REGISTER_MUXDEMUX(FFM,              ffm);
     REGISTER_MUXDEMUX(FFMETADATA,       ffmetadata);
+    REGISTER_MUXER   (FIFO,             fifo);
     REGISTER_MUXDEMUX(FILMSTRIP,        filmstrip);
     REGISTER_MUXDEMUX(FLAC,             flac);
     REGISTER_DEMUXER (FLIC,             flic);
diff --git a/libavformat/fifo.c b/libavformat/fifo.c
new file mode 100644
index 0000000..c0aa24b
--- /dev/null
+++ b/libavformat/fifo.c
@@ -0,0 +1,666 @@
+/*
+ * FIFO pseudo-muxer
+ * Copyright (c) 2016 Jan Sebechlebsky
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "libavutil/threadmessage.h"
+#include "avformat.h"
+#include "internal.h"
+#include "pthread.h"
+
+#define FIFO_DEFAULT_QUEUE_SIZE              60 // default of the queue_size option
+#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS   16 // default of the max_recovery_attempts option
+#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds, in microseconds
+
+/* Private context of the fifo muxer (priv_data of its AVFormatContext).
+ *
+ * Every field that is bound to an AV_OPT_TYPE_BOOL option in the options
+ * table is declared as int: the option code stores an int at the field
+ * offset, so a narrower field would let one option overwrite its
+ * neighbours. */
+typedef struct FifoContext {
+    const AVClass *class;
+    AVFormatContext *avf; /* context of the underlying muxer, allocated in fifo_mux_init() */
+
+    char *format; /* value of the fifo_format option */
+    AVOutputFormat *oformat; /* output format chosen by av_guess_format() in fifo_init() */
+
+    char *format_options_str; /* value of the format_opts option */
+    AVDictionary *format_options; /* format_options_str parsed in fifo_init() */
+
+    int queue_size; /* value of the queue_size option */
+    AVThreadMessageQueue *queue; /* carries FifoMessage items to the writer thread */
+    pthread_t writer_thread; /* runs fifo_consumer_thread(), created in fifo_write_header() */
+
+    /* Return value of last write_trailer_call */
+    int write_trailer_ret;
+
+    /* Time to wait before next recovery attempt
+     * This can refer to the time in processed stream,
+     * or real time. */
+    int64_t recovery_wait_time;
+
+    /* Maximal number of unsuccessful successive recovery attempts */
+    int max_recovery_attempts;
+
+    /* Whether to attempt recovery from failure */
+    int attempt_recovery;
+
+    /* If >0 stream time will be used when waiting
+     * for the recovery attempt instead of real time */
+    int recovery_wait_streamtime;
+
+    /* If >0 recovery will be attempted regardless of error code
+     * (except AVERROR_EXIT, so exit request is never ignored) */
+    int recover_any_error;
+
+    /* Protects overflow_flag, which is accessed by both the thread calling
+     * fifo_write_packet() and the writer thread */
+    pthread_mutex_t overflow_flag_lock;
+    /* Value > 0 signalizes queue overflow (not an option, so it may stay
+     * a single byte) */
+    uint8_t overflow_flag;
+
+    /* Whether to block in case the queue is full. */
+    int block_on_overflow;
+
+    /* Whether to wait for keyframe when recovering
+     * from failure or queue overflow */
+    int restart_with_keyframe;
+
+} FifoContext;
+
+typedef struct FifoThreadContext {
+    AVFormatContext *avf; /* context of the fifo muxer; its priv_data is the FifoContext */
+
+    /* Timestamp of last failure.
+     * This is either pts in case stream time is used,
+     * or microseconds as returned by av_gettime_relative() */
+    int64_t last_recovery_ts;
+
+    /* Number of current recovery process
+     * Value > 0 means we are in recovery process */
+    int recovery_nr;
+
+    /* If > 0 all frames will be dropped until keyframe is received */
+    uint8_t drop_until_keyframe;
+
+    /* Value > 0 means that the previous write_header call was successful
+     * so finalization by calling write_trailer and ff_io_close must be done
+     * before exiting / reinitialization of underlying muxer */
+    uint8_t header_written;
+} FifoThreadContext;
+
+typedef enum FifoMessageType {
+    FIFO_WRITE_HEADER, /* handled by fifo_thread_write_header() */
+    FIFO_WRITE_PACKET, /* handled by fifo_thread_write_packet() with FifoMessage.pkt */
+    FIFO_FLUSH_OUTPUT  /* handled by fifo_thread_flush_output() */
+} FifoMessageType;
+
+typedef struct FifoMessage {
+    FifoMessageType type; /* selects the handler in fifo_thread_dispatch_message() */
+    AVPacket pkt; /* holds a packet reference only when type is FIFO_WRITE_PACKET */
+} FifoMessage;
+
+/**
+ * Open the output of the underlying muxer and write its header.
+ *
+ * Works on a private copy of fifo->format_options, so the stored options
+ * stay intact and are available again for later recovery attempts.
+ *
+ * @param ctx writer-thread state; header_written is set when the header
+ *            was written successfully
+ * @return 0 on success, a negative AVERROR code on failure;
+ *         AVERROR(EINVAL) if some format options were left unconsumed
+ */
+static int fifo_thread_write_header(FifoThreadContext *ctx)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVDictionary *format_options = NULL;
+    int ret, i;
+
+    ret = av_dict_copy(&format_options, fifo->format_options, 0);
+    if (ret < 0)
+        goto end; /* release whatever part of the copy was created */
+
+    ret = ff_format_output_open(avf2, avf->filename, &format_options);
+    if (ret < 0) {
+        av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->filename,
+               av_err2str(ret));
+        goto end;
+    }
+
+    for (i = 0;i < avf2->nb_streams; i++)
+        avf2->streams[i]->cur_dts = 0;
+
+    /* Pass the private copy, not fifo->format_options: the dictionary
+     * handed over is replaced by the entries that were not recognized,
+     * which is exactly what the check below has to inspect. */
+    ret = avformat_write_header(avf2, &format_options);
+    if (!ret)
+        ctx->header_written = 1;
+
+    // Check for options unrecognized by underlying muxer
+    if (format_options) {
+        AVDictionaryEntry *entry = NULL;
+        while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
+            av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
+        ret = AVERROR(EINVAL);
+    }
+
+end:
+    av_dict_free(&format_options);
+    return ret;
+}
+
+/* Handle FIFO_FLUSH_OUTPUT: forward a NULL packet to the underlying muxer
+ * and return the result of av_write_frame(). */
+static int fifo_thread_flush_output(FifoThreadContext *ctx)
+{
+    FifoContext *fifo = ctx->avf->priv_data;
+
+    return av_write_frame(fifo->avf, NULL);
+}
+
+/**
+ * Write one packet to the underlying muxer.
+ *
+ * While ctx->drop_until_keyframe is set, packets without AV_PKT_FLAG_KEY are
+ * unreferenced and reported as written.
+ *
+ * @param ctx writer-thread state
+ * @param pkt packet to write; unreferenced on success or when dropped,
+ *            left referenced when the write fails
+ * @return >= 0 on success, a negative AVERROR code from av_write_frame()
+ *         on failure
+ */
+static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *avf2 = fifo->avf;
+    AVRational src_tb, dst_tb;
+    int ret;
+
+    if (ctx->drop_until_keyframe) {
+        if (pkt->flags & AV_PKT_FLAG_KEY) {
+            ctx->drop_until_keyframe = 0;
+            av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
+        } else {
+            av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
+            av_packet_unref(pkt);
+            return 0;
+        }
+    }
+
+    /* av_packet_rescale_ts() converts pts, dts and duration independently
+     * and leaves AV_NOPTS_VALUE fields untouched, so a packet with only one
+     * of the two timestamps set is handled correctly. */
+    src_tb = avf->streams[pkt->stream_index]->time_base;
+    dst_tb = avf2->streams[pkt->stream_index]->time_base;
+    av_packet_rescale_ts(pkt, src_tb, dst_tb);
+
+    ret = av_write_frame(avf2, pkt);
+    if (ret >= 0)
+        av_packet_unref(pkt);
+    return ret;
+}
+
+/* Finalize the underlying muxer if its header was written: write the
+ * trailer and close its I/O context. Returns 0 when there is nothing to
+ * finalize, otherwise the result of av_write_trailer(). */
+static int fifo_thread_write_trailer(FifoThreadContext *ctx)
+{
+    FifoContext *fifo = ctx->avf->priv_data;
+    AVFormatContext *muxer = fifo->avf;
+    int trailer_ret = 0;
+
+    if (ctx->header_written) {
+        trailer_ret = av_write_trailer(muxer);
+        ff_format_io_close(muxer, &muxer->pb);
+    }
+
+    return trailer_ret;
+}
+
+/* Run the handler that matches msg->type and return its result;
+ * AVERROR(EINVAL) for a message type that has no handler. */
+static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
+{
+    if (msg->type == FIFO_WRITE_HEADER)
+        return fifo_thread_write_header(ctx);
+
+    if (msg->type == FIFO_WRITE_PACKET)
+        return fifo_thread_write_packet(ctx, &msg->pkt);
+
+    if (msg->type == FIFO_FLUSH_OUTPUT)
+        return fifo_thread_flush_output(ctx);
+
+    return AVERROR(EINVAL);
+}
+
+static int is_recoverable(const FifoContext *fifo, int err_no) {
+    if (!fifo->attempt_recovery)
+        return 0;
+
+    if (fifo->recover_any_error)
+        return err_no != AVERROR_EXIT;
+
+    switch (err_no) {
+    case AVERROR(EINVAL):
+    case AVERROR(ENOSYS):
+    case AVERROR_EOF:
+    case AVERROR_EXIT:
+    case AVERROR_PATCHWELCOME:
+        return 0;
+    default:
+        return 1;
+    }
+}
+
+/* Release what a FifoMessage holds: the packet reference of a
+ * FIFO_WRITE_PACKET message. Other message types hold nothing. */
+static void free_message(void *msg)
+{
+    FifoMessage *m = msg;
+
+    if (m->type != FIFO_WRITE_PACKET)
+        return;
+
+    av_packet_unref(&m->pkt);
+}
+
+/**
+ * Record a failed recovery attempt and decide whether to try again.
+ *
+ * Stores the reference time of the failure in ctx->last_recovery_ts: the
+ * packet pts when stream time is used, av_gettime_relative() otherwise.
+ *
+ * @param ctx    writer-thread state
+ * @param pkt    packet of the message being processed
+ * @param err_no error that made the recovery attempt fail
+ * @return AVERROR(EAGAIN) if another attempt may follow, err_no once
+ *         max_recovery_attempts (if non-zero) has been reached
+ */
+static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt,
+                                                int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
+           av_err2str(err_no));
+
+    if (fifo->recovery_wait_streamtime) {
+        /* The message now ends with a newline so that it does not run
+         * into the next log line. */
+        if (pkt->pts == AV_NOPTS_VALUE)
+            av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
+                   " timestamp, recovery will be attempted immediately\n");
+        ctx->last_recovery_ts = pkt->pts;
+    } else
+        ctx->last_recovery_ts = av_gettime_relative();
+
+    if (fifo->max_recovery_attempts &&
+        ctx->recovery_nr >= fifo->max_recovery_attempts) {
+        av_log(avf, AV_LOG_ERROR,
+               "Maximal number of %d recovery attempts reached.\n",
+               fifo->max_recovery_attempts);
+        ret = err_no;
+    } else
+        ret = AVERROR(EAGAIN);
+
+    return ret;
+}
+
+static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    AVPacket *pkt = &msg->pkt;
+    int64_t time_since_recovery;
+    int ret;
+
+    if (!is_recoverable(fifo, err_no)) {
+        ret = err_no;
+        goto fail;
+    }
+
+    if (ctx->header_written) {
+        fifo->write_trailer_ret = fifo_thread_write_trailer(ctx);
+        ctx->header_written = 0;
+    }
+
+    if (!ctx->recovery_nr)
+        ctx->last_recovery_ts = 0;
+    else {
+        if (fifo->recovery_wait_streamtime) {
+            AVRational tb = avf->streams[pkt->stream_index]->time_base;
+            time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
+                                               tb, AV_TIME_BASE_Q);
+        } else {
+            time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+        }
+
+        if (time_since_recovery < fifo->recovery_wait_time)
+            return AVERROR(EAGAIN);
+    }
+
+    ctx->recovery_nr++;
+
+    if (fifo->max_recovery_attempts)
+        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d/%d\n",
+               ctx->recovery_nr, fifo->max_recovery_attempts);
+    else
+        av_log(avf, AV_LOG_INFO, "Recovery attempt #%d\n",
+               ctx->recovery_nr);
+
+
+    if (msg->type != FIFO_WRITE_HEADER) {
+        ret = fifo_thread_write_header(ctx);
+        if (ret < 0)
+            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+    }
+
+    if (fifo->restart_with_keyframe && !fifo->block_on_overflow)
+        ctx->drop_until_keyframe = 1;
+
+    ret = fifo_thread_dispatch_message(ctx, msg);
+    if (ret < 0) {
+        if (is_recoverable(fifo, ret))
+            return fifo_thread_process_recovery_failure(ctx, pkt, ret);
+        else
+            goto fail;
+    } else {
+        av_log(avf, AV_LOG_INFO, "Recovery successful\n");
+        ctx->recovery_nr = 0;
+    }
+
+    return 0;
+
+fail:
+    free_message(msg);
+    return ret;
+}
+
+static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
+{
+    AVFormatContext *avf = ctx->avf;
+    FifoContext *fifo = avf->priv_data;
+    int ret;
+
+    do {
+        if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
+            int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
+            int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
+            if (time_to_wait)
+                av_usleep(FFMIN(10000, time_to_wait));
+        }
+
+        ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
+    } while (ret == AVERROR(EAGAIN) && fifo->block_on_overflow);
+
+    if (ret == AVERROR(EAGAIN) && !fifo->block_on_overflow) {
+        if (msg->type == FIFO_WRITE_PACKET)
+            av_packet_unref(&msg->pkt);
+        ret = 0;
+    }
+
+    return ret;
+}
+
+/* Entry point of the writer thread. data is the AVFormatContext of the fifo
+ * muxer. Receives messages from the queue and processes them until the
+ * queue reports an error or recovery fails, then writes the trailer and
+ * stores its result in fifo->write_trailer_ret. Always returns NULL. */
+static void *fifo_consumer_thread(void *data)
+{
+    AVFormatContext *avf = data;
+    FifoContext *fifo = avf->priv_data;
+    AVThreadMessageQueue *queue = fifo->queue;
+    FifoThreadContext thread_ctx;
+    FifoMessage msg;
+    int ret;
+
+    memset(&thread_ctx, 0, sizeof(thread_ctx));
+    thread_ctx.avf = avf;
+
+    for (;;) {
+        int flushed = 0;
+
+        /* fifo_write_packet() raises fifo->overflow_flag and drops the
+         * packet when it finds the queue full. React to that here by
+         * flushing the queue and clearing the flag. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (fifo->overflow_flag) {
+            av_thread_message_flush(queue);
+            if (fifo->restart_with_keyframe)
+                thread_ctx.drop_until_keyframe = 1;
+            fifo->overflow_flag = 0;
+            flushed = 1;
+        }
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        /* Log outside of the critical section. */
+        if (flushed)
+            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
+
+        ret = av_thread_message_queue_recv(queue, &msg, 0);
+        if (ret < 0) {
+            av_thread_message_queue_set_err_send(queue, ret);
+            break;
+        }
+
+        /* While a recovery is in progress the message goes straight to the
+         * recovery code instead of being dispatched first. */
+        if (!thread_ctx.recovery_nr)
+            ret = fifo_thread_dispatch_message(&thread_ctx, &msg);
+
+        if (ret < 0 || thread_ctx.recovery_nr > 0) {
+            int recover_ret = fifo_thread_recover(&thread_ctx, &msg, ret);
+
+            if (recover_ret < 0) {
+                av_thread_message_queue_set_err_send(queue, recover_ret);
+                break;
+            }
+        }
+    }
+
+    fifo->write_trailer_ret = fifo_thread_write_trailer(&thread_ctx);
+
+    return NULL;
+}
+
+/* Allocate the context of the underlying muxer, store it in fifo->avf and
+ * mirror the relevant settings and all streams of avf into it.
+ * Returns 0 on success, a negative AVERROR code on failure. */
+static int fifo_mux_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    AVFormatContext *muxer;
+    int ret, idx;
+
+    ret = avformat_alloc_output_context2(&muxer, fifo->oformat, NULL, NULL);
+    if (ret < 0)
+        return ret;
+
+    /* Stored right away; fifo_deinit() frees fifo->avf. */
+    fifo->avf = muxer;
+
+    muxer->interrupt_callback = avf->interrupt_callback;
+    muxer->max_delay          = avf->max_delay;
+
+    ret = av_dict_copy(&muxer->metadata, avf->metadata, 0);
+    if (ret < 0)
+        return ret;
+
+    muxer->opaque   = avf->opaque;
+    muxer->io_close = avf->io_close;
+    muxer->io_open  = avf->io_open;
+    muxer->flags    = avf->flags;
+
+    for (idx = 0; idx < avf->nb_streams; idx++) {
+        AVStream *out_st = avformat_new_stream(muxer, NULL);
+
+        if (!out_st)
+            return AVERROR(ENOMEM);
+
+        ret = ff_stream_encode_params_copy(out_st, avf->streams[idx]);
+        if (ret < 0)
+            return ret;
+    }
+
+    return 0;
+}
+
+/**
+ * Initialize the fifo muxer: validate the option combination, parse the
+ * format options, select and set up the underlying muxer and create the
+ * message queue and the overflow flag lock.
+ *
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int fifo_init(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+    int ret = 0;
+
+    if (fifo->recovery_wait_streamtime && fifo->block_on_overflow) {
+        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
+               " only when block_on_overflow is turned off\n");
+        return AVERROR(EINVAL);
+    }
+
+    if (fifo->format_options_str) {
+        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
+                                   "=", ":", 0);
+        if (ret < 0) {
+            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
+                   fifo->format_options_str);
+            return ret;
+        }
+    }
+
+    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
+    if (!fifo->oformat) {
+        ret = AVERROR_MUXER_NOT_FOUND;
+        return ret;
+    }
+
+    ret = fifo_mux_init(avf);
+    if (ret < 0)
+        return ret;
+
+    /* Fail right here when the queue cannot be allocated; the result must
+     * not be overwritten by the mutex initialization below. */
+    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
+                                        sizeof(FifoMessage));
+    if (ret < 0)
+        return ret;
+
+    av_thread_message_queue_set_free_func(fifo->queue, free_message);
+
+    /* pthread_mutex_init() returns 0 on success and a positive error
+     * number on failure. */
+    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
+    if (ret)
+        return AVERROR(ret);
+
+    return 0;
+}
+
+/**
+ * Queue the request to write the header and start the writer thread that
+ * will process it.
+ *
+ * @return 0 on success, a negative AVERROR code if the request could not
+ *         be queued or the thread could not be created
+ */
+static int fifo_write_header(AVFormatContext *avf)
+{
+    FifoContext * fifo = avf->priv_data;
+    FifoMessage message = {.type = FIFO_WRITE_HEADER};
+    int ret;
+
+    ret = av_thread_message_queue_send(fifo->queue, &message, 0);
+    if (ret < 0)
+        return ret;
+
+    /* pthread_create() returns a positive error number on failure; convert
+     * it and report it to the caller instead of returning success. */
+    ret = pthread_create(&fifo->writer_thread, NULL, fifo_consumer_thread, avf);
+    if (ret) {
+        av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
+               av_err2str(AVERROR(ret)));
+        ret = AVERROR(ret);
+    }
+
+    return ret;
+}
+
+/* Queue a packet for the writer thread, or a flush request when pkt is
+ * NULL. The queued message carries its own reference to the packet data.
+ * When the queue is full in non-blocking mode, the packet is dropped, the
+ * overflow flag is raised and 0 is returned. Returns a negative AVERROR
+ * code on any other failure. */
+static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
+{
+    FifoContext *fifo = avf->priv_data;
+    FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT};
+    unsigned send_flags = fifo->block_on_overflow ? 0 : AV_THREAD_MESSAGE_NONBLOCK;
+    int ret;
+
+    if (pkt) {
+        av_init_packet(&msg.pkt);
+        ret = av_packet_ref(&msg.pkt,pkt);
+        if (ret < 0)
+            return ret;
+    }
+
+    ret = av_thread_message_queue_send(fifo->queue, &msg, send_flags);
+    if (ret >= 0)
+        return ret;
+
+    if (ret == AVERROR(EAGAIN)) {
+        int newly_set = 0;
+
+        /* Queue is full: raise fifo->overflow_flag so that the consumer
+         * thread flushes the queue. */
+        pthread_mutex_lock(&fifo->overflow_flag_lock);
+        if (!fifo->overflow_flag) {
+            fifo->overflow_flag = 1;
+            newly_set = 1;
+        }
+        pthread_mutex_unlock(&fifo->overflow_flag_lock);
+
+        if (newly_set)
+            av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
+        ret = 0;
+    }
+
+    /* The message was not queued, so its packet reference is released here. */
+    if (pkt)
+        av_packet_unref(&msg.pkt);
+    return ret;
+}
+
+/**
+ * Finish muxing: make the queue report AVERROR_EOF to the writer thread,
+ * wait for the thread to exit and return the result of the trailer it wrote.
+ *
+ * @return fifo->write_trailer_ret as set by the writer thread, or a
+ *         negative AVERROR code if joining the thread failed
+ */
+static int fifo_write_trailer(AVFormatContext *avf)
+{
+    FifoContext *fifo= avf->priv_data;
+    int ret;
+
+    av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
+
+    /* pthread_join() returns 0 on success and a positive error number on
+     * failure, so the check has to be for a non-zero value. */
+    ret = pthread_join( fifo->writer_thread, NULL);
+    if (ret) {
+        av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
+               av_err2str(AVERROR(ret)));
+        return AVERROR(ret);
+    }
+
+    ret = fifo->write_trailer_ret;
+    return ret;
+}
+
+/**
+ * Release everything owned by the fifo muxer context: the parsed format
+ * options, the context of the underlying muxer, the message queue
+ * (including messages still queued) and the overflow flag lock.
+ */
+static void fifo_deinit(AVFormatContext *avf)
+{
+    FifoContext *fifo = avf->priv_data;
+
+    /* Both calls accept a NULL object, so no guards are needed. The former
+     * "if (avf)" guard tested a pointer that had already been dereferenced
+     * above. */
+    av_dict_free(&fifo->format_options);
+    avformat_free_context(fifo->avf);
+
+    if (fifo->queue) {
+        av_thread_message_flush(fifo->queue);
+        av_thread_message_queue_free(&fifo->queue);
+    }
+
+    pthread_mutex_destroy(&fifo->overflow_flag_lock);
+}
+
+#define OFFSET(x) offsetof(FifoContext, x)
+/* Private options of the fifo muxer. Every entry is flagged as an encoding
+ * parameter. */
+static const AVOption options[] = {
+    { "fifo_format", "Target muxer",
+      OFFSET(format), AV_OPT_TYPE_STRING,
+      { .str = NULL }, 0, 0, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "queue_size", "Size of fifo queue",
+      OFFSET(queue_size), AV_OPT_TYPE_INT,
+      { .i64 = FIFO_DEFAULT_QUEUE_SIZE }, 1, 1024, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "format_opts", "Options to be passed to underlying muxer",
+      OFFSET(format_options_str), AV_OPT_TYPE_STRING,
+      { .str = NULL }, 0, 0, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "block_on_overflow", "Block output on FIFO queue overflow until queue frees up",
+      OFFSET(block_on_overflow), AV_OPT_TYPE_BOOL,
+      { .i64 = 1 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "restart_with_keyframe", "Wait for keyframe when restarting output",
+      OFFSET(restart_with_keyframe), AV_OPT_TYPE_BOOL,
+      { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "attempt_recovery", "Attempt recovery in case of failure",
+      OFFSET(attempt_recovery), AV_OPT_TYPE_BOOL,
+      { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "max_recovery_attempts", "Maximal number of recovery attempts",
+      OFFSET(max_recovery_attempts), AV_OPT_TYPE_INT,
+      { .i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS }, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "recovery_wait_time", "Waiting time between recovery attempts",
+      OFFSET(recovery_wait_time), AV_OPT_TYPE_DURATION,
+      { .i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC }, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
+      OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL,
+      { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { "recover_any_error", "Attempt recovery regardless of type of the error",
+      OFFSET(recover_any_error), AV_OPT_TYPE_BOOL,
+      { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM },
+
+    { NULL },
+};
+
+static const AVClass fifo_muxer_class = { /* AVClass referenced by ff_fifo_muxer.priv_class */
+    .class_name = "Fifo muxer",
+    .item_name  = av_default_item_name,
+    .option     = options, /* the fifo muxer option table */
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+AVOutputFormat ff_fifo_muxer = { /* muxer definition registered in allformats.c */
+    .name           = "fifo",
+    .long_name      = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
+    .priv_data_size = sizeof(FifoContext),
+    .init           = fifo_init, /* checks options, sets up the underlying muxer context and the queue */
+    .write_header   = fifo_write_header, /* queues the header request and starts the writer thread */
+    .write_packet   = fifo_write_packet, /* queues a packet, or a flush request for a NULL packet */
+    .write_trailer  = fifo_write_trailer, /* joins the writer thread and returns its trailer result */
+    .deinit         = fifo_deinit, /* frees the options, the underlying context, the queue and the lock */
+    .priv_class     = &fifo_muxer_class,
+    .flags          = AVFMT_NOFILE, /* output I/O is opened for the underlying muxer in fifo_thread_write_header() */
+};
+
-- 
1.9.1



More information about the ffmpeg-devel mailing list