mirror of http://git.haproxy.org/git/haproxy.git/ (synced 2025-02-21 21:26:58 +00:00)
MEDIUM: ring: use the topmost bit of the tail as a lock
We're now locking the tail while looking for some room in the ring. In fact it's still while writing to it, but the goal definitely is to get rid of the lock ASAP. For this we reserve the topmost bit of the tail as a lock, which may have as a possible visible effect that buffers will be limited to 2GB instead of 4GB on 32-bit machines (though in practice, good luck allocating more than 2GB contiguous on 32-bit). In practice, since the size is read with atol() and some operating systems limit it to LONG_MAX unless passing negative numbers, the limit is already there.

For now the impact on x86_64 is significant (drop from 2.35 to 1.4M/s on 48 threads on EPYC 24 cores) but this situation is only temporary so that changes remain reviewable and bisectable.

Other approaches were attempted, such as using XCHG instead, which is slightly faster on x86 with low thread counts (but causes more write contention), and which forces readers to stall under heavy traffic because they can't access a valid value for the queue anymore. A CAS requires preloading the value and is less good on ARMv8.1. XADD could also be considered with 12-13 upper bits of the offset dedicated to locking, but that looks overkill.
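As a side note, here is a minimal standalone sketch of the topmost-bit tail lock described above, written with C11 atomics rather than HAProxy's HA_ATOMIC_* macros; the tail_lock()/tail_unlock()/tail_read() helpers and the TAIL_LOCK_BIT name are hypothetical illustrations of the idea, not the patch's actual API:

#include <stdatomic.h>
#include <stddef.h>

/* hypothetical: the topmost bit of a size_t offset doubles as a lock */
#define TAIL_LOCK_BIT ((size_t)1 << (sizeof(size_t) * 8 - 1))

/* Spin until this thread is the one that flips the lock bit from 0 to 1;
 * returns the unlocked tail offset that was observed. Losers see the bit
 * already set in the value fetch_or returns and simply retry.
 */
static size_t tail_lock(_Atomic size_t *tail)
{
	size_t ofs;

	for (;;) {
		ofs = atomic_fetch_or(tail, TAIL_LOCK_BIT);
		if (!(ofs & TAIL_LOCK_BIT))
			return ofs; /* bit was clear: we now own the tail */
		/* the real code waits here using pl_wait_unlock_long() */
	}
}

/* Storing the new offset with the top bit clear both publishes the new
 * tail and releases the lock in a single atomic store.
 */
static void tail_unlock(_Atomic size_t *tail, size_t new_ofs)
{
	atomic_store(tail, new_ofs & ~TAIL_LOCK_BIT);
}

/* Readers never take the lock; they simply mask the bit off. */
static size_t tail_read(const _Atomic size_t *tail)
{
	return atomic_load(tail) & ~TAIL_LOCK_BIT;
}

The key property is that both the release and the reader path are plain masked loads/stores: writers contend only on the fetch_or, and readers never observe the lock bit as part of the offset.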
This commit is contained in:
parent 2192983ffd
commit eb3d5f464d
@@ -218,7 +218,7 @@ int dump_ring_v2(struct ring_v2 *ring, size_t ofs, int flags)
 	/* Now make our own buffer pointing to that area */
 	size = ring->size;
 	head = ring->head;
-	tail = ring->tail;
+	tail = ring->tail & ~RING_TAIL_LOCK;
 	data = (head <= tail ? 0 : size) + tail - head;
 	buf = b_make((void *)ring + ring->rsvd, size, head, data);
 	return dump_ring_as_buf(buf, ofs, flags);
@@ -103,6 +103,9 @@
 #define RING_WRITING_SIZE 255 /* the next message's size is being written */
 #define RING_MAX_READERS 254 /* highest supported value for RC */
 
+/* mask used to lock the tail */
+#define RING_TAIL_LOCK (1ULL << ((sizeof(size_t) * 8) - 1))
+
 /* this is the mmapped part */
 struct ring_storage {
 	size_t size; // storage size
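A quick note on the size ceiling this mask implies: since it claims the topmost bit of a size_t, usable ring sizes stop at 2^31 (2GB) on 32-bit platforms and 2^63 on 64-bit ones. A tiny hypothetical check (not part of the patch) makes the number concrete:

#include <stdio.h>
#include <stddef.h>

#define RING_TAIL_LOCK (1ULL << ((sizeof(size_t) * 8) - 1))

int main(void)
{
	/* prints 2147483648 (2GB) on 32-bit, 9223372036854775808 on 64-bit */
	printf("max ring size: %llu bytes\n", (unsigned long long)RING_TAIL_LOCK);
	return 0;
}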
@@ -54,8 +54,10 @@ static inline void *ring_area(const struct ring *ring)
 /* returns the number of bytes in the ring */
 static inline size_t ring_data(const struct ring *ring)
 {
-	return ((ring->storage->head <= ring->storage->tail) ?
-		0 : ring->storage->size) + ring->storage->tail - ring->storage->head;
+	size_t tail = HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
+
+	return ((ring->storage->head <= tail) ?
+		0 : ring->storage->size) + tail - ring->storage->head;
 }
 
 /* returns the allocated size in bytes for the ring */
@@ -70,10 +72,10 @@ static inline size_t ring_head(const struct ring *ring)
 	return ring->storage->head;
 }
 
-/* returns the tail offset of the ring */
+/* returns the ring's tail offset without the lock bit */
 static inline size_t ring_tail(const struct ring *ring)
 {
-	return ring->storage->tail;
+	return HA_ATOMIC_LOAD(&ring->storage->tail) & ~RING_TAIL_LOCK;
 }
 
 /* duplicates ring <src> over ring <dst> for no more than <max> bytes or no
src/ring.c (34 changed lines)
@@ -222,12 +222,32 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 	ring_area = ring->storage->area;
 	ring_size = ring->storage->size;
 
-	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
 	if (needed + 1 > ring_size)
 		goto leave;
 
-	head_ofs = ring_head(ring);
-	tail_ofs = ring_tail(ring);
+	/* try to get exclusivity on the ring's tail. For this we set the
+	 * tail's highest bit, and the one that gets it wins. Many tests were
+	 * run on this and the approach below is optimal for armv8.1 atomics,
+	 * second-to-optimal with both x86_64 and second-to-optimal on armv8.0.
+	 * x86_64 would benefit slightly more from an xchg() which would
+	 * require the readers to loop during changes, and armv8.0 is slightly
+	 * better there as well (+5%). The CAS is bad for both (requires a
+	 * preload), though it might degrade better on large x86 compared to
+	 * a busy loop that the compiler would implement for the FETCH_OR.
+	 * Alternately we could kill 12 upper bits on a 64-bit tail ofs and
+	 * use XADD. Not tested, and would require to undo or watch for the
+	 * change (use it as a ticket).
+	 */
+	while (1) {
+		tail_ofs = HA_ATOMIC_FETCH_OR(&ring->storage->tail, RING_TAIL_LOCK);
+		if (!(tail_ofs & RING_TAIL_LOCK))
+			break;
+		pl_wait_unlock_long(&ring->storage->tail, RING_TAIL_LOCK);
+	}
+
+	HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
+
+	head_ofs = ring->storage->head;
 
 	/* this is the byte before tail, it contains the users count */
 	lock_ptr = (uint8_t*)ring_area + (tail_ofs > 0 ? tail_ofs - 1 : ring_size - 1);
@@ -305,7 +325,7 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 
 	/* update the new space in the buffer */
 	ring->storage->head = head_ofs;
-	ring->storage->tail = tail_ofs;
+	HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
 
 	/* notify potential readers */
 	if (sent) {
@@ -313,8 +333,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
 		appctx_wakeup(appctx);
 	}
 
-leave:
 	HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
+leave:
 	return sent;
 }
 
@@ -417,8 +437,8 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t
 
 	HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
 
-	head_ofs = ring->storage->head;
-	tail_ofs = ring->storage->tail;
+	head_ofs = ring_head(ring);
+	tail_ofs = ring_tail(ring);
 
 	/* explanation for the initialization below: it would be better to do
 	 * this in the parsing function but this would occasionally result in
@@ -905,6 +905,12 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
 			goto err;
 		}
 
+		if (size > RING_TAIL_LOCK) {
+			ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
+			err_code |= ERR_ALERT | ERR_FATAL;
+			goto err;
+		}
+
 		if (cfg_sink->store) {
 			ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
 			err_code |= ERR_ALERT | ERR_FATAL;
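For context, this check surfaces the new ceiling at configuration time rather than at runtime. A hedged illustration of a ring section that would now be rejected on a 32-bit build (the section name and values are made up for the example):

ring buf1
	size 3221225472    # 3GB: exceeds the 2GB limit on 32-bit, triggers the alert
	backing-file /tmp/buf1.bin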