MEDIUM: cache: Add support for endp-to-endp fast-forwarding

It is now possible to directly forward data to the opposite side from the
cache applet. To do so, dedicated functions were added to fast-forward the
payload part of the cached objects. Of course headers and trailers are still
sent via the channel's buffer, using the HTX.

When an object is delivered from the cache, once the applet reaches the
HTX_CACHE_DATA state, it declares it can fast-forward data. From this point,
all data are directly transferred to the oppposite side.
This commit is contained in:
Christopher Faulet 2023-11-23 17:25:30 +01:00
parent 5baa9ea168
commit ebead3c0a1
1 changed files with 219 additions and 7 deletions

View File

@ -35,6 +35,7 @@
#include <haproxy/stconn.h>
#include <haproxy/stream.h>
#include <haproxy/tools.h>
#include <haproxy/xref.h>
#include <haproxy/xxhash.h>
#define CACHE_FLT_F_IMPLICIT_DECL 0x00000001 /* The cache filtre was implicitly declared (ie without
@ -1625,6 +1626,141 @@ static size_t htx_cache_dump_msg(struct appctx *appctx, struct htx *htx, unsigne
return total;
}
static void ff_cache_skip_tlr_blk(struct appctx *appctx, uint32_t info, struct shared_block *shblk, unsigned int offset)
{
struct cache_appctx *ctx = appctx->svcctx;
struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0];
struct shared_context *shctx = shctx_ptr(cconf->c.cache);
unsigned int max, total;
uint32_t blksz;
blksz = (info & 0xff) + ((info >> 8) & 0xfffff);
total = 4;
while (blksz) {
max = MIN(blksz, shctx->block_size - offset);
offset += max;
blksz -= max;
total += max;
if (blksz || offset == shctx->block_size) {
shblk = LIST_NEXT(&shblk->list, typeof(shblk), list);
offset = 0;
}
}
ctx->offset = offset;
ctx->next = shblk;
ctx->sent += total;
}
static unsigned int ff_cache_dump_data_blk(struct appctx *appctx, struct buffer *buf, unsigned int len,
uint32_t info, struct shared_block *shblk, unsigned int offset)
{
struct cache_appctx *ctx = appctx->svcctx;
struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0];
struct shared_context *shctx = shctx_ptr(cconf->c.cache);
unsigned int total, rem_data, data_len;
uint32_t blksz;
total = 0;
data_len = 0;
rem_data = 0;
if (ctx->rem_data)
blksz = ctx->rem_data;
else {
blksz = (info & 0xfffffff);
ctx->sent += 4;
}
if (blksz > len) {
rem_data = blksz - len;
blksz = len;
}
while (blksz) {
size_t sz;
len = MIN(blksz, shctx->block_size - offset);
sz = b_putblk(buf, (char *)(shblk->data + offset), len);
offset += sz;
blksz -= sz;
total += sz;
data_len += sz;
if (sz < len)
break;
if (blksz || offset == shctx->block_size) {
shblk = LIST_NEXT(&shblk->list, typeof(shblk), list);
offset = 0;
}
}
ctx->offset = offset;
ctx->next = shblk;
ctx->sent += total;
ctx->data_sent += data_len;
ctx->rem_data = rem_data + blksz;
return total;
}
static size_t ff_cache_dump_msg(struct appctx *appctx, struct buffer *buf, unsigned int len)
{
struct cache_appctx *ctx = appctx->svcctx;
struct cache_entry *cache_ptr = ctx->entry;
struct shared_block *first = block_ptr(cache_ptr);
struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0];
struct shared_context *shctx = shctx_ptr(cconf->c.cache);
struct shared_block *shblk;
unsigned int offset, sz;
unsigned int ret, total = 0;
while (len && (ctx->sent != first->len - sizeof(*cache_ptr))) {
enum htx_blk_type type;
uint32_t info;
shblk = ctx->next;
offset = ctx->offset;
if (ctx->rem_data) {
type = HTX_BLK_DATA;
info = 0;
goto add_data_blk;
}
/* Get info of the next HTX block. May be split on 2 shblk */
sz = MIN(4, shctx->block_size - offset);
memcpy((char *)&info, (const char *)shblk->data + offset, sz);
offset += sz;
if (sz < 4) {
shblk = LIST_NEXT(&shblk->list, typeof(shblk), list);
memcpy(((char *)&info)+sz, (const char *)shblk->data, 4 - sz);
offset = (4 - sz);
}
/* Get payload of the next HTX block and insert it. */
type = (info >> 28);
if (type == HTX_BLK_DATA) {
add_data_blk:
ret = ff_cache_dump_data_blk(appctx, buf, len, info, shblk, offset);
}
else if (type == HTX_BLK_TLR) {
ff_cache_skip_tlr_blk(appctx, info, shblk, offset);
BUG_ON(ctx->sent != first->len - sizeof(*cache_ptr));
ret = 0;
}
else {
sc_ep_clr(appctx_sc(appctx), SE_FL_MAY_FASTFWD);
ret = 0;
}
if (!ret)
break;
total += ret;
len -= ret;
if (ctx->rem_data)
break;
}
return total;
}
static int htx_cache_add_age_hdr(struct appctx *appctx, struct htx *htx)
{
struct cache_appctx *ctx = appctx->svcctx;
@ -1656,12 +1792,83 @@ static void http_cache_io_handler(struct appctx *appctx)
unsigned int len;
size_t ret, total = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
goto exit;
applet_have_more_data(appctx);
if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
sc_ep_test(sc, SE_FL_MAY_FASTFWD) &&
res->to_forward &&
ctx->data_sent != cache_ptr->body_size) {
struct xref *peer;
struct sedesc *sdo = NULL;
se_fl_clr(appctx->sedesc, SE_FL_WANT_ROOM);
if (channel_data(res)) {
sc_need_room(sc, -1);
goto exit;
}
peer = xref_get_peer_and_lock(&appctx->sedesc->xref);
if (!peer)
goto error;
sdo = container_of(peer, struct sedesc, xref);
xref_unlock(&appctx->sedesc->xref, peer);
len = cache_ptr->body_size - ctx->data_sent;
if (len > res->to_forward)
len = res->to_forward;
len = se_nego_ff(sdo, &BUF_NULL, len, 0);
if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
sc_ep_clr(sc, SE_FL_MAY_FASTFWD);
goto abort_fastfwd;
}
if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
sc_need_room(sc, -1);
goto exit;
}
total = sdo->iobuf.data;
b_add(sdo->iobuf.buf, sdo->iobuf.offset);
ret = ff_cache_dump_msg(appctx, sdo->iobuf.buf, len);
b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
total += ret;
sdo->iobuf.data += ret;
if (ctx->sent == first->len - sizeof(*cache_ptr)) {
sc_ep_clr(sc, SE_FL_MAY_FASTFWD);
se_fl_set(appctx->sedesc, SE_FL_EOI|SE_FL_EOS);
BUG_ON(ctx->data_sent != cache_ptr->body_size);
appctx->st0 = HTX_CACHE_END;
}
if (se_fl_test(appctx->sedesc, SE_FL_EOI)) {
sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can
* forward the EOI the to consumer side
*/
}
/* else */
/* applet_have_more_data(appctx); */
se_done_ff(sdo);
if (total > 0) {
if (res->to_forward != CHN_INFINITE_FORWARD)
res->to_forward -= total;
res->total += total;
res->flags |= CF_READ_EVENT;
sc_ep_report_read_activity(sc);
}
goto exit;
}
abort_fastfwd:
len = first->len - sizeof(*cache_ptr) - ctx->sent;
res_htx = htx_from_buf(&res->buf);
total = res_htx->data;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW))))
goto out;
/* Check if the input buffer is available. */
if (!b_size(&res->buf)) {
sc_need_room(sc, 0);
@ -1678,7 +1885,6 @@ static void http_cache_io_handler(struct appctx *appctx)
if (appctx->st0 == HTX_CACHE_HEADER) {
/* Headers must be dump at once. Otherwise it is an error */
len = first->len - sizeof(*cache_ptr) - ctx->sent;
ret = htx_cache_dump_msg(appctx, res_htx, len, HTX_BLK_EOH);
if (!ret || (htx_get_tail_type(res_htx) != HTX_BLK_EOH) ||
!htx_cache_add_age_hdr(appctx, res_htx))
@ -1697,12 +1903,14 @@ static void http_cache_io_handler(struct appctx *appctx)
* Modified" response. */
if (__sc_strm(sc)->txn->meth == HTTP_METH_HEAD || ctx->send_notmodified)
appctx->st0 = HTX_CACHE_EOM;
else
else {
se_fl_set(appctx->sedesc, SE_FL_MAY_FASTFWD);
len = first->len - sizeof(*cache_ptr) - ctx->sent;
appctx->st0 = HTX_CACHE_DATA;
}
}
if (appctx->st0 == HTX_CACHE_DATA) {
len = first->len - sizeof(*cache_ptr) - ctx->sent;
if (len) {
ret = htx_cache_dump_msg(appctx, res_htx, len, HTX_BLK_UNUSED);
if (ret < len) {
@ -1710,6 +1918,7 @@ static void http_cache_io_handler(struct appctx *appctx)
goto out;
}
}
BUG_ON(ctx->data_sent != cache_ptr->body_size);
appctx->st0 = HTX_CACHE_EOM;
}
@ -1721,8 +1930,10 @@ static void http_cache_io_handler(struct appctx *appctx)
}
end:
if (appctx->st0 == HTX_CACHE_END)
if (appctx->st0 == HTX_CACHE_END) {
applet_have_no_more_data(appctx);
se_fl_set(appctx->sedesc, SE_FL_EOS);
}
out:
total = res_htx->data - total;
@ -1730,6 +1941,7 @@ static void http_cache_io_handler(struct appctx *appctx)
channel_add_input(res, total);
htx_to_buf(res_htx, &res->buf);
exit:
/* eat the whole request */
if (co_data(req)) {
req_htx = htx_from_buf(&req->buf);