From 645635da847b027d5f651b99c31d3ae006c73f19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20L=C3=A9caille?= Date: Mon, 11 Feb 2019 17:49:39 +0100 Subject: [PATCH] MINOR: peers: Add a message for heartbeat. This patch implements the peer heartbeat feature to prevent any haproxy peer from reconnecting too often, consuming sockets for nothing. To do so, we add a new PEER_MSG_CTRL_HEARTBEAT message to the PEER_MSG_CLASS_CONTROL peers control class of messages. A ->heartbeat field is added to peer structs to store the heartbeat timeout value, which is handled by the same function as ->reconnect to control the session timeouts. A 2-byte heartbeat message is sent every 3s when no updates have to be sent. This way, the peer which receives such a message is sure the remote peer is still alive. So, it resets the ->reconnect peer session timeout to its initial value (5s). This prevents any reconnection to an already connected alive peer. --- include/types/peers.h | 1 + src/peers.c | 48 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/include/types/peers.h b/include/types/peers.h index 5200d56b7a..6bc99c245b 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -62,6 +62,7 @@ struct peer { unsigned int flags; /* peer session flags */ unsigned int statuscode; /* current/last session status code */ unsigned int reconnect; /* next connect timer */ + unsigned int heartbeat; /* next heartbeat timer */ unsigned int confirm; /* confirm message counter */ struct appctx *appctx; /* the appctx running it */ struct shared_table *remote_table; diff --git a/src/peers.c b/src/peers.c index 743bce88e8..016d41daa8 100644 --- a/src/peers.c +++ b/src/peers.c @@ -82,11 +82,14 @@ #define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */ #define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */ #define PEER_F_LEARN_NOTUP2DATE 
0x00000200 /* Learn from peer finished but peer is not up to date */ +#define PEER_F_HEARTBEAT 0x40000000 /* Heartbeat message to send. */ #define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */ #define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */ #define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE) +#define PEER_HEARTBEAT_TIMEOUT 3000 /* 3 seconds */ + /*****************************/ /* Sync message class */ /*****************************/ @@ -105,6 +108,7 @@ enum { PEER_MSG_CTRL_RESYNCFINISHED, PEER_MSG_CTRL_RESYNCPARTIAL, PEER_MSG_CTRL_RESYNCCONFIRM, + PEER_MSG_CTRL_HEARTBEAT, }; /*****************************/ @@ -886,6 +890,22 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct pee return peer_send_msg(appctx, peer_prepare_control_msg, &p); } +/* + * Send a heartbeat message. + * Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value. + * Returns -1 if there was not enough room left to send the message, + * any other negative returned value must be considered as an error with an appctx st0 + * returned value equal to PEER_SESS_ST_END. + */ +static inline int peer_send_heartbeatmsg(struct appctx *appctx) +{ + struct peer_prep_params p = { + .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, }, + }; + + return peer_send_msg(appctx, peer_prepare_control_msg, &p); +} + /* * Build a peer protocol error class message. 
* Returns the number of written bytes used to build the message if succeeded, @@ -1605,6 +1625,9 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee /* reset teaching flags to 0 */ peer->flags &= PEER_TEACH_RESET; } + else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); + } } else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { if (msg_head[1] == PEER_MSG_STKT_DEFINE) { @@ -2146,6 +2169,15 @@ static void peer_io_handler(struct appctx *appctx) goto switchstate; send_msgs: + if (curpeer->flags & PEER_F_HEARTBEAT) { + curpeer->flags &= ~PEER_F_HEARTBEAT; + repl = peer_send_heartbeatmsg(appctx); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; + } + } /* we get here when a peer_recv_msg() returns 0 in reql */ repl = peer_send_msgs(appctx, curpeer); if (repl <= 0) { @@ -2264,6 +2296,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer struct conn_stream *cs; peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000)); + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->statuscode = PEER_SESS_SC_CONNECTCODE; s = NULL; @@ -2342,8 +2375,8 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer } /* - * Task processing function to manage re-connect and peer session - * tasks wakeup on local update. + * Task processing function to manage re-connect, peer session + * tasks wakeup on local update and heartbeat. 
*/ static struct task *process_peer_sync(struct task * task, void *context, unsigned short state) { @@ -2429,14 +2462,25 @@ static struct task *process_peer_sync(struct task * task, void *context, unsigne appctx_wakeup(ps->appctx); } else { + int update_to_push = 0; + /* Awake session if there is data to push */ for (st = ps->tables; st ; st = st->next) { if ((int)(st->last_pushed - st->table->localupdate) < 0) { /* wake up the peer handler to push local updates */ + update_to_push = 1; + ps->flags &= ~PEER_F_HEARTBEAT; + ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); appctx_wakeup(ps->appctx); break; } } + if (!update_to_push && tick_is_expired(ps->heartbeat, now_ms)) { + ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + ps->flags |= PEER_F_HEARTBEAT; + appctx_wakeup(ps->appctx); + } + task->expire = tick_first(task->expire, ps->heartbeat); } /* else do nothing */ } /* SUCCESSCODE */