MEDIUM: threads/queue: Make queues thread-safe

The list of pending connections are now protected using the proxy or server
lock, depending on the context.
This commit is contained in:
Christopher Faulet 2017-06-27 15:43:53 +02:00 committed by Willy Tarreau
parent 821bb9beaa
commit 8ba59148ae

View File

@ -13,6 +13,7 @@
#include <common/config.h>
#include <common/memory.h>
#include <common/time.h>
#include <common/hathreads.h>
#include <proto/queue.h>
#include <proto/server.h>
@ -23,6 +24,8 @@
struct pool_head *pool2_pendconn;
static void __pendconn_free(struct pendconn *p);
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_pendconn()
{
@ -116,7 +119,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p
ps = pp;
}
strm = ps->strm;
pendconn_free(ps);
__pendconn_free(ps);
/* we want to note that the stream has now been assigned a server */
strm->flags |= SF_ASSIGNED;
@ -139,10 +142,12 @@ void process_srv_queue(struct server *s)
struct proxy *p = s->proxy;
int maxconn;
SPIN_LOCK(PROXY_LOCK, &p->lock);
SPIN_LOCK(SERVER_LOCK, &s->lock);
/* First, check if we can handle some connections queued at the proxy. We
* will take as many as we can handle.
*/
maxconn = srv_dynamic_maxconn(s);
while (s->served < maxconn) {
struct stream *strm = pendconn_get_next_strm(s, p);
@ -151,6 +156,8 @@ void process_srv_queue(struct server *s)
break;
task_wakeup(strm->task, TASK_WOKEN_RES);
}
SPIN_UNLOCK(SERVER_LOCK, &s->lock);
SPIN_UNLOCK(PROXY_LOCK, &p->lock);
}
/* Adds the stream <strm> to the pending connection list of server <strm>->srv
@ -163,6 +170,7 @@ struct pendconn *pendconn_add(struct stream *strm)
{
struct pendconn *p;
struct server *srv;
int count;
p = pool_alloc2(pool2_pendconn);
if (!p)
@ -170,21 +178,26 @@ struct pendconn *pendconn_add(struct stream *strm)
strm->pend_pos = p;
p->strm = strm;
p->srv = srv = objt_server(strm->target);
srv = objt_server(strm->target);
if (strm->flags & SF_ASSIGNED && srv) {
if ((strm->flags & SF_ASSIGNED) && srv) {
p->srv = srv;
SPIN_LOCK(SERVER_LOCK, &srv->lock);
LIST_ADDQ(&srv->pendconns, &p->list);
srv->nbpend++;
strm->logs.srv_queue_size += srv->nbpend;
if (srv->nbpend > srv->counters.nbpend_max)
srv->counters.nbpend_max = srv->nbpend;
SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
count = HA_ATOMIC_ADD(&srv->nbpend, 1);
strm->logs.srv_queue_size += count;
HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
} else {
p->srv = NULL;
SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
LIST_ADDQ(&strm->be->pendconns, &p->list);
strm->be->nbpend++;
strm->logs.prx_queue_size += strm->be->nbpend;
HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend);
SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
strm->logs.prx_queue_size += count;
HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
}
strm->be->totpend++;
HA_ATOMIC_ADD(&strm->be->totpend, 1);
return p;
}
@ -196,6 +209,7 @@ int pendconn_redistribute(struct server *s)
struct pendconn *pc, *pc_bck;
int xferred = 0;
SPIN_LOCK(SERVER_LOCK, &s->lock);
list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
struct stream *strm = pc->strm;
@ -208,11 +222,12 @@ int pendconn_redistribute(struct server *s)
/* it's left to the dispatcher to choose a server */
strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
pendconn_free(pc);
__pendconn_free(pc);
task_wakeup(strm->task, TASK_WOKEN_RES);
xferred++;
}
}
SPIN_UNLOCK(SERVER_LOCK, &s->lock);
return xferred;
}
@ -228,6 +243,7 @@ int pendconn_grab_from_px(struct server *s)
if (!srv_currently_usable(s))
return 0;
SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
struct stream *strm;
struct pendconn *p;
@ -237,9 +253,10 @@ int pendconn_grab_from_px(struct server *s)
break;
p->strm->target = &s->obj_type;
strm = p->strm;
pendconn_free(p);
__pendconn_free(p);
task_wakeup(strm->task, TASK_WOKEN_RES);
}
SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
return xferred;
}
@ -250,16 +267,38 @@ int pendconn_grab_from_px(struct server *s)
*/
void pendconn_free(struct pendconn *p)
{
if (p->srv) {
SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
LIST_DEL(&p->list);
SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
HA_ATOMIC_SUB(&p->srv->nbpend, 1);
}
else {
SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
LIST_DEL(&p->list);
SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
}
p->strm->pend_pos = NULL;
if (p->srv)
p->srv->nbpend--;
else
p->strm->be->nbpend--;
p->strm->be->totpend--;
HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
pool_free2(pool2_pendconn, p);
}
/* Lock-free version of pendconn_free. */
static void __pendconn_free(struct pendconn *p)
{
if (p->srv) {
LIST_DEL(&p->list);
HA_ATOMIC_SUB(&p->srv->nbpend, 1);
}
else {
LIST_DEL(&p->list);
HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
}
p->strm->pend_pos = NULL;
HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
pool_free2(pool2_pendconn, p);
}
/*
* Local variables: