msg: drop old instead of new messages on overflow

It did that because there was no other way. It used a lock-free ring
buffer, which does not support this. Use a "manual" ring buffer with
explicit locks instead, and drop messages from the start.

(We could have continued to use mp_ring, but it was already too late,
and although mp_ring is fine, using it for types other than bytes looked
awkward, and writing a ring buffer yet again seemed nicer. At least it's
not C++, where mp_ring would have been a template, and everything would
have looked like shit soup no matter what.)
This commit is contained in:
wm4 2019-11-22 01:08:20 +01:00
parent 53477ffc4b
commit 65a531b8e4
1 changed files with 57 additions and 43 deletions

View File

@ -30,7 +30,6 @@
#include "osdep/atomic.h" #include "osdep/atomic.h"
#include "common/common.h" #include "common/common.h"
#include "common/global.h" #include "common/global.h"
#include "misc/ring.h"
#include "misc/bstr.h" #include "misc/bstr.h"
#include "options/options.h" #include "options/options.h"
#include "options/path.h" #include "options/path.h"
@ -86,9 +85,13 @@ struct mp_log {
struct mp_log_buffer { struct mp_log_buffer {
struct mp_log_root *root; struct mp_log_root *root;
struct mp_ring *ring; struct mp_log_buffer_entry **entries; // ringbuffer
int capacity; // total space in entries[]
int entry0; // first (oldest) entry index
int num_entries; // number of valid entries after entry0
uint64_t dropped; // number of skipped entries
int level; int level;
atomic_bool silent; bool silent;
void (*wakeup_cb)(void *ctx); void (*wakeup_cb)(void *ctx);
void *wakeup_cb_ctx; void *wakeup_cb_ctx;
}; };
@ -298,6 +301,15 @@ static void write_log_file(struct mp_log *log, int lev, char *text)
fflush(root->log_file); fflush(root->log_file);
} }
static struct mp_log_buffer_entry *log_buffer_read(struct mp_log_buffer *buffer)
{
assert(buffer->num_entries);
struct mp_log_buffer_entry *res = buffer->entries[buffer->entry0];
buffer->entry0 = (buffer->entry0 + 1) % buffer->capacity;
buffer->num_entries -= 1;
return res;
}
static void write_msg_to_buffers(struct mp_log *log, int lev, char *text) static void write_msg_to_buffers(struct mp_log *log, int lev, char *text)
{ {
struct mp_log_root *root = log->root; struct mp_log_root *root = log->root;
@ -307,27 +319,21 @@ static void write_msg_to_buffers(struct mp_log *log, int lev, char *text)
if (buffer_level == MP_LOG_BUFFER_MSGL_TERM) if (buffer_level == MP_LOG_BUFFER_MSGL_TERM)
buffer_level = log->terminal_level; buffer_level = log->terminal_level;
if (lev <= buffer_level && lev != MSGL_STATUS) { if (lev <= buffer_level && lev != MSGL_STATUS) {
// Assuming a single writer (serialized by msg lock) if (buffer->num_entries == buffer->capacity) {
int avail = mp_ring_available(buffer->ring) / sizeof(void *); struct mp_log_buffer_entry *skip = log_buffer_read(buffer);
if (avail < 1) talloc_free(skip);
continue; buffer->dropped += 1;
struct mp_log_buffer_entry *entry = talloc_ptrtype(NULL, entry);
if (avail > 1) {
*entry = (struct mp_log_buffer_entry) {
.prefix = talloc_strdup(entry, log->verbose_prefix),
.level = lev,
.text = talloc_strdup(entry, text),
};
} else {
// write overflow message to signal that messages might be lost
*entry = (struct mp_log_buffer_entry) {
.prefix = "overflow",
.level = MSGL_FATAL,
.text = "log message buffer overflow\n",
};
} }
mp_ring_write(buffer->ring, (unsigned char *)&entry, sizeof(entry)); struct mp_log_buffer_entry *entry = talloc_ptrtype(NULL, entry);
if (buffer->wakeup_cb && !atomic_load(&buffer->silent)) *entry = (struct mp_log_buffer_entry) {
.prefix = talloc_strdup(entry, log->verbose_prefix),
.level = lev,
.text = talloc_strdup(entry, text),
};
int pos = (buffer->entry0 + buffer->num_entries) % buffer->capacity;
buffer->entries[pos] = entry;
buffer->num_entries += 1;
if (buffer->wakeup_cb && !buffer->silent)
buffer->wakeup_cb(buffer->wakeup_cb_ctx); buffer->wakeup_cb(buffer->wakeup_cb_ctx);
} }
} }
@ -618,16 +624,17 @@ struct mp_log_buffer *mp_msg_log_buffer_new(struct mpv_global *global,
} }
} }
assert(size > 0);
struct mp_log_buffer *buffer = talloc_ptrtype(NULL, buffer); struct mp_log_buffer *buffer = talloc_ptrtype(NULL, buffer);
*buffer = (struct mp_log_buffer) { *buffer = (struct mp_log_buffer) {
.root = root, .root = root,
.level = level, .level = level,
.ring = mp_ring_new(buffer, sizeof(void *) * size), .entries = talloc_array(buffer, struct mp_log_buffer_entry *, size),
.capacity = size,
.wakeup_cb = wakeup_cb, .wakeup_cb = wakeup_cb,
.wakeup_cb_ctx = wakeup_cb_ctx, .wakeup_cb_ctx = wakeup_cb_ctx,
}; };
if (!buffer->ring)
abort();
MP_TARRAY_APPEND(root, root->buffers, root->num_buffers, buffer); MP_TARRAY_APPEND(root, root->buffers, root->num_buffers, buffer);
@ -640,7 +647,7 @@ struct mp_log_buffer *mp_msg_log_buffer_new(struct mpv_global *global,
void mp_msg_log_buffer_set_silent(struct mp_log_buffer *buffer, bool silent) void mp_msg_log_buffer_set_silent(struct mp_log_buffer *buffer, bool silent)
{ {
pthread_mutex_lock(&mp_msg_lock); pthread_mutex_lock(&mp_msg_lock);
atomic_store(&buffer->silent, silent); buffer->silent = silent;
pthread_mutex_unlock(&mp_msg_lock); pthread_mutex_unlock(&mp_msg_lock);
} }
@ -663,13 +670,8 @@ void mp_msg_log_buffer_destroy(struct mp_log_buffer *buffer)
found: found:
while (1) { while (buffer->num_entries)
atomic_store(&buffer->silent, false); talloc_free(log_buffer_read(buffer));
struct mp_log_buffer_entry *e = mp_msg_log_buffer_read(buffer);
if (!e)
break;
talloc_free(e);
}
talloc_free(buffer); talloc_free(buffer);
atomic_fetch_add(&root->reload_counter, 1); atomic_fetch_add(&root->reload_counter, 1);
@ -680,15 +682,27 @@ found:
// Thread-safety: one buffer can be read by a single thread only. // Thread-safety: one buffer can be read by a single thread only.
struct mp_log_buffer_entry *mp_msg_log_buffer_read(struct mp_log_buffer *buffer) struct mp_log_buffer_entry *mp_msg_log_buffer_read(struct mp_log_buffer *buffer)
{ {
if (atomic_load(&buffer->silent)) struct mp_log_buffer_entry *res = NULL;
return NULL;
void *ptr = NULL; pthread_mutex_lock(&mp_msg_lock);
int read = mp_ring_read(buffer->ring, (unsigned char *)&ptr, sizeof(ptr));
if (read == 0) if (!buffer->silent && buffer->num_entries) {
return NULL; if (buffer->dropped) {
if (read != sizeof(ptr)) res = talloc_ptrtype(NULL, res);
abort(); *res = (struct mp_log_buffer_entry) {
return ptr; .prefix = "overflow",
.level = MSGL_FATAL,
.text = "log message buffer overflow\n",
};
buffer->dropped = 0;
} else {
res = log_buffer_read(buffer);
}
}
pthread_mutex_unlock(&mp_msg_lock);
return res;
} }
// Thread-safety: fully thread-safe, but keep in mind that the lifetime of // Thread-safety: fully thread-safe, but keep in mind that the lifetime of