MEDIUM: stream-int: make use of si_rx_chan_{rdy,blk} to control the stream-int from the channel

The channel can disable reading from the stream-interface using various
methods, such as :
  - CF_DONT_READ
  - !channel_may_recv()
  - and possibly others

Till now this was done by mangling SI_FL_RX_WAIT_EP which is not
appropriate at all since it's not the stream interface which decides
whether it wants to deliver data or not. Some places were also wrongly
relying on SI_FL_RXBLK_ROOM since it was the only other alternative,
but it's not suitable for CF_DONT_READ.

Let's use the SI_FL_RXBLK_CHAN flag for this instead. It will properly
prevent the stream interface from being woken up and reads from
subscribing to more receipt without being accidently removed. It is
automatically reset if CF_DONT_READ is not set in stream_int_notify().

The code is not trivial because it splits the logic between everything
related to buffer contents (channel_is_empty(), CF_WRITE_PARTIAL, etc)
and buffer policy (CF_DONT_READ). Also it now needs to decide timeouts
based on any blocking flag and not just SI_FL_RXBLK_ROOM anymore.

It looks like this patch has caused a minor performance degradation on
connection rate, which possibly deserves being investigated deeper as
the test conditions are uncertain (e.g. slightly more subscribe calls?).
This commit is contained in:
Willy Tarreau 2018-11-14 17:10:36 +01:00
parent 47baeb85d4
commit b26a6f9708
2 changed files with 39 additions and 17 deletions

View File

@ -278,6 +278,18 @@ static inline void si_rx_endp_done(struct stream_interface *si)
si->flags |= SI_FL_RX_WAIT_EP;
}
/* Tell a stream interface the input channel is OK with it sending it some data */
static inline void si_rx_chan_rdy(struct stream_interface *si)
{
si->flags &= ~SI_FL_RXBLK_CHAN;
}
/* Tell a stream interface the input channel is not OK with it sending it some data */
static inline void si_rx_chan_blk(struct stream_interface *si)
{
si->flags |= SI_FL_RXBLK_CHAN;
}
/* The stream interface just got the input buffer it was waiting for */
static inline void si_rx_buff_rdy(struct stream_interface *si)
{

View File

@ -481,10 +481,13 @@ void stream_int_notify(struct stream_interface *si)
}
if ((sio->flags & SI_FL_RXBLK_ROOM) &&
((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
channel_is_empty(oc))) {
((oc->flags & CF_WRITE_PARTIAL) || channel_is_empty(oc)))
sio->flags &= ~SI_FL_RXBLK_ROOM;
}
if (oc->flags & CF_DONT_READ)
si_rx_chan_blk(sio);
else
si_rx_chan_rdy(sio);
/* Notify the other side when we've injected data into the IC that
* needs to be forwarded. We can do fast-forwarding as soon as there
@ -519,13 +522,16 @@ void stream_int_notify(struct stream_interface *si)
si->flags &= ~SI_FL_RXBLK_ROOM;
}
if (!(ic->flags & CF_DONT_READ))
si_rx_chan_rdy(si);
si_chk_rcv(si);
si_chk_rcv(sio);
if (si->flags & SI_FL_RXBLK_ROOM) {
if (si_rx_blocked(si)) {
ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) {
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
/* we must re-enable reading if si_chk_snd() has freed some space */
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
@ -725,11 +731,8 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
ret = si_cs_send(cs);
if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) {
if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
if (!(si_ic(si)->flags & (CF_SHUTR|CF_DONT_READ)))
si->flags &= ~SI_FL_RX_WAIT_EP;
}
if (ret != 0)
si_cs_process(cs);
@ -751,10 +754,14 @@ void stream_int_update(struct stream_interface *si)
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
if ((ic->flags & CF_DONT_READ) || !channel_is_empty(ic)) {
if (ic->flags & CF_DONT_READ)
si_rx_chan_blk(si);
else
si_rx_chan_rdy(si);
if (!channel_is_empty(ic)) {
/* stop reading, imposed by channel's policy or contents */
si_cant_put(si);
ic->rex = TICK_ETERNITY;
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
@ -763,9 +770,12 @@ void stream_int_update(struct stream_interface *si)
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_RXBLK_ROOM;
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
ic->rex = TICK_ETERNITY;
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
si_chk_rcv(si);
}
else
@ -1254,7 +1264,7 @@ int si_cs_recv(struct conn_stream *cs)
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
/* we're stopped by the channel's policy */
si_cant_put(si);
si_rx_chan_blk(si);
break;
}
@ -1269,7 +1279,7 @@ int si_cs_recv(struct conn_stream *cs)
*/
if (ic->flags & CF_STREAMER) {
/* we're stopped by the channel's policy */
si_cant_put(si);
si_rx_chan_blk(si);
break;
}
@ -1278,7 +1288,7 @@ int si_cs_recv(struct conn_stream *cs)
*/
if (ret >= global.tune.recv_enough) {
/* we're stopped by the channel's policy */
si_cant_put(si);
si_rx_chan_blk(si);
break;
}
}
@ -1286,7 +1296,7 @@ int si_cs_recv(struct conn_stream *cs)
/* if we are waiting for more space, don't try to read more data
* right now.
*/
if (si->flags & SI_FL_RXBLK_ROOM)
if (si_rx_blocked(si))
break;
} /* while !flags */