diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h index cf1a4cb6a..3c0693883 100644 --- a/include/haproxy/connection-t.h +++ b/include/haproxy/connection-t.h @@ -424,7 +424,7 @@ struct mux_ops { size_t (*rcv_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */ size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */ size_t (*nego_fastfwd)(struct stconn *sc, struct buffer *input, size_t count, unsigned int may_splice); /* Callback to fill the SD iobuf */ - void (*done_fastfwd)(struct stconn *sc); /* Callback to terminate fast data forwarding */ + size_t (*done_fastfwd)(struct stconn *sc); /* Callback to terminate fast data forwarding */ int (*fastfwd)(struct stconn *sc, unsigned int count, unsigned int flags); /* Callback to init fast data forwarding */ int (*resume_fastfwd)(struct stconn *sc, unsigned int flags); /* Callback to resume fast data forwarding */ void (*shutr)(struct stconn *sc, enum co_shr_mode); /* shutr function */ diff --git a/include/haproxy/stconn.h b/include/haproxy/stconn.h index 71751f697..4ba6aca20 100644 --- a/include/haproxy/stconn.h +++ b/include/haproxy/stconn.h @@ -132,41 +132,6 @@ static inline size_t se_ff_data(struct sedesc *se) return (se->iobuf.data + (se->iobuf.pipe ? se->iobuf.pipe->data : 0)); } -static inline size_t se_nego_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice) -{ - size_t ret = 0; - - if (se_fl_test(se, SE_FL_T_MUX)) { - const struct mux_ops *mux = se->conn->mux; - - se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED; - if (mux->nego_fastfwd && mux->done_fastfwd) { - ret = mux->nego_fastfwd(se->sc, input, count, may_splice); - if ((se->iobuf.flags & IOBUF_FL_FF_BLOCKED) && !(se->sc->wait_event.events & SUB_RETRY_SEND)) { - /* The SC must be subs for send to be notify when some - * space is made - */ - mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event); - } - goto end; - } - } - se->iobuf.flags |= IOBUF_FL_NO_FF; - - end: - return ret; -} - -static inline void se_done_ff(struct sedesc *se) -{ - if (se_fl_test(se, SE_FL_T_MUX)) { - const struct mux_ops *mux = se->conn->mux; - - BUG_ON(!mux->done_fastfwd); - mux->done_fastfwd(se->sc); - } -} - /* stream connector version */ static forceinline void sc_ep_zero(struct stconn *sc) { @@ -544,4 +509,45 @@ static inline void se_need_more_data(struct sedesc *se) se_fl_set(se, SE_FL_WAIT_DATA); } + +static inline size_t se_nego_ff(struct sedesc *se, struct buffer *input, size_t count, unsigned int may_splice) +{ + size_t ret = 0; + + if (se_fl_test(se, SE_FL_T_MUX)) { + const struct mux_ops *mux = se->conn->mux; + + se->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED; + if (mux->nego_fastfwd && mux->done_fastfwd) { + ret = mux->nego_fastfwd(se->sc, input, count, may_splice); + if ((se->iobuf.flags & IOBUF_FL_FF_BLOCKED) && !(se->sc->wait_event.events & SUB_RETRY_SEND)) { + /* The SC must be subs for send to be notify when some + * space is made + */ + mux->subscribe(se->sc, SUB_RETRY_SEND, &se->sc->wait_event); + } + goto end; + } + } + se->iobuf.flags |= IOBUF_FL_NO_FF; + + end: + return ret; +} + +static inline void se_done_ff(struct sedesc *se) +{ + if (se_fl_test(se, SE_FL_T_MUX)) { + const struct mux_ops *mux = se->conn->mux; + size_t sent, to_send = se_ff_data(se); + + BUG_ON(!mux->done_fastfwd); + sent = mux->done_fastfwd(se->sc); + if (sent > 0) + sc_ep_report_send_activity(se->sc); + else if (to_send > 0) /* implies sent == 0 */ + sc_ep_report_blocked_send(se->sc); + } +} + #endif /* _HAPROXY_STCONN_H */ diff --git a/src/mux_h1.c b/src/mux_h1.c index 7dc424edb..4477c5a16 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -4479,7 +4479,7 @@ static size_t h1_nego_ff(struct stconn *sc, struct buffer *input, size_t count, return ret; } -static void h1_done_ff(struct stconn *sc) +static size_t h1_done_ff(struct stconn *sc) { struct h1s *h1s = __sc_mux_strm(sc); struct h1c *h1c = h1s->h1c; @@ -4546,6 +4546,7 @@ static void h1_done_ff(struct stconn *sc) } TRACE_LEAVE(H1_EV_STRM_RECV, h1c->conn, h1s, 0, (size_t[]){total}); + return total; } static int h1_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) diff --git a/src/mux_h2.c b/src/mux_h2.c index 95003cfb3..98b796b81 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -7023,13 +7023,14 @@ static size_t h2_nego_ff(struct stconn *sc, struct buffer *input, size_t count, return ret; } -static void h2_done_ff(struct stconn *sc) +static size_t h2_done_ff(struct stconn *sc) { struct h2s *h2s = __sc_mux_strm(sc); struct h2c *h2c = h2s->h2c; struct sedesc *sd = h2s->sd; struct buffer *mbuf; char *head; + size_t total = 0; TRACE_ENTER(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s); @@ -7067,10 +7068,11 @@ static void h2_done_ff(struct stconn *sc) /* Perform a synchronous send but in all cases, consider * everything was already sent from the SC point of view. */ - h2_set_frame_size(head, sd->iobuf.data); + total = sd->iobuf.data; + h2_set_frame_size(head, total); b_add(mbuf, 9); - h2s->sws -= sd->iobuf.data; - h2c->mws -= sd->iobuf.data; + h2s->sws -= total; + h2c->mws -= total; if (h2_send(h2s->h2c)) tasklet_wakeup(h2s->h2c->wait_event.tasklet); @@ -7080,6 +7082,7 @@ static void h2_done_ff(struct stconn *sc) sd->iobuf.data = 0; TRACE_LEAVE(H2_EV_H2S_SEND|H2_EV_STRM_SEND, h2s->h2c->conn, h2s); + return total; } static int h2_resume_ff(struct stconn *sc, unsigned int flags) diff --git a/src/mux_pt.c b/src/mux_pt.c index 52a5527d9..802aa4c8a 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -612,7 +612,7 @@ static size_t mux_pt_nego_ff(struct stconn *sc, struct buffer *input, size_t cou return ret; } -static void mux_pt_done_ff(struct stconn *sc) +static size_t mux_pt_done_ff(struct stconn *sc) { struct connection *conn = __sc_conn(sc); struct mux_pt_ctx *ctx = conn->ctx; @@ -641,6 +641,7 @@ static void mux_pt_done_ff(struct stconn *sc) } TRACE_LEAVE(PT_EV_TX_DATA, conn, sc, 0, (size_t[]){total}); + return total; } static int mux_pt_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)