diff --git a/include/proto/ring.h b/include/proto/ring.h index 6c08353ec..e32403454 100644 --- a/include/proto/ring.h +++ b/include/proto/ring.h @@ -22,11 +22,14 @@ #ifndef _PROTO_RING_H #define _PROTO_RING_H +#include +#include #include struct ring *ring_new(size_t size); struct ring *ring_resize(struct ring *ring, size_t size); void ring_free(struct ring *ring); +ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg); #endif /* _PROTO_RING_H */ diff --git a/src/ring.c b/src/ring.c index a1f77dab7..8ba3868ac 100644 --- a/src/ring.c +++ b/src/ring.c @@ -99,6 +99,95 @@ void ring_free(struct ring *ring) free(ring); } +/* Tries to send parts from followed by parts from + * to ring . The message is sent atomically. It may be truncated to + * bytes if is non-null. There is no distinction between the + * two lists, it's just a convenience to help the caller prepend some prefixes + * when necessary. It takes the ring's write lock to make sure no other thread + * will touch the buffer during the update. Returns the number of bytes sent, + * or <=0 on failure. + */ +ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) +{ + struct buffer *buf = &ring->buf; + size_t totlen = 0; + size_t lenlen; + size_t dellen; + int dellenlen; + ssize_t sent = 0; + int i; + + /* we have to find some room to add our message (the buffer is + * never empty and at least contains the previous counter) and + * to update both the buffer contents and heads at the same + * time (it's doable using atomic ops but not worth the + * trouble, let's just lock). For this we first need to know + * the total message's length. We cannot measure it while + * copying due to the varint encoding of the length. + */ + for (i = 0; i < npfx; i++) + totlen += pfx[i].len; + for (i = 0; i < nmsg; i++) + totlen += msg[i].len; + + if (totlen > maxlen) + totlen = maxlen; + + lenlen = varint_bytes(totlen); + + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + if (lenlen + totlen + 1 + 1 > b_size(buf)) + goto done_buf; + + while (b_room(buf) < lenlen + totlen + 1) { + /* we need to delete the oldest message (from the end), + * and we have to stop if there's a reader stuck there. + * Unless there's corruption in the buffer it's guaranteed + * that we have enough data to find 1 counter byte, a + * varint-encoded length (1 byte min) and the message + * payload (0 bytes min). + */ + if (*b_head(buf)) + goto done_buf; + dellenlen = b_peek_varint(buf, 1, &dellen); + if (!dellenlen) + goto done_buf; + BUG_ON(b_data(buf) < 1 + dellenlen + dellen); + + b_del(buf, 1 + dellenlen + dellen); + ring->ofs += 1 + dellenlen + dellen; + } + + /* OK now we do have room */ + __b_put_varint(buf, totlen); + + totlen = 0; + for (i = 0; i < npfx; i++) { + size_t len = pfx[i].len; + + if (len + totlen > maxlen) + len = maxlen - totlen; + if (len) + __b_putblk(buf, pfx[i].ptr, len); + totlen += len; + } + + for (i = 0; i < nmsg; i++) { + size_t len = msg[i].len; + + if (len + totlen > maxlen) + len = maxlen - totlen; + if (len) + __b_putblk(buf, msg[i].ptr, len); + totlen += len; + } + + *b_tail(buf) = 0; buf->data++;; // new read counter + sent = lenlen + totlen + 1; + done_buf: + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + return sent; +} /* * Local variables: