MINOR: ring: add a generic CLI io_handler to dump a ring buffer

The three functions (attach, IO handler, and release) are meant to be
called by any CLI command which requires to dump the contents of a ring
buffer. We do not implement anything generic to dump any ring buffer on
the CLI since it's meant to be used by other functionalities above.
However these functions deal with locking and everything so it's trivial
to embed them in other code.
This commit is contained in:
Willy Tarreau 2019-08-27 11:55:39 +02:00
parent be97853c2f
commit 072931cdcb
2 changed files with 125 additions and 0 deletions

View File

@ -30,6 +30,9 @@ struct ring *ring_new(size_t size);
struct ring *ring_resize(struct ring *ring, size_t size);
void ring_free(struct ring *ring);
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg);
int ring_attach_cli(struct ring *ring, struct appctx *appctx);
int cli_io_handler_show_ring(struct appctx *appctx);
void cli_io_release_show_ring(struct appctx *appctx);
#endif /* _PROTO_RING_H */

View File

@ -23,7 +23,10 @@
#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
#include <types/applet.h>
#include <proto/cli.h>
#include <proto/ring.h>
#include <proto/stream_interface.h>
/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
* allocation failure.
@ -189,6 +192,125 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
return sent;
}
/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
* meant to be used when registering a CLI function to dump a buffer, so it
* returns zero on success, or non-zero on failure with a message in the appctx
* CLI context.
*/
int ring_attach_cli(struct ring *ring, struct appctx *appctx)
{
int users = ring->readers_count;
do {
if (users >= 1)
return cli_err(appctx,
"Sorry, too many watchers (255) on this ring buffer. "
"What could it have so interesting to attract so many watchers ?");
} while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));
appctx->ctx.cli.p0 = ring;
appctx->ctx.cli.p1 = 0; // start from the oldest event
return 0;
}
/* This function dumps all events from the ring whose pointer is in <p0> into
* the appctx's output buffer, and takes from <p1> the seek offset into the
* buffer's history (0 for oldest known event). It returns 0 if the output
* buffer is full and it needs to be called again, otherwise non-zero. It is
* meant to be used with cli_release_show_ring() to clean up.
*/
int cli_io_handler_show_ring(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct ring *ring = appctx->ctx.cli.p0;
struct buffer *buf = &ring->buf;
size_t ofs = (unsigned long)appctx->ctx.cli.p1;
uint64_t msg_len;
size_t len, cnt;
int ret;
if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
return 1;
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location.
*/
if (unlikely(!ofs)) {
HA_ATOMIC_ADD(b_head(buf), 1);
ofs += ring->ofs;
}
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size);
HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
/* in this loop, ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
*/
ret = 1;
while (ofs + 1 < b_data(buf)) {
cnt = 1;
len = b_peek_varint(buf, ofs + cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
if (unlikely(msg_len + 1 > b_size(&trash))) {
/* too large a message to ever fit, let's skip it */
ofs += cnt + msg_len;
continue;
}
chunk_reset(&trash);
len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
trash.data += len;
trash.area[trash.data++] = '\n';
if (ci_putchk(si_ic(si), &trash) == -1) {
si_rx_room_blk(si);
ret = 0;
break;
}
ofs += cnt + msg_len;
}
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
ofs += ring->ofs;
appctx->ctx.cli.p1 = (void *)ofs;
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
return ret;
}
/* must be called after cli_io_handler_show_ring() above */
void cli_io_release_show_ring(struct appctx *appctx)
{
struct ring *ring = appctx->ctx.cli.p0;
size_t ofs = (unsigned long)appctx->ctx.cli.p1;
if (!ring)
return;
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
ofs -= ring->ofs;
BUG_ON(ofs >= b_size(&ring->buf));
HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
HA_ATOMIC_SUB(&ring->readers_count, 1);
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
}
/*
* Local variables:
* c-indent-level: 8