MEDIUM: filters: Optimize the HTTP compression for chunk encoded response
Instead of compressing all chunks as they come, we store them in a temporary buffer. The compression happens during the forwarding phase. This change speeds up the compression of chunked response.
This commit is contained in:
parent
3e7bc67722
commit
b77c5c2693
|
@ -33,13 +33,16 @@ static const char *http_comp_flt_id = "compression filter";
|
||||||
struct flt_ops comp_ops;
|
struct flt_ops comp_ops;
|
||||||
|
|
||||||
static struct buffer *tmpbuf = &buf_empty;
|
static struct buffer *tmpbuf = &buf_empty;
|
||||||
|
static struct buffer *zbuf = &buf_empty;
|
||||||
|
|
||||||
struct comp_state {
|
struct comp_state {
|
||||||
struct comp_ctx *comp_ctx; /* compression context */
|
struct comp_ctx *comp_ctx; /* compression context */
|
||||||
struct comp_algo *comp_algo; /* compression algorithm if not NULL */
|
struct comp_algo *comp_algo; /* compression algorithm if not NULL */
|
||||||
int sov;
|
int hdrs_len;
|
||||||
|
int tlrs_len;
|
||||||
int consumed;
|
int consumed;
|
||||||
int initialized;
|
int initialized;
|
||||||
|
int finished;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int select_compression_request_header(struct comp_state *st,
|
static int select_compression_request_header(struct comp_state *st,
|
||||||
|
@ -62,14 +65,10 @@ static int
|
||||||
comp_flt_init(struct proxy *px, struct filter *filter)
|
comp_flt_init(struct proxy *px, struct filter *filter)
|
||||||
{
|
{
|
||||||
|
|
||||||
/* We need a compression buffer in the DATA state to put the output of
|
if (!tmpbuf->size && b_alloc(&tmpbuf) == NULL)
|
||||||
* compressed data, and in CRLF state to let the TRAILERS state finish
|
return -1;
|
||||||
* the job of removing the trailing CRLF.
|
if (!zbuf->size && b_alloc(&zbuf) == NULL)
|
||||||
*/
|
return -1;
|
||||||
if (!tmpbuf->size) {
|
|
||||||
if (b_alloc(&tmpbuf) == NULL)
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +77,8 @@ comp_flt_deinit(struct proxy *px, struct filter *filter)
|
||||||
{
|
{
|
||||||
if (tmpbuf->size)
|
if (tmpbuf->size)
|
||||||
b_free(&tmpbuf);
|
b_free(&tmpbuf);
|
||||||
|
if (zbuf->size)
|
||||||
|
b_free(&zbuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
@ -91,9 +92,11 @@ comp_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
|
||||||
|
|
||||||
st->comp_algo = NULL;
|
st->comp_algo = NULL;
|
||||||
st->comp_ctx = NULL;
|
st->comp_ctx = NULL;
|
||||||
st->sov = 0;
|
st->hdrs_len = 0;
|
||||||
|
st->tlrs_len = 0;
|
||||||
st->consumed = 0;
|
st->consumed = 0;
|
||||||
st->initialized = 0;
|
st->initialized = 0;
|
||||||
|
st->finished = 0;
|
||||||
filter->ctx = st;
|
filter->ctx = st;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -114,8 +117,8 @@ comp_analyze(struct stream *s, struct filter *filter, struct channel *chn,
|
||||||
else {
|
else {
|
||||||
select_compression_response_header(st, s, &s->txn->rsp);
|
select_compression_response_header(st, s, &s->txn->rsp);
|
||||||
if (st->comp_algo) {
|
if (st->comp_algo) {
|
||||||
st->sov = s->txn->rsp.sov;
|
|
||||||
register_data_filter(s, chn, filter);
|
register_data_filter(s, chn, filter);
|
||||||
|
st->hdrs_len = s->txn->rsp.sov;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -164,21 +167,38 @@ comp_http_data(struct stream *s, struct filter *filter, struct http_msg *msg)
|
||||||
return len;
|
return len;
|
||||||
|
|
||||||
if (!st->initialized) {
|
if (!st->initialized) {
|
||||||
unsigned int fwd = flt_rsp_fwd(filter) + st->sov;
|
unsigned int fwd = flt_rsp_fwd(filter) + st->hdrs_len;
|
||||||
|
|
||||||
|
b_reset(tmpbuf);
|
||||||
b_adv(buf, fwd);
|
b_adv(buf, fwd);
|
||||||
ret = http_compression_buffer_init(buf, tmpbuf);
|
ret = http_compression_buffer_init(buf, zbuf);
|
||||||
b_rew(buf, fwd);
|
b_rew(buf, fwd);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
msg->chn->flags |= CF_WAKE_WRITE;
|
msg->chn->flags |= CF_WAKE_WRITE;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
b_adv(buf, *nxt);
|
|
||||||
ret = http_compression_buffer_add_data(st, buf, tmpbuf, len);
|
if (msg->flags & HTTP_MSGF_TE_CHNK) {
|
||||||
b_rew(buf, *nxt);
|
int block = bi_contig_data(buf);
|
||||||
if (ret < 0)
|
|
||||||
return ret;
|
len = MIN(tmpbuf->size - buffer_len(tmpbuf), len);
|
||||||
|
if (len > block) {
|
||||||
|
memcpy(bi_end(tmpbuf), b_ptr(buf, *nxt), block);
|
||||||
|
memcpy(bi_end(tmpbuf)+block, buf->data, len - block);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
memcpy(bi_end(tmpbuf), b_ptr(buf, *nxt), len);
|
||||||
|
tmpbuf->i += len;
|
||||||
|
ret = len;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
b_adv(buf, *nxt);
|
||||||
|
ret = http_compression_buffer_add_data(st, buf, zbuf, len);
|
||||||
|
b_rew(buf, *nxt);
|
||||||
|
if (ret < 0)
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
st->initialized = 1;
|
st->initialized = 1;
|
||||||
msg->next += ret;
|
msg->next += ret;
|
||||||
|
@ -192,22 +212,20 @@ comp_http_chunk_trailers(struct stream *s, struct filter *filter,
|
||||||
struct http_msg *msg)
|
struct http_msg *msg)
|
||||||
{
|
{
|
||||||
struct comp_state *st = filter->ctx;
|
struct comp_state *st = filter->ctx;
|
||||||
int ret;
|
|
||||||
|
|
||||||
if (!st->initialized)
|
if (!st->initialized) {
|
||||||
return 1;
|
if (!st->finished) {
|
||||||
|
struct buffer *buf = msg->chn->buf;
|
||||||
|
unsigned int fwd = flt_rsp_fwd(filter) + st->hdrs_len;
|
||||||
|
|
||||||
st->consumed = msg->next - st->sov;
|
b_reset(tmpbuf);
|
||||||
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->sov);
|
b_adv(buf, fwd);
|
||||||
ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf, 1);
|
http_compression_buffer_init(buf, zbuf);
|
||||||
if (ret < 0)
|
b_rew(buf, fwd);
|
||||||
return ret;
|
st->initialized = 1;
|
||||||
|
}
|
||||||
st->initialized = 0;
|
}
|
||||||
st->sov = 0;
|
st->tlrs_len = msg->sol;
|
||||||
msg->next = ret;
|
|
||||||
FLT_NXT(filter, msg->chn) = ret;
|
|
||||||
FLT_FWD(filter, msg->chn) = 0;
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,29 +238,65 @@ comp_http_forward_data(struct stream *s, struct filter *filter,
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
/* To work, previous filters MUST forward all data */
|
/* To work, previous filters MUST forward all data */
|
||||||
if (FLT_FWD(filter, msg->chn) + len != FLT_NXT(filter, msg->chn)) {
|
if (flt_rsp_fwd(filter) + len != flt_rsp_nxt(filter)) {
|
||||||
Warning("HTTP compression failed: unexpected behavior of previous filters\n");
|
Warning("HTTP compression failed: unexpected behavior of previous filters\n");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!st->initialized) {
|
if (!st->initialized) {
|
||||||
ret = len;
|
if (!len) {
|
||||||
st->sov = ((st->sov > ret) ? (st->sov-ret) : 0);
|
/* Nothing to foward */
|
||||||
|
ret = len;
|
||||||
|
}
|
||||||
|
else if (st->hdrs_len > len) {
|
||||||
|
/* Forward part of headers */
|
||||||
|
ret = len;
|
||||||
|
st->hdrs_len -= len;
|
||||||
|
}
|
||||||
|
else if (st->hdrs_len > 0) {
|
||||||
|
/* Forward remaining headers */
|
||||||
|
ret = st->hdrs_len;
|
||||||
|
st->hdrs_len = 0;
|
||||||
|
}
|
||||||
|
else if (msg->msg_state < HTTP_MSG_TRAILERS) {
|
||||||
|
/* Do not forward anything for now. This only happens
|
||||||
|
* with chunk-encoded responses. Waiting data are part
|
||||||
|
* of the chunk envelope (the chunk size or the chunk
|
||||||
|
* CRLF). These data will be skipped during the
|
||||||
|
* compression. */
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
/* Forward trailers data */
|
||||||
|
ret = len;
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
st->consumed = len - st->sov;
|
if (msg->flags & HTTP_MSGF_TE_CHNK) {
|
||||||
b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->sov);
|
ret = http_compression_buffer_add_data(st, tmpbuf, zbuf, tmpbuf->i);
|
||||||
ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf,
|
if (ret != tmpbuf->i) {
|
||||||
msg->msg_state == HTTP_MSG_ENDING);
|
Warning("HTTP compression failed: Must consume %d bytes but only %d bytes consumed\n",
|
||||||
|
tmpbuf->i, ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
st->consumed = len - st->hdrs_len - st->tlrs_len;
|
||||||
|
b_adv(msg->chn->buf, flt_rsp_fwd(filter) + st->hdrs_len);
|
||||||
|
ret = http_compression_buffer_end(st, s, &msg->chn->buf, &zbuf, msg->msg_state >= HTTP_MSG_TRAILERS);
|
||||||
|
b_rew(msg->chn->buf, flt_rsp_fwd(filter) + st->hdrs_len);
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
|
flt_change_forward_size(filter, msg->chn, ret - st->consumed);
|
||||||
|
msg->next += (ret - st->consumed);
|
||||||
|
ret += st->hdrs_len + st->tlrs_len;
|
||||||
|
|
||||||
st->initialized = 0;
|
st->initialized = 0;
|
||||||
st->sov = 0;
|
st->finished = (msg->msg_state >= HTTP_MSG_TRAILERS);
|
||||||
msg->next = ret;
|
st->hdrs_len = 0;
|
||||||
FLT_NXT(filter, msg->chn) = ret;
|
st->tlrs_len = 0;
|
||||||
FLT_FWD(filter, msg->chn) = 0;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -594,7 +648,6 @@ http_compression_buffer_end(struct comp_state *st, struct stream *s,
|
||||||
return -1; /* flush failed */
|
return -1; /* flush failed */
|
||||||
|
|
||||||
#endif /* USE_ZLIB */
|
#endif /* USE_ZLIB */
|
||||||
|
|
||||||
if (ob->i == 10) {
|
if (ob->i == 10) {
|
||||||
/* No data were appended, let's drop the output buffer and
|
/* No data were appended, let's drop the output buffer and
|
||||||
* keep the input buffer unchanged.
|
* keep the input buffer unchanged.
|
||||||
|
@ -651,7 +704,7 @@ http_compression_buffer_end(struct comp_state *st, struct stream *s,
|
||||||
|
|
||||||
memcpy(tail, "0\r\n", 3);
|
memcpy(tail, "0\r\n", 3);
|
||||||
tail += 3;
|
tail += 3;
|
||||||
if (msg->msg_state == HTTP_MSG_ENDING) {
|
if (!(msg->flags & HTTP_MSGF_TE_CHNK)) {
|
||||||
memcpy(tail, "\r\n", 2);
|
memcpy(tail, "\r\n", 2);
|
||||||
tail += 2;
|
tail += 2;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue