MAJOR: threads/buffer: Make buffer wait queue thread safe

Adds a global lock to protect the buffer wait queue.
This commit is contained in:
Emeric Brun 2017-06-21 15:42:52 +02:00 committed by Willy Tarreau
parent 80527f5bb6
commit a1dd243adb
7 changed files with 35 additions and 3 deletions

View File

@ -52,6 +52,9 @@ extern struct pool_head *pool2_buffer;
extern struct buffer buf_empty;
extern struct buffer buf_wanted;
extern struct list buffer_wq;
#ifdef USE_THREAD
extern HA_SPINLOCK_T buffer_wq_lock;
#endif
int init_buffer();
void deinit_buffer();
@ -748,9 +751,13 @@ void __offer_buffer(void *from, unsigned int threshold);
static inline void offer_buffers(void *from, unsigned int threshold)
{
if (LIST_ISEMPTY(&buffer_wq))
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
if (LIST_ISEMPTY(&buffer_wq)) {
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return;
}
__offer_buffer(from, threshold);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
/*************************************************************************/

View File

@ -156,6 +156,7 @@ enum lock_label {
STK_SESS_LOCK,
APPLETS_LOCK,
PEER_LOCK,
BUF_WQ_LOCK,
LOCK_LABELS
};
struct lock_stat {
@ -242,7 +243,7 @@ static inline void show_lock_stats()
"TASK_RQ", "TASK_WQ", "POOL",
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
"APPLETS", "PEER" };
"APPLETS", "PEER", "BUF_WQ" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {

View File

@ -88,8 +88,10 @@ static inline void __appctx_free(struct appctx *appctx)
}
if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&appctx->buffer_wait.list);
LIST_INIT(&appctx->buffer_wait.list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
pool_free2(pool2_connection, appctx);

View File

@ -440,8 +440,12 @@ static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *
if (b_alloc_margin(&chn->buf, margin) != NULL)
return 1;
if (LIST_ISEMPTY(&wait->list))
if (LIST_ISEMPTY(&wait->list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &wait->list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
return 0;
}

View File

@ -33,6 +33,9 @@ struct buffer buf_wanted = { .p = buf_wanted.data };
/* list of objects waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
#ifdef USE_THREAD
HA_SPINLOCK_T buffer_wq_lock;
#endif
/* this buffer is always the same size as standard buffers and is used for
* swapping data inside a buffer.
@ -72,6 +75,8 @@ int init_buffer()
if (global.tune.buf_limit)
pool2_buffer->limit = global.tune.buf_limit;
SPIN_INIT(&buffer_wq_lock);
buffer = pool_refill_alloc(pool2_buffer, pool2_buffer->minavail - 1);
if (!buffer)
return 0;

View File

@ -18,6 +18,7 @@
#include <common/debug.h>
#include <common/memory.h>
#include <common/time.h>
#include <common/hathreads.h>
#include <types/arg.h>
#include <types/global.h>
@ -2685,14 +2686,18 @@ spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
return 1;
if (!LIST_ISEMPTY(&buffer_wait->list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (b_alloc_margin(buf, global.tune.reserved_bufs))
return 1;
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &buffer_wait->list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return 0;
}
@ -2700,8 +2705,10 @@ static void
spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (!LIST_ISEMPTY(&buffer_wait->list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
/* Release the buffer if needed */

View File

@ -320,8 +320,10 @@ static void stream_free(struct stream *s)
/* We may still be present in the buffer wait queue */
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
@ -415,14 +417,18 @@ static void stream_free(struct stream *s)
static int stream_alloc_work_buffer(struct stream *s)
{
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (b_alloc_margin(&s->res.buf, 0))
return 1;
SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return 0;
}