From a1cda029958da17f4f467ab4316a8bf4bac668f1 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Wed, 21 Dec 2016 08:58:06 +0100 Subject: [PATCH] MAJOR: spoe: Add support of pipelined and asynchronous exchanges with agents Now, HAProxy and agents can announce the support for "pipelining" and/or "async" capabilities during the HELLO handshake. For now, HAProxy always announces the support of both. In addition, in its HELLO frames. HAproxy adds the "engine-id" key. It is a uniq string that identify a SPOE engine. The "pipelining" capability is the ability for a peer to decouple NOTIFY and ACK frames. This is a symmectical capability. To be used, it must be supported by HAproxy and agents. Unlike HTTP pipelining, the ACK frames can be send in any order, but always on the same TCP connection used for the corresponding NOTIFY frame. The "async" capability is similar to the pipelining, but here any TCP connection established between HAProxy and the agent can be used to send ACK frames. if an agent accepts connections from multiple HAProxy, it can use the "engine-id" value to group TCP connections. --- doc/SPOE.txt | 27 +- include/types/applet.h | 3 +- src/flt_spoe.c | 1437 ++++++++++++++++++++++++---------------- 3 files changed, 887 insertions(+), 580 deletions(-) diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 6d2c4ae4c6..d716c6b6b6 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -505,8 +505,23 @@ equal to 256 bytes. Here are the list of official capabilities that HAProxy and agents can support: - * fragmentation: This is the abaility for a peer to support fragmented - payload in received frames. + * fragmentation: This is the ability for a peer to support fragmented + payload in received frames. This is an asymmectical + capability, it only concerns the peer that announces + it. This is the responsibility to the other peer to use it + or not. + + * pipelining: This is the ability for a peer to decouple NOTIFY and ACK + frames. This is a symmectical capability. To be used, it must + be supported by HAproxy and agents. Unlike HTTP pipelining, the + ACK frames can be send in any order, but always on the same TCP + connection used for the corresponding NOTIFY frame. + + * async: This ability is similar to the pipelining, but here any TCP + connection established between HAProxy and the agent can be used to + send ACK frames. if an agent accepts connections from multiple + HAProxy, it can use the "engine-id" value to group TCP + connections. See details about HAPROXY-HELLO frame. Unsupported or unknown capabilities are silently ignored, when possible. @@ -653,6 +668,10 @@ Following optional items can be added in the KV-LIST: If this item is set to TRUE, then the HAPROXY-HELLO frame is sent during a SPOE health check. When set to FALSE, this item can be ignored. + * "engine-id" + + This is a uniq string that identify a SPOE engine. + To finish the HELLO handshake, the agent must return an AGENT-HELLO frame with its supported SPOP version, the lower value between its maximum size allowed for a frame and the HAProxy one and capabilities it supports. If an error @@ -834,7 +853,7 @@ Here is the list of supported actions: SESSION : <1> TRANSACTION : <2> REQUEST : <3> - RESERVED : <4> + RESPONSE : <4> * unset-var unset the value for an existing variable. 2 arguments must be attached to this action: the variable scope (proc, sess, txn, @@ -851,7 +870,7 @@ Here is the list of supported actions: SESSION : <1> TRANSACTION : <2> REQUEST : <3> - RESERVED : <4> + RESPONSE : <4> NOTE: Name of the variables will be automatically prefixed by HAProxy to avoid diff --git a/include/types/applet.h b/include/types/applet.h index 642c7931b9..851948b4a3 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -85,10 +85,11 @@ struct appctx { } hlua_apphttp; /* used by the Lua HTTP services */ struct { struct task *task; - void *ctx; void *agent; unsigned int version; unsigned int max_frame_size; + unsigned int flags; + struct list waiting_queue; struct list list; } spoe; /* used by SPOE filter */ struct { diff --git a/src/flt_spoe.c b/src/flt_spoe.c index f5918dc5f0..17290d2cc4 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -66,6 +66,11 @@ #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS) +/* Flags set on the SPOE applet */ +#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */ +#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */ +#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */ + #define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */ #define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */ @@ -83,6 +88,7 @@ enum spoe_ctx_state { enum spoe_appctx_state { SPOE_APPCTX_ST_CONNECT = 0, SPOE_APPCTX_ST_CONNECTING, + SPOE_APPCTX_ST_IDLE, SPOE_APPCTX_ST_PROCESSING, SPOE_APPCTX_ST_DISCONNECT, SPOE_APPCTX_ST_DISCONNECTING, @@ -162,15 +168,15 @@ struct spoe_msg_placeholder { /* Describe a message that will be sent in a NOTIFY frame. A message has a name, * an argument list (see above) and it is linked to a specific event. */ struct spoe_message { - char *id; /* SPOE message id */ - unsigned int id_len; /* The message id length */ + char *id; /* SPOE message id */ + unsigned int id_len; /* The message id length */ struct spoe_agent *agent; /* SPOE agent owning this SPOE message */ struct { - char *file; /* file where the SPOE message appears */ - int line; /* line where the SPOE message appears */ - } conf; /* config information */ - struct list args; /* Arguments added when the SPOE messages is sent */ - struct list list; /* Used to chain SPOE messages */ + char *file; /* file where the SPOE message appears */ + int line; /* line where the SPOE message appears */ + } conf; /* config information */ + struct list args; /* Arguments added when the SPOE messages is sent */ + struct list list; /* Used to chain SPOE messages */ enum spoe_event event; /* SPOE_EV_* */ }; @@ -192,21 +198,32 @@ struct spoe_agent { unsigned int processing; /* Max time to process an event (in the main stream) */ } timeout; + /* Config info */ + char *engine_id; /* engine-id string */ char *var_pfx; /* Prefix used for vars set by the agent */ char *var_on_error; /* Variable to set when an error occured, in the TXN scope */ unsigned int flags; /* SPOE_FL_* */ - unsigned int cps_max; /* Maximum number of connections per second */ - unsigned int eps_max; /* Maximum number of errors per second */ - - struct list cache; /* List used to cache SPOE streams. In - * fact, we cache the SPOE applect ctx */ + unsigned int cps_max; /* Maximum # of connections per second */ + unsigned int eps_max; /* Maximum # of errors per second */ + unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */ + unsigned int min_applets; /* Minimum # applets alive at a time */ + unsigned int max_fpa; /* Maximum # of frames handled per applet at once */ struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent * for each supported events */ - struct list applet_wq; /* List of streams waiting for a SPOE applet */ - struct freq_ctr conn_per_sec; /* connections per second */ - struct freq_ctr err_per_sec; /* connetion errors per second */ + /* running info */ + unsigned int applets_act; /* # of applets alive at a time */ + unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ + unsigned int sending_rate; /* the global sending rate */ + + struct freq_ctr conn_per_sec; /* connections per second */ + struct freq_ctr err_per_sec; /* connetion errors per second */ + + struct list applets; /* List of available SPOE applets */ + struct list sending_queue; /* Queue of streams waiting to send data */ + struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ + }; /* SPOE filter configuration */ @@ -221,11 +238,11 @@ struct spoe_config { struct spoe_context { struct filter *filter; /* The SPOE filter */ struct stream *strm; /* The stream that should be offloaded */ - struct appctx *appctx; /* The SPOE appctx */ + struct list *messages; /* List of messages that will be sent during the stream processing */ struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */ struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */ - struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ + struct list list; enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ unsigned int flags; /* SPOE_CTX_FL_* */ @@ -266,9 +283,9 @@ char spoe_reason[256]; struct flt_ops spoe_ops; -static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx); -static void on_new_spoe_appctx_failure(struct spoe_agent *agent); -static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx); +static int queue_spoe_context(struct spoe_context *ctx); +static int acquire_spoe_buffer(struct spoe_context *ctx); +static void release_spoe_buffer(struct spoe_context *ctx); /******************************************************************** * helper functions/globals @@ -312,6 +329,7 @@ release_spoe_agent(struct spoe_agent *agent) free(agent->id); free(agent->conf.file); free(agent->var_pfx); + free(agent->engine_id); free(agent->var_on_error); for (i = 0; i < SPOE_EV_EVENTS; ++i) { list_for_each_entry_safe(msg, back, &agent->messages[i], list) { @@ -363,6 +381,7 @@ static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = { static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { [SPOE_APPCTX_ST_CONNECT] = "CONNECT", [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING", + [SPOE_APPCTX_ST_IDLE] = "IDLE", [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING", [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT", [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING", @@ -371,6 +390,49 @@ static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { }; #endif + +static char * +generate_pseudo_uuid() +{ + static int init = 0; + + const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"; + const char uuid_chr[] = "0123456789ABCDEF-"; + char *uuid; + int i; + + if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL) + return NULL; + + if (!init) { + srand(now_ms); + init = 1; + } + + for (i = 0; i < sizeof(uuid_fmt)-1; i++) { + int r = rand () % 16; + + switch (uuid_fmt[i]) { + case 'x' : uuid[i] = uuid_chr[r]; break; + case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break; + default : uuid[i] = uuid_fmt[i]; break; + } + } + return uuid; +} + +static inline unsigned int +min_applets_act(struct spoe_agent *agent) +{ + unsigned int nbsrv; + + if (agent->min_applets) + return agent->min_applets; + + nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck); + return 2*nbsrv; +} + /******************************************************************** * Functions that encode/decode SPOE frames ********************************************************************/ @@ -418,6 +480,7 @@ enum spoe_data_type { #define VERSION_KEY "version" #define MAX_FRAME_SIZE_KEY "max-frame-size" #define CAPABILITIES_KEY "capabilities" +#define ENGINE_ID_KEY "engine-id" #define HEALTHCHECK_KEY "healthcheck" #define STATUS_CODE_KEY "status-code" #define MSG_KEY "message" @@ -438,7 +501,8 @@ static struct spoe_version supported_versions[] = { #define SUPPORTED_VERSIONS_VAL "1.0" /* Comma-separated list of supported capabilities (none for now) */ -#define CAPABILITIES_VAL "" +//#define CAPABILITIES_VAL "" +#define CAPABILITIES_VAL "pipelining,async" static int decode_spoe_version(const char *str, size_t len) @@ -707,11 +771,13 @@ skip_spoe_action(char *frame, char *end) static int prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) { + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; int idx = 0; size_t max = (7 /* TYPE + METADATA */ + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)); + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL) + + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36); if (size < max) return -1; @@ -745,6 +811,13 @@ prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) frame[idx++] = SPOE_DATA_T_STR; idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx); + /* "engine-id" K/V item */ + if (agent != NULL && agent->engine_id != NULL) { + idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx); + } + return idx; } @@ -798,10 +871,10 @@ prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int -prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) +prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, + char *frame, size_t size) { - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; - int idx = 0; + int idx = 0; if (size < APPCTX_SPOE(appctx).max_frame_size) return -1; @@ -816,6 +889,10 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) idx += encode_spoe_varint(ctx->stream_id, frame+idx); idx += encode_spoe_varint(ctx->frame_id, frame+idx); + /* Copy encoded messages */ + if (idx + ctx->buffer->i > size) + return 0; + /* Copy encoded messages */ memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i); idx += ctx->buffer->i; @@ -828,7 +905,7 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) static int handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) { - int vsn, max_frame_size; + int vsn, max_frame_size, flags; int i, idx = 0; size_t min_size = (7 /* TYPE + METADATA */ + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 @@ -858,7 +935,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) * "capabilities" */ /* Loop on K/V items */ - vsn = max_frame_size = 0; + vsn = max_frame_size = flags = 0; while (idx < size) { char *str; uint64_t sz; @@ -921,7 +998,42 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) } max_frame_size = sz; } - /* Skip "capabilities" K/V item for now */ + /* Check "capabilities" K/V item */ + else if (!memcmp(str, CAPABILITIES_KEY, sz)) { + int i; + + /* The value must be a string */ + if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL) + continue; + + i = 0; + while (i < sz) { + char *delim; + + /* Skip leading spaces */ + for (; isspace(str[i]) && i < sz; i++); + + if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) { + i += 10; + if (sz == i || isspace(str[i]) || str[i] == ',') + flags |= SPOE_APPCTX_FL_PIPELINING; + } + else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) { + i += 5; + if (sz == i || isspace(str[i]) || str[i] == ',') + flags |= SPOE_APPCTX_FL_ASYNC; + } + + if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) + break; + i = (delim - str) + 1; + } + } else { /* Silently ignore unknown item */ if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { @@ -944,6 +1056,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) APPCTX_SPOE(appctx).version = (unsigned int)vsn; APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size; + APPCTX_SPOE(appctx).flags |= flags; return idx; } @@ -1041,14 +1154,15 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) } -/* Decode ACK frame sent by an agent. It returns the number of by read bytes on +/* Decode ACK frame sent by an agent. It returns the number of read bytes on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) { - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx, *back; uint64_t stream_id, frame_id; - int idx = 0; + int i, idx = 0; size_t min_size = (7 /* TYPE + METADATA */); /* Check frame type */ @@ -1064,19 +1178,45 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) idx += 4; /* Get the stream-id and the frame-id */ - idx += decode_spoe_varint(frame+idx, frame+size, &stream_id); - idx += decode_spoe_varint(frame+idx, frame+size, &frame_id); - - /* Check stream-id and frame-id */ - if (ctx->stream_id != (unsigned int)stream_id || - ctx->frame_id != (unsigned int)frame_id) + if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) return 0; + idx += i; + if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1) + return 0; + idx += i; + + if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) { + list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { + if (ctx->stream_id == (unsigned int)stream_id && + ctx->frame_id == (unsigned int)frame_id) + goto found; + } + } + else { + list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) { + if (ctx->stream_id == (unsigned int)stream_id && + ctx->frame_id == (unsigned int)frame_id) + goto found; + } + } + + /* No Stream found, ignore the frame */ + return 0; + + found: + if (acquire_spoe_buffer(ctx) <= 0) + return 1; /* Retry later */ /* Copy encoded actions */ - b_reset(ctx->buffer); memcpy(ctx->buffer->p, frame+idx, size-idx); ctx->buffer->i = size-idx; + /* Notify the stream */ + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_DONE; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + return idx; } @@ -1093,7 +1233,7 @@ prepare_spoe_healthcheck_request(char **req, int *len) memset(&a, 0, sizeof(a)); memset(buf, 0, sizeof(buf)); - APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize; + APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4; frame = buf+4; idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4); @@ -1126,7 +1266,7 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen int r; memset(&a, 0, sizeof(a)); - APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize; + APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4; if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0) goto error; @@ -1145,6 +1285,62 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen return -1; } +/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when + * the frame can be ignored, 1 to retry later, and the frame legnth on + * success. */ +static int +send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +{ + struct stream_interface *si = appctx->owner; + int ret; + uint32_t netint; + + if (si_ic(si)->buf == &buf_empty) + return 1; + + netint = htonl(framesz); + memcpy(buf, (char *)&netint, 4); + ret = bi_putblk(si_ic(si), buf, framesz+4); + + if (ret <= 0) { + if (ret == -1) + return 1; /* retry */ + return -1; /* error */ + } + return framesz; +} + +/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0 + * when the frame can be ignored, 1 to retry later and the frame length on + * success. */ +static int +recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +{ + struct stream_interface *si = appctx->owner; + int ret; + uint32_t netint; + + if (si_oc(si)->buf == &buf_empty) + return 1; + + ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0); + if (ret > 0) { + framesz = ntohl(netint); + if (framesz > APPCTX_SPOE(appctx).max_frame_size) { + spoe_status_code = SPOE_FRM_ERR_TOO_BIG; + return -1; + } + ret = bo_getblk(si_oc(si), trash.str, framesz, 4); + } + if (ret <= 0) { + if (ret == 0) + return 1; /* retry */ + spoe_status_code = SPOE_FRM_ERR_IO; + return -1; /* error */ + } + return framesz; +} + /******************************************************************** * Functions that manage the SPOE applet ********************************************************************/ @@ -1161,29 +1357,11 @@ process_spoe_applet(struct task * task) appctx->st1 = SPOE_APPCTX_ERR_TOUT; } si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); appctx_wakeup(appctx); return task; } -/* Remove a SPOE applet from the agent cache */ -static void -remove_spoe_applet_from_cache(struct appctx *appctx) -{ - struct appctx *a, *back; - struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - - if (LIST_ISEMPTY(&agent->cache)) - return; - - list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) { - if (a == appctx) { - LIST_DEL(&APPCTX_SPOE(appctx).list); - break; - } - } -} - - /* Callback function that releases a SPOE applet. This happens when the * connection with the agent is closed. */ static void @@ -1191,143 +1369,426 @@ release_spoe_applet(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + struct spoe_context *ctx, *back; - if (appctx->st0 == SPOE_APPCTX_ST_CONNECT || - appctx->st0 == SPOE_APPCTX_ST_CONNECTING) - on_new_spoe_appctx_failure(agent); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx); + + agent->applets_act--; + if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) { + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_INIT(&APPCTX_SPOE(appctx).list); + } if (appctx->st0 != SPOE_APPCTX_ST_END) { + if (appctx->st0 == SPOE_APPCTX_ST_IDLE) + agent->applets_idle--; + si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; appctx->st0 = SPOE_APPCTX_ST_END; } - if (ctx != NULL) { - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - ctx->appctx = NULL; - } - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, appctx); - - /* Release the task attached to the SPOE applet */ if (APPCTX_SPOE(appctx).task) { task_delete(APPCTX_SPOE(appctx).task); task_free(APPCTX_SPOE(appctx).task); } - /* And remove it from the agent cache */ - remove_spoe_applet_from_cache(appctx); - APPCTX_SPOE(appctx).ctx = NULL; + list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } + + if (!LIST_ISEMPTY(&agent->applets)) + return; + + list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } + + list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } } -/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when - * the frame can be ignored, 0 to retry later and 1 on success. The frame is - * encoded using the callback function . */ static int -send_spoe_frame(struct appctx *appctx, - int (*prepare)(struct appctx *, char *, size_t)) +handle_connect_spoe_applet(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - int framesz, ret; - uint32_t netint; + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret; - if (si_ic(si)->buf->size == 0) - return -1; + if (si->state <= SI_ST_CON) { + si_applet_want_put(si); + task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG); + goto stop; + } + if (si->state != SI_ST_EST) + goto exit; - ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size); - if (ret <= 0) - goto skip_or_error; - framesz = ret; - netint = htonl(framesz); - ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint)); - if (ret > 0) - ret = bi_putblk(si_ic(si), trash.str, framesz); - if (ret <= 0) { - if (ret == -1) - return -1; - return -2; + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); + goto exit; + } + + if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY) + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello); + + ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + goto exit; + + case 0: /* ignore => an error, cannot be ignored */ + goto exit; + + case 1: /* retry later */ + si_applet_cant_put(si); + goto stop; + + default: /* CONNECT frame successfully sent */ + appctx->st0 = SPOE_APPCTX_ST_CONNECTING; + goto next; + } + + next: + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} + +static int +handle_connecting_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret, framesz = 0; + + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); + goto exit; + } + + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; + } + framesz = ret; + ret = handle_spoe_agenthello_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 1: /* retry later */ + goto stop; + + default: + /* hello handshake is finished, set the idle timeout, + * Add the appctx in the agent cache, decrease the + * number of new applets and wake up waiting streams. */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + agent->applets_idle++; + appctx->st0 = SPOE_APPCTX_ST_IDLE; + goto next; + } + + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} + +static int +handle_processing_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx; + char *frame = trash.str; + unsigned int fpa = 0; + int ret, framesz = 0, skip_sending = 0, skip_receiving = 0; + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + spoe_status_code = SPOE_FRM_ERR_TOUT; + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + appctx->st1 = SPOE_APPCTX_ERR_NONE; + goto next; + } + + process: + if (fpa > agent->max_fpa || (skip_sending && skip_receiving)) + goto stop; + + /* Frames must be handled synchronously and a the applet is waiting for + * a ACK frame */ + if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) && + !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + if (skip_receiving) + goto stop; + goto recv_frame; + } + + if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) { + skip_sending = 1; + goto recv_frame; + } + + ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + agent->sending_rate++; + ctx->state = SPOE_CTX_ST_ERROR; + release_spoe_buffer(ctx); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + fpa++; + break; + + case 1: /* retry */ + si_applet_cant_put(si); + skip_sending = 1; + break; + + default: + agent->sending_rate++; + ctx->state = SPOE_CTX_ST_WAITING_ACK; + release_spoe_buffer(ctx); + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) + LIST_ADDQ(&agent->waiting_queue, &ctx->list); + else + LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list); + fpa++; + } + + if (fpa > agent->max_fpa) + goto stop; + + recv_frame: + if (skip_receiving) + goto process; + + framesz = 0; + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; + } + framesz = ret; + ret = handle_spoe_agentack_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + fpa++; + break; + + case 1: /* retry */ + skip_receiving = 1; + break; + + default: + if (framesz) + bo_skip(si_oc(si), framesz+4); + fpa++; + } + goto process; + + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) || + LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + agent->applets_idle++; + appctx->st0 = SPOE_APPCTX_ST_IDLE; + } + if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) { + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list); + if (fpa) + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); } return 1; - skip_or_error: - if (!ret) - return -1; - return -2; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; } -/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1 - * when the frame can be ignored, 0 to retry later and 1 on success. The frame - * is decoded using the callback function . */ static int -recv_spoe_frame(struct appctx *appctx, - int (*handle)(struct appctx *, char *, size_t)) +handle_disconnect_spoe_applet(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - int framesz, ret; - uint32_t netint; + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret; - ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0); - if (ret <= 0) - goto empty_or_error; - framesz = ntohl(netint); - if (framesz > APPCTX_SPOE(appctx).max_frame_size) { - spoe_status_code = SPOE_FRM_ERR_TOO_BIG; - return -2; + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) + goto exit; + + ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + goto exit; + + case 0: /* ignore */ + goto exit; + + case 1: /* retry */ + si_applet_cant_put(si); + goto stop; + + default: + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - disconnected by HAProxy (%d): %s\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, spoe_status_code, + spoe_frm_err_reasons[spoe_status_code]); + + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; } - ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint)); - if (ret <= 0) - goto empty_or_error; - bo_skip(si_oc(si), ret+sizeof(netint)); + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} - /* First check if the received frame is a DISCONNECT frame */ - ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz); - if (ret != 0) { - if (ret > 0) { +static int +handle_disconnecting_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + char *frame = trash.str; + int ret, framesz = 0; + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) + goto exit; + + framesz = 0; + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + framesz = ret; + ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - error on frame (%s)\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, + spoe_frm_err_reasons[spoe_status_code]); + goto exit; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + goto next; + + case 1: /* retry */ + goto stop; + + default: + if (framesz) + bo_skip(si_oc(si), framesz+4); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - disconnected by peer (%d): %s\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, __FUNCTION__, appctx, spoe_status_code, spoe_reason); - return 2; - } - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - error on frame (%s)\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, - spoe_frm_err_reasons[spoe_status_code]); - return -2; + goto exit; } - if (handle == NULL) - goto out; - /* If not, try to decode it */ - ret = handle(appctx, trash.str, framesz); - if (ret <= 0) { - if (!ret) - return -1; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - error on frame (%s)\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, - spoe_frm_err_reasons[spoe_status_code]); - return -2; - } - out: + next: + return 0; + stop: return 1; - - empty_or_error: - if (!ret) - return 0; - spoe_status_code = SPOE_FRM_ERR_IO; - return -2; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; } /* I/O Handler processing messages exchanged with the agent */ @@ -1335,12 +1796,9 @@ static void handle_spoe_applet(struct appctx *appctx) { struct stream_interface *si = appctx->owner; - struct stream *s = si_strm(si); struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; - int ret; - switchstate: + switchstate: SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - appctx-state=%s\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, @@ -1349,177 +1807,41 @@ handle_spoe_applet(struct appctx *appctx) switch (appctx->st0) { case SPOE_APPCTX_ST_CONNECT: spoe_status_code = SPOE_FRM_ERR_NONE; - if (si->state <= SI_ST_CON) { - si_applet_want_put(si); - task_wakeup(s->task, TASK_WOKEN_MSG); - break; - } - else if (si->state != SI_ST_EST) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - else if (!ret) - goto full; - - /* Hello frame was sent. Set the hello timeout and - * wait for the reply. */ - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello); - appctx->st0 = SPOE_APPCTX_ST_CONNECTING; - /* fall through */ + if (handle_connect_spoe_applet(appctx)) + goto out; + goto switchstate; case SPOE_APPCTX_ST_CONNECTING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - Connection timed out\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx); - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (!ret) + if (handle_connecting_spoe_applet(appctx)) goto out; + goto switchstate; - /* hello handshake is finished, set the idle timeout, - * Add the appctx in the agent cache, decrease the - * number of new applets and wake up waiting streams. */ - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - on_new_spoe_appctx_success(agent, appctx); - break; - - case SPOE_APPCTX_ST_PROCESSING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - spoe_status_code = SPOE_FRM_ERR_TOUT; + case SPOE_APPCTX_ST_IDLE: + if (stopping && + LIST_ISEMPTY(&agent->sending_queue) && + LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - appctx->st1 = SPOE_APPCTX_ERR_NONE; goto switchstate; } - if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) { - ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame); - if (ret < 0) { - if (ret == -1) { - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - goto skip_notify_frame; - } - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - else if (!ret) - goto full; - ctx->state = SPOE_CTX_ST_WAITING_ACK; - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - - skip_notify_frame: - if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) { - ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame); - if (ret < 0) { - if (ret == -1) - goto skip_notify_frame; - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - if (!ret) - goto out; - if (ret == 2) { - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - ctx->state = SPOE_CTX_ST_DONE; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - else { - if (stopping) { - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - - ret = recv_spoe_frame(appctx, NULL); - if (ret < 0) { - if (ret == -1) - goto skip_notify_frame; - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - if (!ret) - goto out; - if (ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - break; - - case SPOE_APPCTX_ST_DISCONNECT: - ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - else if (!ret) - goto full; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - disconnected by HAProxy (%d): %s\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, spoe_status_code, - spoe_frm_err_reasons[spoe_status_code]); - - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + agent->applets_idle--; + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; /* fall through */ + case SPOE_APPCTX_ST_PROCESSING: + if (handle_processing_spoe_applet(appctx)) + goto out; + goto switchstate; + + case SPOE_APPCTX_ST_DISCONNECT: + if (handle_disconnect_spoe_applet(appctx)) + goto out; + goto switchstate; + case SPOE_APPCTX_ST_DISCONNECTING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - ret = recv_spoe_frame(appctx, NULL); - if (ret < 0 || ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - break; + if (handle_disconnecting_spoe_applet(appctx)) + goto out; + goto switchstate; case SPOE_APPCTX_ST_EXIT: si_shutw(si); @@ -1532,16 +1854,11 @@ handle_spoe_applet(struct appctx *appctx) case SPOE_APPCTX_ST_END: return; } - - out: + out: if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY) task_queue(APPCTX_SPOE(appctx).task); si_oc(si)->flags |= CF_READ_DONTWAIT; task_wakeup(si_strm(si)->task, TASK_WOKEN_IO); - return; - full: - si_applet_cant_put(si); - goto out; } struct applet spoe_applet = { @@ -1568,13 +1885,15 @@ create_spoe_appctx(struct spoe_config *conf) if ((APPCTX_SPOE(appctx).task = task_new()) == NULL) goto out_free_appctx; APPCTX_SPOE(appctx).task->process = process_spoe_applet; - APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY; + APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello); APPCTX_SPOE(appctx).task->context = appctx; APPCTX_SPOE(appctx).agent = conf->agent; - APPCTX_SPOE(appctx).ctx = NULL; APPCTX_SPOE(appctx).version = 0; - APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize; - task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT); + APPCTX_SPOE(appctx).max_frame_size = conf->agent->max_frame_size; + APPCTX_SPOE(appctx).flags = 0; + + LIST_INIT(&APPCTX_SPOE(appctx).list); + LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue); sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type); if (!sess) @@ -1592,10 +1911,6 @@ create_spoe_appctx(struct spoe_config *conf) si_applet_cant_get(&strm->si[0]); appctx_wakeup(appctx); - /* Increase the per-process number of cumulated connections */ - if (conf->agent->cps_max > 0) - update_freq_ctr(&conf->agent->conn_per_sec, 1); - strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; @@ -1603,6 +1918,9 @@ create_spoe_appctx(struct spoe_config *conf) jobs++; totalconn++; + task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT); + LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list); + conf->agent->applets_act++; return appctx; /* Error unrolling */ @@ -1618,200 +1936,92 @@ create_spoe_appctx(struct spoe_config *conf) return NULL; } -/* Wake up a SPOE applet attached to a SPOE context. */ -static void -wakeup_spoe_appctx(struct spoe_context *ctx) -{ - if (ctx->appctx == NULL) - return; - if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) { - si_applet_want_get(ctx->appctx->owner); - si_applet_want_put(ctx->appctx->owner); - appctx_wakeup(ctx->appctx); - } -} - - -/* Run across the list of pending streams waiting for a SPOE applet and wake the - * first. */ -static void -offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx) -{ - struct spoe_context *ctx; - - if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING) - return; - - if (LIST_ISEMPTY(&agent->applet_wq)) - LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list); - else { - ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait); - APPCTX_SPOE(appctx).ctx = ctx; - ctx->appctx = appctx; - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - wake up stream to get available SPOE applet\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - } -} - -/* A failure occurred during SPOE applet creation. */ -static void -on_new_spoe_appctx_failure(struct spoe_agent *agent) -{ - struct spoe_context *ctx; - - list_for_each_entry(ctx, &agent->applet_wq, applet_wait) { - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - wake up stream because to SPOE applet connection failed\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - } -} - -static void -on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx) -{ - offer_spoe_appctx(agent, appctx); -} -/* Retrieve a SPOE applet from the agent cache if possible, else create it. It - * returns 1 on success, 0 to retry later and -1 if an error occurred. */ static int -acquire_spoe_appctx(struct spoe_context *ctx, int dir) +queue_spoe_context(struct spoe_context *ctx) { struct spoe_config *conf = FLT_CONF(ctx->filter); struct spoe_agent *agent = conf->agent; struct appctx *appctx; + unsigned int min_applets; - /* If a process is already started for this SPOE context, retry - * later. */ - if (ctx->flags & SPOE_CTX_FL_PROCESS) - goto wait; + min_applets = min_applets_act(agent); - /* If needed, initialize the buffer that will be used to encode messages - * and decode actions. */ - if (ctx->buffer == &buf_empty) { - if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { - LIST_DEL(&ctx->buffer_wait.list); - LIST_INIT(&ctx->buffer_wait.list); - } - - if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) { - LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); - goto wait; - } - } - - /* If the SPOE applet was already set, all is done. */ - if (ctx->appctx) - goto success; - - /* Else try to retrieve it from the agent cache */ - if (!LIST_ISEMPTY(&agent->cache)) { - appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list); - LIST_DEL(&APPCTX_SPOE(appctx).list); - APPCTX_SPOE(appctx).ctx = ctx; - ctx->appctx = appctx; - goto success; - } - - /* If there is no server up for the agent's backend, this is an - * error. */ - if (!agent->b.be->srv_act && !agent->b.be->srv_bck) - goto error; + /* Check if we need to create a new SPOE applet or not. */ + if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate) + goto end; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - waiting for available SPOE appctx\n", + " - try to create new SPOE appctx\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, ctx->strm); - /* Else add the stream in the waiting queue. */ - if (LIST_ISEMPTY(&ctx->applet_wait)) - LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait); + /* Do not try to create a new applet if there is no server up for the + * agent's backend. */ + if (!agent->b.be->srv_act && !agent->b.be->srv_bck) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - cannot create SPOE appctx: no server up\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } - /* Finally, create new SPOE applet if we can */ + /* Do not try to create a new applet if we have reached the maximum of + * connection per seconds */ if (agent->cps_max > 0) { - if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) - goto wait; - } - if (create_spoe_appctx(conf) == NULL) - goto error; - - wait: - return 0; - - success: - /* Remove the stream from the waiting queue */ - if (!LIST_ISEMPTY(&ctx->applet_wait)) { - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); + if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - cannot create SPOE appctx: max CPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } } - /* Set the right flag to prevent request and response processing - * in same time. */ - ctx->flags |= ((dir == SMP_OPT_DIR_REQ) - ? SPOE_CTX_FL_REQ_PROCESS - : SPOE_CTX_FL_RSP_PROCESS); + appctx = create_spoe_appctx(conf); + if (appctx == NULL) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to create SPOE appctx\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } + if (agent->applets_act <= min_applets) + APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST; + + /* Increase the per-process number of cumulated connections */ + if (agent->cps_max > 0) + update_freq_ctr(&agent->conn_per_sec, 1); + + end: + /* The only reason to return an error is when there is no applet */ + if (LIST_ISEMPTY(&agent->applets)) + return 0; + + /* Add the SPOE context in the sending queue and update all running + * info */ + LIST_ADDQ(&agent->sending_queue, &ctx->list); + if (agent->sending_rate) + agent->sending_rate--; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - acquire SPOE appctx %p from cache\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, ctx->appctx); + " - Add stream in sending queue - applets_act=%u - applets_idle=%u" + " - sending_rate=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, + ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate); + + /* Finally try to wakeup the first IDLE applet found and move it at the + * end of the list. */ + list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) { + if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { + si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); + appctx_wakeup(appctx); + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list); + break; + } + } return 1; - - error: - /* Remove the stream from the waiting queue */ - if (!LIST_ISEMPTY(&ctx->applet_wait)) { - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); - } - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - failed to acquire SPOE appctx\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n"); - - return -1; -} - -/* Release a SPOE applet and push it in the agent cache. */ -static void -release_spoe_appctx(struct spoe_context *ctx) -{ - struct spoe_config *conf = FLT_CONF(ctx->filter); - struct spoe_agent *agent = conf->agent; - struct appctx *appctx = ctx->appctx; - - /* Reset the flag to allow next processing */ - ctx->flags &= ~SPOE_CTX_FL_PROCESS; - - /* Reset processing timer */ - ctx->process_exp = TICK_ETERNITY; - - /* Release the buffer if needed */ - if (ctx->buffer != &buf_empty) { - b_free(&ctx->buffer); - offer_buffers(ctx, tasks_run_queue + applets_active_queue); - } - - /* If there is no SPOE applet, all is done */ - if (!appctx) - return; - - /* Else, reassign it or push it in the agent cache */ - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - release SPOE appctx %p\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, appctx); - - APPCTX_SPOE(appctx).ctx = NULL; - ctx->appctx = NULL; - offer_spoe_appctx(agent, appctx); } /*************************************************************************** @@ -1824,6 +2034,8 @@ static int process_spoe_messages(struct stream *s, struct spoe_context *ctx, struct list *messages, int dir) { + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; struct spoe_message *msg; struct sample *smp; struct spoe_arg *arg; @@ -1832,9 +2044,8 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, int off, flag, idx = 0; /* Reserve 32 bytes from the frame Metadata */ - max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32; + max_size = agent->max_frame_size - 32; - b_reset(ctx->buffer); p = ctx->buffer->p; /* Loop on messages */ @@ -1937,7 +2148,6 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, return 1; skip: - b_reset(ctx->buffer); return 0; } @@ -2081,6 +2291,47 @@ process_spoe_actions(struct stream *s, struct spoe_context *ctx, return 0; } +static int +start_event_processing(struct spoe_context *ctx, int dir) +{ + int ret; + /* If a process is already started for this SPOE context, retry + * later. */ + if (ctx->flags & SPOE_CTX_FL_PROCESS) + goto wait; + + ret = acquire_spoe_buffer(ctx); + if (ret <= 0) + return ret; + + /* Set the right flag to prevent request and response processing + * in same time. */ + ctx->flags |= ((dir == SMP_OPT_DIR_REQ) + ? SPOE_CTX_FL_REQ_PROCESS + : SPOE_CTX_FL_RSP_PROCESS); + + return 1; + + wait: + return 0; +} + +static void +stop_event_processing(struct spoe_context *ctx) +{ + /* Reset the flag to allow next processing */ + ctx->flags &= ~SPOE_CTX_FL_PROCESS; + + /* Reset processing timer */ + ctx->process_exp = TICK_ETERNITY; + + release_spoe_buffer(ctx); + + if (!LIST_ISEMPTY(&ctx->list)) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + } +} /* Process a SPOE event. First, this functions will process messages attached to * this event and send them to an agent in a NOTIFY frame. Then, it will wait a @@ -2101,15 +2352,6 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); - if (agent->eps_max > 0) { - if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - skip event '%s': max EPS reached\n", - (int)now.tv_sec, (int)now.tv_usec, - agent->id, __FUNCTION__, s, spoe_event_str[ev]); - goto skip; - } - } dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); @@ -2131,38 +2373,43 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, } if (ctx->state == SPOE_CTX_ST_READY) { + if (agent->eps_max > 0) { + if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - skip event '%s': max EPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + goto skip; + } + } + if (!tick_isset(ctx->process_exp)) { ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing); s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire), ctx->process_exp); } - - ret = acquire_spoe_appctx(ctx, dir); + ret = start_event_processing(ctx, dir); if (ret <= 0) { if (!ret) goto out; goto error; } - ctx->state = SPOE_CTX_ST_SENDING_MSGS; - } - - if (ctx->appctx == NULL) - goto error; - - if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) { ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); if (ret <= 0) { if (!ret) goto skip; goto error; } - wakeup_spoe_appctx(ctx); - ret = 0; - goto out; + + if (!queue_spoe_context(ctx)) + goto error; + + ctx->state = SPOE_CTX_ST_SENDING_MSGS; + /* fall through */ } - if (ctx->state == SPOE_CTX_ST_WAITING_ACK) { - wakeup_spoe_appctx(ctx); + if (ctx->state == SPOE_CTX_ST_SENDING_MSGS || + ctx->state == SPOE_CTX_ST_WAITING_ACK) { ret = 0; goto out; } @@ -2175,18 +2422,13 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, goto error; } ctx->frame_id++; - release_spoe_appctx(ctx); ctx->state = SPOE_CTX_ST_READY; + goto end; } out: return ret; - skip: - release_spoe_appctx(ctx); - ctx->state = SPOE_CTX_ST_READY; - return 1; - error: if (agent->eps_max > 0) update_freq_ctr(&agent->err_per_sec, 1); @@ -2194,6 +2436,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (agent->var_on_error) { struct sample smp; + // FIXME: Get the error code here memset(&smp, 0, sizeof(smp)); smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); smp.data.u.sint = 1; @@ -2203,17 +2446,57 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, strlen(agent->var_on_error), &smp); } - release_spoe_appctx(ctx); ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) ? SPOE_CTX_ST_READY : SPOE_CTX_ST_ERROR); - return 1; -} + ret = 1; + goto end; + skip: + ctx->state = SPOE_CTX_ST_READY; + ret = 1; + + end: + stop_event_processing(ctx); + return ret; +} /*************************************************************************** * Functions that create/destroy SPOE contexts **************************************************************************/ +static int +acquire_spoe_buffer(struct spoe_context *ctx) +{ + if (ctx->buffer != &buf_empty) + return 1; + + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { + LIST_DEL(&ctx->buffer_wait.list); + LIST_INIT(&ctx->buffer_wait.list); + } + + if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) + return 1; + + LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); + return 0; +} + +static void +release_spoe_buffer(struct spoe_context *ctx) +{ + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { + LIST_DEL(&ctx->buffer_wait.list); + LIST_INIT(&ctx->buffer_wait.list); + } + + /* Release the buffer if needed */ + if (ctx->buffer != &buf_empty) { + b_free(&ctx->buffer); + offer_buffers(ctx, tasks_run_queue + applets_active_queue); + } +} + static int wakeup_spoe_context(struct spoe_context *ctx) { task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); @@ -2239,7 +2522,7 @@ create_spoe_context(struct filter *filter) LIST_INIT(&ctx->buffer_wait.list); ctx->buffer_wait.target = ctx; ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context; - LIST_INIT(&ctx->applet_wait); + LIST_INIT(&ctx->list); ctx->stream_id = 0; ctx->frame_id = 1; @@ -2254,12 +2537,10 @@ destroy_spoe_context(struct spoe_context *ctx) if (!ctx) return; - if (ctx->appctx) - APPCTX_SPOE(ctx->appctx).ctx = NULL; if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) LIST_DEL(&ctx->buffer_wait.list); - if (!LIST_ISEMPTY(&ctx->applet_wait)) - LIST_DEL(&ctx->applet_wait); + if (!LIST_ISEMPTY(&ctx->list)) + LIST_DEL(&ctx->list); pool_free2(pool2_spoe_ctx, ctx); } @@ -2295,7 +2576,7 @@ sig_stop_spoe(struct sig_handler *sh) conf = fconf->conf; agent = conf->agent; - list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) { + list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) { si_applet_want_get(appctx->owner); si_applet_want_put(appctx->owner); appctx_wakeup(appctx); @@ -2437,17 +2718,11 @@ spoe_start(struct stream *s, struct filter *filter) static void spoe_stop(struct stream *s, struct filter *filter) { - struct spoe_context *ctx = filter->ctx; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_config *)FLT_CONF(filter))->agent->id, __FUNCTION__, s); - - if (ctx) { - release_spoe_appctx(ctx); - destroy_spoe_context(ctx); - } + destroy_spoe_context(filter->ctx); } @@ -2461,10 +2736,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter) if (tick_is_expired(ctx->process_exp, now_ms)) { s->pending_events |= TASK_WOKEN_MSG; - if (ctx->buffer != &buf_empty) { - b_free(&ctx->buffer); - offer_buffers(ctx, tasks_run_queue + applets_active_queue); - } + release_spoe_buffer(ctx); } } @@ -2511,13 +2783,13 @@ spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) goto out; } ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED; + if (!ret) { + channel_dont_read(chn); + channel_dont_close(chn); + } } out: - if (!ret) { - channel_dont_read(chn); - channel_dont_close(chn); - } return ret; } @@ -2654,21 +2926,34 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) } curagent->id = strdup(args[1]); + curagent->conf.file = strdup(file); curagent->conf.line = linenum; - curagent->timeout.hello = TICK_ETERNITY; - curagent->timeout.idle = TICK_ETERNITY; + + curagent->timeout.hello = TICK_ETERNITY; + curagent->timeout.idle = TICK_ETERNITY; curagent->timeout.processing = TICK_ETERNITY; - curagent->var_pfx = NULL; - curagent->var_on_error = NULL; - curagent->flags = 0; - curagent->cps_max = 0; - curagent->eps_max = 0; + + curagent->engine_id = NULL; + curagent->var_pfx = NULL; + curagent->var_on_error = NULL; + curagent->flags = 0; + curagent->cps_max = 0; + curagent->eps_max = 0; + curagent->max_frame_size = global.tune.bufsize - 4; + curagent->min_applets = 0; + curagent->max_fpa = 100; for (i = 0; i < SPOE_EV_EVENTS; ++i) LIST_INIT(&curagent->messages[i]); - LIST_INIT(&curagent->cache); - LIST_INIT(&curagent->applet_wq); + + curagent->applets_act = 0; + curagent->applets_idle = 0; + curagent->sending_rate = 0; + + LIST_INIT(&curagent->applets); + LIST_INIT(&curagent->sending_queue); + LIST_INIT(&curagent->waiting_queue); } else if (!strcmp(args[0], "use-backend")) { if (!*args[1]) { @@ -3114,6 +3399,8 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, } curagent->var_pfx = strdup(curagent->id); } + if (curagent->engine_id == NULL) + curagent->engine_id = generate_pseudo_uuid(); if (LIST_ISEMPTY(&curmps)) { Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",