mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-05-08 02:40:24 +00:00
A buffer overflow could happen if an integer was badly encoded in the data part of a message received from a peer. It should not happen with authenticated peers (the handshake does not use this function). This patch makes the code of the 'intdecode' function more robust. It also adds some comments about the 'intencode' function. This bug affects versions >= 1.6.
2085 lines
60 KiB
C
2085 lines
60 KiB
C
/*
|
|
* Peer synchro management.
|
|
*
|
|
* Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
|
|
*
|
|
* This program is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU General Public License
|
|
* as published by the Free Software Foundation; either version
|
|
* 2 of the License, or (at your option) any later version.
|
|
*
|
|
*/
|
|
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include <sys/socket.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
|
|
#include <common/compat.h>
|
|
#include <common/config.h>
|
|
#include <common/time.h>
|
|
#include <common/standard.h>
|
|
|
|
#include <types/global.h>
|
|
#include <types/listener.h>
|
|
#include <types/obj_type.h>
|
|
#include <types/peers.h>
|
|
|
|
#include <proto/acl.h>
|
|
#include <proto/applet.h>
|
|
#include <proto/channel.h>
|
|
#include <proto/fd.h>
|
|
#include <proto/frontend.h>
|
|
#include <proto/log.h>
|
|
#include <proto/hdr_idx.h>
|
|
#include <proto/proto_tcp.h>
|
|
#include <proto/proto_http.h>
|
|
#include <proto/proxy.h>
|
|
#include <proto/session.h>
|
|
#include <proto/stream.h>
|
|
#include <proto/signal.h>
|
|
#include <proto/stick_table.h>
|
|
#include <proto/stream_interface.h>
|
|
#include <proto/task.h>
|
|
|
|
|
|
/*******************************/
|
|
/* Current peer learning state */
|
|
/*******************************/
|
|
|
|
/******************************/
|
|
/* Current peers section resync state */
|
|
/******************************/
|
|
#define PEERS_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */
|
|
#define PEERS_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */
|
|
#define PEERS_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */
|
|
#define PEERS_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */
|
|
#define PEERS_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop
|
|
to push data to new process */
|
|
|
|
#define PEERS_RESYNC_STATEMASK (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
|
|
#define PEERS_RESYNC_FROMLOCAL 0x00000000
|
|
#define PEERS_RESYNC_FROMREMOTE PEERS_F_RESYNC_LOCAL
|
|
#define PEERS_RESYNC_FINISHED (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE)
|
|
|
|
/***********************************/
|
|
/* Current shared table sync state */
|
|
/***********************************/
|
|
#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */
|
|
#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */
|
|
|
|
/******************************/
|
|
/* Remote peer teaching state */
|
|
/******************************/
|
|
#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
|
|
#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */
|
|
#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
|
|
#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
|
|
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
|
|
#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
|
|
|
|
#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
|
|
#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
|
|
|
|
/*****************************/
|
|
/* Sync message class */
|
|
/*****************************/
|
|
enum {
|
|
PEER_MSG_CLASS_CONTROL = 0,
|
|
PEER_MSG_CLASS_ERROR,
|
|
PEER_MSG_CLASS_STICKTABLE = 10,
|
|
PEER_MSG_CLASS_RESERVED = 255,
|
|
};
|
|
|
|
/*****************************/
|
|
/* control message types */
|
|
/*****************************/
|
|
enum {
|
|
PEER_MSG_CTRL_RESYNCREQ = 0,
|
|
PEER_MSG_CTRL_RESYNCFINISHED,
|
|
PEER_MSG_CTRL_RESYNCPARTIAL,
|
|
PEER_MSG_CTRL_RESYNCCONFIRM,
|
|
};
|
|
|
|
/*****************************/
|
|
/* error message types */
|
|
/*****************************/
|
|
enum {
|
|
PEER_MSG_ERR_PROTOCOL = 0,
|
|
PEER_MSG_ERR_SIZELIMIT,
|
|
};
|
|
|
|
|
|
/*******************************/
|
|
/* stick table sync mesg types */
|
|
/* Note: ids >= 128 contains */
|
|
/* id message cotains data */
|
|
/*******************************/
|
|
enum {
|
|
PEER_MSG_STKT_UPDATE = 128,
|
|
PEER_MSG_STKT_INCUPDATE,
|
|
PEER_MSG_STKT_DEFINE,
|
|
PEER_MSG_STKT_SWITCH,
|
|
PEER_MSG_STKT_ACK,
|
|
PEER_MSG_STKT_UPDATE_TIMED,
|
|
PEER_MSG_STKT_INCUPDATE_TIMED,
|
|
};
|
|
|
|
/**********************************/
|
|
/* Peer Session IO handler states */
|
|
/**********************************/
|
|
|
|
enum {
|
|
PEER_SESS_ST_ACCEPT = 0, /* Initial state for session create by an accept, must be zero! */
|
|
PEER_SESS_ST_GETVERSION, /* Validate supported protocol version */
|
|
PEER_SESS_ST_GETHOST, /* Validate host ID correspond to local host id */
|
|
PEER_SESS_ST_GETPEER, /* Validate peer ID correspond to a known remote peer id */
|
|
/* after this point, data were possibly exchanged */
|
|
PEER_SESS_ST_SENDSUCCESS, /* Send ret code 200 (success) and wait for message */
|
|
PEER_SESS_ST_CONNECT, /* Initial state for session create on a connect, push presentation into buffer */
|
|
PEER_SESS_ST_GETSTATUS, /* Wait for the welcome message */
|
|
PEER_SESS_ST_WAITMSG, /* Wait for data messages */
|
|
PEER_SESS_ST_EXIT, /* Exit with status code */
|
|
PEER_SESS_ST_ERRPROTO, /* Send error proto message before exit */
|
|
PEER_SESS_ST_ERRSIZE, /* Send error size message before exit */
|
|
PEER_SESS_ST_END, /* Killed session */
|
|
};
|
|
|
|
/***************************************************/
|
|
/* Peer Session status code - part of the protocol */
|
|
/***************************************************/
|
|
|
|
#define PEER_SESS_SC_CONNECTCODE 100 /* connect in progress */
|
|
#define PEER_SESS_SC_CONNECTEDCODE 110 /* tcp connect success */
|
|
|
|
#define PEER_SESS_SC_SUCCESSCODE 200 /* accept or connect successful */
|
|
|
|
#define PEER_SESS_SC_TRYAGAIN 300 /* try again later */
|
|
|
|
#define PEER_SESS_SC_ERRPROTO 501 /* error protocol */
|
|
#define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */
|
|
#define PEER_SESS_SC_ERRHOST 503 /* bad host name */
|
|
#define PEER_SESS_SC_ERRPEER 504 /* unknown peer */
|
|
|
|
#define PEER_SESSION_PROTO_NAME "HAProxyS"
|
|
#define PEER_MAJOR_VER 2
|
|
#define PEER_MINOR_VER 1
|
|
#define PEER_DWNGRD_MINOR_VER 0
|
|
|
|
/* Global <peers> pointer, NULL until something else assigns it.
 * NOTE(review): presumably the head of the configured "peers" sections --
 * confirm against the code that fills it in.
 */
struct peers *peers = NULL;
|
|
/* Forward declaration: peer_io_handler() calls this function before its
 * definition appears in the file.
 */
