[FFmpeg-devel] [PATCH] [RFC]doc/examples: alternative input handler
Bodecs Bela
bodecsb at vivanet.hu
Fri Mar 30 15:47:25 EEST 2018
Hi All,
A request regularly pops up on various forums and mailing lists for
automatic failover switching between a main input and a secondary
input when the main input becomes unavailable.
The basic motivation: say you have an unreliable live stream source and
you want to transcode its video and audio streams in real time, but you
also want to survive the occasions when the source is unavailable. The
idea is to fall back to a secondary live source, with the transition
happening seamlessly, without breaking or restarting the transcoding
process.
A few days ago there was a discussion on devel-irc about this topic, and
we concluded that this feature is not feasible inside ffmpeg without
"hacking", but that a separate client app could do it.
So I created this example app, which handles two separate input sources
and switches between them in real time. I am not sure whether it should
instead go into the tools subdir.
The detailed description is available in the header section of the
source file.
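For readers who want the switching rule in isolation, the following is a
minimal sketch and not the code from the patch. It assumes the handler
records the wallclock time of the last frame decoded from the main input;
select_input, last_main_frame_us and the two index constants are
hypothetical names used only for this illustration. The timeout argument
corresponds to the -timeout option described in the source file header.

```c
#include <stdint.h>

#define MAIN_INPUT   0  /* hypothetical index of the main input */
#define SECOND_INPUT 1  /* hypothetical index of the secondary input */

/*
 * Decide which input should feed the output (illustrative helper only).
 *
 * now_us, last_main_frame_us: wallclock times in microseconds;
 *     a negative last_main_frame_us means the main input has not
 *     delivered any frame yet.
 * timeout_ms: how long the main input may stay silent before failover.
 */
static int select_input(int64_t now_us, int64_t last_main_frame_us,
                        int timeout_ms)
{
    int64_t timeout_us = (int64_t)timeout_ms * 1000;

    if (last_main_frame_us < 0)
        return SECOND_INPUT;            /* main input never produced a frame */
    if (now_us - last_main_frame_us > timeout_us)
        return SECOND_INPUT;            /* main input silent for too long */
    return MAIN_INPUT;                  /* main input is alive */
}
```

With a 100 ms timeout, a main input whose last frame is 50 ms old stays
selected, while one that has been silent for 200 ms hands over to the
secondary input. As soon as a new main frame updates the timestamp, the
same rule selects the main input again.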
I would appreciate your suggestions about it.
Thank you in advance.
best,
Bela Bodecs
-------------- next part --------------
From aa7ed4cdaa3411f7481d6ffa00a5d366a0386525 Mon Sep 17 00:00:00 2001
From: Bela Bodecs <bodecsb at vivanet.hu>
Date: Fri, 30 Mar 2018 14:30:25 +0200
Subject: [PATCH] [RFC]doc/examples: alternative input handler
API utility for automatic failover switching between a main input and a
secondary input in case of input unavailability.
Motivation: let's say you have an unreliable live stream source and you
want to transcode its first video and audio stream in real time, but you
want to survive the occasions when the source is unavailable. So use a
secondary live source, but the transition should occur seamlessly without
breaking/restarting the transcoding process.
See the source file header section for a detailed description.
Signed-off-by: Bela Bodecs <bodecsb at vivanet.hu>
---
configure | 2 +
doc/examples/Makefile | 1 +
doc/examples/Makefile.example | 1 +
doc/examples/alternative_input.c | 1233 ++++++++++++++++++++++++++++++++++++++
4 files changed, 1237 insertions(+)
create mode 100644 doc/examples/alternative_input.c
diff --git a/configure b/configure
index 0c5ed07..5585c60 100755
--- a/configure
+++ b/configure
@@ -1529,6 +1529,7 @@ EXAMPLE_LIST="
transcoding_example
vaapi_encode_example
vaapi_transcode_example
+ alternative_input_example
"
EXTERNAL_AUTODETECT_LIBRARY_LIST="
@@ -3337,6 +3338,7 @@ transcode_aac_example_deps="avcodec avformat swresample"
transcoding_example_deps="avfilter avcodec avformat avutil"
vaapi_encode_example_deps="avcodec avutil h264_vaapi_encoder"
vaapi_transcode_example_deps="avcodec avformat avutil h264_vaapi_encoder"
+alternative_input_example_deps="avfilter avcodec avformat avutil"
# EXTRALIBS_LIST
cpu_init_extralibs="pthreads_extralibs"
diff --git a/doc/examples/Makefile b/doc/examples/Makefile
index 928ff30..3c50eca 100644
--- a/doc/examples/Makefile
+++ b/doc/examples/Makefile
@@ -21,6 +21,7 @@ EXAMPLES-$(CONFIG_TRANSCODE_AAC_EXAMPLE) += transcode_aac
EXAMPLES-$(CONFIG_TRANSCODING_EXAMPLE) += transcoding
EXAMPLES-$(CONFIG_VAAPI_ENCODE_EXAMPLE) += vaapi_encode
EXAMPLES-$(CONFIG_VAAPI_TRANSCODE_EXAMPLE) += vaapi_transcode
+EXAMPLES-$(CONFIG_ALTERNATIVE_INPUT_EXAMPLE) += alternative_input
EXAMPLES := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)$(EXESUF))
EXAMPLES_G := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)_g$(EXESUF))
diff --git a/doc/examples/Makefile.example b/doc/examples/Makefile.example
index 6428154..937d266 100644
--- a/doc/examples/Makefile.example
+++ b/doc/examples/Makefile.example
@@ -30,6 +30,7 @@ EXAMPLES= avio_dir_cmd \
scaling_video \
transcode_aac \
transcoding \
+ alternative_input \
OBJS=$(addsuffix .o,$(EXAMPLES))
diff --git a/doc/examples/alternative_input.c b/doc/examples/alternative_input.c
new file mode 100644
index 0000000..3d1e534
--- /dev/null
+++ b/doc/examples/alternative_input.c
@@ -0,0 +1,1233 @@
+/*
+ * Copyright (c) 2018 Bodecs Bela
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+/**
+ * @file
+ * API utility for automatic failover switching between main input and secondary
+ * input in case of input unavailability
+ * @example alternative_input.c
+ */
+
+/**
+ * Motivation: let's say you have an unreliable live stream source and you want to
+ * transcode its first video and audio stream in real time,
+ * but you want to survive the occasions when
+ * the source is unavailable. So use a secondary live source, but
+ * the transition should occur seamlessly without breaking/restarting
+ * the transcoding process.
+ *
+ * You may have a main source as an flv format rtmp://<server>/<stream> or
+ * an mpegts format udp://<multicast_address:port>/ or a
+ * hls format http://<server>/stream.m3u8 or anything similar.
+ *
+ * Your original ffmpeg command line may look like this:
+ * ffmpeg -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
+ * -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
+ * -c:a aac -ac 2 -ar 44100
+ * -f hls out.m3u8
+ *
+ * Should the source be unavailable, you may want to use a secondary source to show a
+ * color-bar screen with silent audio. To achieve this, we virtually cut your
+ * original ffmpeg command into two halves and insert the alternative_input handler
+ * between them.
+ *
+ * Here is the modified output handler command line: (command#1)
+ * ffmpeg -y -f nut -listen 1 -i unix:output.unix
+ * -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
+ * -c:a aac -ac 2 -ar 44100
+ * -f hls out.m3u8
+ *
+ * here is the modified main input producer command line: (command#2)
+ * ffmpeg -y -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
+ * -c:v rawvideo -s 640x360 -r 25 -pix_fmt yuv420p
+ * -c:a pcm_s32le -ac 2 -ar 44100
+ * -f nut -listen 1 unix:input_main.unix
+ *
+ * here is the secondary input producer command line: (command#3)
+ * ffmpeg -y -re -f lavfi
+ * -i "aevalsrc=exprs=0:nb_samples=1024:sample_rate=44100:channel_layout=stereo, \
+ * aformat=sample_fmts=s32"
+ * -re -f lavfi -i "smptehdbars=size=640x360:rate=25, format=pix_fmts=yuv420p"
+ * -c:v rawvideo -c:a pcm_s32le
+ * -map 1 -map 0
+ * -f nut -listen 1 unix:input_second.unix
+ *
+ * and finally the alternative input handler command line: (command#4)
+ * alternative_input -im unix:input_main.unix -ifm nut
+ * -is unix:input_second.unix -ifs nut
+ * -o unix:output.unix -of nut
+ * -timeout 150
+ *
+ * How to test:
+ * start the modified output handler (command#1), then in a separate window
+ * start the alternative input handler (command#4), then in a separate window
+ * start the main input producer (command#2) and then in a separate window
+ * start the secondary input producer (command#3). The output handler now
+ * outputs the main input. Now stop the main input producer,
+ * e.g. by pressing q in its window. The output handler now outputs the
+ * secondary source (SMPTE color bars on screen and silence as audio).
+ * Now start the main input producer again. After a successful start
+ * the output handler outputs the main input again.
+ *
+ * Some suggestions:
+ * - use a long analyze duration (-analyzeduration 10000000) option
+ * on the main input to reliably collect all input info
+ * - all corresponding elementary streams on the inputs of the alternative
+ * input handler must have matching properties regarding
+ * stream type, pix format, pix size, audio sample rate
+ * - the expected input format of the alternative input handler is always
+ * intra-only video, and the audio format is pcm_s32le
+ * - the number of elementary streams in the inputs is not limited
+ * - at startup, first start the output handler, then the alternative input handler,
+ * then the main input and then the secondary input, because the alternative input
+ * handler stops immediately if the output is not writable but keeps trying
+ * to open the inputs continuously
+ * - at startup no output is produced until both the
+ * main and the secondary input have been opened
+ * - the alternative input handler's output video codec is rawvideo and
+ * its output audio codec is pcm_s32le
+ * - the nut muxer/demuxer format was tested successfully for output/input,
+ * other formats may work (e.g. avi, with its limitations)
+ * - only the unix protocol was tested successfully for input/output
+ * - an unavailable input is retried for re-opening every 1000 ms; this
+ * applies to the secondary input as well
+ * - as soon as the main input is available again, the handler switches back to it
+ *
+ *
+ * Description of command line parameters of alternative input handler:
+ * -im url of primary/main input
+ * -ifm (optional) format name of primary input
+ * -is url of secondary input
+ * -ifs (optional) format name of secondary input
+ * -o url of output
+ * -of (optional) output format name
+ * -timeout (optional) if main input is not available for this time period,
+ * switching to the second input will occur (default value 100 ms),
+ * value expressed in milliseconds
+ * -loglevel (optional) info|debug|warning|error (default level is info)
+ * -dsc (optional) internally the inputs are consumed in a real-time fashion;
+ * if data arrives faster than real time according to the incoming timestamps,
+ * reading is slowed down accordingly. If consecutive timestamps differ by more
+ * than this threshold value, the input data is treated as discontinuous.
+ * Value expressed in microseconds, default value is 3000000
+ *
+ */
+
+#include <unistd.h>
+
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+#include <libavfilter/buffersink.h>
+#include <libavfilter/buffersrc.h>
+#include <libavutil/opt.h>
+#include <libavutil/channel_layout.h>
+#include <libavutil/frame.h>
+#include <libavutil/time.h>
+#include <libavutil/mathematics.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdarg.h>
+
+// how often to retry opening an input that is currently unavailable
+#define INPUT_TRYING_INTERVAL_USEC 1000000
+#define DEFAULT_INPUT_TIMEOUT_MSEC 100
+#define DEFAULT_LOG_LEVEL AV_LOG_INFO
+#define MAIN_INPUT_INDEX 0
+#define SECOND_INPUT_INDEX 1
+#define NB_INPUTS 2
+#define DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US 3000000
+#define DEFAULT_OUTPUT_AUDIO_CODEC_NAME "pcm_s32le"
+#define DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT AV_SAMPLE_FMT_S32
+#define DEFAULT_OUTPUT_VIDEO_CODEC_NAME "rawvideo"
+
+typedef struct InputStreamStatData {
+ int64_t first_pts; // pts of first encoded active input's frame since the last open in its own input stream timebase
+ int64_t nb_frames; // nb of forwarded/encoded frames of current active input
+} InputStreamStatData;
+
+
+typedef struct OutputStreamStatData {
+ int64_t last_pts; // last encoded output frame end pts (pts + dur) in output stream timebase
+ int64_t first_pts;
+ int64_t pts_delta; // to adjust by this value the encoded frames pts in output stream timebase
+ int64_t nb_frames; // total output frames
+} OutputStreamStatData;
+
+
+typedef struct AppContext {
+ char *input_filenames[NB_INPUTS]; // e.g. "unix:doc/examples/input_main.unix";
+ char *input_format_names[NB_INPUTS]; // e.g "nut"
+
+ AVCodecContext **dec_ctx[NB_INPUTS]; // one context per stream, any number of streams per input
+ AVFormatContext *input_fmt_ctx[NB_INPUTS];
+
+ char *output_filename;
+ char *output_format_name;
+ AVCodecContext **enc_ctx; // one context per stream, same stream count as the inputs
+ AVFormatContext *output_fmt_ctx;
+
+ InputStreamStatData *input_stream_data;
+ OutputStreamStatData *output_stream_data;
+
+ int input_failover_counter; // main->second switchings
+
+ pthread_mutex_t encoder_mutex;
+ int thread_id[NB_INPUTS];
+ int input_timeout_ms;
+ int input_stream_time_discnt_thrshd_us;
+ int64_t start; // start wallclock time of this program
+
+ volatile sig_atomic_t input_source_index;
+ volatile sig_atomic_t to_exit;
+ volatile sig_atomic_t input_has_new_frame[NB_INPUTS];
+
+ pthread_t input_threads[NB_INPUTS]; // each input has its own reading thread
+
+} AppContext;
+
+
+
+static AppContext app_ctx = { {NULL, NULL}, {NULL, NULL}, {NULL, NULL}, {NULL, NULL},
+ NULL, NULL, NULL, NULL, NULL, NULL,
+ 0, PTHREAD_MUTEX_INITIALIZER,
+ {MAIN_INPUT_INDEX, SECOND_INPUT_INDEX}, DEFAULT_INPUT_TIMEOUT_MSEC,
+ DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US, 0,
+ 0, 0, {0, 0} };
+
+
+static const char *output_audio_codec_name = DEFAULT_OUTPUT_AUDIO_CODEC_NAME;
+static const char *output_video_codec_name = DEFAULT_OUTPUT_VIDEO_CODEC_NAME;
+
+static void timed_log(int level, const char *fmt, ...)
+{
+ char timed_fmt[2048];
+ int64_t now_us = av_gettime();
+ va_list vl;
+ va_start(vl, fmt);
+ if (snprintf(timed_fmt, sizeof(timed_fmt), "[%"PRId64"--%"PRId64"] %s", now_us, now_us - app_ctx.start, fmt) > 0)
+ av_vlog(NULL, level, timed_fmt, vl);
+ va_end(vl);
+}
+
+static int open_single_input(int input_index)
+{
+ int ret, i;
+ AVInputFormat *input_format = NULL;
+ AVDictionary * input_options = NULL;
+ AVFormatContext * input_fmt_ctx = NULL;
+
+ if (app_ctx.input_format_names[input_index]) {
+ if (!(input_format = av_find_input_format(app_ctx.input_format_names[input_index]))) {
+ timed_log(AV_LOG_ERROR, "Input #%d Unknown input format: '%s'\n", input_index,
+ app_ctx.input_format_names[input_index]);
+ return AVERROR(EINVAL);
+ }
+ }
+
+ av_dict_set(&input_options, "rw_timeout", "2000000", 0);
+ av_dict_set(&input_options, "timeout", "2000", 0);
+ if (!(app_ctx.input_fmt_ctx[input_index] = avformat_alloc_context()))
+ return AVERROR(ENOMEM);
+
+
+ // try to open input several times
+ while (!app_ctx.to_exit) {
+ if ((ret = avformat_open_input(&app_ctx.input_fmt_ctx[input_index],
+ app_ctx.input_filenames[input_index],
+ input_format, &input_options)) >= 0) {
+ timed_log(AV_LOG_INFO, "Input #%d File successfully opened: %s\n",
+ input_index, app_ctx.input_filenames[input_index]);
+ break;
+ }
+ timed_log(AV_LOG_ERROR, "Input #%d Cannot open input file %s, %s\n",
+ input_index, app_ctx.input_filenames[input_index], av_err2str(ret));
+
+ av_usleep(INPUT_TRYING_INTERVAL_USEC);
+ }
+ if (app_ctx.to_exit) // asked to exit before the input could be opened
+ return AVERROR_EXIT;
+ input_fmt_ctx = app_ctx.input_fmt_ctx[input_index];
+
+ if ((ret = avformat_find_stream_info(input_fmt_ctx, NULL)) < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Cannot find stream information\n", input_index);
+ return ret;
+ }
+
+ app_ctx.dec_ctx[input_index] = av_mallocz_array(input_fmt_ctx->nb_streams,
+ sizeof(*app_ctx.dec_ctx[input_index]));
+ if (!app_ctx.dec_ctx[input_index]) {
+ timed_log(AV_LOG_ERROR, "Could not allocate decoding context array for Input #%d\n", input_index);
+ return AVERROR(ENOMEM);
+ }
+
+
+ // creating decoding context for each input stream
+ for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
+
+ AVStream *stream = input_fmt_ctx->streams[i];
+ AVCodec *dec = avcodec_find_decoder(stream->codecpar->codec_id);
+ AVCodecContext *codec_ctx;
+ if (!dec) {
+ timed_log(AV_LOG_ERROR, "Input #%d Failed to find decoder for elementary stream index #%u\n",
+ input_index, i);
+ return AVERROR_DECODER_NOT_FOUND;
+ }
+ codec_ctx = avcodec_alloc_context3(dec);
+ if (!codec_ctx) {
+ timed_log(AV_LOG_ERROR, "Input #%d Failed to allocate the decoder context for "
+ "elementary stream index #%u\n", input_index, i);
+ return AVERROR(ENOMEM);
+ }
+ ret = avcodec_parameters_to_context(codec_ctx, stream->codecpar);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR,
+ "Input #%d Failed to copy decoder parameters to decoder context for stream #%u\n",
+ input_index, i);
+ return ret;
+ }
+
+ av_opt_set_int(codec_ctx, "refcounted_frames", 1, 0);
+
+
+ /* Reencode video and audio streams and only remux subtitles, data streams etc. */
+ if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || codec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+ if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
+ codec_ctx->framerate = av_guess_frame_rate(input_fmt_ctx, stream, NULL);
+ /* Open decoder */
+ ret = avcodec_open2(codec_ctx, dec, NULL);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Failed to open decoder for elementary stream #%u\n",
+ input_index, i);
+ return ret;
+ }
+
+ } else if (codec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
+ timed_log(AV_LOG_FATAL, "Input #%d Elementary stream #%d is of unknown type, cannot proceed\n",
+ input_index, i);
+ return AVERROR(EINVAL);
+
+ }
+
+ app_ctx.dec_ctx[input_index][i] = codec_ctx;
+
+ }
+
+ av_dump_format(input_fmt_ctx, 0, app_ctx.input_filenames[input_index], 0);
+
+ return 0;
+}
+
+
+static int try_to_reopen_input(int input_source_index)
+{
+ int ret;
+ while (!app_ctx.to_exit) {
+ if ((ret = open_single_input(input_source_index)) >= 0) {
+ timed_log(AV_LOG_INFO, "Input #%d Successful reopening\n", input_source_index);
+
+ // intentionally do not drain the output pipeline here;
+ // leave it in its current state so the other realtime stream can be used as secondary input
+ return 0;
+ }
+ av_usleep(INPUT_TRYING_INTERVAL_USEC);
+ }
+ return AVERROR(EIO);
+}
+
+// frame may be NULL when draining the encoder
+static int encode_frame(AVFrame *frame, int stream_index, int input_source_index)
+{
+ int ret;
+ AVCodecContext * enc_ctx = app_ctx.enc_ctx[stream_index];
+ AVPacket *output_packet;
+
+ output_packet = av_packet_alloc();
+ if (!output_packet) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d could not allocate output packet\n",
+ input_source_index, stream_index);
+ return AVERROR(ENOMEM);
+ }
+
+ /* send the frame to the encoder */
+ if (frame) { // frame maybe null
+ OutputStreamStatData * st_data = &app_ctx.output_stream_data[stream_index];
+ st_data->last_pts = frame->pts;
+ if (!st_data->nb_frames)
+ st_data->first_pts = frame->pts;
+ st_data->nb_frames++;
+
+ // add calculated frame duration to input frame pts
+ if (enc_ctx->codec_type == AVMEDIA_TYPE_AUDIO && frame->sample_rate)
+ // calculate frame duration by number of audio samples
+ st_data->last_pts += av_rescale_q(frame->nb_samples, av_make_q(1, frame->sample_rate), enc_ctx->time_base);
+
+ else if (enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO && st_data->nb_frames >= 2)
+ // estimate the frame end: first_pts + mean frame duration * nb_frames
+ st_data->last_pts = st_data->first_pts + av_rescale(frame->pts - st_data->first_pts, st_data->nb_frames, st_data->nb_frames - 1);
+
+
+ timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Send frame for encoding, pts: %3"PRId64"\n",
+ input_source_index, stream_index, frame->pts);
+ }
+
+ ret = avcodec_send_frame(enc_ctx, frame);
+ if (ret == AVERROR(EAGAIN)) {
+ // the encoder cannot accept input right now; the frame was not consumed
+ } else if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Error sending a frame for encoding: %s\n",
+ input_source_index, av_err2str(ret));
+ av_packet_free(&output_packet); // do not leak the packet on the error path
+ return ret;
+ }
+ while (ret >= 0) {
+ ret = avcodec_receive_packet(enc_ctx, output_packet);
+ if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
+ break; // not an error; fall through so the packet gets freed
+ else if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error during encoding: %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+ break;
+ }
+
+ timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Write output packet, pts: %"PRId64" (size=%d)\n",
+ input_source_index, stream_index, output_packet->pts, output_packet->size);
+
+ output_packet->stream_index = stream_index;
+ ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, output_packet);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error muxing packet, %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+ break;
+ }
+ av_packet_unref(output_packet);
+ }
+
+ av_packet_free(&output_packet);
+ return ret;
+}
+
+
+// packet may be NULL (draining), so stream_index is passed separately
+static int handle_received_packet(AVPacket *packet, int stream_index, int input_source_index)
+{
+ int ret = 0;
+ int64_t new_pts = 0;
+
+ AVCodecContext * dec_ctx = app_ctx.dec_ctx[input_source_index][stream_index];
+ AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[input_source_index];
+
+ AVFrame *frame = av_frame_alloc();
+ if (!frame) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Could not allocate frame\n",
+ input_source_index, stream_index);
+ return AVERROR(ENOMEM);
+ }
+
+ if (packet) {
+ timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d packet received, pts: %3"PRId64", size: %d\n",
+ input_source_index, stream_index, packet->pts, packet->size);
+ }
+
+ ret = avcodec_send_packet(dec_ctx, packet);
+ if (ret == AVERROR(EAGAIN)) {
+ // nothing to do
+ } else if (ret == AVERROR_EOF) {
+ timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_send_packet returned: %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+
+ } else if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while sending a packet to decoder: %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+ av_frame_free(&frame);
+ return ret;
+ }
+
+ while (ret >= 0) {
+ ret = avcodec_receive_frame(dec_ctx, frame);
+ if (ret == AVERROR(EAGAIN))
+ break;
+ else if (ret == AVERROR_EOF) {
+ timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_receive_frame returned: %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+ break;
+ } else if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while receiving a frame from decoder: %s\n",
+ input_source_index, stream_index, av_err2str(ret));
+ av_frame_free(&frame);
+ return ret;
+ }
+
+ app_ctx.input_has_new_frame[input_source_index] = 1;
+ timed_log(AV_LOG_DEBUG, "Input #%d Set input_has_new_frame flag\n", input_source_index);
+ if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
+
+ InputStreamStatData * in_st_data = &app_ctx.input_stream_data[stream_index];
+
+ if (in_st_data->first_pts == AV_NOPTS_VALUE) {
+ in_st_data->first_pts = frame->pts;
+ in_st_data->nb_frames = 1;
+
+ } else {
+
+ int64_t avg_delta_frame_pts = (frame->pts - in_st_data->first_pts) / (double)in_st_data->nb_frames;
+ int64_t avg_delta_frame_pts_time = av_rescale_q(avg_delta_frame_pts, input_fmt_ctx->streams[stream_index]->time_base, AV_TIME_BASE_Q);
+
+ if (in_st_data->nb_frames > 25 && dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
+ timed_log(AV_LOG_DEBUG, "Input #%d stream #%d stream fps: %0.2f, nb_frames: %"PRId64"\n",
+ input_source_index, stream_index,
+ (double)1000000/avg_delta_frame_pts_time, in_st_data->nb_frames);
+ else
+ timed_log(AV_LOG_DEBUG, "Input #%d stream #%d nb_frames: %"PRId64"\n",
+ input_source_index, stream_index, in_st_data->nb_frames);
+
+ in_st_data->nb_frames ++;
+ }
+
+ new_pts = av_rescale_q_rnd(frame->pts - in_st_data->first_pts,
+ input_fmt_ctx->streams[stream_index]->time_base,
+ app_ctx.output_fmt_ctx->streams[stream_index]->time_base,
+ AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
+ new_pts += app_ctx.output_stream_data[stream_index].pts_delta;
+
+ timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d frame received and sending for encoding, "
+ "pts: %"PRId64" => %"PRId64"\n", input_source_index,
+ stream_index, frame->pts, new_pts);
+
+ frame->pts = new_pts;
+
+ ret = encode_frame(frame, stream_index, input_source_index);
+ if (ret < 0 && ret != AVERROR(EAGAIN)) {
+ app_ctx.to_exit = 1;
+ timed_log(AV_LOG_INFO, "encoding terminating\n");
+ }
+ pthread_mutex_unlock(&app_ctx.encoder_mutex);
+
+ } else
+ ret = 0;
+
+ av_frame_unref(frame);
+ }
+
+ av_frame_free(&frame);
+
+ return ret;
+}
+
+
+static void print_usage(const char * program_name)
+{
+ av_log(NULL, AV_LOG_ERROR, "usage: %s -im <primary/main input> [-ifm <format name of primary input>] "
+ "-is <secondary input> [-ifs <format name of secondary input>] "
+ "-o <output> [-of <output format name>] "
+ "[-timeout <input msec>] [-loglevel info|debug|warning|error] "
+ "[-dsc <input discontinuity threshold usec>]\n", program_name);
+}
+
+
+static int read_parameters(int argc, char **argv)
+{
+ int i;
+
+ for (i = 1; i < argc; i++) {
+ if (!strcmp(argv[i], "-im") && i+1 < argc) {
+ app_ctx.input_filenames[MAIN_INPUT_INDEX] = argv[++i];
+
+ } else if (!strcmp(argv[i], "-ifm") && i+1 < argc) {
+ app_ctx.input_format_names[MAIN_INPUT_INDEX] = argv[++i];
+
+ } else if (!strcmp(argv[i], "-is") && i+1 < argc) {
+ app_ctx.input_filenames[SECOND_INPUT_INDEX] = argv[++i];
+
+ } else if (!strcmp(argv[i], "-ifs") && i+1 < argc) {
+ app_ctx.input_format_names[SECOND_INPUT_INDEX] = argv[++i];
+
+ } else if (!strcmp(argv[i], "-o") && i+1 < argc) {
+ app_ctx.output_filename = argv[++i];
+
+ } else if (!strcmp(argv[i], "-of") && i+1 < argc) {
+ app_ctx.output_format_name = argv[++i];
+
+ } else if (!strcmp(argv[i], "-loglevel") && i+1 < argc) {
+ i++;
+ if (!strcmp(argv[i], "info")) {
+ av_log_set_level(AV_LOG_INFO);
+ } else if (!strcmp(argv[i], "error")) {
+ av_log_set_level(AV_LOG_ERROR);
+ } else if (!strcmp(argv[i], "warning")) {
+ av_log_set_level(AV_LOG_WARNING);
+ } else if (!strcmp(argv[i], "debug")) {
+ av_log_set_level(AV_LOG_DEBUG);
+ } else {
+ timed_log(AV_LOG_ERROR,
+ "Unexpected loglevel value: %s\n", argv[i]);
+ return AVERROR(EINVAL);
+ }
+
+
+ } else if (!strcmp(argv[i], "-timeout") && i+1 < argc) {
+ char * tail = NULL;
+ app_ctx.input_timeout_ms = strtoll(argv[++i], &tail, 10);
+ if (*tail || app_ctx.input_timeout_ms < 1) {
+ timed_log(AV_LOG_ERROR,
+ "Invalid or negative value '%s' for input timeout checking interval\n", argv[i]);
+ return AVERROR(EINVAL);
+ }
+
+ } else if (!strcmp(argv[i], "-dsc") && i+1 < argc) {
+ char * tail = NULL;
+ app_ctx.input_stream_time_discnt_thrshd_us = strtoll(argv[++i], &tail, 10);
+ if (*tail || app_ctx.input_stream_time_discnt_thrshd_us < 1) {
+ timed_log(AV_LOG_ERROR,
+ "Invalid or negative value '%s' for input time discontinuity interval\n", argv[i]);
+ return AVERROR(EINVAL);
+ }
+
+ } else {
+ timed_log(AV_LOG_ERROR, "unknown option, or missing parameter: %s\n", argv[i]);
+ print_usage(argv[0]);
+ return AVERROR(EINVAL);
+ }
+ }
+
+ if (!app_ctx.input_filenames[MAIN_INPUT_INDEX] ||
+ !app_ctx.input_filenames[SECOND_INPUT_INDEX] ||
+ !app_ctx.output_filename) {
+ print_usage(argv[0]);
+ return AVERROR(EINVAL);
+ }
+
+ return 0;
+}
+
+
+static int check_input_streams_matching(void)
+{
+ int i;
+
+ if (app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams != app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams) {
+ timed_log(AV_LOG_ERROR, "Main input has %u streams but secondary input has %u streams; "
+ "the stream counts must match, so aborting\n",
+ app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams,
+ app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams);
+ return AVERROR(EINVAL);
+ }
+
+ for (i = 0; i < app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams; i++) {
+ AVCodecContext * main_dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i];
+ AVCodecContext * second_dec_ctx = app_ctx.dec_ctx[SECOND_INPUT_INDEX][i];
+
+ if (main_dec_ctx->codec_type != second_dec_ctx->codec_type) {
+ timed_log(AV_LOG_ERROR, "Mismatching stream types at #%d elementary stream, aborting\n", i);
+ return AVERROR(EINVAL);
+ }
+
+ if (main_dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
+ if (main_dec_ctx->width != second_dec_ctx->width) {
+ timed_log(AV_LOG_ERROR, "at stream #%d video width mismatch: %d != %d\n", i,
+ main_dec_ctx->width, second_dec_ctx->width);
+ return AVERROR(EINVAL);
+ }
+ if (main_dec_ctx->height != second_dec_ctx->height) {
+ timed_log(AV_LOG_ERROR, "at stream #%d video height mismatch: %d != %d\n", i,
+ main_dec_ctx->height, second_dec_ctx->height);
+ return AVERROR(EINVAL);
+ }
+ if (main_dec_ctx->pix_fmt != second_dec_ctx->pix_fmt) {
+ timed_log(AV_LOG_ERROR, "at stream #%d video pix_fmt mismatch: %d != %d\n", i,
+ main_dec_ctx->pix_fmt, second_dec_ctx->pix_fmt);
+ return AVERROR(EINVAL);
+ }
+ // TODO: check more video parameters
+ }
+ if (main_dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+ if (main_dec_ctx->channels != second_dec_ctx->channels) {
+ timed_log(AV_LOG_ERROR, "at stream #%d audio channel number mismatch: %d != %d\n", i,
+ main_dec_ctx->channels, second_dec_ctx->channels);
+ return AVERROR(EINVAL);
+ }
+ if (main_dec_ctx->channel_layout != second_dec_ctx->channel_layout) {
+ timed_log(AV_LOG_ERROR, "at stream #%d audio channel layout mismatch: %"PRIu64" != %"PRIu64"\n",
+ i, main_dec_ctx->channel_layout, second_dec_ctx->channel_layout);
+ return AVERROR(EINVAL);
+ }
+ if (main_dec_ctx->sample_rate != second_dec_ctx->sample_rate) {
+ timed_log(AV_LOG_ERROR, "at stream #%d audio sample rate mismatch: %d != %d\n", i,
+ main_dec_ctx->sample_rate, second_dec_ctx->sample_rate);
+ return AVERROR(EINVAL);
+ }
+
+ if (main_dec_ctx->sample_fmt != DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT) {
+ timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format is not as expected (%d)\n",
+ i, DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT);
+ return AVERROR(EINVAL);
+ }
+
+ if (main_dec_ctx->sample_fmt != second_dec_ctx->sample_fmt) {
+ timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format mismatch: %d != %d\n",
+ i, main_dec_ctx->sample_fmt, second_dec_ctx->sample_fmt);
+ return AVERROR(EINVAL);
+ }
+
+ // TODO: check more audio parameters
+ }
+ }
+
+ return 0;
+}
+
+
+
+static int allocate_arrays(void)
+{
+
+ int nb_streams = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams;
+
+ app_ctx.enc_ctx = av_mallocz_array(nb_streams, sizeof(*app_ctx.enc_ctx));
+ if (!app_ctx.enc_ctx) {
+ timed_log(AV_LOG_ERROR,"Could not allocate encoder context list\n");
+ return AVERROR(ENOMEM);
+ }
+
+ app_ctx.input_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.input_stream_data));
+ if (!app_ctx.input_stream_data) {
+ timed_log(AV_LOG_ERROR,"Could not allocate input_stream_data list\n");
+ return AVERROR(ENOMEM);
+ }
+
+ app_ctx.output_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.output_stream_data));
+ if (!app_ctx.output_stream_data) {
+ timed_log(AV_LOG_ERROR,"Could not allocate output_stream_data list\n");
+ return AVERROR(ENOMEM);
+ }
+
+ return 0;
+}
+
+
+static int open_output (void)
+{
+ int i, ret;
+ AVDictionary * output_options = NULL;
+ AVOutputFormat * output_format = NULL;
+
+ AVStream * out_stream;
+ AVStream * in_stream;
+ AVCodecContext * dec_ctx = NULL, * enc_ctx = NULL;
+ AVCodec * output_video_codec, * output_audio_codec;
+ AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX];
+
+
+ if (app_ctx.output_format_name) {
+ if (!(output_format = av_guess_format(app_ctx.output_format_name, NULL, NULL))) {
+ timed_log(AV_LOG_ERROR, "Unknown output format: '%s'\n", app_ctx.output_format_name);
+ return AVERROR(EINVAL);
+ }
+ }
+
+ // allocate the output media context
+ ret = avformat_alloc_output_context2(&app_ctx.output_fmt_ctx, output_format, NULL,
+ app_ctx.output_filename);
+ if (ret < 0 || !app_ctx.output_fmt_ctx) {
+ timed_log(AV_LOG_ERROR,"Could not deduce output format for %s.\n", app_ctx.output_filename);
+ return AVERROR(EINVAL);
+ }
+
+ if ((ret = allocate_arrays()) < 0)
+ return ret;
+
+ // find the video encoder for output
+ output_video_codec = avcodec_find_encoder_by_name(output_video_codec_name);
+ if (!output_video_codec) {
+ timed_log(AV_LOG_ERROR, "Output video codec '%s' not found\n", output_video_codec_name);
+ return AVERROR_ENCODER_NOT_FOUND;
+ }
+
+ // find the audio encoder for output
+ output_audio_codec = avcodec_find_encoder_by_name(output_audio_codec_name);
+ if (!output_audio_codec) {
+ timed_log(AV_LOG_ERROR, "Output audio codec '%s' not found\n", output_audio_codec_name);
+ return AVERROR_ENCODER_NOT_FOUND;
+ }
+
+
+ // creating encoding context for each input stream based on main input format
+ for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
+
+ app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
+ app_ctx.output_stream_data[i].first_pts = AV_NOPTS_VALUE;
+ app_ctx.output_stream_data[i].last_pts = AV_NOPTS_VALUE;
+ app_ctx.output_stream_data[i].pts_delta = 0;
+ app_ctx.output_stream_data[i].nb_frames = 0;
+
+ in_stream = input_fmt_ctx->streams[i];
+ dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i]; // based on main input
+
+ out_stream = avformat_new_stream(app_ctx.output_fmt_ctx, NULL);
+ if (!out_stream) {
+ timed_log(AV_LOG_ERROR, "Failed allocating output stream\n");
+ return AVERROR_UNKNOWN;
+ }
+
+ enc_ctx = NULL;
+
+ if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
+
+ // create the context for video encoder
+ enc_ctx = avcodec_alloc_context3(output_video_codec);
+ if (!enc_ctx) {
+ timed_log(AV_LOG_ERROR, "Could not allocate output video codec context\n");
+ return AVERROR(ENOMEM);
+ }
+
+ enc_ctx->height = dec_ctx->height;
+ enc_ctx->width = dec_ctx->width;
+ enc_ctx->sample_aspect_ratio = dec_ctx->sample_aspect_ratio;
+ enc_ctx->pix_fmt = dec_ctx->pix_fmt;
+ // TODO: check whether pix_fmt is included in output_video_codec->pix_fmts,
+ // the list of pixel formats supported by the video codec
+
+ enc_ctx->time_base = av_inv_q(dec_ctx->framerate);
+ enc_ctx->gop_size = 0; // intra only, but it is useless in case of rawvideo
+
+ av_opt_set_int(enc_ctx, "refcounted_frames", 1, 0);
+
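+ // the global header flag only takes effect if it is set before avcodec_open2()
+ if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
+ enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+
+ ret = avcodec_open2(enc_ctx, output_video_codec, NULL);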
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Could not open output video codec: %s\n", av_err2str(ret));
+ return ret;
+ }
+
+ } else if (dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+ // create the context for audio encoder
+ enc_ctx = avcodec_alloc_context3(output_audio_codec);
+ if (!enc_ctx) {
+ timed_log(AV_LOG_ERROR, "Could not allocate output audio codec context\n");
+ return AVERROR(ENOMEM);
+ }
+
+ enc_ctx->sample_rate = dec_ctx->sample_rate;
+ enc_ctx->channel_layout = dec_ctx->channel_layout;
+ enc_ctx->channels = dec_ctx->channels;
+ // TODO: check by av_get_channel_layout_nb_channels(enc_ctx->channel_layout);
+
+ enc_ctx->sample_fmt = DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT; // encoder->sample_fmts[0];
+ enc_ctx->time_base = (AVRational){1, enc_ctx->sample_rate};
+
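+ // the global header flag only takes effect if it is set before avcodec_open2()
+ if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
+ enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+
+ ret = avcodec_open2(enc_ctx, output_audio_codec, NULL);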
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Could not open output audio codec: %s\n", av_err2str(ret));
+ return ret;
+ }
+
+ }
+
+ if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+ ret = avcodec_parameters_from_context(out_stream->codecpar, enc_ctx);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Failed to copy encoder parameters to output stream #%u\n", i);
+ return ret;
+ }
+ if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
+ enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+
+ out_stream->time_base = enc_ctx->time_base; // hint for the muxer
+ app_ctx.enc_ctx[i] = enc_ctx;
+
+ } else if (dec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
+ timed_log(AV_LOG_FATAL, "Elementary stream #%d is of unknown type, cannot proceed\n", i);
+ return AVERROR_INVALIDDATA;
+
+ } else {
+ // this stream will be remuxed only
+ ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Copying codec parameters for elementary stream #%u failed\n", i);
+ return ret;
+ }
+ out_stream->time_base = in_stream->time_base;
+ }
+
+ app_ctx.enc_ctx[i] = enc_ctx;
+
+ }
+
+
+ av_dump_format(app_ctx.output_fmt_ctx, 0, app_ctx.output_filename, 1);
+
+
+ // open the output file, if needed by the format
+ if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE)) {
+ ret = avio_open2(&app_ctx.output_fmt_ctx->pb, app_ctx.output_filename,
+ AVIO_FLAG_WRITE, NULL, &output_options);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Could not open '%s': %s\n",
+ app_ctx.output_filename, av_err2str(ret));
+ return ret;
+ }
+ }
+
+
+ // Write the stream header, if any
+ ret = avformat_write_header(app_ctx.output_fmt_ctx, &output_options);
+ if (ret < 0) {
+ timed_log(AV_LOG_ERROR, "Error occurred when opening output file: %s\n", av_err2str(ret));
+ return ret;
+ }
+
+ return 0;
+}
+
+
+static int calculate_new_ts_delta_values(void)
+{
+ int i;
+ int64_t max_last_pts = AV_NOPTS_VALUE;
+ int max_index = -1;
+
+ // find the max last_pts, this will be the old output duration
+ for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
+ if (app_ctx.output_stream_data[i].last_pts == AV_NOPTS_VALUE)
+ continue;
+
+ if (max_index == -1) {
+ max_index = i;
+ continue;
+ }
+
+ if (av_compare_ts(app_ctx.output_stream_data[i].last_pts,
+ app_ctx.output_fmt_ctx->streams[i]->time_base,
+ app_ctx.output_stream_data[max_index].last_pts,
+ app_ctx.output_fmt_ctx->streams[max_index]->time_base) > 0)
+ max_index = i;
+ }
+
+ if (max_index == -1) {
+ timed_log(AV_LOG_ERROR, "could not calculate new max pts\n");
+ return AVERROR(EINVAL);
+ }
+
+ // save here because we will clear somewhere in the next for loop
+ max_last_pts = app_ctx.output_stream_data[max_index].last_pts;
+
+ // the new delta of each stream is the max last_pts rescaled to that stream's time base
+ for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
+ app_ctx.output_stream_data[i].pts_delta = av_rescale_q_rnd(max_last_pts,
+ app_ctx.output_fmt_ctx->streams[max_index]->time_base,
+ app_ctx.output_fmt_ctx->streams[i]->time_base,
+ AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
+ app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
+ }
+
+ return 0;
+}
+
+
+static int dry_current_input_pipeline(int input_source_index)
+{
+ int i, ret;
+
+ for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
+ if ((ret = handle_received_packet(NULL, i, input_source_index)) < 0)
+ if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+ timed_log(AV_LOG_WARNING, "Input #%d stream #%d problem on drying the pipeline: %s\n",
+ input_source_index, i, av_err2str(ret));
+
+ return 0;
+}
+
+
+static int handle_input(int input_source_index)
+{
+ int i, ret, eof_input = 0, error_input = 0, input_reopen_counter = 0;
+ AVPacket input_packet;
+ int64_t dts_delta_time = 0;
+
+ timed_log(AV_LOG_INFO, "Input #%d thread started\n", input_source_index);
+ while (!app_ctx.to_exit) { // almost forever
+ int to_set_dts_delta_time = 1;
+
+ // read packets continuously from input
+ while (!app_ctx.to_exit) {
+ ret = av_read_frame(app_ctx.input_fmt_ctx[input_source_index], &input_packet);
+
+ if (ret < 0) {
+ if (ret == AVERROR_EOF) {
+ eof_input = 1;
+ timed_log(AV_LOG_INFO, "input #%d eof detected by av_read_frame\n",
+ input_source_index);
+ } else {
+ error_input = 1;
+ timed_log(AV_LOG_ERROR, "input #%d av_read_frame returned: %s\n",
+ input_source_index, av_err2str(ret));
+ }
+ break;
+ }
+
+ if (input_packet.stream_index >= app_ctx.input_fmt_ctx[input_source_index]->nb_streams)
+ timed_log(AV_LOG_WARNING, "Input #%d unexpected stream index: %d\n",
+ input_source_index, input_packet.stream_index);
+
+ else {
+ // ensuring realtime processing
+ if (input_packet.dts != AV_NOPTS_VALUE) {
+
+ int64_t dts_time = av_rescale_q(input_packet.dts,
+ app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
+ AV_TIME_BASE_Q);
+ int64_t now_us = av_gettime_relative();
+ int64_t sleep_us = dts_time - now_us + dts_delta_time;
+
+ if (to_set_dts_delta_time) {
+ to_set_dts_delta_time = 0;
+ dts_delta_time = now_us - dts_time;
+ sleep_us = 0;
+ }
+
+ if (FFABS(sleep_us) > app_ctx.input_stream_time_discnt_thrshd_us) { // abs() would truncate the int64_t value
+ timed_log(AV_LOG_INFO,
+ "Input #%d time discontinuity detected: %"PRIi64"us (limit: %dus), packet wallclock timestamp: %"PRIi64
+ ", delta: %"PRIi64"us\n",
+ input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us, dts_time, dts_delta_time);
+ sleep_us = 0;
+ dts_delta_time = now_us - dts_time;
+ }
+ if (sleep_us > app_ctx.input_stream_time_discnt_thrshd_us) {
+ timed_log(AV_LOG_WARNING, "Input %d Too long sleeping time: %"PRIi64", truncate to %d\n",
+ input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us);
+ sleep_us = app_ctx.input_stream_time_discnt_thrshd_us;
+ }
+ if (sleep_us > 0) {
+ timed_log(AV_LOG_DEBUG, "Input #%d sleeping %"PRIi64"us to simulate realtime receiving\n",
+ input_source_index, sleep_us);
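+ // sleep in slices of at most half the input timeout, the remainder is slept below
+ for(;sleep_us > app_ctx.input_timeout_ms * 500; sleep_us -= app_ctx.input_timeout_ms * 500) // 500 = 1000/2
+ av_usleep(app_ctx.input_timeout_ms * 500);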
+
+ av_usleep(sleep_us);
+ }
+ }
+
+
+ if (app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_VIDEO ||
+ app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+ if ((ret = handle_received_packet(&input_packet, input_packet.stream_index, input_source_index)) < 0)
+ if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
+ av_packet_unref(&input_packet); // the break skips the unref at the end of the loop
+ break;
+ }
+
+
+ } else if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
+ app_ctx.input_has_new_frame[input_source_index] = 1;
+
+ /* remux this frame without reencoding */
+ av_packet_rescale_ts(&input_packet,
+ app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
+ app_ctx.output_fmt_ctx->streams[input_packet.stream_index]->time_base);
+
+ ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, &input_packet);
+
+ pthread_mutex_unlock(&app_ctx.encoder_mutex);
+
+ if (ret < 0) {
+ app_ctx.to_exit = 1;
+ break;
+ }
+ }
+ }
+ av_packet_unref(&input_packet);
+ }
+
+
+ if (!app_ctx.to_exit && (eof_input || error_input)) {
+
+ timed_log(AV_LOG_INFO, "Going to reopen Input #%d, occasion: #%d\n",
+ input_source_index, ++input_reopen_counter);
+
+ // dry current pipeline
+ dry_current_input_pipeline(input_source_index);
+
+ // close input
+ for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
+ avcodec_free_context(&app_ctx.dec_ctx[input_source_index][i]);
+
+ avformat_close_input(&app_ctx.input_fmt_ctx[input_source_index]);
+
+ eof_input = 0;
+ error_input = 0;
+
+ if (try_to_reopen_input(input_source_index) < 0) {
+ break;
+ }
+
+ }
+ }
+
+ if (!app_ctx.to_exit && eof_input) {
+ // dry current pipeline
+ dry_current_input_pipeline(input_source_index);
+ }
+
+ return 0;
+}
+
+
+
+static void *threaded_input_handler(void * arg)
+{
+ int input_source_index = *(int *)arg;
+ handle_input(input_source_index);
+ pthread_exit(NULL);
+}
+
+
+
+int main(int argc, char **argv)
+{
+ int ret, i, k;
+ int64_t last_input_check_time = 0;
+ pthread_attr_t attr;
+
+ app_ctx.start = av_gettime();
+ av_log_set_level(DEFAULT_LOG_LEVEL);
+
+ // read and check command line parameters
+ if (read_parameters(argc, argv) < 0)
+ exit(1);
+
+ avformat_network_init();
+ avfilter_register_all();
+
+ app_ctx.input_source_index = -1; // none
+ app_ctx.to_exit = 0;
+
+ // For portability, explicitly create threads in a joinable state
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+
+ for (i = 0; i < NB_INPUTS && !app_ctx.to_exit; i++) {
+ if ((ret = open_single_input(i)) < 0) // open input
+ goto end;
+
+ app_ctx.input_has_new_frame[i] = 0;
+ if ((ret = pthread_create(&app_ctx.input_threads[i], &attr, threaded_input_handler, (void *) &app_ctx.thread_id[i]))) {
+ timed_log(AV_LOG_ERROR, "return code from #%d pthread_create() is %d\n", i, ret);
+ goto end;
+ }
+ }
+
+ if ((ret = check_input_streams_matching()) < 0)
+ goto end;
+
+ if (open_output() < 0)
+ goto end;
+
+
+ last_input_check_time = av_gettime_relative();
+ while (!app_ctx.to_exit) { // almost forever
+ int64_t now_us = av_gettime_relative();
+ int64_t check_interval = now_us - last_input_check_time;
+
+ if (check_interval > app_ctx.input_timeout_ms * 1000) {
+ last_input_check_time = now_us;
+ if (app_ctx.input_source_index == MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
+ // normal case
+ timed_log(AV_LOG_DEBUG, "Checking running main input: ok, in last %"PRIi64"us \n", check_interval);
+
+ } else if (app_ctx.input_source_index != MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
+ if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+ if (app_ctx.input_source_index >= 0) {
+ timed_log(AV_LOG_INFO, "#%d switching back to main input because new frame arrived\n",
+ app_ctx.input_failover_counter);
+ calculate_new_ts_delta_values();
+ } else
+ timed_log(AV_LOG_INFO, "Switching to main input\n");
+ app_ctx.input_source_index = MAIN_INPUT_INDEX;
+ pthread_mutex_unlock(&app_ctx.encoder_mutex);
+ } else
+ timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
+
+ } else if (app_ctx.input_source_index != SECOND_INPUT_INDEX && app_ctx.input_has_new_frame[SECOND_INPUT_INDEX]) {
+ if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+ if (app_ctx.input_source_index >= 0) {
+ app_ctx.input_failover_counter++;
+ timed_log(AV_LOG_INFO, "#%d switching to second input, no new frame on Input #%d in last %"PRIi64"us\n",
+ app_ctx.input_failover_counter, MAIN_INPUT_INDEX, check_interval);
+ calculate_new_ts_delta_values();
+ } else
+ timed_log(AV_LOG_INFO, "Switching to second input\n");
+ app_ctx.input_source_index = SECOND_INPUT_INDEX;
+ pthread_mutex_unlock(&app_ctx.encoder_mutex);
+ } else
+ timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
+ }
+
+
+ for (i = 0; i < NB_INPUTS; i++)
+ app_ctx.input_has_new_frame[i] = 0;
+
+ }
+ av_usleep(app_ctx.input_timeout_ms * 250); // 250 = 1000 / 4
+ }
+
+
+ if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+ app_ctx.input_source_index = -1;
+ pthread_mutex_unlock(&app_ctx.encoder_mutex);
+ }
+
+ av_write_trailer(app_ctx.output_fmt_ctx);
+
+
+ if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE))
+ avio_closep(&app_ctx.output_fmt_ctx->pb);
+
+end:
+
+ app_ctx.to_exit = 1;
+ // wait for all input threads to terminate
+ for (i = 0; i < NB_INPUTS; i++)
+ pthread_join(app_ctx.input_threads[i], NULL);
+
+
+ for (k = 0; k < NB_INPUTS; k++) {
+ for (i = 0; app_ctx.output_fmt_ctx &&
+ i < app_ctx.output_fmt_ctx->nb_streams; i++)
+ avcodec_free_context(&app_ctx.dec_ctx[k][i]);
+
+ avformat_close_input(&app_ctx.input_fmt_ctx[k]);
+ }
+
+ for (i = 0; app_ctx.output_fmt_ctx && i < app_ctx.output_fmt_ctx->nb_streams; i++)
+ avcodec_free_context(&app_ctx.enc_ctx[i]);
+
+ // an output context is released with avformat_free_context(), not avformat_close_input()
+ avformat_free_context(app_ctx.output_fmt_ctx);
+
+ avformat_network_deinit();
+
+
+ pthread_mutex_destroy(&app_ctx.encoder_mutex);
+
+ exit(0);
+}
--
2.5.3.windows.1
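
A note on the timestamp handling in calculate_new_ts_delta_values(): after an input switch, the largest last_pts seen on any output stream is rescaled into every stream's own time base and stored as that stream's pts_delta. The sketch below illustrates only the rescaling arithmetic. It is not the patch's code: Rational and rescale_ts() are hypothetical stand-ins for AVRational and av_rescale_q_rnd(), the rounding is plain round-to-nearest, and it assumes the intermediate product fits into int64_t.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for AVRational: a time base of num/den seconds. */
typedef struct Rational {
    int num;
    int den;
} Rational;

/* Converts ts from time base src to time base dst, rounding to the nearest
 * integer (halves away from zero). Simplified illustration only: unlike
 * av_rescale_q_rnd() it does not guard against 64-bit overflow and does not
 * pass AV_NOPTS_VALUE through unchanged. */
static int64_t rescale_ts(int64_t ts, Rational src, Rational dst)
{
    int64_t num = (int64_t)src.num * dst.den;
    int64_t den = (int64_t)src.den * dst.num;
    int64_t scaled;

    assert(num > 0 && den > 0);

    scaled = ts * num;
    if (scaled >= 0)
        return (scaled + den / 2) / den;
    return -((-scaled + den / 2) / den);
}
```

For example, if the longest stream ended at pts 250 in a 1/25 time base (10 seconds), an audio stream with a 1/48000 time base would get rescale_ts(250, (Rational){1, 25}, (Rational){1, 48000}) as its delta, so its timestamps from the new input continue at the 10 second mark.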
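
A note on the realtime pacing in handle_input(): a long wait is split into slices of at most half the input timeout, presumably so that the thread never blocks for longer than that in a single sleep call. The sketch below shows the slicing arithmetic in isolation. The helper names are hypothetical and not part of the patch, and the place where a real loop would call av_usleep() is only marked by a comment.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: length of the next sleep step when remaining_us is
 * split into slices of at most slice_us microseconds. */
static int64_t next_sleep_slice(int64_t remaining_us, int64_t slice_us)
{
    assert(slice_us > 0);
    if (remaining_us <= 0)
        return 0;
    return remaining_us < slice_us ? remaining_us : slice_us;
}

/* Walks through the whole wait and returns the total time that would be
 * slept. The number of steps is stored in *nb_slices if it is not NULL. */
static int64_t total_sliced_sleep(int64_t sleep_us, int64_t slice_us, int *nb_slices)
{
    int64_t total = 0;
    int n = 0;

    while (sleep_us > 0) {
        int64_t step = next_sleep_slice(sleep_us, slice_us);
        /* a real loop would call av_usleep(step) here and could also
         * check an exit flag between two slices */
        total    += step;
        sleep_us -= step;
        n++;
    }
    if (nb_slices)
        *nb_slices = n;
    return total;
}
```

With a wait of 2500us and a slice of 1000us this gives three steps of 1000us, 1000us and 500us, so the total equals the requested wait. Checking an exit flag between the slices would be an additional design choice and is not something the patch does.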