diff --git a/Makefile b/Makefile index c042314fcc..1480fafffa 100644 --- a/Makefile +++ b/Makefile @@ -480,7 +480,8 @@ endif OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \ src/uri_auth.o src/standard.o src/buffers.o src/log.o src/task.o \ src/time.o src/fd.o src/pipe.o src/regex.o src/cfgparse.o src/server.o \ - src/checks.o src/queue.o src/frontend.o src/proxy.o src/stick_table.o src/proto_uxst.o \ + src/checks.o src/queue.o src/frontend.o src/proxy.o src/peers.o \ + src/stick_table.o src/proto_uxst.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ src/lb_chash.o src/lb_fwlc.o src/lb_fwrr.o src/lb_map.o \ src/stream_interface.o src/dumpstats.o src/proto_tcp.o \ diff --git a/Makefile.bsd b/Makefile.bsd index 8800c1b414..ca2347bd0a 100644 --- a/Makefile.bsd +++ b/Makefile.bsd @@ -106,7 +106,7 @@ OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \ src/time.o src/fd.o src/pipe.o src/regex.o src/cfgparse.o src/server.o \ src/checks.o src/queue.o src/frontend.o src/proxy.o src/proto_uxst.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ - src/stream_interface.o src/dumpstats.o src/proto_tcp.o \ + src/peers.o src/stream_interface.o src/dumpstats.o src/proto_tcp.o \ src/session.o src/hdr_idx.o src/ev_select.o src/signal.o \ src/lb_chash.o src/lb_fwlc.o src/lb_fwrr.o src/lb_map.o \ src/ev_poll.o src/ev_kqueue.o \ diff --git a/Makefile.osx b/Makefile.osx index a554c07762..7caa20b7ab 100644 --- a/Makefile.osx +++ b/Makefile.osx @@ -103,7 +103,7 @@ OBJS = src/haproxy.o src/sessionhash.o src/base64.o src/protocols.o \ src/time.o src/fd.o src/pipe.o src/regex.o src/cfgparse.o src/server.o \ src/checks.o src/queue.o src/frontend.o src/proxy.o src/proto_uxst.o \ src/proto_http.o src/stream_sock.o src/appsession.o src/backend.o \ - src/stream_interface.o src/dumpstats.o src/proto_tcp.o \ + src/peers.o src/stream_interface.o src/dumpstats.o src/proto_tcp.o \ src/session.o src/hdr_idx.o src/ev_select.o src/signal.o \ src/lb_chash.o src/lb_fwlc.o src/lb_fwrr.o src/lb_map.o \ src/ev_poll.o \ diff --git a/include/proto/peers.h b/include/proto/peers.h new file mode 100644 index 0000000000..d18885429e --- /dev/null +++ b/include/proto/peers.h @@ -0,0 +1,37 @@ +/* + * include/proto/peers.h + * This file defines function prototypes for peers management. + * + * Copyright 2010 EXCELIANCE, Emeric Brun + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _PROTO_PEERS_H +#define _PROTO_PEERS_H + +#include +#include +#include +#include +#include + +struct session * peer_session_create(struct peer *); +struct session * peers_register_table(struct peers *, struct stktable *table); + +int peer_accept(struct session *); + +#endif /* _PROTO_PEERS_H */ + diff --git a/include/types/global.h b/include/types/global.h index 944d25e639..906a52cc9a 100644 --- a/include/types/global.h +++ b/include/types/global.h @@ -121,6 +121,7 @@ extern const int one; extern const struct linger nolinger; extern int stopping; /* non zero means stopping in progress */ extern char hostname[MAX_HOSTNAME_LEN]; +extern char localpeer[MAX_HOSTNAME_LEN]; #endif /* _TYPES_GLOBAL_H */ diff --git a/include/types/peers.h b/include/types/peers.h new file mode 100644 index 0000000000..b5b92c2b77 --- /dev/null +++ b/include/types/peers.h @@ -0,0 +1,99 @@ +/* + * include/types/peers.h + * This file defines everything related to peers. + * + * Copyright 2010 EXCELIANCE, Emeric Brun + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _TYPES_PEERS_H +#define _TYPES_PEERS_H + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +struct peer_session { + struct shared_table *table; /* shared table */ + struct peer *peer; /* current peer */ + struct session *session; /* current transport session */ + unsigned int flags; /* peer session flags */ + unsigned int statuscode; /* current/last session status code */ + unsigned int update; /* current peer acked update */ + unsigned int pushack; /* last commited update to ack */ + unsigned int lastack; /* last acked update */ + unsigned int lastpush; /* last pushed update */ + unsigned int confirm; /* confirm message counter */ + unsigned int pushed; /* equal to last pushed data or to table local update in case of total push + * or to teaching_origin if teaching is ended */ + unsigned int reconnect; /* next connect timer */ + unsigned int teaching_origin; /* resync teaching origine update */ + struct peer_session *next; +}; + +struct shared_table { + struct stktable *table; /* stick table to sync */ + struct task *sync_task; /* main sync task */ + struct peer_session *local_session; /* local peer session */ + struct peer_session *sessions; /* peer sessions list */ + unsigned int flags; /* current table resync state */ + unsigned int resync_timeout; /* resync timeout timer */ + struct shared_table *next; /* next shared table in list */ +}; + +struct peer { + int local; /* proxy state */ + char *id; + struct peers *peers; + struct { + const char *file; /* file where the section appears */ + int line; /* line where the section appears */ + } conf; /* config information */ + time_t last_change; + struct sockaddr_in addr; /* peer address */ + struct peer *next; /* next peer in the list */ +}; + + +struct peers { + int state; /* proxy state */ + char *id; /* peer section name */ + struct peer *remote; /* remote peers list */ + struct proxy *peers_fe; /* peer frontend */ + struct { + const char *file; /* file where the section appears */ + int line; /* line where the section appears */ + } conf; /* config information */ + struct shared_table *tables; /* registered shared tables */ + time_t last_change; + struct peers *next; /* next peer section */ + int count; /* total of peers */ +}; + + +extern struct peers *peers; + +#endif /* _TYPES_PEERS_H */ + diff --git a/src/haproxy.c b/src/haproxy.c index 074a11716d..c5aa3ccc5a 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -156,6 +156,7 @@ const int one = 1; const struct linger nolinger = { .l_onoff = 1, .l_linger = 0 }; char hostname[MAX_HOSTNAME_LEN]; +char localpeer[MAX_HOSTNAME_LEN]; /*********************************************************************/ @@ -224,6 +225,7 @@ void usage(char *name) " -n sets the maximum total # of connections (%d)\n" " -m limits the usable amount of memory (in MB)\n" " -N sets the default, per-proxy maximum # of connections (%d)\n" + " -L set local peer name (default to hostname)\n" " -p writes pids of all children to this file\n" #if defined(ENABLE_EPOLL) " -de disables epoll() usage even when available\n" @@ -351,6 +353,15 @@ void init(int argc, char **argv) int err_code = 0; struct wordlist *wl; + /* NB: POSIX does not make it mandatory for gethostname() to NULL-terminate + * the string in case of truncation, and at least FreeBSD appears not to do + * it. + */ + memset(hostname, 0, sizeof(hostname)); + gethostname(hostname, sizeof(hostname) - 1); + memset(localpeer, 0, sizeof(localpeer)); + memcpy(localpeer, hostname, (sizeof(hostname) > sizeof(localpeer) ? sizeof(localpeer) : sizeof(hostname)) - 1); + /* * Initialize the previously static variables. */ @@ -469,6 +480,7 @@ void init(int argc, char **argv) case 'n' : cfg_maxconn = atol(*argv); break; case 'm' : global.rlimit_memmax = atol(*argv); break; case 'N' : cfg_maxpconn = atol(*argv); break; + case 'L' : strncpy(localpeer, *argv, sizeof(localpeer) - 1); break; case 'f' : wl = (struct wordlist *)calloc(1, sizeof(*wl)); if (!wl) { @@ -495,13 +507,6 @@ void init(int argc, char **argv) if (LIST_ISEMPTY(&cfg_cfgfiles)) usage(old_argv); - /* NB: POSIX does not make it mandatory for gethostname() to NULL-terminate - * the string in case of truncation, and at least FreeBSD appears not to do - * it. - */ - memset(hostname, 0, sizeof(hostname)); - gethostname(hostname, sizeof(hostname) - 1); - have_appsession = 0; global.maxsock = 10; /* reserve 10 fds ; will be incremented by socket eaters */ diff --git a/src/peers.c b/src/peers.c new file mode 100644 index 0000000000..76184079b8 --- /dev/null +++ b/src/peers.c @@ -0,0 +1,1482 @@ +/* + * Stick table syncro management. + * + * Copyright 2010 EXCELIANCE, Emeric Brun + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +/*******************************/ +/* Current peer learning state */ +/*******************************/ + +/******************************/ +/* Current table resync state */ +/******************************/ +#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */ +#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */ +#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */ +#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */ +#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop + to push data to new process */ + +#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE) +#define SHTABLE_RESYNC_FROMLOCAL 0x00000000 +#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL +#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE) + +/******************************/ +/* Remote peer teaching state */ +/******************************/ +#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */ +#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach state 1 complete */ +#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */ +#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */ +#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */ +#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */ +#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */ + +#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ +#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE) + + +/**********************************/ +/* Peer Session IO handler states */ +/**********************************/ + +#define PEER_SESSION_ACCEPT 1000 /* Initial state for session create by an accept */ +#define PEER_SESSION_GETVERSION 1001 /* Validate supported protocol version*/ +#define PEER_SESSION_GETHOST 1002 /* Validate host ID correspond to local host id */ +#define PEER_SESSION_GETPEER 1003 /* Validate peer ID correspond to a known remote peer id */ +#define PEER_SESSION_GETTABLE 1004 /* Search into registered table for a table with same id and + validate type and size */ +#define PEER_SESSION_SENDSUCCESS 1005 /* Send ret code 200 (success) and wait for message */ +/* next state is WAITMSG */ + +#define PEER_SESSION_CONNECT 2000 /* Initial state for session create on a connect, + push presentation into buffer */ +#define PEER_SESSION_GETSTATUS 2001 /* Wait for the welcome message */ +#define PEER_SESSION_WAITMSG 2002 /* Wait for datamessages*/ +/* loop on WAITMSG */ + +#define PEER_SESSION_EXIT 10000 /* Exit with status code */ +#define PEER_SESSION_END 10001 /* Killed session */ +/* session ended */ + + +/**********************************/ +/* Peer Session status code */ +/**********************************/ + +#define PEER_SESSION_CONNECTCODE 100 /* connect in progress */ +#define PEER_SESSION_CONNECTEDCODE 110 /* tcp connect success */ + +#define PEER_SESSION_SUCCESSCODE 200 /* accept or connect successful */ + +#define PEER_SESSION_TRYAGAIN 300 /* try again later */ + +#define PEER_SESSION_ERRPROTO 501 /* error protocol */ +#define PEER_SESSION_ERRVERSION 502 /* unknown protocol version */ +#define PEER_SESSION_ERRHOST 503 /* bad host name */ +#define PEER_SESSION_ERRPEER 504 /* unknown peer */ +#define PEER_SESSION_ERRTYPE 505 /* table key type mismatch */ +#define PEER_SESSION_ERRSIZE 506 /* table key size mismatch */ +#define PEER_SESSION_ERRTABLE 507 /* unknown table */ + +#define PEER_SESSION_PROTO_NAME "HAProxyS" + +struct peers *peers = NULL; +void peer_session_forceshutdown(struct session * session); + + +/* + * This prepare the data update message of the stick session , is the the peer session + * where the data going to be pushed, is a buffer of to recieve data message content + */ +int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size) +{ + uint32_t netinteger; + int len; + /* construct message */ + if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) { + msg[0] = 0x80 + ts->upd.key - ps->lastpush; + len = sizeof(char); + } + else { + msg[0] = 'D'; + netinteger = htonl(ts->upd.key); + memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger)); + len = sizeof(char) + sizeof(netinteger); + } + + if (ps->table->table->type == STKTABLE_TYPE_STRING) { + int stlen = strlen((char *)ts->key.key); + + netinteger = htonl(strlen((char *)ts->key.key)); + memcpy(&msg[len], &netinteger, sizeof(netinteger)); + memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen); + len += sizeof(netinteger) + stlen; + + } + else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) { + netinteger = htonl(*((uint32_t *)ts->key.key)); + memcpy(&msg[len], &netinteger, sizeof(netinteger)); + len += sizeof(netinteger); + } + else { + memcpy(&msg[len], ts->key.key, ps->table->table->key_size); + len += ps->table->table->key_size; + } + + if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID)) + netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id)); + else + netinteger = 0; + + memcpy(&msg[len], &netinteger , sizeof(netinteger)); + len += sizeof(netinteger); + + return len; +} + + +/* + * Callback to release a session with a peer + */ +void peer_session_release(struct stream_interface *si) +{ + struct task *t= (struct task *)si->owner; + struct session *s = (struct session *)t->context; + struct peer_session *ps = (struct peer_session *)si->private; + + /* si->private is not a peer session */ + if (si->st0 < PEER_SESSION_SENDSUCCESS) + return; + + /* peer session identified */ + if (ps) { + if (ps->session == s) { + ps->session = NULL; + if (ps->flags & PEER_F_LEARN_ASSIGN) { + /* unassign current peer for learning */ + ps->flags &= ~(PEER_F_LEARN_ASSIGN); + ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); + + /* reschedule a resync */ + ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + } + /* reset teaching and learning flags to 0 */ + ps->flags &= PEER_TEACH_RESET; + ps->flags &= PEER_LEARN_RESET; + } + task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + } +} + + +/* + * IO Handler to handle message exchance with a peer + */ +void peer_io_handler(struct stream_interface *si) +{ + struct task *t= (struct task *)si->owner; + struct session *s = (struct session *)t->context; + struct peers *curpeers = (struct peers *)s->fe->parent; + int reql = 0; + int repl = 0; + + while (1) { +switchstate: + switch(si->st0) { + case PEER_SESSION_ACCEPT: + si->private = NULL; + si->st0 = PEER_SESSION_GETVERSION; + /* fall through */ + case PEER_SESSION_GETVERSION: + reql = buffer_get_line(si->ob, trash, sizeof(trash)); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + if (trash[reql-1] != '\n') { + si->st0 = PEER_SESSION_END; + goto switchstate; + } + else if (reql > 1 && (trash[reql-2] == '\r')) + trash[reql-2] = 0; + else + trash[reql-1] = 0; + + buffer_skip(si->ob, reql); + + /* test version */ + if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash) != 0) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRVERSION; + /* test protocol */ + if (strncmp(PEER_SESSION_PROTO_NAME " ", trash, strlen(PEER_SESSION_PROTO_NAME)+1) != 0) + si->st1 = PEER_SESSION_ERRPROTO; + goto switchstate; + } + + si->st0 = PEER_SESSION_GETHOST; + /* fall through */ + case PEER_SESSION_GETHOST: + reql = buffer_get_line(si->ob, trash, sizeof(trash)); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + if (trash[reql-1] != '\n') { + si->st0 = PEER_SESSION_END; + goto switchstate; + } + else if (reql > 1 && (trash[reql-2] == '\r')) + trash[reql-2] = 0; + else + trash[reql-1] = 0; + + buffer_skip(si->ob, reql); + + /* test hostname match */ + if (strcmp(localpeer, trash) != 0) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRHOST; + goto switchstate; + } + + si->st0 = PEER_SESSION_GETPEER; + /* fall through */ + case PEER_SESSION_GETPEER: { + struct peer *curpeer; + char *p; + reql = buffer_get_line(si->ob, trash, sizeof(trash)); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + if (trash[reql-1] != '\n') { + /* Incomplete line, we quit */ + si->st0 = PEER_SESSION_END; + goto switchstate; + } + else if (reql > 1 && (trash[reql-2] == '\r')) + trash[reql-2] = 0; + else + trash[reql-1] = 0; + + buffer_skip(si->ob, reql); + + /* parse line " " */ + p = strchr(trash, ' '); + if (!p) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRPROTO; + goto switchstate; + } + *p = 0; + + /* lookup known peer */ + for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) { + if (strcmp(curpeer->id, trash) == 0) + break; + } + + /* if unknown peer */ + if (!curpeer) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRPEER; + goto switchstate; + } + + si->private = curpeer; + si->st0 = PEER_SESSION_GETTABLE; + /* fall through */ + } + case PEER_SESSION_GETTABLE: { + struct peer *curpeer = (struct peer *)si->private; + struct shared_table *st; + struct peer_session *ps = NULL; + unsigned long key_type; + size_t key_size; + char *p; + + reql = buffer_get_line(si->ob, trash, sizeof(trash)); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) + goto out; + si->private = NULL; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + /* Re init si->private to null, to handle correctly a release case */ + si->private = NULL; + + if (trash[reql-1] != '\n') { + /* Incomplete line, we quit */ + si->st0 = PEER_SESSION_END; + goto switchstate; + } + else if (reql > 1 && (trash[reql-2] == '\r')) + trash[reql-2] = 0; + else + trash[reql-1] = 0; + + buffer_skip(si->ob, reql); + + /* Parse line " " */ + p = strchr(trash, ' '); + if (!p) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRPROTO; + goto switchstate; + } + *p = 0; + key_type = (unsigned long)atol(p+1); + + p = strchr(p+1, ' '); + if (!p) { + si->private = NULL; + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRPROTO; + goto switchstate; + } + + key_size = (size_t)atoi(p); + for (st = curpeers->tables; st; st = st->next) { + /* If table name matches */ + if (strcmp(st->table->id, trash) == 0) { + /* If key size mismatches */ + if (key_size != st->table->key_size) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRSIZE; + goto switchstate; + } + + /* If key type mismatches */ + if (key_type != st->table->type) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRTYPE; + goto switchstate; + } + + /* lookup peer session of current peer */ + for (ps = st->sessions; ps; ps = ps->next) { + if (ps->peer == curpeer) { + /* If session already active, replaced by new one */ + if (ps->session && ps->session != s) { + if (ps->peer->local) { + /* Local connection, reply a retry */ + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_TRYAGAIN; + goto switchstate; + } + peer_session_forceshutdown(ps->session); + } + ps->session = s; + break; + } + } + break; + } + } + + /* If table not found */ + if (!st){ + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRTABLE; + goto switchstate; + } + + /* If no peer session for current peer */ + if (!ps) { + si->st0 = PEER_SESSION_EXIT; + si->st1 = PEER_SESSION_ERRPEER; + goto switchstate; + } + + si->private = ps; + si->st0 = PEER_SESSION_SENDSUCCESS; + /* fall through */ + } + case PEER_SESSION_SENDSUCCESS:{ + struct peer_session *ps = (struct peer_session *)si->private; + + repl = snprintf(trash, sizeof(trash), "%d\n", PEER_SESSION_SUCCESSCODE); + repl = buffer_put_block(si->ib, trash, repl); + if (repl <= 0) { + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + /* Register status code */ + ps->statuscode = PEER_SESSION_SUCCESSCODE; + + /* Awake main task */ + task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + + /* Init cursors */ + ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0; + ps->pushed = ps->update; + + /* Init confirm counter */ + ps->confirm = 0; + + /* reset teaching and learning flags to 0 */ + ps->flags &= PEER_TEACH_RESET; + ps->flags &= PEER_LEARN_RESET; + + /* if current peer is local */ + if (ps->peer->local) { + /* if table need resyncfrom local and no process assined */ + if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL && + !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { + /* assign local peer for a lesson, consider lesson already requested */ + ps->flags |= PEER_F_LEARN_ASSIGN; + ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); + } + + } + else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE && + !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { + /* assign peer for a lesson */ + ps->flags |= PEER_F_LEARN_ASSIGN; + ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN; + } + /* switch to waiting message state */ + si->st0 = PEER_SESSION_WAITMSG; + goto switchstate; + } + case PEER_SESSION_CONNECT: { + struct peer_session *ps = (struct peer_session *)si->private; + + /* Send headers */ + repl = snprintf(trash, sizeof(trash), + PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n", + ps->peer->id, + localpeer, + getpid(), + ps->table->table->id, + ps->table->table->type, + ps->table->table->key_size); + + if (repl >= sizeof(trash)) { + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + repl = buffer_put_block(si->ib, trash, repl); + if (repl <= 0) { + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + /* switch to the waiting statuscode state */ + si->st0 = PEER_SESSION_GETSTATUS; + /* fall through */ + } + case PEER_SESSION_GETSTATUS: { + struct peer_session *ps = (struct peer_session *)si->private; + + if (si->ib->flags & BF_WRITE_PARTIAL) + ps->statuscode = PEER_SESSION_CONNECTEDCODE; + + reql = buffer_get_line(si->ob, trash, sizeof(trash)); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + if (trash[reql-1] != '\n') { + /* Incomplete line, we quit */ + si->st0 = PEER_SESSION_END; + goto switchstate; + } + else if (reql > 1 && (trash[reql-2] == '\r')) + trash[reql-2] = 0; + else + trash[reql-1] = 0; + + buffer_skip(si->ob, reql); + + /* Register status code */ + ps->statuscode = atoi(trash); + + /* Awake main task */ + task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + + /* If status code is success */ + if (ps->statuscode == PEER_SESSION_SUCCESSCODE) { + /* Init cursors */ + ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0; + ps->pushed = ps->update; + + /* Init confirm counter */ + ps->confirm = 0; + + /* reset teaching and learning flags to 0 */ + ps->flags &= PEER_TEACH_RESET; + ps->flags &= PEER_LEARN_RESET; + + /* If current peer is local */ + if (ps->peer->local) { + /* Init cursors to push a resync */ + ps->teaching_origin = ps->pushed = ps->table->table->update; + /* flag to start to teach lesson */ + ps->flags |= PEER_F_TEACH_PROCESS; + + } + else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE && + !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) { + /* If peer is remote and resync from remote is needed, + and no peer currently assigned */ + + /* assign peer for a lesson */ + ps->flags |= PEER_F_LEARN_ASSIGN; + ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN; + } + + } + else { + /* Status code is not success, abort */ + si->st0 = PEER_SESSION_END; + goto switchstate; + } + si->st0 = PEER_SESSION_WAITMSG; + /* fall through */ + } + case PEER_SESSION_WAITMSG: { + struct peer_session *ps = (struct peer_session *)si->private; + char c; + int totl = 0; + + reql = buffer_get_block(si->ob, (char *)&c, sizeof(c), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + /* nothing to read */ + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + + if ((c & 0x80) || (c == 'D')) { + /* Here we have data message */ + unsigned int pushack; + struct stksess *ts; + struct stksess *newts; + struct stktable_key stkey; + int srvid; + uint32_t netinteger; + + /* Compute update remote version */ + if (c & 0x80) { + pushack = ps->pushack + (unsigned int)(c & 0x7F); + } + else { + reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + pushack = ntohl(netinteger); + } + + /* read key */ + if (ps->table->table->type == STKTABLE_TYPE_STRING) { + /* type string */ + stkey.key = stkey.data.buf; + + reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + stkey.key_len = ntohl(netinteger); + + reql = buffer_get_block(si->ob, stkey.key, stkey.key_len, totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + } + else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) { + /* type integer */ + stkey.key_len = (size_t)-1; + stkey.key = &stkey.data.integer; + + reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + stkey.data.integer = ntohl(netinteger); + } + else { + /* type ip */ + stkey.key_len = (size_t)-1; + stkey.key = stkey.data.buf; + + reql = buffer_get_block(si->ob, (char *)&stkey.data.buf, ps->table->table->key_size, totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + + } + + /* read server id */ + reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + srvid = ntohl(netinteger); + + /* update entry */ + newts = stksess_new(ps->table->table, &stkey); + if (newts) { + /* lookup for existing entry */ + ts = stktable_lookup(ps->table->table, newts); + if (ts) { + /* the entry already exist, we can free ours */ + stktable_touch(ps->table->table, ts, 0); + stksess_free(ps->table->table, newts); + } + else { + struct eb32_node *eb; + + /* create new entry */ + ts = stktable_store(ps->table->table, newts, 0); + ts->upd.key= (++ps->table->table->update)+(2^31); + eb = eb32_insert(&ps->table->table->updates, &ts->upd); + if (eb != &ts->upd) { + eb32_delete(eb); + eb32_insert(&ps->table->table->updates, &ts->upd); + } + } + + /* update entry */ + if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID)) + stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid; + ps->pushack = pushack; + } + + } + else if (c == 'R') { + /* Reset message: remote need resync */ + + /* reinit counters for a resync */ + ps->lastpush = 0; + ps->teaching_origin = ps->pushed = ps->table->table->update; + + /* reset teaching flags to 0 */ + ps->flags &= PEER_TEACH_RESET; + + /* flag to start to teach lesson */ + ps->flags |= PEER_F_TEACH_PROCESS; + } + else if (c == 'F') { + /* Finish message, all known updates have been pushed by remote */ + /* and remote is up to date */ + + /* If resync is in progress with remote peer */ + if (ps->flags & PEER_F_LEARN_ASSIGN) { + + /* unassign current peer for learning */ + ps->flags &= ~PEER_F_LEARN_ASSIGN; + ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); + + /* Consider table is now up2date, resync resync no more needed from local neither remote */ + ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE); + } + /* Increase confirm counter to launch a confirm message */ + ps->confirm++; + } + else if (c == 'c') { + /* confirm message, remote peer is now up to date with us */ + + /* If stopping state */ + if (stopping) { + /* Close session, push resync no more needed */ + ps->flags |= PEER_F_TEACH_COMPLETE; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + /* reset teaching flags to 0 */ + ps->flags &= PEER_TEACH_RESET; + } + else if (c == 'C') { + /* Continue message, all known updates have been pushed by remote */ + /* but remote is not up to date */ + + /* If resync is in progress with current peer */ + if (ps->flags & PEER_F_LEARN_ASSIGN) { + + /* unassign current peer */ + ps->flags &= ~PEER_F_LEARN_ASSIGN; + ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS); + + /* flag current peer is not up 2 date to try from an other */ + ps->flags |= PEER_F_LEARN_NOTUP2DATE; + + /* reschedule a resync */ + ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG); + } + ps->confirm++; + } + else if (c == 'A') { + /* ack message */ + uint32_t netinteger; + + reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl); + if (reql <= 0) { /* closed or EOL not found */ + if (reql == 0) { + goto incomplete; + } + si->st0 = PEER_SESSION_END; + goto switchstate; + } + totl += reql; + + /* Consider remote is up to date with "acked" version */ + ps->update = ntohl(netinteger); + } + else { + /* Unknown message */ + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + /* skip consumed message */ + buffer_skip(si->ob, totl); + + /* loop on that state to peek next message */ + continue; +incomplete: + /* Nothing to read, now we start to write */ + + /* Confirm finished or partial messages */ + while (ps->confirm) { + /* There is a confirm messages to send */ + repl = buffer_put_char(si->ib, 'c'); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->confirm--; + } + + /* Need to request a resync */ + if ((ps->flags & PEER_F_LEARN_ASSIGN) && + (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) && + !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) { + /* Current peer was elected to request a resync */ + + repl = buffer_put_char(si->ib, 'R'); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->table->flags |= SHTABLE_F_RESYNC_PROCESS; + } + + /* It remains some updates to ack */ + if (ps->pushack != ps->lastack) { + uint32_t netinteger; + + trash[0] = 'A'; + netinteger = htonl(ps->pushack); + memcpy(&trash[1], &netinteger, sizeof(netinteger)); + + repl = buffer_put_block(si->ib, trash, 1+sizeof(netinteger)); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->lastack = ps->pushack; + } + + if (ps->flags & PEER_F_TEACH_PROCESS) { + /* current peer was requested for a lesson */ + + if (!(ps->flags & PEER_F_TEACH_STAGE1)) { + /* lesson stage 1 not complete */ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); + while (1) { + int msglen; + struct stksess *ts; + + if (!eb) { + /* flag lesson stage1 complete */ + ps->flags |= PEER_F_TEACH_STAGE1; + eb = eb32_first(&ps->table->table->updates); + if (eb) + ps->pushed = eb->key - 1; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash)); + if (msglen) { + /* message to buffer */ + repl = buffer_put_block(si->ib, trash, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->lastpush = ps->pushed = ts->upd.key; + } + eb = eb32_next(eb); + } + } /* !TEACH_STAGE1 */ + + if (!(ps->flags & PEER_F_TEACH_STAGE2)) { + /* lesson stage 2 not complete */ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); + while (1) { + int msglen; + struct stksess *ts; + + if (!eb || eb->key > ps->teaching_origin) { + /* flag lesson stage1 complete */ + ps->flags |= PEER_F_TEACH_STAGE2; + ps->pushed = ps->teaching_origin; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash)); + if (msglen) { + /* message to buffer */ + repl = buffer_put_block(si->ib, trash, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->lastpush = ps->pushed = ts->upd.key; + } + eb = eb32_next(eb); + } + } /* !TEACH_STAGE2 */ + + if (!(ps->flags & PEER_F_TEACH_FINISHED)) { + /* process final lesson message */ + repl = buffer_put_char(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C'); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + + /* flag finished message sent */ + ps->flags |= PEER_F_TEACH_FINISHED; + } /* !TEACH_FINISHED */ + } /* TEACH_PROCESS */ + + if (!(ps->flags & PEER_F_LEARN_ASSIGN) && + (int)(ps->pushed - ps->table->table->localupdate) < 0) { + /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */ + struct eb32_node *eb; + + eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1); + while (1) { + int msglen; + struct stksess *ts; + + /* push local updates */ + if (!eb) { + eb = eb32_first(&ps->table->table->updates); + if (!eb || ((int)(eb->key - ps->pushed) <= 0)) { + ps->pushed = ps->table->table->localupdate; + break; + } + } + + if ((int)(eb->key - ps->table->table->localupdate) > 0) { + ps->pushed = ps->table->table->localupdate; + break; + } + + ts = eb32_entry(eb, struct stksess, upd); + msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash)); + if (msglen) { + /* message to buffer */ + repl = buffer_put_block(si->ib, trash, msglen); + if (repl <= 0) { + /* no more write possible */ + if (repl == -1) + goto out; + si->st0 = PEER_SESSION_END; + goto switchstate; + } + ps->lastpush = ps->pushed = ts->upd.key; + } + eb = eb32_next(eb); + } + } /* ! LEARN_ASSIGN */ + /* noting more to do */ + goto out; + } + case PEER_SESSION_EXIT: + repl = snprintf(trash, sizeof(trash), "%d\n", si->st1); + + if (buffer_put_block(si->ib, trash, repl) == -1) + goto out; + si->st0 = PEER_SESSION_END; + /* fall through */ + case PEER_SESSION_END: { + si->shutw(si); + si->shutr(si); + si->ib->flags |= BF_READ_NULL; + goto quit; + } + } + } +out: + si->update(si); + si->ob->flags |= BF_READ_DONTWAIT; + /* we don't want to expire timeouts while we're processing requests */ + si->ib->rex = TICK_ETERNITY; + si->ob->wex = TICK_ETERNITY; +quit: + return; +} + + +/* + * Use this function to force a close of a peer session + */ +void peer_session_forceshutdown(struct session * session) +{ + struct stream_interface *oldsi; + + if (session->si[0].iohandler == peer_io_handler) { + oldsi = &session->si[0]; + } + else { + oldsi = &session->si[1]; + } + + /* call release to reinit resync states if needed */ + peer_session_release(oldsi); + oldsi->st0 = PEER_SESSION_END; + oldsi->private = NULL; + task_wakeup(session->task, TASK_WOKEN_MSG); +} + +/* + * this function is called on a read event from a listen socket, corresponding + * to an accept. It tries to accept as many connections as possible. + * It returns 0. + */ +int peer_accept(struct session *s) +{ + /* we have a dedicated I/O handler for the stats */ + stream_int_register_handler(&s->si[1], peer_io_handler); + s->si[1].release = peer_session_release; + s->si[1].private = s; + s->si[1].st0 = PEER_SESSION_ACCEPT; + + tv_zero(&s->logs.tv_request); + s->logs.t_queue = 0; + s->logs.t_connect = 0; + s->logs.t_data = 0; + s->logs.t_close = 0; + s->logs.bytes_in = s->logs.bytes_out = 0; + s->logs.prx_queue_size = 0;/* we get the number of pending conns before us */ + s->logs.srv_queue_size = 0; /* we will get this number soon */ + + s->req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */ + + if (s->listener->timeout) { + s->req->rto = *s->listener->timeout; + s->rep->wto = *s->listener->timeout; + } + return 1; +} + +/* + * Create a new peer session in assingned state (connect will start automatically) + */ +struct session *peer_session_create(struct peer *peer, struct peer_session *ps) +{ + struct listener *l = ((struct proxy *)peer->peers->peers_fe)->listen; + struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */ + struct session *s; + struct http_txn *txn; + struct task *t; + + if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */ + Alert("out of memory in event_accept().\n"); + p->state = PR_STIDLE; + goto out_close; + } + + LIST_ADDQ(&sessions, &s->list); + LIST_INIT(&s->back_refs); + + s->flags = SN_ASSIGNED|SN_ADDR_SET; + s->term_trace = 0; + + /* if this session comes from a known monitoring system, we want to ignore + * it as soon as possible, which means closing it immediately for TCP. + */ + if ((t = task_new()) == NULL) { /* disable this proxy for a while */ + Alert("out of memory in event_accept().\n"); + p->state = PR_STIDLE; + goto out_free_session; + } + + ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); + ps->statuscode = PEER_SESSION_CONNECTCODE; + + t->process = l->handler; + t->context = s; + t->nice = l->nice; + + memcpy(&s->srv_addr, &peer->addr, sizeof(s->srv_addr)); + s->task = t; + s->listener = l; + + /* Note: initially, the session's backend points to the frontend. + * This changes later when switching rules are executed or + * when the default backend is assigned. + */ + s->be = s->fe = p; + + s->req = s->rep = NULL; /* will be allocated later */ + + s->si[0].fd = -1; + s->si[0].owner = t; + s->si[0].state = s->si[0].prev_state = SI_ST_EST; + s->si[0].err_type = SI_ET_NONE; + s->si[0].err_loc = NULL; + s->si[0].exp = TICK_ETERNITY; + s->si[0].flags = SI_FL_NONE; + if (s->fe->options2 & PR_O2_INDEPSTR) + s->si[0].flags |= SI_FL_INDEP_STR; + s->si[0].private = (void *)ps; + s->si[0].st0 = PEER_SESSION_CONNECT; + + stream_int_register_handler(&s->si[0], peer_io_handler); + s->si[0].release = peer_session_release; + + s->si[1].fd = -1; /* just to help with debugging */ + s->si[1].owner = t; + s->si[1].state = s->si[1].prev_state = SI_ST_ASS; + s->si[1].conn_retries = p->conn_retries; + s->si[1].err_type = SI_ET_NONE; + s->si[1].err_loc = NULL; + s->si[1].connect = tcpv4_connect_server; + s->si[1].exp = TICK_ETERNITY; + s->si[1].flags = SI_FL_NONE; + if (s->be->options2 & PR_O2_INDEPSTR) + s->si[1].flags |= SI_FL_INDEP_STR; + + stream_sock_prepare_interface(&s->si[1]); + s->si[1].release = NULL; + + s->srv = s->prev_srv = s->srv_conn = NULL; + s->pend_pos = NULL; + + /* init store persistence */ + s->store_count = 0; + s->stkctr1_entry = NULL; + s->stkctr2_entry = NULL; + + /* FIXME: the logs are horribly complicated now, because they are + * defined in

