MINOR: spoe: Add "maxconnrate" and "maxerrrate" statements

"maxconnrate" is the maximum number of connections per second. The SPOE will
stop to open new connections if the maximum is reached and will wait to acquire
an existing one.

"maxerrrate" is the maximum number of errors per second. The SPOE will stop its
processing if the maximum is reached.

These options replace hardcoded macros MAX_NEW_SPOE_APPLETS and
MAX_NEW_SPOE_APPLET_ERRS. We use it to limit SPOE activity, especially when
servers are down..
This commit is contained in:
Christopher Faulet 2016-11-16 15:01:12 +01:00 committed by Willy Tarreau
parent ea62c2a345
commit 4802672274
2 changed files with 78 additions and 29 deletions

View File

@ -156,6 +156,8 @@ spoe-agent <name>
<name> is the name of the agent section.
following keywords are supported :
- maxconnrate
- maxerrrate
- messages
- option continue-on-error
- option var-prefix
@ -163,6 +165,18 @@ spoe-agent <name>
- use-backend
maxconnrate <number>
Set the maximum number of connections per second to <number>. The SPOE will
stop to open new connections if the maximum is reached and will wait to
acquire an existing one. So it is important to set "timeout hello" to a
relatively small value.
maxerrrate <number>
Set the maximum number of errors per second to <number>. The SPOE will stop
its processing if the maximum is reached.
messages <msg-name> ...
Declare the list of SPOE messages that an agent will handle.

View File

@ -30,6 +30,7 @@
#include <proto/arg.h>
#include <proto/backend.h>
#include <proto/filters.h>
#include <proto/freq_ctr.h>
#include <proto/frontend.h>
#include <proto/log.h>
#include <proto/proto_http.h>
@ -51,13 +52,6 @@
/* Helper to get ctx inside an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
/* TODO: add an option to customize these values */
/* The maximum number of new applet waiting the end of the hello handshake */
#define MAX_NEW_SPOE_APPLETS 5
/* The maximum number of error when a stream is waiting of a SPOE applet */
#define MAX_NEW_SPOE_APPLET_ERRS 3
/* Minimal size for a frame */
#define MIN_FRAME_SIZE 256
@ -200,6 +194,8 @@ struct spoe_agent {
char *var_pfx; /* Prefix used for vars set by the agent */
unsigned int flags; /* SPOE_FL_* */
unsigned int cps_max; /* Maximum number of connections per second */
unsigned int eps_max; /* Maximum number of errors per second */
struct list cache; /* List used to cache SPOE streams. In
* fact, we cache the SPOE applect ctx */
@ -208,7 +204,8 @@ struct spoe_agent {
* for each supported events */
struct list applet_wq; /* List of streams waiting for a SPOE applet */
unsigned int new_applets; /* The number of new SPOE applets */
struct freq_ctr conn_per_sec; /* connections per second */
struct freq_ctr err_per_sec; /* connetion errors per second */
};
/* SPOE filter configuration */
@ -229,8 +226,6 @@ struct spoe_context {
struct list buffer_wait; /* position in the list of streams waiting for a buffer */
struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
unsigned int errs; /* The number of errors to acquire a SPOE applet */
enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
unsigned int flags; /* SPOE_CTX_FL_* */
@ -1593,9 +1588,9 @@ create_spoe_appctx(struct spoe_config *conf)
si_applet_cant_get(&strm->si[0]);
appctx_wakeup(appctx);
/* Increase the number of applets waiting the end of the hello
* handshake. */
conf->agent->new_applets++;
/* Increase the per-process number of cumulated connections */
if (conf->agent->cps_max > 0)
update_freq_ctr(&conf->agent->conn_per_sec, 1);
strm->do_log = NULL;
strm->res.flags |= CF_READ_DONTWAIT;
@ -1665,9 +1660,7 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent)
{
struct spoe_context *ctx;
agent->new_applets--;
list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
ctx->errs++;
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - wake up stream because to SPOE applet connection failed\n",
@ -1679,7 +1672,6 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent)
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
agent->new_applets--;
offer_spoe_appctx(agent, appctx);
}
/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
@ -1723,10 +1715,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
goto success;
}
/* If there is no server up for the agent's backend or it too many
* failure occurred, this is an error. */
if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
/* If there is no server up for the agent's backend, this is an
* error. */
if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
goto error;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@ -1739,10 +1730,12 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
/* Finally, create new SPOE applet if we can */
if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
if (create_spoe_appctx(conf) == NULL)
goto error;
if (agent->cps_max > 0) {
if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
goto wait;
}
if (create_spoe_appctx(conf) == NULL)
goto error;
wait:
return 0;
@ -1759,7 +1752,6 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
? SPOE_CTX_FL_REQ_PROCESS
: SPOE_CTX_FL_RSP_PROCESS);
ctx->errs = 0;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - acquire SPOE appctx %p from cache\n",
@ -1775,9 +1767,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir)
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to acquire SPOE appctx errs=%u\n",
" - failed to acquire SPOE appctx\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, ctx->strm, ctx->errs);
__FUNCTION__, ctx->strm);
send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
return -1;
@ -2104,6 +2096,16 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
if (agent->eps_max > 0) {
if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - skip event '%s': max EPS reached\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, spoe_event_str[ev]);
goto skip;
}
}
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
if (LIST_ISEMPTY(&(ctx->messages[ev])))
@ -2181,6 +2183,9 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx,
return 1;
error:
if (agent->eps_max > 0)
update_freq_ctr(&agent->err_per_sec, 1);
release_spoe_appctx(ctx);
ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
? SPOE_CTX_ST_READY
@ -2206,7 +2211,6 @@ create_spoe_context(struct filter *filter)
ctx->filter = filter;
ctx->state = SPOE_CTX_ST_NONE;
ctx->flags = 0;
ctx->errs = 0;
ctx->messages = conf->agent->messages;
ctx->buffer = &buf_empty;
LIST_INIT(&ctx->buffer_wait);
@ -2636,7 +2640,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
curagent->timeout.processing = TICK_ETERNITY;
curagent->var_pfx = NULL;
curagent->flags = 0;
curagent->new_applets = 0;
curagent->cps_max = 0;
curagent->eps_max = 0;
for (i = 0; i < SPOE_EV_EVENTS; ++i)
LIST_INIT(&curagent->messages[i]);
@ -2759,7 +2764,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
else if (!strcmp(args[1], "continue-on-error")) {
if (*args[2]) {
Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
file, linenum, args[3]);
file, linenum, args[2]);
err_code |= ERR_ALERT | ERR_ABORT;
goto out;
}
@ -2772,6 +2777,36 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
goto out;
}
}
else if (!strcmp(args[0], "maxconnrate")) {
if (!*args[1]) {
Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
if (*args[2]) {
Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
file, linenum, args[2]);
err_code |= ERR_ALERT | ERR_ABORT;
goto out;
}
curagent->cps_max = atol(args[1]);
}
else if (!strcmp(args[0], "maxerrrate")) {
if (!*args[1]) {
Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
if (*args[2]) {
Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
file, linenum, args[2]);
err_code |= ERR_ALERT | ERR_ABORT;
goto out;
}
curagent->eps_max = atol(args[1]);
}
else if (*args[0]) {
Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
file, linenum, args[0]);