MINOR: peers: Add a message for heartbeat.
This patch implements peer heartbeat feature to prevent any haproxy peer from reconnecting too often, consuming sockets for nothing. To do so, we add PEER_MSG_CTRL_HEARTBEAT new message to PEER_MSG_CLASS_CONTROL peers control class of messages. A ->heartbeat field is added to peer structs to store the heatbeat timeout value which is handled by the same function as for ->reconnect to control the session timeouts. A 2-bytes heartbeat message is sent every 3s when no updates have to be sent. This way, the peer which receives such a message is sure the remote peer is still alive. So, it resets the ->reconnect peer session timeout to its initial value (5s). This prevents any reconnection to an already connected alive peer.
This commit is contained in:
parent
c8d5b95e6d
commit
645635da84
|
@ -62,6 +62,7 @@ struct peer {
|
|||
unsigned int flags; /* peer session flags */
|
||||
unsigned int statuscode; /* current/last session status code */
|
||||
unsigned int reconnect; /* next connect timer */
|
||||
unsigned int heartbeat; /* next heartbeat timer */
|
||||
unsigned int confirm; /* confirm message counter */
|
||||
struct appctx *appctx; /* the appctx running it */
|
||||
struct shared_table *remote_table;
|
||||
|
|
48
src/peers.c
48
src/peers.c
|
@ -82,11 +82,14 @@
|
|||
#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
|
||||
#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
|
||||
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
|
||||
#define PEER_F_HEARTBEAT 0x40000000 /* Heartbeat message to send. */
|
||||
#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
|
||||
|
||||
#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
|
||||
#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
|
||||
|
||||
#define PEER_HEARTBEAT_TIMEOUT 3000 /* 3 seconds */
|
||||
|
||||
/*****************************/
|
||||
/* Sync message class */
|
||||
/*****************************/
|
||||
|
@ -105,6 +108,7 @@ enum {
|
|||
PEER_MSG_CTRL_RESYNCFINISHED,
|
||||
PEER_MSG_CTRL_RESYNCPARTIAL,
|
||||
PEER_MSG_CTRL_RESYNCCONFIRM,
|
||||
PEER_MSG_CTRL_HEARTBEAT,
|
||||
};
|
||||
|
||||
/*****************************/
|
||||
|
@ -886,6 +890,22 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct pee
|
|||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send a heartbeat message.
|
||||
* Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value.
|
||||
* Returns -1 if there was not enough room left to send the message,
|
||||
* any other negative returned value must be considered as an error with an appctx st0
|
||||
* returned value equal to PEER_SESS_ST_END.
|
||||
*/
|
||||
static inline int peer_send_heartbeatmsg(struct appctx *appctx)
|
||||
{
|
||||
struct peer_prep_params p = {
|
||||
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
|
||||
};
|
||||
|
||||
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
|
||||
}
|
||||
|
||||
/*
|
||||
* Build a peer protocol error class message.
|
||||
* Returns the number of written bytes used to build the message if succeeded,
|
||||
|
@ -1605,6 +1625,9 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
|
|||
/* reset teaching flags to 0 */
|
||||
peer->flags &= PEER_TEACH_RESET;
|
||||
}
|
||||
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
|
||||
}
|
||||
}
|
||||
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
|
||||
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
|
||||
|
@ -2146,6 +2169,15 @@ static void peer_io_handler(struct appctx *appctx)
|
|||
goto switchstate;
|
||||
|
||||
send_msgs:
|
||||
if (curpeer->flags & PEER_F_HEARTBEAT) {
|
||||
curpeer->flags &= ~PEER_F_HEARTBEAT;
|
||||
repl = peer_send_heartbeatmsg(appctx);
|
||||
if (repl <= 0) {
|
||||
if (repl == -1)
|
||||
goto out;
|
||||
goto switchstate;
|
||||
}
|
||||
}
|
||||
/* we get here when a peer_recv_msg() returns 0 in reql */
|
||||
repl = peer_send_msgs(appctx, curpeer);
|
||||
if (repl <= 0) {
|
||||
|
@ -2264,6 +2296,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||
struct conn_stream *cs;
|
||||
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
|
||||
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
|
||||
s = NULL;
|
||||
|
||||
|
@ -2342,8 +2375,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
|
|||
}
|
||||
|
||||
/*
|
||||
* Task processing function to manage re-connect and peer session
|
||||
* tasks wakeup on local update.
|
||||
* Task processing function to manage re-connect, peer session
|
||||
* tasks wakeup on local update and heartbeat.
|
||||
*/
|
||||
static struct task *process_peer_sync(struct task * task, void *context, unsigned short state)
|
||||
{
|
||||
|
@ -2429,14 +2462,25 @@ static struct task *process_peer_sync(struct task * task, void *context, unsigne
|
|||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
else {
|
||||
int update_to_push = 0;
|
||||
|
||||
/* Awake session if there is data to push */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
if ((int)(st->last_pushed - st->table->localupdate) < 0) {
|
||||
/* wake up the peer handler to push local updates */
|
||||
update_to_push = 1;
|
||||
ps->flags &= ~PEER_F_HEARTBEAT;
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
appctx_wakeup(ps->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!update_to_push && tick_is_expired(ps->heartbeat, now_ms)) {
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
ps->flags |= PEER_F_HEARTBEAT;
|
||||
appctx_wakeup(ps->appctx);
|
||||
}
|
||||
task->expire = tick_first(task->expire, ps->heartbeat);
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* SUCCESSCODE */
|
||||
|
|
Loading…
Reference in New Issue