MAJOR: filters/http: Rewrite the HTTP compression as a filter

HTTP compression has been rewritten to use the filter API. For now, it is more
a proof of concept than anything else: it allocates memory to work, so, if only
for that reason, it should be rewritten.

In the meantime, the implementation has been refactored to allow its use
alongside other filters. However, there are limitations that must be respected:

  - No filter placed after the compression one is allowed to change input data
    (in 'http_data' callback).
  - No filter placed before the compression one is allowed to change forwarded
    data (in 'http_forward_data' callback).

For now, these limitations are informal, so be careful when combining several
filters; see the sketch below for a filter that respects both constraints.
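
As an illustration, here is a minimal sketch, not part of this commit, of a
passive filter that respects both limitations: it only observes data from its
'http_data' and 'http_forward_data' callbacks and never rewrites the buffer,
so it can be placed on either side of the compression filter (callback names
and return conventions are taken from the filter API used below; the
'passive_*' names are made up):

  static int
  passive_http_data(struct stream *s, struct filter *filter,
                    struct http_msg *msg)
  {
          /* Report how much new input data was seen, without touching it.
           * A filter placed after the compression one must not change
           * input data here. */
          return MIN(msg->chunk_len + msg->next, msg->chn->buf->i)
                  - FLT_NXT(filter, msg->chn);
  }

  static int
  passive_http_forward_data(struct stream *s, struct filter *filter,
                            struct http_msg *msg, unsigned int len)
  {
          /* Accept all forwarded data as-is. A filter placed before the
           * compression one must not change forwarded data here. */
          return len;
  }

  struct flt_ops passive_ops = {
          .http_data         = passive_http_data,
          .http_forward_data = passive_http_forward_data,
  };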

Regarding the configuration, the 'compression' keywords are still supported and
must be used to configure the HTTP compression behavior. In the absence of a
'filter' line for the compression filter, it is added to the filter chain when
the first 'compression' line is parsed. This is a convenient shortcut when no
other filter is used. But if another filter exists, an error is reported and
the user must explicitly declare the compression filter.

For example:

  listen tst
      ...
      compression algo gzip
      compression offload
      ...
      filter flt_1
      filter compression
      filter flt_2
      ...
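
Conversely, when the compression filter is the only one, the 'filter' line can
be omitted; a minimal sketch of such a legacy-style section (relying on the
implicit registration described above):

  listen legacy
      ...
      compression algo gzip
      compression offload
      ...
      # no 'filter' line here: the compression filter is implicitly
      # appended to the otherwise empty filter chain
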
Christopher Faulet 2015-11-05 13:35:03 +01:00, committed by Willy Tarreau
parent 3d97c90974
commit 92d3638d2d
10 changed files with 665 additions and 191 deletions

View File

@ -43,6 +43,9 @@ int flt_check(struct proxy *p);
int flt_stream_start(struct stream *s);
void flt_stream_stop(struct stream *s);
int flt_set_stream_backend(struct stream *s, struct proxy *be);
int flt_stream_init(struct stream *s);
void flt_stream_release(struct stream *s, int only_backend);
int flt_http_headers(struct stream *s, struct http_msg *msg);
int flt_http_start_chunk(struct stream *s, struct http_msg *msg);
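
A rough sketch of how these new entry points are meant to be used across a
stream's life, mirroring the stream.c changes later in this commit (error
handling trimmed):

  /* at stream creation (stream_new) */
  if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
          goto out_fail_accept;

  /* when a backend is assigned (stream_set_backend) */
  if (flt_set_stream_backend(s, be) < 0)
          return 0;

  /* when analysis ends: remove only the backend filters */
  flt_stream_release(s, 1);

  /* at stream release (stream_free) */
  flt_stream_stop(s);
  flt_stream_release(s, 0);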

View File

@ -21,18 +21,9 @@
#ifndef _PROTO_FLT_HTTP_COMP_H
#define _PROTO_FLT_HTTP_COMP_H
/* NOTE: This is a temporary header file. It will be removed when the
* compression filter will be added */
#include <types/proxy.h>
#include <common/buffer.h>
#include <types/stream.h>
int select_compression_request_header(struct stream *s, struct buffer *req);
int select_compression_response_header(struct stream *s, struct buffer *res);
int http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer *out);
int http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buffer *out);
int http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer **out, int end);
int check_legacy_http_comp_flt(struct proxy *proxy);
#endif /* _PROTO_FLT_HTTP_COMP_H */
#endif // _PROTO_FLT_HTTP_COMP_H

View File

@ -32,6 +32,8 @@
#include <zlib.h>
#endif
#include <common/buffer.h>
struct comp {
struct comp_algo *algos;
struct comp_type *types;

View File

@ -32,7 +32,6 @@
#include <common/mini-clist.h>
#include <types/channel.h>
#include <types/compression.h>
#include <types/filters.h>
#include <types/hlua.h>
#include <types/obj_type.h>
@ -90,8 +89,7 @@
#define SF_IGNORE_PRST 0x00080000 /* ignore persistence */
#define SF_COMP_READY 0x00100000 /* the compression is initialized */
#define SF_SRV_REUSED 0x00200000 /* the server-side connection was reused */
#define SF_SRV_REUSED 0x00100000 /* the server-side connection was reused */
/* some external definitions */
struct strm_logs {
@ -158,8 +156,7 @@ struct stream {
void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */
void (*srv_error)(struct stream *s, /* the function to call upon unrecoverable server errors (or NULL) */
struct stream_interface *si);
struct comp_ctx *comp_ctx; /* HTTP compression context */
struct comp_algo *comp_algo; /* HTTP compression algorithm if not NULL */
char *unique_id; /* custom unique ID */
/* These two pointers are used to resume the execution of the rule lists. */

View File

@ -24,6 +24,7 @@
#include <proto/compression.h>
#include <proto/filters.h>
#include <proto/flt_http_comp.h>
#include <proto/proto_http.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
@ -259,6 +260,7 @@ flt_check(struct proxy *proxy)
if (filter->ops->check)
err += filter->ops->check(proxy, filter);
}
err += check_legacy_http_comp_flt(proxy);
return err;
}
@ -279,6 +281,60 @@ flt_deinit(struct proxy *proxy)
}
}
/* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */
static int
flt_stream_add_filter(struct stream *s, struct filter *filter,
int is_backend)
{
struct filter *f = pool_alloc2(pool2_filter);
if (!f) /* not enough memory */
return -1;
memset(f, 0, sizeof(*f));
f->id = filter->id;
f->ops = filter->ops;
f->conf = filter->conf;
f->is_backend_filter = is_backend;
LIST_ADDQ(&s->strm_flt.filters, &f->list);
return 0;
}
/*
* Called when a stream is created. It attaches all frontend filters to the
* stream. Returns -1 if an error occurs, 0 otherwise.
*/
int
flt_stream_init(struct stream *s)
{
struct filter *filter;
LIST_INIT(&s->strm_flt.filters);
memset(s->strm_flt.current, 0, sizeof(s->strm_flt.current));
list_for_each_entry(filter, &strm_fe(s)->filters, list) {
if (flt_stream_add_filter(s, filter, 0) < 0)
return -1;
}
return 0;
}
/*
* Called when a stream is closed or when analyze ends (For an HTTP stream, this
* happens after each request/response exchange). When analyze ends, backend
* filters are removed. When the stream is closed, all filters attached to the
* stream are removed.
*/
void
flt_stream_release(struct stream *s, int only_backend)
{
struct filter *filter, *back;
list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) {
if (!only_backend || filter->is_backend_filter) {
LIST_DEL(&filter->list);
pool_free2(pool2_filter, filter);
}
}
}
/*
* Calls 'stream_start' for all filters attached to a stream. This happens when
* the stream is created, just after calling flt_stream_init
@ -311,6 +367,26 @@ flt_stream_stop(struct stream *s)
}
}
/*
* Called when a backend is set for a stream. If the frontend and the backend
* are the same, this function does nothing. Else it attaches all backend
* filters to the stream. Returns -1 if an error occurs, 0 otherwise.
*/
int
flt_set_stream_backend(struct stream *s, struct proxy *be)
{
struct filter *filter;
if (strm_fe(s) == be)
return 0;
list_for_each_entry(filter, &be->filters, list) {
if (flt_stream_add_filter(s, filter, 1) < 0)
return -1;
}
return 0;
}
int
flt_http_headers(struct stream *s, struct http_msg *msg)
{
@ -691,8 +767,6 @@ end:
/* Check if 'channel_end_analyze' callback has been called for the
* request and the response. */
if (!(s->req.analysers & AN_FLT_END) && !(s->res.analysers & AN_FLT_END)) {
struct filter *filter, *back;
/* When we are waiting for a new request, we must reset the stream
* analyzers. The input of the request channel must not be closed,
* else it is useless to wait. */
@ -701,12 +775,8 @@ end:
s->res.analysers = 0;
}
list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) {
if (filter->is_backend_filter) {
LIST_DEL(&filter->list);
pool_free2(pool2_filter, filter);
}
}
/* Remove backend filters from the list */
flt_stream_release(s, 1);
}
else if (ret) {
/* Analyzer ends only for one channel. So wake up the stream to

View File

@ -22,21 +22,406 @@
#include <types/sample.h>
#include <proto/compression.h>
#include <proto/filters.h>
#include <proto/hdr_idx.h>
#include <proto/proto_http.h>
#include <proto/sample.h>
#include <proto/stream.h>
static const char *http_comp_flt_id = "compression filter";
struct flt_ops comp_ops;
static struct buffer *tmpbuf = &buf_empty;
struct comp_chunk {
unsigned int start; /* start of the chunk relative to FLT_FWD offset */
unsigned int end; /* end of the chunk relative to FLT_FWD offset */
int skip; /* if set to 1, the chunk is skipped. Otherwise it is compressed */
int is_last; /* if set, this is the last chunk. Data after this
* chunk will be forwarded as it is. */
struct list list;
};
struct comp_state {
struct comp_ctx *comp_ctx; /* compression context */
struct comp_algo *comp_algo; /* compression algorithm if not NULL */
struct list comp_chunks; /* data chunks that should be compressed or skipped */
unsigned int first; /* offset of the first chunk. Data before
* this offset will be forwarded as it
* is. */
};
static int add_comp_chunk(struct comp_state *st, unsigned int start,
unsigned int len, int skip, int is_last);
static int skip_input_data(struct filter *filter, struct http_msg *msg,
unsigned int consumed);
static int select_compression_request_header(struct comp_state *st,
struct stream *s,
struct http_msg *msg);
static int select_compression_response_header(struct comp_state *st,
struct stream *s,
struct http_msg *msg);
static int http_compression_buffer_init(struct buffer *in, struct buffer *out);
static int http_compression_buffer_add_data(struct comp_state *st,
struct buffer *in,
struct buffer *out, int sz);
static int http_compression_buffer_end(struct comp_state *st, struct stream *s,
struct buffer **in, struct buffer **out,
unsigned int consumed, int end);
/***********************************************************************/
static int
comp_flt_init(struct proxy *px, struct filter *filter)
{
/* We need a compression buffer in the DATA state to put the output of
* compressed data, and in CRLF state to let the TRAILERS state finish
* the job of removing the trailing CRLF.
*/
if (!tmpbuf->size) {
if (b_alloc(&tmpbuf) == NULL)
return -1;
}
return 0;
}
static void
comp_flt_deinit(struct proxy *px, struct filter *filter)
{
if (tmpbuf->size)
b_free(&tmpbuf);
}
static int
comp_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
{
if (filter->ctx == NULL) {
struct comp_state *st;
if (!(st = malloc(sizeof(*st))))
return -1;
LIST_INIT(&st->comp_chunks);
st->comp_algo = NULL;
st->comp_ctx = NULL;
st->first = 0;
filter->ctx = st;
}
return 1;
}
static int
comp_analyze(struct stream *s, struct filter *filter, struct channel *chn,
unsigned int an_bit)
{
struct comp_state *st = filter->ctx;
if (!strm_fe(s)->comp && !s->be->comp)
goto end;
switch (an_bit) {
case AN_RES_HTTP_PROCESS_BE:
select_compression_response_header(st, s, &s->txn->rsp);
break;
}
end:
return 1;
}
static int
comp_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
{
struct comp_state *st = filter->ctx;
struct comp_chunk *cc, *back;
if (!st || !(chn->flags & CF_ISRESP))
goto end;
list_for_each_entry_safe(cc, back, &st->comp_chunks, list) {
LIST_DEL(&cc->list);
free(cc);
}
if (!st->comp_algo || !s->txn->status)
goto release_ctx;
if (strm_fe(s)->mode == PR_MODE_HTTP)
strm_fe(s)->fe_counters.p.http.comp_rsp++;
if ((s->flags & SF_BE_ASSIGNED) && (s->be->mode == PR_MODE_HTTP))
s->be->be_counters.p.http.comp_rsp++;
/* release any possible compression context */
st->comp_algo->end(&st->comp_ctx);
release_ctx:
free(st);
filter->ctx = NULL;
end:
return 1;
}
static int
comp_http_headers(struct stream *s, struct filter *filter,
struct http_msg *msg)
{
struct comp_state *st = filter->ctx;
if (strm_fe(s)->comp || s->be->comp) {
if (!(msg->chn->flags & CF_ISRESP))
select_compression_request_header(st, s, msg);
}
return 1;
}
static int
comp_skip_http_chunk_envelope(struct stream *s, struct filter *filter,
struct http_msg *msg)
{
struct comp_state *st = filter->ctx;
unsigned int start;
int ret;
if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
flt_set_forward_data(filter, msg->chn);
return 1;
}
start = FLT_NXT(filter, msg->chn) - FLT_FWD(filter, msg->chn);
/* If this is the last chunk, we flag it */
if (msg->chunk_len == 0 && msg->msg_state == HTTP_MSG_CHUNK_SIZE)
ret = add_comp_chunk(st, start, 0, 1, 1);
else
ret = add_comp_chunk(st, start, msg->sol, 1, 0);
return !ret ? 1 : -1;
}
static int
comp_http_data(struct stream *s, struct filter *filter,
struct http_msg *msg)
{
struct comp_state *st = filter->ctx;
unsigned int start;
int is_last, ret;
ret = MIN(msg->chunk_len + msg->next, msg->chn->buf->i) - FLT_NXT(filter, msg->chn);
if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
flt_set_forward_data(filter, msg->chn);
goto end;
}
if (!ret)
goto end;
start = FLT_NXT(filter, msg->chn) - FLT_FWD(filter, msg->chn);
is_last = (!(msg->flags & HTTP_MSGF_TE_CHNK) &&
(msg->chunk_len == ret - msg->next + FLT_NXT(filter, msg->chn)));
if (add_comp_chunk(st, start, ret, 0, is_last) == -1)
ret = -1;
end:
return ret;
}
static int
comp_http_forward_data(struct stream *s, struct filter *filter,
struct http_msg *msg, unsigned int len)
{
struct comp_state *st = filter->ctx;
struct comp_chunk *cc, *back;
unsigned int sz, consumed = 0, compressed = 0;
int is_last = 0, ret = len;
if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
flt_set_forward_data(filter, msg->chn);
goto end;
}
/* no data to forward or no chunk or the first chunk is too far */
if (!len || LIST_ISEMPTY(&st->comp_chunks))
goto end;
if (st->first > len) {
consumed = len;
goto update_chunks;
}
/* initialize the buffer used to write compressed data */
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
ret = http_compression_buffer_init(msg->chn->buf, tmpbuf);
b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
if (ret < 0) {
msg->chn->flags |= CF_WAKE_WRITE;
return 0;
}
/* Loop on all chunks */
list_for_each_entry_safe(cc, back, &st->comp_chunks, list) {
/* current chunk must not be handled yet */
if (len <= cc->start) {
consumed = len;
break;
}
/* Get the number of bytes that must be handled in the current
* chunk */
sz = MIN(len, cc->end) - cc->start;
if (cc->skip) {
/* No compression for this chunk, data must be
* skipped. This happens when the HTTP response is
* chunked, the chunk envelope is skipped. */
ret = sz;
}
else {
/* Compress the chunk */
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + cc->start);
ret = http_compression_buffer_add_data(st, msg->chn->buf, tmpbuf, sz);
b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + cc->start);
if (ret < 0)
goto end;
compressed += ret;
}
/* Update the chunk by removing consumed bytes. If all bytes are
* consumed, the chunk is removed from the list and we
* loop. Otherwise, we stop here. */
cc->start += ret;
consumed = cc->start;
if (cc->start != cc->end)
break;
/* Remember if this is the last chunk */
is_last = cc->is_last;
LIST_DEL(&cc->list);
free(cc);
}
if (compressed) {
/* Some data was compressed so we can switch buffers to replace
* uncompressed data by compressed ones. */
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf,
consumed - st->first, is_last);
b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
}
else {
/* Here some data was consumed but no compression was
* performed. This means that all consumed data must be
* skipped.
*/
ret = skip_input_data(filter, msg, consumed);
}
if (is_last && !(msg->flags & HTTP_MSGF_TE_CHNK)) {
/* At the end of data, if the original response was not
* chunked-encoded, we must write the empty chunk 0<CRLF>, and
* terminate the (empty) trailers section with a last <CRLF>. If
* we're forwarding a chunked-encoded response, these parts are
* preserved and not rewritten.
*/
char *p = bi_end(msg->chn->buf);
memcpy(p, "0\r\n\r\n", 5);
msg->chn->buf->i += 5;
ret += 5;
}
/* Then, the last step. We need to update state of other filters. */
if (ret >= 0) {
flt_change_forward_size(filter, msg->chn, -(consumed - st->first - ret));
msg->next -= (consumed - st->first - ret);
ret += st->first;
}
update_chunks:
/* Now, we need to update all remaining chunks to keep them synchronized
* with the next position of buf->p. If the chunk list is empty, we
* forward remaining data, if any. */
st->first -= MIN(st->first, consumed);
if (LIST_ISEMPTY(&st->comp_chunks))
ret += len - consumed;
else {
list_for_each_entry(cc, &st->comp_chunks, list) {
cc->start -= consumed;
cc->end -= consumed;
}
}
end:
return ret;
}
/***********************************************************************/
static int
add_comp_chunk(struct comp_state *st, unsigned int start, unsigned int len,
int skip, int is_last)
{
struct comp_chunk *cc;
if (!(cc = malloc(sizeof(*cc))))
return -1;
cc->start = start;
cc->end = start + len;
cc->skip = skip;
cc->is_last = is_last;
if (LIST_ISEMPTY(&st->comp_chunks))
st->first = cc->start;
LIST_ADDQ(&st->comp_chunks, &cc->list);
return 0;
}
/* This function might be moved in a filter function, probably with others to
* add/remove/move/replace buffer data */
static int
skip_input_data(struct filter *filter, struct http_msg *msg,
unsigned int consumed)
{
struct comp_state *st = filter->ctx;
int block1, block2;
/* 1. Copy input data, skipping consumed ones. */
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first + consumed);
block1 = msg->chn->buf->i;
if (block1 > bi_contig_data(msg->chn->buf))
block1 = bi_contig_data(msg->chn->buf);
block2 = msg->chn->buf->i - block1;
memcpy(trash.str, bi_ptr(msg->chn->buf), block1);
if (block2 > 0)
memcpy(trash.str + block1, msg->chn->buf->data, block2);
trash.len = block1 + block2;
b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first + consumed);
/* 2. Then write back these data at the right place in the buffer */
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
block1 = trash.len;
if (block1 > bi_contig_data(msg->chn->buf))
block1 = bi_contig_data(msg->chn->buf);
block2 = trash.len - block1;
memcpy(bi_ptr(msg->chn->buf), trash.str, block1);
if (block2 > 0)
memcpy(msg->chn->buf->data, trash.str + block1, block2);
b_rew(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->first);
/* Then adjust the input size */
msg->chn->buf->i -= consumed;
return 0;
}
/***********************************************************************/
/*
* Selects a compression algorithm depending on the client request.
*/
int
select_compression_request_header(struct stream *s, struct buffer *req)
select_compression_request_header(struct comp_state *st, struct stream *s,
struct http_msg *msg)
{
struct http_txn *txn = s->txn;
struct http_msg *msg = &txn->req;
struct buffer *req = msg->chn->buf;
struct hdr_ctx ctx;
struct comp_algo *comp_algo = NULL;
struct comp_algo *comp_algo_back = NULL;
@ -54,12 +439,13 @@ select_compression_request_header(struct stream *s, struct buffer *req)
ctx.line[ctx.val + 30] < '6' ||
(ctx.line[ctx.val + 30] == '6' &&
(ctx.vlen < 54 || memcmp(ctx.line + 51, "SV1", 3) != 0)))) {
s->comp_algo = NULL;
st->comp_algo = NULL;
return 0;
}
/* search for the algo in the backend in priority or the frontend */
if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) {
if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) ||
(strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) {
int best_q = 0;
ctx.idx = 0;
@ -107,7 +493,7 @@ select_compression_request_header(struct stream *s, struct buffer *req)
for (comp_algo = comp_algo_back; comp_algo; comp_algo = comp_algo->next) {
if (*(ctx.line + ctx.val) == '*' ||
word_match(ctx.line + ctx.val, toklen, comp_algo->ua_name, comp_algo->ua_name_len)) {
s->comp_algo = comp_algo;
st->comp_algo = comp_algo;
best_q = q;
break;
}
@ -116,8 +502,9 @@ select_compression_request_header(struct stream *s, struct buffer *req)
}
/* remove all occurrences of the header when "compression offload" is set */
if (s->comp_algo) {
if ((s->be->comp && s->be->comp->offload) || (strm_fe(s)->comp && strm_fe(s)->comp->offload)) {
if (st->comp_algo) {
if ((s->be->comp && s->be->comp->offload) ||
(strm_fe(s)->comp && strm_fe(s)->comp->offload)) {
http_remove_header2(msg, &txn->hdr_idx, &ctx);
ctx.idx = 0;
while (http_find_header2("Accept-Encoding", 15, req->p, &txn->hdr_idx, &ctx)) {
@ -128,38 +515,43 @@ select_compression_request_header(struct stream *s, struct buffer *req)
}
/* identity is implicit and does not require headers */
if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) || (strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) {
if ((s->be->comp && (comp_algo_back = s->be->comp->algos)) ||
(strm_fe(s)->comp && (comp_algo_back = strm_fe(s)->comp->algos))) {
for (comp_algo = comp_algo_back; comp_algo; comp_algo = comp_algo->next) {
if (comp_algo->cfg_name_len == 8 && memcmp(comp_algo->cfg_name, "identity", 8) == 0) {
s->comp_algo = comp_algo;
st->comp_algo = comp_algo;
return 1;
}
}
}
s->comp_algo = NULL;
st->comp_algo = NULL;
return 0;
}
/*
* Selects a compression algorithm depending on the server response.
*/
int
select_compression_response_header(struct stream *s, struct buffer *res)
static int
select_compression_response_header(struct comp_state *st, struct stream *s, struct http_msg *msg)
{
struct http_txn *txn = s->txn;
struct http_msg *msg = &txn->rsp;
struct buffer *res = msg->chn->buf;
struct hdr_ctx ctx;
struct comp_type *comp_type;
/* no common compression algorithm was found in request header */
if (s->comp_algo == NULL)
if (st->comp_algo == NULL)
goto fail;
/* HTTP < 1.1 should not be compressed */
if (!(msg->flags & HTTP_MSGF_VER_11) || !(txn->req.flags & HTTP_MSGF_VER_11))
goto fail;
if (txn->meth == HTTP_METH_HEAD)
goto fail;
/* compress 200,201,202,203 responses only */
if ((txn->status != 200) &&
(txn->status != 201) &&
@ -210,7 +602,8 @@ select_compression_response_header(struct stream *s, struct buffer *res)
}
}
else { /* no content-type header */
if ((s->be->comp && s->be->comp->types) || (strm_fe(s)->comp && strm_fe(s)->comp->types))
if ((s->be->comp && s->be->comp->types) ||
(strm_fe(s)->comp && strm_fe(s)->comp->types))
goto fail; /* a content-type was required */
}
@ -224,11 +617,9 @@ select_compression_response_header(struct stream *s, struct buffer *res)
goto fail;
/* initialize compression */
if (s->comp_algo->init(&s->comp_ctx, global.tune.comp_maxlevel) < 0)
if (st->comp_algo->init(&st->comp_ctx, global.tune.comp_maxlevel) < 0)
goto fail;
s->flags |= SF_COMP_READY;
/* remove Content-Length header */
ctx.idx = 0;
if ((msg->flags & HTTP_MSGF_CNT_LEN) && http_find_header2("Content-Length", 14, res->p, &txn->hdr_idx, &ctx))
@ -244,18 +635,19 @@ select_compression_response_header(struct stream *s, struct buffer *res)
* Accept-Encoding header, and SHOULD NOT be used in the Content-Encoding
* header.
*/
if (s->comp_algo->cfg_name_len != 8 || memcmp(s->comp_algo->cfg_name, "identity", 8) != 0) {
if (st->comp_algo->cfg_name_len != 8 || memcmp(st->comp_algo->cfg_name, "identity", 8) != 0) {
trash.len = 18;
memcpy(trash.str, "Content-Encoding: ", trash.len);
memcpy(trash.str + trash.len, s->comp_algo->ua_name, s->comp_algo->ua_name_len);
trash.len += s->comp_algo->ua_name_len;
memcpy(trash.str + trash.len, st->comp_algo->ua_name, st->comp_algo->ua_name_len);
trash.len += st->comp_algo->ua_name_len;
trash.str[trash.len] = '\0';
http_header_add_tail2(&txn->rsp, &txn->hdr_idx, trash.str, trash.len);
}
msg->flags |= HTTP_MSGF_COMPRESSING;
return 1;
fail:
s->comp_algo = NULL;
st->comp_algo = NULL;
return 0;
}
@ -282,8 +674,8 @@ http_emit_chunk_size(char *end, unsigned int chksz)
/*
* Init HTTP compression
*/
int
http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer *out)
static int
http_compression_buffer_init(struct buffer *in, struct buffer *out)
{
/* output stream requires at least 10 bytes for the gzip header, plus
* at least 8 bytes for the gzip trailer (crc+len), plus a possible
@ -307,43 +699,37 @@ http_compression_buffer_init(struct stream *s, struct buffer *in, struct buffer
/*
* Add data to compress
*/
int
http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buffer *out)
static int
http_compression_buffer_add_data(struct comp_state *st, struct buffer *in,
struct buffer *out, int sz)
{
struct http_msg *msg = &s->txn->rsp;
int consumed_data = 0;
int data_process_len;
int block1, block2;
/*
* Temporarily skip already parsed data and chunks to jump to the
* actual data block. It is fixed before leaving.
*/
b_adv(in, msg->next);
if (!sz)
return 0;
/*
* select the smallest size between the announced chunk size, the input
/* select the smallest size between the announced chunk size, the input
* data, and the available output buffer size. The compressors are
* assumed to be able to process all the bytes we pass to them at once.
*/
data_process_len = MIN(in->i, msg->chunk_len);
* assumed to be able to process all the bytes we pass to them at
* once. */
data_process_len = sz;
data_process_len = MIN(out->size - buffer_len(out), data_process_len);
block1 = data_process_len;
if (block1 > bi_contig_data(in))
block1 = bi_contig_data(in);
block2 = data_process_len - block1;
/* compressors return < 0 upon error or the amount of bytes read */
consumed_data = s->comp_algo->add_data(s->comp_ctx, bi_ptr(in), block1, out);
consumed_data = st->comp_algo->add_data(st->comp_ctx, bi_ptr(in), block1, out);
if (consumed_data >= 0 && block2 > 0) {
consumed_data = s->comp_algo->add_data(s->comp_ctx, in->data, block2, out);
consumed_data = st->comp_algo->add_data(st->comp_ctx, in->data, block2, out);
if (consumed_data >= 0)
consumed_data += block1;
}
/* restore original buffer pointer */
b_rew(in, msg->next);
return consumed_data;
}
@ -351,24 +737,23 @@ http_compression_buffer_add_data(struct stream *s, struct buffer *in, struct buf
* Flush data in process, and write the header and footer of the chunk. Upon
* success, in and out buffers are swapped to avoid a copy.
*/
int
http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer **out, int end)
static int
http_compression_buffer_end(struct comp_state *st, struct stream *s,
struct buffer **in, struct buffer **out,
unsigned int consumed, int end)
{
int to_forward;
int left;
struct http_msg *msg = &s->txn->rsp;
struct buffer *ib = *in, *ob = *out;
char *tail;
int to_forward, left;
#if defined(USE_SLZ) || defined(USE_ZLIB)
int ret;
/* flush data here */
if (end)
ret = s->comp_algo->finish(s->comp_ctx, ob); /* end of data */
ret = st->comp_algo->finish(st->comp_ctx, ob); /* end of data */
else
ret = s->comp_algo->flush(s->comp_ctx, ob); /* end of buffer */
ret = st->comp_algo->flush(st->comp_ctx, ob); /* end of buffer */
if (ret < 0)
return -1; /* flush failed */
@ -419,39 +804,21 @@ http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer
*tail++ = '\r';
*tail++ = '\n';
/* At the end of data, we must write the empty chunk 0<CRLF>,
* and terminate the trailers section with a last <CRLF>. If
* we're forwarding a chunked-encoded response, we'll have a
* trailers section after the empty chunk which needs to be
* forwarded and which will provide the last CRLF. Otherwise
* we write it ourselves.
*/
if (msg->msg_state >= HTTP_MSG_TRAILERS) {
memcpy(tail, "0\r\n", 3);
tail += 3;
if (msg->msg_state >= HTTP_MSG_ENDING) {
memcpy(tail, "\r\n", 2);
tail += 2;
}
}
ob->i = tail - ob->p;
to_forward = ob->i;
/* update input rate */
if (s->comp_ctx && s->comp_ctx->cur_lvl > 0) {
update_freq_ctr(&global.comp_bps_in, msg->next);
strm_fe(s)->fe_counters.comp_in += msg->next;
s->be->be_counters.comp_in += msg->next;
if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) {
update_freq_ctr(&global.comp_bps_in, consumed);
strm_fe(s)->fe_counters.comp_in += consumed;
s->be->be_counters.comp_in += consumed;
} else {
strm_fe(s)->fe_counters.comp_byp += msg->next;
s->be->be_counters.comp_byp += msg->next;
strm_fe(s)->fe_counters.comp_byp += consumed;
s->be->be_counters.comp_byp += consumed;
}
/* copy the remaining data in the tmp buffer. */
b_adv(ib, msg->next);
msg->next = 0;
b_adv(ib, consumed);
if (ib->i > 0) {
left = bi_contig_data(ib);
memcpy(ob->p + ob->i, bi_ptr(ib), left);
@ -466,26 +833,40 @@ http_compression_buffer_end(struct stream *s, struct buffer **in, struct buffer
*in = ob;
*out = ib;
if (s->comp_ctx && s->comp_ctx->cur_lvl > 0) {
if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) {
update_freq_ctr(&global.comp_bps_out, to_forward);
strm_fe(s)->fe_counters.comp_out += to_forward;
s->be->be_counters.comp_out += to_forward;
}
/* forward the new chunk without remaining data */
b_adv(ob, to_forward);
return to_forward;
}
/***********************************************************************/
struct flt_ops comp_ops = {
.init = comp_flt_init,
.deinit = comp_flt_deinit,
.channel_start_analyze = comp_start_analyze,
.channel_analyze = comp_analyze,
.channel_end_analyze = comp_end_analyze,
.http_headers = comp_http_headers,
.http_start_chunk = comp_skip_http_chunk_envelope,
.http_end_chunk = comp_skip_http_chunk_envelope,
.http_last_chunk = comp_skip_http_chunk_envelope,
.http_data = comp_http_data,
.http_forward_data = comp_http_forward_data,
};
static int
parse_compression_options(char **args, int section, struct proxy *proxy,
struct proxy *defpx, const char *file, int line,
char **err)
{
struct comp *comp;
struct comp *comp;
if (proxy->comp == NULL) {
comp = calloc(1, sizeof(struct comp));
@ -544,27 +925,107 @@ parse_compression_options(char **args, int section, struct proxy *proxy,
return 0;
}
/* boolean, returns true if compression is used (either gzip or deflate) in the response */
static int
smp_fetch_res_comp(const struct arg *args, struct sample *smp, const char *kw, void *private)
parse_http_comp_flt(char **args, int *cur_arg, struct proxy *px,
struct filter *filter, char **err)
{
struct filter *flt, *back;
list_for_each_entry_safe(flt, back, &px->filters, list) {
if (flt->id == http_comp_flt_id) {
memprintf(err, "%s: Proxy supports only one compression filter\n", px->id);
return -1;
}
}
filter->id = http_comp_flt_id;
filter->conf = NULL;
filter->ops = &comp_ops;
(*cur_arg)++;
return 0;
}
int
check_legacy_http_comp_flt(struct proxy *proxy)
{
struct filter *filter;
int err = 0;
if (proxy->comp == NULL)
goto end;
if (!LIST_ISEMPTY(&proxy->filters)) {
list_for_each_entry(filter, &proxy->filters, list) {
if (filter->id == http_comp_flt_id)
goto end;
}
Alert("config: %s '%s': require an explicit filter declaration to use HTTP compression\n",
proxy_type_str(proxy), proxy->id);
err++;
goto end;
}
filter = pool_alloc2(pool2_filter);
if (!filter) {
Alert("config: %s '%s': out of memory\n",
proxy_type_str(proxy), proxy->id);
err++;
goto end;
}
memset(filter, 0, sizeof(*filter));
filter->id = http_comp_flt_id;
filter->conf = NULL;
filter->ops = &comp_ops;
LIST_ADDQ(&proxy->filters, &filter->list);
end:
return err;
}
/*
* boolean, returns true if compression is used (either gzip or deflate) in the
* response.
*/
static int
smp_fetch_res_comp(const struct arg *args, struct sample *smp, const char *kw,
void *private)
{
struct http_txn *txn = smp->strm->txn;
smp->data.type = SMP_T_BOOL;
smp->data.u.sint = (smp->strm->comp_algo != NULL);
smp->data.u.sint = (txn && (txn->rsp.flags & HTTP_MSGF_COMPRESSING));
return 1;
}
/* string, returns algo */
/*
* string, returns algo
*/
static int
smp_fetch_res_comp_algo(const struct arg *args, struct sample *smp, const char *kw, void *private)
smp_fetch_res_comp_algo(const struct arg *args, struct sample *smp,
const char *kw, void *private)
{
if (!smp->strm->comp_algo)
struct http_txn *txn = smp->strm->txn;
struct filter *filter;
struct comp_state *st;
if (!txn || !(txn->rsp.flags & HTTP_MSGF_COMPRESSING))
return 0;
smp->data.type = SMP_T_STR;
smp->flags = SMP_F_CONST;
smp->data.u.str.str = smp->strm->comp_algo->cfg_name;
smp->data.u.str.len = smp->strm->comp_algo->cfg_name_len;
return 1;
list_for_each_entry(filter, &smp->strm->strm_flt.filters, list) {
if (filter->id != http_comp_flt_id)
continue;
if (!(st = filter->ctx))
break;
smp->data.type = SMP_T_STR;
smp->flags = SMP_F_CONST;
smp->data.u.str.str = st->comp_algo->cfg_name;
smp->data.u.str.len = st->comp_algo->cfg_name_len;
return 1;
}
return 0;
}
/* Declare the config parser for "compression" keyword */
@ -574,16 +1035,26 @@ static struct cfg_kw_list cfg_kws = {ILH, {
}
};
/* Declare the filter parser for "compression" keyword */
static struct flt_kw_list filter_kws = { "COMP", { }, {
{ "compression", parse_http_comp_flt },
{ NULL, NULL },
}
};
/* Note: must not be declared <const> as its list will be overwritten */
static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, {
{ "res.comp", smp_fetch_res_comp, 0, NULL, SMP_T_BOOL, SMP_USE_HRSHP },
{ "res.comp_algo", smp_fetch_res_comp_algo, 0, NULL, SMP_T_STR, SMP_USE_HRSHP },
{ /* END */ },
}};
{ "res.comp", smp_fetch_res_comp, 0, NULL, SMP_T_BOOL, SMP_USE_HRSHP },
{ "res.comp_algo", smp_fetch_res_comp_algo, 0, NULL, SMP_T_STR, SMP_USE_HRSHP },
{ /* END */ },
}
};
__attribute__((constructor))
static void __flt_http_comp_init(void)
static void
__flt_http_comp_init(void)
{
cfg_register_keywords(&cfg_kws);
flt_register_keywords(&filter_kws);
sample_register_fetches(&sample_fetch_keywords);
}
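
For reference, a hypothetical configuration snippet using these sample fetches
(the 'res.comp' and 'res.comp_algo' names come from this commit; the header
name and the use of 'http-response set-header' are illustrative assumptions):

  frontend www
      ...
      # expose whether the response was compressed, and with which algorithm
      http-response set-header X-Comp-Algo %[res.comp_algo] if { res.comp }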

View File

@ -76,6 +76,7 @@
#include <common/version.h>
#include <types/capture.h>
#include <types/compression.h>
#include <types/filters.h>
#include <types/global.h>
#include <types/acl.h>

View File

@ -69,8 +69,6 @@
#include <proto/pattern.h>
#include <proto/vars.h>
#include <proto/flt_http_comp.h> /* NOTE: temporary include, will be removed very soon */
const char HTTP_100[] =
"HTTP/1.1 100 Continue\r\n\r\n";
@ -4193,10 +4191,6 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
if (!(s->flags & SF_FINST_MASK))
s->flags |= SF_FINST_R;
/* we may want to compress the stats page */
if (sess->fe->comp || s->be->comp)
select_compression_request_header(s, req->buf);
/* enable the minimally required analyzers to handle keep-alive and compression on the HTTP response */
req->analysers &= (AN_REQ_HTTP_BODY | AN_FLT_END);
req->analysers &= ~AN_FLT_XFER_DATA;
@ -4335,9 +4329,6 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
req->buf->i,
req->analysers);
if (sess->fe->comp || s->be->comp)
select_compression_request_header(s, req->buf);
/*
* Right now, we know that we have processed the entire headers
* and that unwanted requests have been filtered out. We can do
@ -4942,15 +4933,11 @@ void http_end_txn_clean_session(struct stream *s)
if (fe->mode == PR_MODE_HTTP) {
fe->fe_counters.p.http.rsp[n]++;
if (s->comp_algo && (s->flags & SF_COMP_READY))
fe->fe_counters.p.http.comp_rsp++;
}
if ((s->flags & SF_BE_ASSIGNED) &&
(be->mode == PR_MODE_HTTP)) {
be->be_counters.p.http.rsp[n]++;
be->be_counters.p.http.cum_req++;
if (s->comp_algo && (s->flags & SF_COMP_READY))
be->be_counters.p.http.comp_rsp++;
}
}
@ -6289,7 +6276,6 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
(txn->status >= 100 && txn->status < 200) ||
txn->status == 204 || txn->status == 304) {
msg->flags |= HTTP_MSGF_XFER_LEN;
s->comp_algo = NULL;
goto skip_content_length;
}
@ -6339,9 +6325,6 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
msg->body_len = msg->chunk_len = cl;
}
if (sess->fe->comp || s->be->comp)
select_compression_response_header(s, rep->buf);
skip_content_length:
/* Now we have to check if we need to modify the Connection header.
* This is more difficult on the response than it is on the request,
@ -7038,8 +7021,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
if (msg->sov > 0)
msg->sov -= ret;
if ((s->comp_algo == NULL || msg->msg_state >= HTTP_MSG_TRAILERS) &&
LIST_ISEMPTY(&s->strm_flt.filters))
if (LIST_ISEMPTY(&s->strm_flt.filters))
msg->chunk_len -= channel_forward(res, msg->chunk_len);
if (res->flags & CF_SHUTW)
@ -7073,7 +7055,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
* Similarly, with keep-alive on the client side, we don't want to forward a
* close.
*/
if ((msg->flags & HTTP_MSGF_TE_CHNK) || s->comp_algo || !msg->body_len ||
if ((msg->flags & HTTP_MSGF_TE_CHNK) || !msg->body_len ||
(msg->flags & HTTP_MSGF_COMPRESSING) ||
(txn->flags & TX_CON_WANT_MSK) == TX_CON_WANT_KAL ||
(txn->flags & TX_CON_WANT_MSK) == TX_CON_WANT_SCL)
channel_dont_close(res);
@ -7086,7 +7069,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
* flag with the last block of forwarded data, which would cause an
* additional delay to be observed by the receiver.
*/
if ((msg->flags & HTTP_MSGF_TE_CHNK) || s->comp_algo)
if ((msg->flags & HTTP_MSGF_TE_CHNK) || (msg->flags & HTTP_MSGF_COMPRESSING))
res->flags |= CF_EXPECT_MORE;
/* the stream handler will take care of timeouts and errors */
@ -8810,12 +8793,6 @@ void http_end_txn(struct stream *s)
struct http_txn *txn = s->txn;
struct proxy *fe = strm_fe(s);
/* release any possible compression context */
if (s->flags & SF_COMP_READY)
s->comp_algo->end(&s->comp_ctx);
s->comp_algo = NULL;
s->flags &= ~SF_COMP_READY;
/* these ones will have been dynamically allocated */
pool_free2(pool2_requri, txn->uri);
pool_free2(pool2_capture, txn->cli_cookie);

View File

@ -1130,8 +1130,6 @@ void resume_proxies(void)
*/
int stream_set_backend(struct stream *s, struct proxy *be)
{
struct filter *filter;
if (s->flags & SF_BE_ASSIGNED)
return 1;
s->be = be;
@ -1140,19 +1138,8 @@ int stream_set_backend(struct stream *s, struct proxy *be)
be->be_counters.conn_max = be->beconn;
proxy_inc_be_ctr(be);
if (strm_fe(s) != be) {
list_for_each_entry(filter, &be->filters, list) {
struct filter *f = pool_alloc2(pool2_filter);
if (!f)
return 0; /* not enough memory */
memset(f, 0, sizeof(*f));
f->id = filter->id;
f->ops = filter->ops;
f->conf = filter->conf;
f->is_backend_filter = 1;
LIST_ADDQ(&s->strm_flt.filters, &f->list);
}
}
if (flt_set_stream_backend(s, be) < 0)
return 0;
/* assign new parameters to the stream from the new backend */
s->si[1].flags &= ~SI_FL_INDEP_STR;

View File

@ -76,7 +76,6 @@ static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *origin)
{
struct stream *s;
struct filter *filter, *back;
struct connection *conn = objt_conn(origin);
struct appctx *appctx = objt_appctx(origin);
@ -147,7 +146,6 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
* when the default backend is assigned.
*/
s->be = sess->fe;
s->comp_algo = NULL;
s->req.buf = s->res.buf = NULL;
s->req_cap = NULL;
s->res_cap = NULL;
@ -217,19 +215,7 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
HLUA_INIT(&s->hlua);
LIST_INIT(&s->strm_flt.filters);
memset(s->strm_flt.current, 0, sizeof(s->strm_flt.current));
list_for_each_entry(filter, &sess->fe->filters, list) {
struct filter *f = pool_alloc2(pool2_filter);
if (!f)
goto out_fail_accept;
memset(f, 0, sizeof(*f));
f->id = filter->id;
f->ops = filter->ops;
f->conf = filter->conf;
LIST_ADDQ(&s->strm_flt.filters, &f->list);
}
if (flt_stream_start(s) < 0)
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
/* finish initialization of the accepted file descriptor */
@ -250,10 +236,7 @@ struct stream *stream_new(struct session *sess, struct task *t, enum obj_type *o
/* Error unrolling */
out_fail_accept:
list_for_each_entry_safe(filter, back, &s->strm_flt.filters, list) {
LIST_DEL(&filter->list);
pool_free2(pool2_filter, filter);
}
flt_stream_release(s, 0);
LIST_DEL(&s->list);
pool_free2(pool2_stream, s);
return NULL;
@ -268,7 +251,6 @@ static void stream_free(struct stream *s)
struct proxy *fe = sess->fe;
struct bref *bref, *back;
struct connection *cli_conn = objt_conn(sess->origin);
struct filter *filter, *fback;
int i;
if (s->pend_pos)
@ -330,10 +312,7 @@ static void stream_free(struct stream *s)
}
flt_stream_stop(s);
list_for_each_entry_safe(filter, fback, &s->strm_flt.filters, list) {
LIST_DEL(&filter->list);
pool_free2(pool2_filter, filter);
}
flt_stream_release(s, 0);
if (fe) {
pool_free2(fe->rsp_cap_pool, s->res_cap);
@ -2552,15 +2531,11 @@ struct task *process_stream(struct task *t)
if (sess->fe->mode == PR_MODE_HTTP) {
sess->fe->fe_counters.p.http.rsp[n]++;
if (s->comp_algo && (s->flags & SF_COMP_READY))
sess->fe->fe_counters.p.http.comp_rsp++;
}
if ((s->flags & SF_BE_ASSIGNED) &&
(s->be->mode == PR_MODE_HTTP)) {
s->be->be_counters.p.http.rsp[n]++;
s->be->be_counters.p.http.cum_req++;
if (s->comp_algo && (s->flags & SF_COMP_READY))
s->be->be_counters.p.http.comp_rsp++;
}
}