[FFmpeg-devel] [PATCH 2/3] librist: allow use of circular buffer for receiving.
Gijs Peskens
gijs at peskens.net
Tue Sep 28 11:22:40 EEST 2021
libRIST internally stores packets in a fifo of 1024 packets, overwriting
old packets when not read in a sufficient pace. Unfortunately this results
in many fifo overflow errors when ffmpeg consumes a libRIST stream.
This patch creates a receiver thread based on the UDP circular buffer code.
Signed-off-by: Gijs Peskens <gijs at peskens.net>
---
libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++--
1 file changed, 196 insertions(+), 5 deletions(-)
diff --git a/libavformat/librist.c b/libavformat/librist.c
index b120346f48..47c01a8432 100644
--- a/libavformat/librist.c
+++ b/libavformat/librist.c
@@ -26,6 +26,8 @@
#include "libavutil/opt.h"
#include "libavutil/parseutils.h"
#include "libavutil/time.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
#include "avformat.h"
#include "internal.h"
@@ -33,6 +35,15 @@
#include "os_support.h"
#include "url.h"
+#if HAVE_W32THREADS
+#undef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 1
+#endif
+
+#if HAVE_PTHREAD_CANCEL
+#include "libavutil/thread.h"
+#endif
+
#include <librist/librist.h>
#include <librist/version.h>
// RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead
@@ -67,6 +78,19 @@ typedef struct RISTContext {
struct rist_peer *peer;
struct rist_ctx *ctx;
+
+ int circular_buffer_size;
+ AVFifoBuffer *fifo;
+ int circular_buffer_error;
+ int overrun_nonfatal;
+
+#if HAVE_PTHREAD_CANCEL
+ pthread_t receiver_thread;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int thread_started;
+ int thread_stop;
+#endif
} RISTContext;
#define D AV_OPT_FLAG_DECODING_PARAM
@@ -82,6 +106,8 @@ static const AVOption librist_options[] = {
{ "log_level", "set loglevel", OFFSET(log_level), AV_OPT_TYPE_INT, {.i64=RIST_LOG_INFO}, -1, INT_MAX, .flags = D|E },
{ "secret", "set encryption secret",OFFSET(secret), AV_OPT_TYPE_STRING,{.str=NULL}, 0, 0, .flags = D|E },
{ "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT ,{.i64=0}, 0, INT_MAX, .flags = D|E },
+ { "fifo_size", "set the receiving circular buffer size, expressed as a number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+ { "overrun_nonfatal", "survive in case of receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D },
{ NULL }
};
@@ -119,6 +145,15 @@ static int librist_close(URLContext *h)
RISTContext *s = h->priv_data;
int ret = 0;
+#if HAVE_PTHREAD_CANCEL
+ if (s->thread_started) {
+ pthread_mutex_lock(&s->mutex);
+ s->thread_stop = 1;
+ pthread_mutex_unlock(&s->mutex);
+ pthread_join(s->receiver_thread, NULL);
+ }
+#endif
+ av_fifo_freep(&s->fifo);
s->peer = NULL;
if (s->ctx)
@@ -128,6 +163,78 @@ static int librist_close(URLContext *h)
return risterr2ret(ret);
}
+static void *receiver_thread(void *_url_context)
+{
+ URLContext *h = _url_context;
+ RISTContext *s = h->priv_data;
+ int ret;
+ uint8_t tmp[4];
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ const struct rist_data_block *data_block;
+#else
+ struct rist_data_block *data_block;
+#endif
+
+ while (1)
+ {
+ pthread_mutex_lock(&s->mutex);
+ if (s->thread_stop)
+ break;
+ pthread_mutex_unlock(&s->mutex);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
+#else
+ ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
+#endif
+ if (ret == 0)
+ continue;
+
+ pthread_mutex_lock(&s->mutex);
+ if (ret < 0) {
+ s->circular_buffer_error = ret;
+ break;
+ }
+
+ if (data_block->payload_len > MAX_PAYLOAD_SIZE) {
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ rist_receiver_data_block_free((struct rist_data_block**)&data_block);
+#else
+ rist_receiver_data_block_free2(&data_block);
+#endif
+ s->circular_buffer_error = AVERROR_EXTERNAL;
+ break;
+ }
+ AV_WL32(tmp, data_block->payload_len);
+ if (av_fifo_space(s->fifo) < (data_block->payload_len +4))
+ {
+ /* No Space left */
+ if (s->overrun_nonfatal) {
+ av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+ "Surviving due to overrun_nonfatal option\n");
+ continue;
+ } else {
+ av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+ "To avoid, increase fifo_size URL option. "
+ "To survive in such case, use overrun_nonfatal option\n");
+ s->circular_buffer_error = AVERROR(EIO);
+ break;
+ }
+ }
+ av_fifo_generic_write(s->fifo, tmp, 4, NULL);
+ av_fifo_generic_write(s->fifo, (void*)data_block->payload, data_block->payload_len, NULL);
+ pthread_mutex_unlock(&s->mutex);
+ pthread_cond_signal(&s->cond);
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ rist_receiver_data_block_free((struct rist_data_block**)&data_block);
+#else
+ rist_receiver_data_block_free2(&data_block);
+#endif
+ }
+ pthread_mutex_unlock(&s->mutex);
+ pthread_cond_signal(&s->cond);
+ return NULL;
+}
+
static int librist_open(URLContext *h, const char *uri, int flags)
{
RISTContext *s = h->priv_data;
@@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, int flags)
if (ret < 0)
goto err;
+ s->circular_buffer_size *= 188;
+
+#if HAVE_PTHREAD_CANCEL
+ //Create receiver thread if circular buffer size is set and we are receiving
+ if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) {
+ /* start the task going */
+ s->fifo = av_fifo_alloc(s->circular_buffer_size);
+ if (!s->fifo) {
+ ret = AVERROR(ENOMEM);
+ goto err;
+ }
+ ret = pthread_mutex_init(&s->mutex, NULL);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto err;
+ }
+ ret = pthread_cond_init(&s->cond, NULL);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto cond_fail;
+ }
+ ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h);
+ if (ret != 0) {
+ av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+ ret = AVERROR(ret);
+ goto thread_fail;
+ }
+ s->thread_started = 1;
+ }
+#endif
return 0;
-
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+ pthread_cond_destroy(&s->cond);
+ cond_fail:
+ pthread_mutex_destroy(&s->mutex);
+#endif
err:
librist_close(h);
-
+ av_fifo_freep(&s->fifo);
return risterr2ret(ret);
}
static int librist_read(URLContext *h, uint8_t *buf, int size)
{
RISTContext *s = h->priv_data;
+#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
+ const struct rist_data_block *data_block;
+#else
+ struct rist_data_block *data_block;
+#endif
int ret;
+#if HAVE_PTHREAD_CANCEL
+ int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
+
+ if (s->fifo) {
+ pthread_mutex_lock(&s->mutex);
+ do {
+ avail = av_fifo_size(s->fifo);
+ if (avail) { // >=size) {
+ uint8_t tmp[4];
+
+ av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+ avail = AV_RL32(tmp);
+ if(avail > size){
+ av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+ avail = size;
+ }
+
+ av_fifo_generic_read(s->fifo, buf, avail, NULL);
+ av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+ pthread_mutex_unlock(&s->mutex);
+ return avail;
+ } else if(s->circular_buffer_error){
+ int err = s->circular_buffer_error;
+ pthread_mutex_unlock(&s->mutex);
+ return err;
+ } else if(nonblock) {
+ pthread_mutex_unlock(&s->mutex);
+ return AVERROR(EAGAIN);
+ } else {
+ /* FIXME: using the monotonic clock would be better,
+ but it does not exist on all supported platforms. */
+ int64_t t = av_gettime() + 100000;
+ struct timespec tv = { .tv_sec = t / 1000000,
+ .tv_nsec = (t % 1000000) * 1000 };
+ int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+ if (err) {
+ pthread_mutex_unlock(&s->mutex);
+ return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
+ }
+ nonblock = 1;
+ }
+ } while(1);
+ }
+#endif
+
#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41
- const struct rist_data_block *data_block;
ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME);
#else
- struct rist_data_block *data_block;
ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME);
#endif
-
if (ret < 0)
return risterr2ret(ret);
--
2.30.2
More information about the ffmpeg-devel
mailing list