MEDIUM: stconn: Add functions to handle applets I/O from the SC layer

There is no tasklet to handle I/O subscriptions for applets, but functions
to deal with receives and sends from the SC layer were added. it meanse a
function to retrieve data from an applet with this synchronous version and a
function to push data to an applet wit this synchronous version.

It is pretty similar to the functions used for muxes but there are some
differences. So for now, we keep them separated.

Zero-copy forwarding is not supported for now. In addition, there is no
subscription mechanism.
This commit is contained in:
Christopher Faulet 2024-01-11 10:10:06 +01:00
parent 525ec12305
commit f81b704d01
2 changed files with 310 additions and 0 deletions

View File

@ -40,6 +40,9 @@ struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state);
int sc_conn_sync_recv(struct stconn *sc);
void sc_conn_sync_send(struct stconn *sc);
int sc_applet_sync_recv(struct stconn *sc);
void sc_applet_sync_send(struct stconn *sc);
/* returns the channel which receives data from this stream connector (input channel) */
static inline struct channel *sc_ic(const struct stconn *sc)

View File

@ -1864,6 +1864,313 @@ static void sc_applet_eos(struct stconn *sc)
return sc_app_shut_applet(sc);
}
/*
* This is the callback which is called by the applet layer to receive data into
* the buffer from the appctx. It iterates over the applet's rcv_buf
* function. Please do not statify this function, it's often present in
* backtraces, it's useful to recognize it.
*/
int sc_applet_recv(struct stconn *sc)
{
struct appctx *appctx = __sc_appctx(sc);
struct channel *ic = sc_ic(sc);
int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS;
int flags = 0;
/* If another call to sc_applet_recv() failed, give up now.
*/
if (sc_waiting_room(sc))
return 0;
/* maybe we were called immediately after an asynchronous abort */
if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
return 1;
/* We must wait because the applet is not fully initialized */
if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
return 0;
/* stop immediately on errors. */
if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
// TODO: be sure SE_FL_RCV_MORE may be set for applet ?
if (sc_ep_test(sc, SE_FL_ERROR))
goto end_recv;
}
/* prepare to detect if the mux needs more room */
sc_ep_clr(sc, SE_FL_WANT_ROOM);
channel_check_idletimer(ic);
/* TODO: Handle fastfwd here be implement callback function first ! */
if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
goto end_recv;
/* For an HTX stream, if the buffer is stuck (no output data with some
* input data) and if the HTX message is fragmented or if its free space
* wraps, we force an HTX deframentation. It is a way to have a
* contiguous free space nad to let the mux to copy as much data as
* possible.
*
* NOTE: A possible optim may be to let the mux decides if defrag is
* required or not, depending on amount of data to be xferred.
*/
if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
struct htx *htx = htxbuf(&ic->buf);
if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
htx_defrag(htx, NULL, 0);
}
/* Compute transient CO_RFL_* flags */
if (co_data(ic)) {
flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
}
/* <max> may be null. This is the mux responsibility to set
* SE_FL_RCV_MORE on the SC if more space is needed.
*/
max = channel_recv_max(ic);
ret = appctx->applet->rcv_buf(sc, &ic->buf, max, flags);
if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
/* SE_FL_WANT_ROOM must not be reported if the channel's
* buffer is empty.
*/
BUG_ON(c_empty(ic));
sc_need_room(sc, channel_recv_max(ic) + 1);
/* Add READ_PARTIAL because some data are pending but
* cannot be xferred to the channel
*/
ic->flags |= CF_READ_EVENT;
sc_ep_report_read_activity(sc);
}
if (ret <= 0) {
/* if we refrained from reading because we asked for a flush to
* satisfy rcv_pipe(), report that there's not enough room here
* to proceed.
*/
if (flags & CO_RFL_BUF_FLUSH)
sc_need_room(sc, -1);
goto done_recv;
}
cur_read += ret;
/* if we're allowed to directly forward data, we must update ->o */
if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
unsigned long fwd = ret;
if (ic->to_forward != CHN_INFINITE_FORWARD) {
if (fwd > ic->to_forward)
fwd = ic->to_forward;
ic->to_forward -= fwd;
}
c_adv(ic, fwd);
}
ic->flags |= CF_READ_EVENT;
ic->total += ret;
/* End-of-input reached, we can leave. In this case, it is
* important to break the loop to not block the SC because of
* the channel's policies.This way, we are still able to receive
* shutdowns.
*/
if (sc_ep_test(sc, SE_FL_EOI))
goto done_recv;
if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
/* we don't expect to read more data */
sc_wont_read(sc);
goto done_recv;
}
/* if too many bytes were missing from last read, it means that
* it's pointless trying to read again because the system does
* not have them in buffers.
*/
if (ret < max) {
/* if a streamer has read few data, it may be because we
* have exhausted system buffers. It's not worth trying
* again.
*/
if (ic->flags & CF_STREAMER) {
/* we're stopped by the channel's policy */
sc_wont_read(sc);
goto done_recv;
}
/* if we read a large block smaller than what we requested,
* it's almost certain we'll never get anything more.
*/
if (ret >= global.tune.recv_enough) {
/* we're stopped by the channel's policy */
sc_wont_read(sc);
}
}
done_recv:
if (!cur_read)
se_have_no_more_data(sc->sedesc);
else {
channel_check_xfer(ic, cur_read);
sc_ep_report_read_activity(sc);
}
end_recv:
ret = (cur_read != 0);
/* Report EOI on the channel if it was reached from the mux point of
* view. */
if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
sc_ep_report_read_activity(sc);
sc->flags |= SC_FL_EOI;
ic->flags |= CF_READ_EVENT;
ret = 1;
}
if (sc_ep_test(sc, SE_FL_EOS)) {
/* we received a shutdown */
if (ic->flags & CF_AUTO_CLOSE)
sc_schedule_shutdown(sc_opposite(sc));
sc_applet_eos(sc);
ret = 1;
}
if (sc_ep_test(sc, SE_FL_ERROR)) {
sc->flags |= SC_FL_ERROR;
ret = 1;
}
else if (!cur_read &&
!(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) {
se_have_no_more_data(sc->sedesc);
}
else {
se_have_more_data(sc->sedesc);
ret = 1;
}
return ret;
}
/* This tries to perform a synchronous receive on the stream connector to
* try to collect last arrived data. In practice it's only implemented on
* stconns. Returns 0 if nothing was done, non-zero if new data or a
* shutdown were collected. This may result on some delayed receive calls
* to be programmed and performed later, though it doesn't provide any
* such guarantee.
*/
int sc_applet_sync_recv(struct stconn *sc)
{
if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
return 0;
if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
return 0;
if (!sc_is_recv_allowed(sc))
return 0; // already failed
return sc_applet_recv(sc);
}
/*
* This function is called to send buffer data to an applet. It calls the
* applet's snd_buf function. Please do not statify this function, it's often
* present in backtraces, it's useful to recognize it.
*/
int sc_applet_send(struct stconn *sc)
{
struct appctx *appctx = __sc_appctx(sc);
struct stconn *sco = sc_opposite(sc);
struct channel *oc = sc_oc(sc);
size_t ret;
int did_send = 0;
if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
return 1;
}
if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
return 0;
/* we might have been called just after an asynchronous shutw */
if (sc->flags & SC_FL_SHUT_DONE)
return 1;
/* We must wait because the applet is not fully initialized */
if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
return 0;
if (co_data(oc)) {
ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0);
if (ret > 0) {
did_send = 1;
c_rew(oc, ret);
c_realign_if_empty(oc);
if (!co_data(oc)) {
/* Always clear both flags once everything has been sent, they're one-shot */
sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
}
/* if some data remain in the buffer, it's only because the
* system buffers are full, we will try next time.
*/
}
}
if (did_send)
oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
sc_have_room(sco);
if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
oc->flags |= CF_WRITE_EVENT;
BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
if (sc_ep_test(sc, SE_FL_ERROR))
sc->flags |= SC_FL_ERROR;
return 1;
}
if (!co_data(oc)) {
if (did_send)
sc_ep_report_send_activity(sc);
}
else {
sc_ep_report_blocked_send(sc, did_send);
}
return did_send;
}
void sc_applet_sync_send(struct stconn *sc)
{
struct channel *oc = sc_oc(sc);
oc->flags &= ~CF_WRITE_EVENT;
if (sc->flags & SC_FL_SHUT_DONE)
return;
if (!co_data(oc))
return;
if (!sc_state_in(sc->state, SC_SB_EST))
return;
if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
return;
sc_applet_send(sc);
}
/* Callback to be used by applet handlers upon completion. It updates the stream
* (which may or may not take this opportunity to try to forward data), then
* may re-enable the applet's based on the channels and stream connector's final