MEDIUM: spoe/rules: Process "send-spoe-group" action

The messages processing is done using existing functions. So here, the main task
is to find the SPOE engine to use. To do so, we loop on all filter instances
attached to the stream. For each, we check if it is a SPOE filter and, if yes,
if its name is the one used to declare the "send-spoe-group" action.

We also take care to return an error if the action processing is interrupted by
HAProxy (because of a timeout or an error at the HAProxy level). This is done by
checking if the flag ACT_FLAG_FINAL is set.

The function spoe_send_group is the action_ptr callback ot
This commit is contained in:
Christopher Faulet 2017-09-22 10:20:13 +02:00 committed by Willy Tarreau
parent 58d0368588
commit 344c4ab6a9
3 changed files with 100 additions and 29 deletions

View File

@ -279,6 +279,8 @@ option set-on-error <var name>
* 4 the fragmentation of a payload is aborted.
* 5 The frame processing has been interrupted by HAProxy.
* 255 an unknown error occurred during the event processing.
* 256+N a SPOP error occurred during the event processing (see section

View File

@ -123,6 +123,7 @@ enum spoe_context_error {
SPOE_CTX_ERR_RES,
SPOE_CTX_ERR_TOO_BIG,
SPOE_CTX_ERR_FRAG_FRAME_ABRT,
SPOE_CTX_ERR_INTERRUPT,
SPOE_CTX_ERR_UNKNOWN = 255,
SPOE_CTX_ERRS,
};

View File

@ -2498,6 +2498,37 @@ spoe_stop_processing(struct spoe_context *ctx)
}
}
static void
spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
struct spoe_context *ctx, int dir)
{
if (agent->eps_max > 0)
update_freq_ctr(&agent->err_per_sec, 1);
if (agent->var_on_error) {
struct sample smp;
memset(&smp, 0, sizeof(smp));
smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
smp.data.u.sint = ctx->status_code;
smp.data.type = SMP_T_BOOL;
spoe_set_var(ctx, "txn", agent->var_on_error,
strlen(agent->var_on_error), &smp);
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to process messages: code=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s, ctx->status_code);
send_log(ctx->strm->be, LOG_WARNING,
"SPOE: [%s] failed to process messages: code=%u\n",
agent->id, ctx->status_code);
ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
? SPOE_CTX_ST_READY
: SPOE_CTX_ST_NONE);
}
/* Process a list of SPOE messages. First, this functions will process messages
* and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
* to process corresponding actions. During all the processing, it returns 0
@ -2585,31 +2616,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
return ret;
error:
if (agent->eps_max > 0)
update_freq_ctr(&agent->err_per_sec, 1);
if (agent->var_on_error) {
struct sample smp;
memset(&smp, 0, sizeof(smp));
smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
smp.data.u.sint = ctx->status_code;
smp.data.type = SMP_T_BOOL;
spoe_set_var(ctx, "txn", agent->var_on_error,
strlen(agent->var_on_error), &smp);
}
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to process messages: code=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, ctx->strm, ctx->status_code);
send_log(ctx->strm->be, LOG_WARNING,
"SPOE: [%s] failed to process messages: code=%u\n",
agent->id, ctx->status_code);
ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
? SPOE_CTX_ST_READY
: SPOE_CTX_ST_NONE);
spoe_handle_processing_error(s, agent, ctx, dir);
ret = 1;
goto end;
@ -2622,6 +2629,28 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx,
return ret;
}
/* Process a SPOE group, ie the list of messages attached to the group <grp>.
* See spoe_process_message for details. */
static int
spoe_process_group(struct stream *s, struct spoe_context *ctx,
struct spoe_group *group, int dir)
{
int ret;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - Process messages for group=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
__FUNCTION__, s, spoe_ctx_state_str[ctx->state],
group->id);
if (LIST_ISEMPTY(&group->messages))
return 1;
ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
return ret;
}
/* Process a SPOE event, ie the list of messages attached to the event <ev>.
* See spoe_process_message for details. */
static int
@ -2631,7 +2660,7 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx,
int dir, ret;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
" - ctx-state=%s - Process messages for event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
__FUNCTION__, s, spoe_ctx_state_str[ctx->state],
@ -4016,7 +4045,12 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
return -1;
}
/* Send a SPOE group. TODO */
/* Send message of a SPOE group. This is the action_ptr callback of a rule
* associated to a "send-spoe-group" action.
*
* It returns ACT_RET_CONT is processing is finished without error, it returns
* ACT_RET_YIELD if the action is in progress. Otherwise it returns
* ACT_RET_ERR. */
static enum act_return
spoe_send_group(struct act_rule *rule, struct proxy *px,
struct session *sess, struct stream *s, int flags)
@ -4037,9 +4071,43 @@ spoe_send_group(struct act_rule *rule, struct proxy *px,
}
if (agent == NULL || group == NULL || ctx == NULL)
return ACT_RET_ERR;
if (ctx->state == SPOE_CTX_ST_NONE)
return ACT_RET_CONT;
/* TODO */
return ACT_RET_CONT;
switch (rule->from) {
case ACT_F_TCP_REQ_SES: dir = SMP_OPT_DIR_REQ; break;
case ACT_F_TCP_REQ_CNT: dir = SMP_OPT_DIR_REQ; break;
case ACT_F_TCP_RES_CNT: dir = SMP_OPT_DIR_RES; break;
case ACT_F_HTTP_REQ: dir = SMP_OPT_DIR_REQ; break;
case ACT_F_HTTP_RES: dir = SMP_OPT_DIR_RES; break;
default:
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - internal error while execute spoe-send-group\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s);
send_log(px, LOG_ERR, "SPOE: [%s] internal error while execute spoe-send-group\n",
agent->id);
return ACT_RET_CONT;
}
ret = spoe_process_group(s, ctx, group, dir);
if (ret == 1)
return ACT_RET_CONT;
else if (ret == 0) {
if (flags & ACT_FLAG_FINAL) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to process group '%s': interrupted by caller\n",
(int)now.tv_sec, (int)now.tv_usec,
agent->id, __FUNCTION__, s, group->id);
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
spoe_handle_processing_error(s, agent, ctx, dir);
spoe_stop_processing(ctx);
return ACT_RET_CONT;
}
return ACT_RET_YIELD;
}
else
return ACT_RET_ERR;
}
/* Check an "send-spoe-group" action. Here, we'll try to find the real SPOE