diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h index c52437f7f..40b15a31d 100644 --- a/include/haproxy/connection.h +++ b/include/haproxy/connection.h @@ -46,12 +46,6 @@ extern struct mux_proto_list mux_proto_list; #define IS_HTX_CONN(conn) ((conn)->mux && ((conn)->mux->flags & MX_FL_HTX)) #define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn)) -/* I/O callback for fd-based connections. It calls the read/write handlers - * provided by the connection's sock_ops. - */ -void conn_fd_handler(int fd); -int conn_fd_check(struct connection *conn); - /* receive a PROXY protocol header over a connection */ int conn_recv_proxy(struct connection *conn, int flag); int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connection *remote, struct stream *strm); @@ -215,6 +209,52 @@ static inline void conn_xprt_shutw_hard(struct connection *c) c->xprt->shutw(c, c->xprt_ctx, 0); } +/* This is used at the end of the socket IOCB to possibly create the mux if it + * was not done yet, or wake it up if flags changed compared to old_flags or if + * need_wake insists on this. It returns <0 if the connection was destroyed and + * must not be used, >=0 otherwise. + */ +static inline int conn_notify_mux(struct connection *conn, int old_flags, int forced_wake) +{ + int ret = 0; + + /* If we don't yet have a mux, that means we were waiting for + * information to create one, typically from the ALPN. If we're + * done with the handshake, attempt to create one. + */ + if (unlikely(!conn->mux) && !(conn->flags & CO_FL_WAIT_XPRT)) { + ret = conn_create_mux(conn); + if (ret < 0) + goto done; + } + + /* The wake callback is normally used to notify the data layer about + * data layer activity (successful send/recv), connection establishment, + * shutdown and fatal errors. We need to consider the following + * situations to wake up the data layer : + * - change among the CO_FL_NOTIFY_DONE flags : + * SOCK_{RD,WR}_SH, ERROR, + * - absence of any of {L4,L6}_CONN and CONNECTED, indicating the + * end of handshake and transition to CONNECTED + * - raise of CONNECTED with HANDSHAKE down + * - end of HANDSHAKE with CONNECTED set + * - regular data layer activity + * + * Note that the wake callback is allowed to release the connection and + * the fd (and return < 0 in this case). + */ + if ((forced_wake || + ((conn->flags ^ old_flags) & CO_FL_NOTIFY_DONE) || + ((old_flags & CO_FL_WAIT_XPRT) && !(conn->flags & CO_FL_WAIT_XPRT))) && + conn->mux && conn->mux->wake) { + ret = conn->mux->wake(conn); + if (ret < 0) + goto done; + } + done: + return ret; +} + /* shut read */ static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode) { diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h index 9492c3ed0..c741eb39e 100644 --- a/include/haproxy/fd.h +++ b/include/haproxy/fd.h @@ -430,7 +430,7 @@ static inline void fd_update_events(int fd, unsigned char evts) static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask) { int locked = fdtab[fd].running_mask != tid_bit; - extern void conn_fd_handler(int); + extern void sock_conn_iocb(int); if (locked) fd_set_running_excl(fd); @@ -446,7 +446,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned #endif /* conn_fd_handler should support edge-triggered FDs */ - if ((global.tune.options & GTUNE_FD_ET) && fdtab[fd].iocb == conn_fd_handler) + if ((global.tune.options & GTUNE_FD_ET) && fdtab[fd].iocb == sock_conn_iocb) fdtab[fd].et_possible = 1; fdtab[fd].thread_mask = thread_mask; diff --git a/include/haproxy/sock.h b/include/haproxy/sock.h index 6c9b7b7d6..8f6a90279 100644 --- a/include/haproxy/sock.h +++ b/include/haproxy/sock.h @@ -45,6 +45,9 @@ struct connection *sock_accept_conn(struct listener *l, int *status); void sock_accept_iocb(int fd); void sock_conn_ctrl_init(struct connection *conn); void sock_conn_ctrl_close(struct connection *conn); +void sock_conn_iocb(int fd); +int sock_conn_check(struct connection *conn); + #endif /* _HAPROXY_SOCK_H */ diff --git a/src/cli.c b/src/cli.c index ced882ace..8f6d382d7 100644 --- a/src/cli.c +++ b/src/cli.c @@ -1046,7 +1046,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx) #endif goto skip; // closed } - else if (fdt.iocb == conn_fd_handler) { + else if (fdt.iocb == sock_conn_iocb) { conn_flags = ((struct connection *)fdt.owner)->flags; mux = ((struct connection *)fdt.owner)->mux; ctx = ((struct connection *)fdt.owner)->ctx; @@ -1082,7 +1082,7 @@ static int cli_io_handler_show_fd(struct appctx *appctx) if (!fdt.owner) { chunk_appendf(&trash, ")"); } - else if (fdt.iocb == conn_fd_handler) { + else if (fdt.iocb == sock_conn_iocb) { chunk_appendf(&trash, ") back=%d cflg=0x%08x", is_back, conn_flags); if (px) chunk_appendf(&trash, " px=%s", px->id); diff --git a/src/connection.c b/src/connection.c index 16d4f92b4..d44402e86 100644 --- a/src/connection.c +++ b/src/connection.c @@ -84,202 +84,6 @@ fail: } -/* I/O callback for fd-based connections. It calls the read/write handlers - * provided by the connection's sock_ops, which must be valid. - */ -void conn_fd_handler(int fd) -{ - struct connection *conn = fdtab[fd].owner; - unsigned int flags; - int need_wake = 0; - - if (unlikely(!conn)) { - activity[tid].conn_dead++; - return; - } - - flags = conn->flags & ~CO_FL_ERROR; /* ensure to call the wake handler upon error */ - - if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && - ((fd_send_ready(fd) && fd_send_active(fd)) || - (fd_recv_ready(fd) && fd_recv_active(fd)))) { - /* Still waiting for a connection to establish and nothing was - * attempted yet to probe the connection. this will clear the - * CO_FL_WAIT_L4_CONN flag on success. - */ - if (!conn_fd_check(conn)) - goto leave; - need_wake = 1; - } - - if (fd_send_ready(fd) && fd_send_active(fd)) { - /* force reporting of activity by clearing the previous flags : - * we'll have at least ERROR or CONNECTED at the end of an I/O, - * both of which will be detected below. - */ - flags = 0; - if (conn->subs && conn->subs->events & SUB_RETRY_SEND) { - need_wake = 0; // wake will be called after this I/O - tasklet_wakeup(conn->subs->tasklet); - conn->subs->events &= ~SUB_RETRY_SEND; - if (!conn->subs->events) - conn->subs = NULL; - } - fd_stop_send(fd); - } - - /* The data transfer starts here and stops on error and handshakes. Note - * that we must absolutely test conn->xprt at each step in case it suddenly - * changes due to a quick unexpected close(). - */ - if (fd_recv_ready(fd) && fd_recv_active(fd)) { - /* force reporting of activity by clearing the previous flags : - * we'll have at least ERROR or CONNECTED at the end of an I/O, - * both of which will be detected below. - */ - flags = 0; - if (conn->subs && conn->subs->events & SUB_RETRY_RECV) { - need_wake = 0; // wake will be called after this I/O - tasklet_wakeup(conn->subs->tasklet); - conn->subs->events &= ~SUB_RETRY_RECV; - if (!conn->subs->events) - conn->subs = NULL; - } - fd_stop_recv(fd); - } - - leave: - /* If we don't yet have a mux, that means we were waiting for - * information to create one, typically from the ALPN. If we're - * done with the handshake, attempt to create one. - */ - if (unlikely(!conn->mux) && !(conn->flags & CO_FL_WAIT_XPRT)) - if (conn_create_mux(conn) < 0) - return; - - /* The wake callback is normally used to notify the data layer about - * data layer activity (successful send/recv), connection establishment, - * shutdown and fatal errors. We need to consider the following - * situations to wake up the data layer : - * - change among the CO_FL_NOTIFY_DONE flags : - * SOCK_{RD,WR}_SH, ERROR, - * - absence of any of {L4,L6}_CONN and CONNECTED, indicating the - * end of handshake and transition to CONNECTED - * - raise of CONNECTED with HANDSHAKE down - * - end of HANDSHAKE with CONNECTED set - * - regular data layer activity - * - * Note that the wake callback is allowed to release the connection and - * the fd (and return < 0 in this case). - */ - if ((need_wake || ((conn->flags ^ flags) & CO_FL_NOTIFY_DONE) || - ((flags & CO_FL_WAIT_XPRT) && !(conn->flags & CO_FL_WAIT_XPRT))) && - conn->mux && conn->mux->wake && conn->mux->wake(conn) < 0) - return; - - /* commit polling changes in case of error. - * WT: it seems that the last case where this could still be relevant - * is if a mux wake function above report a connection error but does - * not stop polling. Shouldn't we enforce this into the mux instead of - * having to deal with this ? - */ - if (unlikely(conn->flags & CO_FL_ERROR)) { - if (conn_ctrl_ready(conn)) - fd_stop_both(fd); - } -} - -/* This is the callback which is set when a connection establishment is pending - * and we have nothing to send. It may update the FD polling status to indicate - * !READY. It returns 0 if it fails in a fatal way or needs to poll to go - * further, otherwise it returns non-zero and removes the CO_FL_WAIT_L4_CONN - * flag from the connection's flags. In case of error, it sets CO_FL_ERROR and - * leaves the error code in errno. - */ -int conn_fd_check(struct connection *conn) -{ - struct sockaddr_storage *addr; - int fd = conn->handle.fd; - - if (conn->flags & CO_FL_ERROR) - return 0; - - if (!conn_ctrl_ready(conn)) - return 0; - - if (!(conn->flags & CO_FL_WAIT_L4_CONN)) - return 1; /* strange we were called while ready */ - - if (!fd_send_ready(fd)) - return 0; - - /* Here we have 2 cases : - * - modern pollers, able to report ERR/HUP. If these ones return any - * of these flags then it's likely a failure, otherwise it possibly - * is a success (i.e. there may have been data received just before - * the error was reported). - * - select, which doesn't report these and with which it's always - * necessary either to try connect() again or to check for SO_ERROR. - * In order to simplify everything, we double-check using connect() as - * soon as we meet either of these delicate situations. Note that - * SO_ERROR would clear the error after reporting it! - */ - if (cur_poller.flags & HAP_POLL_F_ERRHUP) { - /* modern poller, able to report ERR/HUP */ - if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_ERR|FD_POLL_HUP)) == FD_POLL_IN) - goto done; - if ((fdtab[fd].ev & (FD_POLL_OUT|FD_POLL_ERR|FD_POLL_HUP)) == FD_POLL_OUT) - goto done; - if (!(fdtab[fd].ev & (FD_POLL_ERR|FD_POLL_HUP))) - goto wait; - /* error present, fall through common error check path */ - } - - /* Use connect() to check the state of the socket. This has the double - * advantage of *not* clearing the error (so that health checks can - * still use getsockopt(SO_ERROR)) and giving us the following info : - * - error - * - connecting (EALREADY, EINPROGRESS) - * - connected (EISCONN, 0) - */ - addr = conn->dst; - if ((conn->flags & CO_FL_SOCKS4) && obj_type(conn->target) == OBJ_TYPE_SERVER) - addr = &objt_server(conn->target)->socks4_addr; - - if (connect(fd, (const struct sockaddr *)addr, get_addr_len(addr)) == -1) { - if (errno == EALREADY || errno == EINPROGRESS) - goto wait; - - if (errno && errno != EISCONN) - goto out_error; - } - - done: - /* The FD is ready now, we'll mark the connection as complete and - * forward the event to the transport layer which will notify the - * data layer. - */ - conn->flags &= ~CO_FL_WAIT_L4_CONN; - fd_may_send(fd); - fd_cond_recv(fd); - errno = 0; // make health checks happy - return 1; - - out_error: - /* Write error on the file descriptor. Report it to the connection - * and disable polling on this FD. - */ - fdtab[fd].linger_risk = 0; - conn->flags |= CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH; - fd_stop_both(fd); - return 0; - - wait: - fd_cant_send(fd); - fd_want_send(fd); - return 0; -} - /* Send a message over an established connection. It makes use of send() and * returns the same return code and errno. If the socket layer is not ready yet * then -1 is returned and ENOTSOCK is set into errno. If the fd is not marked diff --git a/src/sock.c b/src/sock.c index c7a5f80f7..5a072f0b8 100644 --- a/src/sock.c +++ b/src/sock.c @@ -631,7 +631,7 @@ void sock_accept_iocb(int fd) */ void sock_conn_ctrl_init(struct connection *conn) { - fd_insert(conn->handle.fd, conn, conn_fd_handler, tid_bit); + fd_insert(conn->handle.fd, conn, sock_conn_iocb, tid_bit); } /* This completes the release of connection by removing its FD from the @@ -644,6 +644,181 @@ void sock_conn_ctrl_close(struct connection *conn) conn->handle.fd = DEAD_FD_MAGIC; } +/* This is the callback which is set when a connection establishment is pending + * and we have nothing to send. It may update the FD polling status to indicate + * !READY. It returns 0 if it fails in a fatal way or needs to poll to go + * further, otherwise it returns non-zero and removes the CO_FL_WAIT_L4_CONN + * flag from the connection's flags. In case of error, it sets CO_FL_ERROR and + * leaves the error code in errno. + */ +int sock_conn_check(struct connection *conn) +{ + struct sockaddr_storage *addr; + int fd = conn->handle.fd; + + if (conn->flags & CO_FL_ERROR) + return 0; + + if (!conn_ctrl_ready(conn)) + return 0; + + if (!(conn->flags & CO_FL_WAIT_L4_CONN)) + return 1; /* strange we were called while ready */ + + if (!fd_send_ready(fd)) + return 0; + + /* Here we have 2 cases : + * - modern pollers, able to report ERR/HUP. If these ones return any + * of these flags then it's likely a failure, otherwise it possibly + * is a success (i.e. there may have been data received just before + * the error was reported). + * - select, which doesn't report these and with which it's always + * necessary either to try connect() again or to check for SO_ERROR. + * In order to simplify everything, we double-check using connect() as + * soon as we meet either of these delicate situations. Note that + * SO_ERROR would clear the error after reporting it! + */ + if (cur_poller.flags & HAP_POLL_F_ERRHUP) { + /* modern poller, able to report ERR/HUP */ + if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_ERR|FD_POLL_HUP)) == FD_POLL_IN) + goto done; + if ((fdtab[fd].ev & (FD_POLL_OUT|FD_POLL_ERR|FD_POLL_HUP)) == FD_POLL_OUT) + goto done; + if (!(fdtab[fd].ev & (FD_POLL_ERR|FD_POLL_HUP))) + goto wait; + /* error present, fall through common error check path */ + } + + /* Use connect() to check the state of the socket. This has the double + * advantage of *not* clearing the error (so that health checks can + * still use getsockopt(SO_ERROR)) and giving us the following info : + * - error + * - connecting (EALREADY, EINPROGRESS) + * - connected (EISCONN, 0) + */ + addr = conn->dst; + if ((conn->flags & CO_FL_SOCKS4) && obj_type(conn->target) == OBJ_TYPE_SERVER) + addr = &objt_server(conn->target)->socks4_addr; + + if (connect(fd, (const struct sockaddr *)addr, get_addr_len(addr)) == -1) { + if (errno == EALREADY || errno == EINPROGRESS) + goto wait; + + if (errno && errno != EISCONN) + goto out_error; + } + + done: + /* The FD is ready now, we'll mark the connection as complete and + * forward the event to the transport layer which will notify the + * data layer. + */ + conn->flags &= ~CO_FL_WAIT_L4_CONN; + fd_may_send(fd); + fd_cond_recv(fd); + errno = 0; // make health checks happy + return 1; + + out_error: + /* Write error on the file descriptor. Report it to the connection + * and disable polling on this FD. + */ + fdtab[fd].linger_risk = 0; + conn->flags |= CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH; + fd_stop_both(fd); + return 0; + + wait: + fd_cant_send(fd); + fd_want_send(fd); + return 0; +} + +/* I/O callback for fd-based connections. It calls the read/write handlers + * provided by the connection's sock_ops, which must be valid. + */ +void sock_conn_iocb(int fd) +{ + struct connection *conn = fdtab[fd].owner; + unsigned int flags; + int need_wake = 0; + + if (unlikely(!conn)) { + activity[tid].conn_dead++; + return; + } + + flags = conn->flags & ~CO_FL_ERROR; /* ensure to call the wake handler upon error */ + + if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && + ((fd_send_ready(fd) && fd_send_active(fd)) || + (fd_recv_ready(fd) && fd_recv_active(fd)))) { + /* Still waiting for a connection to establish and nothing was + * attempted yet to probe the connection. this will clear the + * CO_FL_WAIT_L4_CONN flag on success. + */ + if (!sock_conn_check(conn)) + goto leave; + need_wake = 1; + } + + if (fd_send_ready(fd) && fd_send_active(fd)) { + /* force reporting of activity by clearing the previous flags : + * we'll have at least ERROR or CONNECTED at the end of an I/O, + * both of which will be detected below. + */ + flags = 0; + if (conn->subs && conn->subs->events & SUB_RETRY_SEND) { + need_wake = 0; // wake will be called after this I/O + tasklet_wakeup(conn->subs->tasklet); + conn->subs->events &= ~SUB_RETRY_SEND; + if (!conn->subs->events) + conn->subs = NULL; + } + fd_stop_send(fd); + } + + /* The data transfer starts here and stops on error and handshakes. Note + * that we must absolutely test conn->xprt at each step in case it suddenly + * changes due to a quick unexpected close(). + */ + if (fd_recv_ready(fd) && fd_recv_active(fd)) { + /* force reporting of activity by clearing the previous flags : + * we'll have at least ERROR or CONNECTED at the end of an I/O, + * both of which will be detected below. + */ + flags = 0; + if (conn->subs && conn->subs->events & SUB_RETRY_RECV) { + need_wake = 0; // wake will be called after this I/O + tasklet_wakeup(conn->subs->tasklet); + conn->subs->events &= ~SUB_RETRY_RECV; + if (!conn->subs->events) + conn->subs = NULL; + } + fd_stop_recv(fd); + } + + leave: + /* we may have to finish to install a mux or to wake it up based on + * what was just done above. It may kill the connection so we have to + * be prpared not to use it anymore. + */ + if (conn_notify_mux(conn, flags, need_wake) < 0) + return; + + /* commit polling changes in case of error. + * WT: it seems that the last case where this could still be relevant + * is if a mux wake function above report a connection error but does + * not stop polling. Shouldn't we enforce this into the mux instead of + * having to deal with this ? + */ + if (unlikely(conn->flags & CO_FL_ERROR)) { + if (conn_ctrl_ready(conn)) + fd_stop_both(fd); + } +} + /* * Local variables: * c-indent-level: 8 diff --git a/src/tools.c b/src/tools.c index 27e603a41..fdc91909a 100644 --- a/src/tools.c +++ b/src/tools.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -4598,7 +4599,7 @@ const void *resolve_sym_name(struct buffer *buf, const char *pfx, void *addr) { .func = process_stream, .name = "process_stream" }, { .func = task_run_applet, .name = "task_run_applet" }, { .func = si_cs_io_cb, .name = "si_cs_io_cb" }, - { .func = conn_fd_handler, .name = "conn_fd_handler" }, + { .func = sock_conn_iocb, .name = "sock_conn_iocb" }, { .func = dgram_fd_handler, .name = "dgram_fd_handler" }, { .func = listener_accept, .name = "listener_accept" }, { .func = poller_pipe_io_handler, .name = "poller_pipe_io_handler" },