1
0
mirror of https://github.com/mpv-player/mpv synced 2025-02-20 06:46:55 +00:00

stream: redo buffer handling and allow arbitrary size for stream_peek()

struct stream used to include the stream buffer, including peek buffer,
inline in the struct. It could not be resized, which means the maximum
peek size was set in stone. This meant demux_lavf.c could peek only so
much data.

Change it to use a dynamic buffer. Because it's possible, keep the
inline buffer for default buffer sizes (which are basically always used
outside of file opening). It's unknown whether it really helps with
anything. Probably not.

This is also the fallback plan in case we need something like the old
stream cache in order to deal with mp4 + unseekable http: the code can
now be easily changed to use any buffer size.
This commit is contained in:
wm4 2018-10-02 19:57:01 +02:00
parent ca142be7e8
commit c91e659f88
4 changed files with 97 additions and 50 deletions

View File

@ -60,7 +60,7 @@
#endif
#define INITIAL_PROBE_SIZE STREAM_BUFFER_SIZE
#define PROBE_BUF_SIZE FFMIN(STREAM_MAX_BUFFER_SIZE, 2 * 1024 * 1024)
#define PROBE_BUF_SIZE (2 * 1024 * 1024)
// Should correspond to IO_BUFFER_SIZE in libavformat/aviobuf.c (not public)

View File

