[FFmpeg-devel] [PATCH] [RFC]doc/examples: alternative input handler

Bodecs Bela bodecsb at vivanet.hu
Fri Mar 30 15:47:25 EEST 2018


Hi All,

A request comes up regularly on various forums and mailing lists for a 
feature that automatically fails over from a main input to a secondary 
input when the main input becomes unavailable.

The basic motivation: say you have an unreliable live stream source and 
you want to transcode its video and audio streams in real time, but you 
want to survive the occasions when the source is unavailable. You would 
use a secondary live source, and the transition should occur seamlessly, 
without breaking or restarting the transcoding process.

Some days ago there was a discussion on devel-irc about this topic and 
we concluded that this feature is not feasible inside ffmpeg without 
"hacking", but a separate client app could do this.

So I created this example app to handle two separate input sources and 
switch between them in real time. I am not sure whether it should be 
inside the tools subdir.

The detailed description is available in the header section of the 
source file.
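
As a rough illustration of the switching rule only (this is not code taken 
from the patch), the decision can be sketched as below. The names 
select_active_input, SKETCH_MAIN_INPUT and SKETCH_SECOND_INPUT are 
hypothetical, and the sketch assumes the handler tracks the wallclock time 
of the last frame decoded from the main input and compares it against the 
-timeout value.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical names for this sketch; they are not taken from the patch. */
enum { SKETCH_MAIN_INPUT = 0, SKETCH_SECOND_INPUT = 1 };

/*
 * Decide which input should be forwarded to the output.
 *
 * now_ms:             current wallclock time in milliseconds
 * last_main_frame_ms: wallclock time of the last frame decoded from the
 *                     main input, or -1 if it has not delivered a frame yet
 * timeout_ms:         how long the main input may stay silent before the
 *                     secondary input takes over (cf. the -timeout option)
 */
static int select_active_input(int64_t now_ms, int64_t last_main_frame_ms,
                               int64_t timeout_ms)
{
    assert(timeout_ms > 0);

    /* no frame seen from main yet: only the secondary can be forwarded */
    if (last_main_frame_ms < 0)
        return SKETCH_SECOND_INPUT;

    /* main is treated as alive while its last frame is recent enough */
    if (now_ms - last_main_frame_ms <= timeout_ms)
        return SKETCH_MAIN_INPUT;

    /* main has been silent for longer than the timeout: fail over */
    return SKETCH_SECOND_INPUT;
}
```

Because the function is evaluated again for every incoming frame, the 
switch back to the main input follows from the same rule: as soon as a 
fresh main frame updates last_main_frame_ms, the main input is selected 
again.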

I would appreciate your suggestions about it.

Thank you in advance.

best,

Bela Bodecs


-------------- next part --------------
>From aa7ed4cdaa3411f7481d6ffa00a5d366a0386525 Mon Sep 17 00:00:00 2001
From: Bela Bodecs <bodecsb at vivanet.hu>
Date: Fri, 30 Mar 2018 14:30:25 +0200
Subject: [PATCH] [RFC]doc/examples: alternative input handler

API utility for automatic failover switching between main input and
secondary input in case of input unavailability.
Motivation: let's say you have an unreliable live stream source and you
want to transcode its first video and audio stream in realtime but you
want to survive the occasions when the source is unavailable. So use a
secondary live source but the transition should occur seamlessly without
breaking/re-starting the transcoding process.
See the source file header section for detailed description.


Signed-off-by: Bela Bodecs <bodecsb at vivanet.hu>
---
 configure                        |    2 +
 doc/examples/Makefile            |    1 +
 doc/examples/Makefile.example    |    1 +
 doc/examples/alternative_input.c | 1233 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 1237 insertions(+)
 create mode 100644 doc/examples/alternative_input.c

diff --git a/configure b/configure
index 0c5ed07..5585c60 100755
--- a/configure
+++ b/configure
@@ -1529,6 +1529,7 @@ EXAMPLE_LIST="
     transcoding_example
     vaapi_encode_example
     vaapi_transcode_example
+    alternative_input_example
 "
 
 EXTERNAL_AUTODETECT_LIBRARY_LIST="
@@ -3337,6 +3338,7 @@ transcode_aac_example_deps="avcodec avformat swresample"
 transcoding_example_deps="avfilter avcodec avformat avutil"
 vaapi_encode_example_deps="avcodec avutil h264_vaapi_encoder"
 vaapi_transcode_example_deps="avcodec avformat avutil h264_vaapi_encoder"
+alternative_input_example_deps="avfilter avcodec avformat avutil"
 
 # EXTRALIBS_LIST
 cpu_init_extralibs="pthreads_extralibs"
diff --git a/doc/examples/Makefile b/doc/examples/Makefile
index 928ff30..3c50eca 100644
--- a/doc/examples/Makefile
+++ b/doc/examples/Makefile
@@ -21,6 +21,7 @@ EXAMPLES-$(CONFIG_TRANSCODE_AAC_EXAMPLE)     += transcode_aac
 EXAMPLES-$(CONFIG_TRANSCODING_EXAMPLE)       += transcoding
 EXAMPLES-$(CONFIG_VAAPI_ENCODE_EXAMPLE)      += vaapi_encode
 EXAMPLES-$(CONFIG_VAAPI_TRANSCODE_EXAMPLE)   += vaapi_transcode
+EXAMPLES-$(CONFIG_ALTERNATIVE_INPUT_EXAMPLE) += alternative_input
 
 EXAMPLES       := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)$(EXESUF))
 EXAMPLES_G     := $(EXAMPLES-yes:%=doc/examples/%$(PROGSSUF)_g$(EXESUF))
diff --git a/doc/examples/Makefile.example b/doc/examples/Makefile.example
index 6428154..937d266 100644
--- a/doc/examples/Makefile.example
+++ b/doc/examples/Makefile.example
@@ -30,6 +30,7 @@ EXAMPLES=       avio_dir_cmd                       \
                 scaling_video                      \
                 transcode_aac                      \
                 transcoding                        \
+                alternative_input                  \
 
 OBJS=$(addsuffix .o,$(EXAMPLES))
 
diff --git a/doc/examples/alternative_input.c b/doc/examples/alternative_input.c
new file mode 100644
index 0000000..3d1e534
--- /dev/null
+++ b/doc/examples/alternative_input.c
@@ -0,0 +1,1233 @@
+/*
+ * Copyright (c) 2018 Bodecs Bela
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+/**
+ * @file
+ * API utility for automatic failover switching between main input and secondary
+ * input in case of input unavailability
+ * @example alternative_input.c
+ */
+
+/**
+ * Motivation: let's say you have an unreliable live stream source and you want to
+ *             transcode its first video and audio stream in realtime
+ *             but you want to survive the occasions when
+ *             the source is unavailable. So use a secondary live source but
+ *             the transition should occur seamlessly without breaking/re-starting
+ *             the transcoding process
+ *
+ * You may have a main source as an flv format rtmp://<server>/<stream> or
+ * an mpegts format udp://<multicast_address:port>/ or an
+ * hls format http://<server>/stream.m3u8 or something similar.
+ *
+ * Your original ffmpeg command line may look like this:
+ *    ffmpeg -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
+ *           -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
+ *           -c:a aac -ac 2 -ar 44100
+ *           -f hls out.m3u8
+ *
+ * Should the source be unavailable you may want to use a secondary source to show a
+ * color-bar screen with silent audio. To achieve this we virtually cut your
+ * original ffmpeg command into two halves and insert the alternative_input handler
+ * between them.
+ *
+ * Here is the modified output handler command line: (command#1)
+ * ffmpeg -y -f nut  -listen 1 -i unix:output.unix
+ *         -c:v libx264 -s 640x360 -r 25 -pix_fmt yuv420p
+ *         -c:a aac  -ac 2 -ar 44100
+ *         -f hls out.m3u8
+ *
+ * here is the modified main input producer command line: (command#2)
+ * ffmpeg -y  -f <input_format_name> -i <main_input_url> -map 0:v:0 -map 0:a:0
+ *        -c:v rawvideo -s 640x360 -r 25 -pix_fmt yuv420p
+ *        -c:a pcm_s32le -ac 2 -ar 44100
+ *        -f nut -listen 1 unix:input_main.unix
+ *
+ * here is the secondary input producer command line: (command#3)
+ * ffmpeg -y -re -f lavfi
+ *        -i "aevalsrc=exprs=0:nb_samples=1024:sample_rate=44100:channel_layout=stereo, \
+ *            aformat=sample_fmts=s32"
+ *        -re -f lavfi -i "smptehdbars=size=640x360:rate=25, format=pix_fmts=yuv420p"
+ *        -c:v rawvideo -c:a pcm_s32le
+ *        -map 1 -map 0
+ *        -f nut -listen 1 unix:input_second.unix
+ *
+ * and finally the alternative input handler command line: (command#4)
+ * alternative_input -im unix:input_main.unix -ifm nut
+ *                   -is unix:input_second.unix -ifs nut
+ *                   -o unix:output.unix -of nut
+ *                   -timeout 150
+ *
+ * How to test:
+ *  start modified output handler (command#1), then in a separate window
+ *  start alternative input handler (command#4), then in a separate window
+ *  start main input producer (command#2) and then in a separate window
+ *  start secondary input producer (command#3). You will get on the output
+ *  of output handler the main input. Now stop main input producer
+ *  e.g. by pressing q in its window. Now you get the secondary source
+ *  (SMPTE color bars on screen and silence as audio) on the output of output
+ *  handler. Now, start the main input producer again. After a successful start
+ *  you will get on the output of output handler the main input again.
+ *
+ * some suggestions:
+ *   - use long analyze duration (-analyzeduration 10000000) option
+ *     on main input to reliably collect all input info
+ *   - all corresponding elementary streams on inputs of alternative
+ *     input handler must have matching properties regarding
+ *     stream type, pix format, pix size, audio sample rate
+ *   - expected input format of alternative input handler is always
+ *     intra-only video, and the audio format is pcm_s32le
+ *   - the number of elementary streams in the inputs is not limited
+ *   - at the beginning start the output handler first, then the alternative input
+ *     handler, then main input and then secondary input, because the alternative
+ *     input handler stops immediately if the output is not writable but keeps
+ *     trying to open the inputs continuously
+ *   - at the beginning no output is produced until both the
+ *     main and the second input have been opened
+ *   - alternative input handler output video codec is rawvideo and
+ *     output audio codec is pcm_s32le
+ *   - nut muxer/demuxer format was tested successfully for output/input,
+ *     other formats may work (e.g. avi, with its limitations)
+ *   - only unix protocol was tested successfully for input/output
+ *   - an unavailable input is retried for re-opening every 1000 ms; this
+ *     applies to the secondary input as well
+ *   - when the main input is available again, the handler switches back to it
+ *
+ *
+ * Description of command line parameters of alternative input handler:
+ *  -im url of primary/main input
+ *  -ifm (optional) format name of primary input
+ *  -is url of secondary input
+ *  -ifs (optional) format name of secondary input
+ *  -o url of output
+ *  -of (optional) output format name
+ *  -timeout (optional) if main input is not available for this time period,
+ *    switching to the second input will occur (default value 100ms),
+ *    value expressed in milliseconds
+ *  -loglevel (optional) info|debug|warning|error (default level is info)
+ *  -dsc (optional) internally inputs are consumed in real time fashion,
+ *    if data may arrive quicker than realtime according to incoming timestamps,
+ *    reading will be slowed down. If consecutive timestamps differ by more
+ *    than this threshold value, input data will be treated as discontinued.
+ *    Value expressed in microseconds, default value is 3000000
+ *
+ */
+
+#include <unistd.h>
+
+#include <libavcodec/avcodec.h>
+#include <libavformat/avformat.h>
+#include <libavfilter/buffersink.h>
+#include <libavfilter/buffersrc.h>
+#include <libavutil/opt.h>
+#include <libavutil/channel_layout.h>
+#include <libavutil/frame.h>
+#include <libavutil/time.h>
+#include <libavutil/mathematics.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdarg.h>
+
+// how often to try re-opening an input in case of failover
+#define INPUT_TRYING_INTERVAL_USEC   1000000
+#define DEFAULT_INPUT_TIMEOUT_MSEC 100
+#define DEFAULT_LOG_LEVEL AV_LOG_INFO
+#define MAIN_INPUT_INDEX 0
+#define SECOND_INPUT_INDEX 1
+#define NB_INPUTS 2
+#define DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US 3000000
+#define DEFAULT_OUTPUT_AUDIO_CODEC_NAME "pcm_s32le"
+#define DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT   AV_SAMPLE_FMT_S32
+#define DEFAULT_OUTPUT_VIDEO_CODEC_NAME "rawvideo"
+
+typedef struct InputStreamStatData {
+    int64_t first_pts;  // pts of first encoded active input's frame since the last open in its own input stream timebase
+    int64_t nb_frames;  // nb of forwarded/encoded frames of current active input
+} InputStreamStatData;
+
+
+typedef struct OutputStreamStatData {
+    int64_t last_pts;   // last encoded output frame end pts (pts + dur) in output stream timebase
+    int64_t first_pts;
+    int64_t pts_delta;  // to adjust by this value the encoded frames pts in output stream timebase
+    int64_t nb_frames;  // total output frames
+} OutputStreamStatData;
+
+
+typedef struct AppContext {
+    char *input_filenames[NB_INPUTS];  // e.g. "unix:doc/examples/input_main.unix";
+    char *input_format_names[NB_INPUTS];  // e.g "nut"
+
+    AVCodecContext **dec_ctx[NB_INPUTS];  // arbitrary number of streams in each input
+    AVFormatContext *input_fmt_ctx[NB_INPUTS];
+
+    char *output_filename;
+    char *output_format_name;
+    AVCodecContext **enc_ctx; // same number of streams as in the inputs
+    AVFormatContext *output_fmt_ctx;
+
+    InputStreamStatData *input_stream_data;
+    OutputStreamStatData *output_stream_data;
+
+    int input_failover_counter;  // main->second  switchings
+
+    pthread_mutex_t encoder_mutex;
+    int thread_id[NB_INPUTS];
+    int input_timeout_ms;
+    int input_stream_time_discnt_thrshd_us;
+    int64_t start;  // start wallclock time of this program
+
+    volatile sig_atomic_t input_source_index;
+    volatile sig_atomic_t to_exit;
+    volatile sig_atomic_t input_has_new_frame[NB_INPUTS];
+
+    pthread_t input_threads[NB_INPUTS]; // each input has its own reading thread
+
+} AppContext;
+
+
+
+static AppContext app_ctx = { {NULL, NULL}, {NULL, NULL}, {NULL, NULL}, {NULL, NULL},
+                              NULL, NULL, NULL, NULL, NULL, NULL,
+                              0, PTHREAD_MUTEX_INITIALIZER,
+                              {MAIN_INPUT_INDEX, SECOND_INPUT_INDEX}, DEFAULT_INPUT_TIMEOUT_MSEC,
+                              DEFAULT_INPUT_STREAM_TIME_DISCONTINUITY_THRESHOLD_US, 0,
+                              0, 0, {0, 0} };
+
+
+static const char *output_audio_codec_name = DEFAULT_OUTPUT_AUDIO_CODEC_NAME;
+static const char *output_video_codec_name = DEFAULT_OUTPUT_VIDEO_CODEC_NAME;
+
+static void timed_log(int level, const char *fmt, ...)
+{
+    char timed_fmt[2048];
+    int64_t now_us = av_gettime();
+    va_list vl;
+    va_start(vl, fmt);
+    if (snprintf(timed_fmt, sizeof(timed_fmt), "[%"PRId64"--%"PRId64"] %s", now_us, now_us - app_ctx.start, fmt) > 0)
+        av_vlog(NULL, level, timed_fmt, vl);
+    va_end(vl);
+}
+
+static int open_single_input(int input_index)
+{
+    int ret, i;
+    AVInputFormat *input_format = NULL;
+    AVDictionary * input_options = NULL;
+    AVFormatContext * input_fmt_ctx = NULL;
+
+    if (app_ctx.input_format_names[input_index]) {
+        if (!(input_format = av_find_input_format(app_ctx.input_format_names[input_index]))) {
+            timed_log(AV_LOG_ERROR, "Input #%d Unknown input format: '%s'\n", input_index,
+                      app_ctx.input_format_names[input_index]);
+            return AVERROR(EINVAL);
+        }
+    }
+
+    av_dict_set(&input_options, "rw_timeout", "2000000", 0);
+    av_dict_set(&input_options, "timeout", "2000", 0);
+    if (!(app_ctx.input_fmt_ctx[input_index] = avformat_alloc_context()))
+        return AVERROR(ENOMEM);
+
+
+    // try to open input several times
+    while (!app_ctx.to_exit) {
+        if ((ret = avformat_open_input(&app_ctx.input_fmt_ctx[input_index],
+                                       app_ctx.input_filenames[input_index],
+                                       input_format, &input_options)) >= 0) {
+            timed_log(AV_LOG_INFO, "Input #%d File successfully opened: %s\n",
+                      input_index, app_ctx.input_filenames[input_index]);
+            break;
+        }
+        timed_log(AV_LOG_ERROR, "Input #%d Cannot open input file %s, %s\n",
+                      input_index, app_ctx.input_filenames[input_index], av_err2str(ret));
+
+        av_usleep(INPUT_TRYING_INTERVAL_USEC);
+    }
+
+
+    input_fmt_ctx = app_ctx.input_fmt_ctx[input_index];
+
+    if ((ret = avformat_find_stream_info(input_fmt_ctx, NULL)) < 0) {
+        timed_log(AV_LOG_ERROR, "Input #%d Cannot find stream information\n", input_index);
+        return ret;
+    }
+
+    app_ctx.dec_ctx[input_index] = av_mallocz_array(input_fmt_ctx->nb_streams,
+                                                    sizeof(*app_ctx.dec_ctx[input_index]));
+    if (!app_ctx.dec_ctx[input_index]) {
+        timed_log(AV_LOG_ERROR, "Could not allocate decoding context array for Input #%d\n", input_index);
+        return AVERROR(ENOMEM);
+    }
+
+
+    // creating decoding context for each input stream
+    for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
+
+        AVStream *stream = input_fmt_ctx->streams[i];
+        AVCodec *dec = avcodec_find_decoder(stream->codecpar->codec_id);
+        AVCodecContext *codec_ctx;
+        if (!dec) {
+            timed_log(AV_LOG_ERROR, "Input #%d Failed to find decoder for elementary stream index #%d\n",
+                      input_index, i);
+            return AVERROR_DECODER_NOT_FOUND;
+        }
+        codec_ctx = avcodec_alloc_context3(dec);
+        if (!codec_ctx) {
+            timed_log(AV_LOG_ERROR, "Input #%d Failed to allocate the decoder context for "
+                      "elementary stream index #%d\n", input_index, i);
+            return AVERROR(ENOMEM);
+        }
+        ret = avcodec_parameters_to_context(codec_ctx, stream->codecpar);
+        if (ret < 0) {
+            timed_log(AV_LOG_ERROR,
+                      "Input #%d Failed to copy decoder parameters to decoder context for stream #%d\n",
+                      input_index, i);
+            return ret;
+        }
+
+        av_opt_set_int(codec_ctx, "refcounted_frames", 1, 0);
+
+
+        /* Reencode video and audio streams and only remux subtitles, data streams etc. */
+        if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || codec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+            if (codec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
+                codec_ctx->framerate = av_guess_frame_rate(input_fmt_ctx, stream, NULL);
+            /* Open decoder */
+            ret = avcodec_open2(codec_ctx, dec, NULL);
+            if (ret < 0) {
+                timed_log(AV_LOG_ERROR, "Input #%d Failed to open decoder for elementary stream #%d\n",
+                          input_index, i);
+                return ret;
+            }
+
+        } else if (codec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
+            timed_log(AV_LOG_FATAL, "Input #%d Elementary stream #%d is of unknown type, cannot proceed\n",
+                      input_index, i);
+            return AVERROR(EINVAL);
+
+        }
+
+        app_ctx.dec_ctx[input_index][i] = codec_ctx;
+
+    }
+
+    av_dump_format(input_fmt_ctx, 0, app_ctx.input_filenames[input_index], 0);
+
+    return 0;
+}
+
+
+static int try_to_reopen_input(int input_source_index)
+{
+    int ret;
+    while (!app_ctx.to_exit) {
+        if ((ret = open_single_input(input_source_index)) >= 0) { //
+            timed_log(AV_LOG_INFO, "Input #%d Successful reopening\n", input_source_index);
+
+            // intentionally do not drain the output pipeline here,
+            // but leave it in its current state so the other realtime stream can serve as secondary input
+            return 0;
+        }
+        av_usleep(INPUT_TRYING_INTERVAL_USEC);
+    }
+    return AVERROR(EIO);
+}
+
+// frame may be NULL when draining (flushing) the encoder
+static int encode_frame(AVFrame *frame, int stream_index, int input_source_index)
+{
+    int ret;
+    AVCodecContext * enc_ctx = app_ctx.enc_ctx[stream_index];
+    AVPacket *output_packet;
+
+    output_packet = av_packet_alloc();
+    if (!output_packet) {
+        timed_log(AV_LOG_ERROR, "Input #%d Stream #%d could not allocate output packet\n",
+               input_source_index, stream_index);
+        return AVERROR(ENOMEM);
+    }
+
+    /* send the frame to the encoder */
+    if (frame) { // frame may be NULL
+        OutputStreamStatData * st_data = &app_ctx.output_stream_data[stream_index];
+        st_data->last_pts = frame->pts;
+        if (!st_data->nb_frames)
+            st_data->first_pts = frame->pts;
+        st_data->nb_frames++;
+
+        // add calculated frame duration to input frame pts
+        if (enc_ctx->codec_type == AVMEDIA_TYPE_AUDIO && frame->sample_rate)
+            // calculate frame duration by number of audio samples
+            st_data->last_pts += av_rescale_q(frame->nb_samples, av_make_q(1, frame->sample_rate), enc_ctx->time_base);
+
+        else if (enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO && st_data->nb_frames >= 2)
+            // use overall mean frame duration: (curr_pts - first_pts) / (nb_frames - 1) * nb_frames
+            st_data->last_pts = av_rescale(frame->pts - st_data->first_pts, st_data->nb_frames, st_data->nb_frames - 1);
+
+
+        timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Send frame for encoding, pts: %3"PRId64"\n",
+               input_source_index, stream_index, frame->pts);
+    }
+
+    ret = avcodec_send_frame(enc_ctx, frame);
+    if (ret == AVERROR(EAGAIN)) {
+
+    } else if (ret < 0) {
+        timed_log(AV_LOG_ERROR, "Input #%d Error sending a frame for encoding: %s\n",
+                  input_source_index, av_err2str(ret));
+        return ret;
+    }
+
+    while (ret >= 0) {
+        ret = avcodec_receive_packet(enc_ctx, output_packet);
+        if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
+            break;  // no more packets for now; output_packet is freed below
+        else if (ret < 0) {
+            timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error during encoding: %s\n",
+                   input_source_index, stream_index, av_err2str(ret));
+            break;
+        }
+
+        timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d Write output packet, pts: %"PRId64" (size=%d)\n",
+               input_source_index, stream_index, output_packet->pts, output_packet->size);
+
+        output_packet->stream_index = stream_index;
+        ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, output_packet);
+        if (ret < 0) {
+            timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error muxing packet, %s\n",
+                    input_source_index, stream_index, av_err2str(ret));
+            break;
+        }
+        av_packet_unref(output_packet);
+    }
+
+    av_packet_free(&output_packet);
+    return ret;
+}
+
+
+// packet may be NULL, so stream_index is passed separately
+static int handle_received_packet(AVPacket *packet, int stream_index, int input_source_index)
+{
+    int ret = 0;
+    int64_t new_pts = 0;
+
+    AVCodecContext * dec_ctx = app_ctx.dec_ctx[input_source_index][stream_index];
+    AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[input_source_index];
+
+    AVFrame *frame = av_frame_alloc();
+    if (!frame) {
+        timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Could not allocate frame\n",
+               input_source_index, stream_index);
+        return AVERROR(ENOMEM);
+    }
+
+    if (packet) {
+        timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d packet received, pts: %3"PRId64", size: %d\n",
+               input_source_index, stream_index, packet->pts, packet->size);
+    }
+
+    ret = avcodec_send_packet(dec_ctx, packet);
+    if (ret == AVERROR(EAGAIN)) {
+        // nothing to do
+    } else if (ret == AVERROR_EOF) {
+        timed_log(AV_LOG_INFO, "Input #%d Stream #%d avcodec_send_packet returned: %s\n",
+               input_source_index, stream_index, av_err2str(ret));
+
+    } else if (ret < 0) {
+        timed_log(AV_LOG_ERROR, "Input #%d Stream #%d Error while sending a packet to decoder: %s\n",
+               input_source_index, stream_index, av_err2str(ret));
+        av_frame_free(&frame);
+        return ret;
+    }
+
+    while (ret >= 0) {
+        ret = avcodec_receive_frame(dec_ctx, frame);
+        if (ret == AVERROR(EAGAIN))
+            break;
+        else if (ret == AVERROR_EOF) {
+            timed_log(AV_LOG_INFO, "Input #%d Stream #%d  avcodec_receive_frame returned: %s\n",
+                      input_source_index, stream_index, av_err2str(ret));
+            break;
+        } else if (ret < 0) {
+            timed_log(AV_LOG_ERROR, "Input #%d Stream #%d  Error while receiving a frame from decoder: %s\n",
+                      input_source_index, stream_index, av_err2str(ret));
+            av_frame_free(&frame);
+            return ret;
+        }
+
+        app_ctx.input_has_new_frame[input_source_index] = 1;
+        timed_log(AV_LOG_DEBUG, "Input #%d Set input_has_new_frame flag\n", input_source_index);
+        if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
+
+            InputStreamStatData * in_st_data = &app_ctx.input_stream_data[stream_index];
+
+            if (in_st_data->first_pts == AV_NOPTS_VALUE) {
+                in_st_data->first_pts = frame->pts;
+                in_st_data->nb_frames = 1;
+
+            } else {
+
+                int64_t avg_delta_frame_pts = (frame->pts - in_st_data->first_pts) / (double)in_st_data->nb_frames;
+                int64_t avg_delta_frame_pts_time = av_rescale_q(avg_delta_frame_pts, input_fmt_ctx->streams[stream_index]->time_base, AV_TIME_BASE_Q);
+
+                if (in_st_data->nb_frames > 25 && dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO)
+                    timed_log(AV_LOG_DEBUG, "Input #%d stream #%d  stream fps: %0.2f,  nb_frames: %"PRId64"\n",
+                           input_source_index, stream_index,
+                           (double)1000000/avg_delta_frame_pts_time, in_st_data->nb_frames);
+                else
+                    timed_log(AV_LOG_DEBUG, "Input #%d stream #%d  nb_frames: %"PRId64"\n",
+                           input_source_index, stream_index, in_st_data->nb_frames);
+
+                in_st_data->nb_frames ++;
+            }
+
+            new_pts = av_rescale_q_rnd(frame->pts - in_st_data->first_pts,
+                                       input_fmt_ctx->streams[stream_index]->time_base,
+                                       app_ctx.output_fmt_ctx->streams[stream_index]->time_base,
+                                       AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
+            new_pts += app_ctx.output_stream_data[stream_index].pts_delta;
+
+            timed_log(AV_LOG_DEBUG, "Input #%d Stream #%d frame received and sending for encoding, "
+                      "pts: %"PRId64" => %"PRId64"\n", input_source_index,
+                      stream_index, frame->pts, new_pts);
+
+            frame->pts = new_pts;
+
+            ret = encode_frame(frame, stream_index, input_source_index);
+            if (ret < 0 && ret != AVERROR(EAGAIN)) {
+                app_ctx.to_exit = 1;
+                timed_log(AV_LOG_INFO, "encoding terminating\n");
+            }
+            pthread_mutex_unlock(&app_ctx.encoder_mutex);
+
+        } else
+            ret = 0;
+
+        av_frame_unref(frame);
+    }
+
+    av_frame_free(&frame);
+
+    return ret;
+}
+
+
+static void print_usage(const char * program_name)
+{
+    av_log(NULL, AV_LOG_ERROR, "usage: %s -im <primary/main input> [-ifm <format name of primary input>] "
+              "-is <secondary input> [-ifs <format name of secondary input>] "
+              "-o <output> [-of <output format name>] "
+              "[-timeout <input msec>] [-loglevel info|debug|warning|error] "
+              "[-dsc <input discontinuity threshold usec>]\n", program_name);
+}
+
+
+static int read_parameters(int argc, char **argv)
+{
+    int i;
+
+    for (i = 1; i < argc; i++) {
+        if (!strcmp(argv[i], "-im") && i+1 < argc) {
+            app_ctx.input_filenames[MAIN_INPUT_INDEX] = argv[++i];
+
+        } else if (!strcmp(argv[i], "-ifm") && i+1 < argc) {
+            app_ctx.input_format_names[MAIN_INPUT_INDEX] = argv[++i];
+
+        } else if (!strcmp(argv[i], "-is") && i+1 < argc) {
+            app_ctx.input_filenames[SECOND_INPUT_INDEX] = argv[++i];
+
+        } else if (!strcmp(argv[i], "-ifs") && i+1 < argc) {
+            app_ctx.input_format_names[SECOND_INPUT_INDEX] = argv[++i];
+
+        } else if (!strcmp(argv[i], "-o") && i+1 < argc) {
+            app_ctx.output_filename = argv[++i];
+
+        } else if (!strcmp(argv[i], "-of") && i+1 < argc) {
+            app_ctx.output_format_name = argv[++i];
+
+        } else if (!strcmp(argv[i], "-loglevel") && i+1 < argc) {
+            i++;
+            if (!strcmp(argv[i], "info")) {
+                av_log_set_level(AV_LOG_INFO);
+            } else if (!strcmp(argv[i], "error")) {
+                av_log_set_level(AV_LOG_ERROR);
+            } else if (!strcmp(argv[i], "warning")) {
+                av_log_set_level(AV_LOG_WARNING);
+            } else if (!strcmp(argv[i], "debug")) {
+                av_log_set_level(AV_LOG_DEBUG);
+            } else {
+                timed_log(AV_LOG_ERROR,
+                       "Unexpected loglevel value: %s\n", argv[i]);
+                return AVERROR(EINVAL);
+            }
+
+
+        } else if (!strcmp(argv[i], "-timeout") && i+1 < argc) {
+            char * tail = NULL;
+            app_ctx.input_timeout_ms = strtoll(argv[++i], &tail, 10);
+            if (*tail || app_ctx.input_timeout_ms < 1) {
+                timed_log(AV_LOG_ERROR,
+                       "Invalid or negative value '%s' for input timeout checking interval\n", argv[i]);
+                return AVERROR(EINVAL);
+            }
+
+        } else if (!strcmp(argv[i], "-dsc") && i+1 < argc) {
+            char * tail = NULL;
+            app_ctx.input_stream_time_discnt_thrshd_us = strtoll(argv[++i], &tail, 10);
+            if (*tail || app_ctx.input_stream_time_discnt_thrshd_us < 1) {
+                timed_log(AV_LOG_ERROR,
+                       "Invalid or negative value '%s' for input time discontinuity interval\n", argv[i]);
+                return AVERROR(EINVAL);
+            }
+
+        } else {
+            timed_log(AV_LOG_ERROR, "unknown option, or missing parameter: %s\n", argv[i]);
+            print_usage(argv[0]);
+            return AVERROR(EINVAL);
+        }
+    }
+
+    if (!app_ctx.input_filenames[MAIN_INPUT_INDEX] ||
+        !app_ctx.input_filenames[SECOND_INPUT_INDEX] ||
+        !app_ctx.output_filename) {
+        print_usage(argv[0]);
+        return AVERROR(EINVAL);
+    }
+
+    return 0;
+}
+
+
+static int check_input_streams_matching(void)
+{
+    int i;
+
+    if (app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams != app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams) {
+        timed_log(AV_LOG_ERROR, "Main input has %u streams but secondary input has %u streams; "
+                  "the stream counts must match, so aborting\n",
+                  app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams,
+                  app_ctx.input_fmt_ctx[SECOND_INPUT_INDEX]->nb_streams);
+        return AVERROR(EINVAL);
+    }
+
+    for (i = 0; i < app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams; i++) {
+        AVCodecContext * main_dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i];
+        AVCodecContext * second_dec_ctx = app_ctx.dec_ctx[SECOND_INPUT_INDEX][i];
+
+        if (main_dec_ctx->codec_type != second_dec_ctx->codec_type) {
+            timed_log(AV_LOG_ERROR, "Mismatching stream types at #%d elementary stream, aborting\n", i);
+            return AVERROR(EINVAL);
+        }
+
+        if (main_dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
+            if (main_dec_ctx->width != second_dec_ctx->width) {
+                timed_log(AV_LOG_ERROR, "at stream #%d video width mismatch: %d != %d\n", i,
+                       main_dec_ctx->width, second_dec_ctx->width);
+                return AVERROR(EINVAL);
+            }
+            if (main_dec_ctx->height != second_dec_ctx->height) {
+                timed_log(AV_LOG_ERROR, "at stream #%d video height mismatch: %d != %d\n", i,
+                       main_dec_ctx->height, second_dec_ctx->height);
+                return AVERROR(EINVAL);
+            }
+            if (main_dec_ctx->pix_fmt != second_dec_ctx->pix_fmt) {
+                timed_log(AV_LOG_ERROR, "at stream #%d video pix_fmt mismatch: %d != %d\n", i,
+                       main_dec_ctx->pix_fmt, second_dec_ctx->pix_fmt);
+                return AVERROR(EINVAL);
+            }
+            // TODO: check more video parameters
+        }
+        if (main_dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+            if (main_dec_ctx->channels != second_dec_ctx->channels) {
+                timed_log(AV_LOG_ERROR, "at stream #%d audio channel number mismatch: %d != %d\n", i,
+                       main_dec_ctx->channels, second_dec_ctx->channels);
+                return AVERROR(EINVAL);
+            }
+            if (main_dec_ctx->channel_layout != second_dec_ctx->channel_layout) {
+                timed_log(AV_LOG_ERROR, "at stream #%d audio channel layout mismatch: 0x%"PRIx64" != 0x%"PRIx64"\n",
+                       i, main_dec_ctx->channel_layout, second_dec_ctx->channel_layout);
+                return AVERROR(EINVAL);
+            }
+            if (main_dec_ctx->sample_rate != second_dec_ctx->sample_rate) {
+                timed_log(AV_LOG_ERROR, "at stream #%d audio sample rate mismatch: %d != %d\n", i,
+                       main_dec_ctx->sample_rate, second_dec_ctx->sample_rate);
+                return AVERROR(EINVAL);
+            }
+
+            if (main_dec_ctx->sample_fmt != DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT) {
+                timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format is not as expected (%d)\n",
+                       i, DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT);
+                return AVERROR(EINVAL);
+            }
+
+            if (main_dec_ctx->sample_fmt != second_dec_ctx->sample_fmt) {
+                timed_log(AV_LOG_ERROR, "at elementary stream #%d audio sample format mismatch: %d != %d\n",
+                       i, main_dec_ctx->sample_fmt, second_dec_ctx->sample_fmt);
+                return AVERROR(EINVAL);
+            }
+
+            // TODO: check more audio parameters
+        }
+    }
+
+    return 0;
+}
+
+
+
+static int allocate_arrays(void)
+{
+
+    int nb_streams = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX]->nb_streams;
+
+    app_ctx.enc_ctx = av_mallocz_array(nb_streams, sizeof(*app_ctx.enc_ctx));
+    if (!app_ctx.enc_ctx) {
+        timed_log(AV_LOG_ERROR,"Could not allocate encoder context list\n");
+        return AVERROR(ENOMEM);
+    }
+
+    app_ctx.input_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.input_stream_data));
+    if (!app_ctx.input_stream_data) {
+        timed_log(AV_LOG_ERROR,"Could not allocate input_stream_data list\n");
+        return AVERROR(ENOMEM);
+    }
+
+    app_ctx.output_stream_data = av_mallocz_array(nb_streams, sizeof(*app_ctx.output_stream_data));
+    if (!app_ctx.output_stream_data) {
+        timed_log(AV_LOG_ERROR,"Could not allocate output_stream_data list\n");
+        return AVERROR(ENOMEM);
+    }
+
+    return 0;
+}
+
+
+static int open_output(void)
+{
+    int i, ret;
+    AVDictionary * output_options = NULL;
+    AVOutputFormat * output_format = NULL;
+
+    AVStream * out_stream;
+    AVStream * in_stream;
+    AVCodecContext * dec_ctx = NULL, * enc_ctx = NULL;
+    AVCodec * output_video_codec, * output_audio_codec;
+    AVFormatContext * input_fmt_ctx = app_ctx.input_fmt_ctx[MAIN_INPUT_INDEX];
+
+
+    if (app_ctx.output_format_name) {
+        if (!(output_format = av_guess_format(app_ctx.output_format_name, NULL, NULL))) {
+            timed_log(AV_LOG_ERROR, "Unknown output format: '%s'\n", app_ctx.output_format_name);
+            return AVERROR(EINVAL);
+        }
+    }
+
+    // allocate the output media context
+    ret = avformat_alloc_output_context2(&app_ctx.output_fmt_ctx, output_format, NULL,
+                                         app_ctx.output_filename);
+    if (ret < 0 || !app_ctx.output_fmt_ctx) {
+        timed_log(AV_LOG_ERROR,"Could not deduce output format for %s.\n", app_ctx.output_filename);
+        return AVERROR(EINVAL);
+    }
+
+    if ((ret = allocate_arrays()) < 0)
+        return ret;
+
+    // find the video encoder for output
+    output_video_codec = avcodec_find_encoder_by_name(output_video_codec_name);
+    if (!output_video_codec) {
+        timed_log(AV_LOG_ERROR, "Output video codec '%s' not found\n", output_video_codec_name);
+        return AVERROR_ENCODER_NOT_FOUND;
+    }
+
+    // find the audio encoder for output
+    output_audio_codec = avcodec_find_encoder_by_name(output_audio_codec_name);
+    if (!output_audio_codec) {
+        timed_log(AV_LOG_ERROR, "Output audio codec '%s' not found\n", output_audio_codec_name);
+        return AVERROR_ENCODER_NOT_FOUND;
+    }
+
+
+    // creating encoding context for each input stream based on main input format
+    for (i = 0; i < input_fmt_ctx->nb_streams; i++) {
+
+        app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
+        app_ctx.output_stream_data[i].first_pts = AV_NOPTS_VALUE;
+        app_ctx.output_stream_data[i].last_pts = AV_NOPTS_VALUE;
+        app_ctx.output_stream_data[i].pts_delta = 0;
+        app_ctx.output_stream_data[i].nb_frames = 0;
+
+        in_stream = input_fmt_ctx->streams[i];
+        dec_ctx = app_ctx.dec_ctx[MAIN_INPUT_INDEX][i]; // based on main input
+
+        out_stream = avformat_new_stream(app_ctx.output_fmt_ctx, NULL);
+        if (!out_stream) {
+            timed_log(AV_LOG_ERROR, "Failed allocating output stream\n");
+            return AVERROR_UNKNOWN;
+        }
+
+        enc_ctx = NULL;
+
+        if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
+
+            // create the context for video encoder
+            enc_ctx = avcodec_alloc_context3(output_video_codec);
+            if (!enc_ctx) {
+                timed_log(AV_LOG_ERROR, "Could not allocate output video codec context\n");
+                return AVERROR(ENOMEM);
+            }
+
+            enc_ctx->height = dec_ctx->height;
+            enc_ctx->width = dec_ctx->width;
+            enc_ctx->sample_aspect_ratio = dec_ctx->sample_aspect_ratio;
+            enc_ctx->pix_fmt = dec_ctx->pix_fmt;
+            // TODO: check whether pix_fmt is included in output_video_codec->pix_fmts,
+            //       the list of pixel formats supported by the video codec
+
+            enc_ctx->time_base = av_inv_q(dec_ctx->framerate);
+            enc_ctx->gop_size = 0;  // intra only, but it is useless in case of rawvideo
+
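+            av_opt_set_int(enc_ctx, "refcounted_frames", 1, 0);
+
+            // global headers must be requested before the encoder is opened
+            if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
+                enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+
+            ret = avcodec_open2(enc_ctx, output_video_codec, NULL);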
+            if (ret < 0) {
+                timed_log(AV_LOG_ERROR,  "Could not open output video codec: %s\n", av_err2str(ret));
+                return ret;
+            }
+
+        } else if (dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+            // create the context for audio encoder
+            enc_ctx = avcodec_alloc_context3(output_audio_codec);
+            if (!enc_ctx) {
+                timed_log(AV_LOG_ERROR,  "Could not allocate output audio codec context\n");
+                return AVERROR(ENOMEM);
+            }
+
+            enc_ctx->sample_rate = dec_ctx->sample_rate;
+            enc_ctx->channel_layout = dec_ctx->channel_layout;
+            enc_ctx->channels = dec_ctx->channels;
+            // TODO: check by av_get_channel_layout_nb_channels(enc_ctx->channel_layout);
+
+            enc_ctx->sample_fmt = DEFAULT_EXPECTED_AUDIO_SAMPLE_FORMAT; // encoder->sample_fmts[0];
+            enc_ctx->time_base = (AVRational){1, enc_ctx->sample_rate};
+
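+            // global headers must be requested before the encoder is opened
+            if (app_ctx.output_fmt_ctx->oformat->flags & AVFMT_GLOBALHEADER)
+                enc_ctx->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+
+            ret = avcodec_open2(enc_ctx, output_audio_codec, NULL);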
+            if (ret < 0) {
+                timed_log(AV_LOG_ERROR,  "Could not open output audio codec: %s\n", av_err2str(ret));
+                return ret;
+            }
+
+        }
+
+        if (dec_ctx->codec_type == AVMEDIA_TYPE_VIDEO || dec_ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+            ret = avcodec_parameters_from_context(out_stream->codecpar, enc_ctx);
+            if (ret < 0) {
+                timed_log(AV_LOG_ERROR, "Failed to copy encoder parameters to output stream #%u\n", i);
+                return ret;
+            }
+
+            out_stream->time_base = enc_ctx->time_base; // hint for the muxer
+
+        } else if (dec_ctx->codec_type == AVMEDIA_TYPE_UNKNOWN) {
+            timed_log(AV_LOG_FATAL, "Elementary stream #%d is of unknown type, cannot proceed\n", i);
+            return AVERROR_INVALIDDATA;
+
+        } else {
+            // this stream will be remuxed only
+            ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
+            if (ret < 0) {
+                timed_log(AV_LOG_ERROR, "Copying codec parameters for elementary stream #%u failed\n", i);
+                return ret;
+            }
+            out_stream->time_base = in_stream->time_base;
+        }
+
+        // NULL for streams that are only remuxed
+        app_ctx.enc_ctx[i] = enc_ctx;
+
+    }
+
+
+    av_dump_format(app_ctx.output_fmt_ctx, 0, app_ctx.output_filename, 1);
+
+
+    // open the output file, if needed by the format
+    if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE)) {
+        ret = avio_open2(&app_ctx.output_fmt_ctx->pb, app_ctx.output_filename,
+                         AVIO_FLAG_WRITE, NULL, &output_options);
+        if (ret < 0) {
+            timed_log(AV_LOG_ERROR, "Could not open '%s': %s\n",
+                   app_ctx.output_filename, av_err2str(ret));
+            return ret;
+        }
+    }
+
+
+    // Write the stream header, if any
+    ret = avformat_write_header(app_ctx.output_fmt_ctx, &output_options);
+    if (ret < 0) {
+        timed_log(AV_LOG_ERROR, "Error occurred when opening output file: %s\n", av_err2str(ret));
+        return ret;
+    }
+
+    return 0;
+}
+
+
+static int calculate_new_ts_delta_values(void)
+{
+    int i;
+    int64_t max_last_pts = AV_NOPTS_VALUE;
+    int max_index = -1;
+
+    // find the max last_pts, this will be the old output duration
+    for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
+        if (app_ctx.output_stream_data[i].last_pts == AV_NOPTS_VALUE)
+            continue;
+
+        if (max_index == -1) {
+            max_index = i;
+            continue;
+        }
+
+        if (av_compare_ts(app_ctx.output_stream_data[i].last_pts,
+                          app_ctx.output_fmt_ctx->streams[i]->time_base,
+                          app_ctx.output_stream_data[max_index].last_pts,
+                          app_ctx.output_fmt_ctx->streams[max_index]->time_base) > 0)
+            max_index = i;
+    }
+
+    if (max_index == -1) {
+        timed_log(AV_LOG_ERROR, "could not calculate new max pts\n");
+        return AVERROR(EINVAL);
+    }
+
+    // save here because we will clear somewhere in the next for loop
+    max_last_pts = app_ctx.output_stream_data[max_index].last_pts;
+
+    // calculate the new delta: rescale the max last_pts to each output stream's time base
+    for(i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++) {
+        app_ctx.output_stream_data[i].pts_delta = av_rescale_q_rnd(max_last_pts,
+                                                                   app_ctx.output_fmt_ctx->streams[max_index]->time_base,
+                                                                   app_ctx.output_fmt_ctx->streams[i]->time_base,
+                                                                   AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX);
+        app_ctx.input_stream_data[i].first_pts = AV_NOPTS_VALUE;
+    }
+
+    return 0;
+}
+
+
+static int dry_current_input_pipeline(int input_source_index)
+{
+    int i, ret;
+
+    for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
+        if ((ret = handle_received_packet(NULL, i, input_source_index)) < 0)
+            if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+                timed_log(AV_LOG_WARNING, "Input #%d stream #%d problem on drying the pipeline: %s\n",
+                       input_source_index, i, av_err2str(ret));
+
+    return 0;
+}
+
+
+static int handle_input(int input_source_index)
+{
+   int i, ret, eof_input = 0, error_input = 0, input_reopen_counter = 0;
+   AVPacket input_packet;
+   int64_t dts_delta_time = 0;
+
+   timed_log(AV_LOG_INFO, "Input #%d thread started\n", input_source_index);
+   while (!app_ctx.to_exit) {  // almost forever
+       int to_set_dts_delta_time = 1;
+
+       // read packets continuously from input
+       while (!app_ctx.to_exit) {
+           ret = av_read_frame(app_ctx.input_fmt_ctx[input_source_index], &input_packet);
+
+           if (ret < 0) {
+               if (ret == AVERROR_EOF) {
+                   eof_input = 1;
+                   timed_log(AV_LOG_INFO, "input #%d eof detected by av_read_frame\n",
+                             input_source_index);
+               } else {
+                   error_input = 1;
+                   timed_log(AV_LOG_ERROR, "input #%d av_read_frame returned: %s\n",
+                             input_source_index, av_err2str(ret));
+               }
+               break;
+           }
+
+           if (input_packet.stream_index >= app_ctx.input_fmt_ctx[input_source_index]->nb_streams)
+               timed_log(AV_LOG_WARNING, "Input #%d unexpected stream index: %d\n",
+                         input_source_index, input_packet.stream_index);
+
+           else {
+               // ensuring realtime processing
+               if (input_packet.dts != AV_NOPTS_VALUE) {
+
+                   int64_t dts_time = av_rescale_q(input_packet.dts,
+                                                   app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
+                                                   AV_TIME_BASE_Q);
+                   int64_t now_us = av_gettime_relative();
+                   int64_t sleep_us = dts_time - now_us + dts_delta_time;
+
+                   if (to_set_dts_delta_time) {
+                       to_set_dts_delta_time = 0;
+                       dts_delta_time = now_us - dts_time;
+                       sleep_us = 0;
+                   }
+
+                   if (FFABS(sleep_us) > app_ctx.input_stream_time_discnt_thrshd_us) {
+                       timed_log(AV_LOG_INFO,
+                              "Input #%d time discontinuity detected: %"PRIi64"us (limit: %dus), packet wallclock timestamp: %"PRIi64
+                              ", delta: %"PRIi64"us\n",
+                              input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us, dts_time, dts_delta_time);
+                       sleep_us = 0;
+                       dts_delta_time = now_us - dts_time;
+                   }
+                   if (sleep_us > app_ctx.input_stream_time_discnt_thrshd_us) {
+                       timed_log(AV_LOG_WARNING, "Input #%d sleep time too long: %"PRIi64"us, truncating to %dus\n",
+                               input_source_index, sleep_us, app_ctx.input_stream_time_discnt_thrshd_us);
+                       sleep_us = app_ctx.input_stream_time_discnt_thrshd_us;
+                   }
+                   if (sleep_us > 0) {
+                       timed_log(AV_LOG_DEBUG, "Input #%d sleeping %"PRIi64"us to simulate realtime receiving\n",
+                              input_source_index, sleep_us);
+                       for(;sleep_us > app_ctx.input_timeout_ms * 500; sleep_us -= app_ctx.input_timeout_ms * 500)  // 500 = 1000/2
+                           av_usleep(app_ctx.input_timeout_ms * 500);  // sleep one half-timeout chunk at a time
+
+                       av_usleep(sleep_us);
+                   }
+               }
+
+
+               if (app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_VIDEO ||
+                   app_ctx.dec_ctx[input_source_index][input_packet.stream_index]->codec_type == AVMEDIA_TYPE_AUDIO) {
+
+                   if ((ret = handle_received_packet(&input_packet, input_packet.stream_index, input_source_index)) < 0)
+                       if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+                           break;
+
+
+               } else if (app_ctx.input_source_index == input_source_index && !pthread_mutex_trylock(&app_ctx.encoder_mutex) ) {
+                   app_ctx.input_has_new_frame[input_source_index] = 1;
+
+                   /* remux this frame without reencoding */
+                   av_packet_rescale_ts(&input_packet,
+                                        app_ctx.input_fmt_ctx[input_source_index]->streams[input_packet.stream_index]->time_base,
+                                        app_ctx.output_fmt_ctx->streams[input_packet.stream_index]->time_base);
+
+                   ret = av_interleaved_write_frame(app_ctx.output_fmt_ctx, &input_packet);
+
+                   pthread_mutex_unlock(&app_ctx.encoder_mutex);
+
+                   if (ret < 0) {
+                       app_ctx.to_exit = 1;
+                       break;
+                   }
+               }
+           }
+           av_packet_unref(&input_packet);
+       }
+
+
+       if (!app_ctx.to_exit && (eof_input || error_input)) {
+
+           timed_log(AV_LOG_INFO, "Reopening input #%d, occasion #%d\n",
+                     input_source_index, ++input_reopen_counter);
+
+           // dry current pipeline
+           dry_current_input_pipeline(input_source_index);
+
+           // close input
+           for (i = 0; i < app_ctx.output_fmt_ctx->nb_streams; i++)
+               avcodec_free_context(&app_ctx.dec_ctx[input_source_index][i]);
+
+           avformat_close_input(&app_ctx.input_fmt_ctx[input_source_index]);
+
+           eof_input = 0;
+           error_input = 0;
+
+           if (try_to_reopen_input(input_source_index) < 0) {
+               break;
+           }
+
+       }
+   }
+
+   if (!app_ctx.to_exit && eof_input) {
+       // dry current pipeline
+       dry_current_input_pipeline(input_source_index);
+   }
+
+   return 0;
+}
+
+
+
+static void *threaded_input_handler(void * arg)
+{
+   int input_source_index = *(int *)arg;
+   handle_input(input_source_index);
+   pthread_exit(NULL);
+}
+
+
+
+int main(int argc, char **argv)
+{
+    int ret, i, k;
+    int64_t last_input_check_time = 0;
+    pthread_attr_t attr;
+
+    app_ctx.start = av_gettime();
+    av_log_set_level(DEFAULT_LOG_LEVEL);
+
+    // read and check command line parameters
+    if (read_parameters(argc, argv) < 0)
+        exit(1);
+
+    avformat_network_init();
+    avfilter_register_all();
+
+    app_ctx.input_source_index = -1;  // none
+    app_ctx.to_exit = 0;
+
+    // For portability, explicitly create threads in a joinable state
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+
+    for (i = 0; i < NB_INPUTS && !app_ctx.to_exit; i++) {
+        if ((ret = open_single_input(i)) < 0)  // open input
+            goto end;
+
+        app_ctx.input_has_new_frame[i] = 0;
+        if ((ret = pthread_create(&app_ctx.input_threads[i], &attr, threaded_input_handler, (void *) &app_ctx.thread_id[i]))) {
+            timed_log(AV_LOG_ERROR, "return code from #%d pthread_create() is %d\n", i, ret);
+            goto end;
+        }
+    }
+
+    if ((ret = check_input_streams_matching()) < 0)
+        goto end;
+
+    if (open_output() < 0)
+        goto end;
+
+
+    last_input_check_time = av_gettime_relative();
+    while (!app_ctx.to_exit) {  // almost forever
+        int64_t now_us = av_gettime_relative();
+        int64_t check_interval = now_us - last_input_check_time;
+
+        if (check_interval > app_ctx.input_timeout_ms * 1000) {
+            last_input_check_time = now_us;
+            if (app_ctx.input_source_index == MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
+                // normal case
+                timed_log(AV_LOG_DEBUG, "Checking running main input: ok, in last %"PRIi64"us \n", check_interval);
+
+            } else if (app_ctx.input_source_index != MAIN_INPUT_INDEX && app_ctx.input_has_new_frame[MAIN_INPUT_INDEX]) {
+                if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+                    if (app_ctx.input_source_index >= 0) {
+                        timed_log(AV_LOG_INFO, "#%d switching back to main input because new frame arrived\n",
+                                  app_ctx.input_failover_counter);
+                        calculate_new_ts_delta_values();
+                    } else
+                        timed_log(AV_LOG_INFO, "Switching to main input\n");
+                    app_ctx.input_source_index = MAIN_INPUT_INDEX;
+                    pthread_mutex_unlock(&app_ctx.encoder_mutex);
+                } else
+                    timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
+
+            } else if (app_ctx.input_source_index != SECOND_INPUT_INDEX && app_ctx.input_has_new_frame[SECOND_INPUT_INDEX]) {
+                if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+                    if (app_ctx.input_source_index >= 0) {
+                        app_ctx.input_failover_counter++;
+                        timed_log(AV_LOG_INFO, "#%d switching to second input, no new frame on Input #%d in last %"PRIi64"us\n",
+                                  app_ctx.input_failover_counter, MAIN_INPUT_INDEX, check_interval);
+                        calculate_new_ts_delta_values();
+                    } else
+                        timed_log(AV_LOG_INFO, "Switching to second input\n");
+                    app_ctx.input_source_index = SECOND_INPUT_INDEX;
+                    pthread_mutex_unlock(&app_ctx.encoder_mutex);
+                } else
+                    timed_log(AV_LOG_ERROR, "Could not lock encoder_mutex for input switching\n");
+            }
+
+
+            for (i = 0; i < NB_INPUTS; i++)
+                app_ctx.input_has_new_frame[i] = 0;
+
+        }
+        av_usleep(app_ctx.input_timeout_ms * 250); // 250 = 1000 / 4
+    }
+
+
+    if (!pthread_mutex_lock(&app_ctx.encoder_mutex)) {
+        app_ctx.input_source_index = -1;
+        pthread_mutex_unlock(&app_ctx.encoder_mutex);
+    }
+
+    av_write_trailer(app_ctx.output_fmt_ctx);
+
+
+    if (!(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE))
+        avio_closep(&app_ctx.output_fmt_ctx->pb);
+
+end:
+
+    app_ctx.to_exit = 1;
+    // wait for all input threads to terminate
+    for (i = 0; i < NB_INPUTS; i++)
+        pthread_join(app_ctx.input_threads[i], NULL);
+
+
+    for (k = 0; k < NB_INPUTS; k++) {
+        for (i = 0; app_ctx.output_fmt_ctx &&
+                    i < app_ctx.output_fmt_ctx->nb_streams; i++)
+            avcodec_free_context(&app_ctx.dec_ctx[k][i]);
+
+        avformat_close_input(&app_ctx.input_fmt_ctx[k]);
+    }
+
+    for (i = 0; app_ctx.output_fmt_ctx && i < app_ctx.output_fmt_ctx->nb_streams; i++)
+        avcodec_free_context(&app_ctx.enc_ctx[i]);
+
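+    // the output context is a muxer: close its I/O (if still open) and free it
+    if (app_ctx.output_fmt_ctx && !(app_ctx.output_fmt_ctx->oformat->flags & AVFMT_NOFILE))
+        avio_closep(&app_ctx.output_fmt_ctx->pb);
+    avformat_free_context(app_ctx.output_fmt_ctx);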
+
+    avformat_network_deinit();
+
+
+    pthread_mutex_destroy(&app_ctx.encoder_mutex);
+
+    exit(0);
+}
-- 
2.5.3.windows.1
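
To illustrate what calculate_new_ts_delta_values() does at a switch, here is a minimal standalone sketch that does not depend on FFmpeg. It finds the stream whose last written pts is latest in real time and rescales that pts into every stream's time base, which gives the offset for the packets of the newly selected input. The names TimeBase, NO_PTS, rescale_ts and compute_pts_deltas are hypothetical stand-ins (for AVRational, AV_NOPTS_VALUE and av_rescale_q_rnd), and the arithmetic is simplified: it assumes non-negative timestamps and no 64-bit overflow.

```c
#include <stdint.h>

/* Hypothetical stand-ins for AVRational and AV_NOPTS_VALUE. */
typedef struct { int num, den; } TimeBase;
#define NO_PTS INT64_MIN

/* Convert ts from time base 'from' to time base 'to', rounding to nearest.
 * Simplified: assumes ts >= 0 and that the products fit in 64 bits. */
static int64_t rescale_ts(int64_t ts, TimeBase from, TimeBase to)
{
    int64_t num = (int64_t)from.num * to.den;
    int64_t den = (int64_t)from.den * to.num;
    return (ts * num + den / 2) / den;
}

/* Fill delta[i] with the latest last_pts expressed in stream i's time base.
 * Returns 0 on success, -1 if no stream has written a timestamp yet. */
static int compute_pts_deltas(const int64_t *last_pts, const TimeBase *tb,
                              int n, int64_t *delta)
{
    int i, max_index = -1;

    for (i = 0; i < n; i++) {
        if (last_pts[i] == NO_PTS)
            continue;
        /* compare last_pts[i]*tb[i] with last_pts[max]*tb[max] by cross-multiplying */
        if (max_index < 0 ||
            last_pts[i] * tb[i].num * tb[max_index].den >
            last_pts[max_index] * tb[max_index].num * tb[i].den)
            max_index = i;
    }
    if (max_index < 0)
        return -1;

    for (i = 0; i < n; i++)
        delta[i] = rescale_ts(last_pts[max_index], tb[max_index], tb[i]);
    return 0;
}
```

For example, with a video stream at time base 1/25 that last wrote pts 250 (10.0 s) and an audio stream at 1/48000 that last wrote pts 484800 (10.1 s), the audio stream is the latest one, so the audio delta is 484800 and the video delta is 484800 * 25 / 48000 = 252.5, rounded to 253.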

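The switching rule applied by the check loop in main() once per input_timeout_ms can also be sketched in isolation. The function below is only an illustration of that rule under simplifying assumptions: pick_input and the enum names are hypothetical, and the mutex handling, logging and the timestamp delta recalculation done by the real loop are left out.

```c
/* Hypothetical names mirroring -1 / MAIN_INPUT_INDEX / SECOND_INPUT_INDEX. */
enum { INPUT_NONE = -1, MAIN_INPUT = 0, SECOND_INPUT = 1 };

/* Decide which input should feed the encoder after one check interval.
 * has_new_frame[k] is nonzero if input k delivered a frame in that interval. */
static int pick_input(int current, const int has_new_frame[2])
{
    if (has_new_frame[MAIN_INPUT])
        return MAIN_INPUT;      /* main is alive: stay on it or switch back */
    if (has_new_frame[SECOND_INPUT])
        return SECOND_INPUT;    /* main is silent, secondary is alive: fail over */
    return current;             /* nothing arrived: keep the current selection */
}
```

The main input always wins when it has delivered a frame, so the application returns to it as soon as it recovers. When neither input delivered anything, the selection is left unchanged.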