OPTIM: ring: have only one thread at a time wake up all readers

It's inefficient and counter-productive that each ring writer iterates
over all readers to wake them up. Let's just have one in charge of this,
it strongly limits contention. The only thing is that since the thread
is iterating over a list, we want to be sure that if the first readers
have already completed their job, they will be woken up again. For this
we keep a counter of messages delivered after the wakeup started, and
the waking thread will check it before going back to sleep. In order to
avoid looping forever, it will also drop its waking flag soon enough to
possibly let another one take it.

There used to be a few cases of watchdogs before this on a 24-core AMD
EPYC platform on the list iteration those never appeared anymore.
The perf has dropped a bit on 3C6T on the EPYC, from 6.61 to 6.0M but
remains unchanged at 24C48T.
This commit is contained in:
Willy Tarreau 2024-03-02 11:09:37 +01:00
parent 1f8b14b7be
commit c7bd7a68e4
2 changed files with 14 additions and 5 deletions

View File

@ -123,7 +123,9 @@ struct ring {
struct ring_storage *storage; // the mapped part
struct mt_list waiters; // list of waiters, for now, CLI "show event"
int readers_count;
uint flags; // RING_FL_*
uint flags; // RING_FL_*
uint pending; // new writes that have not yet been subject to a wakeup
uint waking; // indicates a thread is currently waking up readers
};
#endif /* _HAPROXY_RING_T_H */

View File

@ -49,6 +49,8 @@ void ring_init(struct ring *ring, void *area, size_t size, int reset)
ring->readers_count = 0;
ring->flags = 0;
ring->storage = area;
ring->pending = 0;
ring->waking = 0;
if (reset) {
ring->storage->size = size - sizeof(*ring->storage);
@ -338,11 +340,16 @@ ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], siz
/* notify potential readers */
if (sent && HA_ATOMIC_LOAD(&ring->readers_count)) {
struct mt_list *elt1, elt2;
struct appctx *appctx;
HA_ATOMIC_INC(&ring->pending);
while (HA_ATOMIC_LOAD(&ring->pending) && HA_ATOMIC_XCHG(&ring->waking, 1) == 0) {
struct mt_list *elt1, elt2;
struct appctx *appctx;
mt_list_for_each_entry_safe(appctx, &ring->waiters, wait_entry, elt1, elt2)
appctx_wakeup(appctx);
HA_ATOMIC_STORE(&ring->pending, 0);
mt_list_for_each_entry_safe(appctx, &ring->waiters, wait_entry, elt1, elt2)
appctx_wakeup(appctx);
HA_ATOMIC_STORE(&ring->waking, 0);
}
}
leave: