diff --git a/src/queue.c b/src/queue.c index 95b8edafb..93d3e9438 100644 --- a/src/queue.c +++ b/src/queue.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,8 @@ struct pool_head *pool2_pendconn; +static void __pendconn_free(struct pendconn *p); + /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_pendconn() { @@ -116,7 +119,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p ps = pp; } strm = ps->strm; - pendconn_free(ps); + __pendconn_free(ps); /* we want to note that the stream has now been assigned a server */ strm->flags |= SF_ASSIGNED; @@ -139,10 +142,12 @@ void process_srv_queue(struct server *s) struct proxy *p = s->proxy; int maxconn; + SPIN_LOCK(PROXY_LOCK, &p->lock); + SPIN_LOCK(SERVER_LOCK, &s->lock); + /* First, check if we can handle some connections queued at the proxy. We * will take as many as we can handle. */ - maxconn = srv_dynamic_maxconn(s); while (s->served < maxconn) { struct stream *strm = pendconn_get_next_strm(s, p); @@ -151,6 +156,8 @@ void process_srv_queue(struct server *s) break; task_wakeup(strm->task, TASK_WOKEN_RES); } + SPIN_UNLOCK(SERVER_LOCK, &s->lock); + SPIN_UNLOCK(PROXY_LOCK, &p->lock); } /* Adds the stream to the pending connection list of server ->srv @@ -163,6 +170,7 @@ struct pendconn *pendconn_add(struct stream *strm) { struct pendconn *p; struct server *srv; + int count; p = pool_alloc2(pool2_pendconn); if (!p) @@ -170,21 +178,26 @@ struct pendconn *pendconn_add(struct stream *strm) strm->pend_pos = p; p->strm = strm; - p->srv = srv = objt_server(strm->target); + srv = objt_server(strm->target); - if (strm->flags & SF_ASSIGNED && srv) { + if ((strm->flags & SF_ASSIGNED) && srv) { + p->srv = srv; + SPIN_LOCK(SERVER_LOCK, &srv->lock); LIST_ADDQ(&srv->pendconns, &p->list); - srv->nbpend++; - strm->logs.srv_queue_size += srv->nbpend; - if (srv->nbpend > srv->counters.nbpend_max) - srv->counters.nbpend_max = srv->nbpend; + SPIN_UNLOCK(SERVER_LOCK, &srv->lock); + count = HA_ATOMIC_ADD(&srv->nbpend, 1); + strm->logs.srv_queue_size += count; + HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count); } else { + p->srv = NULL; + SPIN_LOCK(PROXY_LOCK, &strm->be->lock); LIST_ADDQ(&strm->be->pendconns, &p->list); - strm->be->nbpend++; - strm->logs.prx_queue_size += strm->be->nbpend; - HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend); + SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock); + count = HA_ATOMIC_ADD(&strm->be->nbpend, 1); + strm->logs.prx_queue_size += count; + HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count); } - strm->be->totpend++; + HA_ATOMIC_ADD(&strm->be->totpend, 1); return p; } @@ -196,6 +209,7 @@ int pendconn_redistribute(struct server *s) struct pendconn *pc, *pc_bck; int xferred = 0; + SPIN_LOCK(SERVER_LOCK, &s->lock); list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) { struct stream *strm = pc->strm; @@ -208,11 +222,12 @@ int pendconn_redistribute(struct server *s) /* it's left to the dispatcher to choose a server */ strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); - pendconn_free(pc); + __pendconn_free(pc); task_wakeup(strm->task, TASK_WOKEN_RES); xferred++; } } + SPIN_UNLOCK(SERVER_LOCK, &s->lock); return xferred; } @@ -228,6 +243,7 @@ int pendconn_grab_from_px(struct server *s) if (!srv_currently_usable(s)) return 0; + SPIN_LOCK(PROXY_LOCK, &s->proxy->lock); for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) { struct stream *strm; struct pendconn *p; @@ -237,9 +253,10 @@ int pendconn_grab_from_px(struct server *s) break; p->strm->target = &s->obj_type; strm = p->strm; - pendconn_free(p); + __pendconn_free(p); task_wakeup(strm->task, TASK_WOKEN_RES); } + SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock); return xferred; } @@ -250,16 +267,38 @@ int pendconn_grab_from_px(struct server *s) */ void pendconn_free(struct pendconn *p) { - LIST_DEL(&p->list); + if (p->srv) { + SPIN_LOCK(SERVER_LOCK, &p->srv->lock); + LIST_DEL(&p->list); + SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); + HA_ATOMIC_SUB(&p->srv->nbpend, 1); + } + else { + SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock); + LIST_DEL(&p->list); + SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock); + HA_ATOMIC_SUB(&p->strm->be->nbpend, 1); + } p->strm->pend_pos = NULL; - if (p->srv) - p->srv->nbpend--; - else - p->strm->be->nbpend--; - p->strm->be->totpend--; + HA_ATOMIC_SUB(&p->strm->be->totpend, 1); pool_free2(pool2_pendconn, p); } +/* Lock-free version of pendconn_free. */ +static void __pendconn_free(struct pendconn *p) +{ + if (p->srv) { + LIST_DEL(&p->list); + HA_ATOMIC_SUB(&p->srv->nbpend, 1); + } + else { + LIST_DEL(&p->list); + HA_ATOMIC_SUB(&p->strm->be->nbpend, 1); + } + p->strm->pend_pos = NULL; + HA_ATOMIC_SUB(&p->strm->be->totpend, 1); + pool_free2(pool2_pendconn, p); +} /* * Local variables: