MEDIUM: ring: protect the reader's positions against writers

The reader now needs to protect the positions it's reading. This is
already done via the readers counter at the beginning of messages,
but as long as the lock is present, this counter is decremented
before starting to parse messages, and incremented at the end.

We must now do that in reverse, first protect the end of the messages,
and only then remove ourselves from the already processed messages, so
that at no point could a writer pass over and possibly overwrite data
we're currently watching.
This commit is contained in:
Willy Tarreau 2024-02-28 17:18:34 +01:00
parent 73b2436fe6
commit 2192983ffd

View File

@ -402,7 +402,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags)
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
size_t head_ofs, tail_ofs;
size_t head_ofs, tail_ofs, prev_ofs;
size_t ring_size;
uint8_t *ring_area;
struct ist v1, v2;
@ -450,11 +450,10 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
head_ofs = *ofs_ptr;
BUG_ON(head_ofs >= ring_size);
/* dec readers count */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax());
/* we keep track of where we were and we don't release it before
* we've protected the next place.
*/
prev_ofs = head_ofs;
/* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
@ -494,12 +493,18 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs);
/* inc readers count */
/* inc readers count on new place */
do {
readers = _HA_ATOMIC_LOAD(ring_area + head_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax());
/* dec readers count on old place */
do {
readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs);
} while ((readers > RING_MAX_READERS ||
!_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax());
if (last_ofs_ptr)
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;