MEDIUM: peers: synchronizaiton code factorization to reduce the size of the I/O handler.

Factorize the code responsible of synchronizing the peers upon startup.

May be backported as far as 1.5.
This commit is contained in:
Frdric Lcaille 2019-01-22 22:25:17 +01:00 committed by Willy Tarreau
parent 87f554c9fb
commit 6a8303d49e

View File

@ -667,6 +667,197 @@ static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *ap
return peer_send_msg(st, appctx, peer_prepare_updatemsg, ts, updateid, use_identifier, use_timed);
}
/*
* Function used to lookup for recent stick-table updates associated with
* <st> shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set).
*/
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
eb = eb32_first(&st->table->updates);
if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
st->table->commitupdate = st->last_pushed = st->table->localupdate;
return NULL;
}
}
if ((int)(eb->key - st->table->localupdate) > 0) {
st->table->commitupdate = st->last_pushed = st->table->localupdate;
return NULL;
}
return eb32_entry(eb, struct stksess, upd);
}
/*
* Function used to lookup for recent stick-table updates associated with
* <st> shared stick-table during teach state 1 step.
*/
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
st->flags |= SHTABLE_F_TEACH_STAGE1;
eb = eb32_first(&st->table->updates);
if (eb)
st->last_pushed = eb->key - 1;
return NULL;
}
return eb32_entry(eb, struct stksess, upd);
}
/*
* Function used to lookup for recent stick-table updates associated with
* <st> shared stick-table during teach state 2 step.
*/
static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
{
struct eb32_node *eb;
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb || eb->key > st->teaching_origin) {
st->flags |= SHTABLE_F_TEACH_STAGE2;
return NULL;
}
return eb32_entry(eb, struct stksess, upd);
}
/*
* Generic function to emit update messages for <st> stick-table when a lesson must
* be taught to the peer <p>.
* <locked> must be set to 1 if the shared table <st> is already locked when entering
* this function, 0 if not.
*
* This function temporary unlock/lock <st> when it sends stick-table updates or
* when decrementing its refcount in case of any error when it sends this updates.
*
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
* Returns -1 if there was not enough room left to send the message,
* any other negative returned value must be considered as an error with an appcxt st0
* returned value equal to PEER_SESS_ST_END.
* If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
* unlocked if not already locked when entering this function.
*/
static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
struct stksess *(*peer_stksess_lookup)(struct shared_table *),
struct shared_table *st, int locked)
{
int ret, new_pushed, use_timed;
ret = 1;
use_timed = 0;
if (st != p->last_local_table) {
ret = peer_send_switchmsg(st, appctx);
if (ret <= 0)
return ret;
p->last_local_table = st;
}
if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
use_timed = !(p->flags & PEER_F_DWNGRD);
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
if (!locked)
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
while (1) {
struct stksess *ts;
unsigned updateid;
/* push local updates */
ts = peer_stksess_lookup(st);
if (!ts)
break;
updateid = ts->upd.key;
ts->ref_cnt++;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed);
if (ret <= 0) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
if (!locked)
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
return ret;
}
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
st->last_pushed = updateid;
if (peer_stksess_lookup == peer_teach_process_stksess_lookup &&
(int)(st->last_pushed - st->table->commitupdate) > 0)
st->table->commitupdate = st->last_pushed;
/* identifier may not needed in next update message */
new_pushed = 0;
}
out:
if (!locked)
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
return 1;
}
/*
* Function to emit update messages for <st> stick-table when a lesson must
* be taught to the peer <p> (PEER_F_LEARN_ASSIGN flag set).
*
* Note that <st> shared stick-table is locked when calling this function.
*
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
* Returns -1 if there was not enough room left to send the message,
* any other negative returned value must be considered as an error with an appcxt st0
* returned value equal to PEER_SESS_ST_END.
*/
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st, 1);
}
/*
* Function to emit update messages for <st> stick-table when a lesson must
* be taught to the peer <p> during teach state 1 step.
*
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
* Returns -1 if there was not enough room left to send the message,
* any other negative returned value must be considered as an error with an appcxt st0
* returned value equal to PEER_SESS_ST_END.
*/
static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st, 0);
}
/*
* Function to emit update messages for <st> stick-table when a lesson must
* be taught to the peer <p> during teach state 1 step.
*
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
* Returns -1 if there was not enough room left to send the message,
* any other negative returned value must be considered as an error with an appcxt st0
* returned value equal to PEER_SESS_ST_END.
*/
static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
}
/*
* IO Handler to handle message exchance with a peer
*/
@ -1530,182 +1721,34 @@ incomplete:
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
((int)(st->last_pushed - st->table->localupdate) < 0)) {
struct eb32_node *eb;
int new_pushed;
if (st != curpeer->last_local_table) {
repl = peer_send_switchmsg(st, appctx);
if (repl <= 0) {
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
if (repl == -1)
goto out;
goto switchstate;
}
curpeer->last_local_table = st;
}
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
while (1) {
struct stksess *ts;
unsigned updateid;
/* push local updates */
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
eb = eb32_first(&st->table->updates);
if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
st->table->commitupdate = st->last_pushed = st->table->localupdate;
break;
}
}
if ((int)(eb->key - st->table->localupdate) > 0) {
st->table->commitupdate = st->last_pushed = st->table->localupdate;
break;
}
ts = eb32_entry(eb, struct stksess, upd);
updateid = ts->upd.key;
ts->ref_cnt++;
repl = peer_send_teach_process_msgs(appctx, curpeer, st);
if (repl <= 0) {
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
repl = peer_send_updatemsg(st, appctx, ts,
updateid, new_pushed, 0);
if (repl <= 0) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
if (repl == -1)
goto out;
goto switchstate;
}
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
st->last_pushed = updateid;
if ((int)(st->last_pushed - st->table->commitupdate) > 0)
st->table->commitupdate = st->last_pushed;
/* identifier may not needed in next update message */
new_pushed = 0;
if (repl == -1)
goto out;
goto switchstate;
}
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
else {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
struct eb32_node *eb;
int new_pushed;
if (st != curpeer->last_local_table) {
repl = peer_send_switchmsg(st, appctx);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
curpeer->last_local_table = st;
repl = peer_send_teach_stage1_msgs(appctx, curpeer, st);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
while (1) {
struct stksess *ts;
int use_timed;
unsigned updateid;
/* push local updates */
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
if (!eb) {
st->flags |= SHTABLE_F_TEACH_STAGE1;
eb = eb32_first(&st->table->updates);
if (eb)
st->last_pushed = eb->key - 1;
break;
}
ts = eb32_entry(eb, struct stksess, upd);
updateid = ts->upd.key;
ts->ref_cnt++;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
repl = peer_send_updatemsg(st, appctx, ts,
updateid, new_pushed, use_timed);
if (repl <= 0) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
if (repl == -1)
goto out;
goto switchstate;
}
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
st->last_pushed = updateid;
/* identifier may not needed in next update message */
new_pushed = 0;
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
struct eb32_node *eb;
int new_pushed;
if (st != curpeer->last_local_table) {
repl = peer_send_switchmsg(st, appctx);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
curpeer->last_local_table = st;
repl = peer_send_teach_stage2_msgs(appctx, curpeer, st);
if (repl <= 0) {
if (repl == -1)
goto out;
goto switchstate;
}
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
while (1) {
struct stksess *ts;
int use_timed;
unsigned updateid;
/* push local updates */
eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
/* push local updates */
if (!eb || eb->key > st->teaching_origin) {
st->flags |= SHTABLE_F_TEACH_STAGE2;
break;
}
ts = eb32_entry(eb, struct stksess, upd);
updateid = ts->upd.key;
ts->ref_cnt++;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
use_timed = !(curpeer->flags & PEER_F_DWNGRD);
repl = peer_send_updatemsg(st, appctx, ts,
updateid, new_pushed, use_timed);
if (repl <= 0) {
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
if (repl == -1)
goto out;
goto switchstate;
}
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
ts->ref_cnt--;
st->last_pushed = updateid;
/* identifier may not needed in next update message */
new_pushed = 0;
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
}