From ebead3c0a168a38e6cadc4090b1935a26aa7d3a3 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Thu, 23 Nov 2023 17:25:30 +0100 Subject: [PATCH] MEDIUM: cache: Add support for endp-to-endp fast-forwarding It is now possible to directly forward data to the opposite side from the cache applet. To do so, dedicated functions were added to fast-forward the payload part of the cached objects. Of course headers and trailers are still sent via the channel's buffer, using the HTX. When an object is delivered from the cache, once the applet reaches the HTX_CACHE_DATA state, it declares it can fast-forward data. From this point, all data are directly transferred to the oppposite side. --- src/cache.c | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 219 insertions(+), 7 deletions(-) diff --git a/src/cache.c b/src/cache.c index d212f6fa79..8e809f3ac9 100644 --- a/src/cache.c +++ b/src/cache.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #define CACHE_FLT_F_IMPLICIT_DECL 0x00000001 /* The cache filtre was implicitly declared (ie without @@ -1625,6 +1626,141 @@ static size_t htx_cache_dump_msg(struct appctx *appctx, struct htx *htx, unsigne return total; } + +static void ff_cache_skip_tlr_blk(struct appctx *appctx, uint32_t info, struct shared_block *shblk, unsigned int offset) +{ + struct cache_appctx *ctx = appctx->svcctx; + struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0]; + struct shared_context *shctx = shctx_ptr(cconf->c.cache); + unsigned int max, total; + uint32_t blksz; + + blksz = (info & 0xff) + ((info >> 8) & 0xfffff); + total = 4; + while (blksz) { + max = MIN(blksz, shctx->block_size - offset); + offset += max; + blksz -= max; + total += max; + if (blksz || offset == shctx->block_size) { + shblk = LIST_NEXT(&shblk->list, typeof(shblk), list); + offset = 0; + } + } + ctx->offset = offset; + ctx->next = shblk; + ctx->sent += total; +} + +static unsigned int ff_cache_dump_data_blk(struct appctx *appctx, struct buffer *buf, unsigned int len, + uint32_t info, struct shared_block *shblk, unsigned int offset) +{ + struct cache_appctx *ctx = appctx->svcctx; + struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0]; + struct shared_context *shctx = shctx_ptr(cconf->c.cache); + unsigned int total, rem_data, data_len; + uint32_t blksz; + + total = 0; + data_len = 0; + rem_data = 0; + if (ctx->rem_data) + blksz = ctx->rem_data; + else { + blksz = (info & 0xfffffff); + ctx->sent += 4; + } + if (blksz > len) { + rem_data = blksz - len; + blksz = len; + } + + while (blksz) { + size_t sz; + + len = MIN(blksz, shctx->block_size - offset); + sz = b_putblk(buf, (char *)(shblk->data + offset), len); + offset += sz; + blksz -= sz; + total += sz; + data_len += sz; + if (sz < len) + break; + if (blksz || offset == shctx->block_size) { + shblk = LIST_NEXT(&shblk->list, typeof(shblk), list); + offset = 0; + } + } + + ctx->offset = offset; + ctx->next = shblk; + ctx->sent += total; + ctx->data_sent += data_len; + ctx->rem_data = rem_data + blksz; + return total; +} + +static size_t ff_cache_dump_msg(struct appctx *appctx, struct buffer *buf, unsigned int len) +{ + struct cache_appctx *ctx = appctx->svcctx; + struct cache_entry *cache_ptr = ctx->entry; + struct shared_block *first = block_ptr(cache_ptr); + struct cache_flt_conf *cconf = appctx->rule->arg.act.p[0]; + struct shared_context *shctx = shctx_ptr(cconf->c.cache); + struct shared_block *shblk; + unsigned int offset, sz; + unsigned int ret, total = 0; + + while (len && (ctx->sent != first->len - sizeof(*cache_ptr))) { + enum htx_blk_type type; + uint32_t info; + + shblk = ctx->next; + offset = ctx->offset; + if (ctx->rem_data) { + type = HTX_BLK_DATA; + info = 0; + goto add_data_blk; + } + + /* Get info of the next HTX block. May be split on 2 shblk */ + sz = MIN(4, shctx->block_size - offset); + memcpy((char *)&info, (const char *)shblk->data + offset, sz); + offset += sz; + if (sz < 4) { + shblk = LIST_NEXT(&shblk->list, typeof(shblk), list); + memcpy(((char *)&info)+sz, (const char *)shblk->data, 4 - sz); + offset = (4 - sz); + } + + /* Get payload of the next HTX block and insert it. */ + type = (info >> 28); + if (type == HTX_BLK_DATA) { + add_data_blk: + ret = ff_cache_dump_data_blk(appctx, buf, len, info, shblk, offset); + } + else if (type == HTX_BLK_TLR) { + ff_cache_skip_tlr_blk(appctx, info, shblk, offset); + BUG_ON(ctx->sent != first->len - sizeof(*cache_ptr)); + ret = 0; + } + else { + sc_ep_clr(appctx_sc(appctx), SE_FL_MAY_FASTFWD); + ret = 0; + } + + if (!ret) + break; + total += ret; + len -= ret; + + if (ctx->rem_data) + break; + } + + return total; +} + static int htx_cache_add_age_hdr(struct appctx *appctx, struct htx *htx) { struct cache_appctx *ctx = appctx->svcctx; @@ -1656,12 +1792,83 @@ static void http_cache_io_handler(struct appctx *appctx) unsigned int len; size_t ret, total = 0; + if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) + goto exit; + + applet_have_more_data(appctx); + + if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) && + sc_ep_test(sc, SE_FL_MAY_FASTFWD) && + res->to_forward && + ctx->data_sent != cache_ptr->body_size) { + struct xref *peer; + struct sedesc *sdo = NULL; + + se_fl_clr(appctx->sedesc, SE_FL_WANT_ROOM); + if (channel_data(res)) { + sc_need_room(sc, -1); + goto exit; + } + + peer = xref_get_peer_and_lock(&appctx->sedesc->xref); + if (!peer) + goto error; + sdo = container_of(peer, struct sedesc, xref); + xref_unlock(&appctx->sedesc->xref, peer); + + len = cache_ptr->body_size - ctx->data_sent; + if (len > res->to_forward) + len = res->to_forward; + + len = se_nego_ff(sdo, &BUF_NULL, len, 0); + if (sdo->iobuf.flags & IOBUF_FL_NO_FF) { + sc_ep_clr(sc, SE_FL_MAY_FASTFWD); + goto abort_fastfwd; + } + if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + sc_need_room(sc, -1); + goto exit; + } + + total = sdo->iobuf.data; + b_add(sdo->iobuf.buf, sdo->iobuf.offset); + ret = ff_cache_dump_msg(appctx, sdo->iobuf.buf, len); + b_sub(sdo->iobuf.buf, sdo->iobuf.offset); + total += ret; + sdo->iobuf.data += ret; + + if (ctx->sent == first->len - sizeof(*cache_ptr)) { + sc_ep_clr(sc, SE_FL_MAY_FASTFWD); + se_fl_set(appctx->sedesc, SE_FL_EOI|SE_FL_EOS); + BUG_ON(ctx->data_sent != cache_ptr->body_size); + appctx->st0 = HTX_CACHE_END; + } + + if (se_fl_test(appctx->sedesc, SE_FL_EOI)) { + sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can + * forward the EOI the to consumer side + */ + } + /* else */ + /* applet_have_more_data(appctx); */ + + se_done_ff(sdo); + + if (total > 0) { + if (res->to_forward != CHN_INFINITE_FORWARD) + res->to_forward -= total; + res->total += total; + res->flags |= CF_READ_EVENT; + sc_ep_report_read_activity(sc); + } + goto exit; + } + + abort_fastfwd: + len = first->len - sizeof(*cache_ptr) - ctx->sent; res_htx = htx_from_buf(&res->buf); total = res_htx->data; - if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR|SE_FL_SHR|SE_FL_SHW)))) - goto out; - /* Check if the input buffer is available. */ if (!b_size(&res->buf)) { sc_need_room(sc, 0); @@ -1678,7 +1885,6 @@ static void http_cache_io_handler(struct appctx *appctx) if (appctx->st0 == HTX_CACHE_HEADER) { /* Headers must be dump at once. Otherwise it is an error */ - len = first->len - sizeof(*cache_ptr) - ctx->sent; ret = htx_cache_dump_msg(appctx, res_htx, len, HTX_BLK_EOH); if (!ret || (htx_get_tail_type(res_htx) != HTX_BLK_EOH) || !htx_cache_add_age_hdr(appctx, res_htx)) @@ -1697,12 +1903,14 @@ static void http_cache_io_handler(struct appctx *appctx) * Modified" response. */ if (__sc_strm(sc)->txn->meth == HTTP_METH_HEAD || ctx->send_notmodified) appctx->st0 = HTX_CACHE_EOM; - else + else { + se_fl_set(appctx->sedesc, SE_FL_MAY_FASTFWD); + len = first->len - sizeof(*cache_ptr) - ctx->sent; appctx->st0 = HTX_CACHE_DATA; + } } if (appctx->st0 == HTX_CACHE_DATA) { - len = first->len - sizeof(*cache_ptr) - ctx->sent; if (len) { ret = htx_cache_dump_msg(appctx, res_htx, len, HTX_BLK_UNUSED); if (ret < len) { @@ -1710,6 +1918,7 @@ static void http_cache_io_handler(struct appctx *appctx) goto out; } } + BUG_ON(ctx->data_sent != cache_ptr->body_size); appctx->st0 = HTX_CACHE_EOM; } @@ -1721,8 +1930,10 @@ static void http_cache_io_handler(struct appctx *appctx) } end: - if (appctx->st0 == HTX_CACHE_END) + if (appctx->st0 == HTX_CACHE_END) { + applet_have_no_more_data(appctx); se_fl_set(appctx->sedesc, SE_FL_EOS); + } out: total = res_htx->data - total; @@ -1730,6 +1941,7 @@ static void http_cache_io_handler(struct appctx *appctx) channel_add_input(res, total); htx_to_buf(res_htx, &res->buf); + exit: /* eat the whole request */ if (co_data(req)) { req_htx = htx_from_buf(&req->buf);