1
0
mirror of https://github.com/mpv-player/mpv synced 2025-01-03 05:22:23 +00:00

cache: make the stream cache a proper stream that wraps other streams

Before this commit, the cache was franken-hacked on top of the stream
API. You had to use special functions (like cache_stream_fill_buffer()
instead of stream_fill_buffer()), which would access the stream in a
cached manner.

The whole idea about the previous design was that the cache runs in a
thread or in a forked process, while the cache awa functions made sure
the stream instance looked consistent to the user. If you used the
normal functions instead of the special ones while the cache was
running, you were out of luck.

Make it a bit more reasonable by turning the cache into a stream on its
own. This makes it behave exactly like a normal stream. The stream
callbacks call into the original (uncached) stream to do work. No
special cache functions or redirections are needed. The only different
thing about cache streams is that they are created by special functions,
instead of being part of the auto_open_streams[] array.

To make things simpler, remove the threading implementation, which was
messed into the code. The threading code could perhaps be kept, but I
don't really want to have to worry about this special case. A proper
threaded implementation will be added later.

Remove the cache enabling code from stream_radio.c. Since enabling the
cache involves replacing the old stream with a new one, the code as-is
can't be kept. It would be easily possible to enable the cache by
requesting a cache size (which is also much simpler). But nobody uses
stream_radio.c and I can't even test this thing, and the cache is
probably not really important for it either.
This commit is contained in:
wm4 2013-05-24 18:49:09 +02:00
parent 27b633671f
commit 7c4202b863
8 changed files with 234 additions and 352 deletions

12
configure vendored
View File

@ -495,7 +495,6 @@ _priority=no
def_dos_paths="#define HAVE_DOS_PATHS 0" def_dos_paths="#define HAVE_DOS_PATHS 0"
def_stream_cache="#define CONFIG_STREAM_CACHE 1" def_stream_cache="#define CONFIG_STREAM_CACHE 1"
def_priority="#undef CONFIG_PRIORITY" def_priority="#undef CONFIG_PRIORITY"
def_pthread_cache="#undef PTHREAD_CACHE"
need_shmem=yes need_shmem=yes
_build_man=auto _build_man=auto
for ac_option do for ac_option do
@ -1453,13 +1452,9 @@ else
fi fi
echores "$_pthreads" echores "$_pthreads"
if cygwin ; then if cygwin || mingw32 ; then
if test "$_pthreads" = yes ; then _stream_cache=no
def_pthread_cache="#define PTHREAD_CACHE 1" def_stream_cache="#undef CONFIG_STREAM_CACHE"
else
_stream_cache=no
def_stream_cache="#undef CONFIG_STREAM_CACHE"
fi
fi fi
echocheck "rpath" echocheck "rpath"
@ -3239,7 +3234,6 @@ $def_priority
/* configurable options */ /* configurable options */
$def_stream_cache $def_stream_cache
$def_pthread_cache
/* CPU stuff */ /* CPU stuff */

View File

@ -108,7 +108,6 @@
#ifdef CONFIG_DVBIN #ifdef CONFIG_DVBIN
#include "stream/dvbin.h" #include "stream/dvbin.h"
#endif #endif
#include "stream/cache2.h"
//**************************************************************************// //**************************************************************************//
// Playtree // Playtree
@ -934,13 +933,13 @@ static int find_new_tid(struct MPContext *mpctx, enum stream_type t)
// Map stream number (as used by libdvdread) to MPEG IDs (as used by demuxer). // Map stream number (as used by libdvdread) to MPEG IDs (as used by demuxer).
static int map_id_from_demuxer(struct demuxer *d, enum stream_type type, int id) static int map_id_from_demuxer(struct demuxer *d, enum stream_type type, int id)
{ {
if (d->stream->type == STREAMTYPE_DVD && type == STREAM_SUB) if (d->stream->uncached_type == STREAMTYPE_DVD && type == STREAM_SUB)
id = id & 0x1F; id = id & 0x1F;
return id; return id;
} }
static int map_id_to_demuxer(struct demuxer *d, enum stream_type type, int id) static int map_id_to_demuxer(struct demuxer *d, enum stream_type type, int id)
{ {
if (d->stream->type == STREAMTYPE_DVD && type == STREAM_SUB) if (d->stream->uncached_type == STREAMTYPE_DVD && type == STREAM_SUB)
id = id | 0x20; id = id | 0x20;
return id; return id;
} }
@ -3918,7 +3917,7 @@ static struct track *open_external_file(struct MPContext *mpctx, char *filename,
struct stream *stream = open_stream(filename, &mpctx->opts, &format); struct stream *stream = open_stream(filename, &mpctx->opts, &format);
if (!stream) if (!stream)
goto err_out; goto err_out;
stream_enable_cache_percent(stream, stream_cache, stream_enable_cache_percent(&stream, stream_cache,
opts->stream_cache_min_percent, opts->stream_cache_min_percent,
opts->stream_cache_seek_min_percent); opts->stream_cache_seek_min_percent);
// deal with broken demuxers: preselect streams // deal with broken demuxers: preselect streams
@ -4203,10 +4202,7 @@ static void play_current_file(struct MPContext *mpctx)
} }
// CACHE2: initial prefill: 20% later: 5% (should be set by -cacheopts) // CACHE2: initial prefill: 20% later: 5% (should be set by -cacheopts)
#ifdef CONFIG_DVBIN int res = stream_enable_cache_percent(&mpctx->stream,
goto_enable_cache: ;
#endif
int res = stream_enable_cache_percent(mpctx->stream,
opts->stream_cache_size, opts->stream_cache_size,
opts->stream_cache_min_percent, opts->stream_cache_min_percent,
opts->stream_cache_seek_min_percent); opts->stream_cache_seek_min_percent);
@ -4216,6 +4212,10 @@ goto_enable_cache: ;
stream_set_capture_file(mpctx->stream, opts->stream_capture); stream_set_capture_file(mpctx->stream, opts->stream_capture);
#ifdef CONFIG_DVBIN
goto_reopen_demuxer: ;
#endif
//============ Open DEMUXERS --- DETECT file type ======================= //============ Open DEMUXERS --- DETECT file type =======================
mpctx->audio_delay = opts->audio_delay; mpctx->audio_delay = opts->audio_delay;
@ -4402,9 +4402,8 @@ goto_enable_cache: ;
if (mpctx->dvbin_reopen) { if (mpctx->dvbin_reopen) {
mpctx->stop_play = 0; mpctx->stop_play = 0;
uninit_player(mpctx, INITIALIZED_ALL - (INITIALIZED_STREAM | INITIALIZED_GETCH2 | (opts->fixed_vo ? INITIALIZED_VO : 0))); uninit_player(mpctx, INITIALIZED_ALL - (INITIALIZED_STREAM | INITIALIZED_GETCH2 | (opts->fixed_vo ? INITIALIZED_VO : 0)));
cache_uninit(mpctx->stream);
mpctx->dvbin_reopen = 0; mpctx->dvbin_reopen = 0;
goto goto_enable_cache; goto goto_reopen_demuxer;
} }
#endif #endif

