mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-02-14 17:37:46 +00:00
MINOR: peers: Support for peer shards
Add "shards" new keyword for "peers" section to configure the number of peer shards attached to such secions. This impact all the stick-tables attached to the section. Add "shard" new "server" parameter to configure the peers which participate to all the stick-tables contents distribution. Each peer receive the stick-tables updates only for keys with this shard value as distribution hash. The "shard" value is stored in ->shard new server struct member. cfg_parse_peers() which is the function which is called to parse all the lines of a "peers" section is modified to parse the "shards" parameter stored in ->nb_shards new peers struct member. Add srv_parse_shard() new callback into server.c to pare the "shard" parameter. Implement stksess_getkey_hash() to compute the distribution hash for a stick-table key as the 64-bits xxhash of the key concatenated to the stick-table name. This function is called by stksess_setkey_shard(), itself called by the already implemented function which create a new stick-table key (stksess_new()). Add ->idlen new stktable struct member to store the stick-table name length to not have to compute it each time a stick-table key hash is computed.
This commit is contained in:
parent
7941ead3aa
commit
36d1565640
@ -3428,6 +3428,13 @@ server <peername> [<ip>:<port>] [param*]
|
||||
server haproxy2 192.168.0.2:1024
|
||||
server haproxy3 10.2.0.1:1024
|
||||
|
||||
shards <shards>
|
||||
|
||||
In some configurations, one would like to distribute the stick-table contents
|
||||
to some peers in place of sending all the stick-table contents to each peer
|
||||
declared in the "peers" section. In such cases, "shards" specifies the
|
||||
number of peer involved in this stick-table contents distribution.
|
||||
See also "shard" server parameter.
|
||||
|
||||
table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]}
|
||||
size <size> [expire <expire>] [nopurge] [store <data_type>]*
|
||||
@ -15646,6 +15653,25 @@ send-proxy-v2-ssl-cn
|
||||
protocol. See also the "no-send-proxy-v2-ssl-cn" option of this section and
|
||||
the "send-proxy-v2" option of the "bind" keyword.
|
||||
|
||||
shard <shard>
|
||||
This parameter in used only in the context of stick-tables synchronisation
|
||||
with peers protocol. The "shard" parameter identifies the peers which will
|
||||
receive all the stick-table updates for keys with this shard as distribution
|
||||
hash. The accepted values are 0 up to "shards" parameter value specified in
|
||||
the "peers" section. 0 value is the default value meaning that the peer will
|
||||
receive all the key updates. Greater values than "shards" will be ignored.
|
||||
This is also the case for any value provided to the local peer.
|
||||
|
||||
Example :
|
||||
|
||||
peers mypeers
|
||||
shards 3
|
||||
peer A 127.0.0.1:40001 # local peer without shard value (0 internally)
|
||||
peer B 127.0.0.1:40002 shard 1
|
||||
peer C 127.0.0.1:40003 shard 2
|
||||
peer D 127.0.0.1:40004 shard 3
|
||||
|
||||
|
||||
slowstart <start_time_in_ms>
|
||||
The "slowstart" parameter for a server accepts a value in milliseconds which
|
||||
indicates after how long a server which has just come back up will run at
|
||||
|
@ -101,6 +101,7 @@ struct peers {
|
||||
unsigned int flags; /* current peers section resync state */
|
||||
unsigned int resync_timeout; /* resync timeout timer */
|
||||
int count; /* total of peers */
|
||||
int nb_shards; /* Number of peer shards */
|
||||
int disabled; /* peers proxy disabled if >0 */
|
||||
int applet_count[MAX_THREADS]; /* applet count per thread */
|
||||
};
|
||||
|
@ -264,6 +264,7 @@ struct server {
|
||||
unsigned rweight; /* remainder of weight in the current LB tree */
|
||||
unsigned cumulative_weight; /* weight of servers prior to this one in the same group, for chash balancing */
|
||||
int maxqueue; /* maximum number of pending connections allowed */
|
||||
int shard; /* shard (in peers protocol context only) */
|
||||
|
||||
enum srv_ws_mode ws; /* configure the protocol selection for websocket */
|
||||
/* 3 bytes hole here */
|
||||
|
@ -145,6 +145,7 @@ struct stksess {
|
||||
unsigned int expire; /* session expiration date */
|
||||
unsigned int ref_cnt; /* reference count, can only purge when zero */
|
||||
__decl_thread(HA_RWLOCK_T lock); /* lock related to the table entry */
|
||||
int shard; /* shard */
|
||||
struct eb32_node exp; /* ebtree node used to hold the session in expiration tree */
|
||||
struct eb32_node upd; /* ebtree node used to hold the update sequence tree */
|
||||
struct ebmb_node key; /* ebtree node used to hold the session in table */
|
||||
@ -155,6 +156,7 @@ struct stksess {
|
||||
/* stick table */
|
||||
struct stktable {
|
||||
char *id; /* local table id name. */
|
||||
size_t idlen; /* local table id name length. */
|
||||
char *nid; /* table id name sent over the network with peers protocol. */
|
||||
struct stktable *next; /* The stick-table may be linked when belonging to
|
||||
* the same configuration section.
|
||||
|
@ -685,6 +685,7 @@ static struct peer *cfg_peers_add_peer(struct peers *peers,
|
||||
int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
||||
{
|
||||
static struct peers *curpeers = NULL;
|
||||
static int nb_shards = 0;
|
||||
struct peer *newpeer = NULL;
|
||||
const char *err;
|
||||
struct bind_conf *bind_conf;
|
||||
@ -905,6 +906,13 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (nb_shards && curpeers->peers_fe->srv->shard > nb_shards) {
|
||||
ha_warning("parsing [%s:%d] : '%s %s' : %d peer shard greater value than %d shards value is ignored.\n",
|
||||
file, linenum, args[0], args[1], curpeers->peers_fe->srv->shard, nb_shards);
|
||||
curpeers->peers_fe->srv->shard = 0;
|
||||
err_code |= ERR_WARN;
|
||||
}
|
||||
|
||||
if (curpeers->peers_fe->srv->init_addr_methods || curpeers->peers_fe->srv->resolvers_id ||
|
||||
curpeers->peers_fe->srv->do_check || curpeers->peers_fe->srv->do_agent) {
|
||||
ha_warning("parsing [%s:%d] : '%s %s' : init_addr, resolvers, check and agent are ignored for peers.\n", file, linenum, args[0], args[1]);
|
||||
@ -966,6 +974,32 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
|
||||
l->options |= LI_O_UNLIMITED; /* don't make the peers subject to global limits */
|
||||
global.maxsock++; /* for the listening socket */
|
||||
}
|
||||
else if (strcmp(args[0], "shards") == 0) {
|
||||
char *endptr;
|
||||
|
||||
if (!*args[1]) {
|
||||
ha_alert("parsing [%s:%d] : '%s' : missing value\n", file, linenum, args[0]);
|
||||
err_code |= ERR_FATAL;
|
||||
goto out;
|
||||
}
|
||||
|
||||
curpeers->nb_shards = strtol(args[1], &endptr, 10);
|
||||
if (*endptr != '\0') {
|
||||
ha_alert("parsing [%s:%d] : '%s' : expects an integer argument, found '%s'\n",
|
||||
file, linenum, args[0], args[1]);
|
||||
err_code |= ERR_FATAL;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!curpeers->nb_shards) {
|
||||
ha_alert("parsing [%s:%d] : '%s' : expects a strictly positive integer argument\n",
|
||||
file, linenum, args[0]);
|
||||
err_code |= ERR_FATAL;
|
||||
goto out;
|
||||
}
|
||||
|
||||
nb_shards = curpeers->nb_shards;
|
||||
}
|
||||
else if (strcmp(args[0], "table") == 0) {
|
||||
struct stktable *t, *other;
|
||||
char *id;
|
||||
@ -4373,6 +4407,7 @@ init_proxies_list_stage2:
|
||||
*/
|
||||
last = &cfg_peers;
|
||||
while (*last) {
|
||||
struct peer *peer;
|
||||
struct stktable *t;
|
||||
curpeers = *last;
|
||||
|
||||
@ -4464,6 +4499,26 @@ init_proxies_list_stage2:
|
||||
break;
|
||||
}
|
||||
last = &curpeers->next;
|
||||
|
||||
/* Ignore the peer shard greater than the number of peer shard for this section.
|
||||
* Also ignore the peer shard of the local peer.
|
||||
*/
|
||||
for (peer = curpeers->remote; peer; peer = peer->next) {
|
||||
if (peer == curpeers->local) {
|
||||
if (peer->srv->shard) {
|
||||
ha_warning("Peers section '%s': shard ignored for '%s' local peer\n",
|
||||
curpeers->id, peer->id);
|
||||
peer->srv->shard = 0;
|
||||
}
|
||||
}
|
||||
else if (peer->srv->shard > curpeers->nb_shards) {
|
||||
ha_warning("Peers section '%s': shard ignored for '%s' local peer because "
|
||||
"%d shard value is greater than the section number of shards (%d)\n",
|
||||
curpeers->id, peer->id, peer->srv->shard, curpeers->nb_shards);
|
||||
peer->srv->shard = 0;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1597,6 +1597,13 @@ static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
|
||||
}
|
||||
|
||||
updateid = ts->upd.key;
|
||||
if (p->srv->shard && ts->shard != p->srv->shard) {
|
||||
/* Skip this entry */
|
||||
st->last_pushed = updateid;
|
||||
new_pushed = 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
ts->ref_cnt++;
|
||||
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock);
|
||||
|
||||
|
@ -807,6 +807,14 @@ static int srv_parse_no_send_proxy_v2(char **args, int *cur_arg,
|
||||
return srv_disable_pp_flags(newsrv, SRV_PP_V2);
|
||||
}
|
||||
|
||||
/* Parse the "shard" server keyword */
|
||||
static int srv_parse_shard(char **args, int *cur_arg,
|
||||
struct proxy *curproxy, struct server *newsrv, char **err)
|
||||
{
|
||||
newsrv->shard = atol(args[*cur_arg + 1]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Parse the "no-tfo" server keyword */
|
||||
static int srv_parse_no_tfo(char **args, int *cur_arg,
|
||||
struct proxy *curproxy, struct server *newsrv, char **err)
|
||||
@ -1791,6 +1799,7 @@ static struct srv_kw_list srv_kws = { "ALL", { }, {
|
||||
{ "resolvers", srv_parse_resolvers, 1, 1, 0 }, /* Configure the resolver to use for name resolution */
|
||||
{ "send-proxy", srv_parse_send_proxy, 0, 1, 1 }, /* Enforce use of PROXY V1 protocol */
|
||||
{ "send-proxy-v2", srv_parse_send_proxy_v2, 0, 1, 1 }, /* Enforce use of PROXY V2 protocol */
|
||||
{ "shard", srv_parse_shard, 1, 1, 1 }, /* Server shard (only in peers protocol context) */
|
||||
{ "slowstart", srv_parse_slowstart, 1, 1, 1 }, /* Set the warm-up timer for a previously failed server */
|
||||
{ "source", srv_parse_source, -1, 1, 1 }, /* Set the source address to be used to connect to the server */
|
||||
{ "stick", srv_parse_stick, 0, 1, 0 }, /* Enable stick-table persistence */
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <import/ebmbtree.h>
|
||||
#include <import/ebsttree.h>
|
||||
#include <import/ebistree.h>
|
||||
#include <import/xxhash.h>
|
||||
|
||||
#include <haproxy/api.h>
|
||||
#include <haproxy/applet.h>
|
||||
@ -155,6 +156,48 @@ void stksess_setkey(struct stktable *t, struct stksess *ts, struct stktable_key
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize or update the key hash in the sticky session <ts> present in table <t>
|
||||
* from the value present in <key>.
|
||||
*/
|
||||
static unsigned long long stksess_getkey_hash(struct stktable *t,
|
||||
struct stksess *ts,
|
||||
struct stktable_key *key)
|
||||
{
|
||||
struct buffer *buf;
|
||||
size_t keylen;
|
||||
|
||||
/* Copy the stick-table id into <buf> */
|
||||
buf = get_trash_chunk();
|
||||
memcpy(b_tail(buf), t->id, t->idlen);
|
||||
b_add(buf, t->idlen);
|
||||
/* Copy the key into <buf> */
|
||||
if (t->type == SMP_T_STR)
|
||||
keylen = key->key_len;
|
||||
else
|
||||
keylen = t->key_size;
|
||||
memcpy(b_tail(buf), key->key, keylen);
|
||||
b_add(buf, keylen);
|
||||
|
||||
return XXH64(b_head(buf), b_data(buf), 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Set the shard for <key> key of <ts> sticky session attached to <t> stick table.
|
||||
* Do nothing for stick-table without peers synchronisation.
|
||||
*/
|
||||
static void stksess_setkey_shard(struct stktable *t, struct stksess *ts,
|
||||
struct stktable_key *key)
|
||||
{
|
||||
if (!t->peers.p)
|
||||
/* This stick-table is not attached to any peers section */
|
||||
return;
|
||||
|
||||
if (!t->peers.p->nb_shards)
|
||||
ts->shard = 0;
|
||||
else
|
||||
ts->shard = stksess_getkey_hash(t, ts, key) % t->peers.p->nb_shards + 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Init sticky session <ts> of table <t>. The data parts are cleared and <ts>
|
||||
@ -282,8 +325,10 @@ struct stksess *stksess_new(struct stktable *t, struct stktable_key *key)
|
||||
if (ts) {
|
||||
ts = (void *)ts + round_ptr_size(t->data_size);
|
||||
__stksess_init(t, ts);
|
||||
if (key)
|
||||
if (key) {
|
||||
stksess_setkey(t, ts, key);
|
||||
stksess_setkey_shard(t, ts, key);
|
||||
}
|
||||
}
|
||||
|
||||
return ts;
|
||||
@ -851,6 +896,7 @@ int parse_stick_table(const char *file, int linenum, char **args,
|
||||
}
|
||||
|
||||
t->id = id;
|
||||
t->idlen = strlen(id);
|
||||
t->nid = nid;
|
||||
t->type = (unsigned int)-1;
|
||||
t->conf.file = file;
|
||||
|
Loading…
Reference in New Issue
Block a user