diff --git a/demux/demux_lavf.c b/demux/demux_lavf.c index edacf4c7eb..3b7083c2be 100644 --- a/demux/demux_lavf.c +++ b/demux/demux_lavf.c @@ -59,7 +59,7 @@ #endif #define INITIAL_PROBE_SIZE STREAM_BUFFER_SIZE -#define PROBE_BUF_SIZE FFMIN(STREAM_MAX_BUFFER_SIZE, 2 * 1024 * 1024) +#define PROBE_BUF_SIZE (2 * 1024 * 1024) // Should correspond to IO_BUFFER_SIZE in libavformat/aviobuf.c (not public) diff --git a/player/misc.c b/player/misc.c index ce353b9590..633ec6917d 100644 --- a/player/misc.c +++ b/player/misc.c @@ -301,7 +301,7 @@ int stream_dump(struct MPContext *mpctx, const char *source_filename) MP_MSG(mpctx, MSGL_STATUS, "Dumping %lld/%lld...", (long long int)pos, (long long int)size); } - bstr data = stream_peek(stream, STREAM_MAX_BUFFER_SIZE); + bstr data = stream_peek(stream, 4096); if (data.len == 0) { ok &= stream->eof; break; diff --git a/stream/stream.c b/stream/stream.c index 4ce7cd393b..fd56b8f705 100644 --- a/stream/stream.c +++ b/stream/stream.c @@ -42,8 +42,6 @@ #include "options/m_option.h" #include "options/m_config.h" -#define TOTAL_BUFFER_SIZE STREAM_MAX_BUFFER_SIZE - extern const stream_info_t stream_info_null; extern const stream_info_t stream_info_memory; extern const stream_info_t stream_info_mf; @@ -146,11 +144,6 @@ char *mp_url_escape(void *talloc_ctx, const char *url, const char *ok) return rv; } -static stream_t *new_stream(void) -{ - return talloc_zero_size(NULL, sizeof(stream_t) + TOTAL_BUFFER_SIZE); -} - static const char *match_proto(const char *url, const char *proto) { int l = strlen(proto); @@ -163,6 +156,37 @@ static const char *match_proto(const char *url, const char *proto) return NULL; } +// Resize the current stream buffer, or do nothing if the size is adequate. +// Caller must ensure the used buffer is not less than the new buffer size. +// Calling this with 0 ensures it uses the default buffer size. +static void stream_resize_buffer(struct stream *s, int new) +{ + new = MPMAX(new, STREAM_BUFFER_SIZE); + + if (new == s->buffer_alloc) + return; + + int buffer_used = s->buf_len - s->buf_pos; + assert(buffer_used <= new); + + void *nbuf = s->buffer_inline; + if (new > STREAM_BUFFER_SIZE) + nbuf = ta_alloc_size(s, new); + + if (nbuf) { + if (s->buffer) + memmove(nbuf, &s->buffer[s->buf_pos], buffer_used); + s->buf_pos = 0; + s->buf_len = buffer_used; + + if (s->buffer != s->buffer_inline) + ta_free(s->buffer); + + s->buffer = nbuf; + s->buffer_alloc = new; + } +} + static int open_internal(const stream_info_t *sinfo, const char *url, int flags, struct mp_cancel *c, struct mpv_global *global, struct stream **ret) @@ -182,7 +206,7 @@ static int open_internal(const stream_info_t *sinfo, const char *url, int flags, if (!path) return STREAM_NO_MATCH; - stream_t *s = new_stream(); + stream_t *s = talloc_zero(NULL, stream_t); s->log = mp_log_new(s, global->log, sinfo->name); s->info = sinfo; s->cancel = c; @@ -215,6 +239,9 @@ static int open_internal(const stream_info_t *sinfo, const char *url, int flags, if (!s->read_chunk) s->read_chunk = 4 * STREAM_BUFFER_SIZE; + stream_resize_buffer(s, 0); + MP_HANDLE_OOM(s->buffer); + assert(s->seekable == !!s->seek); if (s->mime_type) @@ -290,8 +317,11 @@ stream_t *open_output_stream(const char *filename, struct mpv_global *global) // Partial reads are possible, even if EOF is not reached. static int stream_read_unbuffered(stream_t *s, void *buf, int len) { + assert(len >= 0); + if (len <= 0) + return 0; + int res = 0; - s->buf_pos = s->buf_len = 0; // we will retry even if we already reached EOF previously. if (s->fill_buffer && !mp_cancel_test(s->cancel)) res = s->fill_buffer(s, buf, len); @@ -306,19 +336,54 @@ static int stream_read_unbuffered(stream_t *s, void *buf, int len) return res; } -static int stream_fill_buffer_by(stream_t *s, int64_t len) +// Ask for having "total" bytes ready to read in the stream buffer. This can do +// a partial read if requested, so it can actually read less. +// To read everything, you may have to call this in a loop. +// total: desired amount of bytes in buffer +// allow_short: if true, attempt at most once to read more if needed +// returns: actual bytes in buffer (can be smaller or larger than total) +static int stream_extend_buffer(struct stream *s, int total, bool allow_short) { - len = MPMIN(len, s->read_chunk); - len = MPMAX(len, STREAM_BUFFER_SIZE); - len = stream_read_unbuffered(s, s->buffer, len); - s->buf_pos = 0; - s->buf_len = len; - return s->buf_len; + assert(total >= 0); + + if (s->buf_len - s->buf_pos < total) { + // Move to front to guarantee we really can read up to max size. + s->buf_len = s->buf_len - s->buf_pos; + memmove(s->buffer, &s->buffer[s->buf_pos], s->buf_len); + s->buf_pos = 0; + + // Read ahead by about as much as stream_fill_buffer() would, to avoid + // that many small stream_peek() calls will read the buffer at these + // quantities. + total = MPMAX(total, STREAM_BUFFER_SIZE); + + // Allocate more if the buffer is too small. Also, if the buffer is + // larger than needed, resize it to smaller. This assumes stream_peek() + // calls are rare or done with small sizes. + stream_resize_buffer(s, total); + + // Read less if allocation above failed. + total = MPMIN(total, s->buffer_alloc); + + // Fill rest of the buffer. Can be partial. + while (total > s->buf_len) { + int read = stream_read_unbuffered(s, &s->buffer[s->buf_len], + total - s->buf_len); + s->buf_len += read; + if (allow_short || !read) + break; + } + + if (s->buf_len) + s->eof = 0; + } + + return s->buf_len - s->buf_pos; } int stream_fill_buffer(stream_t *s) { - return stream_fill_buffer_by(s, STREAM_BUFFER_SIZE); + return stream_extend_buffer(s, STREAM_BUFFER_SIZE, true); } // Read between 1..buf_size bytes of data, return how much data has been read. @@ -329,6 +394,7 @@ int stream_read_partial(stream_t *s, char *buf, int buf_size) assert(buf_size >= 0); if (s->buf_pos == s->buf_len && buf_size > 0) { s->buf_pos = s->buf_len = 0; + stream_resize_buffer(s, 0); // Do a direct read // Also, small reads will be more efficient with buffering & copying if (buf_size >= STREAM_BUFFER_SIZE) @@ -362,33 +428,16 @@ int stream_read(stream_t *s, char *mem, int total) // Read ahead at most len bytes without changing the read position. Return a // pointer to the internal buffer, starting from the current read position. -// Can read ahead at most STREAM_MAX_BUFFER_SIZE bytes. +// Reading ahead may require memory allocation. If allocation fails, read ahead +// is silently limited to the last successful allocation. // The returned buffer becomes invalid on the next stream call, and you must // not write to it. struct bstr stream_peek(stream_t *s, int len) { assert(len >= 0); - assert(len <= STREAM_MAX_BUFFER_SIZE); - if (s->buf_len - s->buf_pos < len) { - // Move to front to guarantee we really can read up to max size. - int buf_valid = s->buf_len - s->buf_pos; - memmove(s->buffer, &s->buffer[s->buf_pos], buf_valid); - // Fill rest of the buffer. - while (buf_valid < len) { - int chunk = MPMAX(len - buf_valid, STREAM_BUFFER_SIZE); - assert(buf_valid + chunk <= TOTAL_BUFFER_SIZE); - int read = stream_read_unbuffered(s, &s->buffer[buf_valid], chunk); - if (read == 0) - break; // EOF - buf_valid += read; - } - s->buf_pos = 0; - s->buf_len = buf_valid; - if (s->buf_len) - s->eof = 0; - } - return (bstr){.start = &s->buffer[s->buf_pos], - .len = FFMIN(len, s->buf_len - s->buf_pos)}; + + int avail = stream_extend_buffer(s, len, false); + return (bstr){.start = &s->buffer[s->buf_pos], .len = MPMIN(len, avail)}; } int stream_write_buffer(stream_t *s, unsigned char *buf, int len) @@ -412,7 +461,7 @@ static bool stream_skip_read(struct stream *s, int64_t len) while (len > 0) { unsigned int left = s->buf_len - s->buf_pos; if (!left) { - if (!stream_fill_buffer_by(s, left)) + if (!stream_fill_buffer(s)) return false; continue; } @@ -431,6 +480,7 @@ void stream_drop_buffers(stream_t *s) s->pos = stream_tell(s); s->buf_pos = s->buf_len = 0; s->eof = 0; + stream_resize_buffer(s, 0); } // Seek function bypassing the local stream buffer. @@ -541,7 +591,7 @@ stream_t *open_memory_stream(void *data, int len) struct mpv_global *dummy = talloc_zero(NULL, struct mpv_global); dummy->log = mp_null_log; stream_t *s = stream_open("memory://", dummy); - assert(s); + MP_HANDLE_OOM(s); talloc_steal(s, dummy); stream_control(s, STREAM_CTRL_SET_CONTENTS, &(bstr){data, len}); return s; diff --git a/stream/stream.h b/stream/stream.h index 68df46b31c..e3be20d7de 100644 --- a/stream/stream.h +++ b/stream/stream.h @@ -30,10 +30,6 @@ #define STREAM_BUFFER_SIZE 2048 -// Max buffer for initial probe. -#define STREAM_MAX_BUFFER_SIZE (2 * 1024 * 1024) - - // stream->mode #define STREAM_READ 0 #define STREAM_WRITE 1 @@ -119,8 +115,10 @@ typedef struct stream { // added to this. The user can reset this as needed. uint64_t total_unbuffered_read_bytes; - // Includes additional padding in case sizes get rounded up by sector size. - unsigned char buffer[]; + uint8_t *buffer; + + int buffer_alloc; + uint8_t buffer_inline[STREAM_BUFFER_SIZE]; } stream_t; int stream_fill_buffer(stream_t *s);