mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-01-31 18:41:39 +00:00
BUG/MEDIUM: stream-int: change the way buffer room is requested by a stream-int
Subsequent to the recent stream-int updates, we started to consider that SI_FL_WANT_PUT needs to be set when receipt is enabled, but this is wrong and results in 100% CPU when an HTTP client stays idle after a keep-alive request because the stream-int has nothing to provide and nothing to send. In fact just like for applets this flag should reflect the continuation of an attempt. So it's si_cs_recv() which should set the flag, and clear it if it has nothing more to provide. This function is called the first time in process_stream()), and called again during transfers, so it will always be up to date during stream_int_update() and stream_int_notify(). As a special case, it should also be set when a connection switches to the established state. And we should absolutely refrain from calling si_cs_recv() to re-enable reading, normally just setting this flag (from within the stream-int's handler or prior to calling si_chk_rcv()) is expected to be OK. A corner case remains where it was observed that in stream_int_notify() we can sometimes be called with an empty output channel with SI_FL_WAIT_ROOM and no CF_WRITE_PARTIAL, so there's no way to detect that we should re-enable receiving. It's easy to also take care of this condition there for the time it takes to figure if this situation is expected or not. Now it becomes more obvious that relying on a single flag to request room (or on two flags to arbiter activity) is not workable given the autonomy of both sides. The mux_h2 has taught us that blocking flags are much more reliable, require much less condition and are much easier to deal with. That's probably something to consider quickly in this area. No backport is needed.
This commit is contained in:
parent
c1b0645dac
commit
f26c26cca2
@ -673,6 +673,7 @@ static int sess_update_st_con_tcp(struct stream *s)
|
||||
/* OK, this means that a connection succeeded. The caller will be
|
||||
* responsible for handling the transition from CON to EST.
|
||||
*/
|
||||
si_want_put(si);
|
||||
si->state = SI_ST_EST;
|
||||
si->err_type = SI_ET_NONE;
|
||||
return 1;
|
||||
|
@ -475,12 +475,13 @@ void stream_int_notify(struct stream_interface *si)
|
||||
if (!(si->flags & SI_FL_INDEP_STR))
|
||||
if (tick_isset(ic->rex))
|
||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||
}
|
||||
|
||||
if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
|
||||
(si_opposite(si)->flags & SI_FL_WAIT_ROOM))) {
|
||||
si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM;
|
||||
si_chk_rcv(si_opposite(si));
|
||||
}
|
||||
if ((si_opposite(si)->flags & SI_FL_WAIT_ROOM) &&
|
||||
((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
|
||||
channel_is_empty(oc))) {
|
||||
si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM;
|
||||
si_chk_rcv(si_opposite(si));
|
||||
}
|
||||
|
||||
/* Notify the other side when we've injected data into the IC that
|
||||
@ -745,13 +746,10 @@ void stream_int_update(struct stream_interface *si)
|
||||
struct channel *oc = si_oc(si);
|
||||
|
||||
if (!(ic->flags & CF_SHUTR)) {
|
||||
if (!(ic->flags & CF_DONT_READ))
|
||||
si_want_put(si);
|
||||
|
||||
/* Read not closed, update FD status and timeout for reads */
|
||||
if ((ic->flags & CF_DONT_READ) || co_data(ic)) {
|
||||
/* stop reading */
|
||||
si_stop_put(si);
|
||||
/* stop reading, imposed by channel's policy or contents */
|
||||
si_cant_put(si);
|
||||
ic->rex = TICK_ETERNITY;
|
||||
}
|
||||
else {
|
||||
@ -846,14 +844,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
|
||||
}
|
||||
|
||||
/* it's time to try to receive */
|
||||
if (!(req->flags & (CF_SHUTR|CF_DONT_READ)))
|
||||
si_want_put(si_f);
|
||||
|
||||
si_chk_rcv(si_f);
|
||||
|
||||
if (!(res->flags & (CF_SHUTR|CF_DONT_READ)))
|
||||
si_want_put(si_b);
|
||||
|
||||
si_chk_rcv(si_b);
|
||||
|
||||
/* let's recompute both sides states */
|
||||
@ -1133,9 +1124,6 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
if (si->wait_event.wait_reason & SUB_CAN_RECV)
|
||||
return 0;
|
||||
|
||||
/* by default nothing to deliver */
|
||||
si_stop_put(si);
|
||||
|
||||
/* maybe we were called immediately after an asynchronous shutr */
|
||||
if (ic->flags & CF_SHUTR)
|
||||
return 1;
|
||||
@ -1144,6 +1132,9 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
if (cs->flags & CS_FL_EOS)
|
||||
goto out_shutdown_r;
|
||||
|
||||
/* start by claiming we'll want to receive and change our mind later if needed */
|
||||
si_want_put(si);
|
||||
|
||||
if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
|
||||
global.tune.idle_timer &&
|
||||
(unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
|
||||
@ -1235,8 +1226,10 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
if (cs->flags & CS_FL_RCV_MORE)
|
||||
si_cant_put(si);
|
||||
|
||||
if (ret <= 0)
|
||||
if (ret <= 0) {
|
||||
si_stop_put(si);
|
||||
break;
|
||||
}
|
||||
|
||||
cur_read += ret;
|
||||
|
||||
@ -1254,8 +1247,11 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
ic->flags |= CF_READ_PARTIAL;
|
||||
ic->total += ret;
|
||||
|
||||
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0)
|
||||
if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
|
||||
/* we're stopped by the channel's policy */
|
||||
si_cant_put(si);
|
||||
break;
|
||||
}
|
||||
|
||||
/* if too many bytes were missing from last read, it means that
|
||||
* it's pointless trying to read again because the system does
|
||||
@ -1266,14 +1262,20 @@ int si_cs_recv(struct conn_stream *cs)
|
||||
* have exhausted system buffers. It's not worth trying
|
||||
* again.
|
||||
*/
|
||||
if (ic->flags & CF_STREAMER)
|
||||
if (ic->flags & CF_STREAMER) {
|
||||
/* we're stopped by the channel's policy */
|
||||
si_cant_put(si);
|
||||
break;
|
||||
}
|
||||
|
||||
/* if we read a large block smaller than what we requested,
|
||||
* it's almost certain we'll never get anything more.
|
||||
*/
|
||||
if (ret >= global.tune.recv_enough)
|
||||
if (ret >= global.tune.recv_enough) {
|
||||
/* we're stopped by the channel's policy */
|
||||
si_cant_put(si);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* if we are waiting for more space, don't try to read more data
|
||||
|
Loading…
Reference in New Issue
Block a user