@ -301,7 +301,7 @@ int stream_dump(struct MPContext *mpctx, const char *source_filename)
MP_MSG(mpctx, MSGL_STATUS, "Dumping %lld/%lld...",
(long long int)pos, (long long int)size);
}
bstr data = stream_peek(stream, STREAM_MAX_BUFFER_SIZE);
bstr data = stream_peek(stream, 4096);
if (data.len == 0) {
ok &= stream->eof;
break;

View File

@ -42,8 +42,6 @@
#include "options/m_option.h"
#include "options/m_config.h"
#define TOTAL_BUFFER_SIZE STREAM_MAX_BUFFER_SIZE
extern const stream_info_t stream_info_cdda;
extern const stream_info_t stream_info_dvb;
extern const stream_info_t stream_info_smb;
@ -172,11 +170,6 @@ char *mp_url_escape(void *talloc_ctx, const char *url, const char *ok)
return rv;
}
static stream_t *new_stream(void)
{
return talloc_zero_size(NULL, sizeof(stream_t) + TOTAL_BUFFER_SIZE);
}
static const char *match_proto(const char *url, const char *proto)
{
int l = strlen(proto);
@ -189,6 +182,37 @@ static const char *match_proto(const char *url, const char *proto)
return NULL;
}
// Resize the current stream buffer, or do nothing if the size is adequate.
// Caller must ensure the used buffer is not less than the new buffer size.
// Calling this with 0 ensures it uses the default buffer size.
static void stream_resize_buffer(struct stream *s, int new)
{
new = MPMAX(new, STREAM_BUFFER_SIZE);
if (new == s->buffer_alloc)
return;
int buffer_used = s->buf_len - s->buf_pos;
assert(buffer_used <= new);
void *nbuf = s->buffer_inline;
if (new > STREAM_BUFFER_SIZE)
nbuf = ta_alloc_size(s, new);
if (nbuf) {
if (s->buffer)
memmove(nbuf, &s->buffer[s->buf_pos], buffer_used);
s->buf_pos = 0;
s->buf_len = buffer_used;
if (s->buffer != s->buffer_inline)
ta_free(s->buffer);
s->buffer = nbuf;
s->buffer_alloc = new;
}
}
static int open_internal(const stream_info_t *sinfo, const char *url, int flags,
struct mp_cancel *c, struct mpv_global *global,
struct stream **ret)
@ -208,7 +232,7 @@ static int open_internal(const stream_info_t *sinfo, const char *url, int flags,
if (!path)
return STREAM_NO_MATCH;
stream_t *s = new_stream();
stream_t *s = talloc_zero(NULL, stream_t);
s->log = mp_log_new(s, global->log, sinfo->name);
s->info = sinfo;
s->cancel = c;
@ -241,6 +265,9 @@ static int open_internal(const stream_info_t *sinfo, const char *url, int flags,
if (!s->read_chunk)
s->read_chunk = 4 * STREAM_BUFFER_SIZE;
stream_resize_buffer(s, 0);
MP_HANDLE_OOM(s->buffer);
assert(s->seekable == !!s->seek);
if (s->mime_type)
@ -316,8 +343,11 @@ stream_t *open_output_stream(const char *filename, struct mpv_global *global)
// Partial reads are possible, even if EOF is not reached.
static int stream_read_unbuffered(stream_t *s, void *buf, int len)
{
assert(len >= 0);
if (len <= 0)
return 0;
int res = 0;
s->buf_pos = s->buf_len = 0;
// we will retry even if we already reached EOF previously.
if (s->fill_buffer && !mp_cancel_test(s->cancel))
res = s->fill_buffer(s, buf, len);
@ -332,19 +362,54 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len)
return res;
}
static int stream_fill_buffer_by(stream_t *s, int64_t len)
// Ask for having "total" bytes ready to read in the stream buffer. This can do
// a partial read if requested, so it can actually read less.
// To read everything, you may have to call this in a loop.
// total: desired amount of bytes in buffer
// allow_short: if true, attempt at most once to read more if needed
// returns: actual bytes in buffer (can be smaller or larger than total)
static int stream_extend_buffer(struct stream *s, int total, bool allow_short)
{
len = MPMIN(len, s->read_chunk);
len = MPMAX(len, STREAM_BUFFER_SIZE);
len = stream_read_unbuffered(s, s->buffer, len);
s->buf_pos = 0;
s->buf_len = len;
return s->buf_len;
assert(total >= 0);
if (s->buf_len - s->buf_pos < total) {
// Move to front to guarantee we really can read up to max size.
s->buf_len = s->buf_len - s->buf_pos;
memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len);
s->buf_pos = 0;
// Read ahead by about as much as stream_fill_buffer() would, to avoid
// that many small stream_peek() calls will read the buffer at these
// quantities.
total = MPMAX(total, STREAM_BUFFER_SIZE);
// Allocate more if the buffer is too small. Also, if the buffer is
// larger than needed, resize it to smaller. This assumes stream_peek()
// calls are rare or done with small sizes.
stream_resize_buffer(s, total);
// Read less if allocation above failed.
total = MPMIN(total, s->buffer_alloc);
// Fill rest of the buffer. Can be partial.
while (total > s->buf_len) {
int read = stream_read_unbuffered(s, &s->buffer[s->buf_len],
total - s->buf_len);
s->buf_len += read;
if (allow_short || !read)
break;
}
if (s->buf_len)
s->eof = 0;
}
return s->buf_len - s->buf_pos;
}
int stream_fill_buffer(stream_t *s)
{
return stream_fill_buffer_by(s, STREAM_BUFFER_SIZE);
return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true);
}
// Read between 1..buf_size bytes of data, return how much data has been read.
@ -355,6 +420,7 @@ int stream_read_partial(stream_t *s, char *buf, int buf_size)
assert(buf_size >= 0);
if (s->buf_pos == s->buf_len && buf_size > 0) {
s->buf_pos = s->buf_len = 0;
stream_resize_buffer(s, 0);
// Do a direct read
// Also, small reads will be more efficient with buffering & copying
if (buf_size >= STREAM_BUFFER_SIZE)
@ -388,34 +454,16 @@ int stream_read(stream_t *s, char *mem, int total)
// Read ahead at most len bytes without changing the read position. Return a
// pointer to the internal buffer, starting from the current read position.
// Can read ahead at most STREAM_MAX_BUFFER_SIZE bytes.
// Reading ahead may require memory allocation. If allocation fails, read ahead
// is silently limited to the last successful allocation.
// The returned buffer becomes invalid on the next stream call, and you must
// not write to it.
struct bstr stream_peek(stream_t *s, int len)
{
assert(len >= 0);
assert(len <= STREAM_MAX_BUFFER_SIZE);
if (s->buf_len - s->buf_pos < len) {
// Move to front to guarantee we really can read up to max size.
int buf_valid = s->buf_len - s->buf_pos;
memmove(s->buffer, &s->buffer[s->buf_pos], buf_valid);
// Fill rest of the buffer.
while (buf_valid < len) {
int chunk = MPMAX(len - buf_valid, STREAM_BUFFER_SIZE);
if (buf_valid + chunk > TOTAL_BUFFER_SIZE)
chunk = TOTAL_BUFFER_SIZE - buf_valid;
int read = stream_read_unbuffered(s, &s->buffer[buf_valid], chunk);
if (read == 0)
break; // EOF
buf_valid += read;
}
s->buf_pos = 0;
s->buf_len = buf_valid;
if (s->buf_len)
s->eof = 0;
}
return (bstr){.start = &s->buffer[s->buf_pos],
.len = FFMIN(len, s->buf_len - s->buf_pos)};
int avail = stream_extend_buffer(s, len, false);
return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)};
}
int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
@ -442,7 +490,7 @@ static bool stream_skip_read(struct stream *s, int64_t len)
while (len > 0) {
unsigned int left = s->buf_len - s->buf_pos;
if (!left) {
if (!stream_fill_buffer_by(s, left))
if (!stream_fill_buffer(s))
return false;
continue;
}
@ -461,6 +509,7 @@ void stream_drop_buffers(stream_t *s)
s->pos = stream_tell(s);
s->buf_pos = s->buf_len = 0;
s->eof = 0;
stream_resize_buffer(s, 0);
}
// Seek function bypassing the local stream buffer.
@ -573,7 +622,7 @@ stream_t *open_memory_stream(void *data, int len)
struct mpv_global *dummy = talloc_zero(NULL, struct mpv_global);
dummy->log = mp_null_log;
stream_t *s = stream_open("memory://", dummy);
assert(s);
MP_HANDLE_OOM(s);
talloc_steal(s, dummy);
stream_control(s, STREAM_CTRL_SET_CONTENTS, &(bstr){data, len});
return s;

View File

@ -30,10 +30,6 @@
#define STREAM_BUFFER_SIZE 2048
// Max buffer for initial probe.
#define STREAM_MAX_BUFFER_SIZE (2 * 1024 * 1024)
// stream->mode
#define STREAM_READ 0
#define STREAM_WRITE 1
@ -123,8 +119,10 @@ typedef struct stream {
// added to this. The user can reset this as needed.
uint64_t total_unbuffered_read_bytes;
// Includes additional padding in case sizes get rounded up by sector size.
unsigned char buffer[];
uint8_t *buffer;
int buffer_alloc;
uint8_t buffer_inline[STREAM_BUFFER_SIZE];
} stream_t;
int stream_fill_buffer(stream_t *s);