MEDIUM: applet: Add support for zero-copy forwarding from an applet

Thanks to this patch, it is possible to an applet to directly send data to
the opposite endpoint. To do so, it must implement <fastfwd> appctx callback
function and set SE_FL_MAY_FASTFWD flag.

Everything will be handled by appctx_fastfwd() function. The applet is only
responsible to transfer data. If it sets <to_forward> value, it is used to
limit the amount of data to forward.
This commit is contained in:
Christopher Faulet 2024-01-23 07:56:34 +01:00
parent 62a81cb6a6
commit 39b6f5b04c
3 changed files with 109 additions and 1 deletions

View File

@ -51,6 +51,7 @@ void appctx_free(struct appctx *appctx);
size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags);
static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
{

View File

@ -23,6 +23,7 @@
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>
#include <haproxy/xref.h>
unsigned int nb_applets = 0;
@ -594,6 +595,75 @@ size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsig
return ret;
}
int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
{
struct appctx *appctx = __sc_appctx(sc);
struct xref *peer;
struct sedesc *sdo = NULL;
unsigned int len;
int ret = 0;
TRACE_ENTER(APPLET_EV_RECV, appctx);
/* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */
if (b_data(&appctx->outbuf)) {
TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx);
return -1;
}
peer = xref_get_peer_and_lock(&appctx->sedesc->xref);
if (!peer) {
TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx);
goto end;
}
sdo = container_of(peer, struct sedesc, xref);
xref_unlock(&appctx->sedesc->xref, peer);
if (appctx->to_forward && count > appctx->to_forward)
count = appctx->to_forward;
len = se_nego_ff(sdo, &BUF_NULL, count, 0);
if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
sc_ep_clr(sc, SE_FL_MAY_FASTFWD);
TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx);
goto end;
}
if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
sc_ep_set(sc, /* SE_FL_RCV_MORE | */SE_FL_WANT_ROOM);
TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
goto end;
}
b_add(sdo->iobuf.buf, sdo->iobuf.offset);
ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0);
b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
sdo->iobuf.data += ret;
if (applet_fl_test(appctx, APPCTX_FL_EOI)) {
se_fl_set(appctx->sedesc, SE_FL_EOI);
sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can
* forward the EOI the to consumer side
*/
TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
}
if (applet_fl_test(appctx, APPCTX_FL_EOS)) {
se_fl_set(appctx->sedesc, SE_FL_EOS);
TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
}
if (applet_fl_test(appctx, APPCTX_FL_ERROR)) {
se_fl_set(appctx->sedesc, SE_FL_ERROR);
TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
}
/* else */
/* applet_have_more_data(appctx); */
se_done_ff(sdo);
end:
TRACE_LEAVE(APPLET_EV_RECV, appctx);
return ret;
}
/* Default applet handler */
struct task *task_run_applet(struct task *t, void *context, unsigned int state)
{

View File

@ -1904,8 +1904,42 @@ int sc_applet_recv(struct stconn *sc)
channel_check_idletimer(ic);
/* TODO: Handle fastfwd here be implement callback function first ! */
/* First, let's see if we may fast-forward data from a side to the other
* one without using the channel buffer.
*/
if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) {
if (channel_data(ic)) {
/* We're embarrassed, there are already data pending in
* the buffer and we don't want to have them at two
* locations at a time. Let's indicate we need some
* place and ask the consumer to hurry.
*/
flags |= CO_RFL_BUF_FLUSH;
goto abort_fastfwd;
}
ret = appctx_fastfwd(sc, ic->to_forward, flags);
if (ret < 0)
goto abort_fastfwd;
else if (ret > 0) {
if (ic->to_forward != CHN_INFINITE_FORWARD)
ic->to_forward -= ret;
ic->total += ret;
cur_read += ret;
ic->flags |= CF_READ_EVENT;
}
if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
goto end_recv;
if (sc_ep_test(sc, SE_FL_WANT_ROOM))
sc_need_room(sc, -1);
if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward)
goto done_recv;
}
abort_fastfwd:
if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
goto end_recv;
@ -2108,6 +2142,9 @@ int sc_applet_send(struct stconn *sc)
if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
return 0;
/* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */
BUG_ON(sc_ep_have_ff_data(sc));
if (co_data(oc)) {
ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0);
if (ret > 0) {