MEDIUM: connections/mux: Revamp the send direction.

Totally nuke the "send" method. Instead, the upper layer decides when it's
time to send data and, if sending isn't possible at that point, uses the new
subscribe() method to be called back once it can send data again.
This commit is contained in:
Olivier Houchard 2018-07-17 18:49:38 +02:00 committed by Willy Tarreau
parent 6ff2039d13
commit 910b2bc829
7 changed files with 131 additions and 77 deletions

View File

@ -598,6 +598,7 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn)
{
cs->obj_type = OBJ_TYPE_CS;
cs->flags = CS_FL_NONE;
LIST_INIT(&cs->wait_list.list);
LIST_INIT(&cs->send_wait_list);
cs->conn = conn;
}
@ -663,6 +664,8 @@ static inline struct connection *conn_new()
/* Releases a conn_stream previously allocated by cs_new(). The tasklet held in
 * cs->wait_list.task is freed first when one is attached, then the conn_stream
 * itself is handed back to pool_head_connstream.
 */
static inline void cs_free(struct conn_stream *cs)
{
/* may be NULL, e.g. when cs_new() bails out after a failed tasklet_new() */
if (cs->wait_list.task)
tasklet_free(cs->wait_list.task);
pool_free(pool_head_connstream, cs);
}
@ -681,6 +684,11 @@ static inline struct conn_stream *cs_new(struct connection *conn)
if (!likely(cs))
return NULL;
cs->wait_list.task = tasklet_new();
if (!likely(cs->wait_list.task)) {
cs_free(cs);
return NULL;
}
if (!conn) {
conn = conn_new();
if (!likely(conn)) {

View File

@ -307,7 +307,6 @@ struct xprt_ops {
struct mux_ops {
int (*init)(struct connection *conn); /* early initialization */
void (*recv)(struct connection *conn); /* mux-layer recv callback */
void (*send)(struct connection *conn); /* mux-layer send callback */
int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */
void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */
size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
@ -334,7 +333,6 @@ struct mux_ops {
*/
struct data_cb {
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
void (*send)(struct conn_stream *cs); /* data-layer send callback */
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
char name[8]; /* data layer name, zero-terminated */
@ -370,6 +368,7 @@ struct conn_stream {
enum obj_type obj_type; /* differentiates connection from applet context */
unsigned int flags; /* CS_FL_* */
struct connection *conn; /* xprt-level connection */
struct wait_list wait_list; /* We're in a wait list for send */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */

View File

@ -69,6 +69,7 @@ static int httpchk_expect(struct server *s, int done);
static int tcpcheck_get_step_id(struct check *);
static char * tcpcheck_get_step_comment(struct check *, int);
static int tcpcheck_main(struct check *);
static void __event_srv_chk_w(struct conn_stream *cs);
static struct pool_head *pool_head_email_alert = NULL;
static struct pool_head *pool_head_tcpcheck_rule = NULL;
@ -709,23 +710,42 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
* the connection acknowledgement. If the proxy requires L7 health-checks,
* it sends the request. In other cases, it calls set_server_check_status()
* to set check->status, check->duration and check->result.
*/
static struct task *event_srv_chk_w(struct task *task, void *ctx, unsigned short state)
{
/* <ctx> is the conn_stream that was stored as the tasklet context; <task>
 * and <state> are not used in this wrapper.
 */
struct conn_stream *cs = ctx;
struct check __maybe_unused *check = cs->data;
/* take the server lock around __event_srv_chk_w(), which does not lock by itself */
HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
__event_srv_chk_w(cs);
HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
/* always returns NULL */
return NULL;
}
/* same as above, but without taking the server lock: the caller must hold it.
*
* Please do NOT place any return statement in this function and only leave
* via the out_unlock label.
* via the out label. NOTE THAT THIS FUNCTION DOESN'T LOCK, YOU PROBABLY WANT
* TO USE event_srv_chk_w() instead.
*/
static void event_srv_chk_w(struct conn_stream *cs)
static void __event_srv_chk_w(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct check *check = cs->data;
struct server *s = check->server;
struct task *t = check->task;
HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
if (unlikely(check->result == CHK_RES_FAILED))
goto out_wakeup;
if (conn->flags & CO_FL_HANDSHAKE)
goto out_unlock;
if (conn->flags & CO_FL_HANDSHAKE) {
if (cs->wait_list.task->process != event_srv_chk_w) {
cs->wait_list.task->process = event_srv_chk_w;
cs->wait_list.task->context = cs;
}
LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
goto out;
}
if (retrieve_errno_from_socket(conn)) {
chk_report_conn_err(check, errno, 0);
@ -748,19 +768,24 @@ static void event_srv_chk_w(struct conn_stream *cs)
/* wake() will take care of calling tcpcheck_main() */
if (check->type == PR_O2_TCPCHK_CHK)
goto out_unlock;
goto out;
if (b_data(&check->bo)) {
b_del(&check->bo, conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0));
b_realign_if_empty(&check->bo);
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
chk_report_conn_err(check, errno, 0);
__cs_stop_both(cs);
goto out_wakeup;
}
if (b_data(&check->bo))
goto out_unlock;
if (b_data(&check->bo)) {
if (!cs->wait_list.task->process) {
cs->wait_list.task->process = event_srv_chk_w;
cs->wait_list.task->context = cs;
}
conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
goto out;
}
}
/* full request sent, we allow up to <timeout.check> if nonzero for a response */
@ -774,8 +799,8 @@ static void event_srv_chk_w(struct conn_stream *cs)
task_wakeup(t, TASK_WOKEN_IO);
out_nowake:
__cs_stop_send(cs); /* nothing more to write */
out_unlock:
HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
out:
return;
}
/*
@ -1390,7 +1415,8 @@ static int wake_srv_chk(struct conn_stream *cs)
ret = tcpcheck_main(check);
cs = check->cs;
conn = cs_conn(cs);
}
} else if (LIST_ISEMPTY(&cs->wait_list.list))
__event_srv_chk_w(cs);
if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
/* We may get error reports bypassing the I/O handlers, typically
@ -1433,7 +1459,6 @@ static int wake_srv_chk(struct conn_stream *cs)
struct data_cb check_conn_cb = {
.recv = event_srv_chk_r,
.send = event_srv_chk_w,
.wake = wake_srv_chk,
.name = "CHCK",
};

View File

@ -64,6 +64,7 @@ void conn_fd_handler(int fd)
{
struct connection *conn = fdtab[fd].owner;
unsigned int flags;
int can_send = 0;
if (unlikely(!conn)) {
activity[tid].conn_dead++;
@ -127,7 +128,7 @@ void conn_fd_handler(int fd)
* both of which will be detected below.
*/
flags = 0;
conn->mux->send(conn);
can_send = LIST_ISEMPTY(&conn->send_wait_list);
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
struct wait_list *, list);
@ -195,9 +196,9 @@ void conn_fd_handler(int fd)
* Note that the wake callback is allowed to release the connection and
* the fd (and return < 0 in this case).
*/
if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
(conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) &&
(conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) &&
conn->mux->wake(conn) < 0)
return;

View File

@ -121,6 +121,7 @@ struct h2c {
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
struct wait_list wait_list; /* We're in a wait list, to send */
};
/* H2 stream state, in h2s->st */
@ -217,6 +218,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){
};
static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
static struct task *h2_send(struct task *t, void *ctx, unsigned short state);
/*****************************************************/
/* functions below are for dynamic buffer management */
@ -347,6 +349,12 @@ static int h2c_frt_init(struct connection *conn)
t->expire = tick_add(now_ms, h2c->timeout);
}
h2c->wait_list.task = tasklet_new();
if (!h2c->wait_list.task)
goto fail;
h2c->wait_list.task->process = h2_send;
h2c->wait_list.task->context = conn;
h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size);
if (!h2c->ddht)
goto fail;
@ -381,12 +389,15 @@ static int h2c_frt_init(struct connection *conn)
task_queue(t);
conn_xprt_want_recv(conn);
LIST_INIT(&h2c->send_wait_list);
LIST_INIT(&h2c->wait_list.list);
/* mux->wake will be called soon to complete the operation */
return 0;
fail:
if (t)
task_free(t);
if (h2c->wait_list.task)
tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
return -1;
}
@ -445,6 +456,8 @@ static void h2_release(struct connection *conn)
task_wakeup(h2c->task, TASK_WOKEN_OTHER);
h2c->task = NULL;
}
if (h2c->wait_list.task)
tasklet_free(h2c->wait_list.task);
pool_free(pool_head_h2c, h2c);
}
@ -2049,7 +2062,6 @@ static int h2_process_mux(struct h2c *h2c)
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
@ -2091,7 +2103,6 @@ static int h2_process_mux(struct h2c *h2c)
h2s->flags &= ~H2_SF_BLK_ANY;
if (h2s->cs) {
h2s->cs->data_cb->send(h2s->cs);
h2s->cs->data_cb->wake(h2s->cs);
} else {
h2s_send_rst_stream(h2c, h2s);
@ -2167,18 +2178,19 @@ static void h2_recv(struct connection *conn)
return;
}
/* callback called on send event by the connection handler */
static void h2_send(struct connection *conn)
/* Try to send data if possible */
static struct task *h2_send(struct task *t, void *ctx, unsigned short state)
{
struct connection *conn = ctx;
struct h2c *h2c = conn->mux_ctx;
int done;
if (conn->flags & CO_FL_ERROR)
return;
return NULL;
if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
/* a handshake was requested */
return;
return NULL;
}
/* This loop is quite simple : it tries to fill as much as it can from
@ -2243,6 +2255,13 @@ static void h2_send(struct connection *conn)
}
}
/* We're done, no more to send */
if (!b_data(&h2c->mbuf))
return NULL;
schedule:
if (LIST_ISEMPTY(&h2c->wait_list.list))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list);
return NULL;
}
/* callback called on any event by the connection handler.
@ -2349,6 +2368,8 @@ static int h2_wake(struct connection *conn)
else
h2c->task->expire = TICK_ETERNITY;
}
h2_send(NULL, conn, 0);
return 0;
}
@ -3474,8 +3495,6 @@ static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_
else if (LIST_ISEMPTY(&h2s->list)) {
if (h2s->flags & H2_SF_BLK_MFCTL)
LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
}
return total;
@ -3575,7 +3594,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct
const struct mux_ops h2_ops = {
.init = h2_init,
.recv = h2_recv,
.send = h2_send,
.wake = h2_wake,
.update_poll = h2_update_poll,
.rcv_buf = h2_rcv_buf,

View File

@ -97,19 +97,6 @@ static void mux_pt_recv(struct connection *conn)
cs_update_mux_polling(cs);
}
/* callback to be used by default for the pass-through mux. It simply calls the
* data layer send() callback which must be set. A connection-level error
* (CO_FL_ERROR) is first reported on the conn_stream as CS_FL_ERROR, and
* cs_update_mux_polling() is called once the data layer callback has returned.
*/
static void mux_pt_send(struct connection *conn)
{
/* for this mux, the mux context is read as the conn_stream */
struct conn_stream *cs = conn->mux_ctx;
/* propagate the connection error before the data layer gets called */
if (conn->flags & CO_FL_ERROR)
cs->flags |= CS_FL_ERROR;
cs->data_cb->send(cs);
cs_update_mux_polling(cs);
}
/*
* Attach a new stream to a connection
* (Used for outgoing connections)
@ -207,7 +194,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
const struct mux_ops mux_pt_ops = {
.init = mux_pt_init,
.recv = mux_pt_recv,
.send = mux_pt_send,
.wake = mux_pt_wake,
.update_poll = mux_pt_update_poll,
.rcv_buf = mux_pt_rcv_buf,

View File

@ -52,10 +52,10 @@ static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
static void si_cs_recv_cb(struct conn_stream *cs);
static void si_cs_send_cb(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
static void si_idle_conn_null_cb(struct conn_stream *cs);
static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
@ -85,14 +85,12 @@ struct si_ops si_applet_ops = {
/* data-layer callbacks registered under the name "STRM": receive, send and
 * wake events are routed to si_cs_recv_cb(), si_cs_send_cb() and
 * si_cs_wake_cb() respectively.
 */
