[MEDIUM] stream_sock_process_data moved to stream_sock.c

The old temporary process_srv_data function moved to stream_sock.c.
This commit is contained in:
Willy Tarreau 2008-08-27 21:41:35 +02:00
parent 8a8188301b
commit 2d2127989c
3 changed files with 132 additions and 911 deletions

View File

@ -33,6 +33,7 @@
/* main event functions used to move data between sockets and buffers */
int stream_sock_read(int fd);
int stream_sock_write(int fd);
int stream_sock_process_data(int fd);
/* This either returns the sockname or the original destination address. Code

View File

@ -50,6 +50,7 @@
#include <proto/queue.h>
#include <proto/senddata.h>
#include <proto/session.h>
#include <proto/stream_sock.h>
#include <proto/task.h>
#ifdef CONFIG_HAP_TCPSPLICE
@ -756,7 +757,7 @@ void process_session(struct task *t, int *next)
buffer_shutw_now(s->req);
}
if (process_srv_data(s))
if (stream_sock_process_data(s->req->cons->fd))
resync |= PROCESS_SRV;
/* Count server-side errors (but not timeouts). */
@ -3863,916 +3864,6 @@ int process_srv_conn(struct session *t)
return 0;
}
/*
* Manages the server FSM and its socket during the DATA phase. It must not be
* called when a file descriptor is not attached to the buffer. It must only be
* called during SI_ST_EST. It normally returns zero, but may return 1 if it
* absolutely wants to be called again.
*/
int process_srv_data(struct session *t)
{
struct buffer *req = t->req;
struct buffer *rep = t->rep;
int fd = req->cons->fd;
DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
now_ms, __FUNCTION__,
cli_stnames[t->cli_state],
rep->rex, req->wex,
req->flags, rep->flags,
req->l, rep->l);
/* Read or write error on the file descriptor */
if (fdtab[fd].state == FD_STERROR) {
trace_term(t, TT_HTTP_SRV_6);
if (!req->cons->err_type) {
req->cons->err_loc = t->srv;
req->cons->err_type = SI_ET_DATA_ERR;
}
buffer_shutw(req);
req->flags |= BF_WRITE_ERROR;
buffer_shutr(rep);
rep->flags |= BF_READ_ERROR;
do_close_and_return:
fd_delete(fd);
req->cons->state = SI_ST_CLO;
return 0;
}
/* Check if we need to close the read side */
if (!(rep->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
if (rep->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
trace_term(t, TT_HTTP_SRV_10);
do_close_read:
buffer_shutr(rep);
if (req->flags & BF_SHUTW)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_RD);
}
/* Read timeout */
else if (unlikely(!(rep->flags & BF_READ_TIMEOUT) && tick_is_expired(rep->rex, now_ms))) {
trace_term(t, TT_HTTP_SRV_12);
rep->flags |= BF_READ_TIMEOUT;
if (!req->cons->err_type) {
req->cons->err_loc = t->srv;
req->cons->err_type = SI_ET_DATA_TO;
}
goto do_close_read;
}
/* Read not closed, update FD status and timeout for reads */
else if (rep->flags & (BF_FULL|BF_HIJACK)) {
/* stop reading */
EV_FD_COND_C(fd, DIR_RD);
rep->rex = TICK_ETERNITY;
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set, or if we already got some read status.
*/
EV_FD_COND_S(fd, DIR_RD);
if (!tick_isset(rep->rex) || rep->flags & BF_READ_STATUS)
rep->rex = tick_add_ifset(now_ms, rep->rto);
}
}
/* Check if we need to close the write side */
if (!(req->flags & BF_SHUTW)) {
/* Forced write-shutdown or other end closed with empty buffer. */
if ((req->flags & BF_SHUTW_NOW) ||
(req->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
trace_term(t, TT_HTTP_SRV_11);
do_close_write:
buffer_shutw(req);
if (rep->flags & BF_SHUTR)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_WR);
shutdown(fd, SHUT_WR);
}
/* Write timeout */
else if (unlikely(!(req->flags & BF_WRITE_TIMEOUT) && tick_is_expired(req->wex, now_ms))) {
trace_term(t, TT_HTTP_SRV_13);
req->flags |= BF_WRITE_TIMEOUT;
if (!req->cons->err_type) {
req->cons->err_loc = t->srv;
req->cons->err_type = SI_ET_DATA_TO;
}
goto do_close_write;
}
/* Write not closed, update FD status and timeout for writes */
else if ((req->flags & (BF_EMPTY|BF_MAY_FORWARD)) != BF_MAY_FORWARD) {
/* stop writing */
EV_FD_COND_C(fd, DIR_WR);
req->wex = TICK_ETERNITY;
}
else {
/* (re)start writing and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set, or if we already got some write status.
*/
EV_FD_COND_S(fd, DIR_WR);
if (!tick_isset(req->wex) || req->flags & BF_WRITE_STATUS) {
req->wex = tick_add_ifset(now_ms, req->wto);
if (tick_isset(req->wex) && !(rep->flags & BF_SHUTR) && tick_isset(rep->rex)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite.
*/
rep->rex = req->wex;
}
}
}
}
return 0; /* other cases change nothing */
}
///*
// * Manages the client FSM and its socket. It normally returns zero, but may
// * return 1 if it absolutely wants to be called again.
// *
// * Note: process_cli is the ONLY function allowed to set cli_state to anything
// * but CL_STCLOSE.
// */
//int process_cli(struct session *t)
//{
// struct buffer *req = t->req;
// struct buffer *rep = t->rep;
//
// DPRINTF(stderr,"[%u] %s: c=%s set(r,w)=%d,%d exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
// now_ms, __FUNCTION__,
// cli_stnames[t->cli_state],
// t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_RD) : 0,
// t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_WR) : 0,
// req->rex, rep->wex,
// req->flags, rep->flags,
// req->l, rep->l);
//
// update_state:
// /* FIXME: we still have to check for CL_STSHUTR because client_retnclose
// * still set this state (and will do until unix sockets are converted).
// */
// if (t->cli_state == CL_STDATA || t->cli_state == CL_STSHUTR) {
// /* we can skip most of the tests at once if some conditions are not met */
// if (!((req->flags & (BF_READ_TIMEOUT|BF_READ_ERROR)) ||
// (rep->flags & (BF_WRITE_TIMEOUT|BF_WRITE_ERROR)) ||
// (!(req->flags & BF_SHUTR) && req->flags & (BF_READ_NULL|BF_SHUTW)) ||
// (!(rep->flags & BF_SHUTW) &&
// (rep->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR))))
// goto update_timeouts;
//
// /* read or write error */
// if (rep->flags & BF_WRITE_ERROR || req->flags & BF_READ_ERROR) {
// buffer_shutr(req);
// buffer_shutw(rep);
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_1);
// if (!req->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_CLICL;
// if (!(t->flags & SN_FINST_MASK)) {
// if (t->pend_pos)
// t->flags |= SN_FINST_Q;
// else if (!(req->flags & BF_CONNECTED))
// t->flags |= SN_FINST_C;
// else
// t->flags |= SN_FINST_D;
// }
// }
// goto update_state;
// }
// /* last read, or end of server write */
// else if (!(req->flags & BF_SHUTR) && /* not already done */
// req->flags & (BF_READ_NULL | BF_SHUTW)) {
// buffer_shutr(req);
// if (!(rep->flags & BF_SHUTW)) {
// EV_FD_CLR(t->cli_fd, DIR_RD);
// trace_term(t, TT_HTTP_CLI_2);
// } else {
// /* output was already closed */
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_3);
// }
// goto update_state;
// }
// /* last server read and buffer empty : we only check them when we're
// * allowed to forward the data.
// */
// else if (!(rep->flags & BF_SHUTW) && /* not already done */
// rep->flags & BF_EMPTY && rep->flags & BF_MAY_FORWARD &&
// rep->flags & BF_SHUTR && !(t->flags & SN_SELF_GEN)) {
// buffer_shutw(rep);
// if (!(req->flags & BF_SHUTR)) {
// EV_FD_CLR(t->cli_fd, DIR_WR);
// shutdown(t->cli_fd, SHUT_WR);
// /* We must ensure that the read part is still alive when switching to shutw */
// /* FIXME: is this still true ? */
// EV_FD_SET(t->cli_fd, DIR_RD);
// req->rex = tick_add_ifset(now_ms, t->fe->timeout.client);
// trace_term(t, TT_HTTP_CLI_4);
// } else {
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_5);
// }
// goto update_state;
// }
// /* read timeout */
// else if ((req->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT) {
// buffer_shutr(req);
// if (!(rep->flags & BF_SHUTW)) {
// EV_FD_CLR(t->cli_fd, DIR_RD);
// trace_term(t, TT_HTTP_CLI_6);
// } else {
// /* output was already closed */
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_7);
// }
// if (!req->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_CLITO;
// if (!(t->flags & SN_FINST_MASK)) {
// if (t->pend_pos)
// t->flags |= SN_FINST_Q;
// else if (!(req->flags & BF_CONNECTED))
// t->flags |= SN_FINST_C;
// else
// t->flags |= SN_FINST_D;
// }
// }
// goto update_state;
// }
// /* write timeout */
// else if ((rep->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT) {
// buffer_shutw(rep);
// if (!(req->flags & BF_SHUTR)) {
// EV_FD_CLR(t->cli_fd, DIR_WR);
// shutdown(t->cli_fd, SHUT_WR);
// /* We must ensure that the read part is still alive when switching to shutw */
// /* FIXME: is this still true ? */
// EV_FD_SET(t->cli_fd, DIR_RD);
// req->rex = tick_add_ifset(now_ms, t->fe->timeout.client);
// trace_term(t, TT_HTTP_CLI_8);
// } else {
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_9);
// }
// if (!req->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_CLITO;
// if (!(t->flags & SN_FINST_MASK)) {
// if (t->pend_pos)
// t->flags |= SN_FINST_Q;
// else if (!(req->flags & BF_CONNECTED))
// t->flags |= SN_FINST_C;
// else
// t->flags |= SN_FINST_D;
// }
// }
// goto update_state;
// }
//
// update_timeouts:
// /* manage read timeout */
// if (!(req->flags & BF_SHUTR)) {
// if (req->flags & BF_FULL) {
// /* no room to read more data */
// if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
// /* stop reading until we get some space */
// req->rex = TICK_ETERNITY;
// }
// } else {
// EV_FD_COND_S(t->cli_fd, DIR_RD);
// req->rex = tick_add_ifset(now_ms, t->fe->timeout.client);
// }
// }
//
// /* manage write timeout */
// if (!(rep->flags & BF_SHUTW)) {
// /* first, we may have to produce data (eg: stats).
// * right now, this is limited to the SHUTR state.
// */
// if (req->flags & BF_SHUTR && t->flags & SN_SELF_GEN) {
// produce_content(t);
// if (rep->flags & BF_EMPTY) {
// buffer_shutw(rep);
// fd_delete(t->cli_fd);
// t->cli_state = CL_STCLOSE;
// trace_term(t, TT_HTTP_CLI_10);
// goto update_state;
// }
// }
//
// /* we don't enable client write if the buffer is empty, nor if the server has to analyze it */
// if ((rep->flags & BF_EMPTY) || !(rep->flags & BF_MAY_FORWARD)) {
// if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
// /* stop writing */
// rep->wex = TICK_ETERNITY;
// }
// } else {
// /* buffer not empty */
// EV_FD_COND_S(t->cli_fd, DIR_WR);
// if (!tick_isset(rep->wex)) {
// /* restart writing */
// rep->wex = tick_add_ifset(now_ms, t->fe->timeout.client);
// if (!(req->flags & BF_SHUTR) && tick_isset(rep->wex) && tick_isset(req->rex)) {
// /* FIXME: to prevent the client from expiring read timeouts during writes,
// * we refresh it, except if it was already infinite. */
// req->rex = rep->wex;
// }
// }
// }
// }
// return 0; /* other cases change nothing */
// }
// else if (t->cli_state == CL_STCLOSE) { /* CL_STCLOSE: nothing to do */
// if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
// int len;
// len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be->id, (unsigned short)t->cli_fd, (unsigned short)req->cons->fd);
// write(1, trash, len);
// }
// return 0;
// }
//#ifdef DEBUG_DEV
// fprintf(stderr, "FIXME !!!! impossible state at %s:%d = %d\n", __FILE__, __LINE__, t->cli_state);
// ABORT_NOW();
//#endif
// return 0;
//}
//
//
///* Return 1 if we could get a new connection for session t, otherwise zero */
//int tcp_get_connection(struct session *t)
//{
// struct http_txn *txn = &t->txn;
// struct buffer *req = t->req;
// struct buffer *rep = t->rep;
//
// DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
// now_ms, __FUNCTION__,
// cli_stnames[t->cli_state],
// rep->rex, req->wex,
// req->flags, rep->flags,
// req->l, rep->l);
//
//
// if ((rep->flags & BF_SHUTW) ||
// ((req->flags & BF_SHUTR) &&
// (req->flags & BF_EMPTY || t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
// req->wex = TICK_ETERNITY;
// if (t->pend_pos)
// t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
// /* note that this must not return any error because it would be able to
// * overwrite the client_retnclose() output.
// */
// if (txn->flags & TX_CLTARPIT)
// srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_T, 0, NULL);
// else
// srv_close_with_err(t, SN_ERR_CLICL, t->pend_pos ? SN_FINST_Q : SN_FINST_C, 0, NULL);
//
// trace_term(t, TT_HTTP_SRV_1);
// return 0;
// }
//
// /* stop here if we're not allowed to connect */
// if (!(req->flags & BF_MAY_FORWARD))
// return 0;
//
// /* the client allows the server to connect */
// if (txn->flags & TX_CLTARPIT) {
// /* This connection is being tarpitted. The CLIENT side has
// * already set the connect expiration date to the right
// * timeout. We just have to check that it has not expired.
// */
// if (!(req->flags & BF_WRITE_TIMEOUT))
// return 0;
//
// /* We will set the queue timer to the time spent, just for
// * logging purposes. We fake a 500 server error, so that the
// * attacker will not suspect his connection has been tarpitted.
// * It will not cause trouble to the logs because we can exclude
// * the tarpitted connections by filtering on the 'PT' status flags.
// */
// req->wex = TICK_ETERNITY;
// t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
// srv_close_with_err(t, SN_ERR_PRXCOND, SN_FINST_T,
// 500, error_message(t, HTTP_ERR_500));
// trace_term(t, TT_HTTP_SRV_2);
// return 0;
// }
//
// /* Right now, we will need to create a connection to the server.
// * We might already have tried, and got a connection pending, in
// * which case we will not do anything till it's pending. It's up
// * to any other session to release it and wake us up again.
// */
// if (t->pend_pos) {
// if (!(req->flags & BF_WRITE_TIMEOUT)) {
// return 0;
// } else {
// /* we've been waiting too long here */
// req->wex = TICK_ETERNITY;
// t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
// srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q,
// 503, error_message(t, HTTP_ERR_503));
// trace_term(t, TT_HTTP_SRV_3);
// if (t->srv)
// t->srv->failed_conns++;
// t->be->failed_conns++;
// return 0;
// }
// }
//
// do {
// if (srv_redispatch_connect(t) != 0)
// return 0;
//
// if (t->srv && t->srv->rdr_len && t->flags & SN_REDIRECTABLE) {
// /* Server supporting redirection and it is possible.
// * Invalid requests are reported as such. It concerns all
// * the largest ones.
// */
// struct chunk rdr;
// char *path;
// int len;
//
// /* 1: create the response header */
// rdr.len = strlen(HTTP_302);
// rdr.str = trash;
// memcpy(rdr.str, HTTP_302, rdr.len);
//
// /* 2: add the server's prefix */
// if (rdr.len + t->srv->rdr_len > sizeof(trash))
// goto cancel_redir;
//
// memcpy(rdr.str + rdr.len, t->srv->rdr_pfx, t->srv->rdr_len);
// rdr.len += t->srv->rdr_len;
//
// /* 3: add the request URI */
// path = http_get_path(txn);
// if (!path)
// goto cancel_redir;
// len = txn->req.sl.rq.u_l + (txn->req.sol+txn->req.sl.rq.u) - path;
// if (rdr.len + len > sizeof(trash) - 4) /* 4 for CRLF-CRLF */
// goto cancel_redir;
//
// memcpy(rdr.str + rdr.len, path, len);
// rdr.len += len;
// memcpy(rdr.str + rdr.len, "\r\n\r\n", 4);
// rdr.len += 4;
//
// srv_close_with_err(t, SN_ERR_PRXCOND, SN_FINST_C, 302, &rdr);
// trace_term(t, TT_HTTP_SRV_3);
//
// /* FIXME: we should increase a counter of redirects per server and per backend. */
// if (t->srv)
// t->srv->cum_sess++;
// return 0;
// cancel_redir:
// txn->status = 400;
// t->fe->failed_req++;
// srv_close_with_err(t, SN_ERR_PRXCOND, SN_FINST_C,
// 400, error_message(t, HTTP_ERR_400));
// trace_term(t, TT_HTTP_SRV_4);
// return 0;
// }
//
// /* try to (re-)connect to the server, and fail if we expire the
// * number of retries.
// */
// if (srv_retryable_connect(t)) {
// t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
// if (!(req->cons.flags & BC_KNOWN))
// return 0;
// /* We got an FD */
// return 1;
// }
// } while (1);
//}
//
//
///* Return 1 if the pending connection has failed and should be retried,
// * otherwise zero.
// */
//int tcp_connection_failed(struct session *t)
//{
// struct buffer *req = t->req;
// struct buffer *rep = t->rep;
// int conn_err;
//
// DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
// now_ms, __FUNCTION__,
// cli_stnames[t->cli_state],
// rep->rex, req->wex,
// req->flags, rep->flags,
// req->l, rep->l);
//
// if ((rep->flags & BF_SHUTW) ||
// ((req->flags & BF_SHUTR) &&
// ((req->flags & BF_EMPTY && !(req->flags & BF_WRITE_STATUS)) ||
// t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
// req->wex = TICK_ETERNITY;
// if (!(t->flags & SN_CONN_TAR)) {
// /* if we are in turn-around, we have already closed the FD */
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
// }
//
// /* note that this must not return any error because it would be able to
// * overwrite the client_retnclose() output.
// */
// srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C, 0, NULL);
// trace_term(t, TT_HTTP_SRV_5);
// return 0;
// }
//
// if (!(req->flags & (BF_WRITE_STATUS | BF_WRITE_TIMEOUT)))
// return 0; /* nothing changed */
//
// if (!(req->flags & BF_WRITE_STATUS) || (req->flags & BF_WRITE_ERROR)) {
// /* timeout, asynchronous connect error or first write error */
// if (t->flags & SN_CONN_TAR) {
// /* We are doing a turn-around waiting for a new connection attempt. */
// if (!(req->flags & BF_WRITE_TIMEOUT))
// return 0;
// t->flags &= ~SN_CONN_TAR;
// }
// else {
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
//
// if (!(req->flags & BF_WRITE_STATUS))
// conn_err = SN_ERR_SRVTO; // it was a connect timeout.
// else
// conn_err = SN_ERR_SRVCL; // it was an asynchronous connect error.
//
// /* ensure that we have enough retries left */
// if (srv_count_retry_down(t, conn_err))
// return 0;
//
// if (req->flags & BF_WRITE_ERROR) {
// /* we encountered an immediate connection error, and we
// * will have to retry connecting to the same server, most
// * likely leading to the same result. To avoid this, we
// * fake a connection timeout to retry after a turn-around
// * time of 1 second. We will wait in the previous if block.
// */
// t->flags |= SN_CONN_TAR;
// req->wex = tick_add(now_ms, MS_TO_TICKS(1000));
// return 0;
// }
// }
//
// if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) {
// /* We're on our last chance, and the REDISP option was specified.
// * We will ignore cookie and force to balance or use the dispatcher.
// */
// /* let's try to offer this slot to anybody */
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
//
// /* it's left to the dispatcher to choose a server */
// t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
// t->prev_srv = t->srv;
//
// /* first, get a connection */
// if (srv_redispatch_connect(t)) {
// if (req->cons.flags & BC_KNOWN)
// return 0;
// /* we need to get a connection */
// return 1;
// }
// } else {
// if (t->srv)
// t->srv->retries++;
// t->be->retries++;
// }
//
// do {
// /* Now we will try to either reconnect to the same server or
// * connect to another server. If the connection gets queued
// * because all servers are saturated, then we will go back to
// * the idle state where the buffer's consumer is marked as
// * unknown.
// */
// if (srv_retryable_connect(t)) {
// t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
// if (req->cons.flags & BC_KNOWN)
// return 0;
// /* we did not get a connection */
// return 1;
// }
//
// /* we need to redispatch the connection to another server */
// if (srv_redispatch_connect(t)) {
// if (req->cons.flags & BC_KNOWN)
// return 0;
// /* we need to get a connection */
// return 1;
// }
// } while (1);
// }
// else { /* no error and write OK */
// t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now);
//
// if (req->flags & BF_EMPTY) {
// EV_FD_CLR(req->cons->fd, DIR_WR);
// req->wex = TICK_ETERNITY;
// } else {
// EV_FD_SET(req->cons->fd, DIR_WR);
// req->wex = tick_add_ifset(now_ms, t->be->timeout.server);
// if (tick_isset(req->wex)) {
// /* FIXME: to prevent the server from expiring read timeouts during writes,
// * we refresh it. */
// rep->rex = req->wex;
// }
// }
//
// if (t->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */
// EV_FD_SET(req->cons->fd, DIR_RD);
// rep->rex = tick_add_ifset(now_ms, t->be->timeout.server);
// buffer_set_rlim(rep, BUFSIZE); /* no rewrite needed */
//
// /* if the user wants to log as soon as possible, without counting
// bytes from the server, then this is the right moment. */
// if (t->fe->to_log && !(t->logs.logwait & LW_BYTES)) {
// t->logs.t_close = t->logs.t_connect; /* to get a valid end date */
// tcp_sess_log(t);
// }
//#ifdef CONFIG_HAP_TCPSPLICE
// if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
// /* TCP splicing supported by both FE and BE */
// tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0);
// }
//#endif
// }
// else {
// rep->analysers |= AN_RTR_HTTP_HDR;
// buffer_set_rlim(rep, BUFSIZE - MAXREWRITE); /* rewrite needed */
// t->txn.rsp.msg_state = HTTP_MSG_RPBEFORE;
// /* reset hdr_idx which was already initialized by the request.
// * right now, the http parser does it.
// * hdr_idx_init(&t->txn.hdr_idx);
// */
// }
//
// req->flags |= BF_CONNECTED;
// if (!rep->analysers)
// t->rep->flags |= BF_MAY_FORWARD;
// req->wex = TICK_ETERNITY;
// return 0;
// }
//}
//
//
///*
// * Tries to establish a connection to the server and associate it to the
// * request buffer's consumer side. It normally returns zero, but may return 1
// * if it absolutely wants to be called again.
// */
//int process_srv_conn(struct session *t)
//{
// DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
// now_ms, __FUNCTION__,
// cli_stnames[t->cli_state],
// t->rep->rex, t->req->wex,
// t->req->flags, t->rep->flags,
// t->req->l, t->rep->l);
//
// while (!(t->req->flags & BF_CONNECTED)) {
// if (!(t->req->cons.flags & BC_KNOWN)) {
// /* no connection in progress, get a new one */
// if (!tcp_get_connection(t))
// break;
// } else {
// /* connection in progress or just completed */
// if (!tcp_connection_failed(t))
// break;
// }
// }
// return 0;
//}
//
//
///*
// * Manages the server FSM and its socket during the DATA phase. It must not
// * be called when a file descriptor is not attached to the buffer. It normally
// * returns zero, but may return 1 if it absolutely wants to be called again.
// */
//int process_srv_data(struct session *t)
//{
// struct buffer *req = t->req;
// struct buffer *rep = t->rep;
//
// DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d\n",
// now_ms, __FUNCTION__,
// cli_stnames[t->cli_state],
// rep->rex, req->wex,
// req->flags, rep->flags,
// req->l, rep->l);
//
// /* we can skip most of the tests at once if some conditions are not met */
// if (!((req->flags & (BF_WRITE_TIMEOUT|BF_WRITE_ERROR)) ||
// (!(req->flags & BF_SHUTW) &&
// (req->flags & (BF_EMPTY|BF_MAY_FORWARD)) == (BF_EMPTY|BF_MAY_FORWARD)) ||
// (rep->flags & (BF_READ_TIMEOUT|BF_READ_ERROR)) ||
// (!(rep->flags & BF_SHUTR) && rep->flags & (BF_READ_NULL|BF_SHUTW))))
// goto update_timeouts;
//
// /* read or write error */
// /* FIXME: what happens when we have to deal with HTTP ??? */
// if (req->flags & BF_WRITE_ERROR || rep->flags & BF_READ_ERROR) {
// buffer_shutr(rep);
// buffer_shutw(req);
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// t->srv->failed_resp++;
// sess_change_server(t, NULL);
// }
// t->be->failed_resp++;
// trace_term(t, TT_HTTP_SRV_6);
// if (!rep->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_SRVCL;
// if (!(t->flags & SN_FINST_MASK))
// t->flags |= SN_FINST_D;
// }
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
//
// return 0;
// }
//
// /* last read, or end of client write */
// if (!(rep->flags & BF_SHUTR) && /* not already done */
// rep->flags & (BF_READ_NULL | BF_SHUTW)) {
// buffer_shutr(rep);
// if (!(req->flags & BF_SHUTW)) {
// EV_FD_CLR(req->cons->fd, DIR_RD);
// trace_term(t, TT_HTTP_SRV_7);
// } else {
// /* output was already closed */
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
// trace_term(t, TT_HTTP_SRV_8);
//
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
// return 0;
// }
// }
// /* end of client read and no more data to send. We can forward
// * the close when we're allowed to forward data (anytime right
// * now). If we're using option forceclose, then we may also
// * shutdown the outgoing write channel once the response starts
// * coming from the server.
// */
// if (!(req->flags & BF_SHUTW) && /* not already done */
// req->flags & BF_EMPTY && req->flags & BF_MAY_FORWARD &&
// (req->flags & BF_SHUTR ||
// (t->be->options & PR_O_FORCE_CLO && rep->flags & BF_READ_STATUS))) {
// buffer_shutw(req);
// if (!(rep->flags & BF_SHUTR)) {
// EV_FD_CLR(req->cons->fd, DIR_WR);
// shutdown(req->cons->fd, SHUT_WR);
// trace_term(t, TT_HTTP_SRV_9);
// /* We must ensure that the read part is still alive when switching to shutw */
// /* FIXME: is this still true ? */
// EV_FD_SET(req->cons->fd, DIR_RD);
// rep->rex = tick_add_ifset(now_ms, t->be->timeout.server);
// } else {
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
// trace_term(t, TT_HTTP_SRV_10);
//
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
// return 0;
// }
// }
//
// /* read timeout */
// if ((rep->flags & (BF_SHUTR|BF_READ_TIMEOUT)) == BF_READ_TIMEOUT) {
// if (!rep->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_SRVTO;
// if (!(t->flags & SN_FINST_MASK))
// t->flags |= SN_FINST_D;
// }
// buffer_shutr(rep);
// if (!(req->flags & BF_SHUTW)) {
// EV_FD_CLR(req->cons->fd, DIR_RD);
// trace_term(t, TT_HTTP_SRV_11);
// } else {
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
// trace_term(t, TT_HTTP_SRV_12);
//
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
// return 0;
// }
// }
//
// /* write timeout */
// if ((req->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) == BF_WRITE_TIMEOUT) {
// if (!rep->analysers) {
// if (!(t->flags & SN_ERR_MASK))
// t->flags |= SN_ERR_SRVTO;
// if (!(t->flags & SN_FINST_MASK))
// t->flags |= SN_FINST_D;
// }
// buffer_shutw(req);
// if (!(rep->flags & BF_SHUTR)) {
// EV_FD_CLR(req->cons->fd, DIR_WR);
// shutdown(req->cons->fd, SHUT_WR);
// /* We must ensure that the read part is still alive when switching to shutw */
// /* FIXME: is this still needed ? */
// EV_FD_SET(req->cons->fd, DIR_RD);
// rep->rex = tick_add_ifset(now_ms, t->be->timeout.server);
// trace_term(t, TT_HTTP_SRV_13);
// } else {
// fd_delete(req->cons->fd);
// req->cons->state = SI_ST_CLO;
// if (t->srv) {
// t->srv->cur_sess--;
// sess_change_server(t, NULL);
// }
// trace_term(t, TT_HTTP_SRV_14);
//
// if (may_dequeue_tasks(t->srv, t->be))
// process_srv_queue(t->srv);
// return 0;
// }
// }
//
// update_timeouts:
// /* manage read timeout */
// if (!(rep->flags & BF_SHUTR)) {
// if (rep->flags & BF_FULL) {
// if (EV_FD_COND_C(req->cons->fd, DIR_RD))
// rep->rex = TICK_ETERNITY;
// } else {
// EV_FD_COND_S(req->cons->fd, DIR_RD);
// rep->rex = tick_add_ifset(now_ms, t->be->timeout.server);
// }
// }
//
// /* manage write timeout */
// if (!(req->flags & BF_SHUTW)) {
// if (req->flags & BF_EMPTY || !(req->flags & BF_MAY_FORWARD)) {
// /* stop writing */
// if (EV_FD_COND_C(req->cons->fd, DIR_WR))
// req->wex = TICK_ETERNITY;
// } else {
// /* buffer not empty, there are still data to be transferred */
// EV_FD_COND_S(req->cons->fd, DIR_WR);
// if (!tick_isset(req->wex)) {
// /* restart writing */
// req->wex = tick_add_ifset(now_ms, t->be->timeout.server);
// if (!(rep->flags & BF_SHUTR) && tick_isset(req->wex) && tick_isset(rep->rex)) {
// /* FIXME: to prevent the server from expiring read timeouts during writes,
// * we refresh it, except if it was already infinite.
// */
// rep->rex = req->wex;
// }
// }
// }
// }
// return 0; /* other cases change nothing */
//}
//
/*
* Produces data for the session <s> depending on its source. Expects to be

View File

@ -26,6 +26,7 @@
#include <common/ticks.h>
#include <common/time.h>
#include <proto/buffers.h>
#include <proto/client.h>
#include <proto/fd.h>
#include <proto/stream_sock.h>
@ -415,6 +416,134 @@ int stream_sock_write(int fd) {
}
/*
* Manages a stream_sock connection during its data phase. The file descriptor
* status is checked, and the read and write timeouts are controlled. The
* buffers are examined for special shutdown cases and finally the timeouts,
* file descriptor and buffers' flags are updated accordingly.
*/
int stream_sock_process_data(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
ib->l, ob->l);
/* Read or write error on the file descriptor */
if (fdtab[fd].state == FD_STERROR) {
//trace_term(t, TT_HTTP_SRV_6);
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_ERR;
}
buffer_shutw(ob);
ob->flags |= BF_WRITE_ERROR;
buffer_shutr(ib);
ib->flags |= BF_READ_ERROR;
do_close_and_return:
fd_delete(fd);
ob->cons->state = SI_ST_CLO;
return 0;
}
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
//trace_term(t, TT_HTTP_SRV_10);
do_close_read:
buffer_shutr(ib);
if (ob->flags & BF_SHUTW)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_RD);
}
/* Read timeout */
else if (unlikely(!(ib->flags & BF_READ_TIMEOUT) && tick_is_expired(ib->rex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_12);
ib->flags |= BF_READ_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
goto do_close_read;
}
/* Read not closed, update FD status and timeout for reads */
else if (ib->flags & (BF_FULL|BF_HIJACK)) {
/* stop reading */
EV_FD_COND_C(fd, DIR_RD);
ib->rex = TICK_ETERNITY;
}
else {
/* (re)start reading and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set, or if we already got some read status.
*/
EV_FD_COND_S(fd, DIR_RD);
if (!tick_isset(ib->rex) || ib->flags & BF_READ_STATUS)
ib->rex = tick_add_ifset(now_ms, ib->rto);
}
}
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
/* Forced write-shutdown or other end closed with empty buffer. */
if ((ob->flags & BF_SHUTW_NOW) ||
(ob->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
//trace_term(t, TT_HTTP_SRV_11);
do_close_write:
buffer_shutw(ob);
if (ib->flags & BF_SHUTR)
goto do_close_and_return;
EV_FD_CLR(fd, DIR_WR);
shutdown(fd, SHUT_WR);
}
/* Write timeout */
else if (unlikely(!(ob->flags & BF_WRITE_TIMEOUT) && tick_is_expired(ob->wex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_13);
ob->flags |= BF_WRITE_TIMEOUT;
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
ob->cons->err_type = SI_ET_DATA_TO;
}
goto do_close_write;
}
/* Write not closed, update FD status and timeout for writes */
else if ((ob->flags & (BF_EMPTY|BF_MAY_FORWARD)) != BF_MAY_FORWARD) {
/* stop writing */
EV_FD_COND_C(fd, DIR_WR);
ob->wex = TICK_ETERNITY;
}
else {
/* (re)start writing and update timeout. Note: we don't recompute the timeout
* everytime we get here, otherwise it would risk never to expire. We only
* update it if is was not yet set, or if we already got some write status.
*/
EV_FD_COND_S(fd, DIR_WR);
if (!tick_isset(ob->wex) || ob->flags & BF_WRITE_STATUS) {
ob->wex = tick_add_ifset(now_ms, ob->wto);
if (tick_isset(ob->wex) && !(ib->flags & BF_SHUTR) && tick_isset(ib->rex)) {
/* Note: depending on the protocol, we don't know if we're waiting
* for incoming data or not. So in order to prevent the socket from
* expiring read timeouts during writes, we refresh the read timeout,
* except if it was already infinite.
*/
ib->rex = ob->wex;
}
}
}
}
return 0; /* other cases change nothing */
}
/*
* Local variables: