[FFmpeg-devel] [PATCH 4/4] avformat/udp: do not use thread cancellation when receiving data

Marton Balint cus at passwd.hu
Thu Jan 16 02:20:16 EET 2020


Thread cancellation is not supported by every threading implementation, and the
only things we gain from it are an immediate shutdown of packet reception on
close and avoiding the poll call before reading the data.

I don't think it is a big issue if it takes 0.1 sec of delay to close a UDP
stream. Back when this was introduced the delay was 1 sec, which was indeed
noticeable.

And anybody who needs performance-sensitive UDP should not use the fifo buffer
in the first place, because the kernel can buffer the data much more
effectively.

Signed-off-by: Marton Balint <cus at passwd.hu>
---
 libavformat/udp.c | 57 +++++++++++++++++++++++++------------------------------
 1 file changed, 26 insertions(+), 31 deletions(-)

diff --git a/libavformat/udp.c b/libavformat/udp.c
index 11af9b2500..75fa8c5e31 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -61,8 +61,8 @@
 #define IPPROTO_UDPLITE                                  136
 #endif
 
-#if HAVE_PTHREAD_CANCEL
-#include <pthread.h>
+#if HAVE_THREADS
+#include "libavutil/thread.h"
 #endif
 
 #ifndef IPV6_ADD_MEMBERSHIP
@@ -98,7 +98,7 @@ typedef struct UDPContext {
     int64_t bitrate; /* number of bits to send per second */
     int64_t burst_bits;
     int close_req;
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     pthread_t circular_buffer_thread;
     pthread_mutex_t mutex;
     pthread_cond_t cond;
@@ -454,32 +454,34 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }
 
-#if HAVE_PTHREAD_CANCEL
-static void *circular_buffer_task_rx( void *_URLContext)
+#if HAVE_THREADS
+static void *circular_buffer_task_rx(void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
-    int old_cancelstate;
 
-    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
     pthread_mutex_lock(&s->mutex);
     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
         av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
         s->circular_buffer_error = AVERROR(EIO);
         goto end;
     }
-    while(1) {
-        int len;
+    while (!s->close_req) {
+        int ret, len;
         struct sockaddr_storage addr;
         socklen_t addr_len = sizeof(addr);
 
         pthread_mutex_unlock(&s->mutex);
-        /* Blocking operations are always cancellation points;
-           see "General Information" / "Thread Cancelation Overview"
-           in Single Unix. */
-        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+        ret = ff_network_wait_fd(s->udp_fd, 0);
+        if (ret < 0) {
+            pthread_mutex_lock(&s->mutex);
+            if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
+                s->circular_buffer_error = ret;
+                goto end;
+            }
+            continue;
+        }
         len = recvfrom(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0, (struct sockaddr *)&addr, &addr_len);
-        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
         pthread_mutex_lock(&s->mutex);
         if (len < 0) {
             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
@@ -520,14 +522,12 @@ static void *circular_buffer_task_tx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
-    int old_cancelstate;
     int64_t target_timestamp = av_gettime_relative();
     int64_t start_timestamp = av_gettime_relative();
     int64_t sent_bits = 0;
     int64_t burst_interval = s->bitrate ? (s->burst_bits * 1000000 / s->bitrate) : 0;
     int64_t max_delay = s->bitrate ?  ((int64_t)h->max_packet_size * 8 * 1000000 / s->bitrate + 1) : 0;
 
-    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
     pthread_mutex_lock(&s->mutex);
 
     if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
@@ -562,7 +562,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
         av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
 
         pthread_mutex_unlock(&s->mutex);
-        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
 
         if (s->bitrate) {
             timestamp = av_gettime_relative();
@@ -608,7 +607,6 @@ static void *circular_buffer_task_tx( void *_URLContext)
             }
         }
 
-        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
         pthread_mutex_lock(&s->mutex);
     }
 
@@ -667,7 +665,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
             /* assume if no digits were found it is a request to enable it */
             if (buf == endptr)
                 s->overrun_nonfatal = 1;
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!HAVE_THREADS)
                 av_log(h, AV_LOG_WARNING,
                        "'overrun_nonfatal' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
@@ -695,14 +693,14 @@ static int udp_open(URLContext *h, const char *uri, int flags)
         }
         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
             s->circular_buffer_size = strtol(buf, NULL, 10);
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!HAVE_THREADS)
                 av_log(h, AV_LOG_WARNING,
                        "'circular_buffer_size' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
         }
         if (av_find_info_tag(buf, sizeof(buf), "bitrate", p)) {
             s->bitrate = strtoll(buf, NULL, 10);
-            if (!HAVE_PTHREAD_CANCEL)
+            if (!HAVE_THREADS)
                 av_log(h, AV_LOG_WARNING,
                        "'bitrate' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
@@ -877,7 +875,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 
     s->udp_fd = udp_fd;
 
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     /*
       Create thread in case of:
       1. Input and circular_buffer_size is set
@@ -914,7 +912,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
 #endif
 
     return 0;
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
  thread_fail:
     pthread_cond_destroy(&s->cond);
  cond_fail:
@@ -944,7 +942,7 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
     int ret;
     struct sockaddr_storage addr;
     socklen_t addr_len = sizeof(addr);
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
 
     if (s->fifo) {
@@ -1008,7 +1006,7 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
 
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     if (s->fifo) {
         uint8_t tmp[4];
 
@@ -1057,9 +1055,9 @@ static int udp_close(URLContext *h)
 {
     UDPContext *s = h->priv_data;
 
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     // Request close once writing is finished
-    if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
+    if (s->thread_started) {
         pthread_mutex_lock(&s->mutex);
         s->close_req = 1;
         pthread_cond_signal(&s->cond);
@@ -1069,12 +1067,9 @@ static int udp_close(URLContext *h)
 
     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
-#if HAVE_PTHREAD_CANCEL
+#if HAVE_THREADS
     if (s->thread_started) {
         int ret;
-        // Cancel only read, as write has been signaled as success to the user
-        if (h->flags & AVIO_FLAG_READ)
-            pthread_cancel(s->circular_buffer_thread);
         ret = pthread_join(s->circular_buffer_thread, NULL);
         if (ret != 0)
             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
-- 
2.16.4



More information about the ffmpeg-devel mailing list