struct data_cb si_conn_cb = {
.recv = si_cs_recv_cb,
.send = si_cs_send_cb,
.wake = si_cs_wake_cb,
.name = "STRM",
};
/* data-layer callbacks registered under the name "IDLE": receive and send both
 * point to si_idle_conn_null_cb(), only wake has a dedicated handler,
 * si_idle_conn_wake_cb().
 */
struct data_cb si_idle_conn_cb = {
.recv = si_idle_conn_null_cb,
.send = si_idle_conn_null_cb,
.wake = si_idle_conn_wake_cb,
.name = "IDLE",
};
@ -462,6 +460,10 @@ void stream_int_notify(struct stream_interface *si)
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
/* If we have data to send, try it now */
if (!channel_is_empty(oc) && objt_cs(si->end))
si_cs_send(NULL, objt_cs(si->end), 0);
/* process consumer side */
if (channel_is_empty(oc)) {
struct connection *conn = objt_cs(si->end) ? objt_cs(si->end)->conn : NULL;
@ -632,20 +634,42 @@ static int si_cs_wake_cb(struct conn_stream *cs)
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
static void si_cs_send(struct conn_stream *cs)
static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state)
{
struct conn_stream *cs = ctx;
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *oc = si_oc(si);
int ret;
int did_send = 0;
/* We're already waiting to be able to send, give up */
if (!LIST_ISEMPTY(&cs->wait_list.list))
return NULL;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return NULL;
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* Schedule ourself to be woken up once the handshake is done */
LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list);
return NULL;
}
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
return NULL;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
ret = conn->mux->snd_pipe(cs, oc->pipe);
if (ret > 0)
if (ret > 0) {
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
did_send = 1;
}
if (!oc->pipe->data) {
put_pipe(oc->pipe);
@ -653,14 +677,14 @@ static void si_cs_send(struct conn_stream *cs)
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return;
return NULL;
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
if (!co_data(oc))
return;
goto wake_others;
/* when we're here, we already know that there is no spliced
* data left, and that there are sendable buffered data.
@ -691,6 +715,7 @@ static void si_cs_send(struct conn_stream *cs)
ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
if (ret > 0) {
did_send = 1;
oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT;
co_set_data(oc, co_data(oc) - ret);
@ -706,6 +731,26 @@ static void si_cs_send(struct conn_stream *cs)
*/
}
}
/* We couldn't send all of our data, let the mux know we'd like to send more */
if (co_data(oc)) {
if (!cs->wait_list.task->process) {
cs->wait_list.task->process = si_cs_send;
cs->wait_list.task->context = ctx;
}
conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list);
}
wake_others:
/* Maybe somebody was waiting for this conn_stream, wake them */
if (did_send) {
while (!LIST_ISEMPTY(&cs->send_wait_list)) {
struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
struct wait_list *, list);
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
tasklet_wakeup(sw->task);
}
}
return NULL;
}
/* This function is designed to be called from within the stream handler to
@ -995,7 +1040,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
__cs_want_send(cs);
si_cs_send(cs);
si_cs_send(NULL, cs, 0);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
/* Write error on the file descriptor */
__cs_stop_both(cs);
@ -1312,34 +1357,6 @@ static void si_cs_recv_cb(struct conn_stream *cs)
return;
}
/*
* This is the callback which is called by the connection layer to send data
* from the buffer to the connection. It does nothing when the connection or
* the conn_stream reports an error, while a handshake is still pending, or
* once the output channel has CF_SHUTW set; otherwise the actual sending is
* delegated to si_cs_send().
*/
static void si_cs_send_cb(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
/* the stream interface is stored as the conn_stream's upper layer data */
struct stream_interface *si = cs->data;
/* nothing can be sent once an error was flagged on either layer */
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
return;
if (conn->flags & CO_FL_HANDSHAKE)
/* a handshake was requested */
return;
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
return;
/* OK there are data waiting to be sent */
si_cs_send(cs);
/* OK all done */
return;
}
/*
* This function propagates a null read received on a socket-based connection.
* It updates the stream interface. If the stream interface has SI_FL_NOHALF,