[FFmpeg-devel] [PATCH v5 01/11] avformat: Add fifo pseudo-muxer

Marton Balint cus at passwd.hu
Tue Aug 9 00:55:32 EEST 2016


On Thu, 4 Aug 2016, sebechlebskyjan at gmail.com wrote:

> From: Jan Sebechlebsky <sebechlebskyjan at gmail.com>
>
> Signed-off-by: Jan Sebechlebsky <sebechlebskyjan at gmail.com>

Applying the patches with "git am" reports some whitespace issues
(empty lines at the end of files). Could you have a look at those?

[...]

> --- a/doc/muxers.texi
> +++ b/doc/muxers.texi
> @@ -1436,6 +1436,96 @@ Specify whether to remove all fragments when finished. Default 0 (do not remove)
> 
> @end table
> 
> + at section fifo
> +
> +The fifo pseudo-muxer allows to separate encoding from any other muxer by using

... allows the separation of encoding and muxing by using a ...

> +first-in-first-out queue and running the actual muxer in a separate thread. This
> +is especially useful in combination with the @ref{tee} muxer and output to

I get errors for this line when building the docs:
doc/muxers.texi:1443: @ref reference to nonexistent node `tee'

> +several destinations with different reliability/writing speed/latency.
> +
> +The behavior of fifo muxer in case of failure can be configured:

The behaviour of the fifo muxer if the queue fills up or if the output 
fails is selectable,

> + at itemize @bullet
> +
> + at item
> +output can be transparently restarted with configurable delay between retries
> +based on real time or time of the processed stream.
> +
> + at item
> +encoding can be blocked during temporary failure, or continue transparently
> +dropping packets in case fifo queue fills up.
> +
> + at end itemize
> +
> + at table @option
> +
> + at item fifo_format
> +Specify the format name. Useful if it cannot be guessed from the
> +output name suffix.
> +
> + at item queue_size
> +Specify size of the queue (number of packets). Default value is 60.
> +
> + at item format_opts
> +Specify format options for the underlying muxer. Muxer options can be specified
> +as a list of @var{key}=@var{value} pairs separated by ':'.
> +
> + at item drop_pkts_on_overflow @var{bool}
> +If set to 1 (true), in case the fifo queue fills up, packets will be dropped
> +rather than blocking the encoder. This allows to continue streaming without
> +delaying the output, at the cost of ommiting part of the stream. By default

delaying the input you mean?

> +this option is set to 0 (false), so in such cases the encoder will be blocked
> +until the muxer processes some of the packets and none of them is lost.
> +
> + at item attempt_recovery @var{bool}
> +If failure occurs, attempt to recover the output. This is especially useful
> +when used with network output, allows to restart streaming transparently.
> +By default this option set to 0 (false).
> +
> + at item max_recovery_attempts
> +Sets maximum number of successive unsucessful recovery attempts after which
> +the output fails permanently. Unlimited if set to zero. Default value is 16.
> +
> + at item recovery_wait_time @var{duration}
> +Waiting time before the next recovery attempt after previous unsuccessfull
> +recovery attempt. Default value is 5 seconds.
> +
> +s at item recovery_wait_streamtime @var{bool}
> +If set to 0 (false), the real time is used when waiting for the recovery
> +attempt (i.e. the recovery will be attempted after at least
> +recovery_wait_time seconds).
> +If set to 1 (true), the time of the processed stream is taken into account
> +instead (i.e. the recovery will be attempted after at least recovery_wait_time
> +seconds of the stream is omitted).
> +By default, this option is set to 0 (false).
> +
> + at item recover_any_error @var{bool}
> +If set to 1 (true), recovery will be attempted regardless of type of the error
> +causing the failure. By default this option is set to 0 (false) and in case of
> +certain errors the recovery is not attempted even when @ref{attempt_recovery}

... in case of certain (usually permanent) errors ...

I also get an error when building docs:
doc/muxers.texi:1504: @ref reference to nonexistent node `attempt_recovery'

> +is set to 1.
> +
> + at item restart_with_keyframe @var{bool}
> +Specify whether to wait for the keyframe after recovering from
> +queue overflow or failure. This option is set to 0 (false) by default.
> +
> + at end table
> +
> + at subsection Examples
> +
> + at itemize
> +
> + at item
> +Stream something to rtmp server, continue processing the stream at real-time
> +rate even in case of temporary failure (network outage) and attempt to recover
> +streaming every second indefinitely.
> + at example
> +ffmpeg -re -i ... -c:v libx264 -c:a mp2 -f fifo -fifo_format flv -map 0:v -map 0:a
> +  -block_on_overflow 0 -attempt_recovery 1 -recovery_wait_time 1
> +  -max_recovery_attempts 0 rtmp://example.com/live/stream_name
> + at end example
> +
> + at end itemize
> +
> @section tee
>

[...]

> +    /* If >0 recovery will be attempted regardless of error code
> +     * (except AVERROR_EXIT, so exit request is never ignored) */
> +    int recover_any_error;
> +
> +    /* Whether to drop packets in case the queue is full. */
> +    int drop_pkts_on_overflow;
> +
> +    /* Whether to wait for keyframe when recovering
> +     * from failure or queue overflow */
> +    int restart_with_keyframe;
> +
> +    pthread_mutex_t overflow_flag_lock;
> +    /* Value > 0 signalizes queue overflow */

signals

[...]

> +static void *fifo_consumer_thread(void *data)
> +{
> +    AVFormatContext *avf = data;
> +    FifoContext *fifo = avf->priv_data;
> +    AVThreadMessageQueue *queue = fifo->queue;
> +    FifoMessage msg;
> +    int ret;
> +
> +    FifoThreadContext fifo_thread_ctx;
> +    memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
> +    fifo_thread_ctx.avf = avf;
> +
> +    ret = fifo_thread_write_header(&fifo_thread_ctx);
> +    if (ret < 0) {
> +        int rec_ret = fifo_thread_recover(&fifo_thread_ctx, NULL, ret);
> +        if (rec_ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, rec_ret);
> +            return NULL;
> +        }
> +    }

I think you can move this code inside the start of the loop, and eliminate
the similar code at the end of the loop. Also, using NULL as msg seems
suspicious, since you dereference msg in fifo_thread_attempt_recovery
and in fifo_thread_recover as well.

Maybe it is better to enter the loop with a dummy message type 
WRITE_HEADER, and modify fifo_thread_attempt_recovery accordingly.
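
To illustrate the idea, here is a rough standalone sketch of the control
flow, not a drop-in replacement for the patch. Everything prefixed with
Sketch/sketch_/SKETCH_ is made up for illustration, the queue is
simulated with a plain array, and recovery is reduced to retrying the
same message. The point is only that the loop is entered with a dummy
WRITE_HEADER message, so there is a single dispatch/recover site and
the recovery code never sees a NULL msg.

```c
/* Hypothetical message types; SKETCH_WRITE_HEADER is the dummy type
 * used to enter the loop. */
enum SketchMsgType { SKETCH_WRITE_HEADER, SKETCH_WRITE_PACKET };

typedef struct SketchMsg {
    enum SketchMsgType type;
} SketchMsg;

typedef struct SketchCtx {
    int header_failures_left;   /* simulated number of failing header writes */
    int max_recovery_attempts;  /* retries allowed per failure */
    int recovery_nr;            /* retries used for the current failure */
    int header_written;
    int packets_written;
} SketchCtx;

/* Handle one message: 0 on success, negative on (simulated) failure. */
static int sketch_dispatch(SketchCtx *ctx, const SketchMsg *msg)
{
    if (msg->type == SKETCH_WRITE_HEADER) {
        if (ctx->header_failures_left > 0) {
            ctx->header_failures_left--;
            return -1;
        }
        ctx->header_written = 1;
        return 0;
    }
    if (!ctx->header_written)
        return -1;
    ctx->packets_written++;
    return 0;
}

/* Retry the failed message; msg is always a valid pointer here. */
static int sketch_recover(SketchCtx *ctx, const SketchMsg *msg, int err)
{
    while (ctx->recovery_nr < ctx->max_recovery_attempts) {
        ctx->recovery_nr++;
        if (sketch_dispatch(ctx, msg) >= 0) {
            ctx->recovery_nr = 0;
            return 0;
        }
    }
    return err;
}

/* Consumer loop: starts with the dummy header message, then takes the
 * remaining messages from the simulated queue. */
static int sketch_consumer(SketchCtx *ctx, const SketchMsg *queue, int nb_msgs)
{
    SketchMsg msg = { SKETCH_WRITE_HEADER };
    int next = 0;

    for (;;) {
        int ret = sketch_dispatch(ctx, &msg);
        if (ret < 0) {
            ret = sketch_recover(ctx, &msg, ret);
            if (ret < 0)
                return ret;
        }
        if (next == nb_msgs)
            return 0;
        msg = queue[next++];
    }
}
```

In the real muxer the receive step would of course be
av_thread_message_queue_recv(), and the overflow handling would stay
where it is now.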

> +
> +    while (1) {
> +        uint8_t just_flushed = 0;
> +
> +        /* If the queue is full at the moment when fifo_write_packet
> +         * attempts to insert new message (packet) to the queue,
> +         * it sets the fifo->overflow_flag to 1 and drops packet.
> +         * Here in consumer thread, the flag is checked and if it is
> +         * set, the queue is flushed and flag cleared. */
> +        pthread_mutex_lock(&fifo->overflow_flag_lock);
> +        if (fifo->overflow_flag) {
> +            av_thread_message_flush(queue);
> +            if (fifo->restart_with_keyframe)
> +                fifo_thread_ctx.drop_until_keyframe = 1;
> +            fifo->overflow_flag = 0;
> +            just_flushed = 1;
> +        }
> +        pthread_mutex_unlock(&fifo->overflow_flag_lock);
> +
> +        if (just_flushed)
> +            av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
> +
> +        ret = av_thread_message_queue_recv(queue, &msg, 0);
> +        if (ret < 0) {
> +            av_thread_message_queue_set_err_send(queue, ret);
> +            break;
> +        }
> +
> +        if (!fifo_thread_ctx.recovery_nr)
> +            ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
> +
> +        if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
> +            int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
> +            if (rec_ret < 0) {
> +                av_thread_message_queue_set_err_send(queue, rec_ret);
> +                break;
> +            }
> +        }
> +    }
> +
> +    fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
> +
> +    return NULL;
> +}
> +

[...]

> +static int fifo_init(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +    int ret = 0;
> +
> +    if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
> +        av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
> +               " only when block_on_overflow is turned off\n");

" when drop_pkts_on_overflow is also turned on"

> +        return AVERROR(EINVAL);
> +    }
> +
> +    if (fifo->format_options_str) {
> +        ret = av_dict_parse_string(&fifo->format_options, fifo->format_options_str,
> +                                   "=", ":", 0);
> +        if (ret < 0) {
> +            av_log(avf, AV_LOG_ERROR, "Could not parse format options list '%s'\n",
> +                   fifo->format_options_str);
> +            return ret;
> +        }
> +    }
> +
> +    fifo->oformat = av_guess_format(fifo->format, avf->filename, NULL);
> +    if (!fifo->oformat) {
> +        ret = AVERROR_MUXER_NOT_FOUND;
> +        return ret;
> +    }
> +
> +    ret = fifo_mux_init(avf);
> +    if (ret < 0)
> +        return ret;
> +
> +    ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
> +                                        sizeof(FifoMessage));
> +    if (!ret)

Use (ret < 0) here instead: return the error on failure, then set the
free function unconditionally.
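
Roughly what I mean, as a standalone sketch rather than the actual
code. SketchFifo, sketch_queue_alloc and sketch_init are hypothetical
stand-ins (sketch_queue_alloc plays the role of
av_thread_message_queue_alloc, and negating the value plays the role of
AVERROR()). The sketch also assumes the mutex init result should be
tested for non-zero, since pthread_mutex_init() returns 0 or a positive
error number.

```c
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct SketchFifo {
    void *queue;
    int free_func_set;
    pthread_mutex_t lock;
    int lock_initialized;
} SketchFifo;

/* Hypothetical allocator: 0 on success, negative error code on failure. */
static int sketch_queue_alloc(void **queue, int simulate_failure)
{
    if (simulate_failure)
        return -ENOMEM;
    *queue = malloc(1);
    return *queue ? 0 : -ENOMEM;
}

static int sketch_init(SketchFifo *fifo, int simulate_failure)
{
    int ret = sketch_queue_alloc(&fifo->queue, simulate_failure);
    if (ret < 0)
        return ret;              /* propagate the allocation error */

    fifo->free_func_set = 1;     /* stands in for setting the free function */

    /* pthread_mutex_init() reports errors as positive numbers. */
    ret = pthread_mutex_init(&fifo->lock, NULL);
    if (ret) {
        free(fifo->queue);
        fifo->queue = NULL;
        return -ret;
    }
    fifo->lock_initialized = 1;

    return 0;
}
```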

> +        av_thread_message_queue_set_free_func(fifo->queue, free_message);
> +
> +    ret = pthread_mutex_init(&fifo->overflow_flag_lock, NULL);
> +    if (ret < 0)
> +        return AVERROR(ret);
> +
> +    return 0;
> +}
> +

[...]

> +static void fifo_deinit(AVFormatContext *avf)
> +{
> +    FifoContext *fifo = avf->priv_data;
> +
> +    if (fifo->format_options)
> +        av_dict_free(&fifo->format_options);
> +
> +    if (avf)

Is this supposed to be if (fifo->avf)? avf itself is already
dereferenced at the top of the function, so checking it here has no
effect.

> +        avformat_free_context(fifo->avf);
> +
> +    if (fifo->queue) {
> +        av_thread_message_flush(fifo->queue);
> +        av_thread_message_queue_free(&fifo->queue);
> +    }
> +
> +    pthread_mutex_destroy(&fifo->overflow_flag_lock);
> +}
> +

[...]

Thanks for your work, and sorry for the delay in the review.

Regards,
Marton

