diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 1012f3517..83c52b1a6 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -156,6 +156,8 @@ spoe-agent is the name of the agent section. following keywords are supported : + - maxconnrate + - maxerrrate - messages - option continue-on-error - option var-prefix @@ -163,6 +165,18 @@ spoe-agent - use-backend +maxconnrate + Set the maximum number of connections per second to . The SPOE will + stop to open new connections if the maximum is reached and will wait to + acquire an existing one. So it is important to set "timeout hello" to a + relatively small value. + + +maxerrrate + Set the maximum number of errors per second to . The SPOE will stop + its processing if the maximum is reached. + + messages ... Declare the list of SPOE messages that an agent will handle. diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 06d6e5d25..ba7661117 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -51,13 +52,6 @@ /* Helper to get ctx inside an appctx */ #define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe) -/* TODO: add an option to customize these values */ -/* The maximum number of new applet waiting the end of the hello handshake */ -#define MAX_NEW_SPOE_APPLETS 5 - -/* The maximum number of error when a stream is waiting of a SPOE applet */ -#define MAX_NEW_SPOE_APPLET_ERRS 3 - /* Minimal size for a frame */ #define MIN_FRAME_SIZE 256 @@ -200,6 +194,8 @@ struct spoe_agent { char *var_pfx; /* Prefix used for vars set by the agent */ unsigned int flags; /* SPOE_FL_* */ + unsigned int cps_max; /* Maximum number of connections per second */ + unsigned int eps_max; /* Maximum number of errors per second */ struct list cache; /* List used to cache SPOE streams. In * fact, we cache the SPOE applect ctx */ @@ -208,7 +204,8 @@ struct spoe_agent { * for each supported events */ struct list applet_wq; /* List of streams waiting for a SPOE applet */ - unsigned int new_applets; /* The number of new SPOE applets */ + struct freq_ctr conn_per_sec; /* connections per second */ + struct freq_ctr err_per_sec; /* connetion errors per second */ }; /* SPOE filter configuration */ @@ -229,8 +226,6 @@ struct spoe_context { struct list buffer_wait; /* position in the list of streams waiting for a buffer */ struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ - unsigned int errs; /* The number of errors to acquire a SPOE applet */ - enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ unsigned int flags; /* SPOE_CTX_FL_* */ @@ -1593,9 +1588,9 @@ create_spoe_appctx(struct spoe_config *conf) si_applet_cant_get(&strm->si[0]); appctx_wakeup(appctx); - /* Increase the number of applets waiting the end of the hello - * handshake. */ - conf->agent->new_applets++; + /* Increase the per-process number of cumulated connections */ + if (conf->agent->cps_max > 0) + update_freq_ctr(&conf->agent->conn_per_sec, 1); strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; @@ -1665,9 +1660,7 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent) { struct spoe_context *ctx; - agent->new_applets--; list_for_each_entry(ctx, &agent->applet_wq, applet_wait) { - ctx->errs++; task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - wake up stream because to SPOE applet connection failed\n", @@ -1679,7 +1672,6 @@ on_new_spoe_appctx_failure(struct spoe_agent *agent) static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx) { - agent->new_applets--; offer_spoe_appctx(agent, appctx); } /* Retrieve a SPOE applet from the agent cache if possible, else create it. It @@ -1723,10 +1715,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) goto success; } - /* If there is no server up for the agent's backend or it too many - * failure occurred, this is an error. */ - if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) || - ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS) + /* If there is no server up for the agent's backend, this is an + * error. */ + if (!agent->b.be->srv_act && !agent->b.be->srv_bck) goto error; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" @@ -1739,10 +1730,12 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait); /* Finally, create new SPOE applet if we can */ - if (agent->new_applets < MAX_NEW_SPOE_APPLETS) { - if (create_spoe_appctx(conf) == NULL) - goto error; + if (agent->cps_max > 0) { + if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) + goto wait; } + if (create_spoe_appctx(conf) == NULL) + goto error; wait: return 0; @@ -1759,7 +1752,6 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) ctx->flags |= ((dir == SMP_OPT_DIR_REQ) ? SPOE_CTX_FL_REQ_PROCESS : SPOE_CTX_FL_RSP_PROCESS); - ctx->errs = 0; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" " - acquire SPOE appctx %p from cache\n", @@ -1775,9 +1767,9 @@ acquire_spoe_appctx(struct spoe_context *ctx, int dir) } SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - failed to acquire SPOE appctx errs=%u\n", + " - failed to acquire SPOE appctx\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, ctx->errs); + __FUNCTION__, ctx->strm); send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n"); return -1; @@ -2104,6 +2096,16 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); + if (agent->eps_max > 0) { + if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - skip event '%s': max EPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + goto skip; + } + } + dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); if (LIST_ISEMPTY(&(ctx->messages[ev]))) @@ -2181,6 +2183,9 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, return 1; error: + if (agent->eps_max > 0) + update_freq_ctr(&agent->err_per_sec, 1); + release_spoe_appctx(ctx); ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) ? SPOE_CTX_ST_READY @@ -2206,7 +2211,6 @@ create_spoe_context(struct filter *filter) ctx->filter = filter; ctx->state = SPOE_CTX_ST_NONE; ctx->flags = 0; - ctx->errs = 0; ctx->messages = conf->agent->messages; ctx->buffer = &buf_empty; LIST_INIT(&ctx->buffer_wait); @@ -2636,7 +2640,8 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) curagent->timeout.processing = TICK_ETERNITY; curagent->var_pfx = NULL; curagent->flags = 0; - curagent->new_applets = 0; + curagent->cps_max = 0; + curagent->eps_max = 0; for (i = 0; i < SPOE_EV_EVENTS; ++i) LIST_INIT(&curagent->messages[i]); @@ -2759,7 +2764,7 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) else if (!strcmp(args[1], "continue-on-error")) { if (*args[2]) { Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", - file, linenum, args[3]); + file, linenum, args[2]); err_code |= ERR_ALERT | ERR_ABORT; goto out; } @@ -2772,6 +2777,36 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) goto out; } } + else if (!strcmp(args[0], "maxconnrate")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : '%s' expects an integer argument.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + curagent->cps_max = atol(args[1]); + } + else if (!strcmp(args[0], "maxerrrate")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : '%s' expects an integer argument.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + curagent->eps_max = atol(args[1]); + } else if (*args[0]) { Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n", file, linenum, args[0]);