MINOR: stream: Save the entity waiting to continue its processing

When a rule or a filter yields because it waits for something to be able to
continue its processing, this entity is saved in the stream. If an error or
a timeout occurred, info on this entity may be retrieved via the
"waiting_entity" sample fetch, for instance to dump it in the logs. This
info may be useful to found root cause of some bugs because it is a way to
know the processing was temporarily stopped. This may explain timeouts for
instance.

The sample fetch is not documented yet.
This commit is contained in:
Christopher Faulet 2024-10-31 11:34:54 +01:00
parent 53de6da1c0
commit 537f20eb3e
5 changed files with 82 additions and 3 deletions

View File

@ -295,6 +295,11 @@ struct stream {
int type; /* entity type (0: undef, 1: rule, 2: filter) */
} last_entity; /* last evaluated entity that interrupted processing */
struct {
void *ptr; /* Pointer on the entity (def: NULL) */
int type; /* entity type (0: undef, 1: rule, 2: filter) */
} waiting_entity; /* The entity waiting to continue its processing and interrupted by an error/timeout */
unsigned int stream_epoch; /* copy of stream_epoch when the stream was created */
struct hlua *hlua[2]; /* lua runtime context (0: global, 1: per-thread) */

View File

@ -57,9 +57,14 @@ static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigne
do { \
struct filter *filter; \
\
if (strm_flt(strm)->current[CHN_IDX(chn)]) { \
if (strm_flt(strm)->current[CHN_IDX(chn)]) { \
filter = strm_flt(strm)->current[CHN_IDX(chn)]; \
strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \
strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \
if (!(chn_prod(chn)->flags & SC_FL_ERROR) && \
!(chn->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { \
(strm)->waiting_entity.type = 0; \
(strm)->waiting_entity.ptr = NULL; \
} \
goto resume_execution; \
} \
\
@ -72,7 +77,11 @@ static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigne
#define BREAK_EXECUTION(strm, chn, label) \
do { \
if (ret < 0) { \
if (ret == 0) { \
s->waiting_entity.type = 2; \
s->waiting_entity.ptr = filter; \
} \
else if (ret < 0) { \
(strm)->last_entity.type = 2; \
(strm)->last_entity.ptr = filter; \
} \

View File

@ -2734,6 +2734,11 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
if (!(s->scf->flags & SC_FL_ERROR) & !(s->req.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = 0;
s->waiting_entity.ptr = NULL;
}
switch (rule->action_ptr(rule, px, sess, s, act_opts)) {
case ACT_RET_CONT:
break;
@ -2753,6 +2758,8 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis
rule_ret = HTTP_RULE_RES_ERROR;
goto end;
}
s->waiting_entity.type = 1;
s->waiting_entity.ptr = rule;
rule_ret = HTTP_RULE_RES_YIELD;
goto end;
case ACT_RET_ERR:
@ -2908,6 +2915,11 @@ resume_execution:
(px->options & PR_O_ABRT_CLOSE)))
act_opts |= ACT_OPT_FINAL;
if (!(s->scb->flags & SC_FL_ERROR) & !(s->res.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = 0;
s->waiting_entity.ptr = NULL;
}
switch (rule->action_ptr(rule, px, sess, s, act_opts)) {
case ACT_RET_CONT:
break;
@ -2927,6 +2939,8 @@ resume_execution:
rule_ret = HTTP_RULE_RES_ERROR;
goto end;
}
s->waiting_entity.type = 1;
s->waiting_entity.ptr = rule;
rule_ret = HTTP_RULE_RES_YIELD;
goto end;
case ACT_RET_ERR:

View File

@ -391,6 +391,8 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
s->rules_exp = TICK_ETERNITY;
s->last_entity.type = 0;
s->last_entity.ptr = NULL;
s->waiting_entity.type = 0;
s->waiting_entity.ptr = NULL;
s->stkctr = NULL;
if (pool_head_stk_ctr) {
@ -4158,6 +4160,41 @@ static int smp_fetch_last_entity(const struct arg *args, struct sample *smp, con
return 1;
}
static int smp_fetch_waiting_entity(const struct arg *args, struct sample *smp, const char *km, void *private)
{
smp->flags = SMP_F_VOL_TXN;
smp->data.type = SMP_T_STR;
if (!smp->strm)
return 0;
if (smp->strm->waiting_entity.type == 1) {
struct act_rule *rule = smp->strm->waiting_entity.ptr;
struct buffer *trash = get_trash_chunk();
trash->data = snprintf(trash->area, trash->size, "%s:%d", rule->conf.file, rule->conf.line);
smp->data.u.str = *trash;
}
else if (smp->strm->waiting_entity.type == 2) {
struct filter *filter = smp->strm->waiting_entity.ptr;
if (FLT_ID(filter)) {
smp->flags |= SMP_F_CONST;
smp->data.u.str.area = (char *)FLT_ID(filter);
smp->data.u.str.data = strlen(FLT_ID(filter));
}
else {
struct buffer *trash = get_trash_chunk();
trash->data = snprintf(trash->area, trash->size, "%p", filter->config);
smp->data.u.str = *trash;
}
}
else
return 0;
return 1;
}
static int smp_fetch_sess_term_state(const struct arg *args, struct sample *smp, const char *km, void *private)
{
struct buffer *trash = get_trash_chunk();
@ -4226,6 +4263,7 @@ static struct sample_fetch_kw_list smp_kws = {ILH, {
{ "txn.id32", smp_fetch_id32, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, },
{ "txn.redispatched", smp_fetch_redispatched, 0, NULL, SMP_T_BOOL, SMP_USE_L4SRV, },
{ "txn.sess_term_state",smp_fetch_sess_term_state, 0, NULL, SMP_T_STR, SMP_USE_INTRN, },
{ "waiting_entity", smp_fetch_waiting_entity, 0, NULL, SMP_T_STR, SMP_USE_INTRN, },
{ NULL, NULL, 0, 0, 0 },
}};

View File

@ -137,6 +137,10 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit)
if (s->current_rule) {
rule = s->current_rule;
s->current_rule = NULL;
if (!(req->flags & SC_FL_ERROR) && !(req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = 0;
s->waiting_entity.ptr = NULL;
}
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules)
goto resume_execution;
}
@ -179,6 +183,8 @@ resume_execution:
s->last_entity.ptr = rule;
goto internal;
}
s->waiting_entity.type = 1;
s->waiting_entity.ptr = rule;
goto missing_data;
case ACT_RET_DENY:
s->last_entity.type = 1;
@ -321,6 +327,10 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
if (s->current_rule) {
rule = s->current_rule;
s->current_rule = NULL;
if (!(rep->flags & SC_FL_ERROR) && !(rep->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) {
s->waiting_entity.type = 0;
s->waiting_entity.ptr = NULL;
}
if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules)
goto resume_execution;
}
@ -343,6 +353,7 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit)
if (ret) {
act_opts |= ACT_OPT_FIRST;
resume_execution:
/* Always call the action function if defined */
if (rule->action_ptr) {
switch (rule->action_ptr(rule, s->be, s->sess, s, act_opts)) {
@ -363,6 +374,8 @@ resume_execution:
s->last_entity.ptr = rule;
goto internal;
}
s->waiting_entity.type = 1;
s->waiting_entity.ptr = rule;
channel_dont_close(rep);
goto missing_data;
case ACT_RET_DENY: