diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 6d2c4ae4c6..d716c6b6b6 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -505,8 +505,23 @@ equal to 256 bytes. Here are the list of official capabilities that HAProxy and agents can support: - * fragmentation: This is the abaility for a peer to support fragmented - payload in received frames. + * fragmentation: This is the ability for a peer to support fragmented + payload in received frames. This is an asymmectical + capability, it only concerns the peer that announces + it. This is the responsibility to the other peer to use it + or not. + + * pipelining: This is the ability for a peer to decouple NOTIFY and ACK + frames. This is a symmectical capability. To be used, it must + be supported by HAproxy and agents. Unlike HTTP pipelining, the + ACK frames can be send in any order, but always on the same TCP + connection used for the corresponding NOTIFY frame. + + * async: This ability is similar to the pipelining, but here any TCP + connection established between HAProxy and the agent can be used to + send ACK frames. if an agent accepts connections from multiple + HAProxy, it can use the "engine-id" value to group TCP + connections. See details about HAPROXY-HELLO frame. Unsupported or unknown capabilities are silently ignored, when possible. @@ -653,6 +668,10 @@ Following optional items can be added in the KV-LIST: If this item is set to TRUE, then the HAPROXY-HELLO frame is sent during a SPOE health check. When set to FALSE, this item can be ignored. + * "engine-id" + + This is a uniq string that identify a SPOE engine. + To finish the HELLO handshake, the agent must return an AGENT-HELLO frame with its supported SPOP version, the lower value between its maximum size allowed for a frame and the HAProxy one and capabilities it supports. If an error @@ -834,7 +853,7 @@ Here is the list of supported actions: SESSION : <1> TRANSACTION : <2> REQUEST : <3> - RESERVED : <4> + RESPONSE : <4> * unset-var unset the value for an existing variable. 2 arguments must be attached to this action: the variable scope (proc, sess, txn, @@ -851,7 +870,7 @@ Here is the list of supported actions: SESSION : <1> TRANSACTION : <2> REQUEST : <3> - RESERVED : <4> + RESPONSE : <4> NOTE: Name of the variables will be automatically prefixed by HAProxy to avoid diff --git a/include/types/applet.h b/include/types/applet.h index 642c7931b9..851948b4a3 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -85,10 +85,11 @@ struct appctx { } hlua_apphttp; /* used by the Lua HTTP services */ struct { struct task *task; - void *ctx; void *agent; unsigned int version; unsigned int max_frame_size; + unsigned int flags; + struct list waiting_queue; struct list list; } spoe; /* used by SPOE filter */ struct { diff --git a/src/flt_spoe.c b/src/flt_spoe.c index f5918dc5f0..17290d2cc4 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -66,6 +66,11 @@ #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS) +/* Flags set on the SPOE applet */ +#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */ +#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */ +#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */ + #define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */ #define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */ @@ -83,6 +88,7 @@ enum spoe_ctx_state { enum spoe_appctx_state { SPOE_APPCTX_ST_CONNECT = 0, SPOE_APPCTX_ST_CONNECTING, + SPOE_APPCTX_ST_IDLE, SPOE_APPCTX_ST_PROCESSING, SPOE_APPCTX_ST_DISCONNECT, SPOE_APPCTX_ST_DISCONNECTING, @@ -162,15 +168,15 @@ struct spoe_msg_placeholder { /* Describe a message that will be sent in a NOTIFY frame. A message has a name, * an argument list (see above) and it is linked to a specific event. */ struct spoe_message { - char *id; /* SPOE message id */ - unsigned int id_len; /* The message id length */ + char *id; /* SPOE message id */ + unsigned int id_len; /* The message id length */ struct spoe_agent *agent; /* SPOE agent owning this SPOE message */ struct { - char *file; /* file where the SPOE message appears */ - int line; /* line where the SPOE message appears */ - } conf; /* config information */ - struct list args; /* Arguments added when the SPOE messages is sent */ - struct list list; /* Used to chain SPOE messages */ + char *file; /* file where the SPOE message appears */ + int line; /* line where the SPOE message appears */ + } conf; /* config information */ + struct list args; /* Arguments added when the SPOE messages is sent */ + struct list list; /* Used to chain SPOE messages */ enum spoe_event event; /* SPOE_EV_* */ }; @@ -192,21 +198,32 @@ struct spoe_agent { unsigned int processing; /* Max time to process an event (in the main stream) */ } timeout; + /* Config info */ + char *engine_id; /* engine-id string */ char *var_pfx; /* Prefix used for vars set by the agent */ char *var_on_error; /* Variable to set when an error occured, in the TXN scope */ unsigned int flags; /* SPOE_FL_* */ - unsigned int cps_max; /* Maximum number of connections per second */ - unsigned int eps_max; /* Maximum number of errors per second */ - - struct list cache; /* List used to cache SPOE streams. In - * fact, we cache the SPOE applect ctx */ + unsigned int cps_max; /* Maximum # of connections per second */ + unsigned int eps_max; /* Maximum # of errors per second */ + unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */ + unsigned int min_applets; /* Minimum # applets alive at a time */ + unsigned int max_fpa; /* Maximum # of frames handled per applet at once */ struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent * for each supported events */ - struct list applet_wq; /* List of streams waiting for a SPOE applet */ - struct freq_ctr conn_per_sec; /* connections per second */ - struct freq_ctr err_per_sec; /* connetion errors per second */ + /* running info */ + unsigned int applets_act; /* # of applets alive at a time */ + unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ + unsigned int sending_rate; /* the global sending rate */ + + struct freq_ctr conn_per_sec; /* connections per second */ + struct freq_ctr err_per_sec; /* connetion errors per second */ + + struct list applets; /* List of available SPOE applets */ + struct list sending_queue; /* Queue of streams waiting to send data */ + struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */ + }; /* SPOE filter configuration */ @@ -221,11 +238,11 @@ struct spoe_config { struct spoe_context { struct filter *filter; /* The SPOE filter */ struct stream *strm; /* The stream that should be offloaded */ - struct appctx *appctx; /* The SPOE appctx */ + struct list *messages; /* List of messages that will be sent during the stream processing */ struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */ struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */ - struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ + struct list list; enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ unsigned int flags; /* SPOE_CTX_FL_* */ @@ -266,9 +283,9 @@ char spoe_reason[256]; struct flt_ops spoe_ops; -static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx); -static void on_new_spoe_appctx_failure(struct spoe_agent *agent); -static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx); +static int queue_spoe_context(struct spoe_context *ctx); +static int acquire_spoe_buffer(struct spoe_context *ctx); +static void release_spoe_buffer(struct spoe_context *ctx); /******************************************************************** * helper functions/globals @@ -312,6 +329,7 @@ release_spoe_agent(struct spoe_agent *agent) free(agent->id); free(agent->conf.file); free(agent->var_pfx); + free(agent->engine_id); free(agent->var_on_error); for (i = 0; i < SPOE_EV_EVENTS; ++i) { list_for_each_entry_safe(msg, back, &agent->messages[i], list) { @@ -363,6 +381,7 @@ static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = { static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { [SPOE_APPCTX_ST_CONNECT] = "CONNECT", [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING", + [SPOE_APPCTX_ST_IDLE] = "IDLE", [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING", [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT", [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING", @@ -371,6 +390,49 @@ static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { }; #endif + +static char * +generate_pseudo_uuid() +{ + static int init = 0; + + const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"; + const char uuid_chr[] = "0123456789ABCDEF-"; + char *uuid; + int i; + + if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL) + return NULL; + + if (!init) { + srand(now_ms); + init = 1; + } + + for (i = 0; i < sizeof(uuid_fmt)-1; i++) { + int r = rand () % 16; + + switch (uuid_fmt[i]) { + case 'x' : uuid[i] = uuid_chr[r]; break; + case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break; + default : uuid[i] = uuid_fmt[i]; break; + } + } + return uuid; +} + +static inline unsigned int +min_applets_act(struct spoe_agent *agent) +{ + unsigned int nbsrv; + + if (agent->min_applets) + return agent->min_applets; + + nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck); + return 2*nbsrv; +} + /******************************************************************** * Functions that encode/decode SPOE frames ********************************************************************/ @@ -418,6 +480,7 @@ enum spoe_data_type { #define VERSION_KEY "version" #define MAX_FRAME_SIZE_KEY "max-frame-size" #define CAPABILITIES_KEY "capabilities" +#define ENGINE_ID_KEY "engine-id" #define HEALTHCHECK_KEY "healthcheck" #define STATUS_CODE_KEY "status-code" #define MSG_KEY "message" @@ -438,7 +501,8 @@ static struct spoe_version supported_versions[] = { #define SUPPORTED_VERSIONS_VAL "1.0" /* Comma-separated list of supported capabilities (none for now) */ -#define CAPABILITIES_VAL "" +//#define CAPABILITIES_VAL "" +#define CAPABILITIES_VAL "pipelining,async" static int decode_spoe_version(const char *str, size_t len) @@ -707,11 +771,13 @@ skip_spoe_action(char *frame, char *end) static int prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) { + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; int idx = 0; size_t max = (7 /* TYPE + METADATA */ + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 - + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)); + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL) + + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36); if (size < max) return -1; @@ -745,6 +811,13 @@ prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) frame[idx++] = SPOE_DATA_T_STR; idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx); + /* "engine-id" K/V item */ + if (agent != NULL && agent->engine_id != NULL) { + idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx); + } + return idx; } @@ -798,10 +871,10 @@ prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int -prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) +prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx, + char *frame, size_t size) { - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; - int idx = 0; + int idx = 0; if (size < APPCTX_SPOE(appctx).max_frame_size) return -1; @@ -816,6 +889,10 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) idx += encode_spoe_varint(ctx->stream_id, frame+idx); idx += encode_spoe_varint(ctx->frame_id, frame+idx); + /* Copy encoded messages */ + if (idx + ctx->buffer->i > size) + return 0; + /* Copy encoded messages */ memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i); idx += ctx->buffer->i; @@ -828,7 +905,7 @@ prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) static int handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) { - int vsn, max_frame_size; + int vsn, max_frame_size, flags; int i, idx = 0; size_t min_size = (7 /* TYPE + METADATA */ + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 @@ -858,7 +935,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) * "capabilities" */ /* Loop on K/V items */ - vsn = max_frame_size = 0; + vsn = max_frame_size = flags = 0; while (idx < size) { char *str; uint64_t sz; @@ -921,7 +998,42 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) } max_frame_size = sz; } - /* Skip "capabilities" K/V item for now */ + /* Check "capabilities" K/V item */ + else if (!memcmp(str, CAPABILITIES_KEY, sz)) { + int i; + + /* The value must be a string */ + if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL) + continue; + + i = 0; + while (i < sz) { + char *delim; + + /* Skip leading spaces */ + for (; isspace(str[i]) && i < sz; i++); + + if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) { + i += 10; + if (sz == i || isspace(str[i]) || str[i] == ',') + flags |= SPOE_APPCTX_FL_PIPELINING; + } + else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) { + i += 5; + if (sz == i || isspace(str[i]) || str[i] == ',') + flags |= SPOE_APPCTX_FL_ASYNC; + } + + if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) + break; + i = (delim - str) + 1; + } + } else { /* Silently ignore unknown item */ if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { @@ -944,6 +1056,7 @@ handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) APPCTX_SPOE(appctx).version = (unsigned int)vsn; APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size; + APPCTX_SPOE(appctx).flags |= flags; return idx; } @@ -1041,14 +1154,15 @@ handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) } -/* Decode ACK frame sent by an agent. It returns the number of by read bytes on +/* Decode ACK frame sent by an agent. It returns the number of read bytes on * success, 0 if the frame can be ignored and -1 if an error occurred. */ static int handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) { - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx, *back; uint64_t stream_id, frame_id; - int idx = 0; + int i, idx = 0; size_t min_size = (7 /* TYPE + METADATA */); /* Check frame type */ @@ -1064,19 +1178,45 @@ handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) idx += 4; /* Get the stream-id and the frame-id */ - idx += decode_spoe_varint(frame+idx, frame+size, &stream_id); - idx += decode_spoe_varint(frame+idx, frame+size, &frame_id); - - /* Check stream-id and frame-id */ - if (ctx->stream_id != (unsigned int)stream_id || - ctx->frame_id != (unsigned int)frame_id) + if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1) return 0; + idx += i; + if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1) + return 0; + idx += i; + + if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) { + list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { + if (ctx->stream_id == (unsigned int)stream_id && + ctx->frame_id == (unsigned int)frame_id) + goto found; + } + } + else { + list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) { + if (ctx->stream_id == (unsigned int)stream_id && + ctx->frame_id == (unsigned int)frame_id) + goto found; + } + } + + /* No Stream found, ignore the frame */ + return 0; + + found: + if (acquire_spoe_buffer(ctx) <= 0) + return 1; /* Retry later */ /* Copy encoded actions */ - b_reset(ctx->buffer); memcpy(ctx->buffer->p, frame+idx, size-idx); ctx->buffer->i = size-idx; + /* Notify the stream */ + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_DONE; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + return idx; } @@ -1093,7 +1233,7 @@ prepare_spoe_healthcheck_request(char **req, int *len) memset(&a, 0, sizeof(a)); memset(buf, 0, sizeof(buf)); - APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize; + APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4; frame = buf+4; idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4); @@ -1126,7 +1266,7 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen int r; memset(&a, 0, sizeof(a)); - APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize; + APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4; if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0) goto error; @@ -1145,6 +1285,62 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen return -1; } +/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when + * the frame can be ignored, 1 to retry later, and the frame legnth on + * success. */ +static int +send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +{ + struct stream_interface *si = appctx->owner; + int ret; + uint32_t netint; + + if (si_ic(si)->buf == &buf_empty) + return 1; + + netint = htonl(framesz); + memcpy(buf, (char *)&netint, 4); + ret = bi_putblk(si_ic(si), buf, framesz+4); + + if (ret <= 0) { + if (ret == -1) + return 1; /* retry */ + return -1; /* error */ + } + return framesz; +} + +/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0 + * when the frame can be ignored, 1 to retry later and the frame length on + * success. */ +static int +recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz) +{ + struct stream_interface *si = appctx->owner; + int ret; + uint32_t netint; + + if (si_oc(si)->buf == &buf_empty) + return 1; + + ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0); + if (ret > 0) { + framesz = ntohl(netint); + if (framesz > APPCTX_SPOE(appctx).max_frame_size) { + spoe_status_code = SPOE_FRM_ERR_TOO_BIG; + return -1; + } + ret = bo_getblk(si_oc(si), trash.str, framesz, 4); + } + if (ret <= 0) { + if (ret == 0) + return 1; /* retry */ + spoe_status_code = SPOE_FRM_ERR_IO; + return -1; /* error */ + } + return framesz; +} + /******************************************************************** * Functions that manage the SPOE applet ********************************************************************/ @@ -1161,29 +1357,11 @@ process_spoe_applet(struct task * task) appctx->st1 = SPOE_APPCTX_ERR_TOUT; } si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); appctx_wakeup(appctx); return task; } -/* Remove a SPOE applet from the agent cache */ -static void -remove_spoe_applet_from_cache(struct appctx *appctx) -{ - struct appctx *a, *back; - struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - - if (LIST_ISEMPTY(&agent->cache)) - return; - - list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) { - if (a == appctx) { - LIST_DEL(&APPCTX_SPOE(appctx).list); - break; - } - } -} - - /* Callback function that releases a SPOE applet. This happens when the * connection with the agent is closed. */ static void @@ -1191,143 +1369,426 @@ release_spoe_applet(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + struct spoe_context *ctx, *back; - if (appctx->st0 == SPOE_APPCTX_ST_CONNECT || - appctx->st0 == SPOE_APPCTX_ST_CONNECTING) - on_new_spoe_appctx_failure(agent); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx); + + agent->applets_act--; + if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) { + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_INIT(&APPCTX_SPOE(appctx).list); + } if (appctx->st0 != SPOE_APPCTX_ST_END) { + if (appctx->st0 == SPOE_APPCTX_ST_IDLE) + agent->applets_idle--; + si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; appctx->st0 = SPOE_APPCTX_ST_END; } - if (ctx != NULL) { - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - ctx->appctx = NULL; - } - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, appctx); - - /* Release the task attached to the SPOE applet */ if (APPCTX_SPOE(appctx).task) { task_delete(APPCTX_SPOE(appctx).task); task_free(APPCTX_SPOE(appctx).task); } - /* And remove it from the agent cache */ - remove_spoe_applet_from_cache(appctx); - APPCTX_SPOE(appctx).ctx = NULL; + list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } + + if (!LIST_ISEMPTY(&agent->applets)) + return; + + list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } + + list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + } } -/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when - * the frame can be ignored, 0 to retry later and 1 on success. The frame is - * encoded using the callback function . */ static int -send_spoe_frame(struct appctx *appctx, - int (*prepare)(struct appctx *, char *, size_t)) +handle_connect_spoe_applet(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - int framesz, ret; - uint32_t netint; + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret; - if (si_ic(si)->buf->size == 0) - return -1; + if (si->state <= SI_ST_CON) { + si_applet_want_put(si); + task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG); + goto stop; + } + if (si->state != SI_ST_EST) + goto exit; - ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size); - if (ret <= 0) - goto skip_or_error; - framesz = ret; - netint = htonl(framesz); - ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint)); - if (ret > 0) - ret = bi_putblk(si_ic(si), trash.str, framesz); - if (ret <= 0) { - if (ret == -1) - return -1; - return -2; + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); + goto exit; + } + + if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY) + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello); + + ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + goto exit; + + case 0: /* ignore => an error, cannot be ignored */ + goto exit; + + case 1: /* retry later */ + si_applet_cant_put(si); + goto stop; + + default: /* CONNECT frame successfully sent */ + appctx->st0 = SPOE_APPCTX_ST_CONNECTING; + goto next; + } + + next: + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} + +static int +handle_connecting_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret, framesz = 0; + + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx); + goto exit; + } + + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; + } + framesz = ret; + ret = handle_spoe_agenthello_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 1: /* retry later */ + goto stop; + + default: + /* hello handshake is finished, set the idle timeout, + * Add the appctx in the agent cache, decrease the + * number of new applets and wake up waiting streams. */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + agent->applets_idle++; + appctx->st0 = SPOE_APPCTX_ST_IDLE; + goto next; + } + + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} + +static int +handle_processing_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx; + char *frame = trash.str; + unsigned int fpa = 0; + int ret, framesz = 0, skip_sending = 0, skip_receiving = 0; + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + spoe_status_code = SPOE_FRM_ERR_TOUT; + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + appctx->st1 = SPOE_APPCTX_ERR_NONE; + goto next; + } + + process: + if (fpa > agent->max_fpa || (skip_sending && skip_receiving)) + goto stop; + + /* Frames must be handled synchronously and a the applet is waiting for + * a ACK frame */ + if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) && + !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + if (skip_receiving) + goto stop; + goto recv_frame; + } + + if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) { + skip_sending = 1; + goto recv_frame; + } + + ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list); + ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + agent->sending_rate++; + ctx->state = SPOE_CTX_ST_ERROR; + release_spoe_buffer(ctx); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + fpa++; + break; + + case 1: /* retry */ + si_applet_cant_put(si); + skip_sending = 1; + break; + + default: + agent->sending_rate++; + ctx->state = SPOE_CTX_ST_WAITING_ACK; + release_spoe_buffer(ctx); + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) + LIST_ADDQ(&agent->waiting_queue, &ctx->list); + else + LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list); + fpa++; + } + + if (fpa > agent->max_fpa) + goto stop; + + recv_frame: + if (skip_receiving) + goto process; + + framesz = 0; + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + if (*frame == SPOE_FRM_T_AGENT_DISCON) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; + } + framesz = ret; + ret = handle_spoe_agentack_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto next; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + fpa++; + break; + + case 1: /* retry */ + skip_receiving = 1; + break; + + default: + if (framesz) + bo_skip(si_oc(si), framesz+4); + fpa++; + } + goto process; + + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) || + LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + agent->applets_idle++; + appctx->st0 = SPOE_APPCTX_ST_IDLE; + } + if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) { + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list); + if (fpa) + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); } return 1; - skip_or_error: - if (!ret) - return -1; - return -2; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; } -/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1 - * when the frame can be ignored, 0 to retry later and 1 on success. The frame - * is decoded using the callback function . */ static int -recv_spoe_frame(struct appctx *appctx, - int (*handle)(struct appctx *, char *, size_t)) +handle_disconnect_spoe_applet(struct appctx *appctx) { - struct stream_interface *si = appctx->owner; - int framesz, ret; - uint32_t netint; + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + char *frame = trash.str; + int ret; - ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0); - if (ret <= 0) - goto empty_or_error; - framesz = ntohl(netint); - if (framesz > APPCTX_SPOE(appctx).max_frame_size) { - spoe_status_code = SPOE_FRM_ERR_TOO_BIG; - return -2; + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) + goto exit; + + ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) + ret = send_spoe_frame(appctx, frame, ret); + + switch (ret) { + case -1: /* error */ + goto exit; + + case 0: /* ignore */ + goto exit; + + case 1: /* retry */ + si_applet_cant_put(si); + goto stop; + + default: + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - disconnected by HAProxy (%d): %s\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, spoe_status_code, + spoe_frm_err_reasons[spoe_status_code]); + + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + goto next; } - ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint)); - if (ret <= 0) - goto empty_or_error; - bo_skip(si_oc(si), ret+sizeof(netint)); + next: + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + return 0; + stop: + return 1; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; +} - /* First check if the received frame is a DISCONNECT frame */ - ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz); - if (ret != 0) { - if (ret > 0) { +static int +handle_disconnecting_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + char *frame = trash.str; + int ret, framesz = 0; + + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) + goto exit; + + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) + goto exit; + + framesz = 0; + ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size); + if (ret > 1) { + framesz = ret; + ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz); + } + + switch (ret) { + case -1: /* error */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - error on frame (%s)\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, + spoe_frm_err_reasons[spoe_status_code]); + goto exit; + + case 0: /* ignore */ + if (framesz) + bo_skip(si_oc(si), framesz+4); + goto next; + + case 1: /* retry */ + goto stop; + + default: + if (framesz) + bo_skip(si_oc(si), framesz+4); SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - disconnected by peer (%d): %s\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, __FUNCTION__, appctx, spoe_status_code, spoe_reason); - return 2; - } - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - error on frame (%s)\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, - spoe_frm_err_reasons[spoe_status_code]); - return -2; + goto exit; } - if (handle == NULL) - goto out; - /* If not, try to decode it */ - ret = handle(appctx, trash.str, framesz); - if (ret <= 0) { - if (!ret) - return -1; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - error on frame (%s)\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, - spoe_frm_err_reasons[spoe_status_code]); - return -2; - } - out: + next: + return 0; + stop: return 1; - - empty_or_error: - if (!ret) - return 0; - spoe_status_code = SPOE_FRM_ERR_IO; - return -2; + exit: + appctx->st0 = SPOE_APPCTX_ST_EXIT; + return 0; } /* I/O Handler processing messages exchanged with the agent */ @@ -1335,12 +1796,9 @@ static void handle_spoe_applet(struct appctx *appctx) { struct stream_interface *si = appctx->owner; - struct stream *s = si_strm(si); struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; - struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; - int ret; - switchstate: + switchstate: SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" " - appctx-state=%s\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, @@ -1349,177 +1807,41 @@ handle_spoe_applet(struct appctx *appctx) switch (appctx->st0) { case SPOE_APPCTX_ST_CONNECT: spoe_status_code = SPOE_FRM_ERR_NONE; - if (si->state <= SI_ST_CON) { - si_applet_want_put(si); - task_wakeup(s->task, TASK_WOKEN_MSG); - break; - } - else if (si->state != SI_ST_EST) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - else if (!ret) - goto full; - - /* Hello frame was sent. Set the hello timeout and - * wait for the reply. */ - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello); - appctx->st0 = SPOE_APPCTX_ST_CONNECTING; - /* fall through */ + if (handle_connect_spoe_applet(appctx)) + goto out; + goto switchstate; case SPOE_APPCTX_ST_CONNECTING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - Connection timed out\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx); - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - on_new_spoe_appctx_failure(agent); - goto switchstate; - } - if (!ret) + if (handle_connecting_spoe_applet(appctx)) goto out; + goto switchstate; - /* hello handshake is finished, set the idle timeout, - * Add the appctx in the agent cache, decrease the - * number of new applets and wake up waiting streams. */ - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - appctx->st0 = SPOE_APPCTX_ST_PROCESSING; - on_new_spoe_appctx_success(agent, appctx); - break; - - case SPOE_APPCTX_ST_PROCESSING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - spoe_status_code = SPOE_FRM_ERR_TOUT; + case SPOE_APPCTX_ST_IDLE: + if (stopping && + LIST_ISEMPTY(&agent->sending_queue) && + LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) { + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - appctx->st1 = SPOE_APPCTX_ERR_NONE; goto switchstate; } - if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) { - ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame); - if (ret < 0) { - if (ret == -1) { - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - goto skip_notify_frame; - } - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - else if (!ret) - goto full; - ctx->state = SPOE_CTX_ST_WAITING_ACK; - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - - skip_notify_frame: - if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) { - ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame); - if (ret < 0) { - if (ret == -1) - goto skip_notify_frame; - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - if (!ret) - goto out; - if (ret == 2) { - ctx->state = SPOE_CTX_ST_ERROR; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - ctx->state = SPOE_CTX_ST_DONE; - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - else { - if (stopping) { - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - - ret = recv_spoe_frame(appctx, NULL); - if (ret < 0) { - if (ret == -1) - goto skip_notify_frame; - appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; - goto switchstate; - } - if (!ret) - goto out; - if (ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - } - break; - - case SPOE_APPCTX_ST_DISCONNECT: - ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame); - if (ret < 0) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - else if (!ret) - goto full; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" - " - disconnected by HAProxy (%d): %s\n", - (int)now.tv_sec, (int)now.tv_usec, - ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, - __FUNCTION__, appctx, spoe_status_code, - spoe_frm_err_reasons[spoe_status_code]); - - APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); - appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + agent->applets_idle--; + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; /* fall through */ + case SPOE_APPCTX_ST_PROCESSING: + if (handle_processing_spoe_applet(appctx)) + goto out; + goto switchstate; + + case SPOE_APPCTX_ST_DISCONNECT: + if (handle_disconnect_spoe_applet(appctx)) + goto out; + goto switchstate; + case SPOE_APPCTX_ST_DISCONNECTING: - if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - ret = recv_spoe_frame(appctx, NULL); - if (ret < 0 || ret == 2) { - appctx->st0 = SPOE_APPCTX_ST_EXIT; - goto switchstate; - } - break; + if (handle_disconnecting_spoe_applet(appctx)) + goto out; + goto switchstate; case SPOE_APPCTX_ST_EXIT: si_shutw(si); @@ -1532,16 +1854,11 @@ handle_spoe_applet(struct appctx *appctx) case SPOE_APPCTX_ST_END: return; } - - out: + out: if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY) task_queue(APPCTX_SPOE(appctx).task); si_oc(si)->flags |= CF_READ_DONTWAIT; task_wakeup(si_strm(si)->task, TASK_WOKEN_IO); - return; - full: - si_applet_cant_put(si); - goto out; } struct applet spoe_applet = { @@ -1568,13 +1885,15 @@ create_spoe_appctx(struct spoe_config *conf) if ((APPCTX_SPOE(appctx).task = task_new()) == NULL) goto out_free_appctx; APPCTX_SPOE(appctx).task->process = process_spoe_applet; - APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY; + APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello); APPCTX_SPOE(appctx).task->context = appctx; APPCTX_SPOE(appctx).agent = conf->agent; - APPCTX_SPOE(appctx).ctx = NULL; APPCTX_SPOE(appctx).version = 0; - APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize; - task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT); + APPCTX_SPOE(appctx).max_frame_size = conf->agent->max_frame_size; + APPCTX_SPOE(appctx).flags = 0; + + LIST_INIT(&APPCTX_SPOE(appctx).list); + LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue); sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type); if (!sess) @@ -1592,10 +1911,6 @@ create_spoe_appctx(struct spoe_config *conf) si_applet_cant_get(&strm->si[0]); appctx_wakeup(appctx); - /* Increase the per-process number of cumulated connections */ - if (conf->agent->cps_max > 0) - update_freq_ctr(&conf->agent->conn_per_sec, 1); - strm->do_log = NULL; strm->res.flags |= CF_READ_DONTWAIT; @@ -1603,6 +1918,9 @@ create_spoe_appctx(struct spoe_config *conf) jobs++; totalconn++; + task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT); + LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list); + conf->agent->applets_act++; return appctx; /* Error unrolling */ @@ -1618,200 +1936,92 @@ create_spoe_appctx(struct spoe_config *conf) return NULL; } -/* Wake up a SPOE applet attached to a SPOE context. */ -static void -wakeup_spoe_appctx(struct spoe_context *ctx) -{ - if (ctx->appctx == NULL) - return; - if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) { - si_applet_want_get(ctx->appctx->owner); - si_applet_want_put(ctx->appctx->owner); - appctx_wakeup(ctx->appctx); - } -} - - -/* Run across the list of pending streams waiting for a SPOE applet and wake the - * first. */ -static void -offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx) -{ - struct spoe_context *ctx; - - if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING) - return; - - if (LIST_ISEMPTY(&agent->applet_wq)) - LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list); - else { - ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait); - APPCTX_SPOE(appctx).ctx = ctx; - ctx->appctx = appctx; - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - wake up stream to get available SPOE applet\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - } -} - -/* A failure occurred during SPOE applet creation. */ -static void -on_new_spoe_appctx_failure(struct spoe_agent *agent) -{ - struct spoe_context *ctx; - - list_for_each_entry(ctx, &agent->applet_wq, applet_wait) { - task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - wake up stream because to SPOE applet connection failed\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - } -} - -static void -on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx) -{ - offer_spoe_appctx(agent, appctx); -} -/* Retrieve a SPOE applet from the agent cache if possible, else create it. It - * returns 1 on success, 0 to retry later and -1 if an error occurred. */ static int -acquire_spoe_appctx(struct spoe_context *ctx, int dir) +queue_spoe_context(struct spoe_context *ctx) { struct spoe_config *conf = FLT_CONF(ctx->filter); struct spoe_agent *agent = conf->agent; struct appctx *appctx; + unsigned int min_applets; - /* If a process is already started for this SPOE context, retry - * later. */ - if (ctx->flags & SPOE_CTX_FL_PROCESS) - goto wait; + min_applets = min_applets_act(agent); - /* If needed, initialize the buffer that will be used to encode messages - * and decode actions. */ - if (ctx->buffer == &buf_empty) { - if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { - LIST_DEL(&ctx->buffer_wait.list); - LIST_INIT(&ctx->buffer_wait.list); - } - - if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) { - LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); - goto wait; - } - } - - /* If the SPOE applet was already set, all is done. */ - if (ctx->appctx) - goto success; - - /* Else try to retrieve it from the agent cache */ - if (!LIST_ISEMPTY(&agent->cache)) { - appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list); - LIST_DEL(&APPCTX_SPOE(appctx).list); - APPCTX_SPOE(appctx).ctx = ctx; - ctx->appctx = appctx; - goto success; - } - - /* If there is no server up for the agent's backend, this is an - * error. */ - if (!agent->b.be->srv_act && !agent->b.be->srv_bck) - goto error; + /* Check if we need to create a new SPOE applet or not. */ + if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate) + goto end; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - waiting for available SPOE appctx\n", + " - try to create new SPOE appctx\n", (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, ctx->strm); - /* Else add the stream in the waiting queue. */ - if (LIST_ISEMPTY(&ctx->applet_wait)) - LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait); + /* Do not try to create a new applet if there is no server up for the + * agent's backend. */ + if (!agent->b.be->srv_act && !agent->b.be->srv_bck) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - cannot create SPOE appctx: no server up\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } - /* Finally, create new SPOE applet if we can */ + /* Do not try to create a new applet if we have reached the maximum of + * connection per seconds */ if (agent->cps_max > 0) { - if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) - goto wait; - } - if (create_spoe_appctx(conf) == NULL) - goto error; - - wait: - return 0; - - success: - /* Remove the stream from the waiting queue */ - if (!LIST_ISEMPTY(&ctx->applet_wait)) { - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); + if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - cannot create SPOE appctx: max CPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } } - /* Set the right flag to prevent request and response processing - * in same time. */ - ctx->flags |= ((dir == SMP_OPT_DIR_REQ) - ? SPOE_CTX_FL_REQ_PROCESS - : SPOE_CTX_FL_RSP_PROCESS); + appctx = create_spoe_appctx(conf); + if (appctx == NULL) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to create SPOE appctx\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + goto end; + } + if (agent->applets_act <= min_applets) + APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST; + + /* Increase the per-process number of cumulated connections */ + if (agent->cps_max > 0) + update_freq_ctr(&agent->conn_per_sec, 1); + + end: + /* The only reason to return an error is when there is no applet */ + if (LIST_ISEMPTY(&agent->applets)) + return 0; + + /* Add the SPOE context in the sending queue and update all running + * info */ + LIST_ADDQ(&agent->sending_queue, &ctx->list); + if (agent->sending_rate) + agent->sending_rate--; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - acquire SPOE appctx %p from cache\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, ctx->appctx); + " - Add stream in sending queue - applets_act=%u - applets_idle=%u" + " - sending_rate=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, + ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate); + + /* Finally try to wakeup the first IDLE applet found and move it at the + * end of the list. */ + list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) { + if (appctx->st0 == SPOE_APPCTX_ST_IDLE) { + si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); + appctx_wakeup(appctx); + LIST_DEL(&APPCTX_SPOE(appctx).list); + LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list); + break; + } + } return 1; - - error: - /* Remove the stream from the waiting queue */ - if (!LIST_ISEMPTY(&ctx->applet_wait)) { - LIST_DEL(&ctx->applet_wait); - LIST_INIT(&ctx->applet_wait); - } - - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - failed to acquire SPOE appctx\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm); - send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n"); - - return -1; -} - -/* Release a SPOE applet and push it in the agent cache. */ -static void -release_spoe_appctx(struct spoe_context *ctx) -{ - struct spoe_config *conf = FLT_CONF(ctx->filter); - struct spoe_agent *agent = conf->agent; - struct appctx *appctx = ctx->appctx; - - /* Reset the flag to allow next processing */ - ctx->flags &= ~SPOE_CTX_FL_PROCESS; - - /* Reset processing timer */ - ctx->process_exp = TICK_ETERNITY; - - /* Release the buffer if needed */ - if (ctx->buffer != &buf_empty) { - b_free(&ctx->buffer); - offer_buffers(ctx, tasks_run_queue + applets_active_queue); - } - - /* If there is no SPOE applet, all is done */ - if (!appctx) - return; - - /* Else, reassign it or push it in the agent cache */ - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - release SPOE appctx %p\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, appctx); - - APPCTX_SPOE(appctx).ctx = NULL; - ctx->appctx = NULL; - offer_spoe_appctx(agent, appctx); } /*************************************************************************** @@ -1824,6 +2034,8 @@ static int process_spoe_messages(struct stream *s, struct spoe_context *ctx, struct list *messages, int dir) { + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; struct spoe_message *msg; struct sample *smp; struct spoe_arg *arg; @@ -1832,9 +2044,8 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, int off, flag, idx = 0; /* Reserve 32 bytes from the frame Metadata */ - max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32; + max_size = agent->max_frame_size - 32; - b_reset(ctx->buffer); p = ctx->buffer->p; /* Loop on messages */ @@ -1937,7 +2148,6 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx, return 1; skip: - b_reset(ctx->buffer); return 0; } @@ -2081,6 +2291,47 @@ process_spoe_actions(struct stream *s, struct spoe_context *ctx, return 0; } +static int +start_event_processing(struct spoe_context *ctx, int dir) +{ + int ret; + /* If a process is already started for this SPOE context, retry + * later. */ + if (ctx->flags & SPOE_CTX_FL_PROCESS) + goto wait; + + ret = acquire_spoe_buffer(ctx); + if (ret <= 0) + return ret; + + /* Set the right flag to prevent request and response processing + * in same time. */ + ctx->flags |= ((dir == SMP_OPT_DIR_REQ) + ? SPOE_CTX_FL_REQ_PROCESS + : SPOE_CTX_FL_RSP_PROCESS); + + return 1; + + wait: + return 0; +} + +static void +stop_event_processing(struct spoe_context *ctx) +{ + /* Reset the flag to allow next processing */ + ctx->flags &= ~SPOE_CTX_FL_PROCESS; + + /* Reset processing timer */ + ctx->process_exp = TICK_ETERNITY; + + release_spoe_buffer(ctx); + + if (!LIST_ISEMPTY(&ctx->list)) { + LIST_DEL(&ctx->list); + LIST_INIT(&ctx->list); + } +} /* Process a SPOE event. First, this functions will process messages attached to * this event and send them to an agent in a NOTIFY frame. Then, it will wait a @@ -2101,15 +2352,6 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], spoe_event_str[ev]); - if (agent->eps_max > 0) { - if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - skip event '%s': max EPS reached\n", - (int)now.tv_sec, (int)now.tv_usec, - agent->id, __FUNCTION__, s, spoe_event_str[ev]); - goto skip; - } - } dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); @@ -2131,38 +2373,43 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, } if (ctx->state == SPOE_CTX_ST_READY) { + if (agent->eps_max > 0) { + if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - skip event '%s': max EPS reached\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, spoe_event_str[ev]); + goto skip; + } + } + if (!tick_isset(ctx->process_exp)) { ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing); s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire), ctx->process_exp); } - - ret = acquire_spoe_appctx(ctx, dir); + ret = start_event_processing(ctx, dir); if (ret <= 0) { if (!ret) goto out; goto error; } - ctx->state = SPOE_CTX_ST_SENDING_MSGS; - } - - if (ctx->appctx == NULL) - goto error; - - if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) { ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); if (ret <= 0) { if (!ret) goto skip; goto error; } - wakeup_spoe_appctx(ctx); - ret = 0; - goto out; + + if (!queue_spoe_context(ctx)) + goto error; + + ctx->state = SPOE_CTX_ST_SENDING_MSGS; + /* fall through */ } - if (ctx->state == SPOE_CTX_ST_WAITING_ACK) { - wakeup_spoe_appctx(ctx); + if (ctx->state == SPOE_CTX_ST_SENDING_MSGS || + ctx->state == SPOE_CTX_ST_WAITING_ACK) { ret = 0; goto out; } @@ -2175,18 +2422,13 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, goto error; } ctx->frame_id++; - release_spoe_appctx(ctx); ctx->state = SPOE_CTX_ST_READY; + goto end; } out: return ret; - skip: - release_spoe_appctx(ctx); - ctx->state = SPOE_CTX_ST_READY; - return 1; - error: if (agent->eps_max > 0) update_freq_ctr(&agent->err_per_sec, 1); @@ -2194,6 +2436,7 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, if (agent->var_on_error) { struct sample smp; + // FIXME: Get the error code here memset(&smp, 0, sizeof(smp)); smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); smp.data.u.sint = 1; @@ -2203,17 +2446,57 @@ process_spoe_event(struct stream *s, struct spoe_context *ctx, strlen(agent->var_on_error), &smp); } - release_spoe_appctx(ctx); ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) ? SPOE_CTX_ST_READY : SPOE_CTX_ST_ERROR); - return 1; -} + ret = 1; + goto end; + skip: + ctx->state = SPOE_CTX_ST_READY; + ret = 1; + + end: + stop_event_processing(ctx); + return ret; +} /*************************************************************************** * Functions that create/destroy SPOE contexts **************************************************************************/ +static int +acquire_spoe_buffer(struct spoe_context *ctx) +{ + if (ctx->buffer != &buf_empty) + return 1; + + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { + LIST_DEL(&ctx->buffer_wait.list); + LIST_INIT(&ctx->buffer_wait.list); + } + + if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) + return 1; + + LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list); + return 0; +} + +static void +release_spoe_buffer(struct spoe_context *ctx) +{ + if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) { + LIST_DEL(&ctx->buffer_wait.list); + LIST_INIT(&ctx->buffer_wait.list); + } + + /* Release the buffer if needed */ + if (ctx->buffer != &buf_empty) { + b_free(&ctx->buffer); + offer_buffers(ctx, tasks_run_queue + applets_active_queue); + } +} + static int wakeup_spoe_context(struct spoe_context *ctx) { task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); @@ -2239,7 +2522,7 @@ create_spoe_context(struct filter *filter) LIST_INIT(&ctx->buffer_wait.list); ctx->buffer_wait.target = ctx; ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context; - LIST_INIT(&ctx->applet_wait); + LIST_INIT(&ctx->list); ctx->stream_id = 0; ctx->frame_id = 1; @@ -2254,12 +2537,10 @@ destroy_spoe_context(struct spoe_context *ctx) if (!ctx) return; - if (ctx->appctx) - APPCTX_SPOE(ctx->appctx).ctx = NULL; if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) LIST_DEL(&ctx->buffer_wait.list); - if (!LIST_ISEMPTY(&ctx->applet_wait)) - LIST_DEL(&ctx->applet_wait); + if (!LIST_ISEMPTY(&ctx->list)) + LIST_DEL(&ctx->list); pool_free2(pool2_spoe_ctx, ctx); } @@ -2295,7 +2576,7 @@ sig_stop_spoe(struct sig_handler *sh) conf = fconf->conf; agent = conf->agent; - list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) { + list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) { si_applet_want_get(appctx->owner); si_applet_want_put(appctx->owner); appctx_wakeup(appctx); @@ -2437,17 +2718,11 @@ spoe_start(struct stream *s, struct filter *filter) static void spoe_stop(struct stream *s, struct filter *filter) { - struct spoe_context *ctx = filter->ctx; - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_config *)FLT_CONF(filter))->agent->id, __FUNCTION__, s); - - if (ctx) { - release_spoe_appctx(ctx); - destroy_spoe_context(ctx); - } + destroy_spoe_context(filter->ctx); } @@ -2461,10 +2736,7 @@ spoe_check_timeouts(struct stream *s, struct filter *filter) if (tick_is_expired(ctx->process_exp, now_ms)) { s->pending_events |= TASK_WOKEN_MSG; - if (ctx->buffer != &buf_empty) { - b_free(&ctx->buffer); - offer_buffers(ctx, tasks_run_queue + applets_active_queue); - } + release_spoe_buffer(ctx); } } @@ -2511,13 +2783,13 @@ spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) goto out; } ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED; + if (!ret) { + channel_dont_read(chn); + channel_dont_close(chn); + } } out: - if (!ret) { - channel_dont_read(chn); - channel_dont_close(chn); - } return ret; } @@ -2654,21 +2926,34 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) } curagent->id = strdup(args[1]); + curagent->conf.file = strdup(file); curagent->conf.line = linenum; - curagent->timeout.hello = TICK_ETERNITY; - curagent->timeout.idle = TICK_ETERNITY; + + curagent->timeout.hello = TICK_ETERNITY; + curagent->timeout.idle = TICK_ETERNITY; curagent->timeout.processing = TICK_ETERNITY; - curagent->var_pfx = NULL; - curagent->var_on_error = NULL; - curagent->flags = 0; - curagent->cps_max = 0; - curagent->eps_max = 0; + + curagent->engine_id = NULL; + curagent->var_pfx = NULL; + curagent->var_on_error = NULL; + curagent->flags = 0; + curagent->cps_max = 0; + curagent->eps_max = 0; + curagent->max_frame_size = global.tune.bufsize - 4; + curagent->min_applets = 0; + curagent->max_fpa = 100; for (i = 0; i < SPOE_EV_EVENTS; ++i) LIST_INIT(&curagent->messages[i]); - LIST_INIT(&curagent->cache); - LIST_INIT(&curagent->applet_wq); + + curagent->applets_act = 0; + curagent->applets_idle = 0; + curagent->sending_rate = 0; + + LIST_INIT(&curagent->applets); + LIST_INIT(&curagent->sending_queue); + LIST_INIT(&curagent->waiting_queue); } else if (!strcmp(args[0], "use-backend")) { if (!*args[1]) { @@ -3114,6 +3399,8 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, } curagent->var_pfx = strdup(curagent->id); } + if (curagent->engine_id == NULL) + curagent->engine_id = generate_pseudo_uuid(); if (LIST_ISEMPTY(&curmps)) { Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",