mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-04-01 22:48:25 +00:00
MEDIUM: connections: Get rid of the recv() method.
Remove the recv() method from mux and conn_stream. The goal is to always receive from the upper layers, instead of waiting for the connection later. For now, recv() is still called from the wake() method, but that should change soon.
This commit is contained in:
parent
4cf7fb148f
commit
af4021e680
@ -310,7 +310,6 @@ struct xprt_ops {
|
||||
*/
|
||||
struct mux_ops {
|
||||
int (*init)(struct connection *conn); /* early initialization */
|
||||
void (*recv)(struct connection *conn); /* mux-layer recv callback */
|
||||
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
|
||||
void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
|
||||
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
|
||||
@ -336,7 +335,6 @@ struct mux_ops {
|
||||
* data movement. It may abort a connection by returning < 0.
|
||||
*/
|
||||
struct data_cb {
|
||||
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
|
||||
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
|
||||
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
|
||||
char name[8]; /* data layer name, zero-terminated */
|
||||
|
42
src/checks.c
42
src/checks.c
@ -70,6 +70,7 @@ static char * tcpcheck_get_step_comment(struct check *, int);
|
||||
static int tcpcheck_main(struct check *);
|
||||
static void __event_srv_chk_w(struct conn_stream *cs);
|
||||
static int wake_srv_chk(struct conn_stream *cs);
|
||||
static void __event_srv_chk_r(struct conn_stream *cs);
|
||||
|
||||
static struct pool_head *pool_head_email_alert = NULL;
|
||||
static struct pool_head *pool_head_tcpcheck_rule = NULL;
|
||||
@ -709,9 +710,15 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
|
||||
static struct task *event_srv_chk_io(struct task *t, void *ctx, unsigned short state)
|
||||
{
|
||||
struct conn_stream *cs = ctx;
|
||||
struct check *check = cs->data;
|
||||
|
||||
if (!(cs->wait_list.wait_reason & SUB_CAN_SEND))
|
||||
wake_srv_chk(cs);
|
||||
if (!(cs->wait_list.wait_reason & SUB_CAN_RECV)) {
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
|
||||
__event_srv_chk_r(cs);
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -803,9 +810,11 @@ static void __event_srv_chk_w(struct conn_stream *cs)
|
||||
* etc.
|
||||
*
|
||||
* Please do NOT place any return statement in this function and only leave
|
||||
* via the out_unlock label.
|
||||
* via the out label.
|
||||
*
|
||||
* This must be called with the server lock held.
|
||||
*/
|
||||
static void event_srv_chk_r(struct conn_stream *cs)
|
||||
static void __event_srv_chk_r(struct conn_stream *cs)
|
||||
{
|
||||
struct connection *conn = cs->conn;
|
||||
struct check *check = cs->data;
|
||||
@ -815,17 +824,17 @@ static void event_srv_chk_r(struct conn_stream *cs)
|
||||
int done;
|
||||
unsigned short msglen;
|
||||
|
||||
HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
|
||||
|
||||
if (unlikely(check->result == CHK_RES_FAILED))
|
||||
goto out_wakeup;
|
||||
|
||||
if (conn->flags & CO_FL_HANDSHAKE)
|
||||
goto out_unlock;
|
||||
if (conn->flags & CO_FL_HANDSHAKE) {
|
||||
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* wake() will take care of calling tcpcheck_main() */
|
||||
if (check->type == PR_O2_TCPCHK_CHK)
|
||||
goto out_unlock;
|
||||
goto out;
|
||||
|
||||
/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
|
||||
* but the connection was closed on the remote end. Fortunately, recv still
|
||||
@ -1372,13 +1381,13 @@ static void event_srv_chk_r(struct conn_stream *cs)
|
||||
conn->flags |= CO_FL_ERROR;
|
||||
|
||||
task_wakeup(t, TASK_WOKEN_IO);
|
||||
out_unlock:
|
||||
HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
|
||||
out:
|
||||
return;
|
||||
|
||||
wait_more_data:
|
||||
__cs_want_recv(cs);
|
||||
goto out_unlock;
|
||||
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1443,7 +1452,6 @@ static int wake_srv_chk(struct conn_stream *cs)
|
||||
}
|
||||
|
||||
struct data_cb check_conn_cb = {
|
||||
.recv = event_srv_chk_r,
|
||||
.wake = wake_srv_chk,
|
||||
.name = "CHCK",
|
||||
};
|
||||
@ -2172,8 +2180,10 @@ static struct task *process_chk_conn(struct task *t, void *context, unsigned sho
|
||||
t->expire = tick_first(t->expire, t_con);
|
||||
}
|
||||
|
||||
if (check->type)
|
||||
if (check->type) {
|
||||
cs_want_recv(cs); /* prepare for reading a possible reply */
|
||||
__event_srv_chk_r(cs);
|
||||
}
|
||||
|
||||
task_set_affinity(t, tid_bit);
|
||||
goto reschedule;
|
||||
@ -2928,8 +2938,10 @@ static int tcpcheck_main(struct check *check)
|
||||
goto out_end_tcpcheck;
|
||||
}
|
||||
}
|
||||
else
|
||||
else {
|
||||
conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* mark the step as started */
|
||||
@ -3091,8 +3103,10 @@ static int tcpcheck_main(struct check *check)
|
||||
__cs_want_send(cs);
|
||||
|
||||
if (&check->current_step->list != head &&
|
||||
check->current_step->action == TCPCHK_ACT_EXPECT)
|
||||
check->current_step->action == TCPCHK_ACT_EXPECT) {
|
||||
__cs_want_recv(cs);
|
||||
__event_srv_chk_r(cs);
|
||||
}
|
||||
goto out;
|
||||
|
||||
out_end_tcpcheck:
|
||||
|
@ -64,7 +64,7 @@ void conn_fd_handler(int fd)
|
||||
{
|
||||
struct connection *conn = fdtab[fd].owner;
|
||||
unsigned int flags;
|
||||
int can_send = 0;
|
||||
int io_available = 0;
|
||||
|
||||
if (unlikely(!conn)) {
|
||||
activity[tid].conn_dead++;
|
||||
@ -128,7 +128,8 @@ void conn_fd_handler(int fd)
|
||||
* both of which will be detected below.
|
||||
*/
|
||||
flags = 0;
|
||||
can_send = LIST_ISEMPTY(&conn->send_wait_list);
|
||||
io_available = (LIST_ISEMPTY(&conn->send_wait_list) &&
|
||||
LIST_ISEMPTY(&conn->sendrecv_wait_list));;
|
||||
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
|
||||
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
@ -138,7 +139,7 @@ void conn_fd_handler(int fd)
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
|
||||
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
|
||||
struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
@ -159,7 +160,26 @@ void conn_fd_handler(int fd)
|
||||
* both of which will be detected below.
|
||||
*/
|
||||
flags = 0;
|
||||
conn->mux->recv(conn);
|
||||
io_available |= (LIST_ISEMPTY(&conn->recv_wait_list) &&
|
||||
LIST_ISEMPTY(&conn->sendrecv_wait_list));
|
||||
while (!LIST_ISEMPTY(&conn->recv_wait_list)) {
|
||||
struct wait_list *sw = LIST_ELEM(conn->recv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
sw->wait_reason &= ~SUB_CAN_RECV;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
|
||||
struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
LIST_ADDQ(&conn->send_wait_list, &sw->list);
|
||||
sw->wait_reason &= ~SUB_CAN_RECV;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* It may happen during the data phase that a handshake is
|
||||
@ -206,7 +226,7 @@ void conn_fd_handler(int fd)
|
||||
* Note that the wake callback is allowed to release the connection and
|
||||
* the fd (and return < 0 in this case).
|
||||
*/
|
||||
if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
|
||||
if ((io_available || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
|
||||
((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
|
||||
(conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) &&
|
||||
conn->mux->wake(conn) < 0)
|
||||
|
79
src/mux_h2.c
79
src/mux_h2.c
@ -121,8 +121,6 @@ struct h2c {
|
||||
struct list fctl_list; /* list of streams blocked by connection's fctl */
|
||||
struct buffer_wait buf_wait; /* wait list for buffer allocations */
|
||||
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
|
||||
struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
|
||||
struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
|
||||
struct wait_list wait_list; /* We're in a wait list, to send */
|
||||
};
|
||||
|
||||
@ -186,6 +184,7 @@ struct h2s {
|
||||
enum h2_err errcode; /* H2 err code (H2_ERR_*) */
|
||||
enum h2_ss st;
|
||||
struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
|
||||
struct wait_list *recv_wait_list; /* Somebody subscribed to be waken up on recv */
|
||||
};
|
||||
|
||||
/* descriptor for an h2 frame header */
|
||||
@ -222,6 +221,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){
|
||||
|
||||
static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
|
||||
static void h2_send(struct h2c *h2c);
|
||||
static void h2_recv(struct h2c *h2c);
|
||||
static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short state);
|
||||
static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id);
|
||||
static int h2_frt_decode_headers(struct h2s *h2s);
|
||||
@ -280,8 +280,10 @@ static int h2_buf_available(void *target)
|
||||
|
||||
if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) {
|
||||
h2c->flags &= ~H2_CF_DEM_DALLOC;
|
||||
if (h2_recv_allowed(h2c))
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
h2_recv(h2c);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -292,8 +294,10 @@ static int h2_buf_available(void *target)
|
||||
|
||||
if (h2c->flags & H2_CF_DEM_MROOM) {
|
||||
h2c->flags &= ~H2_CF_DEM_MROOM;
|
||||
if (h2_recv_allowed(h2c))
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
h2_recv(h2c);
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@ -302,8 +306,10 @@ static int h2_buf_available(void *target)
|
||||
(h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs &&
|
||||
b_alloc_margin(&h2s->rxbuf, 0)) {
|
||||
h2c->flags &= ~H2_CF_DEM_SALLOC;
|
||||
if (h2_recv_allowed(h2c))
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
conn_xprt_want_recv(h2c->conn);
|
||||
h2_recv(h2c);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -408,11 +414,10 @@ static int h2c_frt_init(struct connection *conn)
|
||||
task_queue(t);
|
||||
conn_xprt_want_recv(conn);
|
||||
LIST_INIT(&h2c->send_wait_list);
|
||||
LIST_INIT(&h2c->recv_wait_list);
|
||||
LIST_INIT(&h2c->sendrecv_wait_list);
|
||||
LIST_INIT(&h2c->wait_list.list);
|
||||
|
||||
/* mux->wake will be called soon to complete the operation */
|
||||
/* Try to read, if nothing is available yet we'll just subscribe */
|
||||
h2_recv(h2c);
|
||||
return 0;
|
||||
fail:
|
||||
if (t)
|
||||
@ -2228,17 +2233,16 @@ static int h2_process_mux(struct h2c *h2c)
|
||||
}
|
||||
|
||||
|
||||
/*********************************************************/
|
||||
/* functions below are I/O callbacks from the connection */
|
||||
/*********************************************************/
|
||||
|
||||
/* callback called on recv event by the connection handler */
|
||||
static void h2_recv(struct connection *conn)
|
||||
/* Attempt to read data, and subscribe if none available */
|
||||
static void h2_recv(struct h2c *h2c)
|
||||
{
|
||||
struct h2c *h2c = conn->mux_ctx;
|
||||
struct connection *conn = h2c->conn;
|
||||
struct buffer *buf;
|
||||
int max;
|
||||
|
||||
if (h2c->wait_list.wait_reason & SUB_CAN_RECV)
|
||||
return;
|
||||
|
||||
if (!h2_recv_allowed(h2c))
|
||||
return;
|
||||
|
||||
@ -2253,6 +2257,7 @@ static void h2_recv(struct connection *conn)
|
||||
conn->xprt->rcv_buf(conn, buf, max, 0);
|
||||
|
||||
if (!b_data(buf)) {
|
||||
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list);
|
||||
h2_release_buf(h2c, &h2c->dbuf);
|
||||
return;
|
||||
}
|
||||
@ -2337,17 +2342,6 @@ static void h2_send(struct h2c *h2c)
|
||||
sw->wait_reason &= ~SUB_CAN_SEND;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
|
||||
struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
|
||||
sw->wait_reason &= ~SUB_CAN_SEND;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
/* We're done, no more to send */
|
||||
if (!b_data(&h2c->mbuf))
|
||||
@ -2364,6 +2358,8 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status)
|
||||
|
||||
if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND))
|
||||
h2_send(h2c);
|
||||
if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV))
|
||||
h2_recv(h2c);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -2377,6 +2373,9 @@ static int h2_wake(struct connection *conn)
|
||||
struct session *sess = conn->owner;
|
||||
|
||||
h2_send(h2c);
|
||||
if (h2_recv_allowed(h2c))
|
||||
h2_recv(h2c);
|
||||
|
||||
if (b_data(&h2c->dbuf) && !(h2c->flags & H2_CF_DEM_BLOCK_ANY)) {
|
||||
h2_process_demux(h2c);
|
||||
|
||||
@ -2436,11 +2435,11 @@ static int h2_wake(struct connection *conn)
|
||||
h2_release_buf(h2c, &h2c->dbuf);
|
||||
|
||||
/* stop being notified of incoming data if we can't process them */
|
||||
if (!h2_recv_allowed(h2c)) {
|
||||
if (!h2_recv_allowed(h2c))
|
||||
__conn_xprt_stop_recv(conn);
|
||||
}
|
||||
else {
|
||||
__conn_xprt_want_recv(conn);
|
||||
h2_recv(h2c);
|
||||
}
|
||||
|
||||
/* adjust output polling */
|
||||
@ -2554,6 +2553,7 @@ static void h2_update_poll(struct conn_stream *cs)
|
||||
h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
|
||||
if (h2s->h2c->dsi == h2s->id) {
|
||||
conn_xprt_want_recv(cs->conn);
|
||||
h2_recv(h2s->h2c);
|
||||
conn_xprt_want_send(cs->conn);
|
||||
}
|
||||
}
|
||||
@ -2605,6 +2605,7 @@ static void h2_detach(struct conn_stream *cs)
|
||||
h2c->flags &= ~H2_CF_DEM_TOOMANY;
|
||||
if (h2_recv_allowed(h2c)) {
|
||||
__conn_xprt_want_recv(h2c->conn);
|
||||
h2_recv(h2c);
|
||||
conn_xprt_want_send(h2c->conn);
|
||||
}
|
||||
}
|
||||
@ -2625,6 +2626,7 @@ static void h2_detach(struct conn_stream *cs)
|
||||
h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
|
||||
h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
|
||||
conn_xprt_want_recv(cs->conn);
|
||||
h2_recv(h2c);
|
||||
conn_xprt_want_send(cs->conn);
|
||||
}
|
||||
|
||||
@ -3477,30 +3479,14 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
sw = param;
|
||||
if (!(sw->wait_reason & SUB_CAN_RECV)) {
|
||||
sw->wait_reason |= SUB_CAN_RECV;
|
||||
/* If we're already subscribed for send(), move it
|
||||
* to the send+recv list
|
||||
*/
|
||||
if (sw->wait_reason & SUB_CAN_SEND) {
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
|
||||
} else
|
||||
LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
|
||||
h2s->recv_wait_list = sw;
|
||||
}
|
||||
return 0;
|
||||
case SUB_CAN_SEND:
|
||||
sw = param;
|
||||
if (!(sw->wait_reason & SUB_CAN_SEND)) {
|
||||
sw->wait_reason |= SUB_CAN_SEND;
|
||||
/* If we're already subscribed for recv(), move it
|
||||
* to the send+recv list
|
||||
*/
|
||||
if (sw->wait_reason & SUB_CAN_RECV) {
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
|
||||
} else
|
||||
LIST_ADDQ(&h2c->send_wait_list, &sw->list);
|
||||
LIST_ADDQ(&h2c->send_wait_list, &sw->list);
|
||||
}
|
||||
return 0;
|
||||
default:
|
||||
@ -3710,7 +3696,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct
|
||||
/* The mux operations */
|
||||
const struct mux_ops h2_ops = {
|
||||
.init = h2_init,
|
||||
.recv = h2_recv,
|
||||
.wake = h2_wake,
|
||||
.update_poll = h2_update_poll,
|
||||
.snd_buf = h2_snd_buf,
|
||||
|
16
src/mux_pt.c
16
src/mux_pt.c
@ -85,21 +85,6 @@ static void mux_pt_update_poll(struct conn_stream *cs)
|
||||
conn_cond_update_xprt_polling(conn);
|
||||
}
|
||||
|
||||
/* callback to be used by default for the pass-through mux. It simply calls the
|
||||
* data layer recv() callback much must be set.
|
||||
*/
|
||||
static void mux_pt_recv(struct connection *conn)
|
||||
{
|
||||
struct conn_stream *cs = conn->mux_ctx;
|
||||
|
||||
if (conn->flags & CO_FL_ERROR)
|
||||
cs->flags |= CS_FL_ERROR;
|
||||
if (conn_xprt_read0_pending(conn))
|
||||
cs->flags |= CS_FL_EOS;
|
||||
cs->data_cb->recv(cs);
|
||||
cs_update_mux_polling(cs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Attach a new stream to a connection
|
||||
* (Used for outgoing connections)
|
||||
@ -200,7 +185,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
|
||||
/* The mux operations */
|
||||
const struct mux_ops mux_pt_ops = {
|
||||
.init = mux_pt_init,
|
||||
.recv = mux_pt_recv,
|
||||
.wake = mux_pt_wake,
|
||||
.update_poll = mux_pt_update_poll,
|
||||
.rcv_buf = mux_pt_rcv_buf,
|
||||
|
@ -51,10 +51,9 @@ static void stream_int_shutr_applet(struct stream_interface *si);
|
||||
static void stream_int_shutw_applet(struct stream_interface *si);
|
||||
static void stream_int_chk_rcv_applet(struct stream_interface *si);
|
||||
static void stream_int_chk_snd_applet(struct stream_interface *si);
|
||||
static void si_cs_recv_cb(struct conn_stream *cs);
|
||||
static void si_cs_recv(struct conn_stream *cs);
|
||||
static int si_cs_wake_cb(struct conn_stream *cs);
|
||||
static int si_idle_conn_wake_cb(struct conn_stream *cs);
|
||||
static void si_idle_conn_null_cb(struct conn_stream *cs);
|
||||
static struct task * si_cs_send(struct conn_stream *cs);
|
||||
|
||||
/* stream-interface operations for embedded tasks */
|
||||
@ -84,13 +83,11 @@ struct si_ops si_applet_ops = {
|
||||
};
|
||||
|
||||
struct data_cb si_conn_cb = {
|
||||
.recv = si_cs_recv_cb,
|
||||
.wake = si_cs_wake_cb,
|
||||
.name = "STRM",
|
||||
};
|
||||
|
||||
struct data_cb si_idle_conn_cb = {
|
||||
.recv = si_idle_conn_null_cb,
|
||||
.wake = si_idle_conn_wake_cb,
|
||||
.name = "IDLE",
|
||||
};
|
||||
@ -417,15 +414,6 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
|
||||
}
|
||||
|
||||
|
||||
/* Tiny I/O callback called on recv/send I/O events on idle connections.
|
||||
* It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
|
||||
* is notified and can kill the connection.
|
||||
*/
|
||||
static void si_idle_conn_null_cb(struct conn_stream *cs)
|
||||
{
|
||||
conn_sock_drain(cs->conn);
|
||||
}
|
||||
|
||||
/* Callback to be used by connection I/O handlers when some activity is detected
|
||||
* on an idle server connection. Its main purpose is to kill the connection once
|
||||
* a close was detected on it. It returns 0 if it did nothing serious, or -1 if
|
||||
@ -439,6 +427,8 @@ static int si_idle_conn_wake_cb(struct conn_stream *cs)
|
||||
if (!conn_ctrl_ready(conn))
|
||||
return 0;
|
||||
|
||||
conn_sock_drain(conn);
|
||||
|
||||
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
|
||||
/* warning, we can't do anything on <conn> after this call ! */
|
||||
si_release_endpoint(si);
|
||||
@ -582,8 +572,8 @@ static int si_cs_wake_cb(struct conn_stream *cs)
|
||||
* for recv() (received only an empty response).
|
||||
*/
|
||||
if (!(cs->flags & CS_FL_EOS) &&
|
||||
(cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_REOS|CS_FL_RCV_MORE)) > CS_FL_DATA_RD_ENA)
|
||||
si_cs_recv_cb(cs);
|
||||
(cs->flags & (CS_FL_DATA_RD_ENA)))
|
||||
si_cs_recv(cs);
|
||||
|
||||
/* If we have data to send, try it now */
|
||||
if (!channel_is_empty(oc) && objt_cs(si->end))
|
||||
@ -753,7 +743,7 @@ wake_others:
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
|
||||
struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
|
||||
struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
@ -1148,7 +1138,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
|
||||
* into the buffer from the connection. It iterates over the mux layer's
|
||||
* rcv_buf function.
|
||||
*/
|
||||
static void si_cs_recv_cb(struct conn_stream *cs)
|
||||
static void si_cs_recv(struct conn_stream *cs)
|
||||
{
|
||||
struct connection *conn = cs->conn;
|
||||
struct stream_interface *si = cs->data;
|
||||
@ -1364,6 +1354,26 @@ static void si_cs_recv_cb(struct conn_stream *cs)
|
||||
}
|
||||
ic->last_read = now_ms;
|
||||
}
|
||||
if (cur_read > 0) {
|
||||
while (!LIST_ISEMPTY(&cs->recv_wait_list)) {
|
||||
struct wait_list *sw = LIST_ELEM(cs->recv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
sw->wait_reason &= ~SUB_CAN_RECV;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
|
||||
struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
LIST_ADDQ(&cs->send_wait_list, &sw->list);
|
||||
sw->wait_reason &= ~SUB_CAN_RECV;
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
end_recv:
|
||||
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
|
||||
|
Loading…
Reference in New Issue
Block a user