BUG/MEDIUM: stconn: Report send activity during mux-to-mux fast-forward

When data are directly forwarded from a mux to the opposite one, we must not
forget to report send activity when data are successfully sent or report a
blocked send with data are blocked. It is important because otherwise, if
the transfer is quite long, longer than the client or server timeout, an
error may be triggered because the write timeout is reached.

H1, H2 and PT muxes are concerned. To fix the issue, The done_fastword()
callback now returns the amount of data consummed. This way it is possible
to update/reset the FSB data accordingly.

No backport needed.
This commit is contained in:
Christopher Faulet 2023-10-31 13:43:21 +01:00
parent d7eaa0d553
commit 141b489291
5 changed files with 53 additions and 42 deletions

View File

@ -424,7 +424,7 @@ struct mux_ops {
size_t (*rcv_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */
size_t (*nego_fastfwd)(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice); /* Callback to fill the SD iobuf */
void (*done_fastfwd)(struct stconn *sc); /* Callback to terminate fast data forwarding */
size_t (*done_fastfwd)(struct stconn *sc); /* Callback to terminate fast data forwarding */
int (*fastfwd)(struct stconn *sc, unsigned int count, unsigned int flags); /* Callback to init fast data forwarding */
int (*resume_fastfwd)(struct stconn *sc, unsigned int flags); /* Callback to resume fast data forwarding */
void (*shutr)(struct stconn *sc, enum co_shr_mode); /* shutr function */

View File

@ -132,41 +132,6 @@ static inline size_t se_ff_data(struct sedesc *se)
return (se->iobuf.data + (se->iobuf.pipe ? se->iobuf.pipe->data : 0));
}
static inline size_t se_nego_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice)
{
size_t ret = 0;
if (se_fl_test(se, SE_FL_T_MUX)) {
const struct mux_ops *mux = se->conn->mux;
se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
if (mux->nego_fastfwd && mux->done_fastfwd) {
ret = mux->nego_fastfwd(se->sc, input, count, may_splice);
if ((se->iobuf.flags & IOBUF_FL_FF_BLOCKED) && !(se->sc->wait_event.events & SUB_RETRY_SEND)) {
/* The SC must be subs for send to be notify when some
* space is made
*/
mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event);
}
goto end;
}
}
se->iobuf.flags |= IOBUF_FL_NO_FF;
end:
return ret;
}
static inline void se_done_ff(struct sedesc *se)
{
if (se_fl_test(se, SE_FL_T_MUX)) {
const struct mux_ops *mux = se->conn->mux;
BUG_ON(!mux->done_fastfwd);
mux->done_fastfwd(se->sc);
}
}
/* stream connector version */
static forceinline void sc_ep_zero(struct stconn *sc)
{
@ -544,4 +509,45 @@ static inline void se_need_more_data(struct sedesc *se)
se_fl_set(se, SE_FL_WAIT_DATA);
}
static inline size_t se_nego_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice)
{
size_t ret = 0;
if (se_fl_test(se, SE_FL_T_MUX)) {
const struct mux_ops *mux = se->conn->mux;
se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
if (mux->nego_fastfwd && mux->done_fastfwd) {
ret = mux->nego_fastfwd(se->sc, input, count, may_splice);
if ((se->iobuf.flags & IOBUF_FL_FF_BLOCKED) && !(se->sc->wait_event.events & SUB_RETRY_SEND)) {
/* The SC must be subs for send to be notify when some
* space is made
*/
mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event);
}
goto end;
}
}
se->iobuf.flags |= IOBUF_FL_NO_FF;
end:
return ret;
}
static inline void se_done_ff(struct sedesc *se)
{
if (se_fl_test(se, SE_FL_T_MUX)) {
const struct mux_ops *mux = se->conn->mux;
size_t sent, to_send = se_ff_data(se);
BUG_ON(!mux->done_fastfwd);
sent = mux->done_fastfwd(se->sc);
if (sent > 0)
sc_ep_report_send_activity(se->sc);
else if (to_send > 0) /* implies sent == 0 */
sc_ep_report_blocked_send(se->sc);
}
}
#endif /* _HAPROXY_STCONN_H */

View File

@ -4479,7 +4479,7 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
return ret;
}
static void h1_done_ff(struct stconn *sc)
static size_t h1_done_ff(struct stconn *sc)
{
struct h1s *h1s = __sc_mux_strm(sc);
struct h1c *h1c = h1s->h1c;
@ -4546,6 +4546,7 @@ static void h1_done_ff(struct stconn *sc)
}
TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){total});
return total;
}
static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)

View File

@ -7023,13 +7023,14 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count,
return ret;
}
static void h2_done_ff(struct stconn *sc)
static size_t h2_done_ff(struct stconn *sc)
{
struct h2s *h2s = __sc_mux_strm(sc);
struct h2c *h2c = h2s->h2c;
struct sedesc *sd = h2s->sd;
struct buffer *mbuf;
char *head;
size_t total = 0;
TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
@ -7067,10 +7068,11 @@ static void h2_done_ff(struct stconn *sc)
/* Perform a synchronous send but in all cases, consider
* everything was already sent from the SC point of view.
*/
h2_set_frame_size(head, sd->iobuf.data);
total = sd->iobuf.data;
h2_set_frame_size(head, total);
b_add(mbuf, 9);
h2s->sws -= sd->iobuf.data;
h2c->mws -= sd->iobuf.data;
h2s->sws -= total;
h2c->mws -= total;
if (h2_send(h2s->h2c))
tasklet_wakeup(h2s->h2c->wait_event.tasklet);
@ -7080,6 +7082,7 @@ static void h2_done_ff(struct stconn *sc)
sd->iobuf.data = 0;
TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
return total;
}
static int h2_resume_ff(struct stconn *sc, unsigned int flags)

View File

@ -612,7 +612,7 @@ static size_t mux_pt_nego_ff(struct stconn *sc, struct buffer *input, size_t cou
return ret;
}
static void mux_pt_done_ff(struct stconn *sc)
static size_t mux_pt_done_ff(struct stconn *sc)
{
struct connection *conn = __sc_conn(sc);
struct mux_pt_ctx *ctx = conn->ctx;
@ -641,6 +641,7 @@ static void mux_pt_done_ff(struct stconn *sc)
}
TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total});
return total;
}
static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)