MEDIUM: ring: remove the struct buffer from the ring

The purpose is to store a head and a tail that are independent so that
we can further improve the API to update them independently from each
other.

The struct was arranged like the original one so that as long as a ring
has its head set to zero (i.e. no recycling) it will continue to work.
The new format is already detectable thanks to the "rsvd" field which
indicates the number of reserved bytes at the beginning. It's located
where the buffer's area pointer previously was, so that older versions
of haring can continue to open the ring in repair mode, and newer ones
can use the fact that the upper bits of that variable are zero to guess
that it's working with the new format instead of the old one. Also let's
keep in mind that the layout will further change to place some alignment
constraints.

The haring tool will thus updated based on this and it detects that the
rsvd field is smaller than a page and that the sum of it with the size
equals the mapped size, in which case it uses the new dump_v2() function
instead of dump_v1(). The new function also creates a buffer from the
ring's area, size, head and tail and calls the generic one so that no
other code had to be adapted.
This commit is contained in:
Willy Tarreau 2024-02-27 09:17:45 +01:00
parent 01aa0a057c
commit bf3dead20c
5 changed files with 92 additions and 33 deletions

View File

@ -46,6 +46,15 @@ struct ring_v1 {
struct buffer buf; // storage area
};
// ring v2 format (not aligned)
struct ring_v2 {
size_t size; // storage size
size_t rsvd; // header length (used for file-backed maps)
size_t tail; // storage tail
size_t head; // storage head
char area[0]; // storage area begins immediately here
};
/* display the message and exit with the code */
__attribute__((noreturn)) void die(int code, const char *format, ...)
{
@ -180,6 +189,32 @@ int dump_ring_v1(struct ring_v1 *ring, size_t ofs, int flags)
return dump_ring_as_buf(buf, ofs, flags);
}
/* This function dumps all events from the ring <ring> from offset <ofs> and
* with flags <flags>.
*/
int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags)
{
size_t size, head, tail, data;
struct buffer buf;
/* In ring v2 format, we have in this order:
* - size
* - hdr len (reserved bytes)
* - tail
* - head
* We can rebuild an equivalent buffer from these info for the function
* to dump.
*/
/* Now make our own buffer pointing to that area */
size = ring->size;
head = ring->head;
tail = ring->tail;
data = (head <= tail ? 0 : size) + tail - head;
buf = b_make((void *)ring + ring->rsvd, size, head, data);
return dump_ring_as_buf(buf, ofs, flags);
}
int main(int argc, char **argv)
{
void *ring;
@ -224,7 +259,11 @@ int main(int argc, char **argv)
return 1;
}
return dump_ring_v1(ring, 0, 0);
if (((struct ring_v2 *)ring)->rsvd < 4096 && // not a pointer (v1), must be ringv2's rsvd
((struct ring_v2 *)ring)->rsvd + ((struct ring_v2 *)ring)->size == statbuf.st_size)
return dump_ring_v2(ring, 0, 0);
else
return dump_ring_v1(ring, 0, 0);
}

View File

@ -105,8 +105,11 @@
/* this is the mmapped part */
struct ring_storage {
struct buffer buf; // storage layout
char area[0]; // storage area begins immediately here
size_t size; // storage size
size_t rsvd; // header length (used for file-backed maps)
size_t tail; // storage tail
size_t head; // storage head
char area[0]; // storage area begins immediately here
};
/* this is the ring definition, config, waiters etc */

View File

@ -25,6 +25,7 @@
#include <stdlib.h>
#include <import/ist.h>
#include <haproxy/ring-t.h>
#include <haproxy/vecpair.h>
struct appctx;
@ -47,31 +48,32 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
/* returns the ring storage's area */
static inline void *ring_area(const struct ring *ring)
{
return b_orig(&ring->storage->buf);
return ring->storage->area;
}
/* returns the number of bytes in the ring */
static inline size_t ring_data(const struct ring *ring)
{
return b_data(&ring->storage->buf);
return ((ring->storage->head <= ring->storage->tail) ?
0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
}
/* returns the allocated size in bytes for the ring */
static inline size_t ring_size(const struct ring *ring)
{
return b_size(&ring->storage->buf);
return ring->storage->size;
}
/* returns the head offset of the ring */
static inline size_t ring_head(const struct ring *ring)
{
return b_head_ofs(&ring->storage->buf);
return ring->storage->head;
}
/* returns the tail offset of the ring */
static inline size_t ring_tail(const struct ring *ring)
{
return b_tail_ofs(&ring->storage->buf);
return ring->storage->tail;
}
/* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
@ -81,11 +83,16 @@ static inline size_t ring_tail(const struct ring *ring)
*/
static inline size_t ring_dup(struct ring *dst, const struct ring *src, size_t max)
{
struct ist v1, v2;
vp_ring_to_data(&v1, &v2, ring_area(src), ring_size(src), ring_head(src), ring_tail(src));
if (max > ring_data(src))
max = ring_data(src);
b_reset(&dst->storage->buf);
b_ncat(&dst->storage->buf, &src->storage->buf, max);
vp_peek_ofs(v1, v2, 0, ring_area(dst), max);
dst->storage->head = 0;
dst->storage->tail = max;
return max;
}

View File

@ -50,15 +50,16 @@ void ring_init(struct ring *ring, void *area, size_t size, int reset)
ring->readers_count = 0;
ring->flags = 0;
ring->storage = area;
ring->storage->buf.area = ring->storage->area;
if (reset) {
ring->storage->buf = b_make(ring->storage->area,
size - sizeof(*ring->storage),
0, 0);
ring->storage->size = size - sizeof(*ring->storage);
ring->storage->rsvd = sizeof(*ring->storage);
ring->storage->head = 0;
ring->storage->tail = 0;
/* write the initial RC byte */
b_putchr(&ring->storage->buf, 0);
*ring->storage->area = 0;
ring->storage->tail = 1;
}
}
@ -114,11 +115,12 @@ struct ring *ring_new(size_t size)
*/
struct ring *ring_resize(struct ring *ring, size_t size)
{
struct ring_storage *new;
struct ring_storage *old, *new;
if (size <= ring_data(ring) + sizeof(*ring->storage))
return ring;
old = ring->storage;
new = malloc(size);
if (!new)
return NULL;
@ -128,11 +130,17 @@ struct ring *ring_resize(struct ring *ring, size_t size)
/* recheck the ring's size, it may have changed during the malloc */
if (size > ring_data(ring) + sizeof(*ring->storage)) {
/* copy old contents */
new->buf = b_make(new->area, size - sizeof(*ring->storage), 0, 0);
b_getblk(&ring->storage->buf, new->area, ring_data(ring), 0);
new->buf.data = ring_data(ring);
struct ist v1, v2;
size_t len;
vp_ring_to_data(&v1, &v2, old->area, old->size, old->head, old->tail);
len = vp_size(v1, v2);
vp_peek_ofs(v1, v2, 0, new->area, len);
new->size = size - sizeof(*ring->storage);
new->rsvd = sizeof(*ring->storage);
new->head = 0;
new->tail = len;
new = HA_ATOMIC_XCHG(&ring->storage, new);
/* new is now the old one */
}
thread_release();
@ -164,7 +172,6 @@ void ring_free(struct ring *ring)
*/
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
struct buffer *buf = &ring->storage->buf;
size_t head_ofs, tail_ofs;
size_t ring_size;
char *ring_area;
@ -210,15 +217,15 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
/* these ones do not change under us (only resize affects them and it
* must be done under thread isolation).
*/
ring_area = b_orig(buf);
ring_size = b_size(buf);
ring_area = ring->storage->area;
ring_size = ring->storage->size;
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (needed + 1 > ring_size)
goto leave;
head_ofs = b_head_ofs(buf);
tail_ofs = b_tail_ofs(buf);
head_ofs = ring_head(ring);
tail_ofs = ring_tail(ring);
vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
@ -281,8 +288,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
done_update_buf:
/* update the new space in the buffer */
buf->head = head_ofs;
buf->data = ((tail_ofs >= head_ofs) ? 0 : ring_size) + tail_ofs - head_ofs;
ring->storage->head = head_ofs;
ring->storage->tail = tail_ofs;
/* notify potential readers */
if (sent) {
@ -373,7 +380,6 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
struct buffer *buf = &ring->storage->buf;
size_t head_ofs, tail_ofs;
size_t ring_size;
char *ring_area;
@ -383,13 +389,13 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
ssize_t copied;
int ret;
ring_area = b_orig(buf);
ring_size = b_size(buf);
ring_area = ring->storage->area;
ring_size = ring->storage->size;
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
head_ofs = b_head_ofs(buf);
tail_ofs = b_tail_ofs(buf);
head_ofs = ring->storage->head;
tail_ofs = ring->storage->tail;
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in

View File

@ -666,10 +666,14 @@ void sink_rotate_file_backed_ring(const char *name)
if (ret != sizeof(storage))
goto rotate;
/* check that it's the expected format before touching it */
if (storage.rsvd != sizeof(storage))
return;
/* contents are present, we want to keep them => rotate. Note that
* an empty ring buffer has one byte (the marker).
*/
if (storage.buf.data > 1)
if (storage.head != 0 || storage.tail != 1)
goto rotate;
/* nothing to keep, let's scratch the file and preserve the backup */