MEDIUM: connection: enable reading only once the connection is confirmed

In order to address the absurd polling sequence described in issue #253,
let's make sure we disable receiving on a connection until it's established.
Previously with bottom-top I/Os, we were almost certain that a connection
was ready when the first I/O was confirmed. Now we can enter various
functions, including process_stream(), which will attempt to read
something, will fail, and will then subscribe. But we don't want them
to try to receive if we know the connection didn't complete. The first
prerequisite for this is to mark the connection as not ready for receiving
until it's validated. But we don't want to mark it as not ready for sending
because we know that attempting I/Os later is extremely likely to work
without polling.

Once the connection is confirmed we re-enable recv readiness. In order
for this event to be taken into account, the call to tcp_connect_probe()
was moved earlier, between the attempt to send() and the attempt to recv().
This way if tcp_connect_probe() enables reading, we have a chance to
immediately fall back to this and read the possibly pending data.

Now the trace looks like the following. It's far from being perfect
but we've already saved one recvfrom() and one epollctl():

 epoll_wait(3, [], 200, 0) = 0
 socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 7
 fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0
 setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0
 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
 epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=7, u64=7}}) = 0
 epoll_wait(3, [{EPOLLOUT, {u32=7, u64=7}}], 200, 1000) = 1
 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 sendto(7, "OPTIONS / HTTP/1.0\r\n\r\n", 22, MSG_DONTWAIT|MSG_NOSIGNAL, NULL, 0) = 22
 epoll_ctl(3, EPOLL_CTL_MOD, 7, {EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}) = 0
 epoll_wait(3, [{EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}], 200, 1000) = 1
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 recvfrom(7, "HTTP/1.0 200\r\nContent-length: 0\r\nX-req: size=22, time=0 ms\r\nX-rsp: id=dummy, code=200, cache=1, size=0, time=0 ms (0 real)\r\n\r\n", 16384, 0, NULL, NULL) = 126
 close(7)                = 0
This commit is contained in:
Willy Tarreau 2019-09-05 17:05:05 +02:00
parent 8f2825f3ab
commit ccf3f6d1d6
4 changed files with 30 additions and 11 deletions

View File

@ -86,6 +86,15 @@ void conn_fd_handler(int fd)
__conn_xprt_stop_send(conn);
}
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN)) {
/* still waiting for a connection to establish and nothing was
* attempted yet to probe the connection. Then let's retry the
* connect().
*/
if (!tcp_connect_probe(conn))
goto leave;
}
/* The data transfer starts here and stops on error and handshakes. Note
* that we must absolutely test conn->xprt at each step in case it suddenly
* changes due to a quick unexpected close().
@ -105,14 +114,6 @@ void conn_fd_handler(int fd)
__conn_xprt_stop_recv(conn);
}
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN)) {
/* still waiting for a connection to establish and nothing was
* attempted yet to probe the connection. Then let's retry the
* connect().
*/
if (!tcp_connect_probe(conn))
goto leave;
}
leave:
/* Verify if the connection just established. */
if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED))))
@ -227,8 +228,14 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
} while (ret < 0 && errno == EINTR);
if (ret > 0)
if (ret > 0) {
if (conn->flags & CO_FL_WAIT_L4_CONN) {
conn->flags &= ~CO_FL_WAIT_L4_CONN;
fd_may_send(conn->handle.fd);
fd_cond_recv(conn->handle.fd);
}
return ret;
}
if (ret == 0 || errno == EAGAIN || errno == ENOTCONN) {
wait:

View File

@ -576,6 +576,9 @@ int tcp_connect_server(struct connection *conn, int flags)
conn_ctrl_init(conn); /* registers the FD */
fdtab[fd].linger_risk = 1; /* close hard if needed */
if (conn->flags & CO_FL_WAIT_L4_CONN)
fd_cant_recv(fd); // we'll change this once the connection is validated
if (conn_xprt_init(conn) < 0) {
conn_full_close(conn);
conn->flags |= CO_FL_ERROR;
@ -711,6 +714,8 @@ int tcp_connect_probe(struct connection *conn)
* data layer.
*/
conn->flags &= ~CO_FL_WAIT_L4_CONN;
fd_may_send(fd);
fd_cond_recv(fd);
return 1;
out_error:

View File

@ -575,6 +575,9 @@ static int uxst_connect_server(struct connection *conn, int flags)
conn_ctrl_init(conn); /* registers the FD */
fdtab[fd].linger_risk = 0; /* no need to disable lingering */
if (conn->flags & CO_FL_WAIT_L4_CONN)
fd_cant_recv(fd); // we'll change this once the connection is validated
if (conn_xprt_init(conn) < 0) {
conn_full_close(conn);
conn->flags |= CO_FL_ERROR;

View File

@ -207,8 +207,10 @@ int raw_sock_from_pipe(struct connection *conn, void *xprt_ctx, struct pipe *pip
done += ret;
pipe->data -= ret;
}
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done)
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done) {
conn->flags &= ~CO_FL_WAIT_L4_CONN;
fd_cond_recv(conn->handle.fd);
}
return done;
}
@ -387,8 +389,10 @@ static size_t raw_sock_from_buf(struct connection *conn, void *xprt_ctx, const s
break;
}
}
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done)
if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done) {
conn->flags &= ~CO_FL_WAIT_L4_CONN;
fd_cond_recv(conn->handle.fd);
}
if (done > 0) {
/* we count the total bytes sent, and the send rate for 32-byte