static void peer_session_forceshutdown(struct appctx *appctx);
|
|
|
|
/* This function encode an uint64 to 'dynamic' length format.
|
|
The encoded value is written at address *str, and the
|
|
caller must assure that size after *str is large enought.
|
|
At return, the *str is set at the next Byte after then
|
|
encoded integer. The function returns then length of the
|
|
encoded integer in Bytes */
|
|
int intencode(uint64_t i, char **str) {
|
|
int idx = 0;
|
|
unsigned char *msg;
|
|
|
|
if (!*str)
|
|
return 0;
|
|
|
|
msg = (unsigned char *)*str;
|
|
if (i < 240) {
|
|
msg[0] = (unsigned char)i;
|
|
*str = (char *)&msg[idx+1];
|
|
return (idx+1);
|
|
}
|
|
|
|
msg[idx] =(unsigned char)i | 240;
|
|
i = (i - 240) >> 4;
|
|
while (i >= 128) {
|
|
msg[++idx] = (unsigned char)i | 128;
|
|
i = (i - 128) >> 7;
|
|
}
|
|
msg[++idx] = (unsigned char)i;
|
|
*str = (char *)&msg[idx+1];
|
|
return (idx+1);
|
|
}
|
|
|
|
|
|
/* This function returns the decoded integer or 0
|
|
if decode failed
|
|
*str point on the beginning of the integer to decode
|
|
at the end of decoding *str point on the end of the
|
|
encoded integer or to null if end is reached */
|
|
uint64_t intdecode(char **str, char *end)
|
|
{
|
|
unsigned char *msg;
|
|
uint64_t i;
|
|
int shift;
|
|
|
|
if (!*str)
|
|
return 0;
|
|
|
|
msg = (unsigned char *)*str;
|
|
if (msg >= (unsigned char *)end)
|
|
goto fail;
|
|
|
|
i = *(msg++);
|
|
if (i >= 240) {
|
|
shift = 4;
|
|
do {
|
|
if (msg >= (unsigned char *)end)
|
|
goto fail;
|
|
i += (uint64_t)*msg << shift;
|
|
shift += 7;
|
|
} while (*(msg++) >= 128);
|
|
}
|
|
*str = (char *)msg;
|
|
return i;
|
|
|
|
fail:
|
|
*str = NULL;
|
|
return 0;
|
|
}
|
|
|
|
/* Set the stick-table UPDATE message type byte at <msg_type> address,
|
|
* depending on <use_identifier> and <use_timed> boolean parameters.
|
|
* Always successful.
|
|
*/
|
|
static inline void peer_set_update_msg_type(char *msg_type, int use_identifier, int use_timed)
|
|
{
|
|
if (use_timed) {
|
|
if (use_identifier)
|
|
*msg_type = PEER_MSG_STKT_UPDATE_TIMED;
|
|
else
|
|
*msg_type = PEER_MSG_STKT_INCUPDATE_TIMED;
|
|
}
|
|
else {
|
|
if (use_identifier)
|
|
*msg_type = PEER_MSG_STKT_UPDATE;
|
|
else
|
|
*msg_type = PEER_MSG_STKT_INCUPDATE;
|
|
}
|
|
|
|
}
|
|
/*
|
|
* This prepare the data update message on the stick session <ts>, <st> is the considered
|
|
* stick table.
|
|
* <msg> is a buffer of <size> to recieve data message content
|
|
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
|
|
* check size)
|
|
*/
|
|
static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier, int use_timed)
|
|
{
|
|
uint32_t netinteger;
|
|
unsigned short datalen;
|
|
char *cursor, *datamsg;
|
|
unsigned int data_type;
|
|
void *data_ptr;
|
|
|
|
cursor = datamsg = msg + 1 + 5;
|
|
|
|
/* construct message */
|
|
|
|
/* check if we need to send the update identifer */
|
|
if (!st->last_pushed || ts->upd.key < st->last_pushed || ((ts->upd.key - st->last_pushed) != 1)) {
|
|
use_identifier = 1;
|
|
}
|
|
|
|
/* encode update identifier if needed */
|
|
if (use_identifier) {
|
|
netinteger = htonl(ts->upd.key);
|
|
memcpy(cursor, &netinteger, sizeof(netinteger));
|
|
cursor += sizeof(netinteger);
|
|
}
|
|
|
|
if (use_timed) {
|
|
netinteger = htonl(tick_remain(now_ms, ts->expire));
|
|
memcpy(cursor, &netinteger, sizeof(netinteger));
|
|
cursor += sizeof(netinteger);
|
|
}
|
|
|
|
/* encode the key */
|
|
if (st->table->type == SMP_T_STR) {
|
|
int stlen = strlen((char *)ts->key.key);
|
|
|
|
intencode(stlen, &cursor);
|
|
memcpy(cursor, ts->key.key, stlen);
|
|
cursor += stlen;
|
|
}
|
|
else if (st->table->type == SMP_T_SINT) {
|
|
netinteger = htonl(*((uint32_t *)ts->key.key));
|
|
memcpy(cursor, &netinteger, sizeof(netinteger));
|
|
cursor += sizeof(netinteger);
|
|
}
|
|
else {
|
|
memcpy(cursor, ts->key.key, st->table->key_size);
|
|
cursor += st->table->key_size;
|
|
}
|
|
|
|
/* encode values */
|
|
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
|
|
|
data_ptr = stktable_data_ptr(st->table, ts, data_type);
|
|
if (data_ptr) {
|
|
switch (stktable_data_types[data_type].std_type) {
|
|
case STD_T_SINT: {
|
|
int data;
|
|
|
|
data = stktable_data_cast(data_ptr, std_t_sint);
|
|
intencode(data, &cursor);
|
|
break;
|
|
}
|
|
case STD_T_UINT: {
|
|
unsigned int data;
|
|
|
|
data = stktable_data_cast(data_ptr, std_t_uint);
|
|
intencode(data, &cursor);
|
|
break;
|
|
}
|
|
case STD_T_ULL: {
|
|
unsigned long long data;
|
|
|
|
data = stktable_data_cast(data_ptr, std_t_ull);
|
|
intencode(data, &cursor);
|
|
break;
|
|
}
|
|
case STD_T_FRQP: {
|
|
struct freq_ctr_period *frqp;
|
|
|
|
frqp = &stktable_data_cast(data_ptr, std_t_frqp);
|
|
intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
|
|
intencode(frqp->curr_ctr, &cursor);
|
|
intencode(frqp->prev_ctr, &cursor);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Compute datalen */
|
|
datalen = (cursor - datamsg);
|
|
|
|
/* prepare message header */
|
|
msg[0] = PEER_MSG_CLASS_STICKTABLE;
|
|
peer_set_update_msg_type(&msg[1], use_identifier, use_timed);
|
|
cursor = &msg[2];
|
|
intencode(datalen, &cursor);
|
|
|
|
/* move data after header */
|
|
memmove(cursor, datamsg, datalen);
|
|
|
|
/* return header size + data_len */
|
|
return (cursor - msg) + datalen;
|
|
}
|
|
|
|
/*
|
|
* This prepare the switch table message to targeted share table <st>.
|
|
* <msg> is a buffer of <size> to recieve data message content
|
|
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
|
|
* check size)
|
|
*/
|
|
static int peer_prepare_switchmsg(struct shared_table *st, char *msg, size_t size)
|
|
{
|
|
int len;
|
|
unsigned short datalen;
|
|
char *cursor, *datamsg;
|
|
uint64_t data = 0;
|
|
unsigned int data_type;
|
|
|
|
cursor = datamsg = msg + 2 + 5;
|
|
|
|
/* Encode data */
|
|
|
|
/* encode local id */
|
|
intencode(st->local_id, &cursor);
|
|
|
|
/* encode table name */
|
|
len = strlen(st->table->id);
|
|
intencode(len, &cursor);
|
|
memcpy(cursor, st->table->id, len);
|
|
cursor += len;
|
|
|
|
/* encode table type */
|
|
|
|
intencode(st->table->type, &cursor);
|
|
|
|
/* encode table key size */
|
|
intencode(st->table->key_size, &cursor);
|
|
|
|
/* encode available known data types in table */
|
|
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
|
if (st->table->data_ofs[data_type]) {
|
|
switch (stktable_data_types[data_type].std_type) {
|
|
case STD_T_SINT:
|
|
case STD_T_UINT:
|
|
case STD_T_ULL:
|
|
case STD_T_FRQP:
|
|
data |= 1 << data_type;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
intencode(data, &cursor);
|
|
|
|
/* Compute datalen */
|
|
datalen = (cursor - datamsg);
|
|
|
|
/* prepare message header */
|
|
msg[0] = PEER_MSG_CLASS_STICKTABLE;
|
|
msg[1] = PEER_MSG_STKT_DEFINE;
|
|
cursor = &msg[2];
|
|
intencode(datalen, &cursor);
|
|
|
|
/* move data after header */
|
|
memmove(cursor, datamsg, datalen);
|
|
|
|
/* return header size + data_len */
|
|
return (cursor - msg) + datalen;
|
|
}
|
|
|
|
/*
|
|
* This prepare the acknowledge message on the stick session <ts>, <st> is the considered
|
|
* stick table.
|
|
* <msg> is a buffer of <size> to recieve data message content
|
|
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
|
|
* check size)
|
|
*/
|
|
static int peer_prepare_ackmsg(struct shared_table *st, char *msg, size_t size)
|
|
{
|
|
unsigned short datalen;
|
|
char *cursor, *datamsg;
|
|
uint32_t netinteger;
|
|
|
|
cursor = datamsg = msg + 2 + 5;
|
|
|
|
intencode(st->remote_id, &cursor);
|
|
netinteger = htonl(st->last_get);
|
|
memcpy(cursor, &netinteger, sizeof(netinteger));
|
|
cursor += sizeof(netinteger);
|
|
|
|
/* Compute datalen */
|
|
datalen = (cursor - datamsg);
|
|
|
|
/* prepare message header */
|
|
msg[0] = PEER_MSG_CLASS_STICKTABLE;
|
|
msg[1] = PEER_MSG_STKT_ACK;
|
|
cursor = &msg[2];
|
|
intencode(datalen, &cursor);
|
|
|
|
/* move data after header */
|
|
memmove(cursor, datamsg, datalen);
|
|
|
|
/* return header size + data_len */
|
|
return (cursor - msg) + datalen;
|
|
}
|
|
|
|
/*
|
|
* Callback to release a session with a peer
|
|
*/
|
|
static void peer_session_release(struct appctx *appctx)
|
|
{
|
|
struct stream_interface *si = appctx->owner;
|
|
struct stream *s = si_strm(si);
|
|
struct peer *peer = appctx->ctx.peers.ptr;
|
|
struct peers *peers = strm_fe(s)->parent;
|
|
|
|
/* appctx->ctx.peers.ptr is not a peer session */
|
|
if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
|
|
return;
|
|
|
|
/* peer session identified */
|
|
if (peer) {
|
|
if (peer->appctx == appctx) {
|
|
/* Re-init current table pointers to force announcement on re-connect */
|
|
peer->remote_table = peer->last_local_table = NULL;
|
|
peer->appctx = NULL;
|
|
if (peer->flags & PEER_F_LEARN_ASSIGN) {
|
|
/* unassign current peer for learning */
|
|
peer->flags &= ~(PEER_F_LEARN_ASSIGN);
|
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
|
|
/* reschedule a resync */
|
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
|
|
}
|
|
/* reset teaching and learning flags to 0 */
|
|
peer->flags &= PEER_TEACH_RESET;
|
|
peer->flags &= PEER_LEARN_RESET;
|
|
}
|
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
|
}
|
|
}
|
|
|
|
/* Retrieve the major and minor versions of peers protocol
|
|
* announced by a remote peer. <str> is a null-terminated
|
|
* string with the following format: "<maj_ver>.<min_ver>".
|
|
*/
|
|
static int peer_get_version(const char *str,
|
|
unsigned int *maj_ver, unsigned int *min_ver)
|
|
{
|
|
unsigned int majv, minv;
|
|
const char *pos, *saved;
|
|
const char *end;
|
|
|
|
saved = pos = str;
|
|
end = str + strlen(str);
|
|
|
|
majv = read_uint(&pos, end);
|
|
if (saved == pos || *pos++ != '.')
|
|
return -1;
|
|
|
|
saved = pos;
|
|
minv = read_uint(&pos, end);
|
|
if (saved == pos || pos != end)
|
|
return -1;
|
|
|
|
*maj_ver = majv;
|
|
*min_ver = minv;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* IO Handler to handle message exchange with a peer
|
|
*/
|
|
static void peer_io_handler(struct appctx *appctx)
|
|
{
|
|
struct stream_interface *si = appctx->owner;
|
|
struct stream *s = si_strm(si);
|
|
struct peers *curpeers = strm_fe(s)->parent;
|
|
int reql = 0;
|
|
int repl = 0;
|
|
size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
|
|
unsigned int maj_ver, min_ver;
|
|
|
|
/* Check if the input buffer is available. */
|
|
if (si_ic(si)->buf->size == 0)
|
|
goto full;
|
|
|
|
while (1) {
|
|
switchstate:
|
|
maj_ver = min_ver = (unsigned int)-1;
|
|
switch(appctx->st0) {
|
|
case PEER_SESS_ST_ACCEPT:
|
|
appctx->ctx.peers.ptr = NULL;
|
|
appctx->st0 = PEER_SESS_ST_GETVERSION;
|
|
/* fall through */
|
|
case PEER_SESS_ST_GETVERSION:
|
|
reql = bo_getline(si_oc(si), trash.str, trash.size);
|
|
if (reql <= 0) { /* closed or EOL not found */
|
|
if (reql == 0)
|
|
goto out;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
if (trash.str[reql-1] != '\n') {
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
else if (reql > 1 && (trash.str[reql-2] == '\r'))
|
|
trash.str[reql-2] = 0;
|
|
else
|
|
trash.str[reql-1] = 0;
|
|
|
|
bo_skip(si_oc(si), reql);
|
|
|
|
/* test protocol */
|
|
if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, proto_len + 1) != 0) {
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
if (peer_get_version(trash.str + proto_len + 1, &maj_ver, &min_ver) == -1 ||
|
|
maj_ver != PEER_MAJOR_VER || min_ver > PEER_MINOR_VER) {
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_ERRVERSION;
|
|
goto switchstate;
|
|
}
|
|
|
|
appctx->st0 = PEER_SESS_ST_GETHOST;
|
|
/* fall through */
|
|
case PEER_SESS_ST_GETHOST:
|
|
reql = bo_getline(si_oc(si), trash.str, trash.size);
|
|
if (reql <= 0) { /* closed or EOL not found */
|
|
if (reql == 0)
|
|
goto out;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
if (trash.str[reql-1] != '\n') {
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
else if (reql > 1 && (trash.str[reql-2] == '\r'))
|
|
trash.str[reql-2] = 0;
|
|
else
|
|
trash.str[reql-1] = 0;
|
|
|
|
bo_skip(si_oc(si), reql);
|
|
|
|
/* test hostname match */
|
|
if (strcmp(localpeer, trash.str) != 0) {
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_ERRHOST;
|
|
goto switchstate;
|
|
}
|
|
|
|
appctx->st0 = PEER_SESS_ST_GETPEER;
|
|
/* fall through */
|
|
case PEER_SESS_ST_GETPEER: {
|
|
struct peer *curpeer;
|
|
char *p;
|
|
reql = bo_getline(si_oc(si), trash.str, trash.size);
|
|
if (reql <= 0) { /* closed or EOL not found */
|
|
if (reql == 0)
|
|
goto out;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
if (trash.str[reql-1] != '\n') {
|
|
/* Incomplete line, we quit */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
else if (reql > 1 && (trash.str[reql-2] == '\r'))
|
|
trash.str[reql-2] = 0;
|
|
else
|
|
trash.str[reql-1] = 0;
|
|
|
|
bo_skip(si_oc(si), reql);
|
|
|
|
/* parse line "<peer name> <pid> <relative_pid>" */
|
|
p = strchr(trash.str, ' ');
|
|
if (!p) {
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
*p = 0;
|
|
|
|
/* lookup known peer */
|
|
for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
|
|
if (strcmp(curpeer->id, trash.str) == 0)
|
|
break;
|
|
}
|
|
|
|
/* if unknown peer */
|
|
if (!curpeer) {
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_ERRPEER;
|
|
goto switchstate;
|
|
}
|
|
|
|
if (curpeer->appctx && curpeer->appctx != appctx) {
|
|
if (curpeer->local) {
|
|
/* Local connection, reply a retry */
|
|
appctx->st0 = PEER_SESS_ST_EXIT;
|
|
appctx->st1 = PEER_SESS_SC_TRYAGAIN;
|
|
goto switchstate;
|
|
}
|
|
peer_session_forceshutdown(curpeer->appctx);
|
|
}
|
|
if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
|
|
if (min_ver == PEER_DWNGRD_MINOR_VER) {
|
|
curpeer->flags |= PEER_F_DWNGRD;
|
|
}
|
|
else {
|
|
curpeer->flags &= ~PEER_F_DWNGRD;
|
|
}
|
|
}
|
|
curpeer->appctx = appctx;
|
|
appctx->ctx.peers.ptr = curpeer;
|
|
appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
|
|
/* fall through */
|
|
}
|
|
case PEER_SESS_ST_SENDSUCCESS: {
|
|
struct peer *curpeer = appctx->ctx.peers.ptr;
|
|
struct shared_table *st;
|
|
|
|
repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
|
|
repl = bi_putblk(si_ic(si), trash.str, repl);
|
|
if (repl <= 0) {
|
|
if (repl == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* Register status code */
|
|
curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
|
|
|
|
/* Awake main task */
|
|
task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
|
|
|
|
/* Init confirm counter */
|
|
curpeer->confirm = 0;
|
|
|
|
/* Init cursors */
|
|
for (st = curpeer->tables; st ; st = st->next) {
|
|
st->last_get = st->last_acked = 0;
|
|
st->teaching_origin = st->last_pushed = st->update;
|
|
}
|
|
|
|
/* reset teaching and learning flags to 0 */
|
|
curpeer->flags &= PEER_TEACH_RESET;
|
|
curpeer->flags &= PEER_LEARN_RESET;
|
|
|
|
/* if current peer is local */
|
|
if (curpeer->local) {
|
|
/* if current host need resyncfrom local and no process assined */
|
|
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
/* assign local peer for a lesson, consider lesson already requested */
|
|
curpeer->flags |= PEER_F_LEARN_ASSIGN;
|
|
peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
}
|
|
|
|
}
|
|
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
/* assign peer for a lesson */
|
|
curpeer->flags |= PEER_F_LEARN_ASSIGN;
|
|
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
|
}
|
|
|
|
|
|
/* switch to waiting message state */
|
|
appctx->st0 = PEER_SESS_ST_WAITMSG;
|
|
goto switchstate;
|
|
}
|
|
case PEER_SESS_ST_CONNECT: {
|
|
struct peer *curpeer = appctx->ctx.peers.ptr;
|
|
|
|
/* Send headers */
|
|
repl = snprintf(trash.str, trash.size,
|
|
PEER_SESSION_PROTO_NAME " %u.%u\n%s\n%s %d %d\n",
|
|
PEER_MAJOR_VER,
|
|
(curpeer->flags & PEER_F_DWNGRD) ? PEER_DWNGRD_MINOR_VER : PEER_MINOR_VER,
|
|
curpeer->id,
|
|
localpeer,
|
|
(int)getpid(),
|
|
relative_pid);
|
|
|
|
if (repl >= trash.size) {
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
repl = bi_putblk(si_ic(si), trash.str, repl);
|
|
if (repl <= 0) {
|
|
if (repl == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* switch to the waiting statuscode state */
|
|
appctx->st0 = PEER_SESS_ST_GETSTATUS;
|
|
/* fall through */
|
|
}
|
|
case PEER_SESS_ST_GETSTATUS: {
|
|
struct peer *curpeer = appctx->ctx.peers.ptr;
|
|
struct shared_table *st;
|
|
|
|
if (si_ic(si)->flags & CF_WRITE_PARTIAL)
|
|
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
|
|
|
|
reql = bo_getline(si_oc(si), trash.str, trash.size);
|
|
if (reql <= 0) { /* closed or EOL not found */
|
|
if (reql == 0)
|
|
goto out;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
if (trash.str[reql-1] != '\n') {
|
|
/* Incomplete line, we quit */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
else if (reql > 1 && (trash.str[reql-2] == '\r'))
|
|
trash.str[reql-2] = 0;
|
|
else
|
|
trash.str[reql-1] = 0;
|
|
|
|
bo_skip(si_oc(si), reql);
|
|
|
|
/* Register status code */
|
|
curpeer->statuscode = atoi(trash.str);
|
|
|
|
/* Awake main task */
|
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
|
|
|
/* If status code is success */
|
|
if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
|
/* Init cursors */
|
|
for (st = curpeer->tables; st ; st = st->next) {
|
|
st->last_get = st->last_acked = 0;
|
|
st->teaching_origin = st->last_pushed = st->update;
|
|
}
|
|
|
|
/* Init confirm counter */
|
|
curpeer->confirm = 0;
|
|
|
|
/* reset teaching and learning flags to 0 */
|
|
curpeer->flags &= PEER_TEACH_RESET;
|
|
curpeer->flags &= PEER_LEARN_RESET;
|
|
|
|
/* If current peer is local */
|
|
if (curpeer->local) {
|
|
/* flag to start to teach lesson */
|
|
curpeer->flags |= PEER_F_TEACH_PROCESS;
|
|
|
|
}
|
|
else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
/* If peer is remote and resync from remote is needed,
|
|
and no peer currently assigned */
|
|
|
|
/* assign peer for a lesson */
|
|
curpeer->flags |= PEER_F_LEARN_ASSIGN;
|
|
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
|
}
|
|
|
|
}
|
|
else {
|
|
if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
|
|
curpeer->flags |= PEER_F_DWNGRD;
|
|
/* Status code is not success, abort */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_WAITMSG;
|
|
/* fall through */
|
|
}
|
|
case PEER_SESS_ST_WAITMSG: {
|
|
struct peer *curpeer = appctx->ctx.peers.ptr;
|
|
struct stksess *ts, *newts = NULL;
|
|
uint32_t msg_len = 0;
|
|
char *msg_cur = trash.str;
|
|
char *msg_end = trash.str;
|
|
unsigned char msg_head[7];
|
|
int totl = 0;
|
|
|
|
reql = bo_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl);
|
|
if (reql <= 0) /* closed or EOL not found */
|
|
goto incomplete;
|
|
|
|
totl += reql;
|
|
|
|
if (msg_head[1] >= 128) {
|
|
/* Read and Decode message length */
|
|
reql = bo_getblk(si_oc(si), (char *)&msg_head[2], sizeof(unsigned char), totl);
|
|
if (reql <= 0) /* closed */
|
|
goto incomplete;
|
|
|
|
totl += reql;
|
|
|
|
if (msg_head[2] < 240) {
|
|
msg_len = msg_head[2];
|
|
}
|
|
else {
|
|
int i;
|
|
char *cur;
|
|
char *end;
|
|
|
|
for (i = 3 ; i < sizeof(msg_head) ; i++) {
|
|
reql = bo_getblk(si_oc(si), (char *)&msg_head[i], sizeof(char), totl);
|
|
if (reql <= 0) /* closed */
|
|
goto incomplete;
|
|
|
|
totl += reql;
|
|
|
|
if (!(msg_head[i] & 0x80))
|
|
break;
|
|
}
|
|
|
|
if (i == sizeof(msg_head)) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
|
|
}
|
|
end = (char *)msg_head + sizeof(msg_head);
|
|
cur = (char *)&msg_head[2];
|
|
msg_len = intdecode(&cur, end);
|
|
if (!cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
}
|
|
|
|
|
|
/* Read message content */
|
|
if (msg_len) {
|
|
if (msg_len > trash.size) {
|
|
/* message does not fit in the trash buffer, abort with a size error */
|
|
appctx->st0 = PEER_SESS_ST_ERRSIZE;
|
|
goto switchstate;
|
|
}
|
|
|
|
reql = bo_getblk(si_oc(si), trash.str, msg_len, totl);
|
|
if (reql <= 0) /* closed */
|
|
goto incomplete;
|
|
totl += reql;
|
|
|
|
msg_end += msg_len;
|
|
}
|
|
}
|
|
|
|
if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
|
|
if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
|
|
struct shared_table *st;
|
|
/* Reset message: remote need resync */
|
|
|
|
/* prepare tables for a global push */
|
|
for (st = curpeer->tables; st; st = st->next) {
|
|
st->teaching_origin = st->last_pushed = st->table->update;
|
|
st->flags = 0;
|
|
}
|
|
|
|
/* reset teaching flags to 0 */
|
|
curpeer->flags &= PEER_TEACH_RESET;
|
|
|
|
/* flag to start to teach lesson */
|
|
curpeer->flags |= PEER_F_TEACH_PROCESS;
|
|
|
|
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
|
|
|
|
if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
|
|
curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
|
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
|
|
}
|
|
curpeer->confirm++;
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
|
|
|
|
if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
|
|
curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
|
|
peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
|
|
|
|
curpeer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
|
|
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
|
|
}
|
|
curpeer->confirm++;
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
|
|
struct shared_table *st;
|
|
|
|
/* If stopping state */
|
|
if (stopping) {
|
|
/* Close session, push resync no more needed */
|
|
curpeer->flags |= PEER_F_TEACH_COMPLETE;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
for (st = curpeer->tables; st; st = st->next) {
|
|
st->update = st->last_pushed = st->teaching_origin;
|
|
st->flags = 0;
|
|
}
|
|
|
|
/* reset teaching flags to 0 */
|
|
curpeer->flags &= PEER_TEACH_RESET;
|
|
}
|
|
}
|
|
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
|
|
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
|
|
int table_id_len;
|
|
struct shared_table *st;
|
|
int table_type;
|
|
int table_keylen;
|
|
int table_id;
|
|
uint64_t table_data;
|
|
|
|
table_id = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
table_id_len = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
curpeer->remote_table = NULL;
|
|
if (!table_id_len || (msg_cur + table_id_len) >= msg_end) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
for (st = curpeer->tables; st; st = st->next) {
|
|
/* Reset IDs */
|
|
if (st->remote_id == table_id)
|
|
st->remote_id = 0;
|
|
|
|
if (!curpeer->remote_table
|
|
&& (table_id_len == strlen(st->table->id))
|
|
&& (memcmp(st->table->id, msg_cur, table_id_len) == 0)) {
|
|
curpeer->remote_table = st;
|
|
}
|
|
}
|
|
|
|
if (!curpeer->remote_table) {
|
|
goto ignore_msg;
|
|
}
|
|
|
|
msg_cur += table_id_len;
|
|
if (msg_cur >= msg_end) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
table_type = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
table_keylen = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
table_data = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
if (curpeer->remote_table->table->type != table_type
|
|
|| curpeer->remote_table->table->key_size != table_keylen) {
|
|
curpeer->remote_table = NULL;
|
|
goto ignore_msg;
|
|
}
|
|
|
|
curpeer->remote_table->remote_data = table_data;
|
|
curpeer->remote_table->remote_id = table_id;
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
|
|
struct shared_table *st;
|
|
int table_id;
|
|
|
|
table_id = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
curpeer->remote_table = NULL;
|
|
for (st = curpeer->tables; st; st = st->next) {
|
|
if (st->remote_id == table_id) {
|
|
curpeer->remote_table = st;
|
|
break;
|
|
}
|
|
}
|
|
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_STKT_UPDATE
|
|
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE
|
|
|| msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
|
|
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
|
|
struct shared_table *st = curpeer->remote_table;
|
|
uint32_t update;
|
|
int expire;
|
|
unsigned int data_type;
|
|
void *data_ptr;
|
|
|
|
/* Here we have data message */
|
|
if (!st)
|
|
goto ignore_msg;
|
|
|
|
expire = MS_TO_TICKS(st->table->expire);
|
|
|
|
if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
|
|
msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
|
|
if (msg_len < sizeof(update)) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
memcpy(&update, msg_cur, sizeof(update));
|
|
msg_cur += sizeof(update);
|
|
st->last_get = htonl(update);
|
|
}
|
|
else {
|
|
st->last_get++;
|
|
}
|
|
|
|
if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
|
|
msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
|
|
size_t expire_sz = sizeof expire;
|
|
|
|
if (msg_cur + expire_sz > msg_end) {
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
memcpy(&expire, msg_cur, expire_sz);
|
|
msg_cur += expire_sz;
|
|
expire = ntohl(expire);
|
|
}
|
|
|
|
newts = stksess_new(st->table, NULL);
|
|
if (!newts)
|
|
goto ignore_msg;
|
|
|
|
if (st->table->type == SMP_T_STR) {
|
|
unsigned int to_read, to_store;
|
|
|
|
to_read = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
stksess_free(st->table, newts);
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
to_store = MIN(to_read, st->table->key_size - 1);
|
|
if (msg_cur + to_store > msg_end) {
|
|
/* malformed message */
|
|
stksess_free(st->table, newts);
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
memcpy(newts->key.key, msg_cur, to_store);
|
|
newts->key.key[to_store] = 0;
|
|
msg_cur += to_read;
|
|
}
|
|
else if (st->table->type == SMP_T_SINT) {
|
|
unsigned int netinteger;
|
|
|
|
if (msg_cur + sizeof(netinteger) > msg_end) {
|
|
/* malformed message */
|
|
stksess_free(st->table, newts);
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
memcpy(&netinteger, msg_cur, sizeof(netinteger));
|
|
netinteger = ntohl(netinteger);
|
|
memcpy(newts->key.key, &netinteger, sizeof(netinteger));
|
|
msg_cur += sizeof(netinteger);
|
|
}
|
|
else {
|
|
if (msg_cur + st->table->key_size > msg_end) {
|
|
/* malformed message */
|
|
stksess_free(st->table, newts);
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
memcpy(newts->key.key, msg_cur, st->table->key_size);
|
|
msg_cur += st->table->key_size;
|
|
}
|
|
|
|
/* lookup for existing entry */
|
|
ts = stktable_lookup(st->table, newts);
|
|
if (ts) {
|
|
/* the entry already exist, we can free ours */
|
|
stktable_touch_with_exp(st->table, ts, 0, tick_add(now_ms, expire));
|
|
stksess_free(st->table, newts);
|
|
newts = NULL;
|
|
}
|
|
else {
|
|
struct eb32_node *eb;
|
|
|
|
/* create new entry */
|
|
ts = stktable_store_with_exp(st->table, newts, 0, tick_add(now_ms, expire));
|
|
newts = NULL; /* don't reuse it */
|
|
|
|
ts->upd.key= (++st->table->update)+(2147483648U);
|
|
eb = eb32_insert(&st->table->updates, &ts->upd);
|
|
if (eb != &ts->upd) {
|
|
eb32_delete(eb);
|
|
eb32_insert(&st->table->updates, &ts->upd);
|
|
}
|
|
}
|
|
|
|
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
|
|
|
|
if ((1 << data_type) & st->remote_data) {
|
|
switch (stktable_data_types[data_type].std_type) {
|
|
case STD_T_SINT: {
|
|
int data;
|
|
|
|
data = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
data_ptr = stktable_data_ptr(st->table, ts, data_type);
|
|
if (data_ptr)
|
|
stktable_data_cast(data_ptr, std_t_sint) = data;
|
|
break;
|
|
}
|
|
case STD_T_UINT: {
|
|
unsigned int data;
|
|
|
|
data = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
data_ptr = stktable_data_ptr(st->table, ts, data_type);
|
|
if (data_ptr)
|
|
stktable_data_cast(data_ptr, std_t_uint) = data;
|
|
break;
|
|
}
|
|
case STD_T_ULL: {
|
|
unsigned long long data;
|
|
|
|
data = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
data_ptr = stktable_data_ptr(st->table, ts, data_type);
|
|
if (data_ptr)
|
|
stktable_data_cast(data_ptr, std_t_ull) = data;
|
|
break;
|
|
}
|
|
case STD_T_FRQP: {
|
|
struct freq_ctr_period data;
|
|
|
|
data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end));
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
data.curr_ctr = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
data.prev_ctr = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
data_ptr = stktable_data_ptr(st->table, ts, data_type);
|
|
if (data_ptr)
|
|
stktable_data_cast(data_ptr, std_t_frqp) = data;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
|
|
/* ack message */
|
|
uint32_t table_id ;
|
|
uint32_t update;
|
|
struct shared_table *st;
|
|
|
|
table_id = intdecode(&msg_cur, msg_end);
|
|
if (!msg_cur || (msg_cur + sizeof(update) > msg_end)) {
|
|
/* malformed message */
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
memcpy(&update, msg_cur, sizeof(update));
|
|
update = ntohl(update);
|
|
|
|
for (st = curpeer->tables; st; st = st->next) {
|
|
if (st->local_id == table_id) {
|
|
st->update = update;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
|
|
appctx->st0 = PEER_SESS_ST_ERRPROTO;
|
|
goto switchstate;
|
|
}
|
|
|
|
ignore_msg:
|
|
/* skip consumed message */
|
|
bo_skip(si_oc(si), totl);
|
|
/* loop on that state to peek next message */
|
|
goto switchstate;
|
|
|
|
incomplete:
|
|
/* we get here when a bo_getblk() returns <= 0 in reql */
|
|
|
|
if (reql < 0) {
|
|
/* there was an error */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Need to request a resync */
|
|
if ((curpeer->flags & PEER_F_LEARN_ASSIGN) &&
|
|
(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
|
!(peers->flags & PEERS_F_RESYNC_PROCESS)) {
|
|
unsigned char msg[2];
|
|
|
|
/* Current peer was elected to request a resync */
|
|
msg[0] = PEER_MSG_CLASS_CONTROL;
|
|
msg[1] = PEER_MSG_CTRL_RESYNCREQ;
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
peers->flags |= PEERS_F_RESYNC_PROCESS;
|
|
}
|
|
|
|
/* Nothing to read, now we start to write */
|
|
|
|
if (curpeer->tables) {
|
|
struct shared_table *st;
|
|
struct shared_table *last_local_table;
|
|
|
|
last_local_table = curpeer->last_local_table;
|
|
if (!last_local_table)
|
|
last_local_table = curpeer->tables;
|
|
st = last_local_table->next;
|
|
|
|
while (1) {
|
|
if (!st)
|
|
st = curpeer->tables;
|
|
|
|
/* It remains some updates to ack */
|
|
if (st->last_get != st->last_acked) {
|
|
int msglen;
|
|
|
|
msglen = peer_prepare_ackmsg(st, trash.str, trash.size);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
st->last_acked = st->last_get;
|
|
}
|
|
|
|
if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
|
|
if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
|
|
((int)(st->last_pushed - st->table->localupdate) < 0)) {
|
|
struct eb32_node *eb;
|
|
int new_pushed;
|
|
|
|
if (st != curpeer->last_local_table) {
|
|
int msglen;
|
|
|
|
msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
curpeer->last_local_table = st;
|
|
}
|
|
|
|
/* We force new pushed to 1 to force identifier in update message */
|
|
new_pushed = 1;
|
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
while (1) {
|
|
uint32_t msglen;
|
|
struct stksess *ts;
|
|
|
|
/* push local updates */
|
|
if (!eb) {
|
|
eb = eb32_first(&st->table->updates);
|
|
if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
|
|
st->table->commitupdate = st->last_pushed = st->table->localupdate;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ((int)(eb->key - st->table->localupdate) > 0) {
|
|
st->table->commitupdate = st->last_pushed = st->table->localupdate;
|
|
break;
|
|
}
|
|
|
|
ts = eb32_entry(eb, struct stksess, upd);
|
|
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, 0);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
st->last_pushed = ts->upd.key;
|
|
if ((int)(st->last_pushed - st->table->commitupdate) > 0)
|
|
st->table->commitupdate = st->last_pushed;
|
|
/* identifier may not needed in next update message */
|
|
new_pushed = 0;
|
|
|
|
eb = eb32_next(eb);
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
|
|
struct eb32_node *eb;
|
|
int new_pushed;
|
|
|
|
if (st != curpeer->last_local_table) {
|
|
int msglen;
|
|
|
|
msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
curpeer->last_local_table = st;
|
|
}
|
|
|
|
/* We force new pushed to 1 to force identifier in update message */
|
|
new_pushed = 1;
|
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
while (1) {
|
|
uint32_t msglen;
|
|
struct stksess *ts;
|
|
int use_timed;
|
|
|
|
/* push local updates */
|
|
if (!eb) {
|
|
st->flags |= SHTABLE_F_TEACH_STAGE1;
|
|
eb = eb32_first(&st->table->updates);
|
|
if (eb)
|
|
st->last_pushed = eb->key - 1;
|
|
break;
|
|
}
|
|
|
|
ts = eb32_entry(eb, struct stksess, upd);
|
|
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
|
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
st->last_pushed = ts->upd.key;
|
|
/* identifier may not needed in next update message */
|
|
new_pushed = 0;
|
|
|
|
eb = eb32_next(eb);
|
|
}
|
|
}
|
|
|
|
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
|
|
struct eb32_node *eb;
|
|
int new_pushed;
|
|
|
|
if (st != curpeer->last_local_table) {
|
|
int msglen;
|
|
|
|
msglen = peer_prepare_switchmsg(st, trash.str, trash.size);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
curpeer->last_local_table = st;
|
|
}
|
|
|
|
/* We force new pushed to 1 to force identifier in update message */
|
|
new_pushed = 1;
|
|
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
|
|
while (1) {
|
|
uint32_t msglen;
|
|
struct stksess *ts;
|
|
int use_timed;
|
|
|
|
/* push local updates */
|
|
if (!eb || eb->key > st->teaching_origin) {
|
|
st->flags |= SHTABLE_F_TEACH_STAGE2;
|
|
break;
|
|
}
|
|
|
|
ts = eb32_entry(eb, struct stksess, upd);
|
|
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
|
|
msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
|
|
if (!msglen) {
|
|
/* internal error: message does not fit in trash */
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), trash.str, msglen);
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1) {
|
|
goto full;
|
|
}
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
st->last_pushed = ts->upd.key;
|
|
/* identifier may not needed in next update message */
|
|
new_pushed = 0;
|
|
|
|
eb = eb32_next(eb);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (st == last_local_table)
|
|
break;
|
|
st = st->next;
|
|
}
|
|
}
|
|
|
|
|
|
if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) {
|
|
unsigned char msg[2];
|
|
|
|
/* Teaching is over: tell the peer whether the resync is finished or only partial */
|
|
msg[0] = PEER_MSG_CLASS_CONTROL;
|
|
msg[1] = ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED) ? PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
|
|
/* process final lesson message */
|
|
repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
/* flag finished message sent */
|
|
curpeer->flags |= PEER_F_TEACH_FINISHED;
|
|
}
|
|
|
|
/* Confirm finished or partial messages */
|
|
while (curpeer->confirm) {
|
|
unsigned char msg[2];
|
|
|
|
/* There is a confirm message to send */
|
|
msg[0] = PEER_MSG_CLASS_CONTROL;
|
|
msg[1] = PEER_MSG_CTRL_RESYNCCONFIRM;
|
|
|
|
/* message to buffer */
|
|
repl = bi_putblk(si_ic(si), (char *)msg, sizeof(msg));
|
|
if (repl <= 0) {
|
|
/* no more write possible */
|
|
if (repl == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
curpeer->confirm--;
|
|
}
|
|
|
|
/* nothing more to do */
|
|
goto out;
|
|
}
|
|
case PEER_SESS_ST_EXIT:
|
|
repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
|
|
if (bi_putblk(si_ic(si), trash.str, repl) == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
case PEER_SESS_ST_ERRSIZE: {
|
|
unsigned char msg[2];
|
|
|
|
msg[0] = PEER_MSG_CLASS_ERROR;
|
|
msg[1] = PEER_MSG_ERR_SIZELIMIT;
|
|
|
|
if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
goto switchstate;
|
|
}
|
|
case PEER_SESS_ST_ERRPROTO: {
|
|
unsigned char msg[2];
|
|
|
|
msg[0] = PEER_MSG_CLASS_ERROR;
|
|
msg[1] = PEER_MSG_ERR_PROTOCOL;
|
|
|
|
if (bi_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1)
|
|
goto full;
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
/* fall through */
|
|
}
|
|
case PEER_SESS_ST_END: {
|
|
si_shutw(si);
|
|
si_shutr(si);
|
|
si_ic(si)->flags |= CF_READ_NULL;
|
|
goto out;
|
|
}
|
|
}
|
|
}
|
|
out:
|
|
si_oc(si)->flags |= CF_READ_DONTWAIT;
|
|
return;
|
|
full:
|
|
si_applet_cant_put(si);
|
|
goto out;
|
|
}
|
|
|
|
static struct applet peer_applet = {
|
|
.obj_type = OBJ_TYPE_APPLET,
|
|
.name = "<PEER>", /* used for logging */
|
|
.fct = peer_io_handler,
|
|
.release = peer_session_release,
|
|
};
|
|
|
|
/*
|
|
* Use this function to force a close of a peer session
|
|
*/
|
|
static void peer_session_forceshutdown(struct appctx *appctx)
|
|
{
|
|
struct peer *ps;
|
|
|
|
if (!appctx)
|
|
return;
|
|
|
|
if (appctx->applet != &peer_applet)
|
|
return;
|
|
|
|
ps = appctx->ctx.peers.ptr;
|
|
/* we're killing a connection, we must apply a random delay before
|
|
* retrying otherwise the other end will do the same and we can loop
|
|
* for a while.
|
|
*/
|
|
if (ps)
|
|
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
|
|
|
|
appctx->st0 = PEER_SESS_ST_END;
|
|
appctx->ctx.peers.ptr = NULL;
|
|
appctx_wakeup(appctx);
|
|
}
|
|
|
|
/* Pre-configures a peers frontend to accept incoming connections */
|
|
void peers_setup_frontend(struct proxy *fe)
|
|
{
|
|
fe->last_change = now.tv_sec;
|
|
fe->cap = PR_CAP_FE;
|
|
fe->maxconn = 0;
|
|
fe->conn_retries = CONN_RETRIES;
|
|
fe->timeout.client = MS_TO_TICKS(5000);
|
|
fe->accept = frontend_accept;
|
|
fe->default_target = &peer_applet.obj_type;
|
|
fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
|
|
fe->bind_proc = 0; /* will be filled by users */
|
|
}
|
|
|
|
/*
|
|
* Create a new peer session in assigned state (connect will start automatically)
|
|
*/
|
|
static struct appctx *peer_session_create(struct peers *peers, struct peer *peer)
|
|
{
|
|
struct listener *l = LIST_NEXT(&peers->peers_fe->conf.listeners, struct listener *, by_fe);
|
|
struct proxy *p = l->bind_conf->frontend; /* attached frontend */
|
|
struct appctx *appctx;
|
|
struct session *sess;
|
|
struct stream *s;
|
|
struct task *t;
|
|
struct connection *conn;
|
|
|
|
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
|
|
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
|
|
s = NULL;
|
|
|
|
appctx = appctx_new(&peer_applet);
|
|
if (!appctx)
|
|
goto out_close;
|
|
|
|
appctx->st0 = PEER_SESS_ST_CONNECT;
|
|
appctx->ctx.peers.ptr = (void *)peer;
|
|
|
|
sess = session_new(p, l, &appctx->obj_type);
|
|
if (!sess) {
|
|
Alert("out of memory in peer_session_create().\n");
|
|
goto out_free_appctx;
|
|
}
|
|
|
|
if ((t = task_new()) == NULL) {
|
|
Alert("out of memory in peer_session_create().\n");
|
|
goto out_free_sess;
|
|
}
|
|
t->nice = l->nice;
|
|
|
|
if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
|
|
Alert("Failed to initialize stream in peer_session_create().\n");
|
|
goto out_free_task;
|
|
}
|
|
|
|
/* The tasks below are normally what is supposed to be done by
|
|
* fe->accept().
|
|
*/
|
|
s->flags = SF_ASSIGNED|SF_ADDR_SET;
|
|
|
|
/* applet is waiting for data */
|
|
si_applet_cant_get(&s->si[0]);
|
|
appctx_wakeup(appctx);
|
|
|
|
/* initiate an outgoing connection */
|
|
si_set_state(&s->si[1], SI_ST_ASS);
|
|
|
|
/* automatically prepare the stream interface to connect to the
|
|
* pre-initialized connection in si->conn.
|
|
*/
|
|
if (unlikely((conn = conn_new()) == NULL))
|
|
goto out_free_strm;
|
|
|
|
conn_prepare(conn, peer->proto, peer->xprt);
|
|
si_attach_conn(&s->si[1], conn);
|
|
|
|
conn->target = s->target = &s->be->obj_type;
|
|
memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
|
|
s->do_log = NULL;
|
|
s->uniq_id = 0;
|
|
|
|
s->res.flags |= CF_READ_DONTWAIT;
|
|
|
|
l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
|
|
p->feconn++;/* beconn will be increased later */
|
|
jobs++;
|
|
if (!(s->sess->listener->options & LI_O_UNLIMITED))
|
|
actconn++;
|
|
totalconn++;
|
|
|
|
peer->appctx = appctx;
|
|
return appctx;
|
|
|
|
/* Error unrolling */
|
|
out_free_strm:
|
|
LIST_DEL(&s->list);
|
|
pool_free2(pool2_stream, s);
|
|
out_free_task:
|
|
task_free(t);
|
|
out_free_sess:
|
|
session_free(sess);
|
|
out_free_appctx:
|
|
appctx_free(appctx);
|
|
out_close:
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* Task processing function to manage re-connect and peer session
|
|
* tasks wakeup on local update.
|
|
*/
|
|
static struct task *process_peer_sync(struct task * task)
|
|
{
|
|
struct peers *peers = task->context;
|
|
struct peer *ps;
|
|
struct shared_table *st;
|
|
|
|
task->expire = TICK_ETERNITY;
|
|
|
|
if (!peers->peers_fe) {
|
|
/* this one was never started, kill it */
|
|
signal_unregister_handler(peers->sighandler);
|
|
task_delete(peers->sync_task);
|
|
task_free(peers->sync_task);
|
|
peers->sync_task = NULL;
|
|
return NULL;
|
|
}
|
|
|
|
if (!stopping) {
|
|
/* Normal case (not soft stop)*/
|
|
|
|
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
|
|
(!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
|
|
/* Resync from local peer needed
|
|
no peer was assigned for the lesson
|
|
and no old local peer found
|
|
or resync timeout expire */
|
|
|
|
/* flag no more resync from local, to try resync from remotes */
|
|
peers->flags |= PEERS_F_RESYNC_LOCAL;
|
|
|
|
/* reschedule a resync */
|
|
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
|
|
}
|
|
|
|
/* For each session */
|
|
for (ps = peers->remote; ps; ps = ps->next) {
|
|
/* For each remote peers */
|
|
if (!ps->local) {
|
|
if (!ps->appctx) {
|
|
/* no active peer connection */
|
|
if (ps->statuscode == 0 ||
|
|
((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
|
|
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
|
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
|
|
tick_is_expired(ps->reconnect, now_ms))) {
|
|
/* connection never tried
|
|
* or previous peer connection established with success
|
|
* or previous peer connection failed while connecting
|
|
* and reconnection timer is expired */
|
|
|
|
/* retry a connect */
|
|
ps->appctx = peer_session_create(peers, ps);
|
|
}
|
|
else if (!tick_is_expired(ps->reconnect, now_ms)) {
|
|
/* If previous session failed during connection
|
|
* but reconnection timer is not expired */
|
|
|
|
/* reschedule task for reconnect */
|
|
task->expire = tick_first(task->expire, ps->reconnect);
|
|
}
|
|
/* else do nothing */
|
|
} /* !ps->appctx */
|
|
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
|
/* current peer connection is active and established */
|
|
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
|
!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
|
/* Resync from a remote is needed
|
|
* and no peer was assigned for lesson
|
|
* and current peer may be up2date */
|
|
|
|
/* assign peer for the lesson */
|
|
ps->flags |= PEER_F_LEARN_ASSIGN;
|
|
peers->flags |= PEERS_F_RESYNC_ASSIGN;
|
|
|
|
/* wake up peer handler to handle a request of resync */
|
|
appctx_wakeup(ps->appctx);
|
|
}
|
|
else {
|
|
/* Awake session if there is data to push */
|
|
for (st = ps->tables; st ; st = st->next) {
|
|
if ((int)(st->last_pushed - st->table->localupdate) < 0) {
|
|
/* wake up the peer handler to push local updates */
|
|
appctx_wakeup(ps->appctx);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
/* else do nothing */
|
|
} /* SUCCESSCODE */
|
|
} /* !ps->peer->local */
|
|
} /* for */
|
|
|
|
/* Resync from remotes expired: consider resync is finished */
|
|
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
|
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
|
tick_is_expired(peers->resync_timeout, now_ms)) {
|
|
/* Resync from remote peer needed
|
|
* no peer was assigned for the lesson
|
|
* and resync timeout expire */
|
|
|
|
/* flag no more resync from remote, consider resync is finished */
|
|
peers->flags |= PEERS_F_RESYNC_REMOTE;
|
|
}
|
|
|
|
if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
|
|
/* Resync not finished*/
|
|
/* reschedule task to resync timeout, to ended resync if needed */
|
|
task->expire = tick_first(task->expire, peers->resync_timeout);
|
|
}
|
|
} /* !stopping */
|
|
else {
|
|
/* soft stop case */
|
|
if (task->state & TASK_WOKEN_SIGNAL) {
|
|
/* We've just recieved the signal */
|
|
if (!(peers->flags & PEERS_F_DONOTSTOP)) {
|
|
/* add DO NOT STOP flag if not present */
|
|
jobs++;
|
|
peers->flags |= PEERS_F_DONOTSTOP;
|
|
ps = peers->local;
|
|
for (st = ps->tables; st ; st = st->next)
|
|
st->table->syncing++;
|
|
}
|
|
|
|
/* disconnect all connected peers */
|
|
for (ps = peers->remote; ps; ps = ps->next) {
|
|
if (ps->appctx) {
|
|
peer_session_forceshutdown(ps->appctx);
|
|
ps->appctx = NULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
ps = peers->local;
|
|
if (ps->flags & PEER_F_TEACH_COMPLETE) {
|
|
if (peers->flags & PEERS_F_DONOTSTOP) {
|
|
/* resync of new process was complete, current process can die now */
|
|
jobs--;
|
|
peers->flags &= ~PEERS_F_DONOTSTOP;
|
|
for (st = ps->tables; st ; st = st->next)
|
|
st->table->syncing--;
|
|
}
|
|
}
|
|
else if (!ps->appctx) {
|
|
/* If there's no active peer connection */
|
|
if (ps->statuscode == 0 ||
|
|
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
|
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
|
|
ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
|
|
/* connection never tried
|
|
* or previous peer connection was successfully established
|
|
* or previous tcp connect succeeded but init state incomplete
|
|
* or during previous connect, peer replies a try again statuscode */
|
|
|
|
/* connect to the peer */
|
|
peer_session_create(peers, ps);
|
|
}
|
|
else {
|
|
/* Other error cases */
|
|
if (peers->flags & PEERS_F_DONOTSTOP) {
|
|
/* unable to resync new process, current process can die now */
|
|
jobs--;
|
|
peers->flags &= ~PEERS_F_DONOTSTOP;
|
|
for (st = ps->tables; st ; st = st->next)
|
|
st->table->syncing--;
|
|
}
|
|
}
|
|
}
|
|
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
|
|
/* current peer connection is active and established
|
|
* wake up all peer handlers to push remaining local updates */
|
|
for (st = ps->tables; st ; st = st->next) {
|
|
if ((int)(st->last_pushed - st->table->localupdate) < 0) {
|
|
appctx_wakeup(ps->appctx);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
} /* stopping */
|
|
/* Wakeup for re-connect */
|
|
return task;
|
|
}
|
|
|
|
|
|
/*
|
|
*
|
|
*/
|
|
void peers_init_sync(struct peers *peers)
|
|
{
|
|
struct peer * curpeer;
|
|
struct listener *listener;
|
|
|
|
for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
|
|
peers->peers_fe->maxconn += 3;
|
|
}
|
|
|
|
list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
|
|
listener->maxconn = peers->peers_fe->maxconn;
|
|
peers->sync_task = task_new();
|
|
peers->sync_task->process = process_peer_sync;
|
|
peers->sync_task->expire = TICK_ETERNITY;
|
|
peers->sync_task->context = (void *)peers;
|
|
peers->sighandler = signal_register_task(0, peers->sync_task, 0);
|
|
task_wakeup(peers->sync_task, TASK_WOKEN_INIT);
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
* Function used to register a table for sync on a group of peers
|
|
*
|
|
*/
|
|
void peers_register_table(struct peers *peers, struct stktable *table)
|
|
{
|
|
struct shared_table *st;
|
|
struct peer * curpeer;
|
|
int id = 0;
|
|
|
|
for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
|
|
st = calloc(1,sizeof(*st));
|
|
st->table = table;
|
|
st->next = curpeer->tables;
|
|
if (curpeer->tables)
|
|
id = curpeer->tables->local_id;
|
|
st->local_id = id + 1;
|
|
|
|
curpeer->tables = st;
|
|
}
|
|
|
|
table->sync_task = peers->sync_task;
|
|
}
|
|
|