From a1dd243adb0b9ebf78263a5f2ec5729e903abe32 Mon Sep 17 00:00:00 2001 From: Emeric Brun Date: Wed, 21 Jun 2017 15:42:52 +0200 Subject: [PATCH] MAJOR: threads/buffer: Make buffer wait queue thread safe Adds a global lock to protect the buffer wait queue. --- include/common/buffer.h | 9 ++++++++- include/common/hathreads.h | 3 ++- include/proto/applet.h | 2 ++ include/proto/channel.h | 6 +++++- src/buffer.c | 5 +++++ src/flt_spoe.c | 7 +++++++ src/stream.c | 6 ++++++ 7 files changed, 35 insertions(+), 3 deletions(-) diff --git a/include/common/buffer.h b/include/common/buffer.h index 17931cf209..f11d6a9621 100644 --- a/include/common/buffer.h +++ b/include/common/buffer.h @@ -52,6 +52,9 @@ extern struct pool_head *pool2_buffer; extern struct buffer buf_empty; extern struct buffer buf_wanted; extern struct list buffer_wq; +#ifdef USE_THREAD +extern HA_SPINLOCK_T buffer_wq_lock; +#endif int init_buffer(); void deinit_buffer(); @@ -748,9 +751,13 @@ void __offer_buffer(void *from, unsigned int threshold); static inline void offer_buffers(void *from, unsigned int threshold) { - if (LIST_ISEMPTY(&buffer_wq)) + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); + if (LIST_ISEMPTY(&buffer_wq)) { + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); return; + } __offer_buffer(from, threshold); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } /*************************************************************************/ diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 3a77bd1757..1717cc9b78 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -156,6 +156,7 @@ enum lock_label { STK_SESS_LOCK, APPLETS_LOCK, PEER_LOCK, + BUF_WQ_LOCK, LOCK_LABELS }; struct lock_stat { @@ -242,7 +243,7 @@ static inline void show_lock_stats() "TASK_RQ", "TASK_WQ", "POOL", "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER", "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS", - "APPLETS", "PEER" }; + "APPLETS", "PEER", "BUF_WQ" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/proto/applet.h b/include/proto/applet.h index d9f0ce2dff..766fc92311 100644 --- a/include/proto/applet.h +++ b/include/proto/applet.h @@ -88,8 +88,10 @@ static inline void __appctx_free(struct appctx *appctx) } if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&appctx->buffer_wait.list); LIST_INIT(&appctx->buffer_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } pool_free2(pool2_connection, appctx); diff --git a/include/proto/channel.h b/include/proto/channel.h index 9e12b5efc4..83ad0aab06 100644 --- a/include/proto/channel.h +++ b/include/proto/channel.h @@ -440,8 +440,12 @@ static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait * if (b_alloc_margin(&chn->buf, margin) != NULL) return 1; - if (LIST_ISEMPTY(&wait->list)) + if (LIST_ISEMPTY(&wait->list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_ADDQ(&buffer_wq, &wait->list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); + } + return 0; } diff --git a/src/buffer.c b/src/buffer.c index 83e4e9e36f..e892d1e4d4 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -33,6 +33,9 @@ struct buffer buf_wanted = { .p = buf_wanted.data }; /* list of objects waiting for at least one buffer */ struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); +#ifdef USE_THREAD +HA_SPINLOCK_T buffer_wq_lock; +#endif /* this buffer is always the same size as standard buffers and is used for * swapping data inside a buffer. @@ -72,6 +75,8 @@ int init_buffer() if (global.tune.buf_limit) pool2_buffer->limit = global.tune.buf_limit; + SPIN_INIT(&buffer_wq_lock); + buffer = pool_refill_alloc(pool2_buffer, pool2_buffer->minavail - 1); if (!buffer) return 0; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index aa3f37a103..7fc4ed87f3 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -2685,14 +2686,18 @@ spoe_acquire_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) return 1; if (!LIST_ISEMPTY(&buffer_wait->list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&buffer_wait->list); LIST_INIT(&buffer_wait->list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } if (b_alloc_margin(buf, global.tune.reserved_bufs)) return 1; + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_ADDQ(&buffer_wq, &buffer_wait->list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); return 0; } @@ -2700,8 +2705,10 @@ static void spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait) { if (!LIST_ISEMPTY(&buffer_wait->list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&buffer_wait->list); LIST_INIT(&buffer_wait->list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } /* Release the buffer if needed */ diff --git a/src/stream.c b/src/stream.c index 8975638ac5..51d235454f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -320,8 +320,10 @@ static void stream_free(struct stream *s) /* We may still be present in the buffer wait queue */ if (!LIST_ISEMPTY(&s->buffer_wait.list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&s->buffer_wait.list); LIST_INIT(&s->buffer_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } if (s->req.buf->size || s->res.buf->size) { b_drop(&s->req.buf); @@ -415,14 +417,18 @@ static void stream_free(struct stream *s) static int stream_alloc_work_buffer(struct stream *s) { if (!LIST_ISEMPTY(&s->buffer_wait.list)) { + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_DEL(&s->buffer_wait.list); LIST_INIT(&s->buffer_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); } if (b_alloc_margin(&s->res.buf, 0)) return 1; + SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); LIST_ADDQ(&buffer_wq, &s->buffer_wait.list); + SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); return 0; }