MAJOR: server: make server state changes synchronous again

Now we try to synchronously push updates as they come using the new rdv
point, so that the call to the server update function from the main poll
loop is not needed anymore.

It further reduces the apparent latency in the health checks as the response
time almost always appears as 0 ms, resulting in a slightly higher check rate
of ~1960 conn/s. Despite this, the CPU consumption has slightly dropped again
to ~32% for the same test.

The only trick is that the checks code is built with a bit of recursivity
because srv_update_status() calls server_recalc_eweight(), and the latter
needs to signal srv_update_status() in case of updates. Thus we added an
extra argument to this function to indicate whether or not it must
propagate updates (no if it comes from srv_update_status).
This commit is contained in:
Willy Tarreau 2018-08-02 11:48:52 +02:00
parent 647c70b681
commit 3ff577e165
4 changed files with 47 additions and 46 deletions

View File

@ -91,7 +91,7 @@ void srv_dump_kws(char **out);
* and the proxy's algorihtm. To be used after updating sv->uweight. The warmup
* state is automatically disabled if the time is elapsed.
*/
void server_recalc_eweight(struct server *sv);
void server_recalc_eweight(struct server *sv, int must_update);
/* returns the current server throttle rate between 0 and 100% */
static inline unsigned int server_throttle_rate(struct server *sv)

View File

@ -1475,7 +1475,7 @@ static struct task *server_warmup(struct task *t, void *context, unsigned short
return t;
/* recalculate the weights and update the state */
server_recalc_eweight(s);
server_recalc_eweight(s, 1);
/* probably that we can refill this server with a bit more connections */
pendconn_grab_from_px(s);

View File

@ -2375,16 +2375,6 @@ void mworker_pipe_register()
fd_want_recv(mworker_pipe[0]);
}
/* verifies if some servers' statuses need to be updated and call the update */
static inline void servers_check_for_updates()
{
if (!LIST_ISEMPTY(&updated_servers)) {
thread_isolate();
servers_update_status();
thread_release();
}
}
/* Runs the polling loop */
static void run_poll_loop()
{
@ -2431,9 +2421,6 @@ static void run_poll_loop()
HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
fd_process_cached_events();
/* check for server status updates */
servers_check_for_updates();
activity[tid].loops++;
}
}

View File

