[FFmpeg-devel] [PATCH] avformat/srt: add Haivision SRT protocol

Nablet Developer sdk at nablet.com
Thu Oct 5 11:25:21 EEST 2017


protocol requires libsrt (https://github.com/Haivision/srt) to be
installed

Signed-off-by: Nablet Developer <sdk at nablet.com>
---
 configure               |    5 +
 libavformat/Makefile    |    1 +
 libavformat/protocols.c |    1 +
 libavformat/srt.c       | 1105 +++++++++++++++++++++++++++++++++++++++++++++++
 libavformat/url.h       |    4 +
 5 files changed, 1116 insertions(+)
 create mode 100644 libavformat/srt.c

diff --git a/configure b/configure
index 391c141..d955805 100755
--- a/configure
+++ b/configure
@@ -301,6 +301,7 @@ External library support:
                            on OSX if openssl and gnutls are not used [autodetect]
   --disable-xlib           disable xlib [autodetect]
   --disable-zlib           disable zlib [autodetect]
+  --enable-libsrt          enable Haivision SRT protocol via libsrt [no]
 
   The following libraries provide various hardware acceleration features:
   --disable-audiotoolbox   disable Apple AudioToolbox code [autodetect]
@@ -1638,6 +1639,7 @@ EXTERNAL_LIBRARY_LIST="
     openal
     opencl
     opengl
+    libsrt
 "
 
 HWACCEL_AUTODETECT_LIBRARY_LIST="
@@ -3170,6 +3172,8 @@ tls_securetransport_protocol_select="tcp_protocol"
 tls_protocol_deps_any="tls_schannel_protocol tls_securetransport_protocol tls_gnutls_protocol tls_openssl_protocol"
 udp_protocol_select="network"
 udplite_protocol_select="network"
+srt_protocol_select="network"
+srt_protocol_deps="libsrt"
 unix_protocol_deps="sys_un_h"
 unix_protocol_select="network"
 
@@ -6076,6 +6080,7 @@ enabled rkmpp             && { { require_pkg_config rockchip_mpp rockchip_mpp ro
                                { enabled libdrm ||
                                  die "ERROR: rkmpp requires --enable-libdrm"; }
                              }
+enabled libsrt           && require_pkg_config srt srt/srt.h srt_socket
 
 if enabled gcrypt; then
     GCRYPT_CONFIG="${cross_prefix}libgcrypt-config"
diff --git a/libavformat/Makefile b/libavformat/Makefile
index df709c29..695cf51 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -593,6 +593,7 @@ OBJS-$(CONFIG_TLS_SCHANNEL_PROTOCOL)     += tls_schannel.o tls.o
 OBJS-$(CONFIG_TLS_SECURETRANSPORT_PROTOCOL) += tls_securetransport.o tls.o
 OBJS-$(CONFIG_UDP_PROTOCOL)              += udp.o
 OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o
+OBJS-$(CONFIG_SRT_PROTOCOL)              += srt.o
 OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o
 
 # libavdevice dependencies
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index 8d3555e..8b4fded 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -62,6 +62,7 @@ extern const URLProtocol ff_tls_securetransport_protocol;
 extern const URLProtocol ff_tls_openssl_protocol;
 extern const URLProtocol ff_udp_protocol;
 extern const URLProtocol ff_udplite_protocol;
+extern const URLProtocol ff_srt_protocol;
 extern const URLProtocol ff_unix_protocol;
 extern const URLProtocol ff_librtmp_protocol;
 extern const URLProtocol ff_librtmpe_protocol;
diff --git a/libavformat/srt.c b/libavformat/srt.c
new file mode 100644
index 0000000..5950a50
--- /dev/null
+++ b/libavformat/srt.c
@@ -0,0 +1,1105 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file
+ * SRT protocol
+ */
+
+#define _DEFAULT_SOURCE
+#define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
+
+#include "avformat.h"
+#include "avio_internal.h"
+#include "libavutil/avassert.h"
+#include "libavutil/parseutils.h"
+#include "libavutil/fifo.h"
+#include "libavutil/intreadwrite.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/log.h"
+#include "libavutil/time.h"
+#include "internal.h"
+#include "network.h"
+#include "os_support.h"
+#include "url.h"
+
+#ifdef __APPLE__
+#include "TargetConditionals.h"
+#endif
+
+#include <srt/srt.h>
+#include <srt/logging_api.h>
+
+
+#if HAVE_PTHREAD_CANCEL
+#include <pthread.h>
+#endif
+
+#ifndef HAVE_PTHREAD_CANCEL
+#define HAVE_PTHREAD_CANCEL 0
+#endif
+
+#ifndef IPV6_ADD_MEMBERSHIP
+#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
+#define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
+#endif
+
+#define UDP_TX_BUF_SIZE 32768
+#define UDP_MAX_PKT_SIZE 65536
+#define UDP_HEADER_SIZE 8
+
+/**
+ * Private data of one SRT protocol context.
+ *
+ * Holds the values of the options declared in the AVOption table, the SRT
+ * socket handle, and (when HAVE_PTHREAD_CANCEL is set) the FIFO and locking
+ * state shared with the receive/transmit worker threads.
+ */
+typedef struct SRTContext {
+    /* presumably must stay the first member for the AVOption machinery -- confirm */
+    const AVClass *class;
+    /* SRT socket handle used for srt_connect()/srt_recvmsg()/srt_sendmsg() */
+    SRTSOCKET srt_fd;
+    /* "buffer_size" option; a negative value is replaced by a direction
+     * dependent default when the context is opened */
+    int buffer_size;
+    /* "pkt_size" option; copied to URLContext.max_packet_size when > 0 */
+    int pkt_size;
+    /* local port; used as the service when resolving the local address and
+     * returned by ff_srt_get_local_port() */
+    int local_port;
+    int reuse_socket;
+    /* when non-zero, a receive FIFO overrun drops the packet instead of
+     * stopping the receive thread */
+    int overrun_nonfatal;
+    /* remote address and its length as filled in by ff_srt_set_remote_url() */
+    struct sockaddr_storage dest_addr;
+    int dest_addr_len;
+    int is_connected;
+
+    /* Circular Buffer variables for use in UDP receive code */
+    int circular_buffer_size;
+    /* packet queue; each record is a 4-byte little-endian length followed by
+     * the payload */
+    AVFifoBuffer *fifo;
+    /* error code stored by a worker thread before it stops */
+    int circular_buffer_error;
+    int64_t bitrate; /* number of bits to send per second */
+    int64_t burst_bits;
+    /* when set, the transmit thread exits once the FIFO has no full record */
+    int close_req;
+#if HAVE_PTHREAD_CANCEL
+    pthread_t circular_buffer_thread;
+    /* mutex is held by the worker threads around every FIFO access; cond is
+     * signalled by the receive thread after it queues a packet */
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    int thread_started;
+#endif
+    /* scratch packet buffer; the first 4 bytes carry the length prefix on
+     * the receive path */
+    uint8_t tmp[UDP_MAX_PKT_SIZE+4];
+    int remaining_in_dg;
+    char *localaddr;
+    int timeout;
+    /* comma-separated lists ("sources" / "block" options) */
+    const char *sources;
+    char *block;
+
+    /* SRT socket options (srt/srt.h) */
+    /* For the numeric options below a negative value means "not set": the
+     * corresponding srt_setsockopt() call is skipped. Keep each field's
+     * width in sync with the type of its AVOption entry. */
+    int64_t maxbw;
+    int pbkeylen;
+    /* NOTE(review): this is an inline array, but the "passphrase" AVOption
+     * entry is AV_OPT_TYPE_STRING, which presumably stores a char * at this
+     * offset -- confirm and reconcile. */
+    char passphrase[65];
+    int mss;
+    int fc;
+    int ipttl;
+    int iptos;
+    int64_t inputbw;
+    int64_t oheadbw;
+    int tsbpddelay;
+    int tlpktdrop;
+    int nakreport;
+    int conntimeo;
+    /* one of SRT_MODE_CALLER / SRT_MODE_LISTENER / SRT_MODE_RENDEZVOUS */
+    int mode;
+} SRTContext;
+
+/* Connection modes selectable through the "mode" option. */
+#define SRT_MODE_CALLER 0
+#define SRT_MODE_LISTENER 1
+#define SRT_MODE_RENDEZVOUS 2
+
+#define OFFSET(x) offsetof(SRTContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+/*
+ * Private options of the SRT protocol.
+ *
+ * The option type of every entry must match the width of the SRTContext
+ * field it points at: AV_OPT_TYPE_INT/BOOL -> int, AV_OPT_TYPE_INT64 ->
+ * int64_t. For the SRT socket options a default of -1 means "leave the
+ * library default untouched".
+ */
+static const AVOption options[] = {
+    { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
+    { "bitrate",        "Bits to send per second",                         OFFSET(bitrate),        AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
+    { "burst_bits",     "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits),   AV_OPT_TYPE_INT64,  { .i64 = 0  },     0, INT64_MAX, .flags = E },
+    { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
+    { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
+    { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
+    { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
+    { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       D|E },
+    { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_BOOL,   { .i64 = -1 },    -1, 1,       .flags = D|E },
+    { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
+    { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,    D },
+    { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
+    { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
+    { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
+
+    /* SRT socket options (srt/srt.h), see srt/common/socketoptions.hpp */
+    { "maxbw",          "maximum bandwidth (bytes per second) that the connection can use",            OFFSET(maxbw),      AV_OPT_TYPE_INT64,  { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "pbkeylen",       "Crypto key len in bytes {16,24,32} Default: 16 (128-bit)",                    OFFSET(pbkeylen),   AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, 32,        .flags = D|E },
+    /* NOTE(review): SRTContext.passphrase is an inline char array while
+     * AV_OPT_TYPE_STRING presumably stores (and frees) a char * at this
+     * offset -- confirm and reconcile the field with this entry. */
+    { "passphrase",     "Crypto PBKDF2 Passphrase size[0,10..64] 0:disable crypto",                    OFFSET(passphrase), AV_OPT_TYPE_STRING, { .str = NULL },              .flags = D|E },
+    { "mss",            "the Maximum Transfer Unit",                                                   OFFSET(mss),        AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    { "fc",             "Flight flag size (window size)",                                              OFFSET(fc),         AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    { "ipttl",          "IP Time To Live",                                                             OFFSET(ipttl),      AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    { "iptos",          "IP Type of Service",                                                          OFFSET(iptos),      AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    { "inputbw",        "Estimated input stream rate",                                                 OFFSET(inputbw),    AV_OPT_TYPE_INT64,  { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "oheadbw",        "MaxBW ceiling based on % over input stream rate",                             OFFSET(oheadbw),    AV_OPT_TYPE_INT64,  { .i64 = -1 }, -1, INT64_MAX, .flags = D|E },
+    { "tsbpddelay",     "TsbPd receiver delay (mSec) to absorb burst of missed packet retransmission", OFFSET(tsbpddelay), AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    { "tlpktdrop",      "Enable receiver pkt drop",                                                    OFFSET(tlpktdrop),  AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, 1,         .flags = D|E },
+    { "nakreport",      "Enable receiver to send periodic NAK reports",                                OFFSET(nakreport),  AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, 1,         .flags = D|E },
+    /* conntimeo is an int field, so it must be AV_OPT_TYPE_INT: an INT64
+     * option would write 8 bytes and clobber the member that follows. */
+    { "conntimeo",      "Connect timeout in msec. Ccaller default: 3000, rendezvous (x 10)",           OFFSET(conntimeo),  AV_OPT_TYPE_INT,    { .i64 = -1 }, -1, INT_MAX,   .flags = D|E },
+    /* The option and its named constants share the "mode" unit so that
+     * mode=caller|listener|rendezvous can be resolved by name. */
+    { "mode",           "connection mode (caller, listener, rendezvous)",                              OFFSET(mode),       AV_OPT_TYPE_INT,    { .i64 = SRT_MODE_CALLER }, SRT_MODE_CALLER, SRT_MODE_RENDEZVOUS, .flags = D|E, .unit = "mode" },
+    { "caller",         NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRT_MODE_CALLER },     INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+    { "listener",       NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRT_MODE_LISTENER },   INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+    { "rendezvous",     NULL, 0, AV_OPT_TYPE_CONST,  { .i64 = SRT_MODE_RENDEZVOUS }, INT_MIN, INT_MAX, .flags = D|E, .unit = "mode" },
+
+    { NULL }
+};
+
+/* AVClass for the SRT protocol: names the class "srt" and points at the
+ * private option table so the options can be set on an SRTContext. */
+static const AVClass srt_class = {
+    .class_name = "srt",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+/**
+ * Translate the most recent SRT error into an FFmpeg error code.
+ *
+ * Callers compare the result against AVERROR() values and store it as a
+ * stream error, so a raw (positive) SRT code must never be returned.
+ *
+ * @return 0 if no error is pending, AVERROR(EAGAIN) when a non-blocking
+ *         send or receive would block, otherwise the AVERROR() of the system
+ *         errno reported by SRT, or AVERROR_UNKNOWN if SRT reported none.
+ */
+static int srt_neterrno(void)
+{
+    int os_errno = 0;
+    int err = srt_getlasterror(&os_errno);
+
+    if (!err)
+        return 0;
+    /* both directions report "would block" with their own code */
+    if (err == SRT_EASYNCRCV || err == SRT_EASYNCSND)
+        return AVERROR(EAGAIN);
+    return os_errno ? AVERROR(os_errno) : AVERROR_UNKNOWN;
+}
+
+/**
+ * Wait on a temporary SRT epoll set until the socket is ready.
+ *
+ * @param fd    SRT socket to wait on
+ * @param write non-zero to wait for writability, zero for readability
+ * @return 0 once srt_epoll_wait() succeeds, AVERROR(EAGAIN) if it fails,
+ *         AVERROR(EINVAL) if the socket cannot be added to the epoll set,
+ *         or the negative result of srt_epoll_create() if that fails
+ */
+static int srt_network_wait_fd(int fd, int write)
+{
+    SRTSOCKET ready[2];
+    int nready = 2;
+    int events = write ? SRT_EPOLL_OUT : SRT_EPOLL_IN;
+    int status;
+    int eid = srt_epoll_create();
+
+    if (eid < 0)
+        return eid;
+
+    if (srt_epoll_add_usock(eid, fd, &events) < 0) {
+        srt_epoll_release(eid);
+        return AVERROR(EINVAL);
+    }
+
+    /* the ready list goes into the write or the read slot of the call */
+    status = write ? srt_epoll_wait(eid, 0, 0, ready, &nready, -1, 0, 0, 0, 0)
+                   : srt_epoll_wait(eid, ready, &nready, 0, 0, -1, 0, 0, 0, 0);
+    srt_epoll_release(eid);
+
+    if (status == SRT_ERROR)
+        return AVERROR(EAGAIN);
+    return 0;
+}
+
+/**
+ * Switch an SRT socket between blocking and non-blocking operation.
+ *
+ * SRTO_SNDSYN / SRTO_RCVSYN select *synchronous* (blocking) mode when set
+ * to 1, so the requested non-blocking flag has to be inverted before it is
+ * handed to the library.
+ *
+ * @param socket SRT socket
+ * @param enable non-zero for non-blocking mode, zero for blocking mode
+ * @return result of srt_setsockopt(): negative on failure
+ */
+static int srt_socket_nonblock(int socket, int enable)
+{
+    int blocking = enable ? 0 : 1;
+    int error;
+
+    error = srt_setsockopt(socket, 0, SRTO_SNDSYN, &blocking, sizeof(blocking));
+    if (error < 0)
+        return error;
+    return srt_setsockopt(socket, 0, SRTO_RCVSYN, &blocking, sizeof(blocking));
+}
+
+/**
+ * Log "<prefix> <SRT error text>" for the most recent SRT error.
+ *
+ * @param ctx    logging context handed to av_log() (may be NULL)
+ * @param level  av_log() level
+ * @param prefix text printed before the SRT error string
+ */
+static void log_net_error(void *ctx, int level, const char* prefix)
+{
+    const char *reason = srt_getlasterror_str();
+
+    av_log(ctx, level, "%s %s\n", prefix, reason);
+}
+
+/**
+ * Resolve a host/port pair with getaddrinfo().
+ *
+ * @param h        context used for error logging
+ * @param hostname host to resolve; NULL, "" or a string starting with '?'
+ *                 is passed to getaddrinfo() as a NULL node
+ * @param port     port number; values <= 0 use the service "0"
+ * @param type     value for ai_socktype
+ * @param family   value for ai_family
+ * @param flags    value for ai_flags
+ * @return list to be released with freeaddrinfo(), or NULL on failure
+ */
+static struct addrinfo *srt_resolve_host(URLContext *h,
+                                         const char *hostname, int port,
+                                         int type, int family, int flags)
+{
+    struct addrinfo hints = { 0 };
+    struct addrinfo *result = 0;
+    const char *node = 0;
+    const char *service = "0";
+    char portstr[16];
+    int gai_err;
+
+    hints.ai_flags    = flags;
+    hints.ai_family   = family;
+    hints.ai_socktype = type;
+
+    if (hostname && hostname[0] != '\0' && hostname[0] != '?')
+        node = hostname;
+    if (port > 0) {
+        snprintf(portstr, sizeof(portstr), "%d", port);
+        service = portstr;
+    }
+
+    gai_err = getaddrinfo(node, service, &hints, &result);
+    if (!gai_err)
+        return result;
+
+    av_log(h, AV_LOG_ERROR, "getaddrinfo(%s, %s): %s\n",
+           node ? node : "unknown",
+           service,
+           gai_strerror(gai_err));
+    return NULL;
+}
+
+/**
+ * Resolve hostname:port and copy the first result into addr.
+ *
+ * @param h        context used for error logging
+ * @param addr     destination for the resolved socket address
+ * @param hostname host to resolve
+ * @param port     port to resolve
+ * @return length of the stored address, or AVERROR(EIO) if resolution fails
+ */
+static int srt_set_url(URLContext *h,
+                       struct sockaddr_storage *addr,
+                       const char *hostname, int port)
+{
+    struct addrinfo *ai = srt_resolve_host(h, hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
+    int len;
+
+    if (!ai)
+        return AVERROR(EIO);
+
+    len = ai->ai_addrlen;
+    memcpy(addr, ai->ai_addr, ai->ai_addrlen);
+    freeaddrinfo(ai);
+
+    return len;
+}
+
+/**
+ * Create an SRT socket suitable for the local address.
+ *
+ * The local address (or the wildcard address when localaddr is NULL or
+ * empty) is resolved for the address family of the already known
+ * destination, if any. A socket is created for the first candidate that
+ * srt_socket() accepts, and that candidate's address is returned to the
+ * caller.
+ *
+ * @param h         protocol context (provides SRTContext and logging)
+ * @param addr      receives the local address matching the created socket
+ * @param addr_len  receives the length of that address
+ * @param localaddr optional local address to resolve
+ * @return the SRT socket on success, -1 on failure
+ */
+static int srt_socket_create(URLContext *h, struct sockaddr_storage *addr,
+                             socklen_t *addr_len, const char *localaddr)
+{
+    SRTContext *s = h->priv_data;
+    int srt_fd = SRT_INVALID_SOCK;
+    struct addrinfo *res0, *res;
+    int family = AF_UNSPEC;
+
+    if (((struct sockaddr *) &s->dest_addr)->sa_family)
+        family = ((struct sockaddr *) &s->dest_addr)->sa_family;
+    res0 = srt_resolve_host(h, (localaddr && localaddr[0]) ? localaddr : NULL,
+                            s->local_port,
+                            SOCK_DGRAM, family, AI_PASSIVE);
+    if (!res0)
+        return -1;
+
+    for (res = res0; res; res = res->ai_next) {
+        srt_fd = srt_socket(res->ai_family, SOCK_DGRAM, 0);
+        if (srt_fd != SRT_INVALID_SOCK)
+            break;
+        /* log against the URLContext so the message carries its context */
+        log_net_error(h, AV_LOG_ERROR, "socket");
+    }
+
+    /* Every candidate failed: there is no socket to close, only the
+     * resolver result to release. */
+    if (srt_fd < 0) {
+        freeaddrinfo(res0);
+        return -1;
+    }
+
+    memcpy(addr, res->ai_addr, res->ai_addrlen);
+    *addr_len = res->ai_addrlen;
+
+    freeaddrinfo(res0);
+
+    return srt_fd;
+}
+
+/**
+ * Extract the numeric port from a socket address.
+ *
+ * @param addr     socket address
+ * @param addr_len length of addr
+ * @return the port number, or -1 if getnameinfo() fails
+ */
+static int srt_port(struct sockaddr_storage *addr, int addr_len)
+{
+    char service[sizeof(int)*3+1];
+    int gni_err = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,
+                              service, sizeof(service), NI_NUMERICSERV);
+
+    if (!gni_err)
+        return strtol(service, NULL, 10);
+
+    av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(gni_err));
+    return -1;
+}
+
+
+/**
+ * Set (or change) the remote address of an SRT context.
+ *
+ * url syntax: srt://host:port[?option=val...]
+ *
+ * The host and port of the URI are resolved and stored as the destination
+ * address. Of the query options only 'connect=n' is interpreted here: when
+ * it turns the context from not connected to connected, srt_connect() is
+ * issued on the existing socket.
+ *
+ * @param h media file context
+ * @param uri of the remote server
+ * @return zero if no error, AVERROR(EIO) if resolving or connecting fails.
+ */
+int ff_srt_set_remote_url(URLContext *h, const char *uri)
+{
+    SRTContext *s = h->priv_data;
+    char hostname[256], buf[10];
+    int port;
+    int previously_connected;
+    const char *query;
+
+    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    /* resolve and remember the destination */
+    s->dest_addr_len = srt_set_url(h, &s->dest_addr, hostname, port);
+    if (s->dest_addr_len < 0)
+        return AVERROR(EIO);
+
+    query = strchr(uri, '?');
+    if (!query || !av_find_info_tag(buf, sizeof(buf), "connect", query))
+        return 0;
+
+    previously_connected = s->is_connected;
+    s->is_connected = strtol(buf, NULL, 10);
+    if (!s->is_connected || previously_connected)
+        return 0;
+
+    /* newly requested connection */
+    if (srt_connect(s->srt_fd, (struct sockaddr *) &s->dest_addr,
+                    s->dest_addr_len)) {
+        s->is_connected = 0;
+        log_net_error(h, AV_LOG_ERROR, "connect");
+        return AVERROR(EIO);
+    }
+
+    return 0;
+}
+
+/**
+ * Return the local port stored in the SRT context.
+ * @param h media file context
+ * @return the local port number
+ */
+int ff_srt_get_local_port(URLContext *h)
+{
+    const SRTContext *ctx = h->priv_data;
+
+    return ctx->local_port;
+}
+
+/**
+ * Return the SRT socket handle of the context.
+ * @param h media file context
+ * @return the SRT socket stored in the context
+ */
+static int srt_get_file_handle(URLContext *h)
+{
+    const SRTContext *ctx = h->priv_data;
+
+    return ctx->srt_fd;
+}
+
+#if HAVE_PTHREAD_CANCEL
+/**
+ * Receive worker thread.
+ *
+ * Puts the socket into blocking mode, then loops: receive one message into
+ * s->tmp+4, store its length as a 4-byte little-endian prefix and append
+ * prefix plus payload to the FIFO, signalling s->cond after each write.
+ * s->mutex is held whenever the FIFO or circular_buffer_error is touched
+ * and released around the blocking receive; cancellation is only enabled
+ * around that receive call.
+ *
+ * On a fatal receive error or a FIFO overrun (unless overrun_nonfatal is
+ * set) the error is stored in s->circular_buffer_error, s->cond is
+ * signalled and the thread ends.
+ *
+ * @param ctx the URLContext of the connection
+ * @return always NULL
+ */
+static void *circular_buffer_task_rx(void * ctx)
+{
+    URLContext *h = ctx;
+    SRTContext *s = h->priv_data;
+    int old_cancelstate;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+    if (srt_socket_nonblock(s->srt_fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+    while (1) {
+        int len;
+
+        pthread_mutex_unlock(&s->mutex);
+        /* Blocking operations are always cancellation points;
+           see "General Information" / "Thread Cancelation Overview"
+           in Single Unix. */
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+        len = srt_recvmsg(s->srt_fd, s->tmp+4, sizeof(s->tmp)-4);
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+        if (len < 0) {
+            /* query the error once so the value tested is the value stored */
+            int err = srt_neterrno();
+
+            if (err != AVERROR(EAGAIN) && err != AVERROR(EINTR)) {
+                log_net_error(h, AV_LOG_ERROR, "srt_recvmsg");
+                s->circular_buffer_error = err;
+                goto end;
+            }
+            continue;
+        }
+        AV_WL32(s->tmp, len);
+
+        if (av_fifo_space(s->fifo) < len + 4) {
+            /* No Space left */
+            if (s->overrun_nonfatal) {
+                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
+                        "Surviving due to overrun_nonfatal option\n");
+                continue;
+            } else {
+                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
+                        "To avoid, increase fifo_size URL option. "
+                        "To survive in such case, use overrun_nonfatal option\n");
+                s->circular_buffer_error = AVERROR(EIO);
+                goto end;
+            }
+        }
+        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+    }
+
+end:
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+/**
+ * Transmit worker thread.
+ *
+ * Puts the socket into blocking mode, then loops: wait on s->cond until the
+ * FIFO holds at least a 4-byte length prefix, pop one record (little-endian
+ * length followed by the payload) into s->tmp, optionally delay to respect
+ * the configured bitrate, and send the payload with srt_sendmsg().
+ * s->mutex is held while the FIFO is accessed and released for pacing and
+ * sending; cancellation is enabled only during that unlocked part.
+ *
+ * The thread ends when close_req is set and no full prefix is queued, when
+ * waiting on the condition fails, or when sending fails with anything other
+ * than EAGAIN/EINTR (the error is then stored in s->circular_buffer_error).
+ *
+ * @param ctx the URLContext of the connection
+ * @return always NULL
+ */
+static void *circular_buffer_task_tx(void * ctx)
+{
+    URLContext *h = ctx;
+    SRTContext *s = h->priv_data;
+    int old_cancelstate;
+    int64_t target_timestamp = av_gettime_relative();
+    int64_t start_timestamp = av_gettime_relative();
+    int64_t sent_bits = 0;
+    /* time (us) needed to send burst_bits / one maximum-size packet at the
+     * configured bitrate */
+    int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
+    int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+    pthread_mutex_lock(&s->mutex);
+
+    if (srt_socket_nonblock(s->srt_fd, 0) < 0) {
+        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
+        s->circular_buffer_error = AVERROR(EIO);
+        goto end;
+    }
+
+    while (1) {
+        int len;
+        const uint8_t *p;
+        uint8_t tmp[4];
+        int64_t timestamp;
+
+        len=av_fifo_size(s->fifo);
+
+        while (len<4) {
+            if (s->close_req)
+                goto end;
+            /* pthread_cond_wait() reports failure with a positive error
+             * number, never a negative value */
+            if (pthread_cond_wait(&s->cond, &s->mutex) != 0) {
+                goto end;
+            }
+            len=av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+        len=AV_RL32(tmp);
+
+        av_assert0(len >= 0);
+        av_assert0(len <= sizeof(s->tmp));
+
+        av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
+
+        pthread_mutex_unlock(&s->mutex);
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        if (s->bitrate) {
+            timestamp = av_gettime_relative();
+            if (timestamp < target_timestamp) {
+                /* ahead of schedule: sleep, but never longer than the time
+                 * one maximum-size packet takes; if clamped, restart the
+                 * accounting window */
+                int64_t delay = target_timestamp - timestamp;
+                if (delay > max_delay) {
+                    delay = max_delay;
+                    start_timestamp = timestamp + delay;
+                    sent_bits = 0;
+                }
+                av_usleep(delay);
+            } else {
+                /* behind schedule by more than one burst interval: restart
+                 * the accounting window so the backlog is limited */
+                if (timestamp - burst_interval > target_timestamp) {
+                    start_timestamp = timestamp - burst_interval;
+                    sent_bits = 0;
+                }
+            }
+            sent_bits += len * 8;
+            target_timestamp = start_timestamp + sent_bits * 1000000 / s->bitrate;
+        }
+
+        p = s->tmp;
+        while (len) {
+            int ret;
+            av_assert0(len > 0);
+            ret = srt_sendmsg(s->srt_fd, p, len, -1, 0);
+            if (ret >= 0) {
+                len -= ret;
+                p   += ret;
+            } else {
+                ret = srt_neterrno();
+                if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+                    pthread_mutex_lock(&s->mutex);
+                    s->circular_buffer_error = ret;
+                    pthread_mutex_unlock(&s->mutex);
+                    return NULL;
+                }
+            }
+        }
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+        pthread_mutex_lock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+
+#endif
+
+/**
+ * Split a comma-separated list in place and append a copy of every entry
+ * to an array.
+ *
+ * @param buf          NUL-terminated list; every ',' is overwritten with '\0'
+ * @param sources      destination array; entries are allocated with
+ *                     av_strdup()
+ * @param num_sources  in/out: number of entries already used in sources
+ * @param max_sources  capacity of sources
+ * @return 0 on success (entries that do not fit are ignored),
+ *         AVERROR(ENOMEM) if an entry could not be duplicated
+ */
+static int parse_source_list(char *buf, const char **sources, int *num_sources,
+                             int max_sources)
+{
+    char *source_start = buf;
+
+    /* Check the capacity before every store: callers append to an array
+     * that an earlier call may already have filled completely. */
+    while (*num_sources < max_sources) {
+        char *next = strchr(source_start, ',');
+
+        if (next)
+            *next = '\0';
+        sources[*num_sources] = av_strdup(source_start);
+        if (!sources[*num_sources])
+            return AVERROR(ENOMEM);
+        (*num_sources)++;
+        if (!next)
+            break;
+        /* only advance when a separator was actually found */
+        source_start = next + 1;
+    }
+    return 0;
+}
+
+/**
+ * Apply the "POST" SRT options (input bandwidth and overhead bandwidth).
+ *
+ * According to the original note, "POST" options can be altered any time
+ * on a connected socket. Options whose value is negative are left
+ * untouched.
+ *
+ * @param h      protocol context holding the option values
+ * @param srt_fd SRT socket to configure
+ * @return 0 on success, AVERROR(EIO) if an option could not be set
+ */
+static int srt_set_options_post(URLContext *h, int srt_fd)
+{
+    SRTContext *s = h->priv_data;
+
+    if (s->inputbw >= 0) {
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_INPUTBW, &s->inputbw, sizeof(s->inputbw)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_INPUTBW)");
+            return AVERROR(EIO);
+        }
+    }
+
+    if (s->oheadbw >= 0) {
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_OHEADBW, &s->oheadbw, sizeof(s->oheadbw)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_OHEADBW)");
+            return AVERROR(EIO);
+        }
+    }
+
+    return 0;
+}
+
+/**
+ * Set one SRT socket option and log a failure.
+ *
+ * @param what text used as the log prefix on failure
+ * @return 0 on success, AVERROR(EIO) if srt_setsockopt() fails
+ */
+static int srt_setsockopt_logged(URLContext *h, int srt_fd, int optname,
+                                 const char *what,
+                                 const void *optval, int optlen)
+{
+    if (srt_setsockopt(srt_fd, SOL_SOCKET, optname, optval, optlen) < 0) {
+        log_net_error(h, AV_LOG_ERROR, what);
+        return AVERROR(EIO);
+    }
+    return 0;
+}
+
+/**
+ * Apply the "PRE" SRT options.
+ *
+ * The "PRE" options must be set prior to connecting and can't be altered
+ * on a connected socket; however, if set on a listening socket, they are
+ * derived by the accept-ed socket.
+ *
+ * Numeric options with a negative value and an empty passphrase are left
+ * untouched. Rendezvous mode is enabled when mode is SRT_MODE_RENDEZVOUS.
+ *
+ * @param h      protocol context holding the option values
+ * @param srt_fd SRT socket to configure
+ * @return 0 on success, AVERROR(EIO) if an option could not be set
+ */
+static int srt_set_options_pre(URLContext *h, int srt_fd)
+{
+    SRTContext *s = h->priv_data;
+    int yes = 1;
+
+    if (s->mode == SRT_MODE_RENDEZVOUS && srt_setsockopt(srt_fd, 0, SRTO_RENDEZVOUS, &yes, sizeof(yes)) < 0) {
+        log_net_error(h, AV_LOG_ERROR, "setsockopt(SRTO_RENDEZVOUS)");
+        return AVERROR(EIO);
+    }
+
+    if (s->maxbw >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_MAXBW, "setsockopt(SRTO_MAXBW)", &s->maxbw, sizeof(s->maxbw)) < 0)
+        return AVERROR(EIO);
+    if (s->pbkeylen >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_PBKEYLEN, "setsockopt(SRTO_PBKEYLEN)", &s->pbkeylen, sizeof(s->pbkeylen)) < 0)
+        return AVERROR(EIO);
+    /* Pass the length of the passphrase text, not the size of the buffer:
+     * otherwise the NUL padding becomes part of the secret. */
+    if (s->passphrase[0] &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_PASSPHRASE, "setsockopt(SRTO_PASSPHRASE)", s->passphrase, strlen(s->passphrase)) < 0)
+        return AVERROR(EIO);
+    if (s->mss >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_MSS, "setsockopt(SRTO_MSS)", &s->mss, sizeof(s->mss)) < 0)
+        return AVERROR(EIO);
+    if (s->fc >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_FC, "setsockopt(SRTO_FC)", &s->fc, sizeof(s->fc)) < 0)
+        return AVERROR(EIO);
+    if (s->ipttl >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_IPTTL, "setsockopt(SRTO_IPTTL)", &s->ipttl, sizeof(s->ipttl)) < 0)
+        return AVERROR(EIO);
+    if (s->iptos >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_IPTOS, "setsockopt(SRTO_IPTOS)", &s->iptos, sizeof(s->iptos)) < 0)
+        return AVERROR(EIO);
+    if (s->tsbpddelay >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_TSBPDDELAY, "setsockopt(SRTO_TSBPDDELAY)", &s->tsbpddelay, sizeof(s->tsbpddelay)) < 0)
+        return AVERROR(EIO);
+    if (s->tlpktdrop >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_TLPKTDROP, "setsockopt(SRTO_TLPKTDROP)", &s->tlpktdrop, sizeof(s->tlpktdrop)) < 0)
+        return AVERROR(EIO);
+    if (s->nakreport >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_NAKREPORT, "setsockopt(SRTO_NAKREPORT)", &s->nakreport, sizeof(s->nakreport)) < 0)
+        return AVERROR(EIO);
+    if (s->conntimeo >= 0 &&
+        srt_setsockopt_logged(h, srt_fd, SRTO_CONNTIMEO, "setsockopt(SRTO_CONNTIMEO)", &s->conntimeo, sizeof(s->conntimeo)) < 0)
+        return AVERROR(EIO);
+
+    return 0;
+}
+
+/* put it in UDP context */
+/* return non zero if error */
+static int srt_open(URLContext *h, const char *uri, int flags)
+{
+    char hostname[1024], localaddr[1024] = "";
+    int port, srt_fd = SRT_INVALID_SOCK, tmp, bind_ret = -1;
+    int listen_fd = SRT_INVALID_SOCK;
+    SRTContext *s = h->priv_data;
+    int is_output;
+    const char *p;
+    char buf[256];
+    struct sockaddr_storage my_addr;
+    socklen_t len, tmplen;
+    int i, num_include_sources = 0, num_exclude_sources = 0;
+    char *include_sources[32], *exclude_sources[32];
+
+    h->is_streamed = 1;
+
+    is_output = !(flags & AVIO_FLAG_READ);
+    if (s->buffer_size < 0)
+        s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
+
+    if (s->sources) {
+        if (parse_source_list(s->sources, include_sources,
+                              &num_include_sources,
+                              FF_ARRAY_ELEMS(include_sources)))
+            goto fail;
+    }
+
+    if (s->block) {
+        if (parse_source_list(s->block, exclude_sources, &num_exclude_sources,
+                              FF_ARRAY_ELEMS(exclude_sources)))
+            goto fail;
+    }
+
+    if (s->pkt_size > 0)
+        h->max_packet_size = s->pkt_size;
+
+    p = strchr(uri, '?');
+    if (p) {
+        if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
+            char *endptr = NULL;
+            s->reuse_socket = strtol(buf, &endptr, 10);
+            /* assume if no digits were found it is a request to enable it */
+            if (buf == endptr)
+                s->reuse_socket = 1;
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
+            char *endptr = NULL;
+            s->overrun_nonfatal = strtol(buf, &endptr, 10);
+            /* assume if no digits were found it is a request to enable it */
+            if (buf == endptr)
+                s->overrun_nonfatal = 1;
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'overrun_nonfatal' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
+            s->local_port = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
+            s->pkt_size = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
+            s->buffer_size = strtol(buf, NULL, 10);
+        }
+        s->is_connected = 1;
+        if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
+            s->circular_buffer_size = strtol(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'circular_buffer_size' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
+            s->bitrate = strtoll(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'bitrate' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "burst_bits", p)) {
+            s->burst_bits = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
+            av_strlcpy(localaddr, buf, sizeof(localaddr));
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
+            if (parse_source_list(buf, include_sources, &num_include_sources,
+                                  FF_ARRAY_ELEMS(include_sources)))
+                goto fail;
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
+            if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
+                                  FF_ARRAY_ELEMS(exclude_sources)))
+                goto fail;
+        }
+        if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
+            s->timeout = strtol(buf, NULL, 10);
+
+        /* SRT options (srt/srt.h) */
+        if (av_find_info_tag(buf, sizeof(buf), "maxbw", p)) {
+            s->maxbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "pbkeylen", p)) {
+            s->pbkeylen = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "passphrase", p)) {
+            av_strlcpy(s->passphrase, buf, sizeof(s->passphrase));
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "mss", p)) {
+            s->mss = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "fc", p)) {
+            s->fc = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "ipttl", p)) {
+            s->ipttl = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "iptos", p)) {
+            s->iptos = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "inputbw", p)) {
+            s->inputbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "oheadbw", p)) {
+            s->oheadbw = strtoll(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "tsbpddelay", p)) {
+            s->tsbpddelay = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "tlpktdrop", p)) {
+            s->tlpktdrop = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "nakreport", p)) {
+            s->nakreport = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "conntimeo", p)) {
+            s->conntimeo = strtol(buf, NULL, 10);
+        }
+        if (av_find_info_tag(buf, sizeof(buf), "mode", p)) {
+            if (!strcmp(buf, "caller")) {
+                s->mode = SRT_MODE_CALLER;
+            } else if (!strcmp(buf, "listener")) {
+                s->mode = SRT_MODE_LISTENER;
+            } else if (!strcmp(buf, "rendezvous")) {
+                s->mode = SRT_MODE_RENDEZVOUS;
+            }
+        }
+    }
+    /* handling needed to support options picking from both AVOption and URL */
+    s->circular_buffer_size *= 188;
+    if (flags & AVIO_FLAG_WRITE) {
+        h->max_packet_size = s->pkt_size;
+    } else {
+        h->max_packet_size = UDP_MAX_PKT_SIZE;
+    }
+    h->rw_timeout = s->timeout;
+
+    /* fill the dest addr */
+    av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    /* XXX: fix av_url_split */
+    if (hostname[0] == '\0' || hostname[0] == '?') {
+        /* only accepts null hostname if input */
+        if (!(flags & AVIO_FLAG_READ))
+            goto fail;
+    } else {
+        if (ff_srt_set_remote_url(h, uri) < 0)
+            goto fail;
+    }
+
+    if ((s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
+        s->local_port = port;
+
+    if (localaddr[0]) {
+        srt_fd = srt_socket_create(h, &my_addr, &len, localaddr);
+    } else {
+        srt_fd = srt_socket_create(h, &my_addr, &len, s->localaddr);
+    }
+    if (srt_fd < 0)
+        goto fail;
+
+
+    if (srt_set_options_pre(h, srt_fd) < 0)
+        goto fail;
+
+    /* Follow the requested reuse option */
+    if (s->reuse_socket > 0) {
+        s->reuse_socket = 1;
+        if (srt_setsockopt (srt_fd, SOL_SOCKET, SRTO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)))
+            goto fail;
+    }
+    if (is_output) {
+        /* limit the tx buf size to limit latency */
+        tmp = s->buffer_size;
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
+            log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
+            goto fail;
+        }
+    } else {
+        /* set srt recv buffer size to the requested value (default 64K) */
+        tmp = s->buffer_size;
+        if (srt_setsockopt(srt_fd, SOL_SOCKET, SRTO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
+            log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
+        }
+        tmplen = sizeof(tmp);
+        if (srt_getsockopt(srt_fd, SOL_SOCKET, SRTO_RCVBUF, &tmp, &tmplen) < 0) {
+            log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
+        } else {
+            av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
+            if (tmp < s->buffer_size)
+                av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
+        }
+
+        /* make the socket non-blocking */
+        srt_socket_nonblock(srt_fd, 1);
+    }
+    /* the bind is needed to give a port to the socket now */
+    if (bind_ret < 0 && srt_bind(srt_fd,(struct sockaddr *)&my_addr, len) < 0) {
+        log_net_error(h, AV_LOG_ERROR, "bind failed");
+        goto fail;
+    }
+
+    if (s->mode == SRT_MODE_LISTENER) {
+        listen_fd = srt_fd;
+        srt_fd = SRT_INVALID_SOCK;
+
+        if (srt_listen(listen_fd, 1))
+            goto fail;
+        srt_fd = srt_accept(listen_fd, NULL, NULL);
+        if (srt_fd == SRT_INVALID_SOCK)
+            goto fail;
+    }
+
+    len = sizeof(my_addr);
+    srt_getsockname(srt_fd, (struct sockaddr *)&my_addr, &len);
+    s->local_port = srt_port(&my_addr, len);
+
+    if (s->mode != SRT_MODE_LISTENER) {
+        if (srt_connect(srt_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
+            log_net_error(h, AV_LOG_ERROR, "connect");
+            goto fail;
+        }
+    }
+    if (srt_set_options_post(h, srt_fd) < 0)
+        goto fail;
+
+    for (i = 0; i < num_include_sources; i++)
+        av_freep(&include_sources[i]);
+    for (i = 0; i < num_exclude_sources; i++)
+        av_freep(&exclude_sources[i]);
+
+    s->srt_fd = srt_fd;
+
+#if HAVE_PTHREAD_CANCEL
+    /*
+      Create thread in case of:
+      1. Input and circular_buffer_size is set
+      2. Output and bitrate and circular_buffer_size is set
+    */
+
+    if (is_output && s->bitrate && !s->circular_buffer_size) {
+        /* Warn user in case of 'circular_buffer_size' is not set */
+        av_log(h, AV_LOG_WARNING,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if ((!is_output && s->circular_buffer_size) || (is_output && s->bitrate && s->circular_buffer_size)) {
+        int ret;
+
+        /* start the task going */
+        s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        ret = pthread_mutex_init(&s->mutex, NULL);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
+            goto fail;
+        }
+        ret = pthread_cond_init(&s->cond, NULL);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
+            goto cond_fail;
+        }
+        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
+        if (ret) {
+            av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
+            goto thread_fail;
+        }
+        s->thread_started = 1;
+    }
+#endif
+
+    return 0;
+#if HAVE_PTHREAD_CANCEL
+ thread_fail:
+    pthread_cond_destroy(&s->cond);
+ cond_fail:
+    pthread_mutex_destroy(&s->mutex);
+#endif
+ fail:
+    if (srt_fd >= 0)
+        srt_close(srt_fd);
+    av_fifo_freep(&s->fifo);
+    for (i = 0; i < num_include_sources; i++)
+        av_freep(&include_sources[i]);
+    for (i = 0; i < num_exclude_sources; i++)
+        av_freep(&exclude_sources[i]);
+    return AVERROR(EIO);
+}
+
+/* Read one datagram from the receive FIFO if one is in use, else from the
+ * SRT socket. Returns the number of bytes read or an error code. */
+static int srt_read(URLContext *h, uint8_t *buf, int size)
+{
+    SRTContext *s = h->priv_data;
+    int ret;
+#if HAVE_PTHREAD_CANCEL
+    int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
+
+    if (s->fifo) {
+        pthread_mutex_lock(&s->mutex);
+        do {
+            avail = av_fifo_size(s->fifo);
+            if (avail) {
+                uint8_t tmp[4];
+                /* each FIFO entry is a 32-bit LE length followed by the payload */
+                av_fifo_generic_read(s->fifo, tmp, 4, NULL);
+                avail= AV_RL32(tmp);
+                if (avail > size){
+                    av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
+                    avail= size;
+                }
+                av_fifo_generic_read(s->fifo, buf, avail, NULL);
+                av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
+                pthread_mutex_unlock(&s->mutex);
+                return avail;
+            } else if (s->circular_buffer_error){
+                int err = s->circular_buffer_error;
+                pthread_mutex_unlock(&s->mutex);
+                return err;
+            } else if (nonblock) {
+                pthread_mutex_unlock(&s->mutex);
+                return AVERROR(EAGAIN);
+            } else {
+                /* FIXME: a monotonic clock would be better, but is not portable */
+                int64_t t = av_gettime() + 100000;
+                struct timespec tv = { .tv_sec = t / 1000000, .tv_nsec = (t % 1000000) * 1000 };
+                /* pthread_cond_timedwait() returns the error number; errno is not set */
+                int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv);
+                if (err && err != ETIMEDOUT) {
+                    pthread_mutex_unlock(&s->mutex);
+                    return AVERROR(err);
+                }
+                nonblock = 1; /* one more FIFO check, then EAGAIN */
+            }
+        } while (1);
+    }
+#endif
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = srt_network_wait_fd(s->srt_fd, 0);
+        if (ret < 0)
+            return ret;
+    }
+    ret = srt_recvmsg(s->srt_fd, buf, size);
+    return ret < 0 ? srt_neterrno() : ret;
+}
+
+/**
+ * Send one datagram. When a FIFO is in use the packet is queued, prefixed
+ * with its 32-bit little-endian length, and the condition variable is
+ * signalled; otherwise it is passed to the SRT socket directly.
+ * Returns size (FIFO path) or the srt_sendmsg() result, or an error code.
+ */
+static int srt_write(URLContext *h, const uint8_t *buf, int size)
+{
+    SRTContext *s = h->priv_data;
+    int ret;
+
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t hdr[4];
+
+        pthread_mutex_lock(&s->mutex);
+        if (s->circular_buffer_error<0) {
+            /* a previous tx failed; which packet it was cannot be known here */
+            ret = s->circular_buffer_error;
+        } else if (av_fifo_space(s->fifo) < size + 4) {
+            /* no room for length prefix plus payload: nothing is queued */
+            ret = AVERROR(ENOMEM);
+        } else {
+            AV_WL32(hdr, size);
+            av_fifo_generic_write(s->fifo, hdr, 4, NULL); /* size of packet */
+            av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+            pthread_cond_signal(&s->cond);
+            ret = size;
+        }
+        pthread_mutex_unlock(&s->mutex);
+        return ret;
+    }
+#endif
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = srt_network_wait_fd(s->srt_fd, 1);
+        if (ret < 0)
+            return ret;
+    }
+
+    ret = srt_sendmsg(s->srt_fd, buf, size, -1, 0);
+    if (ret < 0)
+        return srt_neterrno();
+    return ret;
+}
+
+/* Named do_srt_close because srt_close is already declared by srt/srt.h. */
+static int do_srt_close(URLContext *h)
+{
+    SRTContext *s = h->priv_data;
+
+#if HAVE_PTHREAD_CANCEL
+    if (s->thread_started) {
+        int join_err;
+
+        if (h->flags & AVIO_FLAG_READ) {
+            /* Cancel only read, as write has been signaled as success to the user */
+            pthread_cancel(s->circular_buffer_thread);
+        } else {
+            /* Request close once writing is finished */
+            pthread_mutex_lock(&s->mutex);
+            s->close_req = 1;
+            pthread_cond_signal(&s->cond);
+            pthread_mutex_unlock(&s->mutex);
+        }
+
+        join_err = pthread_join(s->circular_buffer_thread, NULL);
+        if (join_err)
+            av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(join_err));
+        pthread_mutex_destroy(&s->mutex);
+        pthread_cond_destroy(&s->cond);
+    }
+#endif
+
+    srt_close(s->srt_fd);
+    av_fifo_freep(&s->fifo);
+    return 0;
+}
+
+const URLProtocol ff_srt_protocol = { /* protocol entry registered under the name "srt" */
+    .name                = "srt",
+    .url_open            = srt_open,
+    .url_read            = srt_read,
+    .url_write           = srt_write,
+    .url_close           = do_srt_close, /* not srt_close: that name belongs to srt/srt.h */
+    .url_get_file_handle = srt_get_file_handle,
+    .priv_data_size      = sizeof(SRTContext), /* size of the per-connection SRTContext */
+    .priv_data_class     = &srt_class,
+    .flags               = URL_PROTOCOL_FLAG_NETWORK,
+};
+
diff --git a/libavformat/url.h b/libavformat/url.h
index 4750bff..d6e034e 100644
--- a/libavformat/url.h
+++ b/libavformat/url.h
@@ -279,6 +279,10 @@ int ff_check_interrupt(AVIOInterruptCB *cb);
 int ff_udp_set_remote_url(URLContext *h, const char *uri);
 int ff_udp_get_local_port(URLContext *h);
 
+/* srt.c */
+int ff_srt_set_remote_url(URLContext *h, const char *uri);
+int ff_srt_get_local_port(URLContext *h);
+
 /**
  * Assemble a URL string from components. This is the reverse operation
  * of av_url_split.
-- 
2.7.4



More information about the ffmpeg-devel mailing list