absurd buffering test

This commit is contained in:
wm4 2020-02-08 01:02:08 +01:00
parent af021a2891
commit df82b5a63f
1 changed files with 95 additions and 3 deletions

View File

@ -26,6 +26,8 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <pthread.h>
#ifndef __MINGW32__ #ifndef __MINGW32__
#include <poll.h> #include <poll.h>
#endif #endif
@ -35,6 +37,7 @@
#include "common/common.h" #include "common/common.h"
#include "common/msg.h" #include "common/msg.h"
#include "misc/thread_tools.h" #include "misc/thread_tools.h"
#include "misc/ring.h"
#include "stream.h" #include "stream.h"
#include "options/m_option.h" #include "options/m_option.h"
#include "options/path.h" #include "options/path.h"
@ -66,21 +69,63 @@ struct priv {
bool appending; bool appending;
int64_t orig_size; int64_t orig_size;
struct mp_cancel *cancel; struct mp_cancel *cancel;
struct mp_ring *ring;
pthread_t thread;
pthread_mutex_t lock;
pthread_cond_t wakeup;
bool read_ok, in_io, eof, terminate;
int64_t size;
}; };
// Total timeout = RETRY_TIMEOUT * MAX_RETRIES // Total timeout = RETRY_TIMEOUT * MAX_RETRIES
#define RETRY_TIMEOUT 0.2 #define RETRY_TIMEOUT 0.2
#define MAX_RETRIES 10 #define MAX_RETRIES 10
static void stop_io(stream_t *s)
{
struct priv *p = s->priv;
pthread_mutex_lock(&p->lock);
p->read_ok = false;
while (p->in_io)
pthread_cond_wait(&p->wakeup, &p->lock);
pthread_mutex_unlock(&p->lock);
}
static int64_t get_size(stream_t *s) static int64_t get_size(stream_t *s)
{ {
struct priv *p = s->priv; struct priv *p = s->priv;
off_t size = lseek(p->fd, 0, SEEK_END); if (!p->size) {
lseek(p->fd, s->pos, SEEK_SET); stop_io(s);
return size == (off_t)-1 ? -1 : size; off_t size = lseek(p->fd, 0, SEEK_END);
lseek(p->fd, s->pos, SEEK_SET);
p->size = size == (off_t)-1 ? -1 : size;
}
return p->size;
} }
static int fill_buffer(stream_t *s, void *buffer, int max_len) static int fill_buffer(stream_t *s, void *buffer, int max_len)
{
struct priv *p = s->priv;
int res = 0;
pthread_mutex_lock(&p->lock);
while (1) {
int r = mp_ring_buffered(p->ring);
if (r || p->eof) {
res = mp_ring_read(p->ring, buffer, max_len);
break;
}
p->read_ok = true;
pthread_cond_broadcast(&p->wakeup);
pthread_cond_wait(&p->wakeup, &p->lock);
}
pthread_mutex_unlock(&p->lock);
return res;
}
static int real_fill_buffer(stream_t *s, void *buffer, int max_len)
{ {
struct priv *p = s->priv; struct priv *p = s->priv;
@ -120,21 +165,61 @@ static int fill_buffer(stream_t *s, void *buffer, int max_len)
return 0; return 0;
} }
static void *read_thread(void *arg)
{
stream_t *s = arg;
struct priv *p = s->priv;
pthread_mutex_lock(&p->lock);
while (!p->terminate) {
int a = mp_ring_available(p->ring);
if (p->read_ok && a >= 64 * 1024) {
p->in_io = true;
pthread_mutex_unlock(&p->lock);
uint8_t buf[64 * 1024];
int r = real_fill_buffer(s, buf, sizeof(buf));
r = MPMAX(r, 0);
pthread_mutex_lock(&p->lock);
p->in_io = false;
mp_ring_write(p->ring, buf, r);
p->eof = !r;
pthread_cond_broadcast(&p->wakeup);
} else {
pthread_cond_wait(&p->wakeup, &p->lock);
}
}
pthread_mutex_unlock(&p->lock);
return NULL;
}
static int write_buffer(stream_t *s, void *buffer, int len) static int write_buffer(stream_t *s, void *buffer, int len)
{ {
struct priv *p = s->priv; struct priv *p = s->priv;
stop_io(s);
return write(p->fd, buffer, len); return write(p->fd, buffer, len);
} }
static int seek(stream_t *s, int64_t newpos) static int seek(stream_t *s, int64_t newpos)
{ {
struct priv *p = s->priv; struct priv *p = s->priv;
stop_io(s);
pthread_mutex_lock(&p->lock);
mp_ring_reset(p->ring);
p->eof = false;
pthread_mutex_unlock(&p->lock);
return lseek(p->fd, newpos, SEEK_SET) != (off_t)-1; return lseek(p->fd, newpos, SEEK_SET) != (off_t)-1;
} }
static void s_close(stream_t *s) static void s_close(stream_t *s)
{ {
struct priv *p = s->priv; struct priv *p = s->priv;
stop_io(s);
pthread_mutex_lock(&p->lock);
p->terminate = true;
pthread_cond_broadcast(&p->wakeup);
pthread_mutex_unlock(&p->lock);
pthread_join(p->thread, NULL);
if (p->close) if (p->close)
close(p->fd); close(p->fd);
} }
@ -340,6 +425,13 @@ static int open_f(stream_t *stream)
if (stream->cancel) if (stream->cancel)
mp_cancel_set_parent(p->cancel, stream->cancel); mp_cancel_set_parent(p->cancel, stream->cancel);
pthread_mutex_init(&p->lock, NULL);
pthread_cond_init(&p->wakeup, NULL);
p->ring = mp_ring_new(stream, 2 * 64 * 1024);
int r = pthread_create(&p->thread, NULL, read_thread, stream);
if (r)
return -1;
return STREAM_OK; return STREAM_OK;
} }