mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-10 19:21:37 +00:00
MEDIUM: connections: Don't directly mess with the polling from the upper layers.
Avoid using conn_xprt_want_send/recv, and totally nuke cs_want_send/recv, from the upper layers. The polling is now directly handled by the connection layer, it is activated on subscribe(), and unactivated once we got the event and we woke the related task.
This commit is contained in:
parent
81a15af6bc
commit
53216e7db9
@ -291,17 +291,6 @@ static inline void conn_cond_update_polling(struct connection *c)
|
||||
}
|
||||
}
|
||||
|
||||
/* recompute the mux polling flags after updating the current conn_stream and
|
||||
* propagate the result down the transport layer.
|
||||
*/
|
||||
static inline void cs_update_mux_polling(struct conn_stream *cs)
|
||||
{
|
||||
struct connection *conn = cs->conn;
|
||||
|
||||
if (conn->mux && conn->mux->update_poll)
|
||||
conn->mux->update_poll(cs);
|
||||
}
|
||||
|
||||
/***** Event manipulation primitives for use by DATA I/O callbacks *****/
|
||||
/* The __conn_* versions do not propagate to lower layers and are only meant
|
||||
* to be used by handlers called by the connection handler. The other ones
|
||||
@ -317,28 +306,6 @@ static inline void __conn_xprt_stop_recv(struct connection *c)
|
||||
c->flags &= ~CO_FL_XPRT_RD_ENA;
|
||||
}
|
||||
|
||||
static inline void __cs_want_recv(struct conn_stream *cs)
|
||||
{
|
||||
cs->flags |= CS_FL_DATA_RD_ENA;
|
||||
}
|
||||
|
||||
static inline void __cs_stop_recv(struct conn_stream *cs)
|
||||
{
|
||||
cs->flags &= ~CS_FL_DATA_RD_ENA;
|
||||
}
|
||||
|
||||
static inline void cs_want_recv(struct conn_stream *cs)
|
||||
{
|
||||
__cs_want_recv(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
static inline void cs_stop_recv(struct conn_stream *cs)
|
||||
{
|
||||
__cs_stop_recv(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
/* this one is used only to stop speculative recv(). It doesn't stop it if the
|
||||
* fd is already polled in order to avoid expensive polling status changes.
|
||||
* Since it might require the upper layer to re-enable reading, we'll return 1
|
||||
@ -368,40 +335,6 @@ static inline void __conn_xprt_stop_both(struct connection *c)
|
||||
c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
|
||||
}
|
||||
|
||||
static inline void __cs_want_send(struct conn_stream *cs)
|
||||
{
|
||||
cs->flags |= CS_FL_DATA_WR_ENA;
|
||||
}
|
||||
|
||||
static inline void __cs_stop_send(struct conn_stream *cs)
|
||||
{
|
||||
cs->flags &= ~CS_FL_DATA_WR_ENA;
|
||||
}
|
||||
|
||||
static inline void cs_stop_send(struct conn_stream *cs)
|
||||
{
|
||||
__cs_stop_send(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
static inline void cs_want_send(struct conn_stream *cs)
|
||||
{
|
||||
__cs_want_send(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
static inline void __cs_stop_both(struct conn_stream *cs)
|
||||
{
|
||||
cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
|
||||
}
|
||||
|
||||
static inline void cs_stop_both(struct conn_stream *cs)
|
||||
{
|
||||
__cs_stop_both(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
|
||||
static inline void conn_xprt_want_recv(struct connection *c)
|
||||
{
|
||||
__conn_xprt_want_recv(c);
|
||||
@ -546,7 +479,6 @@ static inline void conn_xprt_shutw_hard(struct connection *c)
|
||||
/* shut read */
|
||||
static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
{
|
||||
__cs_stop_recv(cs);
|
||||
|
||||
/* clean data-layer shutdown */
|
||||
if (cs->conn->mux && cs->conn->mux->shutr)
|
||||
@ -557,7 +489,6 @@ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
|
||||
/* shut write */
|
||||
static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
|
||||
{
|
||||
__cs_stop_send(cs);
|
||||
|
||||
/* clean data-layer shutdown */
|
||||
if (cs->conn->mux && cs->conn->mux->shutw)
|
||||
|
@ -199,7 +199,6 @@ static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
|
||||
LIST_ADD(pool, &conn->list);
|
||||
|
||||
cs_attach(cs, si, &si_idle_conn_cb);
|
||||
cs_want_recv(cs);
|
||||
}
|
||||
|
||||
/* Attach conn_stream <cs> to the stream interface <si>. The stream interface
|
||||
@ -349,13 +348,15 @@ static inline void si_update(struct stream_interface *si)
|
||||
/* Calls chk_rcv on the connection using the data layer */
|
||||
static inline void si_chk_rcv(struct stream_interface *si)
|
||||
{
|
||||
si->ops->chk_rcv(si);
|
||||
if (si->ops->chk_rcv)
|
||||
si->ops->chk_rcv(si);
|
||||
}
|
||||
|
||||
/* Calls chk_snd on the connection using the data layer */
|
||||
static inline void si_chk_snd(struct stream_interface *si)
|
||||
{
|
||||
si->ops->chk_snd(si);
|
||||
if (si->ops->chk_snd)
|
||||
si->ops->chk_snd(si);
|
||||
}
|
||||
|
||||
/* Calls chk_snd on the connection using the ctrl layer */
|
||||
@ -378,10 +379,6 @@ static inline int si_connect(struct stream_interface *si)
|
||||
}
|
||||
else {
|
||||
/* reuse the existing connection */
|
||||
if (!channel_is_empty(si_oc(si))) {
|
||||
/* we'll have to send a request there. */
|
||||
cs_want_send(cs);
|
||||
}
|
||||
|
||||
/* the connection is established */
|
||||
si->state = SI_ST_EST;
|
||||
|
@ -66,9 +66,6 @@ union conn_handle {
|
||||
/* conn_stream flags */
|
||||
enum {
|
||||
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
|
||||
CS_FL_DATA_RD_ENA = 0x00000001, /* receiving data is allowed */
|
||||
CS_FL_DATA_WR_ENA = 0x00000002, /* sending data is desired */
|
||||
|
||||
CS_FL_SHRD = 0x00000010, /* read shut, draining extra data */
|
||||
CS_FL_SHRR = 0x00000020, /* read shut, resetting extra data */
|
||||
CS_FL_SHR = CS_FL_SHRD | CS_FL_SHRR, /* read shut status */
|
||||
@ -315,7 +312,6 @@ struct xprt_ops {
|
||||
struct mux_ops {
|
||||
int (*init)(struct connection *conn, struct proxy *prx); /* early initialization */
|
||||
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
|
||||
void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
|
||||
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
|
||||
size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */
|
||||
int (*rcv_pipe)(struct conn_stream *cs, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
|
||||
|
36
src/checks.c
36
src/checks.c
@ -745,7 +745,6 @@ static void __event_srv_chk_w(struct conn_stream *cs)
|
||||
|
||||
if (retrieve_errno_from_socket(conn)) {
|
||||
chk_report_conn_err(check, errno, 0);
|
||||
__cs_stop_both(cs);
|
||||
goto out_wakeup;
|
||||
}
|
||||
|
||||
@ -771,7 +770,6 @@ static void __event_srv_chk_w(struct conn_stream *cs)
|
||||
b_realign_if_empty(&check->bo);
|
||||
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
|
||||
chk_report_conn_err(check, errno, 0);
|
||||
__cs_stop_both(cs);
|
||||
goto out_wakeup;
|
||||
}
|
||||
if (b_data(&check->bo)) {
|
||||
@ -785,12 +783,10 @@ static void __event_srv_chk_w(struct conn_stream *cs)
|
||||
t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
|
||||
task_queue(t);
|
||||
}
|
||||
goto out_nowake;
|
||||
goto out;
|
||||
|
||||
out_wakeup:
|
||||
task_wakeup(t, TASK_WOKEN_IO);
|
||||
out_nowake:
|
||||
__cs_stop_send(cs); /* nothing more to write */
|
||||
out:
|
||||
return;
|
||||
}
|
||||
@ -1373,7 +1369,6 @@ static void __event_srv_chk_r(struct conn_stream *cs)
|
||||
* range quickly. To avoid sending RSTs all the time, we first try to
|
||||
* drain pending data.
|
||||
*/
|
||||
__cs_stop_both(cs);
|
||||
cs_shutw(cs, CS_SHW_NORMAL);
|
||||
|
||||
/* OK, let's not stay here forever */
|
||||
@ -1385,7 +1380,6 @@ out:
|
||||
return;
|
||||
|
||||
wait_more_data:
|
||||
__cs_want_recv(cs);
|
||||
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
|
||||
goto out;
|
||||
}
|
||||
@ -1420,10 +1414,9 @@ static int wake_srv_chk(struct conn_stream *cs)
|
||||
* we expect errno to still be valid.
|
||||
*/
|
||||
chk_report_conn_err(check, errno, 0);
|
||||
__cs_stop_both(cs);
|
||||
task_wakeup(check->task, TASK_WOKEN_IO);
|
||||
}
|
||||
else if (!(conn->flags & CO_FL_HANDSHAKE) && !(cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_DATA_WR_ENA))) {
|
||||
else if (!(conn->flags & CO_FL_HANDSHAKE) && !check->type) {
|
||||
/* we may get here if only a connection probe was required : we
|
||||
* don't have any data to send nor anything expected in response,
|
||||
* so the completion of the connection establishment is enough.
|
||||
@ -1624,8 +1617,6 @@ static int connect_conn_chk(struct task *t)
|
||||
if (proto && proto->connect)
|
||||
ret = proto->connect(conn, check->type, quickack ? 2 : 0);
|
||||
|
||||
if (check->type)
|
||||
cs_want_send(cs);
|
||||
|
||||
#ifdef USE_OPENSSL
|
||||
if (s->check.sni)
|
||||
@ -2180,10 +2171,8 @@ static struct task *process_chk_conn(struct task *t, void *context, unsigned sho
|
||||
t->expire = tick_first(t->expire, t_con);
|
||||
}
|
||||
|
||||
if (check->type) {
|
||||
cs_want_recv(cs); /* prepare for reading a possible reply */
|
||||
if (check->type)
|
||||
__event_srv_chk_r(cs);
|
||||
}
|
||||
|
||||
task_set_affinity(t, tid_bit);
|
||||
goto reschedule;
|
||||
@ -2683,10 +2672,6 @@ static int tcpcheck_main(struct check *check)
|
||||
t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
|
||||
}
|
||||
|
||||
/* It's only the rules which will enable send/recv */
|
||||
if (cs)
|
||||
cs_stop_both(cs);
|
||||
|
||||
while (1) {
|
||||
/* We have to try to flush the output buffer before reading, at
|
||||
* the end, or if we're about to send a string that does not fit
|
||||
@ -2699,14 +2684,12 @@ static int tcpcheck_main(struct check *check)
|
||||
check->current_step->string_len >= b_room(&check->bo))) {
|
||||
int ret;
|
||||
|
||||
__cs_want_send(cs);
|
||||
ret = cs->conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0);
|
||||
b_realign_if_empty(&check->bo);
|
||||
|
||||
if (ret <= 0) {
|
||||
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
|
||||
chk_report_conn_err(check, errno, 0);
|
||||
__cs_stop_both(cs);
|
||||
goto out_end_tcpcheck;
|
||||
}
|
||||
break;
|
||||
@ -2924,7 +2907,6 @@ static int tcpcheck_main(struct check *check)
|
||||
if (unlikely(check->result == CHK_RES_FAILED))
|
||||
goto out_end_tcpcheck;
|
||||
|
||||
__cs_want_recv(cs);
|
||||
if (cs->conn->mux->rcv_buf(cs, &check->bi, b_size(&check->bi), 0) <= 0) {
|
||||
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
|
||||
done = 1;
|
||||
@ -3026,7 +3008,6 @@ static int tcpcheck_main(struct check *check)
|
||||
|
||||
if (check->current_step->action == TCPCHK_ACT_EXPECT)
|
||||
goto tcpcheck_expect;
|
||||
__cs_stop_recv(cs);
|
||||
}
|
||||
}
|
||||
else {
|
||||
@ -3046,7 +3027,6 @@ static int tcpcheck_main(struct check *check)
|
||||
|
||||
if (check->current_step->action == TCPCHK_ACT_EXPECT)
|
||||
goto tcpcheck_expect;
|
||||
__cs_stop_recv(cs);
|
||||
}
|
||||
/* not matched but was supposed to => ERROR */
|
||||
else {
|
||||
@ -3098,15 +3078,9 @@ static int tcpcheck_main(struct check *check)
|
||||
goto out_end_tcpcheck;
|
||||
}
|
||||
|
||||
/* warning, current_step may now point to the head */
|
||||
if (b_data(&check->bo))
|
||||
__cs_want_send(cs);
|
||||
|
||||
if (&check->current_step->list != head &&
|
||||
check->current_step->action == TCPCHK_ACT_EXPECT) {
|
||||
__cs_want_recv(cs);
|
||||
check->current_step->action == TCPCHK_ACT_EXPECT)
|
||||
__event_srv_chk_r(cs);
|
||||
}
|
||||
goto out;
|
||||
|
||||
out_end_tcpcheck:
|
||||
@ -3120,8 +3094,6 @@ static int tcpcheck_main(struct check *check)
|
||||
if (check->result == CHK_RES_FAILED)
|
||||
conn->flags |= CO_FL_ERROR;
|
||||
|
||||
__cs_stop_both(cs);
|
||||
|
||||
out:
|
||||
return retcode;
|
||||
}
|
||||
|
@ -134,6 +134,7 @@ void conn_fd_handler(int fd)
|
||||
conn->send_wait = NULL;
|
||||
} else
|
||||
io_available = 1;
|
||||
__conn_xprt_stop_send(conn);
|
||||
}
|
||||
|
||||
/* The data transfer starts here and stops on error and handshakes. Note
|
||||
@ -153,6 +154,7 @@ void conn_fd_handler(int fd)
|
||||
conn->recv_wait = NULL;
|
||||
} else
|
||||
io_available = 1;
|
||||
__conn_xprt_stop_recv(conn);
|
||||
}
|
||||
|
||||
/* It may happen during the data phase that a handshake is
|
||||
@ -341,6 +343,7 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param)
|
||||
conn->recv_wait = NULL;
|
||||
sw->wait_reason &= ~SUB_CAN_RECV;
|
||||
}
|
||||
__conn_xprt_stop_recv(conn);
|
||||
}
|
||||
if (event_type & SUB_CAN_SEND) {
|
||||
sw = param;
|
||||
@ -348,7 +351,9 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param)
|
||||
conn->send_wait = NULL;
|
||||
sw->wait_reason &= ~SUB_CAN_SEND;
|
||||
}
|
||||
__conn_xprt_stop_send(conn);
|
||||
}
|
||||
conn_update_xprt_polling(conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -363,6 +368,7 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
|
||||
conn->recv_wait = sw;
|
||||
}
|
||||
event_type &= ~SUB_CAN_RECV;
|
||||
__conn_xprt_want_recv(conn);
|
||||
}
|
||||
if (event_type & SUB_CAN_SEND) {
|
||||
sw = param;
|
||||
@ -371,9 +377,11 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
|
||||
conn->send_wait = sw;
|
||||
}
|
||||
event_type &= ~SUB_CAN_SEND;
|
||||
__conn_xprt_want_send(conn);
|
||||
}
|
||||
if (event_type != 0)
|
||||
return (-1);
|
||||
conn_update_xprt_polling(conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -603,6 +611,7 @@ int conn_recv_proxy(struct connection *conn, int flag)
|
||||
}
|
||||
line++;
|
||||
}
|
||||
__conn_xprt_stop_recv(conn);
|
||||
|
||||
if (!dst_s || !sport_s || !dport_s)
|
||||
goto bad_header;
|
||||
|
98
src/mux_h2.c
98
src/mux_h2.c
@ -286,24 +286,18 @@ static int h2_buf_available(void *target)
|
||||
|
||||
if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) {
|
||||
h2c->flags &= ~H2_CF_DEM_DALLOC;
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
if (h2_recv_allowed(h2c))
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc_margin(&h2c->mbuf, 0)) {
|
||||
h2c->flags &= ~H2_CF_MUX_MALLOC;
|
||||
if (!(h2c->flags & H2_CF_MUX_BLOCK_ANY))
|
||||
conn_xprt_want_send(h2c->conn);
|
||||
|
||||
if (h2c->flags & H2_CF_DEM_MROOM) {
|
||||
h2c->flags &= ~H2_CF_DEM_MROOM;
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
if (h2_recv_allowed(h2c))
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@ -312,10 +306,8 @@ static int h2_buf_available(void *target)
|
||||
(h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs &&
|
||||
b_alloc_margin(&h2s->rxbuf, 0)) {
|
||||
h2c->flags &= ~H2_CF_DEM_SALLOC;
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
if (h2_recv_allowed(h2c))
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -427,7 +419,6 @@ static int h2_init(struct connection *conn, struct proxy *prx)
|
||||
task_queue(t);
|
||||
|
||||
/* prepare to read something */
|
||||
conn_xprt_want_recv(conn);
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
return 0;
|
||||
fail:
|
||||
@ -2255,10 +2246,8 @@ static int h2_recv(struct h2c *h2c)
|
||||
ret = 0;
|
||||
} while (ret > 0);
|
||||
|
||||
if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size)) {
|
||||
conn_xprt_want_recv(conn);
|
||||
if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size))
|
||||
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event);
|
||||
}
|
||||
|
||||
if (!b_data(buf)) {
|
||||
h2_release_buf(h2c, &h2c->dbuf);
|
||||
@ -2445,25 +2434,13 @@ static int h2_process(struct h2c *h2c)
|
||||
if (!b_data(&h2c->dbuf))
|
||||
h2_release_buf(h2c, &h2c->dbuf);
|
||||
|
||||
/* stop being notified of incoming data if we can't process them */
|
||||
if (!h2_recv_allowed(h2c))
|
||||
__conn_xprt_stop_recv(conn);
|
||||
else
|
||||
__conn_xprt_want_recv(conn);
|
||||
|
||||
/* adjust output polling */
|
||||
if (!(conn->flags & CO_FL_SOCK_WR_SH) &&
|
||||
h2c->st0 != H2_CS_ERROR2 && !(h2c->flags & H2_CF_GOAWAY_FAILED) &&
|
||||
(h2c->st0 == H2_CS_ERROR ||
|
||||
b_data(&h2c->mbuf) ||
|
||||
(h2c->mws > 0 && !LIST_ISEMPTY(&h2c->fctl_list)) ||
|
||||
(!(h2c->flags & H2_CF_MUX_BLOCK_ANY) && !LIST_ISEMPTY(&h2c->send_list)))) {
|
||||
__conn_xprt_want_send(conn);
|
||||
}
|
||||
else {
|
||||
if ((conn->flags & CO_FL_SOCK_WR_SH) ||
|
||||
h2c->st0 == H2_CS_ERROR2 || (h2c->flags & H2_CF_GOAWAY_FAILED) ||
|
||||
(h2c->st0 != H2_CS_ERROR &&
|
||||
!b_data(&h2c->mbuf) &&
|
||||
(h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) &&
|
||||
((h2c->flags & H2_CF_MUX_BLOCK_ANY) || LIST_ISEMPTY(&h2c->send_list))))
|
||||
h2_release_buf(h2c, &h2c->mbuf);
|
||||
__conn_xprt_stop_send(conn);
|
||||
}
|
||||
|
||||
if (h2c->task) {
|
||||
if (eb_is_empty(&h2c->streams_by_id) || b_data(&h2c->mbuf)) {
|
||||
@ -2553,48 +2530,6 @@ static struct conn_stream *h2_attach(struct connection *conn)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* callback used to update the mux's polling flags after changing a cs' status.
|
||||
* The caller (cs_update_mux_polling) will take care of propagating any changes
|
||||
* to the transport layer.
|
||||
*/
|
||||
static void h2_update_poll(struct conn_stream *cs)
|
||||
{
|
||||
struct h2s *h2s = cs->ctx;
|
||||
|
||||
if (!h2s)
|
||||
return;
|
||||
|
||||
/* we may unblock a blocked read */
|
||||
|
||||
if (cs->flags & CS_FL_DATA_RD_ENA) {
|
||||
/* the stream indicates it's willing to read */
|
||||
h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
|
||||
if (h2s->h2c->dsi == h2s->id) {
|
||||
conn_xprt_want_recv(cs->conn);
|
||||
tasklet_wakeup(h2s->h2c->wait_event.task);
|
||||
conn_xprt_want_send(cs->conn);
|
||||
}
|
||||
}
|
||||
|
||||
/* Note: the stream and stream-int code doesn't allow us to perform a
|
||||
* synchronous send() here unfortunately, because this code is called
|
||||
* as si_update() from the process_stream() context. This means that
|
||||
* we have to queue the current cs and defer its processing after the
|
||||
* connection's cs list is processed anyway.
|
||||
*/
|
||||
|
||||
if (cs->flags & CS_FL_DATA_WR_ENA) {
|
||||
if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH))
|
||||
conn_xprt_want_send(cs->conn);
|
||||
tasklet_wakeup(h2s->h2c->wait_event.task);
|
||||
}
|
||||
/* We don't support unsubscribing from here, it shouldn't be a problem */
|
||||
|
||||
/* this can happen from within si_chk_snd() */
|
||||
if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
|
||||
conn_xprt_want_send(cs->conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Detach the stream from the connection and possibly release the connection.
|
||||
*/
|
||||
@ -2613,11 +2548,8 @@ static void h2_detach(struct conn_stream *cs)
|
||||
if (h2c->flags & H2_CF_DEM_TOOMANY &&
|
||||
!h2_has_too_many_cs(h2c)) {
|
||||
h2c->flags &= ~H2_CF_DEM_TOOMANY;
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
__conn_xprt_want_recv(h2c->conn);
|
||||
if (h2_recv_allowed(h2c))
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
conn_xprt_want_send(h2c->conn);
|
||||
}
|
||||
}
|
||||
|
||||
/* this stream may be blocked waiting for some data to leave (possibly
|
||||
@ -2635,9 +2567,7 @@ static void h2_detach(struct conn_stream *cs)
|
||||
*/
|
||||
h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
|
||||
h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
|
||||
conn_xprt_want_recv(cs->conn);
|
||||
tasklet_wakeup(h2c->wait_event.task);
|
||||
conn_xprt_want_send(cs->conn);
|
||||
}
|
||||
|
||||
h2s_destroy(h2s);
|
||||
@ -2688,8 +2618,6 @@ static void h2_do_shutr(struct h2s *h2s)
|
||||
!(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) &&
|
||||
h2c_send_goaway_error(h2c, h2s) <= 0)
|
||||
return;
|
||||
if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
|
||||
conn_xprt_want_send(h2c->conn);
|
||||
|
||||
h2s_close(h2s);
|
||||
|
||||
@ -2747,8 +2675,6 @@ static void h2_do_shutw(struct h2s *h2s)
|
||||
h2s_close(h2s);
|
||||
}
|
||||
|
||||
if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
|
||||
conn_xprt_want_send(h2c->conn);
|
||||
|
||||
add_to_list:
|
||||
if (LIST_ISEMPTY(&h2s->list)) {
|
||||
@ -3690,7 +3616,6 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
|
||||
|
||||
b_del(buf, total);
|
||||
if (total > 0) {
|
||||
conn_xprt_want_send(h2s->h2c->conn);
|
||||
if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
|
||||
tasklet_wakeup(h2s->h2c->wait_event.task);
|
||||
}
|
||||
@ -3795,7 +3720,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct
|
||||
const struct mux_ops h2_ops = {
|
||||
.init = h2_init,
|
||||
.wake = h2_wake,
|
||||
.update_poll = h2_update_poll,
|
||||
.snd_buf = h2_snd_buf,
|
||||
.rcv_buf = h2_rcv_buf,
|
||||
.subscribe = h2_subscribe,
|
||||
|
23
src/mux_pt.c
23
src/mux_pt.c
@ -60,31 +60,9 @@ static int mux_pt_wake(struct connection *conn)
|
||||
if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_HANDSHAKE)) ==
|
||||
CO_FL_EARLY_DATA)
|
||||
conn->flags &= ~CO_FL_EARLY_DATA;
|
||||
if (ret >= 0)
|
||||
cs_update_mux_polling(cs);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* callback used to update the mux's polling flags after changing a cs' status.
|
||||
* The caller (cs_mux_update_poll) will take care of propagating any changes to
|
||||
* the transport layer.
|
||||
*/
|
||||
static void mux_pt_update_poll(struct conn_stream *cs)
|
||||
{
|
||||
struct connection *conn = cs->conn;
|
||||
int flags = 0;
|
||||
|
||||
conn_refresh_polling_flags(conn);
|
||||
|
||||
if (cs->flags & CS_FL_DATA_RD_ENA)
|
||||
flags |= CO_FL_XPRT_RD_ENA;
|
||||
if (cs->flags & CS_FL_DATA_WR_ENA)
|
||||
flags |= CO_FL_XPRT_WR_ENA;
|
||||
|
||||
conn->flags = (conn->flags & ~(CO_FL_XPRT_RD_ENA | CO_FL_XPRT_WR_ENA)) | flags;
|
||||
conn_cond_update_xprt_polling(conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attach a new stream to a connection
|
||||
* (Used for outgoing connections)
|
||||
@ -191,7 +169,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
|
||||
const struct mux_ops mux_pt_ops = {
|
||||
.init = mux_pt_init,
|
||||
.wake = mux_pt_wake,
|
||||
.update_poll = mux_pt_update_poll,
|
||||
.rcv_buf = mux_pt_rcv_buf,
|
||||
.snd_buf = mux_pt_snd_buf,
|
||||
.subscribe = mux_pt_subscribe,
|
||||
|
34
src/stream.c
34
src/stream.c
@ -268,9 +268,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
goto out_fail_accept;
|
||||
|
||||
/* finish initialization of the accepted file descriptor */
|
||||
if (cs)
|
||||
cs_want_recv(cs);
|
||||
else if (appctx)
|
||||
if (appctx)
|
||||
si_applet_want_get(&s->si[0]);
|
||||
|
||||
if (sess->fe->accept && sess->fe->accept(s) < 0)
|
||||
@ -1679,6 +1677,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
||||
si_cs_send(cs);
|
||||
si_cs_recv(cs);
|
||||
}
|
||||
redo:
|
||||
|
||||
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
|
||||
// si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
|
||||
@ -2444,6 +2443,19 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
||||
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
|
||||
stream_process_counters(s);
|
||||
|
||||
cs = objt_cs(si_f->end);
|
||||
ret = 0;
|
||||
if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
|
||||
!(cs->flags & CS_FL_ERROR) && !(si_oc(si_f)->flags & CF_SHUTW))
|
||||
ret = si_cs_send(cs);
|
||||
cs = objt_cs(si_b->end);
|
||||
if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
|
||||
!(cs->flags & CS_FL_ERROR) && !(si_oc(si_b)->flags & CF_SHUTW))
|
||||
ret |= si_cs_send(cs);
|
||||
|
||||
if (ret)
|
||||
goto redo;
|
||||
|
||||
if (si_f->state == SI_ST_EST)
|
||||
si_update(si_f);
|
||||
|
||||
@ -2510,22 +2522,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
|
||||
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
||||
stream_release_buffers(s);
|
||||
/* We may have free'd some space in buffers, or have more to send/recv, try again */
|
||||
cs = objt_cs(si_f->end);
|
||||
ret = 0;
|
||||
if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
|
||||
ret |= si_cs_send(cs);
|
||||
si_cs_recv(cs);
|
||||
ret |= (si_ic(si_f)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
|
||||
}
|
||||
cs = objt_cs(si_b->end);
|
||||
if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
|
||||
ret |= si_cs_send(cs);
|
||||
si_cs_recv(cs);
|
||||
ret |= (si_ic(si_b)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
|
||||
|
||||
}
|
||||
if (ret)
|
||||
task_wakeup(t, TASK_WOKEN_IO);
|
||||
return t; /* nothing more to do */
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,6 @@ struct si_ops si_embedded_ops = {
|
||||
|
||||
/* stream-interface operations for connections */
|
||||
struct si_ops si_conn_ops = {
|
||||
.update = stream_int_update_conn,
|
||||
.chk_rcv = stream_int_chk_rcv_conn,
|
||||
.chk_snd = stream_int_chk_snd_conn,
|
||||
.shutr = stream_int_shutr_conn,
|
||||
@ -258,6 +257,7 @@ static void stream_int_chk_rcv(struct stream_interface *si)
|
||||
else {
|
||||
/* (re)start reading */
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
tasklet_wakeup(si->wait_event.task);
|
||||
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
@ -518,8 +518,10 @@ void stream_int_notify(struct stream_interface *si)
|
||||
/* check if the consumer has freed some space either in the
|
||||
* buffer or in the pipe.
|
||||
*/
|
||||
if (channel_may_recv(ic) && new_len < last_len)
|
||||
if (channel_may_recv(ic) && new_len < last_len) {
|
||||
tasklet_wakeup(si->wait_event.task);
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
}
|
||||
}
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
@ -566,6 +568,7 @@ static int si_cs_process(struct conn_stream *cs)
|
||||
struct stream_interface *si = cs->data;
|
||||
struct channel *ic = si_ic(si);
|
||||
struct channel *oc = si_oc(si);
|
||||
int wait_room = si->flags & SI_FL_WAIT_ROOM;
|
||||
|
||||
/* If we have data to send, try it now */
|
||||
if (!channel_is_empty(oc) && objt_cs(si->end))
|
||||
@ -600,20 +603,9 @@ static int si_cs_process(struct conn_stream *cs)
|
||||
stream_int_notify(si);
|
||||
channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
|
||||
|
||||
/* Third step : update the connection's polling status based on what
|
||||
* was done above (eg: maybe some buffers got emptied).
|
||||
*/
|
||||
if (channel_is_empty(oc))
|
||||
__cs_stop_send(cs);
|
||||
|
||||
|
||||
if (si->flags & SI_FL_WAIT_ROOM) {
|
||||
__cs_stop_recv(cs);
|
||||
}
|
||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
|
||||
channel_may_recv(ic)) {
|
||||
__cs_want_recv(cs);
|
||||
}
|
||||
/* Try to recv() again if we free'd some room in the process */
|
||||
if (wait_room && !(si->flags & SI_FL_WAIT_ROOM))
|
||||
si_cs_recv(cs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -720,10 +712,8 @@ int si_cs_send(struct conn_stream *cs)
|
||||
}
|
||||
}
|
||||
/* We couldn't send all of our data, let the mux know we'd like to send more */
|
||||
if (co_data(oc)) {
|
||||
cs_want_send(cs);
|
||||
if (co_data(oc))
|
||||
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
|
||||
}
|
||||
return did_send;
|
||||
}
|
||||
|
||||
@ -776,6 +766,7 @@ void stream_int_update(struct stream_interface *si)
|
||||
* have updated it if there has been a completed I/O.
|
||||
*/
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
tasklet_wakeup(si->wait_event.task);
|
||||
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
}
|
||||
@ -814,37 +805,6 @@ void stream_int_update(struct stream_interface *si)
|
||||
}
|
||||
}
|
||||
|
||||
/* Updates the polling status of a connection outside of the connection handler
|
||||
* based on the channel's flags and the stream interface's flags. It needs to be
|
||||
* called once after the channels' flags have settled down and the stream has
|
||||
* been updated. It is not designed to be called from within the connection
|
||||
* handler itself.
|
||||
*/
|
||||
void stream_int_update_conn(struct stream_interface *si)
|
||||
{
|
||||
struct channel *ic = si_ic(si);
|
||||
struct channel *oc = si_oc(si);
|
||||
struct conn_stream *cs = __objt_cs(si->end);
|
||||
|
||||
if (!(ic->flags & CF_SHUTR)) {
|
||||
/* Read not closed */
|
||||
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
|
||||
__cs_stop_recv(cs);
|
||||
else
|
||||
__cs_want_recv(cs);
|
||||
}
|
||||
|
||||
if (!(oc->flags & CF_SHUTW)) {
|
||||
/* Write not closed */
|
||||
if (channel_is_empty(oc))
|
||||
__cs_stop_send(cs);
|
||||
else
|
||||
__cs_want_send(cs);
|
||||
}
|
||||
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
/*
|
||||
* This function performs a shutdown-read on a stream interface attached to
|
||||
* a connection in a connected or init state (it does nothing for other
|
||||
@ -858,7 +818,6 @@ void stream_int_update_conn(struct stream_interface *si)
|
||||
static void stream_int_shutr_conn(struct stream_interface *si)
|
||||
{
|
||||
struct conn_stream *cs = __objt_cs(si->end);
|
||||
struct connection *conn = cs->conn;
|
||||
struct channel *ic = si_ic(si);
|
||||
|
||||
ic->flags &= ~CF_SHUTR_NOW;
|
||||
@ -880,10 +839,6 @@ static void stream_int_shutr_conn(struct stream_interface *si)
|
||||
/* we want to immediately forward this close to the write side */
|
||||
return stream_int_shutw_conn(si);
|
||||
}
|
||||
else if (conn->ctrl) {
|
||||
/* we want the caller to disable polling on this FD */
|
||||
cs_stop_recv(cs);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -980,24 +935,23 @@ static void stream_int_shutw_conn(struct stream_interface *si)
|
||||
static void stream_int_chk_rcv_conn(struct stream_interface *si)
|
||||
{
|
||||
struct channel *ic = si_ic(si);
|
||||
struct conn_stream *cs = __objt_cs(si->end);
|
||||
|
||||
if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
|
||||
return;
|
||||
|
||||
if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
|
||||
/* stop reading */
|
||||
if (!(ic->flags & CF_DONT_READ)) /* full */ {
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
}
|
||||
__cs_stop_recv(cs);
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
}
|
||||
else {
|
||||
struct conn_stream *cs = objt_cs(si->end);
|
||||
/* (re)start reading */
|
||||
si->flags &= ~SI_FL_WAIT_ROOM;
|
||||
__cs_want_recv(cs);
|
||||
if (cs) {
|
||||
si_cs_recv(cs);
|
||||
tasklet_wakeup(si->wait_event.task);
|
||||
}
|
||||
}
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
|
||||
@ -1024,20 +978,10 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
|
||||
return;
|
||||
|
||||
if (cs->flags & CS_FL_DATA_WR_ENA) {
|
||||
/* already subscribed to write notifications, will be called
|
||||
* anyway, so let's avoid calling it especially if the reader
|
||||
* is not ready.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
__cs_want_send(cs);
|
||||
|
||||
si_cs_send(cs);
|
||||
tasklet_wakeup(si->wait_event.task);
|
||||
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
|
||||
/* Write error on the file descriptor */
|
||||
__cs_stop_both(cs);
|
||||
si->flags |= SI_FL_ERR;
|
||||
goto out_wakeup;
|
||||
}
|
||||
@ -1051,7 +995,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
* ->o limit was reached. Maybe we just wrote the last
|
||||
* chunk and need to close.
|
||||
*/
|
||||
__cs_stop_send(cs);
|
||||
if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
|
||||
(CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
|
||||
(si->state == SI_ST_EST)) {
|
||||
@ -1067,7 +1010,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
/* Otherwise there are remaining data to be sent in the buffer,
|
||||
* which means we have to poll before doing so.
|
||||
*/
|
||||
__cs_want_send(cs);
|
||||
si->flags &= ~SI_FL_WAIT_DATA;
|
||||
if (!tick_isset(oc->wex))
|
||||
oc->wex = tick_add_ifset(now_ms, oc->wto);
|
||||
@ -1105,9 +1047,6 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
if (!(si->flags & SI_FL_DONT_WAKE))
|
||||
task_wakeup(si_task(si), TASK_WOKEN_IO);
|
||||
}
|
||||
|
||||
/* commit possible polling changes */
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1208,7 +1147,6 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
* could soon be full. Let's stop before needing to poll.
|
||||
*/
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
__cs_stop_recv(cs);
|
||||
}
|
||||
|
||||
/* splice not possible (anymore), let's go on on standard copy */
|
||||
@ -1274,7 +1212,6 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
* This was changed to accomodate with the mux code,
|
||||
* but we may have lost a worthwhile optimization.
|
||||
*/
|
||||
__cs_stop_recv(cs);
|
||||
si->flags |= SI_FL_WAIT_ROOM;
|
||||
break;
|
||||
}
|
||||
@ -1347,9 +1284,10 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
goto out_shutdown_r;
|
||||
|
||||
/* Subscribe to receive events */
|
||||
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
|
||||
if (!(si->flags & SI_FL_WAIT_ROOM))
|
||||
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
|
||||
|
||||
return cur_read != 0;
|
||||
return (cur_read != 0 || (si->flags & SI_FL_WAIT_ROOM));
|
||||
|
||||
out_shutdown_r:
|
||||
/* we received a shutdown */
|
||||
@ -1392,7 +1330,6 @@ void stream_sock_read0(struct stream_interface *si)
|
||||
}
|
||||
|
||||
/* otherwise that's just a normal read shutdown */
|
||||
__cs_stop_recv(cs);
|
||||
return;
|
||||
|
||||
do_close:
|
||||
|
Loading…
Reference in New Issue
Block a user