From 03f8f39769f385f04b770c748a46298e46ab3196 Mon Sep 17 00:00:00 2001 From: Michael Niedermayer Date: Fri, 13 May 2011 17:16:15 +0200 Subject: [PATCH] udp: switch custom fifo to libavutil fifo. This also allows us to drop the semaphore as our fifo is lock free. Signed-off-by: Michael Niedermayer --- libavformat/udp.c | 69 ++++++++++++----------------------------------- 1 file changed, 17 insertions(+), 52 deletions(-) diff --git a/libavformat/udp.c b/libavformat/udp.c index 29391a1153..de5c33278e 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -29,13 +29,13 @@ #include "avformat.h" #include "avio_internal.h" #include "libavutil/parseutils.h" +#include "libavutil/fifo.h" #include #include "internal.h" #include "network.h" #include "os_support.h" #include "url.h" #include -#include #include #include @@ -56,15 +56,11 @@ typedef struct { int is_connected; /* Circular Buffer variables for use in UDP receive code */ - unsigned char *circular_buffer; - int circular_buffer_head; int circular_buffer_size; - int circular_buffer_tail; - int circular_buffer_available; + AVFifoBuffer *fifo; int circular_buffer_available_max; int circular_buffer_error; pthread_t circular_buffer_thread; - sem_t circular_buffer_semaphore; } UDPContext; #define UDP_TX_BUF_SIZE 32768 @@ -345,31 +341,28 @@ static void *circular_buffer_task( void *_URLContext) continue; /* How much do we have left to the end of the buffer */ - left = s->circular_buffer_size-s->circular_buffer_head; /* Whats the minimum we can read so that we dont comletely fill the buffer */ - sem_wait( &s->circular_buffer_semaphore); - left = FFMIN( left, s->circular_buffer_size-s->circular_buffer_available); - sem_post( &s->circular_buffer_semaphore ); + left = av_fifo_space(s->fifo); + left = FFMIN(left, s->fifo->end - s->fifo->wptr); + /* No Space left, error, what do we do now */ if( !left) { av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n"); s->circular_buffer_error = EIO; return NULL; } - len = recv(s->udp_fd, s->circular_buffer+s->circular_buffer_head, left, 0); + + len = recv(s->udp_fd, s->fifo->wptr, left, 0); if (len < 0) { if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { s->circular_buffer_error = EIO; return NULL; } } - s->circular_buffer_head += len; - sem_wait( &s->circular_buffer_semaphore); - s->circular_buffer_available += len; - s->circular_buffer_available_max = FFMAX( s->circular_buffer_available_max, s->circular_buffer_available); - sem_post( &s->circular_buffer_semaphore ); - if( s->circular_buffer_head>=s->circular_buffer_size) - s->circular_buffer_head -= s->circular_buffer_size; + s->fifo->wptr += len; + if (s->fifo->wptr >= s->fifo->end) + s->fifo->wptr = s->fifo->buffer; + s->fifo->wndx += len; } return NULL; @@ -516,11 +509,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (!is_output && s->circular_buffer_size) { /* start the task going */ - s->circular_buffer = av_malloc( s->circular_buffer_size); - if (sem_init( &s->circular_buffer_semaphore, PTHREAD_PROCESS_PRIVATE, 1 )) { - av_log(h, AV_LOG_ERROR, "sem_init failed\n"); - goto fail; - } + s->fifo = av_fifo_alloc(s->circular_buffer_size); if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) { av_log(h, AV_LOG_ERROR, "pthread_create failed\n"); goto fail; @@ -531,10 +520,7 @@ static int udp_open(URLContext *h, const char *uri, int flags) fail: if (udp_fd >= 0) closesocket(udp_fd); - if (s->circular_buffer) { - sem_destroy( &s->circular_buffer_semaphore); - av_free( s->circular_buffer); - } + av_fifo_free(s->fifo); av_free(s); return AVERROR(EIO); } @@ -551,30 +537,12 @@ static int udp_read(URLContext *h, uint8_t *buf, int size) if (s->circular_buffer_thread) { do { - sem_wait( &s->circular_buffer_semaphore ); - avail = s->circular_buffer_available; - sem_post( &s->circular_buffer_semaphore ); + avail = av_fifo_size(s->fifo); if (avail) { // >=size) { // Maximum amount available size = FFMIN( avail, size); - // Whats left till the end of the circular buffer - left = s->circular_buffer_size-s->circular_buffer_tail; - // How much do we need, all? - left = FFMIN( left, size); - // Get the first block - memcpy( buf, s->circular_buffer+s->circular_buffer_tail, left); - // Have we any more, this will be from the start of the buffer - if (size-left) - memcpy( buf+left, s->circular_buffer, size-left); - // Check for the tail wrapping around - s->circular_buffer_tail += size; - if( s->circular_buffer_tail>=s->circular_buffer_size) - s->circular_buffer_tail -= s->circular_buffer_size; - // Update the available amount - sem_wait( &s->circular_buffer_semaphore ); - s->circular_buffer_available -= size; - sem_post( &s->circular_buffer_semaphore ); + av_fifo_generic_read(s->fifo, buf, size, NULL); return size; } else { @@ -627,11 +595,8 @@ static int udp_close(URLContext *h) if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr); closesocket(s->udp_fd); - if (s->circular_buffer) { - sem_destroy( &s->circular_buffer_semaphore); - av_free( s->circular_buffer); - av_log( h, AV_LOG_INFO, "circular_buffer_info max:%d%%\r\n", (s->circular_buffer_available_max*100)/s->circular_buffer_size); - } + av_log( h, AV_LOG_INFO, "circular_buffer_info max:%d%%\r\n", (s->circular_buffer_available_max*100)/s->circular_buffer_size); + av_fifo_free(s->fifo); av_free(s); return 0; }