MINOR: spoe: Use the min of all known max_frame_size to encode messages

The max_frame_size value is negociated between HAProxy and SPOE agents during
the HELLO handshake. It is a per-connection value. Different SPOE agents can
choose to use different max_frame_size values. So, now, we keep the minimum of
all known max_frame_size. This minimum is updated when a new connection to a
SPOE agent is opened and when a connection is closed. We use this value as a
limit to encode messages in NOTIFY frames.
This commit is contained in:
Christopher Faulet 2017-01-13 11:30:50 +01:00 committed by Willy Tarreau
parent 4596fb7056
commit 7aa0b2b0dd

View File

@ -49,8 +49,16 @@
#define SPOE_PRINTF(x...) #define SPOE_PRINTF(x...)
#endif #endif
/* Minimal size for a frame */ /* Reserved 4 bytes to the frame size. So a frame and its size can be written
#define MIN_FRAME_SIZE 256 * together in a buffer */
#define MAX_FRAME_SIZE global.tune.bufsize - 4
/* The minimum size for a frame */
#define MIN_FRAME_SIZE 256
/* Reserved for the metadata and the frame type. So <MAX_FRAME_SIZE> -
* <FRAME_HDR_SIZE> is the maximum payload size */
#define FRAME_HDR_SIZE 32
/* Flags set on the SPOE agent */ /* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */ #define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
@ -219,6 +227,7 @@ struct spoe_agent {
* for each supported events */ * for each supported events */
/* running info */ /* running info */
unsigned int frame_size; /* current maximum frame size, only used to encode messages */
unsigned int applets_act; /* # of applets alive at a time */ unsigned int applets_act; /* # of applets alive at a time */
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */ unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
unsigned int sending_rate; /* the global sending rate */ unsigned int sending_rate; /* the global sending rate */
@ -1260,7 +1269,7 @@ prepare_spoe_healthcheck_request(char **req, int *len)
{ {
struct appctx appctx; struct appctx appctx;
struct spoe_appctx spoe_appctx; struct spoe_appctx spoe_appctx;
char *frame, buf[global.tune.bufsize]; char *frame, buf[MAX_FRAME_SIZE+4];
unsigned int framesz; unsigned int framesz;
int idx; int idx;
@ -1269,13 +1278,13 @@ prepare_spoe_healthcheck_request(char **req, int *len)
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
appctx.ctx.spoe.ptr = &spoe_appctx; appctx.ctx.spoe.ptr = &spoe_appctx;
SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4; SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
frame = buf+4; frame = buf+4;
idx = prepare_spoe_hahello_frame(&appctx, frame, global.tune.bufsize-4); idx = prepare_spoe_hahello_frame(&appctx, frame, MAX_FRAME_SIZE);
if (idx <= 0) if (idx <= 0)
return -1; return -1;
if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4) if (idx + SLEN(HEALTHCHECK_KEY) + 1 > MAX_FRAME_SIZE)
return -1; return -1;
/* "healthcheck" K/V item */ /* "healthcheck" K/V item */
@ -1306,7 +1315,7 @@ handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen
memset(&spoe_appctx, 0, sizeof(spoe_appctx)); memset(&spoe_appctx, 0, sizeof(spoe_appctx));
appctx.ctx.spoe.ptr = &spoe_appctx; appctx.ctx.spoe.ptr = &spoe_appctx;
SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4; SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0) if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0)
goto error; goto error;
@ -1418,6 +1427,7 @@ release_spoe_applet(struct appctx *appctx)
struct stream_interface *si = appctx->owner; struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent; struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_context *ctx, *back; struct spoe_context *ctx, *back;
struct spoe_appctx *spoe_appctx;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, (int)now.tv_sec, (int)now.tv_usec, agent->id,
@ -1458,7 +1468,7 @@ release_spoe_applet(struct appctx *appctx)
pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx)); pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
if (!LIST_ISEMPTY(&agent->applets)) if (!LIST_ISEMPTY(&agent->applets))
return; goto end;
list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) { list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
LIST_DEL(&ctx->list); LIST_DEL(&ctx->list);
@ -1475,6 +1485,12 @@ release_spoe_applet(struct appctx *appctx)
ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100); ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
} }
end:
/* Update runtinme agent info */
agent->frame_size = agent->max_frame_size;
list_for_each_entry(spoe_appctx, &agent->applets, list)
agent->frame_size = MIN(spoe_appctx->max_frame_size, agent->frame_size);
} }
static int static int
@ -1592,6 +1608,9 @@ handle_connecting_spoe_applet(struct appctx *appctx)
appctx->st0 = SPOE_APPCTX_ST_IDLE; appctx->st0 = SPOE_APPCTX_ST_IDLE;
LIST_DEL(&SPOE_APPCTX(appctx)->list); LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list); LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
/* Update runtinme agent info */
agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size, agent->frame_size);
goto next; goto next;
} }
@ -2145,8 +2164,7 @@ process_spoe_messages(struct stream *s, struct spoe_context *ctx,
size_t max_size; size_t max_size;
int off, flag, idx = 0; int off, flag, idx = 0;
/* Reserve 32 bytes from the frame Metadata */ max_size = agent->frame_size - FRAME_HDR_SIZE;
max_size = agent->max_frame_size - 32;
p = ctx->buffer->p; p = ctx->buffer->p;
@ -3040,13 +3058,14 @@ cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
curagent->flags = 0; curagent->flags = 0;
curagent->cps_max = 0; curagent->cps_max = 0;
curagent->eps_max = 0; curagent->eps_max = 0;
curagent->max_frame_size = global.tune.bufsize - 4; curagent->max_frame_size = MAX_FRAME_SIZE;
curagent->min_applets = 0; curagent->min_applets = 0;
curagent->max_fpa = 100; curagent->max_fpa = 100;
for (i = 0; i < SPOE_EV_EVENTS; ++i) for (i = 0; i < SPOE_EV_EVENTS; ++i)
LIST_INIT(&curagent->messages[i]); LIST_INIT(&curagent->messages[i]);
curagent->frame_size = curagent->max_frame_size;
curagent->applets_act = 0; curagent->applets_act = 0;
curagent->applets_idle = 0; curagent->applets_idle = 0;
curagent->sending_rate = 0; curagent->sending_rate = 0;