[FFmpeg-cvslog] udp: Replace double select() by select+mutex+cond.

Michael Niedermayer git at videolan.org
Fri Dec 23 02:03:05 CET 2011


ffmpeg | branch: master | Michael Niedermayer <michaelni at gmx.at> | Fri Dec 23 01:17:18 2011 +0100| [bc900501e0e2002e40d2d0c87b5a98b913b2d1a2] | committer: Michael Niedermayer

udp: Replace double select() by select+mutex+cond.

When no data was available both the buffer thread as well as
the main thread would block in select(), when data becomes
available both should move forward and as data is read in the
buffer thread the main thread would block in select() later
the read data was put in the fifo but the main thread still
would be blocked in select() until either the timeout or
another packet would come in.

This is solved in this commit by using a mutex and a condition
variable

Signed-off-by: Michael Niedermayer <michaelni at gmx.at>

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=bc900501e0e2002e40d2d0c87b5a98b913b2d1a2
---

 libavformat/udp.c |   41 +++++++++++++++++++++++++++--------------
 1 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/libavformat/udp.c b/libavformat/udp.c
index 3cdee06..d360bd6 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -69,6 +69,8 @@ typedef struct {
     int circular_buffer_error;
 #if HAVE_PTHREADS
     pthread_t circular_buffer_thread;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
 #endif
     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
     int remaining_in_dg;
@@ -317,6 +319,7 @@ static int udp_get_file_handle(URLContext *h)
     return s->udp_fd;
 }
 
+#if HAVE_PTHREADS
 static void *circular_buffer_task( void *_URLContext)
 {
     URLContext *h = _URLContext;
@@ -331,7 +334,7 @@ static void *circular_buffer_task( void *_URLContext)
 
         if (ff_check_interrupt(&h->interrupt_callback)) {
             s->circular_buffer_error = EINTR;
-            return NULL;
+            goto end;
         }
 
         FD_ZERO(&rfds);
@@ -343,7 +346,7 @@ static void *circular_buffer_task( void *_URLContext)
             if (ff_neterrno() == AVERROR(EINTR))
                 continue;
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
 
         if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
@@ -357,23 +360,31 @@ static void *circular_buffer_task( void *_URLContext)
         if(left < UDP_MAX_PKT_SIZE + 4) {
             av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
             s->circular_buffer_error = EIO;
-            return NULL;
+            goto end;
         }
         left = FFMIN(left, s->fifo->end - s->fifo->wptr);
         len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
         if (len < 0) {
             if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                 s->circular_buffer_error = EIO;
-                return NULL;
+                goto end;
             }
             continue;
         }
         AV_WL32(s->tmp, len);
+        pthread_mutex_lock(&s->mutex);
         av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
     }
 
+end:
+    pthread_mutex_lock(&s->mutex);
+    pthread_cond_signal(&s->cond);
+    pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+#endif
 
 /* put it in UDP context */
 /* return non zero if error */
@@ -516,6 +527,8 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     if (!is_output && s->circular_buffer_size) {
         /* start the task going */
         s->fifo = av_fifo_alloc(s->circular_buffer_size);
+        pthread_mutex_init(&s->mutex, NULL);
+        pthread_cond_init(&s->cond, NULL);
         if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
             goto fail;
@@ -536,15 +549,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
     int avail;
-    fd_set rfds;
-    struct timeval tv;
 
+#if HAVE_PTHREADS
     if (s->fifo) {
-
+        pthread_mutex_lock(&s->mutex);
         do {
             avail = av_fifo_size(s->fifo);
             if (avail) { // >=size) {
                 uint8_t tmp[4];
+                pthread_mutex_unlock(&s->mutex);
 
                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
                 avail= AV_RL32(tmp);
@@ -557,19 +570,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
                 return avail;
             } else if(s->circular_buffer_error){
+                pthread_mutex_unlock(&s->mutex);
                 return s->circular_buffer_error;
             }
             else {
-                FD_ZERO(&rfds);
-                FD_SET(s->udp_fd, &rfds);
-                tv.tv_sec = 1;
-                tv.tv_usec = 0;
-                ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
-                if (ret<0)
-                    return ret;
+                pthread_cond_wait(&s->cond, &s->mutex);
             }
         } while( 1);
     }
+#endif
 
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 0);
@@ -610,6 +619,10 @@ static int udp_close(URLContext *h)
         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
     closesocket(s->udp_fd);
     av_fifo_free(s->fifo);
+#if HAVE_PTHREADS
+    pthread_mutex_destroy(&s->mutex);
+    pthread_cond_destroy(&s->cond);
+#endif
     return 0;
 }
 



More information about the ffmpeg-cvslog mailing list