From 344c4ab6a9b3b39aa3593ac46bd746d13ee99b9b Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Fri, 22 Sep 2017 10:20:13 +0200 Subject: [PATCH] MEDIUM: spoe/rules: Process "send-spoe-group" action The messages processing is done using existing functions. So here, the main task is to find the SPOE engine to use. To do so, we loop on all filter instances attached to the stream. For each, we check if it is a SPOE filter and, if yes, if its name is the one used to declare the "send-spoe-group" action. We also take care to return an error if the action processing is interrupted by HAProxy (because of a timeout or an error at the HAProxy level). This is done by checking if the flag ACT_FLAG_FINAL is set. The function spoe_send_group is the action_ptr callback ot --- doc/SPOE.txt | 2 + include/types/spoe.h | 1 + src/flt_spoe.c | 126 +++++++++++++++++++++++++++++++++---------- 3 files changed, 100 insertions(+), 29 deletions(-) diff --git a/doc/SPOE.txt b/doc/SPOE.txt index 7f3806887..194fa3dba 100644 --- a/doc/SPOE.txt +++ b/doc/SPOE.txt @@ -279,6 +279,8 @@ option set-on-error * 4 the fragmentation of a payload is aborted. + * 5 The frame processing has been interrupted by HAProxy. + * 255 an unknown error occurred during the event processing. * 256+N a SPOP error occurred during the event processing (see section diff --git a/include/types/spoe.h b/include/types/spoe.h index 2bfd15947..108bc980a 100644 --- a/include/types/spoe.h +++ b/include/types/spoe.h @@ -123,6 +123,7 @@ enum spoe_context_error { SPOE_CTX_ERR_RES, SPOE_CTX_ERR_TOO_BIG, SPOE_CTX_ERR_FRAG_FRAME_ABRT, + SPOE_CTX_ERR_INTERRUPT, SPOE_CTX_ERR_UNKNOWN = 255, SPOE_CTX_ERRS, }; diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 67fe403e5..51730a214 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -2498,6 +2498,37 @@ spoe_stop_processing(struct spoe_context *ctx) } } +static void +spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent, + struct spoe_context *ctx, int dir) +{ + if (agent->eps_max > 0) + update_freq_ctr(&agent->err_per_sec, 1); + + if (agent->var_on_error) { + struct sample smp; + + memset(&smp, 0, sizeof(smp)); + smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); + smp.data.u.sint = ctx->status_code; + smp.data.type = SMP_T_BOOL; + + spoe_set_var(ctx, "txn", agent->var_on_error, + strlen(agent->var_on_error), &smp); + } + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to process messages: code=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, s, ctx->status_code); + send_log(ctx->strm->be, LOG_WARNING, + "SPOE: [%s] failed to process messages: code=%u\n", + agent->id, ctx->status_code); + + ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) + ? SPOE_CTX_ST_READY + : SPOE_CTX_ST_NONE); +} + /* Process a list of SPOE messages. First, this functions will process messages * and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame * to process corresponding actions. During all the processing, it returns 0 @@ -2585,31 +2616,7 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, return ret; error: - if (agent->eps_max > 0) - update_freq_ctr(&agent->err_per_sec, 1); - - if (agent->var_on_error) { - struct sample smp; - - memset(&smp, 0, sizeof(smp)); - smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); - smp.data.u.sint = ctx->status_code; - smp.data.type = SMP_T_BOOL; - - spoe_set_var(ctx, "txn", agent->var_on_error, - strlen(agent->var_on_error), &smp); - } - SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - failed to process messages: code=%u\n", - (int)now.tv_sec, (int)now.tv_usec, agent->id, - __FUNCTION__, ctx->strm, ctx->status_code); - send_log(ctx->strm->be, LOG_WARNING, - "SPOE: [%s] failed to process messages: code=%u\n", - agent->id, ctx->status_code); - - ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR) - ? SPOE_CTX_ST_READY - : SPOE_CTX_ST_NONE); + spoe_handle_processing_error(s, agent, ctx, dir); ret = 1; goto end; @@ -2622,6 +2629,28 @@ spoe_process_messages(struct stream *s, struct spoe_context *ctx, return ret; } +/* Process a SPOE group, ie the list of messages attached to the group . + * See spoe_process_message for details. */ +static int +spoe_process_group(struct stream *s, struct spoe_context *ctx, + struct spoe_group *group, int dir) +{ + int ret; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - ctx-state=%s - Process messages for group=%s\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, spoe_ctx_state_str[ctx->state], + group->id); + + if (LIST_ISEMPTY(&group->messages)) + return 1; + + ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP); + return ret; +} + /* Process a SPOE event, ie the list of messages attached to the event . * See spoe_process_message for details. */ static int @@ -2631,7 +2660,7 @@ spoe_process_event(struct stream *s, struct spoe_context *ctx, int dir, ret; SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" - " - ctx-state=%s - event=%s\n", + " - ctx-state=%s - Process messages for event=%s\n", (int)now.tv_sec, (int)now.tv_usec, ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state], @@ -4016,7 +4045,12 @@ parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, return -1; } -/* Send a SPOE group. TODO */ +/* Send message of a SPOE group. This is the action_ptr callback of a rule + * associated to a "send-spoe-group" action. + * + * It returns ACT_RET_CONT is processing is finished without error, it returns + * ACT_RET_YIELD if the action is in progress. Otherwise it returns + * ACT_RET_ERR. */ static enum act_return spoe_send_group(struct act_rule *rule, struct proxy *px, struct session *sess, struct stream *s, int flags) @@ -4037,9 +4071,43 @@ spoe_send_group(struct act_rule *rule, struct proxy *px, } if (agent == NULL || group == NULL || ctx == NULL) return ACT_RET_ERR; + if (ctx->state == SPOE_CTX_ST_NONE) + return ACT_RET_CONT; - /* TODO */ - return ACT_RET_CONT; + switch (rule->from) { + case ACT_F_TCP_REQ_SES: dir = SMP_OPT_DIR_REQ; break; + case ACT_F_TCP_REQ_CNT: dir = SMP_OPT_DIR_REQ; break; + case ACT_F_TCP_RES_CNT: dir = SMP_OPT_DIR_RES; break; + case ACT_F_HTTP_REQ: dir = SMP_OPT_DIR_REQ; break; + case ACT_F_HTTP_RES: dir = SMP_OPT_DIR_RES; break; + default: + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - internal error while execute spoe-send-group\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, s); + send_log(px, LOG_ERR, "SPOE: [%s] internal error while execute spoe-send-group\n", + agent->id); + return ACT_RET_CONT; + } + + ret = spoe_process_group(s, ctx, group, dir); + if (ret == 1) + return ACT_RET_CONT; + else if (ret == 0) { + if (flags & ACT_FLAG_FINAL) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to process group '%s': interrupted by caller\n", + (int)now.tv_sec, (int)now.tv_usec, + agent->id, __FUNCTION__, s, group->id); + ctx->status_code = SPOE_CTX_ERR_INTERRUPT; + spoe_handle_processing_error(s, agent, ctx, dir); + spoe_stop_processing(ctx); + return ACT_RET_CONT; + } + return ACT_RET_YIELD; + } + else + return ACT_RET_ERR; } /* Check an "send-spoe-group" action. Here, we'll try to find the real SPOE