MEDIUM: ring: unlock the ring's tail earlier
We know we can continue to protect the message area so we can unlock the tail as soon as we know its new value. Now we're seeing ~6.4M msg/s vs 5.4M previously on 3C6T of a 3rd gen EPYC, and 1.88M vs 1.54M for 24C48T threads, which is a significant gain! This requires to carefully write the new head counter before releasing the writers, and to change the calculation of the work area from tail..head to tail...new_tail while writing the message.
This commit is contained in:
parent
3cdd3d27a8
commit
41d3ea521b
37
src/ring.c
37
src/ring.c
|
@ -172,7 +172,7 @@ void ring_free(struct ring *ring)
|
|||
*/
|
||||
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
|
||||
{
|
||||
size_t head_ofs, tail_ofs;
|
||||
size_t head_ofs, tail_ofs, new_tail_ofs;
|
||||
size_t ring_size;
|
||||
char *ring_area;
|
||||
struct ist v1, v2;
|
||||
|
@ -273,8 +273,25 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
|
|||
vp_skip(&v1, &v2, 1 + dellenlen + dellen);
|
||||
}
|
||||
|
||||
/* now let's update the buffer with the new head and size */
|
||||
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
|
||||
/* now let's update the buffer with the new tail if our message will fit */
|
||||
new_tail_ofs = tail_ofs;
|
||||
if (vp_size(v1, v2) <= ring_size - needed - 1) {
|
||||
vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
|
||||
|
||||
/* update the new space in the buffer */
|
||||
HA_ATOMIC_STORE(&ring->storage->head, head_ofs);
|
||||
|
||||
/* calculate next tail pointer */
|
||||
new_tail_ofs += needed;
|
||||
if (new_tail_ofs >= ring_size)
|
||||
new_tail_ofs -= ring_size;
|
||||
|
||||
/* reset next read counter before releasing writers */
|
||||
HA_ATOMIC_STORE(ring_area + (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), 0);
|
||||
}
|
||||
|
||||
/* and release other writers */
|
||||
HA_ATOMIC_STORE(&ring->storage->tail, new_tail_ofs);
|
||||
|
||||
if (vp_size(v1, v2) > ring_size - needed - 1 - 1) {
|
||||
/* we had to stop due to readers blocking the head,
|
||||
|
@ -283,8 +300,8 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
|
|||
goto done_update_buf;
|
||||
}
|
||||
|
||||
/* now focus on free room */
|
||||
vp_ring_to_room(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
|
||||
/* now focus on free room between the old and the new tail */
|
||||
vp_ring_to_room(&v1, &v2, ring_area, ring_size, (new_tail_ofs > 0 ? new_tail_ofs - 1 : ring_size - 1), tail_ofs);
|
||||
|
||||
/* let's write the message size */
|
||||
vp_put_varint(&v1, &v2, msglen);
|
||||
|
@ -311,20 +328,16 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
|
|||
msglen += len;
|
||||
}
|
||||
|
||||
vp_putchr(&v1, &v2, 0); // new read counter
|
||||
/* we must not write the read counter, it was already done,
|
||||
* plus we could ruin the one of the next writer.
|
||||
*/
|
||||
sent = lenlen + msglen + 1;
|
||||
BUG_ON_HOT(sent != needed);
|
||||
|
||||
vp_room_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
|
||||
|
||||
done_update_buf:
|
||||
/* unlock the message area */
|
||||
HA_ATOMIC_STORE(lock_ptr, readers);
|
||||
|
||||
/* update the new space in the buffer */
|
||||
ring->storage->head = head_ofs;
|
||||
HA_ATOMIC_STORE(&ring->storage->tail, tail_ofs);
|
||||
|
||||
/* notify potential readers */
|
||||
if (sent) {
|
||||
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
|
||||
|
|
Loading…
Reference in New Issue