mirror of
https://git.ffmpeg.org/ffmpeg.git
synced 2024-12-17 21:14:47 +00:00
avformat/udp: redesign threaded udp tx code
This fixes partially completed send() Avoids holding the mutex during send() fixes race conditions in error handling removes copied non thread specific blocking code Fixes deadlocks on closure Fixes data loss on closure Signed-off-by: Michael Niedermayer <michael@niedermayer.cc>
This commit is contained in:
parent
413c842a69
commit
9b7a8bddac
@ -29,6 +29,7 @@
|
||||
|
||||
#include "avformat.h"
|
||||
#include "avio_internal.h"
|
||||
#include "libavutil/avassert.h"
|
||||
#include "libavutil/parseutils.h"
|
||||
#include "libavutil/fifo.h"
|
||||
#include "libavutil/intreadwrite.h"
|
||||
@ -93,6 +94,7 @@ typedef struct UDPContext {
|
||||
AVFifoBuffer *fifo;
|
||||
int circular_buffer_error;
|
||||
int64_t packet_gap; /* delay between transmitted packets */
|
||||
int close_req;
|
||||
#if HAVE_PTHREAD_CANCEL
|
||||
pthread_t circular_buffer_thread;
|
||||
pthread_mutex_t mutex;
|
||||
@ -545,30 +547,6 @@ end:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void do_udp_write(void *arg, void *buf, int size) {
|
||||
URLContext *h = arg;
|
||||
UDPContext *s = h->priv_data;
|
||||
|
||||
int ret;
|
||||
|
||||
if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
|
||||
ret = ff_network_wait_fd(s->udp_fd, 1);
|
||||
if (ret < 0) {
|
||||
s->circular_buffer_error = ret;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (!s->is_connected) {
|
||||
ret = sendto (s->udp_fd, buf, size, 0,
|
||||
(struct sockaddr *) &s->dest_addr,
|
||||
s->dest_addr_len);
|
||||
} else
|
||||
ret = send(s->udp_fd, buf, size, 0);
|
||||
|
||||
s->circular_buffer_error=ret;
|
||||
}
|
||||
|
||||
static void *circular_buffer_task_tx( void *_URLContext)
|
||||
{
|
||||
URLContext *h = _URLContext;
|
||||
@ -576,41 +554,67 @@ static void *circular_buffer_task_tx( void *_URLContext)
|
||||
int old_cancelstate;
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
|
||||
if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
|
||||
av_log(h, AV_LOG_ERROR, "Failed to set blocking mode");
|
||||
s->circular_buffer_error = AVERROR(EIO);
|
||||
goto end;
|
||||
}
|
||||
|
||||
for(;;) {
|
||||
int len;
|
||||
const uint8_t *p;
|
||||
uint8_t tmp[4];
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
|
||||
|
||||
av_usleep(s->packet_gap);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
|
||||
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
|
||||
len=av_fifo_size(s->fifo);
|
||||
|
||||
while (len<4) {
|
||||
if (s->close_req)
|
||||
goto end;
|
||||
if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
|
||||
goto end;
|
||||
}
|
||||
len=av_fifo_size(s->fifo);
|
||||
}
|
||||
|
||||
av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
|
||||
av_fifo_generic_read(s->fifo, tmp, 4, NULL);
|
||||
len=AV_RL32(tmp);
|
||||
|
||||
if (len>0 && av_fifo_size(s->fifo)>=len+4) {
|
||||
av_fifo_drain(s->fifo, 4); /* skip packet length */
|
||||
av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
|
||||
if (s->circular_buffer_error == len) {
|
||||
/* all ok - reset error */
|
||||
s->circular_buffer_error=0;
|
||||
av_assert0(len >= 0);
|
||||
av_assert0(len <= sizeof(s->tmp));
|
||||
|
||||
av_fifo_generic_read(s->fifo, s->tmp, len, NULL);
|
||||
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
|
||||
|
||||
p = s->tmp;
|
||||
while (len) {
|
||||
int ret;
|
||||
av_assert0(len > 0);
|
||||
if (!s->is_connected) {
|
||||
ret = sendto (s->udp_fd, p, len, 0,
|
||||
(struct sockaddr *) &s->dest_addr,
|
||||
s->dest_addr_len);
|
||||
} else
|
||||
ret = send(s->udp_fd, p, len, 0);
|
||||
if (ret >= 0) {
|
||||
len -= ret;
|
||||
p += ret;
|
||||
} else {
|
||||
ret = ff_neterrno();
|
||||
if (ret != AVERROR(EAGAIN) && ret != AVERROR(EINTR)) {
|
||||
s->circular_buffer_error = ret;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
av_usleep(s->packet_gap);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
}
|
||||
|
||||
end:
|
||||
@ -1055,7 +1059,6 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
|
||||
*/
|
||||
if (s->circular_buffer_error<0) {
|
||||
int err=s->circular_buffer_error;
|
||||
s->circular_buffer_error=0;
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
return err;
|
||||
}
|
||||
@ -1093,13 +1096,26 @@ static int udp_close(URLContext *h)
|
||||
{
|
||||
UDPContext *s = h->priv_data;
|
||||
|
||||
#if HAVE_PTHREAD_CANCEL
|
||||
// Request close once writing is finished
|
||||
if (s->thread_started && !(h->flags & AVIO_FLAG_READ)) {
|
||||
int ret;
|
||||
pthread_mutex_lock(&s->mutex);
|
||||
s->close_req = 1;
|
||||
pthread_cond_signal(&s->cond);
|
||||
pthread_mutex_unlock(&s->mutex);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
|
||||
udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
|
||||
closesocket(s->udp_fd);
|
||||
#if HAVE_PTHREAD_CANCEL
|
||||
if (s->thread_started) {
|
||||
int ret;
|
||||
pthread_cancel(s->circular_buffer_thread);
|
||||
// Cancel only read, as write has been signaled as success to the user
|
||||
if (h->flags & AVIO_FLAG_READ)
|
||||
pthread_cancel(s->circular_buffer_thread);
|
||||
ret = pthread_join(s->circular_buffer_thread, NULL);
|
||||
if (ret != 0)
|
||||
av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
|
||||
|
Loading…
Reference in New Issue
Block a user