MEDIUM: stream-int: make si_update() synchronize flag changes before the I/O

With the new synchronous si_cs_send() at the end of process_stream(),
we're seeing re-appear the I/O layer specific part of the stream interface
which is supposed to deal with I/O event subscription. The only difference
is that now we subscribe to I/Os only after having attempted (and failed)
them.

This patch brings a cleanup in this by reintroducing stream_int_update_conn()
with the send code from process_stream(). However this alone would not be
enough because the flags which are cleared afterwards would result in the
loss of the possible events (write events only at the moment). So the flags
clearing and stream-int state updates are also performed inside si_update()
between the generic code and the I/O specific code. This definitely makes
sense as after this call we can simply check again for channel and SI flag
changes and decide to loop once again or not.
This commit is contained in:
Willy Tarreau 2018-10-25 11:06:57 +02:00
parent 0f8d3ab362
commit 85f890174a
3 changed files with 47 additions and 33 deletions

View File

@ -357,11 +357,23 @@ static inline void si_shutw(struct stream_interface *si)
si->ops->shutw(si);
}
/* Updates the stream interface and timers, then updates the data layer below */
/* Updates the stream interface and timers, to complete the work after the
* analysers, then clears the relevant channel flags, and the errors and
* expirations, then updates the data layer below. This will ensure that any
* synchronous update performed at the data layer will be reflected in the
* channel flags and/or stream-interface.
*/
static inline void si_update(struct stream_interface *si)
{
stream_int_update(si);
if (si->ops->update)
if (si->state == SI_ST_EST)
stream_int_update(si);
si_ic(si)->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED);
si_oc(si)->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL);
si->flags &= ~(SI_FL_ERR|SI_FL_EXP);
si->prev_state = si->state;
if (si->ops->update && (si->state == SI_ST_CON || si->state == SI_ST_EST))
si->ops->update(si);
}

View File

@ -1659,7 +1659,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
struct channel *req, *res;
struct stream_interface *si_f, *si_b;
struct conn_stream *cs;
int ret;
activity[tid].stream++;
@ -2447,38 +2446,13 @@ redo:
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
stream_process_counters(s);
cs = objt_cs(si_f->end);
ret = 0;
if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
!(cs->flags & CS_FL_ERROR) &&
!(si_oc(si_f)->flags & CF_SHUTW) &&
!(si_f->wait_event.wait_reason & SUB_CAN_SEND) &&
co_data(si_oc(si_f)))
ret = si_cs_send(cs);
cs = objt_cs(si_b->end);
if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
!(cs->flags & CS_FL_ERROR) &&
!(si_oc(si_b)->flags & CF_SHUTW) &&
!(si_b->wait_event.wait_reason & SUB_CAN_SEND) &&
co_data(si_oc(si_b)))
ret |= si_cs_send(cs);
si_update(si_f);
si_update(si_b);
if (ret)
if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS ||
(((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER))
goto redo;
if (si_f->state == SI_ST_EST)
si_update(si_f);
if (si_b->state == SI_ST_EST)
si_update(si_b);
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
si_f->prev_state = si_f->state;
si_b->prev_state = si_b->state;
si_f->flags &= ~(SI_FL_ERR|SI_FL_EXP);
si_b->flags &= ~(SI_FL_ERR|SI_FL_EXP);
/* Trick: if a request is being waiting for the server to respond,
* and if we know the server can timeout, we don't want the timeout
* to expire on the client side first, but we're still interested

View File

@ -66,6 +66,7 @@ struct si_ops si_embedded_ops = {
/* stream-interface operations for connections */
struct si_ops si_conn_ops = {
.update = stream_int_update_conn,
.chk_rcv = stream_int_chk_rcv_conn,
.chk_snd = stream_int_chk_snd_conn,
.shutr = stream_int_shutr_conn,
@ -798,6 +799,33 @@ void stream_int_update(struct stream_interface *si)
}
}
/* Updates the active status of a connection outside of the connection handler
* based on the channel's flags and the stream interface's flags. It needs to
* be called once after the channels' flags have settled down and the stream
* has been updated. It is not designed to be called from within the connection
* handler itself.
*/
void stream_int_update_conn(struct stream_interface *si)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
struct conn_stream *cs = __objt_cs(si->end);
if (!(ic->flags & CF_SHUTR)) {
/* Read not closed, it doesn't seem we have to do anything here */
}
if (!(oc->flags & CF_SHUTW)) {
/* Write not closed */
if (!channel_is_empty(oc) &&
!(cs->conn->flags & CO_FL_ERROR) &&
!(cs->flags & CS_FL_ERROR) &&
!(oc->flags & CF_SHUTW) &&
!(si->wait_event.wait_reason & SUB_CAN_SEND))
si_cs_send(cs);
}
}
/*
* This function performs a shutdown-read on a stream interface attached to
* a connection in a connected or init state (it does nothing for other