,

, and later and . + */ + + s->logs.logwait = 0; + s->do_log = NULL; + + /* default error reporting function, may be changed by analysers */ + s->srv_error = default_srv_error; + + s->data_source = DATA_SRC_NONE; + + s->uniq_id = 0; + + txn = &s->txn; + /* Those variables will be checked and freed if non-NULL in + * session.c:session_free(). It is important that they are + * properly initialized. + */ + txn->sessid = NULL; + txn->srv_cookie = NULL; + txn->cli_cookie = NULL; + txn->uri = NULL; + txn->req.cap = NULL; + txn->rsp.cap = NULL; + txn->hdr_idx.v = NULL; + txn->hdr_idx.size = txn->hdr_idx.used = 0; + + if ((s->req = pool_alloc2(pool2_buffer)) == NULL) + goto out_fail_req; /* no memory */ + + s->req->size = global.tune.bufsize; + buffer_init(s->req); + s->req->prod = &s->si[0]; + s->req->cons = &s->si[1]; + s->si[0].ib = s->si[1].ob = s->req; + + s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */ + + /* activate default analysers enabled for this listener */ + s->req->analysers = l->analysers; + + /* note: this should not happen anymore since there's always at least the switching rules */ + if (!s->req->analysers) { + buffer_auto_connect(s->req);/* don't wait to establish connection */ + buffer_auto_close(s->req);/* let the producer forward close requests */ + } + + s->req->rto = s->fe->timeout.client; + s->req->wto = s->be->timeout.server; + + if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) + goto out_fail_rep; /* no memory */ + + s->rep->size = global.tune.bufsize; + buffer_init(s->rep); + s->rep->prod = &s->si[1]; + s->rep->cons = &s->si[0]; + s->si[0].ob = s->si[1].ib = s->rep; + + s->rep->rto = s->be->timeout.server; + s->rep->wto = s->fe->timeout.client; + + s->req->rex = TICK_ETERNITY; + s->req->wex = TICK_ETERNITY; + s->req->analyse_exp = TICK_ETERNITY; + s->rep->rex = TICK_ETERNITY; + s->rep->wex = TICK_ETERNITY; + s->rep->analyse_exp = TICK_ETERNITY; + t->expire = TICK_ETERNITY; + + s->rep->flags |= BF_READ_DONTWAIT; + /* it is important not to call the wakeup function directly but to + * pass through task_wakeup(), because this one knows how to apply + * priorities to tasks. + */ + task_wakeup(t, TASK_WOKEN_INIT); + + l->nbconn++; /* warning! right now, it's up to the handler to decrease this */ + p->feconn++;/* beconn will be increased later */ + jobs++; + actconn++; + totalconn++; + + return s; + + /* Error unrolling */ + out_fail_rep: + pool_free2(pool2_buffer, s->req); + out_fail_req: + task_free(t); + out_free_session: + LIST_DEL(&s->list); + pool_free2(pool2_session, s); + out_close: + return s; +} + +/* + * Task processing function to manage re-connect and peer session + * tasks wakeup on local update. + */ +struct task *process_peer_sync(struct task * task) +{ + struct shared_table *st = (struct shared_table *)task->context; + struct peer_session *ps; + + task->expire = TICK_ETERNITY; + + if (!stopping) { + /* Normal case (not soft stop)*/ + if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) && + (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) && + !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) { + /* Resync from local peer needed + no peer was assigned for the lesson + and no old local peer found + or resync timeout expire */ + + /* flag no more resync from local, to try resync from remotes */ + st->flags |= SHTABLE_F_RESYNC_LOCAL; + + /* reschedule a resync */ + st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + } + + /* For each session */ + for (ps = st->sessions; ps; ps = ps->next) { + /* For each remote peers */ + if (!ps->peer->local) { + if (!ps->session) { + /* no active session */ + if (ps->statuscode == 0 || + ps->statuscode == PEER_SESSION_SUCCESSCODE || + ((ps->statuscode == PEER_SESSION_CONNECTCODE || + ps->statuscode == PEER_SESSION_CONNECTEDCODE) && + tick_is_expired(ps->reconnect, now_ms))) { + /* connection never tried + * or previous session established with success + * or previous session failed during connection + * and reconnection timer is expired */ + + /* retry a connect */ + ps->session = peer_session_create(ps->peer, ps); + } + else if (ps->statuscode == PEER_SESSION_CONNECTCODE || + ps->statuscode == PEER_SESSION_CONNECTEDCODE) { + /* If previous session failed during connection + * but reconnection timer is not expired */ + + /* reschedule task for reconnect */ + task->expire = tick_first(task->expire, ps->reconnect); + } + /* else do nothing */ + } /* !ps->session */ + else if (ps->statuscode == PEER_SESSION_SUCCESSCODE) { + /* current session is active and established */ + if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) && + !(st->flags & SHTABLE_F_RESYNC_ASSIGN) && + !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { + /* Resync from a remote is needed + * and no peer was assigned for lesson + * and current peer may be up2date */ + + /* assign peer for the lesson */ + ps->flags |= PEER_F_LEARN_ASSIGN; + st->flags |= SHTABLE_F_RESYNC_ASSIGN; + + /* awake peer session task to handle a request of resync */ + task_wakeup(ps->session->task, TASK_WOKEN_MSG); + } + else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) { + /* awake peer session task to push local updates */ + task_wakeup(ps->session->task, TASK_WOKEN_MSG); + } + /* else do nothing */ + } /* SUCCESSCODE */ + } /* !ps->peer->local */ + } /* for */ + + /* Resync from remotes expired: consider resync is finished */ + if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) && + !(st->flags & SHTABLE_F_RESYNC_ASSIGN) && + tick_is_expired(st->resync_timeout, now_ms)) { + /* Resync from remote peer needed + * no peer was assigned for the lesson + * and resync timeout expire */ + + /* flag no more resync from remote, consider resync is finished */ + st->flags |= SHTABLE_F_RESYNC_REMOTE; + } + + if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) { + /* Resync not finished*/ + /* reschedule task to resync timeout, to ended resync if needed */ + task->expire = tick_first(task->expire, st->resync_timeout); + } + } /* !stopping */ + else { + /* soft stop case */ + if (task->state & TASK_WOKEN_SIGNAL) { + /* We've just recieved the signal */ + if (!(st->flags & SHTABLE_F_DONOTSTOP)) { + /* add DO NOT STOP flag if not present */ + jobs++; + st->flags |= SHTABLE_F_DONOTSTOP; + } + + /* disconnect all connected peers */ + for (ps = st->sessions; ps; ps = ps->next) { + if (ps->session) { + peer_session_forceshutdown(ps->session); + ps->session = NULL; + } + } + } + ps = st->local_session; + + if (ps->flags & PEER_F_TEACH_COMPLETE) { + if (st->flags & SHTABLE_F_DONOTSTOP) { + /* resync of new process was complete, current process can die now */ + jobs--; + st->flags &= ~SHTABLE_F_DONOTSTOP; + } + } + else if (!ps->session) { + /* If session is not active */ + if (ps->statuscode == 0 || + ps->statuscode == PEER_SESSION_SUCCESSCODE || + ps->statuscode == PEER_SESSION_CONNECTEDCODE || + ps->statuscode == PEER_SESSION_TRYAGAIN) { + /* connection never tried + * or previous session was successfully established + * or previous session tcp connect success but init state incomplete + * or during previous connect, peer replies a try again statuscode */ + + /* connect to the peer */ + ps->session = peer_session_create(ps->peer, ps); + } + else { + /* Other error cases */ + if (st->flags & SHTABLE_F_DONOTSTOP) { + /* unable to resync new process, current process can die now */ + jobs--; + st->flags &= ~SHTABLE_F_DONOTSTOP; + } + } + } + else if (ps->statuscode == PEER_SESSION_SUCCESSCODE && + (int)(ps->pushed - ps->table->table->localupdate) < 0) { + /* current session active and established + awake session to push remaining local updates */ + task_wakeup(ps->session->task, TASK_WOKEN_MSG); + } + } /* stopping */ + /* Wakeup for re-connect */ + return task; +} + +/* + * Function used to register a table for sync on a group of peers + * + */ +void peers_register_table(struct peers *peers, struct stktable *table) +{ + struct shared_table *st; + struct peer * curpeer; + struct peer_session *ps; + + st = (struct shared_table *)calloc(1,sizeof(struct shared_table)); + st->table = table; + st->next = peers->tables; + st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); + peers->tables = st; + + for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) { + ps = (struct peer_session *)calloc(1,sizeof(struct peer_session)); + ps->table = st; + ps->peer = curpeer; + if (curpeer->local) + st->local_session = ps; + ps->next = st->sessions; + ps->reconnect = now_ms; + st->sessions = ps; + peers->peers_fe->maxconn += 3; + } + + peers->peers_fe->listen->maxconn = peers->peers_fe->maxconn; + st->sync_task = task_new(); + st->sync_task->process = process_peer_sync; + st->sync_task->expire = TICK_ETERNITY; + st->sync_task->context = (void *)st; + table->sync_task =st->sync_task; + signal_register_task(0, table->sync_task, 0); + task_wakeup(st->sync_task, TASK_WOKEN_INIT); +} +