From c62a2d540d2de0deaedd7822d68baea843a818aa Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 27 Feb 2024 16:54:18 +0100 Subject: [PATCH] MEDIUM: ring: move the ring reader code to ring_dispatch_messages() This new function is made around the loop that scans a ring for new messages and dispatches them to a message handler. It also takes ring flags (WAIT, NEW, etc) and offset pointers that the caller will use to initialize/reuse/update the current processing offset. The caller is still responsible for presetting it to ~0 before the first call if it wants the function to automatically adjust it (or set it to the correct value). The function may also return the last_ofs that was known before releasing the lock so that the caller knows what to compare against and if it needs to restart processing or not. The context remains a void* so that should not necessarily depend on an appctx. The current "show ring" code was ported to this and it continues to work as expected. --- include/haproxy/ring.h | 2 ++ src/ring.c | 81 ++++++++++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/include/haproxy/ring.h b/include/haproxy/ring.h index 71217d523..0e8a3bf86 100644 --- a/include/haproxy/ring.h +++ b/include/haproxy/ring.h @@ -42,6 +42,8 @@ int cli_io_handler_show_ring(struct appctx *appctx); void cli_io_release_show_ring(struct appctx *appctx); size_t ring_max_payload(const struct ring *ring); +int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, + ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len)); #endif /* _HAPROXY_RING_H */ diff --git a/src/ring.c b/src/ring.c index 682bce819..7c78b3301 100644 --- a/src/ring.c +++ b/src/ring.c @@ -328,36 +328,24 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) return 0; } -/* This function dumps all events from the ring whose pointer is in into - * the appctx's output buffer, and takes from the seek offset into the - * buffer's history (0 for oldest known event). It looks at for boolean - * options: bit0 means it must wait for new data or any key to be pressed. Bit1 - * means it must seek directly to the end to wait for new contents. It returns - * 0 if the output buffer or events are missing is full and it needs to be - * called again, otherwise non-zero. It is meant to be used with - * cli_release_show_ring() to clean up. + +/* parses as many messages as possible from ring , starting at the offset + * stored at *ofs_ptr, with RING_WF_* flags in , and passes them to + * the message handler . If is not NULL, a copy of + * the last known tail pointer will be copied there so that the caller may use + * this to detect new data have arrived since we left the function. Returns 0 + * if it needs to pause, 1 once finished. */ -int cli_io_handler_show_ring(struct appctx *appctx) +int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, + ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len)) { - struct show_ring_ctx *ctx = appctx->svcctx; - struct stconn *sc = appctx_sc(appctx); - struct ring *ring = ctx->ring; struct buffer *buf = &ring->buf; - size_t ofs; - size_t last_ofs; uint64_t msg_len; - size_t len, cnt; ssize_t copied; + size_t len, cnt; + size_t ofs; int ret; - /* FIXME: Don't watch the other side !*/ - if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) - return 1; - - HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); - LIST_DEL_INIT(&appctx->wait_entry); - HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); - HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock); /* explanation for the initialization below: it would be better to do @@ -368,17 +356,17 @@ int cli_io_handler_show_ring(struct appctx *appctx) * existing messages before grabbing a reference to a location. This * value cannot be produced after initialization. */ - if (unlikely(ctx->ofs == ~0)) { + if (unlikely(*ofs_ptr == ~0)) { /* going to the end means looking at tail-1 */ - ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0); - HA_ATOMIC_INC(b_orig(buf) + ctx->ofs); + *ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0); + HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr); } /* we were already there, adjust the offset to be relative to * the buffer's head and remove us from the counter. */ - ofs = ctx->ofs - b_head_ofs(buf); - if (ctx->ofs < b_head_ofs(buf)) + ofs = *ofs_ptr - b_head_ofs(buf); + if (*ofs_ptr < b_head_ofs(buf)) ofs += b_size(buf); BUG_ON(ofs >= buf->size); @@ -397,7 +385,7 @@ int cli_io_handler_show_ring(struct appctx *appctx) cnt += len; BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); - copied = applet_append_line(appctx, buf, ofs + cnt, msg_len); + copied = msg_handler(ctx, buf, ofs + cnt, msg_len); if (copied == -2) { /* too large a message to ever fit, let's skip it */ ofs += cnt + msg_len; @@ -412,9 +400,40 @@ int cli_io_handler_show_ring(struct appctx *appctx) } HA_ATOMIC_INC(b_peek(buf, ofs)); - last_ofs = b_tail_ofs(buf); - ctx->ofs = b_peek_ofs(buf, ofs); + if (last_ofs_ptr) + *last_ofs_ptr = b_tail_ofs(buf); + *ofs_ptr = b_peek_ofs(buf, ofs); HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock); + return ret; +} + +/* This function dumps all events from the ring whose pointer is in into + * the appctx's output buffer, and takes from the seek offset into the + * buffer's history (0 for oldest known event). It looks at for boolean + * options: bit0 means it must wait for new data or any key to be pressed. Bit1 + * means it must seek directly to the end to wait for new contents. It returns + * 0 if the output buffer or events are missing is full and it needs to be + * called again, otherwise non-zero. It is meant to be used with + * cli_release_show_ring() to clean up. + */ +int cli_io_handler_show_ring(struct appctx *appctx) +{ + struct show_ring_ctx *ctx = appctx->svcctx; + struct stconn *sc = appctx_sc(appctx); + struct ring *ring = ctx->ring; + size_t last_ofs; + size_t ofs; + int ret; + + /* FIXME: Don't watch the other side !*/ + if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) + return 1; + + HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock); + LIST_DEL_INIT(&appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock); + + ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line); if (ret && (ctx->flags & RING_WF_WAIT_MODE)) { /* we've drained everything and are configured to wait for more