View File

@ -141,7 +141,7 @@ static int enable_cache(struct MPContext *mpctx, struct stream **stream,
return -1; return -1;
} }
stream_enable_cache_percent(*stream, stream_enable_cache_percent(stream,
opts->stream_cache_size, opts->stream_cache_size,
opts->stream_cache_min_percent, opts->stream_cache_min_percent,
opts->stream_cache_seek_min_percent); opts->stream_cache_seek_min_percent);
@ -230,7 +230,7 @@ static int find_ordered_chapter_sources(struct MPContext *mpctx,
char *main_filename = mpctx->demuxer->filename; char *main_filename = mpctx->demuxer->filename;
mp_msg(MSGT_CPLAYER, MSGL_INFO, "This file references data from " mp_msg(MSGT_CPLAYER, MSGL_INFO, "This file references data from "
"other sources.\n"); "other sources.\n");
if (mpctx->demuxer->stream->type != STREAMTYPE_FILE) { if (mpctx->demuxer->stream->uncached_type != STREAMTYPE_FILE) {
mp_msg(MSGT_CPLAYER, MSGL_WARN, "Playback source is not a " mp_msg(MSGT_CPLAYER, MSGL_WARN, "Playback source is not a "
"normal disk file. Will not search for related files.\n"); "normal disk file. Will not search for related files.\n");
} else { } else {

View File

@ -34,8 +34,10 @@
#include <string.h> #include <string.h>
#include <signal.h> #include <signal.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <assert.h>
#include <libavutil/common.h> #include <libavutil/common.h>
@ -43,27 +45,17 @@
#include "osdep/shmem.h" #include "osdep/shmem.h"
#include "osdep/timer.h" #include "osdep/timer.h"
#if defined(__MINGW32__)
#include <windows.h>
static void ThreadProc(void *s);
#elif defined(PTHREAD_CACHE)
#include <pthread.h>
static void *ThreadProc(void *s);
#else
#include <sys/wait.h>
#define FORKED_CACHE 1
#endif
#ifndef FORKED_CACHE
#define FORKED_CACHE 0
#endif
#include "core/mp_msg.h" #include "core/mp_msg.h"
#include "stream.h" #include "stream.h"
#include "cache2.h"
#include "core/mp_common.h" #include "core/mp_common.h"
// Note: (cache_vars_t*)(cache->priv)->cache == cache
typedef struct { typedef struct {
stream_t *cache; // wrapper stream, used by demuxer etc.
stream_t *stream; // "real" stream, used to read from the source media
unsigned int cache_pid;
// constats: // constats:
unsigned char *buffer; // base pointer of the allocated buffer memory unsigned char *buffer; // base pointer of the allocated buffer memory
int64_t buffer_size; // size of the allocated buffer memory int64_t buffer_size; // size of the allocated buffer memory
@ -71,9 +63,7 @@ typedef struct {
int64_t back_size; // we should keep back_size amount of old bytes for backward seek int64_t back_size; // we should keep back_size amount of old bytes for backward seek
int64_t fill_limit; // we should fill buffer only if space>=fill_limit int64_t fill_limit; // we should fill buffer only if space>=fill_limit
int64_t seek_limit; // keep filling cache if distance is less that seek limit int64_t seek_limit; // keep filling cache if distance is less that seek limit
#if FORKED_CACHE
pid_t ppid; // parent PID to detect killed parent pid_t ppid; // parent PID to detect killed parent
#endif
// filler's pointers: // filler's pointers:
int eof; int eof;
int64_t min_filepos; // buffer contain only a part of the file, from min-max pos int64_t min_filepos; // buffer contain only a part of the file, from min-max pos
@ -85,7 +75,6 @@ typedef struct {
// int seek_lock; // 1 if we will seek/reset buffer, 2 if we are ready for cmd // int seek_lock; // 1 if we will seek/reset buffer, 2 if we are ready for cmd
// int fifo_flag; // 1 if we should use FIFO to notice cache about buffer reads. // int fifo_flag; // 1 if we should use FIFO to notice cache about buffer reads.
// callback // callback
stream_t *stream;
volatile int control; volatile int control;
volatile uint64_t control_uint_arg; volatile uint64_t control_uint_arg;
volatile double control_double_arg; volatile double control_double_arg;
@ -98,19 +87,20 @@ typedef struct {
volatile int idle; volatile int idle;
} cache_vars_t; } cache_vars_t;
static void cache_wakeup(stream_t *s) static void cache_wakeup(stream_t *stream)
{ {
#if FORKED_CACHE cache_vars_t *s = stream->priv;
// signal process to wake up immediately // signal process to wake up immediately
kill(s->cache_pid, SIGUSR1); kill(s->cache_pid, SIGUSR1);
#endif
} }
// Runs in the forked process
static void cache_flush(cache_vars_t *s) static void cache_flush(cache_vars_t *s)
{ {
s->offset = s->min_filepos = s->max_filepos = s->read_filepos; // drop cache content :( s->offset = s->min_filepos = s->max_filepos = s->read_filepos; // drop cache content :(
} }
// Runs in the main process
static int cache_read(cache_vars_t *s, unsigned char *buf, int size) static int cache_read(cache_vars_t *s, unsigned char *buf, int size)
{ {
int total = 0; int total = 0;
@ -179,6 +169,7 @@ static int cache_read(cache_vars_t *s, unsigned char *buf, int size)
return total; return total;
} }
// Runs in the forked process
static int cache_fill(cache_vars_t *s) static int cache_fill(cache_vars_t *s)
{ {
int64_t back, back2, newb, space, len, pos; int64_t back, back2, newb, space, len, pos;
@ -198,7 +189,7 @@ static int cache_fill(cache_vars_t *s)
cache_flush(s); cache_flush(s);
if (s->stream->eof) if (s->stream->eof)
stream_reset(s->stream); stream_reset(s->stream);
stream_seek_internal(s->stream, read); stream_seek_unbuffered(s->stream, read);
mp_msg(MSGT_CACHE, MSGL_DBG2, "Seek done. new pos: 0x%" PRIX64 " \n", mp_msg(MSGT_CACHE, MSGL_DBG2, "Seek done. new pos: 0x%" PRIX64 " \n",
(int64_t)stream_tell(s->stream)); (int64_t)stream_tell(s->stream));
} }
@ -259,12 +250,12 @@ static int cache_fill(cache_vars_t *s)
if (wraparound_copy) { if (wraparound_copy) {
int to_copy; int to_copy;
len = stream_read_internal(s->stream, s->stream->buffer, space); len = stream_read_unbuffered(s->stream, s->stream->buffer, space);
to_copy = FFMIN(len, s->buffer_size - pos); to_copy = FFMIN(len, s->buffer_size - pos);
memcpy(s->buffer + pos, s->stream->buffer, to_copy); memcpy(s->buffer + pos, s->stream->buffer, to_copy);
memcpy(s->buffer, s->stream->buffer + to_copy, len - to_copy); memcpy(s->buffer, s->stream->buffer + to_copy, len - to_copy);
} else } else
len = stream_read_internal(s->stream, &s->buffer[pos], space); len = stream_read_unbuffered(s->stream, &s->buffer[pos], space);
s->eof = !len; s->eof = !len;
s->max_filepos += len; s->max_filepos += len;
@ -275,6 +266,7 @@ static int cache_fill(cache_vars_t *s)
} }
// Runs in the forked process
static int cache_execute_control(cache_vars_t *s) static int cache_execute_control(cache_vars_t *s)
{ {
double double_res; double double_res;
@ -309,14 +301,12 @@ static int cache_execute_control(cache_vars_t *s)
s->stream_start_time = pos; s->stream_start_time = pos;
else else
s->stream_start_time = MP_NOPTS_VALUE; s->stream_start_time = MP_NOPTS_VALUE;
#if FORKED_CACHE
// if parent PID changed, main process was killed -> exit // if parent PID changed, main process was killed -> exit
if (s->ppid != getppid()) { if (s->ppid != getppid()) {
mp_msg(MSGT_CACHE, MSGL_WARN, mp_msg(MSGT_CACHE, MSGL_WARN,
"Parent process disappeared, exiting cache process.\n"); "Parent process disappeared, exiting cache process.\n");
return 0; return 0;
} }
#endif
last = mp_time_sec(); last = mp_time_sec();
} }
if (s->control == -1) if (s->control == -1)
@ -379,20 +369,12 @@ static int cache_execute_control(cache_vars_t *s)
static void *shared_alloc(int64_t size) static void *shared_alloc(int64_t size)
{ {
#if FORKED_CACHE
return shmem_alloc(size); return shmem_alloc(size);
#else
return malloc(size);
#endif
} }
static void shared_free(void *ptr, int64_t size) static void shared_free(void *ptr, int64_t size)
{ {
#if FORKED_CACHE
shmem_free(ptr, size); shmem_free(ptr, size);
#else
free(ptr);
#endif
} }
static cache_vars_t *cache_init(int64_t size, int sector) static cache_vars_t *cache_init(int64_t size, int sector)
@ -418,34 +400,27 @@ static cache_vars_t *cache_init(int64_t size, int sector)
s->fill_limit = 8 * sector; s->fill_limit = 8 * sector;
s->back_size = s->buffer_size / 2; s->back_size = s->buffer_size / 2;
#if FORKED_CACHE
s->ppid = getpid(); s->ppid = getpid();
#endif
return s; return s;
} }
void cache_uninit(stream_t *s) static void cache_uninit(stream_t *s)
{ {
cache_vars_t *c = s->cache_data; cache_vars_t *c = s->priv;
if (s->cache_pid) { if (c->cache_pid) {
#if !FORKED_CACHE kill(c->cache_pid, SIGKILL);
cache_do_control(s, -2, NULL); waitpid(c->cache_pid, NULL, 0);
#else c->cache_pid = 0;
kill(s->cache_pid, SIGKILL);
waitpid(s->cache_pid, NULL, 0);
#endif
s->cache_pid = 0;
} }
if (!c) if (!c)
return; return;
shared_free(c->buffer, c->buffer_size); shared_free(c->buffer, c->buffer_size);
c->buffer = NULL; c->buffer = NULL;
c->stream = NULL; c->stream = NULL;
shared_free(s->cache_data, sizeof(cache_vars_t)); shared_free(c, sizeof(cache_vars_t));
s->cache_data = NULL; s->priv = NULL;
} }
#if FORKED_CACHE
static void exit_sighandler(int x) static void exit_sighandler(int x)
{ {
// close stream // close stream
@ -455,7 +430,6 @@ static void exit_sighandler(int x)
static void dummy_sighandler(int x) static void dummy_sighandler(int x)
{ {
} }
#endif
/** /**
* Main loop of the cache process or thread. * Main loop of the cache process or thread.
@ -463,31 +437,25 @@ static void dummy_sighandler(int x)
static void cache_mainloop(cache_vars_t *s) static void cache_mainloop(cache_vars_t *s)
{ {
int sleep_count = 0; int sleep_count = 0;
#if FORKED_CACHE
struct sigaction sa = { struct sigaction sa = {
.sa_handler = SIG_IGN .sa_handler = SIG_IGN
}; };
sigaction(SIGUSR1, &sa, NULL); sigaction(SIGUSR1, &sa, NULL);
#endif
do { do {
if (!cache_fill(s)) { if (!cache_fill(s)) {
s->idle = 1; s->idle = 1;
#if FORKED_CACHE
// Let signal wake us up, we cannot leave this // Let signal wake us up, we cannot leave this
// enabled since we do not handle EINTR in most places. // enabled since we do not handle EINTR in most places.
// This might need extra code to work on BSD. // This might need extra code to work on BSD.
sa.sa_handler = dummy_sighandler; sa.sa_handler = dummy_sighandler;
sigaction(SIGUSR1, &sa, NULL); sigaction(SIGUSR1, &sa, NULL);
#endif
if (sleep_count < INITIAL_FILL_USLEEP_COUNT) { if (sleep_count < INITIAL_FILL_USLEEP_COUNT) {
sleep_count++; sleep_count++;
mp_sleep_us(INITIAL_FILL_USLEEP_TIME); mp_sleep_us(INITIAL_FILL_USLEEP_TIME);
} else } else
mp_sleep_us(FILL_USLEEP_TIME); // idle mp_sleep_us(FILL_USLEEP_TIME); // idle
#if FORKED_CACHE
sa.sa_handler = SIG_IGN; sa.sa_handler = SIG_IGN;
sigaction(SIGUSR1, &sa, NULL); sigaction(SIGUSR1, &sa, NULL);
#endif
} else { } else {
sleep_count = 0; sleep_count = 0;
s->idle = 0; s->idle = 0;
@ -495,180 +463,24 @@ static void cache_mainloop(cache_vars_t *s)
} while (cache_execute_control(s)); } while (cache_execute_control(s));
} }
int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size, static int cache_fill_buffer(struct stream *stream, char *buffer, int max_len)
float stream_cache_min_percent,
float stream_cache_seek_min_percent)
{ {
return stream_enable_cache(stream, stream_cache_size * 1024, cache_vars_t *c = stream->priv;
stream_cache_size * 1024 * assert(c->cache_pid);
(stream_cache_min_percent / 100.0),
stream_cache_size * 1024 *
(stream_cache_seek_min_percent / 100.0));
}
/** if (stream->pos != c->read_filepos)
* \return 1 on success, 0 if the function was interrupted and -1 on error
*/
int stream_enable_cache(stream_t *stream, int64_t size, int64_t min,
int64_t seek_limit)
{
if (size < 0)
size = stream->cache_size * 1024;
if (!size)
return 1;
mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n",
size / 1024);
int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE;
int res = -1;
cache_vars_t *s;
if (size > SIZE_MAX) {
mp_msg(MSGT_CACHE, MSGL_FATAL,
"Cache size larger than max. allocation size\n");
return -1;
}
s = cache_init(size, ss);
if (s == NULL)
return -1;
stream->cache_data = s;
s->stream = stream; // callback
s->seek_limit = seek_limit;
//make sure that we won't wait from cache_fill
//more data than it is allowed to fill
if (s->seek_limit > s->buffer_size - s->fill_limit)
s->seek_limit = s->buffer_size - s->fill_limit;
if (min > s->buffer_size - s->fill_limit)
min = s->buffer_size - s->fill_limit;
// to make sure we wait for the cache process/thread to be active
// before continuing
if (min <= 0)
min = 1;
#if FORKED_CACHE
if ((stream->cache_pid = fork())) {
if ((pid_t)stream->cache_pid == -1)
stream->cache_pid = 0;
#else
{
stream_t *stream2 = malloc(sizeof(stream_t));
memcpy(stream2, s->stream, sizeof(stream_t));
s->stream = stream2;
#if defined(__MINGW32__)
stream->cache_pid = _beginthread(ThreadProc, 0, s);
#else
{
pthread_t tid;
pthread_create(&tid, NULL, ThreadProc, s);
stream->cache_pid = 1;
}
#endif
#endif
if (!stream->cache_pid) {
mp_msg(MSGT_CACHE, MSGL_ERR,
"Starting cache process/thread failed: %s.\n",
strerror(errno));
goto err_out;
}
// wait until cache is filled at least prefill_init %
mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] "
"%" PRId64 " pre:%" PRId64 " eof:%d \n",
s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof);
while (s->read_filepos < s->min_filepos ||
s->max_filepos - s->read_filepos < min)
{
mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% "
"(%" PRId64 " bytes) ",
100.0 * (float)(s->max_filepos - s->read_filepos) /
(float)(s->buffer_size),
s->max_filepos - s->read_filepos);
if (s->eof)
break; // file is smaller than prefill size
if (stream_check_interrupt(PREFILL_SLEEP_TIME)) {
res = 0;
goto err_out;
}
}
mp_msg(MSGT_CACHE, MSGL_STATUS, "\n");
stream->cached = true;
return 1; // parent exits
err_out:
cache_uninit(stream);
return res;
}
#if FORKED_CACHE
signal(SIGTERM, exit_sighandler); // kill
cache_mainloop(s);
// make sure forked code never leaves this function
exit(0);
#endif
}
#if !FORKED_CACHE
#if defined(__MINGW32__)
static void ThreadProc(void *s)
{
cache_mainloop(s);
_endthread();
}
#else
static void *ThreadProc(void *s)
{
cache_mainloop(s);
return NULL;
}
#endif
#endif
int cache_stream_fill_buffer(stream_t *s)
{
int len;
int sector_size;
if (!s->cache_pid)
return stream_fill_buffer(s);
if (s->pos != ((cache_vars_t *)s->cache_data)->read_filepos)
mp_msg(MSGT_CACHE, MSGL_ERR, mp_msg(MSGT_CACHE, MSGL_ERR,
"!!! read_filepos differs!!! report this bug...\n"); "!!! read_filepos differs!!! report this bug...\n");
sector_size = ((cache_vars_t *)s->cache_data)->sector_size;
if (sector_size > STREAM_MAX_SECTOR_SIZE) {
mp_msg(MSGT_CACHE, MSGL_ERR, "Sector size %i larger than maximum %i\n",
sector_size,
STREAM_MAX_SECTOR_SIZE);
sector_size = STREAM_MAX_SECTOR_SIZE;
}
len = cache_read(s->cache_data, s->buffer, sector_size);
//printf("cache_stream_fill_buffer->read -> %d\n",len);
if (len <= 0) {
s->eof = 1;
s->buf_pos = s->buf_len = 0;
return 0;
}
s->eof = 0;
s->buf_pos = 0;
s->buf_len = len;
s->pos += len;
// printf("[%d]",len);fflush(stdout);
stream_capture_write(s);
return len;
return cache_read(c, buffer, max_len);
} }
int cache_stream_seek_long(stream_t *stream, int64_t pos) static int cache_seek(stream_t *stream, int64_t pos)
{ {
cache_vars_t *s; cache_vars_t *s = stream->priv;
int64_t newpos; int64_t newpos;
if (!stream->cache_pid) assert(s->cache_pid);
return stream_seek_long(stream, pos);
s = stream->cache_data;
// s->seek_lock=1; // s->seek_lock=1;
mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64 mp_msg(MSGT_CACHE, MSGL_DBG2, "CACHE2_SEEK: 0x%" PRIX64 " <= 0x%" PRIX64
@ -680,29 +492,14 @@ int cache_stream_seek_long(stream_t *stream, int64_t pos)
stream->pos = s->read_filepos = newpos; stream->pos = s->read_filepos = newpos;
s->eof = 0; // !!!!!!! s->eof = 0; // !!!!!!!
cache_wakeup(stream); cache_wakeup(stream);
return 1;
cache_stream_fill_buffer(stream);
pos -= newpos;
if (pos >= 0 && pos <= stream->buf_len) {
stream->buf_pos = pos; // byte position in sector
return 1;
}
// stream->buf_pos=stream->buf_len=0;
// return 1;
mp_msg(MSGT_CACHE, MSGL_V,
"cache_stream_seek: WARNING! Can't seek to 0x%" PRIX64 " !\n",
pos + newpos);
return 0;
} }
int cache_do_control(stream_t *stream, int cmd, void *arg) static int cache_control(stream_t *stream, int cmd, void *arg)
{ {
int sleep_count = 0; int sleep_count = 0;
int pos_change = 0; int pos_change = 0;
cache_vars_t *s = stream->cache_data; cache_vars_t *s = stream->priv;
switch (cmd) { switch (cmd) {
case STREAM_CTRL_GET_CACHE_SIZE: case STREAM_CTRL_GET_CACHE_SIZE:
*(int64_t *)arg = s->buffer_size; *(int64_t *)arg = s->buffer_size;
@ -812,3 +609,86 @@ int cache_do_control(stream_t *stream, int cmd, void *arg)
} }
return s->control_res; return s->control_res;
} }
// return 1 on success, 0 if the function was interrupted and -1 on error, or
// if the cache is disabled
int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size,
int64_t min, int64_t seek_limit)
{
if (size < 0)
size = stream->cache_size * 1024;
if (!size)
return -1;
mp_tmsg(MSGT_NETWORK, MSGL_INFO, "Cache size set to %" PRId64 " KiB\n",
size / 1024);
int ss = stream->sector_size ? stream->sector_size : STREAM_BUFFER_SIZE;
cache_vars_t *s;
if (size > SIZE_MAX) {
mp_msg(MSGT_CACHE, MSGL_FATAL,
"Cache size larger than max. allocation size\n");
return -1;
}
s = cache_init(size, ss);
if (s == NULL)
return -1;
cache->priv = s;
s->cache = cache;
s->stream = stream; // callback
s->seek_limit = seek_limit;
cache->seek = cache_seek;
cache->fill_buffer = cache_fill_buffer;
cache->control = cache_control;
cache->close = cache_uninit;
//make sure that we won't wait from cache_fill
//more data than it is allowed to fill
if (s->seek_limit > s->buffer_size - s->fill_limit)
s->seek_limit = s->buffer_size - s->fill_limit;
if (min > s->buffer_size - s->fill_limit)
min = s->buffer_size - s->fill_limit;
// to make sure we wait for the cache process/thread to be active
// before continuing
if (min <= 0)
min = 1;
pid_t child_pid = fork();
if (child_pid) {
if (child_pid == (pid_t)-1)
child_pid = 0;
if (!child_pid) {
mp_msg(MSGT_CACHE, MSGL_ERR,
"Starting cache process/thread failed: %s.\n",
strerror(errno));
return -1;
}
s->cache_pid = child_pid;
// wait until cache is filled at least prefill_init %
mp_msg(MSGT_CACHE, MSGL_V, "CACHE_PRE_INIT: %" PRId64 " [%" PRId64 "] "
"%" PRId64 " pre:%" PRId64 " eof:%d \n",
s->min_filepos, s->read_filepos, s->max_filepos, min, s->eof);
while (s->read_filepos < s->min_filepos ||
s->max_filepos - s->read_filepos < min)
{
mp_tmsg(MSGT_CACHE, MSGL_STATUS, "\rCache fill: %5.2f%% "
"(%" PRId64 " bytes) ",
100.0 * (float)(s->max_filepos - s->read_filepos) /
(float)(s->buffer_size),
s->max_filepos - s->read_filepos);
if (s->eof)
break; // file is smaller than prefill size
if (stream_check_interrupt(PREFILL_SLEEP_TIME))
return 0;
}
mp_msg(MSGT_CACHE, MSGL_STATUS, "\n");
return 1; // parent exits
}
signal(SIGTERM, exit_sighandler); // kill
cache_mainloop(s);
// make sure forked code never leaves this function
exit(0);
}

