MINOR: spoe: Improve implementation of the payload fragmentation

Now, when a payload is fragmented, the first frame must define the frame type
and the followings must use the special type SPOE_FRM_T_UNSET. This way, it is
easy to know if a fragment is the first one or not. Of course, all frames must
still share the same stream-id and frame-id.

Update SPOA example accordingly.
This commit is contained in:
Christopher Faulet 2017-02-17 15:18:35 +01:00 committed by Willy Tarreau
parent 4ff3e574ac
commit f032c3ec09
3 changed files with 226 additions and 113 deletions

View File

@ -401,6 +401,42 @@ check_engine_id(struct spoe_frame *frame, char **buf, char *end)
return ret;
}
static int
acc_payload(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *buf;
size_t len = frame->len - frame->offset;
int ret = frame->offset;
/* No need to accumulation payload */
if (frame->fragmented == false)
return ret;
buf = realloc(frame->frag_buf, frame->frag_len + len);
if (buf == NULL) {
client->status_code = SPOE_FRM_ERR_RES;
return -1;
}
memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
frame->frag_buf = buf;
frame->frag_len += len;
if (!(frame->flags & SPOE_FRM_FL_FIN)) {
/* Wait for next parts */
frame->buf = (char *)(frame->data);
frame->offset = 0;
frame->len = 0;
frame->flags = 0;
return 1;
}
frame->buf = frame->frag_buf;
frame->len = frame->frag_len;
frame->offset = 0;
return ret;
}
/* Check disconnect status code. It returns -1 if an error occurred, the number
* of read bytes otherwise. */
static int
@ -454,6 +490,8 @@ check_discon_message(struct spoe_frame *frame, char **buf, char *end)
return ret;
}
/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
* occurred, otherwise the number of read bytes. HELLO frame cannot be
* ignored and having another frame than a HELLO frame is an error. */
@ -664,7 +702,7 @@ handle_hanotify(struct spoe_frame *frame)
memcpy((char *)&(frame->flags), p, 4);
p += 4;
/* Fragmentation is not supported for DISCONNECT frame */
/* Fragmentation is not supported */
if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
@ -676,44 +714,87 @@ handle_hanotify(struct spoe_frame *frame)
if (spoe_decode_varint(&p, end, &frame_id) == -1)
goto ignore;
if (frame->fragmented == true) {
if (frame->stream_id != (unsigned int)stream_id ||
frame->frame_id != (unsigned int)frame_id) {
client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
goto error;
}
frame->stream_id = (unsigned int)stream_id;
frame->frame_id = (unsigned int)frame_id;
if (frame->flags & SPOE_FRM_FL_ABRT) {
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - Abort processing of a fragmented frame"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
frame->frag_len, frame->len, p - frame->buf);
goto ignore;
}
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s frame received"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
frame->frag_len, frame->len, p - frame->buf);
frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
frame->offset = (p - frame->buf);
return acc_payload(frame);
ignore:
return 0;
error:
return -1;
}
/* Decode next part of a fragmented frame received from HAProxy. It returns -1
* if an error occurred, 0 if it must be must be ignored, otherwise the number
* of read bytes. */
static int
handle_hafrag(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
uint64_t stream_id, frame_id;
p = frame->buf;
end = frame->buf + frame->len;
/* Check frame type */
if (*p++ != SPOE_FRM_T_UNSET)
goto ignore;
DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
/* Fragmentation is not supported */
if (fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
}
/* Retrieve flags */
memcpy((char *)&(frame->flags), p, 4);
p+= 4;
/* Read the stream-id and frame-id */
if (spoe_decode_varint(&p, end, &stream_id) == -1)
goto ignore;
if (spoe_decode_varint(&p, end, &frame_id) == -1)
goto ignore;
if (frame->fragmented == false ||
frame->stream_id != (unsigned int)stream_id ||
frame->frame_id != (unsigned int)frame_id) {
client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
goto error;
}
if (frame->flags & SPOE_FRM_FL_ABRT) {
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s fragment of a fragmented frame received"
" - Abort processing of a fragmented frame"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
frame->frag_len, frame->len, p - frame->buf);
goto ignore;
}
else {
frame->stream_id = (unsigned int)stream_id;
frame->frame_id = (unsigned int)frame_id;
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s frame received"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
frame->frag_len, frame->len, p - frame->buf);
frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
}
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s fragment of a fragmented frame received"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
frame->frag_len, frame->len, p - frame->buf);
frame->offset = (p - frame->buf);
return frame->offset;
return acc_payload(frame);
ignore:
return 0;
@ -1356,7 +1437,11 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
n = handle_hanotify(frame);
if (frame->buf[0] == SPOE_FRM_T_UNSET)
n = handle_hafrag(frame);
else
n = handle_hanotify(frame);
if (n < 0) {
LOG(client->worker, "Failed to decode frame: %s",
spoe_frm_err_reasons[client->status_code]);
@ -1366,6 +1451,8 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
LOG(client->worker, "Ignore invalid/unknown/aborted frame");
goto ignore_frame;
}
else if (n == 1)
goto noop;
else
goto process_frame;
@ -1382,39 +1469,14 @@ read_frame_cb(evutil_socket_t fd, short events, void *arg)
goto disconnect;
}
noop:
return;
ignore_frame:
reset_frame(frame);
return;
process_frame:
if (frame->fragmented == true) {
char *buf;
size_t len = frame->len - frame->offset;
buf = realloc(frame->frag_buf, frame->frag_len + len);
if (buf == NULL) {
client->status_code = SPOE_FRM_ERR_RES;
goto disconnect;
}
memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
frame->frag_buf = buf;
frame->frag_len += len;
if (!(frame->flags & SPOE_FRM_FL_FIN)) {
/* Wait for next fragments */
frame->buf = (char *)(frame->data);
frame->offset = 0;
frame->len = 0;
frame->flags = 0;
return;
}
frame->buf = frame->frag_buf;
frame->len = frame->frag_len;
frame->offset = 0;
/* fall through */
}
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;

