MEDIUM: ring: make the offset relative to the head/tail instead of absolute

The ring's offset currently contains a perpetually growing custor which
is the number of bytes written from the start. It's used by readers to
know where to (re)start reading from. It was made absolute because both
the head and the tail can change during writes and we needed a fixed
position to know where the reader was attached. But this is complicated,
error-prone, and limits the ability to reduce the lock's coverage. In
fact what is needed is to know where the reader is currently waiting, if
at all. And this location is exactly where it stored its count, so the
absolute position in the buffer (the seek offset from the first storage
byte) does represent exactly this, as it doesn't move (we don't realign
the buffer), and is stable regardless of how head/tail changes with writes.

This patch modifies this so that the application code now uses this
representation instead. The most noticeable change is the initialization,
where we've kept ~0 as a marker to go to the end, and it's now set to
the tail offset instead of trying to resolve the current write offset
against the current ring's position.

The offset was also used at the end of the consuming loop, to detect
if a new write had happened between the lock being released and taken
again, so as to wake the consumer(s) up again. For this we used to
take a copy of the ring->ofs before unlocking and comparing with the
new value read in the next lock. Since it's not possible to write past
the current reader's location, there's no risk of complete rollover, so
it's sufficient to check if the tail has changed.

Note that the change also has an impact on the haring consumer which
needs to adapt as well. But that's good in fact because it will rely
on one less variable, and will use offsets relative to the buffer's
head, and the change remains backward-compatible.
This commit is contained in:
Willy Tarreau 2023-02-22 14:50:14 +01:00
parent d0d85d2e36
commit d9c7188633
4 changed files with 23 additions and 45 deletions

View File

