MINOR: spoe: Add a generic function to encode a list of SPOE message

So it will be possible to encode messages chained by event or by group. For now,
it is only possible to do it by event.
This commit is contained in:
Christopher Faulet 2017-09-21 16:57:24 +02:00 committed by Willy Tarreau
parent c718b82dfe
commit 58d0368588
1 changed files with 48 additions and 37 deletions

View File

@ -2412,8 +2412,7 @@ spoe_decode_action_unset_var(struct stream *s, struct spoe_context *ctx,
/* Process SPOE actions for a specific event. It returns 1 on success. If an
* error occurred, 0 is returned. */
static int
spoe_process_actions(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev, int dir)
spoe_process_actions(struct stream *s, struct spoe_context *ctx, int dir)
{
char *p, *end;
int ret;
@ -2452,7 +2451,7 @@ spoe_process_actions(struct stream *s, struct spoe_context *ctx,
* Functions that process SPOE events
**************************************************************************/
static inline int
spoe_start_event_processing(struct spoe_context *ctx, int dir)
spoe_start_processing(struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
@ -2468,7 +2467,7 @@ spoe_start_event_processing(struct spoe_context *ctx, int dir)
}
static inline void
spoe_stop_event_processing(struct spoe_context *ctx)
spoe_stop_processing(struct spoe_context *ctx)
{
struct spoe_appctx *sa = ctx->frag_ctx.spoe_appctx;
@ -2499,38 +2498,27 @@ spoe_stop_event_processing(struct spoe_context *ctx)
}
}
/* Process a SPOE event. First, this functions will process messages attached to
* this event and send them to an agent in a NOTIFY frame. Then, it will wait a
* ACK frame to process corresponding actions. During all the processing, it
* returns 0 and it returns 1 when the processing is finished. If an error
* occurred, -1 is returned. */
/* Process a list of SPOE messages. First, this functions will process messages
* and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
* to process corresponding actions. During all the processing, it returns 0
* and it returns 1 when the processing is finished. If an error occurred, -1
* is returned. */
static int
spoe_process_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
spoe_process_messages(struct stream *s, struct spoe_context *ctx,
struct list *messages, int dir, int type)
{
struct spoe_config *conf = FLT_CONF(ctx->filter);
struct spoe_agent *agent = conf->agent;
int dir, ret = 1;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
if (LIST_ISEMPTY(&(ctx->events[ev])))
goto out;
int ret = 1;
if (ctx->state == SPOE_CTX_ST_ERROR)
goto error;
if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to process event '%s': timeout\n",
" - failed to process messages: timeout\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, spoe_event_str[ev]);
agent->id, __FUNCTION__, s);
ctx->status_code = SPOE_CTX_ERR_TOUT;
goto error;
}
@ -2539,9 +2527,9 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
if (agent->eps_max > 0) {
if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - skip event '%s': max EPS reached\n",
" - skip processing of messages: max EPS reached\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, spoe_event_str[ev]);
agent->id, __FUNCTION__, s);
goto skip;
}
}
@ -2551,7 +2539,7 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
ctx->process_exp);
}
ret = spoe_start_event_processing(ctx, dir);
ret = spoe_start_processing(ctx, dir);
if (!ret)
goto out;
@ -2565,7 +2553,7 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait))
goto out;
ret = spoe_encode_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
ret = spoe_encode_messages(s, ctx, messages, dir, type);
if (ret < 0)
goto error;
if (!ret)
@ -2586,7 +2574,7 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
}
if (ctx->state == SPOE_CTX_ST_DONE) {
spoe_process_actions(s, ctx, ev, dir);
spoe_process_actions(s, ctx, dir);
ret = 1;
ctx->frame_id++;
ctx->state = SPOE_CTX_ST_READY;
@ -2612,13 +2600,12 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
strlen(agent->var_on_error), &smp);
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to create process event '%s': code=%u\n",
" - failed to process messages: code=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, ctx->strm, spoe_event_str[ev],
ctx->status_code);
__FUNCTION__, ctx->strm, ctx->status_code);
send_log(ctx->strm->be, LOG_WARNING,
"SPOE: [%s] failed to process event '%s': code=%u\n",
agent->id, spoe_event_str[ev], ctx->status_code);
"SPOE: [%s] failed to process messages: code=%u\n",
agent->id, ctx->status_code);
ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
? SPOE_CTX_ST_READY
@ -2631,7 +2618,31 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
ret = 1;
end:
spoe_stop_event_processing(ctx);
spoe_stop_processing(ctx);
return ret;
}
/* Process a SPOE event, ie the list of messages attached to the event <ev>.
* See spoe_process_message for details. */
static int
spoe_process_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
{
int dir, ret;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
__FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
if (LIST_ISEMPTY(&(ctx->events[ev])))
return 1;
ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
return ret;
}
@ -2715,7 +2726,7 @@ spoe_destroy_context(struct spoe_context *ctx)
if (!ctx)
return;
spoe_stop_event_processing(ctx);
spoe_stop_processing(ctx);
pool_free2(pool2_spoe_ctx, ctx);
}