View File

@ -294,6 +294,8 @@ struct spoe_appctx {
/* Frame Types sent by HAProxy and by agents */
enum spoe_frame_type {
SPOE_FRM_T_UNSET = 0,
/* Frames sent by HAProxy */
SPOE_FRM_T_HAPROXY_HELLO = 1,
SPOE_FRM_T_HAPROXY_DISCON,

View File

@ -495,26 +495,16 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
p = frame;
end = frame+size;
/* <ctx> is null when the stream has aborted the processing of a
* fragmented frame. In this case, we must notify the corresponding
* agent using ids stored in <frag_ctx>. */
if (ctx == NULL) {
flags |= SPOE_FRM_FL_ABRT;
stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid;
}
else {
stream_id = ctx->stream_id;
frame_id = ctx->frame_id;
stream_id = ctx->stream_id;
frame_id = ctx->frame_id;
if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
/* The fragmentation is not supported by the applet */
if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
return -1;
}
flags = ctx->frag_ctx.flags;
if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
/* The fragmentation is not supported by the applet */
if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
return -1;
}
flags = ctx->frag_ctx.flags;
}
/* Set Frame type */
@ -531,10 +521,10 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
goto too_big;
/* Copy encoded messages, if possible */
sz = SPOE_APPCTX(appctx)->buffer->i;
sz = ctx->buffer->i;
if (p + sz >= end)
goto too_big;
memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz);
memcpy(p, ctx->buffer->p, sz);
p += sz;
return (p - frame);
@ -544,6 +534,66 @@ spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
return 0;
}
/* Encode next part of a fragmented frame sent by HAProxy to an agent. It
* returns the number of encoded bytes in the frame on success, 0 if an encoding
* error occurred and -1 if a fatal error occurred. */
static int
spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
char *frame, size_t size)
{
char *p, *end;
unsigned int stream_id, frame_id;
unsigned int flags;
size_t sz;
p = frame;
end = frame+size;
/* <ctx> is null when the stream has aborted the processing of a
* fragmented frame. In this case, we must notify the corresponding
* agent using ids stored in <frag_ctx>. */
if (ctx == NULL) {
flags = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid;
}
else {
flags = ctx->frag_ctx.flags;
stream_id = ctx->stream_id;
frame_id = ctx->frame_id;
}
/* Set Frame type */
*p++ = SPOE_FRM_T_UNSET;
/* Set flags */
memcpy(p, (char *)&flags, 4);
p += 4;
/* Set stream-id and frame-id */
if (spoe_encode_varint(stream_id, &p, end) == -1)
goto too_big;
if (spoe_encode_varint(frame_id, &p, end) == -1)
goto too_big;
if (ctx == NULL)
goto end;
/* Copy encoded messages, if possible */
sz = ctx->buffer->i;
if (p + sz >= end)
goto too_big;
memcpy(p, ctx->buffer->p, sz);
p += sz;
end:
return (p - frame);
too_big:
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
return 0;
}
/* Decode and process the HELLO frame sent by an agent. It returns the number of
* read bytes on success, 0 if a decoding error occurred, and -1 if a fatal
* error occurred. */
@ -1150,12 +1200,13 @@ spoe_release_appctx(struct appctx *appctx)
if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
agent->applets_idle--;
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
spoe_appctx->status_code = SPOE_FRM_ERR_IO;
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
}
/* Destroy the task attached to this applet */
@ -1351,19 +1402,36 @@ spoe_handle_connecting_appctx(struct appctx *appctx)
return 0;
}
static int
spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx,
int *skip)
spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
{
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_context *ctx = NULL;
char *frame, *buf;
int ret;
/* 4 bytes are reserved at the beginning of <buf> to store the frame
* length. */
buf = trash.str; frame = buf+4;
ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size);
if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size);
}
else if (LIST_ISEMPTY(&agent->sending_queue)) {
*skip = 1;
ret = 1;
goto end;
}
else {
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
SPOE_APPCTX(appctx)->max_frame_size);
}
if (ret > 1)
ret = spoe_send_frame(appctx, buf, ret);
@ -1376,6 +1444,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx
if (ctx == NULL)
goto abort_frag_frame;
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
@ -1391,6 +1460,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx
if (ctx == NULL)
goto abort_frag_frame;
spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@ -1506,7 +1576,6 @@ spoe_handle_processing_appctx(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
struct spoe_context *ctx = NULL;
unsigned int fpa = 0;
int ret, skip_sending = 0, skip_receiving = 0;
@ -1531,39 +1600,21 @@ spoe_handle_processing_appctx(struct appctx *appctx)
skip_sending, skip_receiving,
spoe_appctx_state_str[appctx->st0]);
if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
if (fpa > agent->max_fpa)
goto stop;
else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
if (skip_receiving)
goto stop;
goto recv_frame;
}
else if (skip_sending)
goto recv_frame;
else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
goto send_frame;
}
else if (LIST_ISEMPTY(&agent->sending_queue)) {
skip_sending = 1;
goto recv_frame;
}
ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
send_frame:
/* Transfer the buffer ownership to the SPOE appctx */
if (ctx) {
SPOE_APPCTX(appctx)->buffer = ctx->buffer;
ctx->buffer = &buf_empty;
}
ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending);
/* send_frame */
ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
switch (ret) {
case -1: /* error */
goto next;
case 0: /* ignore */
spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
&SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
break;
@ -1572,8 +1623,6 @@ spoe_handle_processing_appctx(struct appctx *appctx)
break;
default:
spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
&SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
break;
@ -2571,7 +2620,7 @@ static void
spoe_reset_context(struct spoe_context *ctx)
{
ctx->state = SPOE_CTX_ST_READY;
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
}