MEDIUM: peers: handle arrays of std types in peers protocol

This patch adds support of array data_types on the peer protocol.

The table definition message will provide an additionnal parameter
for array data-types: the number of elements of the array.

In case of array of frqp it also provides a second parameter:
the period used to compute freq counter.

The array elements are std_type values linearly encoded in
the update message.

Note: if a remote peer announces an array data_type without
parameters into the table definition message, all updates
on this table will be ignored because we can not
parse update messages consistently.
This commit is contained in:
Emeric Brun 2021-06-22 16:09:55 +02:00 committed by Willy Tarreau
parent c64a2a307c
commit 90a9b676a8
2 changed files with 299 additions and 12 deletions

View File

@ -40,6 +40,7 @@ struct shared_table {
int remote_id;
int flags;
uint64_t remote_data;
unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
unsigned int last_acked;
unsigned int last_pushed;
unsigned int last_get;

View File

@ -715,6 +715,67 @@ static int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_param
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr) {
/* in case of array all elements use
* the same std_type and they are linearly
* encoded.
*/
if (stktable_data_types[data_type].is_array) {
unsigned int idx = 0;
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: {
int data;
do {
data = stktable_data_cast(data_ptr, std_t_sint);
intencode(data, &cursor);
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
} while(data_ptr);
break;
}
case STD_T_UINT: {
unsigned int data;
do {
data = stktable_data_cast(data_ptr, std_t_uint);
intencode(data, &cursor);
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
} while(data_ptr);
break;
}
case STD_T_ULL: {
unsigned long long data;
do {
data = stktable_data_cast(data_ptr, std_t_ull);
intencode(data, &cursor);
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
} while(data_ptr);
break;
}
case STD_T_FRQP: {
struct freq_ctr *frqp;
do {
frqp = &stktable_data_cast(data_ptr, std_t_frqp);
intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
intencode(frqp->curr_ctr, &cursor);
intencode(frqp->prev_ctr, &cursor);
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
} while(data_ptr);
break;
}
}
/* array elements fully encoded
* proceed next data_type.
*/
continue;
}
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: {
int data;
@ -854,19 +915,58 @@ static int peer_prepare_switchmsg(char *msg, size_t size, struct peer_prep_param
/* encode available known data types in table */
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
if (st->table->data_ofs[data_type]) {
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT:
case STD_T_UINT:
case STD_T_ULL:
case STD_T_DICT:
data |= 1ULL << data_type;
break;
case STD_T_FRQP:
data |= 1ULL << data_type;
intencode(data_type, &chunkq);
/* stored data types parameters are all linearly encoded
* at the end of the 'table definition' message.
*
* Currently only array data_types and and data_types
* using freq_counter base type have parameters:
*
* - array has always at least one parameter set to the
* number of elements.
*
* - array of base-type freq_counters has an additional
* parameter set to the period used to compute those
* freq_counters.
*
* - simple freq counter has a parameter set to the period
* used to compute
*
* A set of parameter for a datatype MUST BE prefixed
* by the data-type id itself:
* This is useless because the data_types are ordered and
* the data_type bitfield already gives the information of
* stored types, but it was designed this way when the
* push of period parameter was added for freq counters
* and we don't want to break the compatibility.
*
*/
if (stktable_data_types[data_type].is_array) {
/* This is an array type so we first encode
* the data_type itself to prefix parameters
*/
intencode(data_type, &chunkq);
/* We encode the first parameter which is
* the number of elements of this array
*/
intencode(st->table->data_nbelem[data_type], &chunkq);
/* for array of freq counters, there is an additionnal
* period parameter to encode
*/
if (stktable_data_types[data_type].std_type == STD_T_FRQP)
intencode(st->table->data_arg[data_type].u, &chunkq);
break;
}
else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
/* this datatype is a simple freq counter not part
* of an array. We encode the data_type itself
* to prefix the 'period' parameter
*/
intencode(data_type, &chunkq);
intencode(st->table->data_arg[data_type].u, &chunkq);
}
/* set the bit corresponding to stored data type */
data |= 1ULL << data_type;
}
}
intencode(data, &cursor);
@ -1670,10 +1770,98 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
uint64_t decoded_int;
unsigned int idx;
if (!((1ULL << data_type) & st->remote_data))
continue;
if (stktable_data_types[data_type].is_array) {
/* in case of array all elements
* use the same std_type and they
* are linearly encoded.
* The number of elements was provided
* by table definition message
*/
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT:
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
}
break;
case STD_T_UINT:
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
}
break;
case STD_T_ULL:
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
}
break;
case STD_T_FRQP:
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
struct freq_ctr data;
/* First bit is reserved for the freq_ctr lock
* Note: here we're still protected by the stksess lock
* so we don't need to update the update the freq_ctr
* using its internal lock.
*/
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
data.curr_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data.prev_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
goto malformed_unlock;
}
data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_frqp) = data;
}
break;
}
/* array is fully decoded
* proceed next data_type.
*/
continue;
}
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
@ -1705,7 +1893,8 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
/* First bit is reserved for the freq_ctr lock
Note: here we're still protected by the stksess lock
so we don't need to update the update the freq_ctr
using its internal lock */
using its internal lock.
*/
data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
data.curr_ctr = intdecode(msg_cur, msg_end);
@ -1990,6 +2179,103 @@ static inline int peer_treat_definemsg(struct appctx *appctx, struct peer *p,
goto ignore_msg;
}
/* Check if there there is the additionnal expire data */
intdecode(msg_cur, msg_end);
if (*msg_cur) {
uint64_t data_type;
uint64_t type;
/* This define contains the expire data so we consider
* it also contain all data_types parameters.
*/
for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
if (table_data & (1ULL << data_type)) {
if (stktable_data_types[data_type].is_array) {
/* This should be an array
* so we parse the data_type prefix
* because we must have parameters.
*/
type = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
TRACE_PROTO("missing meta data for array", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
/* check if the data_type match the current from the bitfield */
if (type != data_type) {
p->remote_table = NULL;
TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
/* decode the nbelem of the array */
p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
/* if it is an array of frqp, we must also have the period to decode */
if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
}
}
else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
/* This should be a std freq counter data_type
* so we parse the data_type prefix
* because we must have parameters.
*/
type = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
TRACE_PROTO("missing meta data for frqp", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
/* check if the data_type match the current from the bitfield */
if (type != data_type) {
p->remote_table = NULL;
TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
/* decode the period */
intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
}
}
}
}
else {
uint64_t data_type;
/* There is not additional data but
* array size parameter is mandatory to parse array
* so we consider an error if an array data_type is define
* but there is no additional data.
*/
for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
if (table_data & (1ULL << data_type)) {
if (stktable_data_types[data_type].is_array) {
p->remote_table = NULL;
TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
goto ignore_msg;
}
}
}
}
p->remote_table->remote_data = table_data;
p->remote_table->remote_id = table_id;
return 1;