View File

@ -1,27 +0,0 @@
/*
* This file is part of MPlayer.
*
* MPlayer is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* MPlayer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with MPlayer; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef MPLAYER_CACHE2_H
#define MPLAYER_CACHE2_H
#include "stream.h"
void cache_uninit(stream_t *s);
int cache_do_control(stream_t *stream, int cmd, void *arg);
#endif /* MPLAYER_CACHE2_H */

View File

@ -52,8 +52,6 @@
#include "core/m_option.h" #include "core/m_option.h"
#include "core/m_struct.h" #include "core/m_struct.h"
#include "cache2.h"
/// We keep these 2 for the gui atm, but they will be removed. /// We keep these 2 for the gui atm, but they will be removed.
char *cdrom_device = NULL; char *cdrom_device = NULL;
char *dvd_device = NULL; char *dvd_device = NULL;
@ -136,7 +134,7 @@ static const stream_info_t *const auto_open_streams[] = {
NULL NULL
}; };
static stream_t *new_stream(int fd, int type); static stream_t *new_stream(void);
static stream_t *open_stream_plugin(const stream_info_t *sinfo, static stream_t *open_stream_plugin(const stream_info_t *sinfo,
const char *filename, const char *filename,
@ -165,7 +163,7 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
} }
} }
} }
s = new_stream(-2, -2); s = new_stream();
s->opts = options; s->opts = options;
s->url = strdup(filename); s->url = strdup(filename);
s->flags |= mode; s->flags |= mode;
@ -200,6 +198,8 @@ static stream_t *open_stream_plugin(const stream_info_t *sinfo,
s->mode = mode; s->mode = mode;
s->uncached_type = s->type;
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: [%s] %s\n", sinfo->name, filename);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Description: %s\n", sinfo->info);
mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author); mp_msg(MSGT_OPEN, MSGL_V, "STREAM: Author: %s\n", sinfo->author);
@ -308,7 +308,7 @@ static int stream_reconnect(stream_t *s)
continue; continue;
} }
if (stream_seek_internal(s, pos) < 0 && s->pos == pos) if (stream_seek_unbuffered(s, pos) < 0 && s->pos == pos)
return 1; return 1;
} }
return 0; return 0;
@ -345,7 +345,11 @@ void stream_capture_write(stream_t *s)
} }
} }
int stream_read_internal(stream_t *s, void *buf, int len) // Read function bypassing the local stream buffer. This will not write into
// s->buffer, but into buf[0..len] instead.
// Returns < 0 on error, 0 on EOF, and length of bytes read on success.
// Partial reads are possible, even if EOF is not reached.
int stream_read_unbuffered(stream_t *s, void *buf, int len)
{ {
int orig_len = len; int orig_len = len;
// we will retry even if we already reached EOF previously. // we will retry even if we already reached EOF previously.
@ -379,7 +383,7 @@ int stream_read_internal(stream_t *s, void *buf, int len)
goto eof_out; goto eof_out;
// make sure EOF is set to ensure no endless loops // make sure EOF is set to ensure no endless loops
s->eof = 1; s->eof = 1;
return stream_read_internal(s, buf, orig_len); return stream_read_unbuffered(s, buf, orig_len);
eof_out: eof_out:
s->eof = 1; s->eof = 1;
@ -394,7 +398,7 @@ eof_out:
int stream_fill_buffer(stream_t *s) int stream_fill_buffer(stream_t *s)
{ {
int len = stream_read_internal(s, s->buffer, STREAM_BUFFER_SIZE); int len = stream_read_unbuffered(s, s->buffer, STREAM_BUFFER_SIZE);
if (len <= 0) if (len <= 0)
return 0; return 0;
s->buf_pos = 0; s->buf_pos = 0;
@ -411,7 +415,7 @@ int stream_read(stream_t *s, char *mem, int total)
int x; int x;
x = s->buf_len - s->buf_pos; x = s->buf_len - s->buf_pos;
if (x == 0) { if (x == 0) {
if (!cache_stream_fill_buffer(s)) if (!stream_fill_buffer(s))
return total - len; // EOF return total - len; // EOF
x = s->buf_len - s->buf_pos; x = s->buf_len - s->buf_pos;
} }
@ -441,7 +445,8 @@ int stream_write_buffer(stream_t *s, unsigned char *buf, int len)
return rd; return rd;
} }
int stream_seek_internal(stream_t *s, int64_t newpos) // Seek function bypassing the local stream buffer.
int stream_seek_unbuffered(stream_t *s, int64_t newpos)
{ {
if (newpos == 0 || newpos != s->pos) { if (newpos == 0 || newpos != s->pos) {
switch (s->type) { switch (s->type) {
@ -492,7 +497,9 @@ int stream_seek_internal(stream_t *s, int64_t newpos)
return -1; return -1;
} }
int stream_seek_long(stream_t *s, int64_t pos) // Unlike stream_seek, does not try to seek within local buffer.
// Unlike stream_seek_unbuffered(), it still fills the local buffer.
static int stream_seek_long(stream_t *s, int64_t pos)
{ {
int res; int res;
int64_t newpos = 0; int64_t newpos = 0;
@ -517,7 +524,7 @@ int stream_seek_long(stream_t *s, int64_t pos)
(int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len); (int64_t)s->pos, (int64_t)newpos, (int64_t)pos, s->buf_len);
pos -= newpos; pos -= newpos;
res = stream_seek_internal(s, newpos); res = stream_seek_unbuffered(s, newpos);
if (res >= 0) if (res >= 0)
return res; return res;
@ -565,7 +572,7 @@ int stream_seek(stream_t *s, int64_t pos)
} }
} }
return cache_stream_seek_long(s, pos); return stream_seek_long(s, pos);
} }
int stream_skip(stream_t *s, int64_t len) int stream_skip(stream_t *s, int64_t len)
@ -578,7 +585,7 @@ int stream_skip(stream_t *s, int64_t len)
while (len > 0) { while (len > 0) {
int x = s->buf_len - s->buf_pos; int x = s->buf_len - s->buf_pos;
if (x == 0) { if (x == 0) {
if (!cache_stream_fill_buffer(s)) if (!stream_fill_buffer(s))
return 0; // EOF return 0; // EOF
x = s->buf_len - s->buf_pos; x = s->buf_len - s->buf_pos;
} }
@ -602,10 +609,6 @@ void stream_reset(stream_t *s)
int stream_control(stream_t *s, int cmd, void *arg) int stream_control(stream_t *s, int cmd, void *arg)
{ {
#ifdef CONFIG_STREAM_CACHE
if (s->cache_pid)
return cache_do_control(s, cmd, arg);
#endif
if (!s->control) if (!s->control)
return STREAM_UNSUPPORTED; return STREAM_UNSUPPORTED;
return s->control(s, cmd, arg); return s->control(s, cmd, arg);
@ -620,7 +623,7 @@ void stream_update_size(stream_t *s)
} }
} }
static stream_t *new_stream(int fd, int type) static stream_t *new_stream(void)
{ {
stream_t *s = talloc_zero(NULL, stream_t); stream_t *s = talloc_zero(NULL, stream_t);
@ -632,18 +635,13 @@ static stream_t *new_stream(int fd, int type)
} }
#endif #endif
s->fd = fd; s->fd = -2;
s->type = type; s->type = -2;
stream_reset(s);
return s; return s;
} }
void free_stream(stream_t *s) void free_stream(stream_t *s)
{ {
// printf("\n*** free_stream() called ***\n");
#ifdef CONFIG_STREAM_CACHE
cache_uninit(s);
#endif
stream_set_capture_file(s, NULL); stream_set_capture_file(s, NULL);
if (s->close) if (s->close)
@ -662,6 +660,8 @@ void free_stream(stream_t *s)
WSACleanup(); // there might be a better place for this (-> later) WSACleanup(); // there might be a better place for this (-> later)
#endif #endif
free(s->url); free(s->url);
if (s->uncached_stream)
free_stream(s->uncached_stream);
talloc_free(s); talloc_free(s);
} }
@ -681,6 +681,65 @@ int stream_check_interrupt(int time)
return stream_check_interrupt_cb(stream_check_interrupt_ctx, time); return stream_check_interrupt_cb(stream_check_interrupt_ctx, time);
} }
int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size,
float stream_cache_min_percent,
float stream_cache_seek_min_percent)
{
return stream_enable_cache(stream, stream_cache_size * 1024,
stream_cache_size * 1024 *
(stream_cache_min_percent / 100.0),
stream_cache_size * 1024 *
(stream_cache_seek_min_percent / 100.0));
}
/**
* \return 1 on success, 0 if the function was interrupted and -1 on error, or
* if the cache is disabled
*/
int stream_enable_cache(stream_t **stream, int64_t size, int64_t min,
int64_t seek_limit)
{
stream_t *orig = *stream;
if (orig->mode != STREAM_READ)
return 1;
// Can't handle a loaded buffer.
orig->buf_len = orig->buf_pos = 0;
stream_t *cache = new_stream();
cache->type = STREAMTYPE_CACHE;
cache->uncached_type = orig->type;
cache->uncached_stream = orig;
cache->flags |= MP_STREAM_SEEK;
cache->mode = STREAM_READ;
cache->read_chunk = orig->read_chunk;
cache->url = strdup(orig->url);
cache->mime_type = talloc_strdup(cache, orig->mime_type);
cache->lavf_type = orig->lavf_type;
cache->opts = orig->opts;
cache->sector_size = orig->sector_size;
cache->read_chunk = orig->read_chunk;
cache->cache_size = orig->cache_size;
cache->start_pos = orig->start_pos;
cache->end_pos = orig->end_pos;
int res = -1;
#ifdef CONFIG_STREAM_CACHE
res = stream_cache_init(cache, orig, size, min, seek_limit);
#endif
if (res <= 0) {
cache->uncached_stream = NULL; // don't free original stream
free_stream(cache);
} else {
*stream = cache;
}
return res;
}
/** /**
* Helper function to read 16 bits little-endian and advance pointer * Helper function to read 16 bits little-endian and advance pointer
*/ */
@ -789,7 +848,7 @@ unsigned char *stream_read_line(stream_t *s, unsigned char *mem, int max,
len = s->buf_len - s->buf_pos; len = s->buf_len - s->buf_pos;
// try to fill the buffer // try to fill the buffer
if (len <= 0 && if (len <= 0 &&
(!cache_stream_fill_buffer(s) || (!stream_fill_buffer(s) ||
(len = s->buf_len - s->buf_pos) <= 0)) (len = s->buf_len - s->buf_pos) <= 0))
break; break;
end = find_newline(s->buffer + s->buf_pos, len, utf16); end = find_newline(s->buffer + s->buf_pos, len, utf16);

View File

@ -53,6 +53,7 @@
#define STREAMTYPE_RADIO 19 #define STREAMTYPE_RADIO 19
#define STREAMTYPE_BLURAY 20 #define STREAMTYPE_BLURAY 20
#define STREAMTYPE_AVDEVICE 21 #define STREAMTYPE_AVDEVICE 21
#define STREAMTYPE_CACHE 22
#define STREAM_BUFFER_SIZE 2048 #define STREAM_BUFFER_SIZE 2048
#define STREAM_MAX_SECTOR_SIZE (8 * 1024) #define STREAM_MAX_SECTOR_SIZE (8 * 1024)
@ -167,7 +168,8 @@ typedef struct stream {
int fd; // file descriptor, see man open(2) int fd; // file descriptor, see man open(2)
int type; // see STREAMTYPE_* int type; // see STREAMTYPE_*
int flags; int uncached_type; // like (uncached_stream ? uncached_stream->type : type)
int flags; // MP_STREAM_SEEK_* or'ed flags
int sector_size; // sector size (seek will be aligned on this size if non 0) int sector_size; // sector size (seek will be aligned on this size if non 0)
int read_chunk; // maximum amount of data to read at once to limit latency (0 for default) int read_chunk; // maximum amount of data to read at once to limit latency (0 for default)
unsigned int buf_pos, buf_len; unsigned int buf_pos, buf_len;
@ -176,9 +178,6 @@ typedef struct stream {
int mode; //STREAM_READ or STREAM_WRITE int mode; //STREAM_READ or STREAM_WRITE
bool streaming; // known to be a network stream if true bool streaming; // known to be a network stream if true
int cache_size; // cache size in KB to use if enabled int cache_size; // cache size in KB to use if enabled
bool cached; // cache active
unsigned int cache_pid;
void *cache_data;
void *priv; // used for DVD, TV, RTSP etc void *priv; // used for DVD, TV, RTSP etc
char *url; // strdup() of filename/url char *url; // strdup() of filename/url
char *mime_type; // when HTTP streaming is used char *mime_type; // when HTTP streaming is used
@ -191,6 +190,8 @@ typedef struct stream {
FILE *capture_file; FILE *capture_file;
char *capture_filename; char *capture_filename;
struct stream *uncached_stream;
} stream_t; } stream_t;
#ifdef CONFIG_NETWORKING #ifdef CONFIG_NETWORKING
@ -198,36 +199,26 @@ typedef struct stream {
#endif #endif
int stream_fill_buffer(stream_t *s); int stream_fill_buffer(stream_t *s);
int stream_seek_long(stream_t *s, int64_t pos);
void stream_set_capture_file(stream_t *s, const char *filename); void stream_set_capture_file(stream_t *s, const char *filename);
void stream_capture_write(stream_t *s); void stream_capture_write(stream_t *s);
#ifdef CONFIG_STREAM_CACHE int stream_enable_cache_percent(stream_t **stream, int64_t stream_cache_size,
int stream_enable_cache_percent(stream_t *stream, int64_t stream_cache_size,
float stream_cache_min_percent, float stream_cache_min_percent,
float stream_cache_seek_min_percent); float stream_cache_seek_min_percent);
int stream_enable_cache(stream_t *stream, int64_t size, int64_t min, int stream_enable_cache(stream_t **stream, int64_t size, int64_t min,
int64_t prefill); int64_t seek_limit);
int cache_stream_fill_buffer(stream_t *s);
int cache_stream_seek_long(stream_t *s, int64_t pos); // Internal
#else int stream_cache_init(stream_t *cache, stream_t *stream, int64_t size,
// no cache, define wrappers: int64_t min, int64_t seek_limit);
#define cache_stream_fill_buffer(x) stream_fill_buffer(x)
#define cache_stream_seek_long(x, y) stream_seek_long(x, y)
#define stream_enable_cache(x, y, z, w) 1
#define stream_enable_cache_percent(x, y, z, w) 1
#endif
int stream_write_buffer(stream_t *s, unsigned char *buf, int len); int stream_write_buffer(stream_t *s, unsigned char *buf, int len);
inline static int stream_read_char(stream_t *s) inline static int stream_read_char(stream_t *s)
{ {
return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] : return (s->buf_pos < s->buf_len) ? s->buffer[s->buf_pos++] :
(cache_stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256); (stream_fill_buffer(s) ? s->buffer[s->buf_pos++] : -256);
// if(s->buf_pos<s->buf_len) return s->buffer[s->buf_pos++];
// stream_fill_buffer(s);
// if(s->buf_pos<s->buf_len) return s->buffer[s->buf_pos++];
// return 0; // EOF
} }
inline static unsigned int stream_read_word(stream_t *s) inline static unsigned int stream_read_word(stream_t *s)
@ -333,10 +324,9 @@ void stream_set_interrupt_callback(int (*cb)(struct input_ctx *, int),
/// Call the interrupt checking callback if there is one and /// Call the interrupt checking callback if there is one and
/// wait for time milliseconds /// wait for time milliseconds
int stream_check_interrupt(int time); int stream_check_interrupt(int time);
/// Internal read function bypassing the stream buffer
int stream_read_internal(stream_t *s, void *buf, int len); int stream_read_unbuffered(stream_t *s, void *buf, int len);
/// Internal seek function bypassing the stream buffer int stream_seek_unbuffered(stream_t *s, int64_t newpos);
int stream_seek_internal(stream_t *s, int64_t newpos);
bool stream_manages_timeline(stream_t *s); bool stream_manages_timeline(stream_t *s);

View File

@ -923,19 +923,6 @@ static int open_s(stream_t *stream,int mode, void* opts, int* file_format) {
return STREAM_ERROR; return STREAM_ERROR;
} }
#if defined(CONFIG_RADIO_CAPTURE) && defined(CONFIG_STREAM_CACHE)
if(priv->do_capture){
//5 second cache
if(!stream_enable_cache(stream,5*priv->audio_in.samplerate*priv->audio_in.channels*
priv->audio_in.bytes_per_sample,2*priv->audio_in.samplerate*priv->audio_in.channels*
priv->audio_in.bytes_per_sample,priv->audio_in.blocksize)) {
mp_tmsg(MSGT_RADIO, MSGL_ERR, "[radio] Call to stream_enable_cache failed: %s\n",strerror(errno));
close_s(stream);
return STREAM_ERROR;
}
}
#endif
set_volume(priv,priv->radio_param->volume); set_volume(priv,priv->radio_param->volume);
return STREAM_OK; return STREAM_OK;