diff --git a/src/stream.c b/src/stream.c index 22861f8dc..f92a0085f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1843,7 +1843,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_sync_recv(si_f); si_sync_recv(si_b); -redo: rate = update_freq_ctr(&s->call_rate, 1); if (rate >= 100000 && s->call_rate.prev_ctr) { // make sure to wait at least a full second stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate)); @@ -1929,6 +1928,7 @@ redo: } } + resync_stream_interface: /* below we may emit error messages so we have to ensure that we have * our buffers properly allocated. */ @@ -2014,7 +2014,6 @@ redo: rp_cons_last = si_f->state; rp_prod_last = si_b->state; - resync_stream_interface: /* Check for connection closure */ DPRINTF(stderr, @@ -2385,40 +2384,6 @@ redo: /* reflect what the L7 analysers have seen last */ rqf_last = req->flags; - /* - * Now forward all shutdown requests between both sides of the buffer - */ - - /* first, let's check if the request buffer needs to shutdown(write), which may - * happen either because the input is closed or because we want to force a close - * once the server has begun to respond. If a half-closed timeout is set, we adjust - * the other side's timeout as well. - */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) == - (CF_AUTO_CLOSE|CF_SHUTR))) { - channel_shutw_now(req); - } - - /* shutdown(write) pending */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && - channel_is_empty(req))) { - if (req->flags & CF_READ_ERROR) - si_b->flags |= SI_FL_NOLINGER; - si_shutw(si_b); - } - - /* shutdown(write) done on server side, we must stop the client too */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW && - !req->analysers)) - channel_shutr_now(req); - - /* shutdown(read) pending */ - if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) { - if (si_f->flags & SI_FL_NOHALF) - si_f->flags |= SI_FL_NOLINGER; - si_shutr(si_f); - } - /* it's possible that an upper layer has requested a connection setup or abort. * There are 2 situations where we decide to establish a new connection : * - there are data scheduled for emission in the buffer @@ -2490,8 +2455,48 @@ redo: } while (si_b->state == SI_ST_ASS); } + /* Let's see if we can send the pending request now */ + si_sync_send(si_b); + + /* + * Now forward all shutdown requests between both sides of the request buffer + */ + + /* first, let's check if the request buffer needs to shutdown(write), which may + * happen either because the input is closed or because we want to force a close + * once the server has begun to respond. If a half-closed timeout is set, we adjust + * the other side's timeout as well. + */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) == + (CF_AUTO_CLOSE|CF_SHUTR))) { + channel_shutw_now(req); + } + + /* shutdown(write) pending */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && + channel_is_empty(req))) { + if (req->flags & CF_READ_ERROR) + si_b->flags |= SI_FL_NOLINGER; + si_shutw(si_b); + } + + /* shutdown(write) done on server side, we must stop the client too */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW && + !req->analysers)) + channel_shutr_now(req); + + /* shutdown(read) pending */ + if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) { + if (si_f->flags & SI_FL_NOHALF) + si_f->flags |= SI_FL_NOLINGER; + si_shutr(si_f); + } + /* Benchmarks have shown that it's optimal to do a full resync now */ - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS) + if (si_f->state == SI_ST_DIS || + si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) || + (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) || + (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO)) goto resync_stream_interface; /* otherwise we want to check if we need to resync the req buffer or not */ @@ -2583,6 +2588,9 @@ redo: /* reflect what the L7 analysers have seen last */ rpf_last = res->flags; + /* Let's see if we can send the pending response now */ + si_sync_send(si_f); + /* * Now forward all shutdown requests between both sides of the buffer */ @@ -2615,7 +2623,10 @@ redo: si_shutr(si_b); } - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS) + if (si_f->state == SI_ST_DIS || + si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) || + (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) || + (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO)) goto resync_stream_interface; if (req->flags != rqf_last) @@ -2624,6 +2635,9 @@ redo: if ((res->flags ^ rpf_last) & CF_MASK_STATIC) goto resync_response; + if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) + goto resync_request; + /* we're interested in getting wakeups again */ si_f->flags &= ~SI_FL_DONT_WAKE; si_b->flags &= ~SI_FL_DONT_WAKE; @@ -2656,33 +2670,11 @@ redo: } if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) { - enum si_state si_b_prev_state, si_f_prev_state; - - si_f_prev_state = si_f->prev_state; - si_b_prev_state = si_b->prev_state; - if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); - /* take the exact same flags si_update_both() will have before - * trying to update again. - */ - rqf_last = req->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - rpf_last = res->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - si_update_both(si_f, si_b); - /* changes requiring immediate attention are processed right now */ - if (si_f->state == SI_ST_DIS || si_f->state != si_f_prev_state || - si_b->state == SI_ST_DIS || si_b->state != si_b_prev_state || - ((si_f->flags & SI_FL_ERR) && si_f->state != SI_ST_CLO) || - ((si_b->flags & SI_FL_ERR) && si_b->state != SI_ST_CLO)) - goto redo; - - /* I/O events (mostly CF_WRITE_PARTIAL) are aggregated with other I/Os */ - if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) - task_wakeup(s->task, TASK_WOKEN_IO); - /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout * to expire on the client side first, but we're still interested diff --git a/src/stream_interface.c b/src/stream_interface.c index 3bb32488e..5a85b3bfd 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -918,11 +918,12 @@ void si_sync_send(struct stream_interface *si) si_rx_room_rdy(si_opposite(si)); } -/* updates both stream ints of a same stream at once */ /* Updates at once the channel flags, and timers of both stream interfaces of a * same stream, to complete the work after the analysers, then updates the data * layer below. This will ensure that any synchronous update performed at the * data layer will be reflected in the channel flags and/or stream-interface. + * Note that this does not change the stream interface's current state, though + * it updates the previous state to the current one. */ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b) { @@ -935,9 +936,6 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b si_f->prev_state = si_f->state; si_b->prev_state = si_b->state; - si_sync_send(si_f); - si_sync_send(si_b); - /* let's recompute both sides states */ if (si_state_in(si_f->state, SI_SB_RDY|SI_SB_EST)) si_update(si_f);