diff --git a/src/stream.c b/src/stream.c index 8e115e595..fe07d4de5 100644 --- a/src/stream.c +++ b/src/stream.c @@ -673,6 +673,7 @@ static int sess_update_st_con_tcp(struct stream *s) /* OK, this means that a connection succeeded. The caller will be * responsible for handling the transition from CON to EST. */ + si_want_put(si); si->state = SI_ST_EST; si->err_type = SI_ET_NONE; return 1; diff --git a/src/stream_interface.c b/src/stream_interface.c index 65d2c6120..1999589b8 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -475,12 +475,13 @@ void stream_int_notify(struct stream_interface *si) if (!(si->flags & SI_FL_INDEP_STR)) if (tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); + } - if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { - si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM; - si_chk_rcv(si_opposite(si)); - } + if ((si_opposite(si)->flags & SI_FL_WAIT_ROOM) && + ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL || + channel_is_empty(oc))) { + si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM; + si_chk_rcv(si_opposite(si)); } /* Notify the other side when we've injected data into the IC that @@ -745,13 +746,10 @@ void stream_int_update(struct stream_interface *si) struct channel *oc = si_oc(si); if (!(ic->flags & CF_SHUTR)) { - if (!(ic->flags & CF_DONT_READ)) - si_want_put(si); - /* Read not closed, update FD status and timeout for reads */ if ((ic->flags & CF_DONT_READ) || co_data(ic)) { - /* stop reading */ - si_stop_put(si); + /* stop reading, imposed by channel's policy or contents */ + si_cant_put(si); ic->rex = TICK_ETERNITY; } else { @@ -846,14 +844,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b } /* it's time to try to receive */ - if (!(req->flags & (CF_SHUTR|CF_DONT_READ))) - si_want_put(si_f); - si_chk_rcv(si_f); - - if (!(res->flags & (CF_SHUTR|CF_DONT_READ))) - si_want_put(si_b); - si_chk_rcv(si_b); /* let's recompute both sides states */ @@ -1133,9 +1124,6 @@ int si_cs_recv(struct conn_stream *cs) if (si->wait_event.wait_reason & SUB_CAN_RECV) return 0; - /* by default nothing to deliver */ - si_stop_put(si); - /* maybe we were called immediately after an asynchronous shutr */ if (ic->flags & CF_SHUTR) return 1; @@ -1144,6 +1132,9 @@ int si_cs_recv(struct conn_stream *cs) if (cs->flags & CS_FL_EOS) goto out_shutdown_r; + /* start by claiming we'll want to receive and change our mind later if needed */ + si_want_put(si); + if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) && global.tune.idle_timer && (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) { @@ -1235,8 +1226,10 @@ int si_cs_recv(struct conn_stream *cs) if (cs->flags & CS_FL_RCV_MORE) si_cant_put(si); - if (ret <= 0) + if (ret <= 0) { + si_stop_put(si); break; + } cur_read += ret; @@ -1254,8 +1247,11 @@ int si_cs_recv(struct conn_stream *cs) ic->flags |= CF_READ_PARTIAL; ic->total += ret; - if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) + if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) { + /* we're stopped by the channel's policy */ + si_cant_put(si); break; + } /* if too many bytes were missing from last read, it means that * it's pointless trying to read again because the system does @@ -1266,14 +1262,20 @@ int si_cs_recv(struct conn_stream *cs) * have exhausted system buffers. It's not worth trying * again. */ - if (ic->flags & CF_STREAMER) + if (ic->flags & CF_STREAMER) { + /* we're stopped by the channel's policy */ + si_cant_put(si); break; + } /* if we read a large block smaller than what we requested, * it's almost certain we'll never get anything more. */ - if (ret >= global.tune.recv_enough) + if (ret >= global.tune.recv_enough) { + /* we're stopped by the channel's policy */ + si_cant_put(si); break; + } } /* if we are waiting for more space, don't try to read more data