MEDIUM: stream_interface: Make recv() subscribe when more data is needed.

Refactor the code so that si_cs_recv() subscribes to receive events.
This commit is contained in:
Olivier Houchard 2018-08-31 17:29:12 +02:00 committed by Willy Tarreau
parent 7505f94f90
commit f653528dc1

View File

@ -51,10 +51,10 @@ static void stream_int_shutr_applet(struct stream_interface *si);
static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_cs_recv(struct conn_stream *cs);
static int si_cs_recv(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static struct task * si_cs_send(struct conn_stream *cs);
static int si_cs_send(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
@ -631,7 +631,7 @@ static int si_cs_wake_cb(struct conn_stream *cs)
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
static struct task * si_cs_send(struct conn_stream *cs)
static int si_cs_send(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
@ -641,21 +641,21 @@ static struct task * si_cs_send(struct conn_stream *cs)
/* We're already waiting to be able to send, give up */
if (si->wait_list.wait_reason & SUB_CAN_SEND)
return NULL;
return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return NULL;
return 0;
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* Schedule ourself to be woken up once the handshake is done */
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list);
return NULL;
return 0;
}
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
return NULL;
return 0;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
@ -673,7 +673,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return NULL;
return 0;
}
/* At this point, the pipe is empty, but we may still have data pending
@ -753,20 +753,24 @@ wake_others:
}
}
return NULL;
return did_send;
}
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
{
struct stream_interface *si = ctx;
struct conn_stream *cs = objt_cs(si->end);
int ret = 0;
if (!cs)
return NULL;
if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) {
si_cs_send(cs);
if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
ret = si_cs_send(cs);
if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
ret |= si_cs_recv(cs);
if (ret != 0)
si_cs_wake_cb(cs);
}
return (NULL);
}
@ -1138,12 +1142,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
static void si_cs_recv(struct conn_stream *cs)
static int si_cs_recv(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
int ret, max, cur_read;
int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS;
/* stop immediately on errors. Note that we DON'T want to stop on
@ -1153,18 +1157,22 @@ static void si_cs_recv(struct conn_stream *cs)
* which rejects it before reading it all.
*/
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return;
return 0;
/* If another call to si_cs_recv() failed, and we subscribed to
* recv events already, give up now.
*/
if (si->wait_list.wait_reason & SUB_CAN_RECV)
return 0;
/* maybe we were called immediately after an asynchronous shutr */
if (ic->flags & CF_SHUTR)
return;
return 0;
/* stop here if we reached the end of data */
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
cur_read = 0;
if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
global.tune.idle_timer &&
(unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
@ -1218,7 +1226,7 @@ static void si_cs_recv(struct conn_stream *cs)
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return;
return cur_read != 0;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* the pipe is full or we have read enough data that it
@ -1377,13 +1385,16 @@ static void si_cs_recv(struct conn_stream *cs)
end_recv:
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return;
return cur_read != 0;
if (cs->flags & CS_FL_EOS)
/* connection closed */
goto out_shutdown_r;
return;
/* Subscribe to receive events */
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
return cur_read != 0;
out_shutdown_r:
/* we received a shutdown */
@ -1391,7 +1402,7 @@ static void si_cs_recv(struct conn_stream *cs)
if (ic->flags & CF_AUTO_CLOSE)
channel_shutw_now(ic);
stream_sock_read0(si);
return;
return cur_read != 0;
}
/*