[FFmpeg-devel] [PATCH] avformat/fifo: add timeshift option to delay output
Marton Balint
cus at passwd.hu
Tue Jun 9 02:25:16 EEST 2020
On Mon, 8 Jun 2020, Marton Balint wrote:
>
>
> On Tue, 26 May 2020, Marton Balint wrote:
>
>> Signed-off-by: Marton Balint <cus at passwd.hu>
>> ---
>> doc/muxers.texi | 5 ++++
>> libavformat/fifo.c | 59 ++++++++++++++++++++++++++++++++++++++++++-
>> libavformat/version.h | 2 +-
>> 3 files changed, 64 insertions(+), 2 deletions(-)
>
> Ping, will apply soon.
Applied.
Regards,
Marton
>
> Thanks,
> Marton
>
>>
>> diff --git a/doc/muxers.texi b/doc/muxers.texi
>> index c598abbe66..d6f9de3702 100644
>> --- a/doc/muxers.texi
>> +++ b/doc/muxers.texi
>> @@ -2275,6 +2275,11 @@ certain (usually permanent) errors the recovery is not attempted even when
>> Specify whether to wait for the keyframe after recovering from
>> queue overflow or failure. This option is set to 0 (false) by default.
>>
>> +@item timeshift @var{duration}
>> +Buffer the specified amount of packets and delay writing the output. Note that
>> +@var{queue_size} must be big enough to store the packets for timeshift. At the
>> +end of the input the fifo buffer is flushed at realtime speed.
>> +
>> @end table
>>
>> @subsection Examples
>> diff --git a/libavformat/fifo.c b/libavformat/fifo.c
>> index d11dc6626c..17748e94ce 100644
>> --- a/libavformat/fifo.c
>> +++ b/libavformat/fifo.c
>> @@ -19,6 +19,8 @@
>> * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
>> */
>>
>> +#include <stdatomic.h>
>> +
>> #include "libavutil/avassert.h"
>> #include "libavutil/opt.h"
>> #include "libavutil/time.h"
>> @@ -77,6 +79,9 @@ typedef struct FifoContext {
>> /* Value > 0 signals queue overflow */
>> volatile uint8_t overflow_flag;
>>
>> + atomic_int_least64_t queue_duration;
>> + int64_t last_sent_dts;
>> + int64_t timeshift;
>> } FifoContext;
>>
>> typedef struct FifoThreadContext {
>> @@ -98,9 +103,12 @@ typedef struct FifoThreadContext {
>> * so finalization by calling write_trailer and ff_io_close must be done
>> * before exiting / reinitialization of underlying muxer */
>> uint8_t header_written;
>> +
>> + int64_t last_received_dts;
>> } FifoThreadContext;
>>
>> typedef enum FifoMessageType {
>> + FIFO_NOOP,
>> FIFO_WRITE_HEADER,
>> FIFO_WRITE_PACKET,
>> FIFO_FLUSH_OUTPUT
>> @@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext
> *ctx)
>> return av_write_frame(avf2, NULL);
>> }
>>
>> +static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t
> *last_dts)
>> +{
>> + AVStream *st = avf->streams[pkt->stream_index];
>> + int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
>> + int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts -
> *last_dts);
>> + *last_dts = dts;
>> + return duration;
>> +}
>> +
>> static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
>> {
>> AVFormatContext *avf = ctx->avf;
>> @@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext
> *ctx, AVPacket *pkt)
>> AVRational src_tb, dst_tb;
>> int ret, s_idx;
>>
>> + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
>> + atomic_fetch_sub_explicit(&fifo->queue_duration,
> next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
>> +
>> if (ctx->drop_until_keyframe) {
>> if (pkt->flags & AV_PKT_FLAG_KEY) {
>> ctx->drop_until_keyframe = 0;
>> @@ -209,6 +229,9 @@ static int
> fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg
>> {
>> int ret = AVERROR(EINVAL);
>>
>> + if (msg->type == FIFO_NOOP)
>> + return 0;
>> +
>> if (!ctx->header_written) {
>> ret = fifo_thread_write_header(ctx);
>> if (ret < 0)
>> @@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data)
>> AVFormatContext *avf = data;
>> FifoContext *fifo = avf->priv_data;
>> AVThreadMessageQueue *queue = fifo->queue;
>> - FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
>> + FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER,
> {0}};
>> int ret;
>>
>> FifoThreadContext fifo_thread_ctx;
>> memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
>> fifo_thread_ctx.avf = avf;
>> + fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
>>
>> while (1) {
>> uint8_t just_flushed = 0;
>> @@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data)
>> if (just_flushed)
>> av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
>>
>> + if (fifo->timeshift)
>> + while (atomic_load_explicit(&fifo->queue_duration,
> memory_order_relaxed) < fifo->timeshift)
>> + av_usleep(10000);
>> +
>> ret = av_thread_message_queue_recv(queue, &msg, 0);
>> if (ret < 0) {
>> av_thread_message_queue_set_err_send(queue, ret);
>> @@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf)
>> " only when drop_pkts_on_overflow is also turned on\n");
>> return AVERROR(EINVAL);
>> }
>> + atomic_init(&fifo->queue_duration, 0);
>> + fifo->last_sent_dts = AV_NOPTS_VALUE;
>>
>> oformat = av_guess_format(fifo->format, avf->url, NULL);
>> if (!oformat) {
>> @@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf,
> AVPacket *pkt)
>> goto fail;
>> }
>>
>> + if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
>> + atomic_fetch_add_explicit(&fifo->queue_duration,
> next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
>> +
>> return ret;
>> fail:
>> if (pkt)
>> @@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf)
>> int ret;
>>
>> av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
>> + if (fifo->timeshift) {
>> + int64_t now = av_gettime_relative();
>> + int64_t elapsed = 0;
>> + FifoMessage msg = {FIFO_NOOP};
>> + do {
>> + int64_t delay = av_gettime_relative() - now;
>> + if (delay < 0) { // Discontinuity?
>> + delay = 10000;
>> + now = av_gettime_relative();
>> + } else {
>> + now += delay;
>> + }
>> + atomic_fetch_add_explicit(&fifo->queue_duration, delay,
> memory_order_relaxed);
>> + elapsed += delay;
>> + if (elapsed > fifo->timeshift)
>> + break;
>> + av_usleep(10000);
>> + ret = av_thread_message_queue_send(fifo->queue, &msg,
> AV_THREAD_MESSAGE_NONBLOCK);
>> + } while (ret >= 0 || ret == AVERROR(EAGAIN));
>> + atomic_store(&fifo->queue_duration, INT64_MAX);
>> + }
>>
>> ret = pthread_join(fifo->writer_thread, NULL);
>> if (ret < 0) {
>> @@ -630,6 +684,9 @@ static const AVOption options[] = {
>> {"recover_any_error", "Attempt recovery regardless of type of the
> error", OFFSET(recover_any_error),
>> AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
>>
>> + {"timeshift", "Delay fifo output", OFFSET(timeshift),
>> + AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX,
> AV_OPT_FLAG_ENCODING_PARAM},
>> +
>> {NULL},
>> };
>>
>> diff --git a/libavformat/version.h b/libavformat/version.h
>> index 493a0b337f..5c13cf1620 100644
>> --- a/libavformat/version.h
>> +++ b/libavformat/version.h
>> @@ -33,7 +33,7 @@
>> // Also please add any ticket numbers that you believe might be affected here
>> #define LIBAVFORMAT_VERSION_MAJOR 58
>> #define LIBAVFORMAT_VERSION_MINOR 43
>> -#define LIBAVFORMAT_VERSION_MICRO 100
>> +#define LIBAVFORMAT_VERSION_MICRO 101
>>
>> #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
>> LIBAVFORMAT_VERSION_MINOR, \
>> --
>> 2.26.1
>>
>> _______________________________________________
>> ffmpeg-devel mailing list
>> ffmpeg-devel at ffmpeg.org
>> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>>
>> To unsubscribe, visit link above, or email
>> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
More information about the ffmpeg-devel
mailing list