From 829bd4710f69ac1d4cce76ac35c56e004ffef26d Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 6 Jun 2019 09:17:23 +0200 Subject: [PATCH] MEDIUM: stream: rearrange the events to remove the loop The "goto redo" at the end of process_stream() to make the states converge is still a big source of problems and mostly stems from the very late call to the send() functions, whose results need to be considered, while it's being done in si_update_both() when leaving. This patch extracts the si_sync_send() calls from si_update_both(), and places them at the relevant places in process_stream(), which are just after the amount of data to forward is updated and before the shutw() calls (which were also moved). The stream-interface resynchronization needs to go slightly upper to take into account the transition from CON to RDY that will happen consecutive to some successful send(), and that's all. By doing so we can now get rid of this loop and have si_update_both() called only to update the stream interface and channel when leaving the function, as it was initially designed to work. It is worth noting that a number of the remaining conditions to perform a goto resync_XXX still seem suboptimal and would benefit from being refined to perform les resynchronization. But what matters at this stage is that the code remains valid and efficient. --- src/stream.c | 112 +++++++++++++++++++---------------------- src/stream_interface.c | 6 +-- 2 files changed, 54 insertions(+), 64 deletions(-) diff --git a/src/stream.c b/src/stream.c index 22861f8dc..f92a0085f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1843,7 +1843,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_sync_recv(si_f); si_sync_recv(si_b); -redo: rate = update_freq_ctr(&s->call_rate, 1); if (rate >= 100000 && s->call_rate.prev_ctr) { // make sure to wait at least a full second stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate)); @@ -1929,6 +1928,7 @@ redo: } } + resync_stream_interface: /* below we may emit error messages so we have to ensure that we have * our buffers properly allocated. */ @@ -2014,7 +2014,6 @@ redo: rp_cons_last = si_f->state; rp_prod_last = si_b->state; - resync_stream_interface: /* Check for connection closure */ DPRINTF(stderr, @@ -2385,40 +2384,6 @@ redo: /* reflect what the L7 analysers have seen last */ rqf_last = req->flags; - /* - * Now forward all shutdown requests between both sides of the buffer - */ - - /* first, let's check if the request buffer needs to shutdown(write), which may - * happen either because the input is closed or because we want to force a close - * once the server has begun to respond. If a half-closed timeout is set, we adjust - * the other side's timeout as well. - */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) == - (CF_AUTO_CLOSE|CF_SHUTR))) { - channel_shutw_now(req); - } - - /* shutdown(write) pending */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && - channel_is_empty(req))) { - if (req->flags & CF_READ_ERROR) - si_b->flags |= SI_FL_NOLINGER; - si_shutw(si_b); - } - - /* shutdown(write) done on server side, we must stop the client too */ - if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW && - !req->analysers)) - channel_shutr_now(req); - - /* shutdown(read) pending */ - if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) { - if (si_f->flags & SI_FL_NOHALF) - si_f->flags |= SI_FL_NOLINGER; - si_shutr(si_f); - } - /* it's possible that an upper layer has requested a connection setup or abort. * There are 2 situations where we decide to establish a new connection : * - there are data scheduled for emission in the buffer @@ -2490,8 +2455,48 @@ redo: } while (si_b->state == SI_ST_ASS); } + /* Let's see if we can send the pending request now */ + si_sync_send(si_b); + + /* + * Now forward all shutdown requests between both sides of the request buffer + */ + + /* first, let's check if the request buffer needs to shutdown(write), which may + * happen either because the input is closed or because we want to force a close + * once the server has begun to respond. If a half-closed timeout is set, we adjust + * the other side's timeout as well. + */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) == + (CF_AUTO_CLOSE|CF_SHUTR))) { + channel_shutw_now(req); + } + + /* shutdown(write) pending */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW && + channel_is_empty(req))) { + if (req->flags & CF_READ_ERROR) + si_b->flags |= SI_FL_NOLINGER; + si_shutw(si_b); + } + + /* shutdown(write) done on server side, we must stop the client too */ + if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW && + !req->analysers)) + channel_shutr_now(req); + + /* shutdown(read) pending */ + if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) { + if (si_f->flags & SI_FL_NOHALF) + si_f->flags |= SI_FL_NOLINGER; + si_shutr(si_f); + } + /* Benchmarks have shown that it's optimal to do a full resync now */ - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS) + if (si_f->state == SI_ST_DIS || + si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) || + (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) || + (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO)) goto resync_stream_interface; /* otherwise we want to check if we need to resync the req buffer or not */ @@ -2583,6 +2588,9 @@ redo: /* reflect what the L7 analysers have seen last */ rpf_last = res->flags; + /* Let's see if we can send the pending response now */ + si_sync_send(si_f); + /* * Now forward all shutdown requests between both sides of the buffer */ @@ -2615,7 +2623,10 @@ redo: si_shutr(si_b); } - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS) + if (si_f->state == SI_ST_DIS || + si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) || + (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) || + (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO)) goto resync_stream_interface; if (req->flags != rqf_last) @@ -2624,6 +2635,9 @@ redo: if ((res->flags ^ rpf_last) & CF_MASK_STATIC) goto resync_response; + if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) + goto resync_request; + /* we're interested in getting wakeups again */ si_f->flags &= ~SI_FL_DONT_WAKE; si_b->flags &= ~SI_FL_DONT_WAKE; @@ -2656,33 +2670,11 @@ redo: } if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) { - enum si_state si_b_prev_state, si_f_prev_state; - - si_f_prev_state = si_f->prev_state; - si_b_prev_state = si_b->prev_state; - if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); - /* take the exact same flags si_update_both() will have before - * trying to update again. - */ - rqf_last = req->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - rpf_last = res->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); - si_update_both(si_f, si_b); - /* changes requiring immediate attention are processed right now */ - if (si_f->state == SI_ST_DIS || si_f->state != si_f_prev_state || - si_b->state == SI_ST_DIS || si_b->state != si_b_prev_state || - ((si_f->flags & SI_FL_ERR) && si_f->state != SI_ST_CLO) || - ((si_b->flags & SI_FL_ERR) && si_b->state != SI_ST_CLO)) - goto redo; - - /* I/O events (mostly CF_WRITE_PARTIAL) are aggregated with other I/Os */ - if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) - task_wakeup(s->task, TASK_WOKEN_IO); - /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout * to expire on the client side first, but we're still interested diff --git a/src/stream_interface.c b/src/stream_interface.c index 3bb32488e..5a85b3bfd 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -918,11 +918,12 @@ void si_sync_send(struct stream_interface *si) si_rx_room_rdy(si_opposite(si)); } -/* updates both stream ints of a same stream at once */ /* Updates at once the channel flags, and timers of both stream interfaces of a * same stream, to complete the work after the analysers, then updates the data * layer below. This will ensure that any synchronous update performed at the * data layer will be reflected in the channel flags and/or stream-interface. + * Note that this does not change the stream interface's current state, though + * it updates the previous state to the current one. */ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b) { @@ -935,9 +936,6 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b si_f->prev_state = si_f->state; si_b->prev_state = si_b->state; - si_sync_send(si_f); - si_sync_send(si_b); - /* let's recompute both sides states */ if (si_state_in(si_f->state, SI_SB_RDY|SI_SB_EST)) si_update(si_f);