diff --git a/haproxy.c b/haproxy.c index 766841c6f2..eaa6bc183b 100644 --- a/haproxy.c +++ b/haproxy.c @@ -531,6 +531,7 @@ struct server { char *id; /* just for identification */ struct list pendconns; /* pending connections */ int nbpend; /* number of pending connections */ + struct task *queue_mgt; /* the task associated to the queue processing */ struct sockaddr_in addr; /* the address to connect to */ struct sockaddr_in source_addr; /* the address to which we want to bind for connect() */ short check_port; /* the port to use for the health checks */ @@ -1879,20 +1880,6 @@ static struct session *pendconn_get_next_sess(struct server *srv, struct proxy * return sess; } -/* Checks if other sessions are waiting for a known server, and wakes the - * first one up. Note that neither nor can be NULL. Returns 1 - * if a session has been assigned, 0 if nothing has been done. - */ -static int offer_connection_slot(struct server *srv, struct proxy *px) { - struct session *sess; - - sess = pendconn_get_next_sess(srv, px); - if (sess == NULL) - return 0; - task_wakeup(&rq, sess->task); - return 1; -} - /* Adds the session to the pending connection list of server ->srv * or to the one of ->proxy if srv is NULL. All counters and back pointers * are updated accordingly. Returns NULL if no memory is available, otherwise the @@ -1919,6 +1906,16 @@ static struct pendconn *pendconn_add(struct session *sess) { return p; } +/* returns 0 if nothing has to be done for server regarding queued connections, + * and non-zero otherwise. Suited for and if/else usage. + */ +static inline int may_dequeue_tasks(struct server *s, struct proxy *p) { + return (s && (s->nbpend || p->nbpend) && + s->maxconn && s->cur_sess < s->maxconn && s->queue_mgt); +} + + + /*********************************************************************/ /* more specific functions ***************************************/ /*********************************************************************/ @@ -4560,10 +4557,10 @@ int srv_count_retry_down(struct session *t, int conn_err) { 503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503); /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } return 0; @@ -4596,20 +4593,26 @@ int srv_retryable_connect(struct session *t) { srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C, 500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500); /* release other sessions waiting for this server */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } /* ensure that we have enough retries left */ - if (srv_count_retry_down(t, conn_err)) - /* FIXME-20060509: should not we try to offer this slot to anybody ? */ + if (srv_count_retry_down(t, conn_err)) { + /* let's try to offer this slot to anybody */ + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; + } } while (t->srv == NULL || t->conn_retries > 0 || !(t->proxy->options & PR_O_REDISP)); /* We're on our last chance, and the REDISP option was specified. * We will ignore cookie and force to balance or use the dispatcher. */ - /* FIXME-20060509: should not we try to offer this slot to anybody ? */ + /* let's try to offer this slot to anybody */ + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); + t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET); t->srv = NULL; /* it's left to the dispatcher to choose a server */ if ((t->flags & SN_CK_MASK) == SN_CK_VALID) { @@ -4639,17 +4642,10 @@ int srv_redispatch_connect(struct session *t) { break; case SRV_STATUS_NOSRV: + /* note: it is guaranteed that t->srv == NULL here */ tv_eternity(&t->cnexpire); srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_C, 503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503); - - /* FIXME-20060501: we should not need this once we flush every session - * when the last server goes down. - * FIXME-20060509: this will never execute because it is guaranteed that t->srv == NULL here. - */ - /* release other sessions waiting for this server */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); return 1; case SRV_STATUS_QUEUED: @@ -4669,8 +4665,8 @@ int srv_redispatch_connect(struct session *t) { srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C, 500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500); /* release other sessions waiting for this server */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } /* if we get here, it's because we got SRV_STATUS_OK, which also @@ -4709,12 +4705,6 @@ int process_srv(struct session *t) { tv_eternity(&t->cnexpire); srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C, 0, 0, NULL); - /* it might be possible that we have been granted an access to the - * server while waiting for a free slot. Since we'll never use it, - * we have to pass it on to another session. - */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); return 1; } else { @@ -4879,10 +4869,10 @@ int process_srv(struct session *t) { send_log(t->proxy, LOG_ALERT, "Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id); /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -4903,10 +4893,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_H; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5343,10 +5333,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_H; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5378,10 +5368,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_H; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5478,10 +5468,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_D; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5592,10 +5582,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_D; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5608,10 +5598,10 @@ int process_srv(struct session *t) { //close(t->srv_fd); t->srv_state = SV_STCLOSE; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5628,10 +5618,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_D; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5670,10 +5660,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_D; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5686,10 +5676,10 @@ int process_srv(struct session *t) { //close(t->srv_fd); t->srv_state = SV_STCLOSE; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -5706,10 +5696,10 @@ int process_srv(struct session *t) { if (!(t->flags & SN_FINST_MASK)) t->flags |= SN_FINST_D; /* We used to have a free connection slot. Since we'll never use it, - * we have to pass it on to another session. + * we have to inform the server that it may be used by another session. */ - if (t->srv) - offer_connection_slot(t->srv, t->proxy); + if (may_dequeue_tasks(t->srv, t->proxy)) + task_wakeup(&rq, t->srv->queue_mgt); return 1; } @@ -6054,6 +6044,31 @@ int process_chk(struct task *t) { +/* + * Manages a server's connection queue. If woken up, will try to dequeue as + * many pending sessions as possible, and wake them up. The task has nothing + * else to do, so it always returns TIME_ETERNITY. + */ +int process_srv_queue(struct task *t) { + struct server *s = (struct server*)t->context; + struct proxy *p = s->proxy; + int xferred; + + /* First, check if we can handle some connections queued at the proxy. We + * will take as many as we can handle. + */ + for (xferred = 0; s->cur_sess + xferred < s->maxconn; xferred++) { + struct session *sess; + + sess = pendconn_get_next_sess(s, p); + if (sess == NULL) + break; + task_wakeup(&rq, sess->task); + } + + return TIME_ETERNITY; +} + #if STATTIME > 0 int stats(void); #endif @@ -8596,6 +8611,34 @@ int readcfgfile(char *file) { curproxy->errmsg.len504 = strlen(HTTP_504); } + /* + * If this server supports a maxconn parameter, it needs a dedicated + * tasks to fill the emptied slots when a connection leaves. + */ + newsrv = curproxy->srv; + while (newsrv != NULL) { + if (newsrv->maxconn > 0) { + struct task *t; + + if ((t = pool_alloc(task)) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + return -1; + } + + t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */ + t->wq = LIST_HEAD(wait_queue[1]); /* already assigned to the eternity queue */ + t->state = TASK_IDLE; + t->process = process_srv_queue; + t->context = newsrv; + newsrv->queue_mgt = t; + + /* never run it unless specifically woken up */ + tv_eternity(&t->expire); + task_queue(t); + } + newsrv = newsrv->next; + } + /* now we'll start this proxy's health checks if any */ /* 1- count the checkers to run simultaneously */ nbchk = 0;