From 41d3ea521bfdac6734fdbb6c237ef15946691a7c Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 28 Feb 2024 09:57:00 +0100 Subject: [PATCH] MEDIUM: ring: unlock the ring's tail earlier We know we can continue to protect the message area so we can unlock the tail as soon as we know its new value. Now we're seeing ~6.4M msg/s vs 5.4M previously on 3C6T of a 3rd gen EPYC, and 1.88M vs 1.54M for 24C48T threads, which is a significant gain! This requires to carefully write the new head counter before releasing the writers, and to change the calculation of the work area from tail..head to tail...new_tail while writing the message. --- src/ring.c | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/ring.c b/src/ring.c index 97d10ee27a..94ce0fe5ee 100644 --- a/src/ring.c +++ b/src/ring.c @@ -172,7 +172,7 @@ void ring_free(struct ring *ring) */ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg) { - size_t head_ofs, tail_ofs; + size_t head_ofs, tail_ofs, new_tail_ofs; size_t ring_size; char *ring_area; struct ist v1, v2; @@ -273,8 +273,25 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz vp_skip(&v1, &v2, 1 + dellenlen + dellen); } - /* now let's update the buffer with the new head and size */ - vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + /* now let's update the buffer with the new tail if our message will fit */ + new_tail_ofs = tail_ofs; + if (vp_size(v1, v2) <= ring_size - needed - 1) { + vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); + + /* update the new space in the buffer */ + HA_ATOMIC_STORE(&ring->storage->head, head_ofs); + + /* calculate next tail pointer */ + new_tail_ofs += needed; + if (new_tail_ofs >= ring_size) + new_tail_ofs -= ring_size; + + /* reset next read counter before releasing writers */ + HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0); + } + + /* and release other writers */ + HA_ATOMIC_STORE(&ring->storage->tail, new_tail_ofs); if (vp_size(v1, v2) > ring_size - needed - 1 - 1) { /* we had to stop due to readers blocking the head, @@ -283,8 +300,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz goto done_update_buf; } - /* now focus on free room */ - vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs); + /* now focus on free room between the old and the new tail */ + vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs); /* let's write the message size */ vp_put_varint(&v1, &v2, msglen); @@ -311,20 +328,16 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz msglen += len; } - vp_putchr(&v1, &v2, 0); // new read counter + /* we must not write the read counter, it was already done, + * plus we could ruin the one of the next writer. + */ sent = lenlen + msglen + 1; BUG_ON_HOT(sent != needed); - vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs); - done_update_buf: /* unlock the message area */ HA_ATOMIC_STORE(lock_ptr, readers); - /* update the new space in the buffer */ - ring->storage->head = head_ofs; - HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs); - /* notify potential readers */ if (sent) { HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);