[FFmpeg-devel] [PATCH] avformat/libsrt: data transmission bitrate control
Bartsevich, Dmitry
Bartsevich at scnsoft.com
Wed Sep 8 14:37:52 EEST 2021
The patch adds 3 parameters ("bitrate", "burst_bits", "fifo_size") and output bitrate control to the libsrt muxer. The code is mostly taken from udp.c and the reasoning is the same: data transmission bursts cause decoding errors on some decoders. Windows-specific APIs (performance counters and waitable timers) are used instead of standard FFmpeg routines to measure time intervals and to delay thread execution in Windows build: standard ones don't provide sub-millisecond precision and accuracy which are required for smooth outbound traffic.
Muxer URL would look like this:
"srt://10.10.10.10:12345?mode=caller&bitrate=15000000&burst_bits=150000"
Signed-off-by: Dmitry Bartsevich <bartsevich at scnsoft.com>
---
libavformat/libsrt.c | 275 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 275 insertions(+)
diff --git a/libavformat/libsrt.c b/libavformat/libsrt.c
index e5701625b8..0a00277d80 100644
--- a/libavformat/libsrt.c
+++ b/libavformat/libsrt.c
@@ -23,6 +23,9 @@
#include <srt/srt.h>
+#include "libavutil/avassert.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
#include "libavutil/time.h"
@@ -33,6 +36,15 @@
#include "os_support.h"
#include "url.h"
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
/* This is for MPEG-TS and it's a default SRTO_PAYLOADSIZE for SRTT_LIVE (8 TS packets) */
#ifndef SRT_LIVE_DEFAULT_PAYLOAD_SIZE
#define SRT_LIVE_DEFAULT_PAYLOAD_SIZE 1316
@@ -90,6 +102,21 @@ typedef struct SRTContext {
SRT_TRANSTYPE transtype;
int linger;
int tsbpd;
+
+ /* Circular Buffer variables for use in SRT sending code */
+ int circular_buffer_size;
+ AVFifoBuffer *fifo;
+ int circular_buffer_error;
+ int64_t bitrate; /* number of bits to send per second */
+ int64_t burst_bits;
+ int close_req;
+#if HAVE_PTHREAD_CANCEL
+ pthread_t circular_buffer_thread;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int thread_started;
+#endif
+ uint8_t tmp[SRT_LIVE_MAX_PAYLOAD_SIZE+4];
} SRTContext;
#define D AV_OPT_FLAG_DECODING_PARAM
@@ -142,6 +169,9 @@ static const AVOption libsrt_options[] = {
{ "file", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = SRTT_FILE }, INT_MIN, INT_MAX, .flags = D|E, "transtype" },
{ "linger", "Number of seconds that the socket waits for unsent data when closing", OFFSET(linger), AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "tsbpd", "Timestamp-based packet delivery", OFFSET(tsbpd), AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags = D|E },
+ { "bitrate", "Bits to send per second", OFFSET(bitrate), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
+ { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = E },
+ { "fifo_size", "set the SRT sending circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, .flags = E },
{ NULL }
};
@@ -165,6 +195,138 @@ static int libsrt_socket_nonblock(int socket, int enable)
return srt_setsockopt(socket, 0, SRTO_RCVSYN, &blocking, sizeof(blocking));
}
+#if HAVE_PTHREAD_CANCEL
+
+/* More precise time measurement in Windows, call default routine otherwise */
+static int64_t av_gettime_relative_precise(void)
+{
+#ifdef _WIN32
+ static LARGE_INTEGER freq;
+ LARGE_INTEGER t;
+
+ if (freq.QuadPart == 0) {
+ QueryPerformanceFrequency(&freq);
+ }
+
+ QueryPerformanceCounter(&t);
+ return t.QuadPart * 1000000 / freq.QuadPart;
+#else
+ return av_gettime_relative();
+#endif
+}
+
+static void *circular_buffer_task_tx(void *_URLContext)
+{
+ URLContext *h = _URLContext;
+ SRTContext *s = h->priv_data;
+ int old_cancelstate;
+ int64_t target_timestamp = av_gettime_relative_precise();
+ int64_t start_timestamp = av_gettime_relative_precise();
+ int64_t sent_bits = 0;
+ int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+ int64_t max_delay = s->bitrate ? ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+#ifdef _WIN32
+ /* Use waitable timers to delay thread execution in Windows, as default
+ Sleep() API call has 1ms resolution and is not accurate enough */
+ LARGE_INTEGER timeout;
+ HANDLE waitable_timer = CreateWaitableTimer(NULL, FALSE, NULL);
+#endif
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
+
+ if (libsrt_socket_nonblock(s->fd, 0) < 0) {
+ av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
+ s->circular_buffer_error = AVERROR(EIO);
+ goto end;
+ }
+
+ for(;;) {
+ int len;
+ const uint8_t *p;
+ uint8_t tmp[4];
+ int64_t timestamp;
+
+ len = av_fifo_size(s->fifo);
+
+ while (len<4) {
+ if (s->close_req)
+ goto end;
+ pthread_cond_wait(&s->cond, &s->mutex);
+ len = av_fifo_size(s->fifo);
+ }
+
+ av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+ len = AV_RL32(tmp);
+
+ av_assert0(len >= 0);
+ av_assert0(len <= sizeof(s->tmp));
+
+ av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+ pthread_mutex_unlock(&s->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+ if (s->bitrate) {
+ timestamp = av_gettime_relative_precise();
+ if (timestamp < target_timestamp) {
+ int64_t delay = target_timestamp - timestamp;
+ if (delay > max_delay) {
+ delay = max_delay;
+ start_timestamp = timestamp + delay;
+ sent_bits = 0;
+ }
+#ifdef _WIN32
+ /* Relative waitable timer delay in 100-nanosecond units */
+ timeout.QuadPart = -delay * 10;
+ if (SetWaitableTimer(waitable_timer, &timeout, NULL, NULL, NULL, FALSE))
+ WaitForSingleObject(waitable_timer, INFINITE);
+#else
+ av_usleep(delay);
+#endif
+ } else {
+ if (timestamp - burst_interval > target_timestamp) {
+ start_timestamp = timestamp - burst_interval;
+ sent_bits = 0;
+ }
+ }
+ sent_bits += len * 8;
+ target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+ }
+
+ p = s->tmp;
+ while (len) {
+ int ret;
+ av_assert0(len > 0);
+ ret = srt_sendmsg(s->fd, p, len, -1, 0);
+ if (ret >= 0) {
+ len -= ret;
+ p += ret;
+ } else {
+ ret = ff_neterrno();
+ if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+ pthread_mutex_lock(&s->mutex);
+ s->circular_buffer_error = ret;
+ pthread_mutex_unlock(&s->mutex);
+ return NULL;
+ }
+ }
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+ pthread_mutex_lock(&s->mutex);
+ }
+
+end:
+ pthread_mutex_unlock(&s->mutex);
+#ifdef _WIN32
+ CloseHandle(waitable_timer);
+#endif
+ return NULL;
+}
+
+#endif
+
static int libsrt_epoll_create(URLContext *h, int fd, int write)
{
int modes = SRT_EPOLL_ERR | (write ? SRT_EPOLL_OUT : SRT_EPOLL_IN);
@@ -379,6 +541,7 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
char portstr[10];
int64_t open_timeout = 0;
int eid, write_eid;
+ int is_output = (flags & AVIO_FLAG_WRITE) != 0;
av_url_split(proto, sizeof(proto), NULL, 0, hostname, sizeof(hostname),
&port, path, sizeof(path), uri);
@@ -490,9 +653,55 @@ static int libsrt_setup(URLContext *h, const char *uri, int flags)
s->fd = fd;
s->eid = eid;
+#if HAVE_PTHREAD_CANCEL
+ /*
+ Create thread in case of:
+ output and bitrate and circular_buffer_size is set
+ */
+
+ if (is_output && s->bitrate && !s->circular_buffer_size) {
+ /* Warn user in case of 'circular_buffer_size' is not set */
+ av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+ }
+
+ if (is_output && s->bitrate && s->circular_buffer_size) {
+ /* start the task going */
+ s->fifo = av_fifo_alloc(s->circular_buffer_size);
+ if (!s->fifo) {
+ ret = AVERROR(ENOMEM);
+ goto fail;
+ }
+ ret = pthread_mutex_init(&s->mutex, NULL);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto fail;
+ }
+ ret = pthread_cond_init(&s->cond, NULL);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto cond_fail;
+ }
+ ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task_tx, h);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto thread_fail;
+ }
+ s->thread_started = 1;
+ }
+#endif
+
freeaddrinfo(ai);
return 0;
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+ pthread_cond_destroy(&s->cond);
+ cond_fail:
+ pthread_mutex_destroy(&s->mutex);
+#endif
fail:
if (cur_ai->ai_next) {
/* Retry with the next sockaddr */
@@ -643,7 +852,25 @@ static int libsrt_open(URLContext *h, const char *uri, int flags)
if (av_find_info_tag(buf, sizeof(buf), "linger", p)) {
s->linger = strtol(buf, NULL, 10);
}
+ if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
+ s->circular_buffer_size = strtol(buf, NULL, 10);
+ if (!HAVE_PTHREAD_CANCEL)
+ av_log(h, AV_LOG_WARNING,
+ "'circular_buffer_size' option was set but it is not supported "
+ "on this build (pthread support is required)\n");
+ }
+ if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+ s->bitrate = strtoll(buf, NULL, 10);
+ if (!HAVE_PTHREAD_CANCEL)
+ av_log(h, AV_LOG_WARNING,
+ "'bitrate' option was set but it is not supported "
+ "on this build (pthread support is required)\n");
+ }
+ if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+ s->burst_bits = strtoll(buf, NULL, 10);
+ }
}
+ s->circular_buffer_size *= 188;
ret = libsrt_setup(h, uri, flags);
if (ret < 0)
goto err;
@@ -680,6 +907,36 @@ static int libsrt_write(URLContext *h, const uint8_t *buf, int size)
SRTContext *s = h->priv_data;
int ret;
+#if HAVE_PTHREAD_CANCEL
+ if (s->fifo) {
+ uint8_t tmp[4];
+
+ pthread_mutex_lock(&s->mutex);
+
+ /*
+ Return error if last tx failed.
+ Here we can't know on which packet error was, but it needs to know that error exists.
+ */
+ if (s->circular_buffer_error < 0) {
+ int err = s->circular_buffer_error;
+ pthread_mutex_unlock(&s->mutex);
+ return err;
+ }
+
+ if(av_fifo_space(s->fifo) < size + 4) {
+ /* What about a partial packet tx ? */
+ pthread_mutex_unlock(&s->mutex);
+ return AVERROR(ENOMEM);
+ }
+ AV_WL32(tmp, size);
+ av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+ av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+ return size;
+ }
+#endif
+
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
ret = libsrt_network_wait_fd_timeout(h, s->eid, 1, h->rw_timeout, &h->interrupt_callback);
if (ret)
@@ -698,6 +955,24 @@ static int libsrt_close(URLContext *h)
{
SRTContext *s = h->priv_data;
+#if HAVE_PTHREAD_CANCEL
+ // Request close once writing is finished
+ if (s->thread_started) {
+ int ret;
+
+ pthread_mutex_lock(&s->mutex);
+ s->close_req = 1;
+ pthread_cond_signal(&s->cond);
+ pthread_mutex_unlock(&s->mutex);
+
+ ret = pthread_join(s->circular_buffer_thread, NULL);
+ if (ret != 0)
+ av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
+ pthread_mutex_destroy(&s->mutex);
+ pthread_cond_destroy(&s->cond);
+ }
+#endif
+
srt_epoll_release(s->eid);
srt_close(s->fd);
--
2.16.1.windows.4
More information about the ffmpeg-devel
mailing list