@ -123,20 +123,14 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
ofs = 0; ofs = 0;
/* going to the end means looking at tail-1 */ /* going to the end means looking at tail-1 */
if (flags & RING_WF_SEEK_NEW) ofs = (flags & RING_WF_SEEK_NEW) ? buf.data - 1 : 0;
ofs += b_data(&buf) - 1;
//HA_ATOMIC_INC(b_peek(&buf, ofs)); //HA_ATOMIC_INC(b_peek(&buf, ofs));
ofs += ring->ofs;
} }
while (1) { while (1) {
//HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); //HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs -= ring->ofs;
if (ofs >= buf.size) { if (ofs >= buf.size) {
fprintf(stderr, "FATAL error at %d\n", __LINE__); fprintf(stderr, "FATAL error at %d\n", __LINE__);
return 1; return 1;
@ -203,7 +197,6 @@ int dump_ring(struct ring *ring, size_t ofs, int flags)
} }
//HA_ATOMIC_INC(b_peek(&buf, ofs)); //HA_ATOMIC_INC(b_peek(&buf, ofs));
ofs += ring->ofs;
//HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); //HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
if (!(flags & RING_WF_WAIT_MODE)) if (!(flags & RING_WF_WAIT_MODE))

View File

@ -330,13 +330,11 @@ static void dns_resolve_send(struct dgram_conn *dgram)
if (unlikely(ofs == ~0)) { if (unlikely(ofs == ~0)) {
ofs = 0; ofs = 0;
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -380,7 +378,6 @@ static void dns_resolve_send(struct dgram_conn *dgram)
out: out:
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
ns->dgram->ofs_req = ofs; ns->dgram->ofs_req = ofs;
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock); HA_SPIN_UNLOCK(DNS_LOCK, &dgram->lock);
@ -498,7 +495,6 @@ static void dns_session_io_handler(struct appctx *appctx)
ofs = 0; ofs = 0;
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* in this loop, ofs always points to the counter byte that precedes /* in this loop, ofs always points to the counter byte that precedes
@ -509,7 +505,6 @@ static void dns_session_io_handler(struct appctx *appctx)
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -637,7 +632,6 @@ static void dns_session_io_handler(struct appctx *appctx)
} }
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
ds->ofs = ofs; ds->ofs = ofs;
} }
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
@ -1129,13 +1123,11 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
if (unlikely(ofs == ~0)) { if (unlikely(ofs == ~0)) {
ofs = 0; ofs = 0;
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -1224,7 +1216,6 @@ static struct task *dns_process_req(struct task *t, void *context, unsigned int
} }
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
dss->ofs_req = ofs; dss->ofs_req = ofs;
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);

View File

@ -31,7 +31,7 @@
/* context used to dump the contents of a ring via "show events" or "show errors" */ /* context used to dump the contents of a ring via "show events" or "show errors" */
struct show_ring_ctx { struct show_ring_ctx {
struct ring *ring; /* ring to be dumped */ struct ring *ring; /* ring to be dumped */
size_t ofs; /* offset to restart from, ~0 = end */ size_t ofs; /* storage offset to restart from; ~0=oldest */
uint flags; /* set of RING_WF_* */ uint flags; /* set of RING_WF_* */
}; };
@ -278,8 +278,9 @@ int ring_attach(struct ring *ring)
return 1; return 1;
} }
/* detach an appctx from a ring. The appctx is expected to be waiting at /* detach an appctx from a ring. The appctx is expected to be waiting at offset
* offset <ofs>. Nothing is done if <ring> is NULL. * <ofs> relative to the beginning of the storage, or ~0 if not waiting yet.
* Nothing is done if <ring> is NULL.
*/ */
void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs) void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
{ {
@ -289,7 +290,11 @@ void ring_detach_appctx(struct ring *ring, struct appctx *appctx, size_t ofs)
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
if (ofs != ~0) { if (ofs != ~0) {
/* reader was still attached */ /* reader was still attached */
ofs -= ring->ofs; if (ofs < b_head_ofs(&ring->buf))
ofs += b_size(&ring->buf) - b_head_ofs(&ring->buf);
else
ofs -= b_head_ofs(&ring->buf);
BUG_ON(ofs >= b_size(&ring->buf)); BUG_ON(ofs >= b_size(&ring->buf));
LIST_DEL_INIT(&appctx->wait_entry); LIST_DEL_INIT(&appctx->wait_entry);
HA_ATOMIC_DEC(b_peek(&ring->buf, ofs)); HA_ATOMIC_DEC(b_peek(&ring->buf, ofs));
@ -340,7 +345,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
struct stconn *sc = appctx_sc(appctx); struct stconn *sc = appctx_sc(appctx);
struct ring *ring = ctx->ring; struct ring *ring = ctx->ring;
struct buffer *buf = &ring->buf; struct buffer *buf = &ring->buf;
size_t ofs = ctx->ofs; size_t ofs;
size_t last_ofs; size_t last_ofs;
uint64_t msg_len; uint64_t msg_len;
size_t len, cnt; size_t len, cnt;
@ -363,21 +368,19 @@ int cli_io_handler_show_ring(struct appctx *appctx)
* existing messages before grabbing a reference to a location. This * existing messages before grabbing a reference to a location. This
* value cannot be produced after initialization. * value cannot be produced after initialization.
*/ */
if (unlikely(ofs == ~0)) { if (unlikely(ctx->ofs == ~0)) {
ofs = 0;
/* going to the end means looking at tail-1 */ /* going to the end means looking at tail-1 */
if (ctx->flags & RING_WF_SEEK_NEW) ctx->ofs = b_peek_ofs(buf, (ctx->flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
ofs += b_data(buf) - 1; HA_ATOMIC_INC(b_orig(buf) + ctx->ofs);
HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs; ofs = ctx->ofs - b_head_ofs(buf);
if (ctx->ofs < b_head_ofs(buf))
ofs += b_size(buf);
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -413,9 +416,8 @@ int cli_io_handler_show_ring(struct appctx *appctx)
} }
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs; last_ofs = b_tail_ofs(buf);
last_ofs = ring->ofs; ctx->ofs = b_peek_ofs(buf, ofs);
ctx->ofs = ofs;
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) { if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
@ -426,7 +428,7 @@ int cli_io_handler_show_ring(struct appctx *appctx)
/* let's be woken up once new data arrive */ /* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry); LIST_APPEND(&ring->waiters, &appctx->wait_entry);
ofs = ring->ofs; ofs = b_tail_ofs(&ring->buf);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
if (ofs != last_ofs) { if (ofs != last_ofs) {
/* more data was added into the ring between the /* more data was added into the ring between the

View File

@ -362,9 +362,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
*/ */
if (unlikely(ofs == ~0)) { if (unlikely(ofs == ~0)) {
ofs = 0; ofs = 0;
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* in this loop, ofs always points to the counter byte that precedes /* in this loop, ofs always points to the counter byte that precedes
@ -375,7 +373,6 @@ static void sink_forward_io_handler(struct appctx *appctx)
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -407,9 +404,8 @@ static void sink_forward_io_handler(struct appctx *appctx)
} }
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs; last_ofs = b_tail_ofs(buf);
sft->ofs = ofs; sft->ofs = ofs;
last_ofs = ring->ofs;
} }
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
@ -417,7 +413,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
/* let's be woken up once new data arrive */ /* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_APPEND(&ring->waiters, &appctx->wait_entry); LIST_APPEND(&ring->waiters, &appctx->wait_entry);
ofs = ring->ofs; ofs = b_tail_ofs(buf);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
if (ofs != last_ofs) { if (ofs != last_ofs) {
/* more data was added into the ring between the /* more data was added into the ring between the
@ -502,9 +498,7 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
*/ */
if (unlikely(ofs == ~0)) { if (unlikely(ofs == ~0)) {
ofs = 0; ofs = 0;
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
} }
/* in this loop, ofs always points to the counter byte that precedes /* in this loop, ofs always points to the counter byte that precedes
@ -515,7 +509,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
/* we were already there, adjust the offset to be relative to /* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter. * the buffer's head and remove us from the counter.
*/ */
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size); BUG_ON(ofs >= buf->size);
HA_ATOMIC_DEC(b_peek(buf, ofs)); HA_ATOMIC_DEC(b_peek(buf, ofs));
@ -551,7 +544,6 @@ static void sink_forward_oc_io_handler(struct appctx *appctx)
} }
HA_ATOMIC_INC(b_peek(buf, ofs)); HA_ATOMIC_INC(b_peek(buf, ofs));
ofs += ring->ofs;
sft->ofs = ofs; sft->ofs = ofs;
} }
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);