[MAJOR] add a connection error state to the stream_interface

Tracking connection status changes was hard, and some code was
redundant. A new SI_ST_CER state was added to the stream interface
to indicate a past connection error, and an SI_FL_ERR flag was
added to report past I/O error. The stream_sock code does not set
the connection to SI_ST_CLO anymore in case of I/O error, it's
the upper layer which does it. This makes it possible to know
exactly when the file descriptors are allocated.

The new SI_ST_CER state permitted to split tcp_connection_status()
in two parts, one processing SI_ST_CON and the other one SI_ST_CER.
Synchronous connection errors now make use of this last state, hence
eliminating duplicate code.

Some ib<->ob copy paste errors were found and fixed, and all entities
setting SI_ST_CLO also shut the buffers down.

Some of these stream_interface specific functions and structures
have migrated to a new stream_interface.c file.

Some types of errors are still not detected by the buffers. For
instance, let's assume the following scenario in one single pass
of process_session: a connection sits in SI_ST_TAR state during
a retry. At TAR expiration, a new connection attempt is made, the
connection is obtained and srv->cur_sess is increased. Then the
buffer timeout is fires and everything is cleared, the new state
becomes SI_ST_CLO. The cleaning code checks that previous state
was either SI_ST_CON or SI_ST_EST to release the connection. But
that's wrong because last state is still SI_ST_TAR. So the
server's connection count does not get decreased.

This means that prev_state must not be used, and must be replaced
by some transition detection instead of level detection.

The following debugging line was useful to track state changes :

  fprintf(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
          s->si[0].state, s->si[1].state, s->si[1].err_type, s->req->flags, s-> rep->flags);
This commit is contained in:
Willy Tarreau 2008-11-03 06:26:53 +01:00
parent efb453c259
commit cff6411f9a
7 changed files with 420 additions and 326 deletions

View File

@ -452,7 +452,7 @@ OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \
src/time.o src/fd.o src/regex.o src/cfgparse.o src/server.o \
src/checks.o src/queue.o src/client.o src/proxy.o src/proto_uxst.o \
src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \
src/senddata.o src/dumpstats.o src/proto_tcp.o \
src/stream_interface.o src/senddata.o src/dumpstats.o src/proto_tcp.o \
src/session.o src/hdr_idx.o src/ev_select.o \
src/acl.o src/memory.o \
src/ebtree.o src/eb32tree.o

View File

