MEDIUM: filters: Adapt filters API to allow again TCP filtering on HTX streams

This change make the payload filtering uniform between TCP and HTTP
filters. Now, in TCP, like in HTTP, there is only one callback responsible to
forward data. Thus, old callbacks, tcp_data() and tcp_forward_data(), are
replaced by a single callback function, tcp_payload(). This new callback gets
the offset in the payload to (re)start the filtering and the maximum amount of
data it can forward. It is the filter's responsibility to be compatible with HTX
streams. If not, it must not set the flag FLT_CFG_FL_HTX.

Because of this change, nxt and fwd offsets are no longer needed. Thus they are
removed from the filter structure with their update functions,
flt_change_next_size() and flt_change_forward_size(). Moreover, the trace filter
has been updated accordingly.

This patch breaks the compatibility with the old API. Thus it should probably
not be backported. But, AFAIK, there is no TCP filter, thus the breakage is very
limited.
This commit is contained in:
Christopher Faulet 2019-11-12 11:13:01 +01:00
parent bb9a7e04bd
commit b2e58492b1
4 changed files with 112 additions and 245 deletions

View File

@ -45,13 +45,6 @@ extern const char *fcgi_flt_id;
#define FLT_STRM_OFF(s, chn) (strm_flt(s)->offset[CHN_IDX(chn)])
#define FLT_OFF(flt, chn) ((flt)->offset[CHN_IDX(chn)])
#define FLT_NXT(flt, chn) ((flt)->next[CHN_IDX(chn)])
#define FLT_FWD(flt, chn) ((flt)->fwd[CHN_IDX(chn)])
#define flt_req_nxt(flt) ((flt)->next[0])
#define flt_rsp_nxt(flt) ((flt)->next[1])
#define flt_req_fwd(flt) ((flt)->fwd[0])
#define flt_rsp_fwd(flt) ((flt)->fwd[1])
#define HAS_FILTERS(strm) ((strm)->strm_flt.flags & STRM_FLT_FL_HAS_FILTERS)
#define HAS_REQ_DATA_FILTERS(strm) ((strm)->strm_flt.nb_req_data_filters != 0)
@ -174,58 +167,6 @@ unregister_data_filter(struct stream *s, struct channel *chn, struct filter *fil
}
}
/* This function must be called when a filter alter incoming data. It updates
* next offset value of all filter's predecessors. Do not call this function
* when a filter change the size of incomding data leads to an undefined
* behavior.
*
* This is the filter's responsiblitiy to update data itself. For now, it is
* unclear to know how to handle data updates, so we do the minimum here. For
* example, if you filter an HTTP message, we must update msg->next and
* msg->chunk_len values.
*/
static inline void
flt_change_next_size(struct filter *filter, struct channel *chn, int len)
{
struct stream *s = chn_strm(chn);
struct filter *f;
list_for_each_entry(f, &strm_flt(s)->filters, list) {
if (f == filter)
break;
if (IS_DATA_FILTER(filter, chn))
FLT_NXT(f, chn) += len;
}
}
/* This function must be called when a filter alter forwarded data. It updates
* offset values (next and forward) of all filters. Do not call this function
* when a filter change the size of forwarded data leads to an undefined
* behavior.
*
* This is the filter's responsiblitiy to update data itself. For now, it is
* unclear to know how to handle data updates, so we do the minimum here. For
* example, if you filter an HTTP message, we must update msg->next and
* msg->chunk_len values.
*/
static inline void
flt_change_forward_size(struct filter *filter, struct channel *chn, int len)
{
struct stream *s = chn_strm(chn);
struct filter *f;
int before = 1;
list_for_each_entry(f, &strm_flt(s)->filters, list) {
if (f == filter)
before = 0;
if (IS_DATA_FILTER(filter, chn)) {
if (before)
FLT_FWD(f, chn) += len;
FLT_NXT(f, chn) += len;
}
}
}
/* This function must be called when a filter alter payload data. It updates
* offsets of all previous filters and the offset of the stream. Do not call
* this function when a filter change the size of payload data leads to an

View File

@ -138,12 +138,11 @@ struct flt_kw_list {
* to the client (mainly, when an error or a redirect
* occur).
* Returns nothing.
* - tcp_data : Called when unparsed data are available.
*
*
* - tcp_payload : Called when some data can be consumed.
* Returns a negative value if an error occurs, else
* the number of consumed bytes.
* - tcp_forward_data : Called when some data can be consumed.
* Returns a negative value if an error occurs, else
* or the number of forwarded bytes.
* the number of forwarded bytes.
*/
struct flt_ops {
/*
@ -186,9 +185,8 @@ struct flt_ops {
/*
* TCP callbacks
*/
int (*tcp_data) (struct stream *s, struct filter *f, struct channel *chn);
int (*tcp_forward_data)(struct stream *s, struct filter *f, struct channel *chn,
unsigned int len);
int (*tcp_payload) (struct stream *s, struct filter *f, struct channel *chn,
unsigned int offset, unsigned int len);
};
/* Flags set on a filter config */
@ -227,11 +225,8 @@ struct filter {
struct flt_conf *config; /* the filter's configuration */
void *ctx; /* The filter context (opaque) */
unsigned short flags; /* FLT_FL_* */
unsigned int next[2]; /* Offset, relative to buf->p, to the next byte to parse for a specific channel
unsigned long long offset[2]; /* Offset of input data already filtered for a specific channel
* 0: request channel, 1: response channel */
unsigned int fwd[2]; /* Offset, relative to buf->p, to the next byte to forward for a specific channel
* 0: request channel, 1: response channel */
unsigned long long offset[2];
unsigned int pre_analyzers; /* bit field indicating analyzers to pre-process */
unsigned int post_analyzers; /* bit field indicating analyzers to post-process */
struct list list; /* Next filter for the same proxy/stream */

View File

@ -670,9 +670,7 @@ flt_start_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
continue;
}
FLT_NXT(filter, chn) = 0;
FLT_FWD(filter, chn) = 0;
FLT_OFF(filter, chn) = 0;
if (FLT_OPS(filter)->channel_start_analyze) {
DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
ret = FLT_OPS(filter)->channel_start_analyze(s, filter, chn);
@ -799,8 +797,7 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
goto sync;
RESUME_FILTER_LOOP(s, chn) {
FLT_NXT(filter, chn) = 0;
FLT_FWD(filter, chn) = 0;
FLT_OFF(filter, chn) = 0;
unregister_data_filter(s, chn, filter);
if (FLT_OPS(filter)->channel_end_analyze) {
@ -850,118 +847,42 @@ flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
/*
* Calls 'tcp_data' callback for all "data" filters attached to a stream. This
* function is called when incoming data are available. It takes care to update
* the next offset of filters and adjusts available data to be sure that a
* filter cannot parse more data than its predecessors. A filter can choose to
* not consume all available data. Returns -1 if an error occurs, the number of
* consumed bytes otherwise.
* Calls 'tcp_payload' callback for all "data" filters attached to a
* stream. This function is called when some data can be forwarded in the
* AN_REQ_FLT_XFER_BODY and AN_RES_FLT_XFER_BODY analyzers. It takes care to
* update the filters and the stream offset to be sure that a filter cannot
* forward more data than its predecessors. A filter can choose to not forward
* all data. Returns a negative value if an error occurs, else the number of
* forwarded bytes.
*/
static int
flt_data(struct stream *s, struct channel *chn)
int
flt_tcp_payload(struct stream *s, struct channel *chn, unsigned int len)
{
struct filter *filter;
unsigned int buf_i;
int delta = 0, ret = 0;
/* Save buffer state */
buf_i = ci_data(chn);
unsigned long long *strm_off = &FLT_STRM_OFF(s, chn);
unsigned int out = co_data(chn);
int ret = len - out;
DBG_TRACE_ENTER(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
list_for_each_entry(filter, &strm_flt(s)->filters, list) {
unsigned int *nxt;
/* Call "data" filters only */
if (!IS_DATA_FILTER(filter, chn))
continue;
nxt = &FLT_NXT(filter, chn);
if (FLT_OPS(filter)->tcp_data) {
unsigned int i = ci_data(chn);
if (FLT_OPS(filter)->tcp_payload) {
unsigned long long *flt_off = &FLT_OFF(filter, chn);
unsigned int offset = *flt_off - *strm_off;
DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
ret = FLT_OPS(filter)->tcp_data(s, filter, chn);
if (ret < 0)
break;
delta += (int)(ci_data(chn) - i);
/* Increase next offset of the current filter */
*nxt += ret;
/* And set this value as the bound for the next
* filter. It will not able to parse more data than the
* current one. */
b_set_data(&chn->buf, co_data(chn) + *nxt);
}
else {
/* Consume all available data */
*nxt = ci_data(chn);
}
/* Update <ret> value to be sure to have the last one when we
* exit from the loop. This value will be used to know how much
* data are "forwardable" */
ret = *nxt;
}
/* Restore the original buffer state */
b_set_data(&chn->buf, co_data(chn) + buf_i + delta);
return ret;
}
/*
* Calls 'tcp_forward_data' callback for all "data" filters attached to a
* stream. This function is called when some data can be forwarded. It takes
* care to update the forward offset of filters and adjusts "forwardable" data
* to be sure that a filter cannot forward more data than its predecessors. A
* filter can choose to not forward all parsed data. Returns a negative value if
* an error occurs, else the number of forwarded bytes.
*/
static int
flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
{
struct filter *filter;
int ret = len;
list_for_each_entry(filter, &strm_flt(s)->filters, list) {
unsigned int *fwd;
/* Call "data" filters only */
if (!IS_DATA_FILTER(filter, chn))
continue;
fwd = &FLT_FWD(filter, chn);
if (FLT_OPS(filter)->tcp_forward_data) {
/* Remove bytes that the current filter considered as
* forwarded */
DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
ret = FLT_OPS(filter)->tcp_forward_data(s, filter, chn, ret - *fwd);
ret = FLT_OPS(filter)->tcp_payload(s, filter, chn, out + offset, ret - offset);
if (ret < 0)
goto end;
*flt_off += ret;
ret += offset;
}
/* Adjust bytes that the current filter considers as
* forwarded */
*fwd += ret;
/* And set this value as the bound for the next filter. It will
* not able to forward more data than the current one. */
ret = *fwd;
}
if (!ret)
goto end;
/* Finally, adjust filters offsets by removing data that HAProxy will
* forward. */
list_for_each_entry(filter, &strm_flt(s)->filters, list) {
if (!IS_DATA_FILTER(filter, chn))
continue;
FLT_NXT(filter, chn) -= ret;
FLT_FWD(filter, chn) -= ret;
}
*strm_off += ret;
end:
DBG_TRACE_LEAVE(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
return ret;
}
@ -976,12 +897,13 @@ flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
int
flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
{
unsigned int len;
int ret = 1;
DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
/* If there is no "data" filters, we do nothing */
if (!HAS_DATA_FILTERS(s, chn) || (s->flags & SF_HTX))
if (!HAS_DATA_FILTERS(s, chn))
goto end;
/* Be sure that the output is still opened. Else we stop the data
@ -990,26 +912,30 @@ flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
((chn->flags & CF_SHUTW) && (chn->to_forward || co_data(chn))))
goto end;
/* Let all "data" filters parsing incoming data */
ret = flt_data(s, chn);
if (s->flags & SF_HTX) {
struct htx *htx = htxbuf(&chn->buf);
len = htx->data;
}
else
len = c_data(chn);
ret = flt_tcp_payload(s, chn, len);
if (ret < 0)
goto end;
/* And forward them */
ret = flt_forward_data(s, chn, ret);
if (ret < 0)
goto end;
/* Consume data that all filters consider as forwarded. */
c_adv(chn, ret);
/* Stop waiting data if the input in closed and no data is pending or if
* the output is closed. */
if ((chn->flags & CF_SHUTW) ||
((chn->flags & CF_SHUTR) && !ci_data(chn))) {
if (chn->flags & CF_SHUTW) {
ret = 1;
goto end;
}
if (chn->flags & CF_SHUTR) {
if (((s->flags & SF_HTX) && htx_is_empty(htxbuf(&chn->buf))) || c_empty(chn)) {
ret = 1;
goto end;
}
}
/* Wait for data */
DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);

View File

@ -110,17 +110,17 @@ trace_hexdump(struct ist ist)
}
static void
trace_raw_hexdump(struct buffer *buf, int len, int out)
trace_raw_hexdump(struct buffer *buf, unsigned int offset, unsigned int len)
{
unsigned char p[len];
int block1, block2;
block1 = len;
if (block1 > b_contig_data(buf, out))
block1 = b_contig_data(buf, out);
if (block1 > b_contig_data(buf, offset))
block1 = b_contig_data(buf, offset);
block2 = len - block1;
memcpy(p, b_head(buf), block1);
memcpy(p, b_peek(buf, offset), block1);
memcpy(p+block1, b_orig(buf), block2);
trace_hexdump(ist2(p, len));
}
@ -153,6 +153,31 @@ trace_htx_hexdump(struct htx *htx, unsigned int offset, unsigned int len)
}
}
static unsigned int
trace_get_htx_datalen(struct htx *htx, unsigned int offset, unsigned int len)
{
struct htx_blk *blk;
uint32_t sz, data = 0;
for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
if (htx_get_blk_type(blk) != HTX_BLK_DATA)
break;
sz = htx_get_blksz(blk);
if (offset >= sz) {
offset -= sz;
continue;
}
data += sz - offset;
offset = 0;
if (data > len) {
data = len;
break;
}
}
return data;
}
/***************************************************************************
* Hooks that manage the filter lifecycle (init/check/deinit)
**************************************************************************/
@ -441,28 +466,9 @@ trace_http_payload(struct stream *s, struct filter *filter, struct http_msg *msg
int ret = len;
if (ret && conf->rand_forwarding) {
struct htx *htx = htxbuf(&msg->chn->buf);
struct htx_blk *blk;
uint32_t sz, data = 0;
unsigned int off = offset;
unsigned int data = trace_get_htx_datalen(htxbuf(&msg->chn->buf), offset, len);
for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
if (htx_get_blk_type(blk) != HTX_BLK_DATA)
break;
sz = htx_get_blksz(blk);
if (off >= sz) {
off -= sz;
continue;
}
data += sz - off;
off = 0;
if (data > len) {
data = len;
break;
}
}
if (data) {
if (data) {
ret = random() % (ret+1);
if (!ret || ret >= data)
ret = len;
@ -476,7 +482,7 @@ trace_http_payload(struct stream *s, struct filter *filter, struct http_msg *msg
offset, len, ret);
if (conf->hexdump)
trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, len);
trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, ret);
if (ret != len)
task_wakeup(s->task, TASK_WOKEN_MSG);
@ -520,51 +526,51 @@ trace_http_reply(struct stream *s, struct filter *filter, short status,
* Hooks to filter TCP data
*************************************************************************/
static int
trace_tcp_data(struct stream *s, struct filter *filter, struct channel *chn)
trace_tcp_payload(struct stream *s, struct filter *filter, struct channel *chn,
unsigned int offset, unsigned int len)
{
struct trace_config *conf = FLT_CONF(filter);
int avail = ci_data(chn) - FLT_NXT(filter, chn);
int ret = avail;
int ret = len;
if (ret && conf->rand_parsing)
ret = random() % (ret+1);
if (s->flags & SF_HTX) {
if (ret && conf->rand_forwarding) {
unsigned int data = trace_get_htx_datalen(htxbuf(&chn->buf), offset, len);
FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - next=%u - avail=%u - consume=%d",
__FUNCTION__,
channel_label(chn), proxy_mode(s), stream_pos(s),
FLT_NXT(filter, chn), avail, ret);
if (data) {
ret = random() % (ret+1);
if (!ret || ret >= data)
ret = len;
}
}
if (ret != avail)
task_wakeup(s->task, TASK_WOKEN_MSG);
return ret;
}
FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
"offset=%u - len=%u - forward=%d",
__FUNCTION__,
channel_label(chn), proxy_mode(s), stream_pos(s),
offset, len, ret);
static int
trace_tcp_forward_data(struct stream *s, struct filter *filter, struct channel *chn,
unsigned int len)
{
struct trace_config *conf = FLT_CONF(filter);
int ret = len;
if (conf->hexdump)
trace_htx_hexdump(htxbuf(&chn->buf), offset, ret);
}
else {
if (ret && conf->rand_forwarding)
ret = random() % (ret+1);
if (ret && conf->rand_forwarding)
ret = random() % (ret+1);
FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - len=%u - fwd=%u - forward=%d",
__FUNCTION__,
channel_label(chn), proxy_mode(s), stream_pos(s), len,
FLT_FWD(filter, chn), ret);
FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
"offset=%u - len=%u - forward=%d",
__FUNCTION__,
channel_label(chn), proxy_mode(s), stream_pos(s),
offset, len, ret);
if (conf->hexdump) {
c_adv(chn, FLT_FWD(filter, chn));
trace_raw_hexdump(&chn->buf, ret, co_data(chn));
c_rew(chn, FLT_FWD(filter, chn));
if (conf->hexdump)
trace_raw_hexdump(&chn->buf, offset, ret);
}
if (ret != len)
task_wakeup(s->task, TASK_WOKEN_MSG);
if (ret != len)
task_wakeup(s->task, TASK_WOKEN_MSG);
return ret;
}
/********************************************************************
* Functions that manage the filter initialization
********************************************************************/
@ -598,8 +604,7 @@ struct flt_ops trace_ops = {
.http_reply = trace_http_reply,
/* Filter TCP data */
.tcp_data = trace_tcp_data,
.tcp_forward_data = trace_tcp_forward_data,
.tcp_payload = trace_tcp_payload,
};
/* Return -1 on error, else 0 */