@ -47,7 +47,7 @@
struct list updated_servers = LIST_HEAD_INIT(updated_servers);
__decl_hathreads(HA_SPINLOCK_T updated_servers_lock);
static void srv_register_update(struct server *srv);
static void srv_update_status(struct server *s);
static void srv_update_state(struct server *srv, int version, char **params);
static int srv_apply_lastaddr(struct server *srv, int *err_code);
static int srv_set_fqdn(struct server *srv, const char *fqdn, int dns_locked);
@ -929,12 +929,16 @@ void srv_set_stopped(struct server *s, const char *reason, struct check *check)
s->op_st_chg.duration = check->duration;
}
srv_register_update(s);
for (srv = s->trackers; srv; srv = srv->tracknext) {
HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
srv_set_stopped(srv, NULL, NULL);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
}
/* propagate changes */
thread_isolate();
srv_update_status(s);
thread_release();
}
/* Marks server <s> up regardless of its checks' statuses and provided it isn't
@ -970,12 +974,16 @@ void srv_set_running(struct server *s, const char *reason, struct check *check)
if (s->slowstart <= 0)
s->next_state = SRV_ST_RUNNING;
srv_register_update(s);
for (srv = s->trackers; srv; srv = srv->tracknext) {
HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
srv_set_running(srv, NULL, NULL);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
}
/* propagate changes */
thread_isolate();
srv_update_status(s);
thread_release();
}
/* Marks server <s> stopping regardless of its checks' statuses and provided it
@ -1010,12 +1018,16 @@ void srv_set_stopping(struct server *s, const char *reason, struct check *check)
s->op_st_chg.duration = check->duration;
}
srv_register_update(s);
for (srv = s->trackers; srv; srv = srv->tracknext) {
HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
srv_set_stopping(srv, NULL, NULL);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
}
/* propagate changes */
thread_isolate();
srv_update_status(s);
thread_release();
}
/* Enables admin flag <mode> (among SRV_ADMF_*) on server <s>. This is used to
@ -1041,12 +1053,10 @@ void srv_set_admin_flag(struct server *s, enum srv_admin mode, const char *cause
if (cause)
strlcpy2(s->adm_st_chg_cause, cause, sizeof(s->adm_st_chg_cause));
srv_register_update(s);
/* stop going down if the equivalent flag was already present (forced or inherited) */
if (((mode & SRV_ADMF_MAINT) && (s->next_admin & ~mode & SRV_ADMF_MAINT)) ||
((mode & SRV_ADMF_DRAIN) && (s->next_admin & ~mode & SRV_ADMF_DRAIN)))
return;
goto end;
/* compute the inherited flag to propagate */
if (mode & SRV_ADMF_MAINT)
@ -1059,6 +1069,12 @@ void srv_set_admin_flag(struct server *s, enum srv_admin mode, const char *cause
srv_set_admin_flag(srv, mode, cause);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
}
end:
/* propagate changes */
thread_isolate();
srv_update_status(s);
thread_release();
}
/* Disables admin flag <mode> (among SRV_ADMF_*) on server <s>. This is used to
@ -1080,12 +1096,10 @@ void srv_clr_admin_flag(struct server *s, enum srv_admin mode)
s->next_admin &= ~mode;
srv_register_update(s);
/* stop going down if the equivalent flag is still present (forced or inherited) */
if (((mode & SRV_ADMF_MAINT) && (s->next_admin & SRV_ADMF_MAINT)) ||
((mode & SRV_ADMF_DRAIN) && (s->next_admin & SRV_ADMF_DRAIN)))
return;
goto end;
if (mode & SRV_ADMF_MAINT)
mode = SRV_ADMF_IMAINT;
@ -1097,6 +1111,12 @@ void srv_clr_admin_flag(struct server *s, enum srv_admin mode)
srv_clr_admin_flag(srv, mode);
HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
}
end:
/* propagate changes */
thread_isolate();
srv_update_status(s);
thread_release();
}
/* principle: propagate maint and drain to tracking servers. This is useful
@ -1180,9 +1200,10 @@ static void __listener_init(void)
/* Recomputes the server's eweight based on its state, uweight, the current time,
* and the proxy's algorihtm. To be used after updating sv->uweight. The warmup
* state is automatically disabled if the time is elapsed.
* state is automatically disabled if the time is elapsed. If <must_update> is
* not zero, the update will be propagated immediately.
*/
void server_recalc_eweight(struct server *sv)
void server_recalc_eweight(struct server *sv, int must_update)
{
struct proxy *px = sv->proxy;
unsigned w;
@ -1203,7 +1224,12 @@ void server_recalc_eweight(struct server *sv)
sv->next_eweight = (sv->uweight * w + px->lbprm.wmult - 1) / px->lbprm.wmult;
srv_register_update(sv);
/* propagate changes only if needed (i.e. not recursively) */
if (must_update) {
thread_isolate();
srv_update_status(sv);
thread_release();
}
}
/*
@ -1247,7 +1273,7 @@ const char *server_parse_weight_change_request(struct server *sv,
return "Backend is using a static LB algorithm and only accepts weights '0%' and '100%'.\n";
sv->uweight = w;
server_recalc_eweight(sv);
server_recalc_eweight(sv, 1);
return NULL;
}
@ -2671,17 +2697,6 @@ struct server *server_find_best_match(struct proxy *bk, char *name, int id, int
return NULL;
}
/* Registers changes to be applied asynchronously */
static void srv_register_update(struct server *srv)
{
if (LIST_ISEMPTY(&srv->update_status)) {
HA_SPIN_LOCK(UPDATED_SERVERS_LOCK, &updated_servers_lock);
if (LIST_ISEMPTY(&srv->update_status))
LIST_ADDQ(&updated_servers, &srv->update_status);
HA_SPIN_UNLOCK(UPDATED_SERVERS_LOCK, &updated_servers_lock);
}
}
/* Update a server state using the parameters available in the params list */
static void srv_update_state(struct server *srv, int version, char **params)
{
@ -2960,7 +2975,7 @@ static void srv_update_state(struct server *srv, int version, char **params)
if (srv_iweight == srv->iweight) {
srv->uweight = srv_uweight;
}
server_recalc_eweight(srv);
server_recalc_eweight(srv, 1);
/* load server IP address */
if (strcmp(params[0], "-"))
@ -4516,7 +4531,7 @@ static void __server_init(void)
* is designed to be called asynchronously.
*
*/
void srv_update_status(struct server *s)
static void srv_update_status(struct server *s)
{
struct check *check = &s->check;
int xferred;
@ -4526,7 +4541,6 @@ void srv_update_status(struct server *s)
int log_level;
struct buffer *tmptrash = NULL;
/* If currently main is not set we try to apply pending state changes */
if (!(s->cur_admin & SRV_ADMF_MAINT)) {
int next_admin;
@ -4618,7 +4632,7 @@ void srv_update_status(struct server *s)
if (s->next_state == SRV_ST_STARTING)
task_schedule(s->warmup, tick_add(now_ms, MS_TO_TICKS(MAX(1000, s->slowstart / 20))));
server_recalc_eweight(s);
server_recalc_eweight(s, 0);
/* now propagate the status change to any LB algorithms */
if (px->lbprm.update_server_eweight)
px->lbprm.update_server_eweight(s);
@ -4832,7 +4846,7 @@ void srv_update_status(struct server *s)
tmptrash = NULL;
}
server_recalc_eweight(s);
server_recalc_eweight(s, 0);
/* now propagate the status change to any LB algorithms */
if (px->lbprm.update_server_eweight)
px->lbprm.update_server_eweight(s);
@ -4968,7 +4982,7 @@ void srv_update_status(struct server *s)
if (s->last_change < now.tv_sec) // ignore negative times
s->down_time += now.tv_sec - s->last_change;
s->last_change = now.tv_sec;
server_recalc_eweight(s);
server_recalc_eweight(s, 0);
tmptrash = alloc_trash_chunk();
if (tmptrash) {