From 4802672274eddc91aa5ab2d6b12810d050436536 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Wed, 16 Nov 2016 15:01:12 +0100 Subject: [PATCH] MINOR: spoe: Add "maxconnrate" and "maxerrrate" statements "maxconnrate" is the maximum number of connections per second. The SPOE will stop to open new connections if the maximum is reached and will wait to acquire an existing one. "maxerrrate" is the maximum number of errors per second. The SPOE will stop its processing if the maximum is reached. These options replace hardcoded macros MAX_NEW_SPOE_APPLETS and MAX_NEW_SPOE_APPLET_ERRS. We use it to limit SPOE activity, especially when servers are down.. --- doc/SPOE.txt | 14 ++++++++ src/flt_spoe.c | 93 ++++++++++++++++++++++++++++++++++---------------- 2 files changed, 78 insertions(+), 29 deletions(-) diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 1012f3517..83c52b1a6 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -156,6 +156,8 @@ spoe-agent is the name of the agent section. following keywords are supported : + - maxconnrate + - maxerrrate - messages - option continue-on-error - option var-prefix @@ -163,6 +165,18 @@ spoe-agent - use-backend +maxconnrate + Set the maximum number of connections per second to . The SPOE will + stop to open new connections if the maximum is reached and will wait to + acquire an existing one. So it is important to set "timeout hello" to a + relatively small value. + + +maxerrrate + Set the maximum number of errors per second to . The SPOE will stop + its processing if the maximum is reached. + + messages ... Declare the list of SPOE messages that an agent will handle. diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 06d6e5d25..ba7661117 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -51,13 +52,6 @@ /* Helper to get ctx inside an appctx */ #define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe) -/* TODO: add an option to customize these values */ -/* The maximum number of new applet waiting the end of the hello handshake */ -#define MAX_NEW_SPOE_APPLETS 5 - -/* The maximum number of error when a stream is waiting of a SPOE applet */ -#define MAX_NEW_SPOE_APPLET_ERRS 3 - /* Minimal size for a frame */ #define MIN_FRAME_SIZE 256 @@ -200,6 +194,8 @@ struct spoe_agent { char *var_pfx; /* Prefix used for vars set by the agent */ unsigned int flags; /* SPOE_FL_* */ + unsigned int cps_max; /* Maximum number of connections per second */ + unsigned int eps_max; /* Maximum number of errors per second */ struct list cache; /* List used to cache SPOE streams. In * fact, we cache the SPOE applect ctx */ @@ -208,7 +204,8 @@ struct spoe_agent { * for each supported events */ struct list applet_wq; /* List of streams waiting for a SPOE applet */ - unsigned int new_applets; /* The number of new SPOE applets */ + struct freq_ctr conn_per_sec; /* connections per second */ + struct freq_ctr err_per_sec; /* connetion errors per second */ }; /* SPOE filter configuration */ @@ -229,8 +226,6 @@ struct spoe_context { struct list buffer_wait; /* position in the list of streams waiting for a buffer */ struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ - unsigned int errs; /* The number of errors to acquire a SPOE applet */ - enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ unsigned int flags; /* SPOE_CTX_FL_* */ @@ -1593,9 +1588,9 @@ create_spoe_appctx(struct spoe_config *conf) si_applet_cant_get(&strm->si[0]); appctx_wakeup(appctx); - /* Increase the number of applets waiting the end of the hello - * handshake. */ - conf->agent->new_applets++; + /* Increase the per-process number of cumulated connections */ + if (conf->agent->cps_max > 0) + update_freq_ctr(&conf->agent->conn_per_sec, 1); strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; @@ -1665,9 +1660,7 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent) { struct spoe_context *ctx; - agent->new_applets--; list_for_each_entry(ctx, &agent->applet_wq, applet_wait) { - ctx->errs++; task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - wake up stream because to SPOE applet connection failed\n", @@ -1679,7 +1672,6 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent) static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx) { - agent->new_applets--; offer_spoe_appctx(agent, appctx); } /* Retrieve a SPOE applet from the agent cache if possible, else create it. It @@ -1723,10 +1715,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) goto success; } - /* If there is no server up for the agent's backend or it too many - * failure occurred, this is an error. */ - if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) || - ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS) + /* If there is no server up for the agent's backend, this is an + * error. */ + if (!agent->b.be->srv_act && !agent->b.be->srv_bck) goto error; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -1739,10 +1730,12 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait); /* Finally, create new SPOE applet if we can */ - if (agent->new_applets < MAX_NEW_SPOE_APPLETS) { - if (create_spoe_appctx(conf) == NULL) - goto error; + if (agent->cps_max > 0) { + if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) + goto wait; } + if (create_spoe_appctx(conf) == NULL) + goto error; wait: return 0; @@ -1759,7 +1752,6 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) ctx->flags |= ((dir == SMP_OPT_DIR_REQ) ? SPOE_CTX_FL_REQ_PROCESS : SPOE_CTX_FL_RSP_PROCESS); - ctx->errs = 0; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - acquire SPOE appctx %p from cache\n", @@ -1775,9 +1767,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) } SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - failed to acquire SPOE appctx errs=%u\n", + " - failed to acquire SPOE appctx\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, ctx->errs); + __FUNCTION__, ctx->strm); send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n"); return -1; @@ -2104,6 +2096,16 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); + if (agent->eps_max > 0) { + if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - skip event '%s': max EPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + goto skip; + } + } + dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); if (LIST_ISEMPTY(&(ctx->messages[ev]))) @@ -2181,6 +2183,9 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, return 1; error: + if (agent->eps_max > 0) + update_freq_ctr(&agent->err_per_sec, 1); + release_spoe_appctx(ctx); ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) ? SPOE_CTX_ST_READY @@ -2206,7 +2211,6 @@ create_spoe_context(struct filter *filter) ctx->filter = filter; ctx->state = SPOE_CTX_ST_NONE; ctx->flags = 0; - ctx->errs = 0; ctx->messages = conf->agent->messages; ctx->buffer = &buf_empty; LIST_INIT(&ctx->buffer_wait); @@ -2636,7 +2640,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->timeout.processing = TICK_ETERNITY; curagent->var_pfx = NULL; curagent->flags = 0; - curagent->new_applets = 0; + curagent->cps_max = 0; + curagent->eps_max = 0; for (i = 0; i < SPOE_EV_EVENTS; ++i) LIST_INIT(&curagent->messages[i]); @@ -2759,7 +2764,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) else if (!strcmp(args[1], "continue-on-error")) { if (*args[2]) { Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", - file, linenum, args[3]); + file, linenum, args[2]); err_code |= ERR_ALERT | ERR_ABORT; goto out; } @@ -2772,6 +2777,36 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) goto out; } } + else if (!strcmp(args[0], "maxconnrate")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : '%s' expects an integer argument.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + curagent->cps_max = atol(args[1]); + } + else if (!strcmp(args[0], "maxerrrate")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : '%s' expects an integer argument.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + curagent->eps_max = atol(args[1]); + } else if (*args[0]) { Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n", file, linenum, args[0]);