@ -0,0 +1,42 @@
/*
include/proto/stream_interface.h
This file contains stream_interface function prototypes
Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1
exclusively.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _PROTO_STREAM_INTERFACE_H
#define _PROTO_STREAM_INTERFACE_H
#include <stdlib.h>
#include <common/config.h>
#include <types/stream_interface.h>
/* main event functions used to move data between sockets and buffers */
void stream_int_check_timeouts(struct stream_interface *si);
void stream_int_report_error(struct stream_interface *si);
#endif /* _PROTO_STREAM_INTERFACE_H */
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -37,7 +37,6 @@ int stream_sock_data_update(int fd);
int stream_sock_data_finish(int fd);
int stream_sock_shutr(struct stream_interface *si);
int stream_sock_shutw(struct stream_interface *si);
int stream_sock_check_timeouts(struct stream_interface *si);
/* This either returns the sockname or the original destination address. Code

View File

@ -38,8 +38,9 @@ enum {
SI_ST_TAR, /* interface in turn-around state after failed connect attempt */
SI_ST_ASS, /* server just assigned to this interface */
SI_ST_CON, /* initiated connection request (resource exists) */
SI_ST_CER, /* previous connection attempt failed (resource released) */
SI_ST_EST, /* connection established (resource exists) */
SI_ST_CLO, /* stream interface closed, might not existing anymore */
SI_ST_CLO, /* stream intf closed, might not existing anymore. Buffers shut. */
};
/* error types reported on the streams interface for more accurate reporting */
@ -61,6 +62,7 @@ enum {
enum {
SI_FL_NONE = 0x0000, /* nothing */
SI_FL_EXP = 0x0001, /* timeout has expired */
SI_FL_ERR = 0x0002, /* a non-recoverable error has occurred */
};
struct stream_interface {

View File

@ -50,6 +50,7 @@
#include <proto/queue.h>
#include <proto/senddata.h>
#include <proto/session.h>
#include <proto/stream_interface.h>
#include <proto/stream_sock.h>
#include <proto/task.h>
@ -205,7 +206,8 @@ fd_set url_encode_map[(sizeof(fd_set) > (256/8)) ? 1 : ((256/8) / sizeof(fd_set)
#error "Check if your OS uses bitfields for fd_sets"
#endif
int tcp_connection_status(struct session *t);
int sess_update_st_con_tcp(struct session *s, struct stream_interface *si);
int sess_update_st_cer(struct session *s, struct stream_interface *si);
void init_proto_http()
{
@ -649,6 +651,7 @@ http_get_path(struct http_txn *txn)
/* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, SI_ST_TAR.
* Other input states are simply ignored.
* Possible output states are SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ, SI_ST_CON.
* Flags must have previously been updated for timeouts and other conditions.
*/
void sess_update_stream_int(struct session *s, struct stream_interface *si)
@ -691,62 +694,23 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si)
process_srv_queue(s->srv);
/* Failed and not retryable. */
buffer_shutr(s->rep);
buffer_shutw(s->req);
s->req->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
si->state = SI_ST_CLO;
return;
}
/* We are facing a retryable error */
s->conn_retries--;
if (s->conn_retries < 0) {
/* No retries left, abort */
if (!si->err_type) {
si->err_type = SI_ET_CONN_ERR;
si->err_loc = s->srv;
}
if (s->srv)
s->srv->failed_conns++;
s->be->failed_conns++;
/* We used to have a free connection slot. Since we'll never use it,
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
buffer_shutr(s->rep);
buffer_shutw(s->req);
s->req->flags |= BF_WRITE_ERROR;
si->state = SI_ST_CLO;
return;
}
/* If the "redispatch" option is set on the backend, we are allowed to
* retry on another server for the last retry. In order to achieve this,
* we must mark the session unassigned, and eventually clear the DIRECT
* bit to ignore any persistence cookie. We won't count a retry nor a
* redispatch yet, because this will depend on what server is selected.
/* We are facing a retryable error, but we don't want to run a
* turn-around now, as the problem is likely a source port
* allocation problem, so we want to retry now.
*/
if (s->srv && s->conn_retries == 0 && s->be->options & PR_O_REDISP) {
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
s->prev_srv = s->srv;
si->state = SI_ST_REQ;
} else {
if (s->srv)
s->srv->retries++;
s->be->retries++;
si->state = SI_ST_ASS;
}
si->state = SI_ST_CER;
si->flags &= ~SI_FL_ERR;
sess_update_st_cer(s, si);
/* now si->state is one of SI_ST_CLO, SI_ST_TAR, SI_ST_ASS, SI_ST_REQ */
return;
}
else if (si->state == SI_ST_QUE) {
@ -775,7 +739,9 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si)
if (s->srv)
s->srv->failed_conns++;
s->be->failed_conns++;
s->req->flags |= BF_WRITE_TIMEOUT;
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_TIMEOUT;
if (!si->err_type)
si->err_type = SI_ET_QUEUE_TO;
si->state = SI_ST_CLO;
@ -783,14 +749,14 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si)
}
/* Connection remains in queue, check if we have to abort it */
if ((s->req->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) | /* abort requested */
((s->req->flags & BF_SHUTR) && /* empty and client stopped */
(s->req->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
if ((si->ob->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) || /* abort requested */
((si->ob->flags & BF_SHUTR) && /* empty and client stopped */
(si->ob->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
/* give up */
si->exp = TICK_ETERNITY;
s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
buffer_shutr(s->rep);
buffer_shutw(s->req);
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->err_type |= SI_ET_QUEUE_ABRT;
si->state = SI_ST_CLO;
return;
@ -801,13 +767,13 @@ void sess_update_stream_int(struct session *s, struct stream_interface *si)
}
else if (si->state == SI_ST_TAR) {
/* Connection request might be aborted */
if ((s->req->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) | /* abort requested */
((s->req->flags & BF_SHUTR) && /* empty and client stopped */
(s->req->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
if ((si->ob->flags & (BF_READ_ERROR|BF_SHUTW_NOW)) || /* abort requested */
((si->ob->flags & BF_SHUTR) && /* empty and client stopped */
(si->ob->flags & BF_EMPTY || s->be->options & PR_O_ABRT_CLOSE))) {
/* give up */
si->exp = TICK_ETERNITY;
buffer_shutr(s->rep);
buffer_shutw(s->req);
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->err_type |= SI_ET_CONN_ABRT;
si->state = SI_ST_CLO;
return;
@ -870,6 +836,8 @@ static void perform_http_redirect(struct session *s, struct stream_interface *si
rdr.len += 4;
/* prepare to return without error. */
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->err_type = SI_ET_NONE;
si->err_loc = NULL;
si->state = SI_ST_CLO;
@ -909,9 +877,9 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si
return;
/* we did not get any server, let's check the cause */
buffer_shutr(s->rep);
buffer_shutw(s->req);
s->req->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
if (!si->err_type)
si->err_type = SI_ET_CONN_OTHER;
si->state = SI_ST_CLO;
@ -975,77 +943,94 @@ void process_session(struct task *t, int *next)
unsigned int rqf_cli, rpf_cli;
unsigned int rqf_srv, rpf_srv;
/* 1a: Check for lower layer timeouts if needed */
if (unlikely(t->state & TASK_WOKEN_TIMER)) {
stream_sock_check_timeouts(&s->si[0]);
stream_sock_check_timeouts(&s->si[1]);
}
//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
// s->si[0].state, s->si[1].state, s->si[1].err_type, s->req->flags, s->rep->flags);
/* 1b: Check for upper layer timeouts if needed */
/* 1a: Check for low level timeouts if needed. We just set a flag on
* buffers and/or stream interfaces when their timeouts have expired.
*/
if (unlikely(t->state & TASK_WOKEN_TIMER)) {
stream_int_check_timeouts(&s->si[0]);
stream_int_check_timeouts(&s->si[1]);
buffer_check_timeouts(s->req);
buffer_check_timeouts(s->rep);
if (unlikely(s->req->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) {
if (s->req->flags & BF_READ_TIMEOUT) {
buffer_shutw(s->req);
s->req->cons->shutr(s->req->prod);
}
if (s->req->flags & BF_WRITE_TIMEOUT) {
buffer_shutw(s->req);
s->req->cons->shutw(s->req->cons);
}
}
if (unlikely(s->rep->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) {
if (s->rep->flags & BF_READ_TIMEOUT) {
buffer_shutw(s->rep);
s->rep->cons->shutr(s->rep->prod);
}
if (s->rep->flags & BF_WRITE_TIMEOUT) {
buffer_shutw(s->rep);
s->rep->cons->shutw(s->rep->cons);
}
}
/* Note that we don't check nor indicate if we wake up because
* of a timeout on a stream interface.
*/
}
/* Maybe we were trying to establish a connection on the server side ? */
if (s->si[1].state == SI_ST_CON)
tcp_connection_status(s);
/* 1b: check for low-level errors reported at the stream interface.
* First we check if it's a retryable error (in which case we don't
* want to tell the buffer). Otherwise we report the error one level
* upper by setting flags into the buffers. Note that the side towards
* the client cannot have connect (hence retryable) errors.
*/
if (unlikely(s->si[0].state == SI_ST_EST)) {
if (s->si[0].flags & SI_FL_ERR) {
s->si[0].state = SI_ST_CLO;
fd_delete(s->si[0].fd);
stream_int_report_error(&s->si[0]);
}
}
/* now try to complete any initiated connection setup */
if (s->si[1].state >= SI_ST_REQ && s->si[1].state < SI_ST_CON) {
do {
/* nb: step 1 might switch from QUE to ASS, but we first want
* to give a chance to step 2 to perform a redirect if needed.
*/
sess_update_stream_int(s, &s->si[1]);
if (s->si[1].state == SI_ST_REQ)
sess_prepare_conn_req(s, &s->si[1]);
if (s->si[1].state == SI_ST_EST) {
if (s->si[1].flags & SI_FL_ERR) {
s->si[1].state = SI_ST_CLO;
fd_delete(s->si[1].fd);
stream_int_report_error(&s->si[1]);
}
}
else if (s->si[1].state != SI_ST_INI && s->si[1].state != SI_ST_CLO) {
/* Maybe we were trying to establish a connection on the server side ? */
if (s->si[1].state == SI_ST_CON)
sess_update_st_con_tcp(s, &s->si[1]);
if (s->si[1].state == SI_ST_ASS && s->srv &&
s->srv->rdr_len && (s->flags & SN_REDIRECTABLE))
perform_http_redirect(s, &s->si[1]);
if (s->si[1].state == SI_ST_CER)
sess_update_st_cer(s, &s->si[1]);
} while (s->si[1].state == SI_ST_ASS);
/* now try to complete any initiated connection setup */
if (s->si[1].state >= SI_ST_REQ && s->si[1].state < SI_ST_CON) {
do {
/* nb: step 1 might switch from QUE to ASS, but we first want
* to give a chance to step 2 to perform a redirect if needed.
*/
sess_update_stream_int(s, &s->si[1]);
if (s->si[1].state == SI_ST_REQ)
sess_prepare_conn_req(s, &s->si[1]);
if (s->si[1].state == SI_ST_ASS && s->srv &&
s->srv->rdr_len && (s->flags & SN_REDIRECTABLE))
perform_http_redirect(s, &s->si[1]);
} while (s->si[1].state == SI_ST_ASS);
}
}
/* FIXME: we might have got an error above, and we should process them below */
if (s->si[1].state == SI_ST_CLO && s->si[1].err_type != SI_ET_NONE)
if (s->si[1].state == SI_ST_CLO && s->si[1].prev_state != SI_ST_CLO &&
s->si[1].err_type != SI_ET_NONE)
return_srv_error(s, s->si[1].err_type);
/* Forward errors from stream interface to buffers */
if (s->si[0].state == SI_ST_CLO && s->si[0].err_type != SI_ET_NONE) {
s->req->flags |= BF_READ_ERROR;
s->rep->flags |= BF_WRITE_ERROR;
/* 1c: Manage buffer timeouts. */
if (unlikely(s->req->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) {
if (s->req->flags & BF_READ_TIMEOUT) {
buffer_shutr(s->req);
s->req->cons->shutr(s->req->prod);
}
if (s->req->flags & BF_WRITE_TIMEOUT) {
buffer_shutw(s->req);
s->req->cons->shutw(s->req->cons);
}
}
if (s->si[1].state == SI_ST_CLO && s->si[1].err_type != SI_ET_NONE) {
s->req->flags |= BF_WRITE_ERROR;
s->rep->flags |= BF_READ_ERROR;
if (unlikely(s->rep->flags & (BF_READ_TIMEOUT|BF_WRITE_TIMEOUT))) {
if (s->rep->flags & BF_READ_TIMEOUT) {
buffer_shutr(s->rep);
s->rep->cons->shutr(s->rep->prod);
}
if (s->rep->flags & BF_WRITE_TIMEOUT) {
buffer_shutw(s->rep);
s->rep->cons->shutw(s->rep->cons);
}
}
/* 2: Check if we need to close the write side. This can only happen
@ -1067,9 +1052,14 @@ void process_session(struct task *t, int *next)
/* 3: When a server-side connection is released, we have to
* count it and check for pending connections on this server.
* FIXME: the test below is not accurate. An audit is needed
* to find all uncaught transitions. We need a way to ensure
* that shutdowns called right after connect() after TAR will
* correctly be caught for instance. In fact we need a way to
* track when the connection is assigned to the server.
*/
if (unlikely(s->req->cons->state == SI_ST_CLO &&
s->req->cons->prev_state == SI_ST_EST)) {
(s->req->cons->prev_state == SI_ST_EST || s->req->cons->prev_state == SI_ST_CON))) {
/* Count server-side errors (but not timeouts). */
if (s->req->flags & BF_WRITE_ERROR) {
s->be->failed_resp++;
@ -1128,7 +1118,8 @@ void process_session(struct task *t, int *next)
if (s->req->cons->state != SI_ST_CLO) {
if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
if (s->req->cons->state == SI_ST_INI && s->req->flags & BF_WRITE_ENA) {
if (s->req->cons->state == SI_ST_INI &&
(s->req->flags & (BF_WRITE_ENA|BF_SHUTW|BF_SHUTW_NOW)) == BF_WRITE_ENA) {
s->req->cons->state = SI_ST_REQ;
do {
sess_prepare_conn_req(s, &s->si[1]);
@ -1282,7 +1273,7 @@ void process_session(struct task *t, int *next)
s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE & BF_CLEAR_TIMEOUT;
s->si[0].prev_state = s->si[0].state;
s->si[1].prev_state = s->si[1].state;
s->si[0].flags = s->si[1].flags = 0;
s->si[0].flags = s->si[1].flags = SI_FL_NONE;
/* Trick: if a request is being waiting for the server to respond,
* and if we know the server can timeout, we don't want the timeout
@ -3714,176 +3705,201 @@ int process_response(struct session *t)
}
/* Return 1 if the pending connection has failed AND should be retried,
* otherwise zero. We may only come here in SI_ST_CON state, which means that
* the socket's file descriptor is known.
/* This function is called with (si->state == SI_ST_CON) meaning that a
* connection was attempted and that the file descriptor is already allocated.
* We must check for establishment, error and abort. Possible output states
* are SI_ST_EST (established), SI_ST_CER (error), SI_ST_CLO (abort), and
* SI_ST_CON (no change). The function returns 0 if it switches to SI_ST_CER,
* otherwise 1.
*/
int tcp_connection_status(struct session *t)
int sess_update_st_con_tcp(struct session *s, struct stream_interface *si)
{
struct buffer *req = t->req;
struct buffer *rep = t->rep;
int conn_err = 0;
struct buffer *req = si->ob;
struct buffer *rep = si->ib;
DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x rql=%d rpl=%d, fds=%d\n",
now_ms, __FUNCTION__,
cli_stnames[t->cli_state],
cli_stnames[s->cli_state],
rep->rex, req->wex,
req->flags, rep->flags,
req->l, rep->l,
fdtab[req->cons->fd].state);
fdtab[si->fd].state);
if ((req->flags & BF_SHUTW_NOW) ||
(rep->flags & BF_SHUTW) ||
((req->flags & BF_SHUTR) && /* FIXME: this should not prevent a connection from establishing */
((req->flags & BF_EMPTY && !(req->flags & BF_WRITE_ACTIVITY)) ||
t->be->options & PR_O_ABRT_CLOSE))) {
/* If we got an error, or if nothing happened and the connection timed
* out, we must give up. The CER state handler will take care of retry
* attempts and error reports.
*/
if (unlikely(si->flags & (SI_FL_EXP|SI_FL_ERR))) {
si->state = SI_ST_CER;
fd_delete(si->fd);
if (s->srv) {
s->srv->cur_sess--;
sess_change_server(s, NULL);
si->err_loc = s->srv;
}
if (si->err_type)
return 0;
if (si->flags & SI_FL_ERR)
si->err_type = SI_ET_CONN_ERR;
else
si->err_type = SI_ET_CONN_TO;
return 0;
}
/* OK, maybe we want to abort */
if (unlikely((req->flags & BF_SHUTW_NOW) ||
(rep->flags & BF_SHUTW) ||
((req->flags & BF_SHUTR) && /* FIXME: this should not prevent a connection from establishing */
((req->flags & BF_EMPTY && !(req->flags & BF_WRITE_ACTIVITY)) ||
s->be->options & PR_O_ABRT_CLOSE)))) {
/* give up */
trace_term(t, TT_HTTP_SRV_5);
req->wex = TICK_ETERNITY;
fd_delete(req->cons->fd);
if (t->srv) {
t->srv->cur_sess--;
sess_change_server(t, NULL);
fd_delete(si->fd);
if (s->srv) {
s->srv->cur_sess--;
sess_change_server(s, NULL);
}
buffer_shutw(req);
buffer_shutr(rep);
req->cons->state = SI_ST_CLO;
req->cons->err_type |= SI_ET_CONN_ABRT;
req->cons->err_loc = t->srv;
return 0;
}
/* check for timeouts and asynchronous connect errors */
if (fdtab[req->cons->fd].state == FD_STERROR) {
conn_err = SI_ET_CONN_ERR;
if (!req->cons->err_type)
req->cons->err_type = SI_ET_CONN_ERR;
}
else if (!(req->flags & BF_WRITE_ACTIVITY)) {
/* nothing happened, maybe we timed out */
if (req->flags & BF_WRITE_TIMEOUT) {
conn_err = SI_ET_CONN_TO;
if (!req->cons->err_type)
req->cons->err_type = SI_ET_CONN_TO;
}
else
return 0; /* let's wait a bit more */
}
if (conn_err) {
fd_delete(req->cons->fd);
if (t->srv) {
t->srv->cur_sess--;
sess_change_server(t, NULL);
req->cons->err_loc = t->srv;
}
/* ensure that we have enough retries left */
t->conn_retries--;
if (t->conn_retries < 0) {
if (!t->req->cons->err_type) {
t->req->cons->err_type = SI_ET_CONN_ERR;
t->req->cons->err_loc = t->srv;
}
if (t->srv)
t->srv->failed_conns++;
t->be->failed_conns++;
if (may_dequeue_tasks(t->srv, t->be))
process_srv_queue(t->srv);
req->cons->state = SI_ST_CLO;
return 0;
}
/* If the "redispatch" option is set on the backend, we are allowed to
* retry on another server for the last retry. In order to achieve this,
* we must mark the session unassigned, and eventually clear the DIRECT
* bit to ignore any persistence cookie. We won't count a retry nor a
* redispatch yet, because this will depend on what server is selected.
*/
if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) {
if (may_dequeue_tasks(t->srv, t->be))
process_srv_queue(t->srv);
t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
t->prev_srv = t->srv;
req->cons->state = SI_ST_REQ;
} else {
if (t->srv)
t->srv->retries++;
t->be->retries++;
req->cons->state = SI_ST_ASS;
}
if (conn_err == SI_ET_CONN_ERR) {
/* The error was an immediate connection error, and we
* will likely have to retry connecting to the same
* server, most likely leading to the same result. To
* avoid this, we wait one second before retrying.
*/
req->cons->state = SI_ST_TAR;
req->cons->exp = tick_add(now_ms, MS_TO_TICKS(1000));
return 0;
}
/* We'll rely on the caller to try to get a connection again */
si->state = SI_ST_CLO;
si->err_type |= SI_ET_CONN_ABRT;
si->err_loc = s->srv;
return 1;
}
else {
/* no error and write OK : connection succeeded */
t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now);
req->cons->state = SI_ST_EST;
req->cons->err_type = SI_ET_NONE;
req->cons->err_loc = NULL;
if (req->flags & BF_EMPTY) {
EV_FD_CLR(req->cons->fd, DIR_WR);
req->wex = TICK_ETERNITY;
} else {
EV_FD_SET(req->cons->fd, DIR_WR);
req->wex = tick_add_ifset(now_ms, t->be->timeout.server);
if (tick_isset(req->wex)) {
/* FIXME: to prevent the server from expiring read timeouts during writes,
* we refresh it. */
rep->rex = req->wex;
}
}
/* we need to wait a bit more if there was no activity either */
if (!(req->flags & BF_WRITE_ACTIVITY))
return 1;
if (t->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */
if (!(rep->flags & BF_HIJACK)) {
EV_FD_SET(req->cons->fd, DIR_RD);
rep->rex = tick_add_ifset(now_ms, t->be->timeout.server);
}
buffer_set_rlim(rep, BUFSIZE); /* no rewrite needed */
/* OK, this means that a connection succeeded */
s->logs.t_connect = tv_ms_elapsed(&s->logs.tv_accept, &now);
si->state = SI_ST_EST;
si->err_type = SI_ET_NONE;
si->err_loc = NULL;
/* if the user wants to log as soon as possible, without counting
bytes from the server, then this is the right moment. */
if (t->fe->to_log && !(t->logs.logwait & LW_BYTES)) {
t->logs.t_close = t->logs.t_connect; /* to get a valid end date */
tcp_sess_log(t);
}
#ifdef CONFIG_HAP_TCPSPLICE
if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
tcp_splice_splicefd(req->prod->fd, req->cons->fd, 0);
}
#endif
}
else {
rep->analysers |= AN_RTR_HTTP_HDR;
buffer_set_rlim(rep, BUFSIZE - MAXREWRITE); /* rewrite needed */
t->txn.rsp.msg_state = HTTP_MSG_RPBEFORE;
/* reset hdr_idx which was already initialized by the request.
* right now, the http parser does it.
* hdr_idx_init(&t->txn.hdr_idx);
*/
}
rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
if (req->flags & BF_EMPTY) {
EV_FD_CLR(si->fd, DIR_WR);
req->wex = TICK_ETERNITY;
} else {
EV_FD_SET(si->fd, DIR_WR);
req->wex = tick_add_ifset(now_ms, s->be->timeout.server);
if (tick_isset(req->wex)) {
/* FIXME: to prevent the server from expiring read
* timeouts during writes, we refresh it. */
rep->rex = req->wex;
}
}
if (s->be->mode == PR_MODE_TCP) { /* let's allow immediate data connection in this case */
if (!(rep->flags & BF_HIJACK)) {
EV_FD_SET(si->fd, DIR_RD);
rep->rex = tick_add_ifset(now_ms, s->be->timeout.server);
}
buffer_set_rlim(rep, BUFSIZE); /* no rewrite needed */
/* if the user wants to log as soon as possible, without counting
* bytes from the server, then this is the right moment. */
if (s->fe->to_log && !(s->logs.logwait & LW_BYTES)) {
s->logs.t_close = s->logs.t_connect; /* to get a valid end date */
tcp_sess_log(s);
}
#ifdef CONFIG_HAP_TCPSPLICE
if ((s->fe->options & s->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
tcp_splice_splicefd(req->prod->fd, si->fd, 0);
}
#endif
}
else {
rep->analysers |= AN_RTR_HTTP_HDR;
buffer_set_rlim(rep, BUFSIZE - MAXREWRITE); /* rewrite needed */
s->txn.rsp.msg_state = HTTP_MSG_RPBEFORE;
/* reset hdr_idx which was already initialized by the request.
* right now, the http parser does it.
* hdr_idx_init(&s->txn.hdr_idx);
*/
}
rep->flags |= BF_READ_ATTACHED; /* producer is now attached */
req->wex = TICK_ETERNITY;
return 1;
}
/* This function is called with (si->state == SI_ST_CER) meaning that a
* previous connection attempt has failed and that the file descriptor
* has already been released. Possible causes include asynchronous error
* notification and time out. Possible output states are SI_ST_CLO when
* retries are exhausted, SI_ST_TAR when a delay is wanted before a new
* connection attempt, SI_ST_ASS when it's wise to retry on the same server,
* and SI_ST_REQ when an immediate redispatch is wanted. The buffers are
* marked as in error state. It returns 0.
*/
int sess_update_st_cer(struct session *s, struct stream_interface *si)
{
/* ensure that we have enough retries left */
s->conn_retries--;
if (s->conn_retries < 0) {
if (!si->err_type) {
si->err_type = SI_ET_CONN_ERR;
si->err_loc = s->srv;
}
if (s->srv)
s->srv->failed_conns++;
s->be->failed_conns++;
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
si->ib->flags |= BF_READ_ERROR;
si->state = SI_ST_CLO;
return 0;
}
/* If the "redispatch" option is set on the backend, we are allowed to
* retry on another server for the last retry. In order to achieve this,
* we must mark the session unassigned, and eventually clear the DIRECT
* bit to ignore any persistence cookie. We won't count a retry nor a
* redispatch yet, because this will depend on what server is selected.
*/
if (s->srv && s->conn_retries == 0 && s->be->options & PR_O_REDISP) {
if (may_dequeue_tasks(s->srv, s->be))
process_srv_queue(s->srv);
s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
s->prev_srv = s->srv;
si->state = SI_ST_REQ;
} else {
if (s->srv)
s->srv->retries++;
s->be->retries++;
si->state = SI_ST_ASS;
}
if (si->flags & SI_FL_ERR) {
/* The error was an asynchronous connection error, and we will
* likely have to retry connecting to the same server, most
* likely leading to the same result. To avoid this, we wait
* one second before retrying.
*/
if (!si->err_type)
si->err_type = SI_ET_CONN_ERR;
si->state = SI_ST_TAR;
si->exp = tick_add(now_ms, MS_TO_TICKS(1000));
return 0;
}
return 0;
}

67
src/stream_interface.c Normal file
View File

@ -0,0 +1,67 @@
/*
* Functions managing stream_interface structures
*
* Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/standard.h>
#include <common/ticks.h>
#include <common/time.h>
#include <proto/buffers.h>
#include <proto/client.h>
#include <proto/fd.h>
#include <proto/stream_sock.h>
#include <proto/task.h>
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
* si->flags accordingly. It does NOT close anything, as this timeout may
* be used for any purpose. It returns 1 if the timeout fired, otherwise
* zero.
*/
int stream_int_check_timeouts(struct stream_interface *si)
{
if (tick_is_expired(si->exp, now_ms)) {
si->flags |= SI_FL_EXP;
return 1;
}
return 0;
}
void stream_int_report_error(struct stream_interface *si)
{
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
si->ib->flags |= BF_READ_ERROR;
}
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/

View File

@ -259,31 +259,22 @@ int stream_sock_read(int fd) {
goto out_wakeup;
out_error:
/* There was an error. we must wakeup the task. No need to clear
* the events, the task will do it.
/* Read error on the file descriptor. We mark the FD as STERROR so
* that we don't use it anymore. The error is reported to the stream
* interface which will take proper action. We must not perturbate the
* buffer because the stream interface wants to ensure transparent
* connection retries.
*/
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->rex = TICK_ETERNITY;
/* Read error on the file descriptor. We close the FD and set
* the error on both buffers.
* Note: right now we only support connected sockets.
*/
if (si->state != SI_ST_EST)
goto out_wakeup;
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutr(b);
b->flags |= BF_READ_ERROR;
buffer_shutw(si->ob);
si->ob->flags |= BF_WRITE_ERROR;
si->flags |= SI_FL_ERR;
goto wakeup_return;
do_close_and_return:
fd_delete(fd);
si->state = SI_ST_CLO;
fd_delete(fd);
wakeup_return:
task_wakeup(si->owner, TASK_WOKEN_IO);
return 1;
}
@ -457,29 +448,22 @@ int stream_sock_write(int fd) {
return retval;
out_error:
/* There was an error. we must wakeup the task. No need to clear
* the events, the task will do it.
/* Write error on the file descriptor. We mark the FD as STERROR so
* that we don't use it anymore. The error is reported to the stream
* interface which will take proper action. We must not perturbate the
* buffer because the stream interface wants to ensure transparent
* connection retries.
*/
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->wex = TICK_ETERNITY;
/* Read error on the file descriptor. We close the FD and set
* the error on both buffers.
* Note: right now we only support connected sockets.
*/
if (si->state != SI_ST_EST)
goto out_wakeup;
si->flags |= SI_FL_ERR;
goto wakeup_return;
if (!si->err_type)
si->err_type = SI_ET_DATA_ERR;
buffer_shutw(b);
b->flags |= BF_WRITE_ERROR;
buffer_shutr(si->ib);
si->ib->flags |= BF_READ_ERROR;
do_close_and_return:
fd_delete(fd);
si->state = SI_ST_CLO;
fd_delete(fd);
wakeup_return:
task_wakeup(si->owner, TASK_WOKEN_IO);
return 1;
}
@ -524,7 +508,7 @@ int stream_sock_shutr(struct stream_interface *si)
if (si->state != SI_ST_EST && si->state != SI_ST_CON)
return 0;
if (si->ib->flags & BF_SHUTW) {
if (si->ob->flags & BF_SHUTW) {
fd_delete(si->fd);
si->state = SI_ST_CLO;
return 1;
@ -533,22 +517,6 @@ int stream_sock_shutr(struct stream_interface *si)
return 0;
}
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
* si->flags accordingly. It does NOT close anything, as this timeout may
* be used for any purpose. It returns 1 if the timeout fired, otherwise
* zero.
*/
int stream_sock_check_timeouts(struct stream_interface *si)
{
if (tick_is_expired(si->exp, now_ms)) {
si->flags |= SI_FL_EXP;
return 1;
}
return 0;
}
/*
* Manages a stream_sock connection during its data phase. The buffers are
* examined for various cases of shutdown, then file descriptor and buffers'