From be97853c2f9f5d93a0724941034cefcdbf77ebac Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 27 Aug 2019 11:44:13 +0200 Subject: [PATCH] MINOR: ring: add a ring_write() function This function tries to write to the ring buffer, possibly removing enough old messages to make room for the new one. It takes two arrays of fragments on input to ease the insertion of prefixes by the caller. It atomically writes the message, possibly truncating it if desired, and returns the operation's status. --- include/proto/ring.h | 3 ++ src/ring.c | 89 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/include/proto/ring.h b/include/proto/ring.h index 6c08353ec..e32403454 100644 --- a/include/proto/ring.h +++ b/include/proto/ring.h @@ -22,11 +22,14 @@ #ifndef _PROTO_RING_H #define _PROTO_RING_H +#include +#include #include struct ring *ring_new(size_t size); struct ring *ring_resize(struct ring *ring, size_t size); void ring_free(struct ring *ring); +ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg); #endif /* _PROTO_RING_H */ diff --git a/src/ring.c b/src/ring.c index a1f77dab7..8ba3868ac 100644 --- a/src/ring.c +++ b/src/ring.c @@ -99,6 +99,95 @@ void ring_free(struct ring *ring) free(ring); } +/* Tries to send parts from followed by parts from + * to ring . The message is sent atomically. It may be truncated to + * bytes if is non-null. There is no distinction between the + * two lists, it's just a convenience to help the caller prepend some prefixes + * when necessary. It takes the ring's write lock to make sure no other thread + * will touch the buffer during the update. Returns the number of bytes sent, + * or <=0 on failure. + */ +ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) +{ + struct buffer *buf = &ring->buf; + size_t totlen = 0; + size_t lenlen; + size_t dellen; + int dellenlen; + ssize_t sent = 0; + int i; + + /* we have to find some room to add our message (the buffer is + * never empty and at least contains the previous counter) and + * to update both the buffer contents and heads at the same + * time (it's doable using atomic ops but not worth the + * trouble, let's just lock). For this we first need to know + * the total message's length. We cannot measure it while + * copying due to the varint encoding of the length. + */ + for (i = 0; i < npfx; i++) + totlen += pfx[i].len; + for (i = 0; i < nmsg; i++) + totlen += msg[i].len; + + if (totlen > maxlen) + totlen = maxlen; + + lenlen = varint_bytes(totlen); + + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + if (lenlen + totlen + 1 + 1 > b_size(buf)) + goto done_buf; + + while (b_room(buf) < lenlen + totlen + 1) { + /* we need to delete the oldest message (from the end), + * and we have to stop if there's a reader stuck there. + * Unless there's corruption in the buffer it's guaranteed + * that we have enough data to find 1 counter byte, a + * varint-encoded length (1 byte min) and the message + * payload (0 bytes min). + */ + if (*b_head(buf)) + goto done_buf; + dellenlen = b_peek_varint(buf, 1, &dellen); + if (!dellenlen) + goto done_buf; + BUG_ON(b_data(buf) < 1 + dellenlen + dellen); + + b_del(buf, 1 + dellenlen + dellen); + ring->ofs += 1 + dellenlen + dellen; + } + + /* OK now we do have room */ + __b_put_varint(buf, totlen); + + totlen = 0; + for (i = 0; i < npfx; i++) { + size_t len = pfx[i].len; + + if (len + totlen > maxlen) + len = maxlen - totlen; + if (len) + __b_putblk(buf, pfx[i].ptr, len); + totlen += len; + } + + for (i = 0; i < nmsg; i++) { + size_t len = msg[i].len; + + if (len + totlen > maxlen) + len = maxlen - totlen; + if (len) + __b_putblk(buf, msg[i].ptr, len); + totlen += len; + } + + *b_tail(buf) = 0; buf->data++;; // new read counter + sent = lenlen + totlen + 1; + done_buf: + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + return sent; +} /* * Local variables: