MEDIUM: dynbuf: make the buffer_wq an array of list heads

Let's turn the buffer_wq into an array of 4 list heads. These are chosen
by criticality. The DB_CRIT_TO_QUEUE() macro maps each criticality level
to one of these 4 queues. The goal here clearly is to make it possible
to wake up the most critical queues first in order to let some tasks
finish their job and release buffers that others can use.
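
For illustration only (not part of the patch), here is a minimal standalone
program checking the 2-bit packing behind DB_CRIT_TO_QUEUE(); the enum order
is an assumption inferred from the *_Q macro list in the patch (DB_GROW_RING=0
up to DB_PERMANENT=6):

#include <stdio.h>

/* assumed enum order, matching the *_Q macros introduced by the patch */
enum dynbuf_crit { DB_GROW_RING = 0, DB_UNLIKELY, DB_MUX_RX, DB_SE_RX,
                   DB_CHANNEL, DB_MUX_TX, DB_PERMANENT };

#define DB_CRIT_TO_QUEUE(crit) ((0x000001BF >> ((crit) * 2)) & 3)

int main(void)
{
	int crit;

	/* expected output: 3 3 3 2 1 0 0, i.e. queue 3 for DB_MUX_RX and
	 * below, 2 for DB_SE_RX, 1 for DB_CHANNEL, 0 for DB_MUX_TX and above
	 */
	for (crit = DB_GROW_RING; crit <= DB_PERMANENT; crit++)
		printf("crit %d -> queue %d\n", crit, DB_CRIT_TO_QUEUE(crit));
	return 0;
}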

In order to avoid having to look up all queues, a bitmap indicates which
queues are in use, which also makes it possible to avoid looping in the
most common case where all queues are empty.
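
As a rough sketch of the resulting lookup pattern (simplified, not the actual
HAProxy code): the per-thread bitmap lets the common case return without
touching any list, and otherwise the scan serves queue 0 (highest emergency)
first:

#include <stdint.h>

#define NBQ 4                    /* stands in for DYNBUF_NBQ */

/* hypothetical reduced per-thread state, only for this sketch */
struct th_bufq {
	uint8_t map;             /* one bit per non-empty queue, like bufq_map */
};

/* return the most urgent non-empty queue index, or -1 when all are empty */
static inline int first_non_empty_queue(const struct th_bufq *t)
{
	int q;

	if (!t->map)             /* most common case: no waiter at all, no loop */
		return -1;

	for (q = 0; q < NBQ; q++)
		if (t->map & (1 << q))
			return q;
	return -1;               /* not reached while map != 0 */
}
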
Willy Tarreau 2024-04-22 18:39:06 +02:00
parent a214197ce7
commit a5d6a79986
4 changed files with 76 additions and 13 deletions

include/haproxy/dynbuf-t.h

@@ -48,6 +48,9 @@
  * snd_buf() handler to encode the outgoing channel's data.
  * - buffer permanently allocated at boot (e.g. temporary compression
  *   buffers). If these fail, we can't boot.
+ *
+ * Please DO NOT CHANGE THESE LEVELS without first getting a full understanding
+ * of how all this works and touching the DB_CRIT_TO_QUEUE() macro below!
  */
 enum dynbuf_crit {
 	DB_GROW_RING = 0, // used to grow an existing buffer ring
@@ -61,6 +64,29 @@ enum dynbuf_crit {
 	DB_PERMANENT, // buffers permanently allocated.
 };
 
+/* We'll deal with 4 queues, with indexes numbered from 0 to 3 based on the
+ * criticality of the allocation. All criticality levels are mapped to a 2-bit
+ * queue index. While some levels never use the queue (the first two), some of
+ * the others will share a same queue, and all levels will define a ratio of
+ * allocated emergency buffers below which we refrain from trying to allocate.
+ * In practice, for now the thresholds will just be the queue number times 33%
+ * so that queue 0 is allowed to deplete emergency buffers and queue 3 not at
+ * all. This gives us: queue idx=3 for DB_MUX_RX and below, 2 for DB_SE_RX,
+ * 1 for DB_CHANNEL, 0 for DB_MUX_TX and above. This must match the DYNBUF_NBQ
+ * in tinfo-t.h.
+ */
+#define DB_CRIT_TO_QUEUE(crit) ((0x000001BF >> ((crit) * 2)) & 3)
+
+#define DB_GROW_RING_Q DB_CRIT_TO_QUEUE(DB_GROW_RING)
+#define DB_UNLIKELY_Q  DB_CRIT_TO_QUEUE(DB_UNLIKELY)
+#define DB_MUX_RX_Q    DB_CRIT_TO_QUEUE(DB_MUX_RX)
+#define DB_SE_RX_Q     DB_CRIT_TO_QUEUE(DB_SE_RX)
+#define DB_CHANNEL_Q   DB_CRIT_TO_QUEUE(DB_CHANNEL)
+#define DB_MUX_TX_Q    DB_CRIT_TO_QUEUE(DB_MUX_TX)
+#define DB_PERMANENT_Q DB_CRIT_TO_QUEUE(DB_PERMANENT)
+
 /* an element of the <buffer_wq> list. It represents an object that need to
  * acquire a buffer to continue its process. */
 struct buffer_wait {

include/haproxy/dynbuf.h

@@ -117,8 +117,19 @@ void __offer_buffers(void *from, unsigned int count);
 static inline void offer_buffers(void *from, unsigned int count)
 {
-	if (!LIST_ISEMPTY(&th_ctx->buffer_wq))
-		__offer_buffers(from, count);
+	int q;
+
+	if (likely(!th_ctx->bufq_map))
+		return;
+
+	for (q = 0; q < DYNBUF_NBQ; q++) {
+		if (!(th_ctx->bufq_map & (1 << q)))
+			continue;
+		BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
+		__offer_buffers(from, count);
+		break;
+	}
 }
 
 /* Queues a buffer request for the current thread via <bw>, and returns
@@ -130,6 +141,8 @@ static inline void offer_buffers(void *from, unsigned int count)
  */
 static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
 {
+	int q = DB_CRIT_TO_QUEUE(crit);
+
 	if (LIST_INLIST(&bw->list))
 		return 1;
@@ -137,7 +150,8 @@ static inline int b_requeue(enum dynbuf_crit crit, struct buffer_wait *bw)
 	if (crit < DB_MUX_RX)
 		return 0;
 
-	LIST_APPEND(&th_ctx->buffer_wq, &bw->list);
+	th_ctx->bufq_map |= 1 << q;
+	LIST_APPEND(&th_ctx->buffer_wq[q], &bw->list);
 	return 1;
 }

include/haproxy/tinfo-t.h

@@ -65,6 +65,8 @@ enum {
 #define TH_FL_STARTED 0x00000010 /* set once the thread starts */
 #define TH_FL_IN_LOOP 0x00000020 /* set only inside the polling loop */
 
+/* we have 4 buffer-wait queues, in highest to lowest emergency order */
+#define DYNBUF_NBQ 4
 
 /* Thread group information. This defines a base and a count of global thread
  * IDs which belong to it, and which can be looked up into thread_info/ctx. It
@@ -133,14 +135,15 @@ struct thread_ctx {
 	int current_queue; /* points to current tasklet list being run, -1 if none */
 	unsigned int nb_tasks; /* number of tasks allocated on this thread */
 	uint8_t tl_class_mask; /* bit mask of non-empty tasklets classes */
+	uint8_t bufq_map; /* one bit per non-empty buffer_wq */
 
-	// 7 bytes hole here
+	// 6 bytes hole here
 	struct list pool_lru_head; /* oldest objects in thread-local pool caches */
-	struct list buffer_wq; /* buffer waiters */
 	struct list streams; /* list of streams attached to this thread */
 	struct list quic_conns; /* list of active quic-conns attached to this thread */
 	struct list quic_conns_clo; /* list of closing quic-conns attached to this thread */
 	struct list queued_checks; /* checks waiting for a connection slot */
+	struct list buffer_wq[DYNBUF_NBQ]; /* buffer waiters, 4 criticality-based queues */
 
 	unsigned int nb_rhttp_conns; /* count of current conns used for active reverse HTTP */
 	ALWAYS_ALIGN(2*sizeof(void*));

src/dynbuf.c

@@ -30,13 +30,24 @@ int init_buffer()
 	void *buffer;
 	int thr;
 	int done;
+	int i;
 
 	pool_head_buffer = create_pool("buffer", global.tune.bufsize, MEM_F_SHARED|MEM_F_EXACT);
 	if (!pool_head_buffer)
 		return 0;
 
-	for (thr = 0; thr < MAX_THREADS; thr++)
-		LIST_INIT(&ha_thread_ctx[thr].buffer_wq);
+	/* make sure any change to the queues assignment isn't overlooked */
+	BUG_ON(DB_PERMANENT - DB_UNLIKELY - 1 != DYNBUF_NBQ);
+	BUG_ON(DB_MUX_RX_Q < DB_SE_RX_Q || DB_MUX_RX_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_SE_RX_Q < DB_CHANNEL_Q || DB_SE_RX_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_CHANNEL_Q < DB_MUX_TX_Q || DB_CHANNEL_Q >= DYNBUF_NBQ);
+	BUG_ON(DB_MUX_TX_Q >= DYNBUF_NBQ);
+
+	for (thr = 0; thr < MAX_THREADS; thr++) {
+		for (i = 0; i < DYNBUF_NBQ; i++)
+			LIST_INIT(&ha_thread_ctx[thr].buffer_wq[i]);
+		ha_thread_ctx[thr].bufq_map = 0;
+	}
 
 	/* The reserved buffer is what we leave behind us. Thus we always need
@@ -104,6 +115,7 @@ void buffer_dump(FILE *o, struct buffer *b, int from, int to)
 void __offer_buffers(void *from, unsigned int count)
 {
 	struct buffer_wait *wait, *wait_back;
+	int q;
 
 	/* For now, we consider that all objects need 1 buffer, so we can stop
 	 * waking up them once we have enough of them to eat all the available
@@ -111,15 +123,23 @@ void __offer_buffers(void *from, unsigned int count)
 	 * other tasks, but that's a rough estimate. Similarly, for each cached
 	 * event we'll need 1 buffer.
 	 */
-	list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq, list) {
-		if (!count)
-			break;
-
-		if (wait->target == from || !wait->wakeup_cb(wait->target))
+	for (q = 0; q < DYNBUF_NBQ; q++) {
+		if (!(th_ctx->bufq_map & (1 << q)))
 			continue;
+		BUG_ON_HOT(LIST_ISEMPTY(&th_ctx->buffer_wq[q]));
 
-		LIST_DEL_INIT(&wait->list);
-		count--;
+		list_for_each_entry_safe(wait, wait_back, &th_ctx->buffer_wq[q], list) {
+			if (!count)
+				break;
+
+			if (wait->target == from || !wait->wakeup_cb(wait->target))
+				continue;
+
+			LIST_DEL_INIT(&wait->list);
+			count--;
+		}
+		if (LIST_ISEMPTY(&th_ctx->buffer_wq[q]))
+			th_ctx->bufq_map &= ~(1 << q);
 	}
 }