MINOR: peers: Move update receive code to reduce the size of the I/O handler.

This patch implements a new function to treat the stick-table
update messages so that to reduce the size of the peer I/O handler
by ~200 lines.

May be backported as far as 1.5.
This commit is contained in:
Frdric Lcaille 2019-01-23 11:16:57 +01:00 committed by Willy Tarreau
parent 6a8303d49e
commit 168a34b45f

View File

@ -858,6 +858,232 @@ static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
}
/*
* Function used to parse a stick-table update message after it has been received
* by <p> peer with <msg_cur> as address of the pointer to the position in the
* receipt buffer with <msg_end> being position of the end of the stick-table message.
* Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
* was encountered.
* <exp> must be set if the stick-table entry expires.
* <updt> must be set for PEER_MSG_STKT_UPDATE or PEER_MSG_STKT_UPDATE_TIMED stick-table
* messages, in this case the stick-table udpate message is received with a stick-table
* update ID.
* <totl> is the length of the stick-table update message computed upon receipt.
*/
static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
char **msg_cur, char *msg_end, int msg_len, int totl)
{
struct stream_interface *si = appctx->owner;
struct shared_table *st = p->remote_table;
struct stksess *ts, *newts;
uint32_t update;
int expire;
unsigned int data_type;
void *data_ptr;
/* Here we have data message */
if (!st)
goto ignore_msg;
expire = MS_TO_TICKS(st->table->expire);
if (updt) {
if (msg_len < sizeof(update)) {
/* malformed message */
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
memcpy(&update, *msg_cur, sizeof(update));
*msg_cur += sizeof(update);
st->last_get = htonl(update);
}
else {
st->last_get++;
}
if (exp) {
size_t expire_sz = sizeof expire;
if (*msg_cur + expire_sz > msg_end) {
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
memcpy(&expire, *msg_cur, expire_sz);
*msg_cur += expire_sz;
expire = ntohl(expire);
}
newts = stksess_new(st->table, NULL);
if (!newts)
goto ignore_msg;
if (st->table->type == SMP_T_STR) {
unsigned int to_read, to_store;
to_read = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
to_store = MIN(to_read, st->table->key_size - 1);
if (*msg_cur + to_store > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
memcpy(newts->key.key, *msg_cur, to_store);
newts->key.key[to_store] = 0;
*msg_cur += to_read;
}
else if (st->table->type == SMP_T_SINT) {
unsigned int netinteger;
if (*msg_cur + sizeof(netinteger) > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
memcpy(&netinteger, *msg_cur, sizeof(netinteger));
netinteger = ntohl(netinteger);
memcpy(newts->key.key, &netinteger, sizeof(netinteger));
*msg_cur += sizeof(netinteger);
}
else {
if (*msg_cur + st->table->key_size > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
memcpy(newts->key.key, *msg_cur, st->table->key_size);
*msg_cur += st->table->key_size;
}
/* lookup for existing entry */
ts = stktable_set_entry(st->table, newts);
if (ts != newts) {
stksess_free(st->table, newts);
newts = NULL;
}
HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
if (!((1 << data_type) & st->remote_data))
continue;
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: {
int data;
data = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_sint) = data;
break;
}
case STD_T_UINT: {
unsigned int data;
data = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_uint) = data;
break;
}
case STD_T_ULL: {
unsigned long long data;
data = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_ull) = data;
break;
}
case STD_T_FRQP: {
struct freq_ctr_period data;
/* First bit is reserved for the freq_ctr_period lock
Note: here we're still protected by the stksess lock
so we don't need to update the update the freq_ctr_period
using its internal lock */
data.curr_tick = tick_add(now_ms, -intdecode(msg_cur, msg_end)) & ~0x1;
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data.curr_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data.prev_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_frqp) = data;
break;
}
}
}
/* Force new expiration */
ts->expire = tick_add(now_ms, expire);
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
return 1;
ignore_msg:
/* skip consumed message */
co_skip(si_oc(si), totl);
return 0;
}
/*
* IO Handler to handle message exchance with a peer
*/
@ -1172,7 +1398,6 @@ switchstate:
/* fall through */
}
case PEER_SESS_ST_WAITMSG: {
struct stksess *ts, *newts = NULL;
uint32_t msg_len = 0;
char *msg_cur = trash.area;
char *msg_end = trash.area;
@ -1420,208 +1645,13 @@ switchstate:
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE
|| msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
struct shared_table *st = curpeer->remote_table;
uint32_t update;
int expire;
unsigned int data_type;
void *data_ptr;
int update, expire;
/* Here we have data message */
if (!st)
goto ignore_msg;
expire = MS_TO_TICKS(st->table->expire);
if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
if (msg_len < sizeof(update)) {
/* malformed message */
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
memcpy(&update, msg_cur, sizeof(update));
msg_cur += sizeof(update);
st->last_get = htonl(update);
}
else {
st->last_get++;
}
if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
size_t expire_sz = sizeof expire;
if (msg_cur + expire_sz > msg_end) {
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
memcpy(&expire, msg_cur, expire_sz);
msg_cur += expire_sz;
expire = ntohl(expire);
}
newts = stksess_new(st->table, NULL);
if (!newts)
goto ignore_msg;
if (st->table->type == SMP_T_STR) {
unsigned int to_read, to_store;
to_read = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
to_store = MIN(to_read, st->table->key_size - 1);
if (msg_cur + to_store > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
memcpy(newts->key.key, msg_cur, to_store);
newts->key.key[to_store] = 0;
msg_cur += to_read;
}
else if (st->table->type == SMP_T_SINT) {
unsigned int netinteger;
if (msg_cur + sizeof(netinteger) > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
memcpy(&netinteger, msg_cur, sizeof(netinteger));
netinteger = ntohl(netinteger);
memcpy(newts->key.key, &netinteger, sizeof(netinteger));
msg_cur += sizeof(netinteger);
}
else {
if (msg_cur + st->table->key_size > msg_end) {
/* malformed message */
stksess_free(st->table, newts);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
memcpy(newts->key.key, msg_cur, st->table->key_size);
msg_cur += st->table->key_size;
}
/* lookup for existing entry */
ts = stktable_set_entry(st->table, newts);
if (ts != newts) {
stksess_free(st->table, newts);
newts = NULL;
}
HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
if ((1 << data_type) & st->remote_data) {
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: {
int data;
data = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_sint) = data;
break;
}
case STD_T_UINT: {
unsigned int data;
data = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_uint) = data;
break;
}
case STD_T_ULL: {
unsigned long long data;
data = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_ull) = data;
break;
}
case STD_T_FRQP: {
struct freq_ctr_period data;
/* First bit is reserved for the freq_ctr_period lock
Note: here we're still protected by the stksess lock
so we don't need to update the update the freq_ctr_period
using its internal lock */
data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end)) & ~0x1;
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data.curr_ctr = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data.prev_ctr = intdecode(&msg_cur, msg_end);
if (!msg_cur) {
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
goto switchstate;
}
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_frqp) = data;
break;
}
}
}
}
/* Force new expiration */
ts->expire = tick_add(now_ms, expire);
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
if (!peer_recv_updatemsg(appctx, curpeer, update, expire,
&msg_cur, msg_end, msg_len, totl))
goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_ACK) {