REORG: peers: Rename all occurrences to 'ps' variable
In loops on the peer list in the code, the 'ps' variable was used as a shortcut for the peer session. However, if mays be confusing with the peers section too. So, all occurrences to 'ps' variable were renamed to 'peer'.
This commit is contained in:
parent
f1002ce715
commit
3cb0de27cb
162
src/peers.c
162
src/peers.c
|
@ -3291,7 +3291,6 @@ static void clear_peer_learning_status(struct peer *peer)
|
|||
|
||||
static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
|
||||
{
|
||||
struct peer *ps;
|
||||
unsigned int flags = 0;
|
||||
|
||||
if (peer->learnstate != PEER_LR_ST_FINISHED)
|
||||
|
@ -3305,29 +3304,30 @@ static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
|
|||
}
|
||||
else {
|
||||
/* Full resync */
|
||||
struct peer *rem_peer;
|
||||
int commit_a_finish = 1;
|
||||
|
||||
if (peer->srv->shard) {
|
||||
flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
|
||||
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
if (ps->srv->shard && ps != peer) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
if (ps->srv->shard == peer->srv->shard) {
|
||||
for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
|
||||
if (rem_peer->srv->shard && rem_peer != peer) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
|
||||
if (rem_peer->srv->shard == peer->srv->shard) {
|
||||
/* flag all peers from same shard
|
||||
* notup2date to disable request
|
||||
* of a resync frm them
|
||||
*/
|
||||
ps->flags |= PEER_F_LEARN_NOTUP2DATE;
|
||||
rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
|
||||
}
|
||||
else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
/* it remains some other shards not requested
|
||||
* we don't commit a resync finish to request
|
||||
* the other shards
|
||||
*/
|
||||
commit_a_finish = 0;
|
||||
}
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3396,7 +3396,7 @@ static void sync_peer_app_state(struct peers *peers, struct peer *peer)
|
|||
/* Process the sync task for a running process. It is called from process_peer_sync() only */
|
||||
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
||||
{
|
||||
struct peer *ps;
|
||||
struct peer *peer;
|
||||
struct shared_table *st;
|
||||
|
||||
/* resync timeout set to TICK_ETERNITY means we just start
|
||||
|
@ -3424,84 +3424,84 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
|
|||
}
|
||||
|
||||
/* For each session */
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
for (peer = peers->remote; peer; peer = peer->next) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
|
||||
|
||||
sync_peer_learn_state(peers, ps);
|
||||
sync_peer_app_state(peers, ps);
|
||||
sync_peer_learn_state(peers, peer);
|
||||
sync_peer_app_state(peers, peer);
|
||||
|
||||
/* Peer changes, if any, were now ack by the sync task. Unblock
|
||||
* the peer (any wakeup should already be performed, no need to
|
||||
* do it here)
|
||||
*/
|
||||
ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
|
||||
peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
|
||||
|
||||
/* For each remote peers */
|
||||
if (!ps->local) {
|
||||
if (!ps->appctx) {
|
||||
if (!peer->local) {
|
||||
if (!peer->appctx) {
|
||||
/* no active peer connection */
|
||||
if (ps->statuscode == 0 ||
|
||||
((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
|
||||
tick_is_expired(ps->reconnect, now_ms))) {
|
||||
if (peer->statuscode == 0 ||
|
||||
((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
|
||||
peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
|
||||
tick_is_expired(peer->reconnect, now_ms))) {
|
||||
/* connection never tried
|
||||
* or previous peer connection established with success
|
||||
* or previous peer connection failed while connecting
|
||||
* and reconnection timer is expired */
|
||||
|
||||
/* retry a connect */
|
||||
ps->appctx = peer_session_create(peers, ps);
|
||||
peer->appctx = peer_session_create(peers, peer);
|
||||
}
|
||||
else if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
else if (!tick_is_expired(peer->reconnect, now_ms)) {
|
||||
/* If previous session failed during connection
|
||||
* but reconnection timer is not expired */
|
||||
|
||||
/* reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
task->expire = tick_first(task->expire, peer->reconnect);
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* !ps->appctx */
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
||||
} /* !peer->appctx */
|
||||
else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
|
||||
/* current peer connection is active and established */
|
||||
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
|
||||
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
|
||||
!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
!(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
|
||||
/* Resync from a remote is needed
|
||||
* and no peer was assigned for lesson
|
||||
* and current peer may be up2date */
|
||||
|
||||
/* assign peer for the lesson */
|
||||
ps->learnstate = PEER_LR_ST_ASSIGNED;
|
||||
peer->learnstate = PEER_LR_ST_ASSIGNED;
|
||||
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
|
||||
|
||||
/* wake up peer handler to handle a request of resync */
|
||||
appctx_wakeup(ps->appctx);
|
||||
appctx_wakeup(peer->appctx);
|
||||
}
|
||||
else {
|
||||
int update_to_push = 0;
|
||||
|
||||
/* Awake session if there is data to push */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
for (st = peer->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
/* wake up the peer handler to push local updates */
|
||||
update_to_push = 1;
|
||||
/* There is no need to send a heartbeat message
|
||||
* when some updates must be pushed. The remote
|
||||
* peer will consider <ps> peer as alive when it will
|
||||
* peer will consider <peer> peer as alive when it will
|
||||
* receive these updates.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_HEARTBEAT;
|
||||
peer->flags &= ~PEER_F_HEARTBEAT;
|
||||
/* Re-schedule another one later. */
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
/* Refresh reconnect if necessary */
|
||||
if (tick_is_expired(ps->reconnect, now_ms))
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
if (tick_is_expired(peer->reconnect, now_ms))
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
/* We are going to send updates, let's ensure we will
|
||||
* come back to send heartbeat messages or to reconnect.
|
||||
*/
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
appctx_wakeup(ps->appctx);
|
||||
task->expire = tick_first(peer->reconnect, peer->heartbeat);
|
||||
appctx_wakeup(peer->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -3509,35 +3509,35 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
|
|||
* and do not send heartbeat message either.
|
||||
*/
|
||||
if (!update_to_push) {
|
||||
if (tick_is_expired(ps->reconnect, now_ms)) {
|
||||
if (ps->flags & PEER_F_ALIVE) {
|
||||
if (tick_is_expired(peer->reconnect, now_ms)) {
|
||||
if (peer->flags & PEER_F_ALIVE) {
|
||||
/* This peer was alive during a 'reconnect' period.
|
||||
* Flag it as not alive again for the next period.
|
||||
*/
|
||||
ps->flags &= ~PEER_F_ALIVE;
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
peer->flags &= ~PEER_F_ALIVE;
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
|
||||
}
|
||||
else {
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
ps->heartbeat = TICK_ETERNITY;
|
||||
peer_session_forceshutdown(ps);
|
||||
sync_peer_app_state(peers, ps);
|
||||
ps->no_hbt++;
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
peer->heartbeat = TICK_ETERNITY;
|
||||
peer_session_forceshutdown(peer);
|
||||
sync_peer_app_state(peers, peer);
|
||||
peer->no_hbt++;
|
||||
}
|
||||
}
|
||||
else if (tick_is_expired(ps->heartbeat, now_ms)) {
|
||||
ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
ps->flags |= PEER_F_HEARTBEAT;
|
||||
appctx_wakeup(ps->appctx);
|
||||
else if (tick_is_expired(peer->heartbeat, now_ms)) {
|
||||
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
|
||||
peer->flags |= PEER_F_HEARTBEAT;
|
||||
appctx_wakeup(peer->appctx);
|
||||
}
|
||||
task->expire = tick_first(ps->reconnect, ps->heartbeat);
|
||||
task->expire = tick_first(peer->reconnect, peer->heartbeat);
|
||||
}
|
||||
}
|
||||
/* else do nothing */
|
||||
} /* SUCCESSCODE */
|
||||
} /* !ps->peer->local */
|
||||
} /* !peer->peer->local */
|
||||
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
|
||||
} /* for */
|
||||
|
||||
/* Resync from remotes expired: consider resync is finished */
|
||||
|
@ -3563,36 +3563,36 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
|
|||
/* Process the sync task for a stopping process. It is called from process_peer_sync() only */
|
||||
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
|
||||
{
|
||||
struct peer *ps;
|
||||
struct peer *peer;
|
||||
struct shared_table *st;
|
||||
static int dont_stop = 0;
|
||||
|
||||
/* For each peer */
|
||||
for (ps = peers->remote; ps; ps = ps->next) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
for (peer = peers->remote; peer; peer = peer->next) {
|
||||
HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
|
||||
|
||||
sync_peer_learn_state(peers, ps);
|
||||
sync_peer_app_state(peers, ps);
|
||||
sync_peer_learn_state(peers, peer);
|
||||
sync_peer_app_state(peers, peer);
|
||||
|
||||
/* Peer changes, if any, were now ack by the sync task. Unblock
|
||||
* the peer (any wakeup should already be performed, no need to
|
||||
* do it here)
|
||||
*/
|
||||
ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
|
||||
peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
|
||||
|
||||
if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
|
||||
/* we're killing a connection, we must apply a random delay before
|
||||
* retrying otherwise the other end will do the same and we can loop
|
||||
* for a while.
|
||||
*/
|
||||
ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
if (ps->appctx) {
|
||||
peer_session_forceshutdown(ps);
|
||||
sync_peer_app_state(peers, ps);
|
||||
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
|
||||
if (peer->appctx) {
|
||||
peer_session_forceshutdown(peer);
|
||||
sync_peer_app_state(peers, peer);
|
||||
}
|
||||
}
|
||||
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
|
||||
}
|
||||
|
||||
/* We've just received the signal */
|
||||
|
@ -3608,18 +3608,18 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
|||
}
|
||||
}
|
||||
|
||||
ps = peers->local;
|
||||
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
|
||||
if (ps->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
|
||||
peer = peers->local;
|
||||
HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
|
||||
if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
|
||||
if (dont_stop) {
|
||||
/* resync of new process was complete, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
dont_stop = 0;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
for (st = peer->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
else if (!ps->appctx) {
|
||||
else if (!peer->appctx) {
|
||||
/* Re-arm resync timeout if necessary */
|
||||
if (!tick_isset(peers->resync_timeout))
|
||||
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
|
||||
|
@ -3627,10 +3627,10 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
|||
/* If there's no active peer connection */
|
||||
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
|
||||
!tick_is_expired(peers->resync_timeout, now_ms) &&
|
||||
(ps->statuscode == 0 ||
|
||||
ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
|
||||
ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
|
||||
(peer->statuscode == 0 ||
|
||||
peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
|
||||
peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
|
||||
peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
|
||||
/* The resync is finished for the local peer and
|
||||
* the resync timeout is not expired and
|
||||
* connection never tried
|
||||
|
@ -3638,14 +3638,14 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
|||
* or previous tcp connect succeeded but init state incomplete
|
||||
* or during previous connect, peer replies a try again statuscode */
|
||||
|
||||
if (!tick_is_expired(ps->reconnect, now_ms)) {
|
||||
if (!tick_is_expired(peer->reconnect, now_ms)) {
|
||||
/* reconnection timer is not expired. reschedule task for reconnect */
|
||||
task->expire = tick_first(task->expire, ps->reconnect);
|
||||
task->expire = tick_first(task->expire, peer->reconnect);
|
||||
}
|
||||
else {
|
||||
/* connect to the local peer if we must push a local sync */
|
||||
if (dont_stop) {
|
||||
peer_session_create(peers, ps);
|
||||
peer_session_create(peers, peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3655,25 +3655,25 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
|
|||
/* unable to resync new process, current process can die now */
|
||||
_HA_ATOMIC_DEC(&jobs);
|
||||
dont_stop = 0;
|
||||
for (st = ps->tables; st ; st = st->next)
|
||||
for (st = peer->tables; st ; st = st->next)
|
||||
HA_ATOMIC_DEC(&st->table->refcnt);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
|
||||
else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
|
||||
/* Reset resync timeout during a resync */
|
||||
peers->resync_timeout = TICK_ETERNITY;
|
||||
|
||||
/* current peer connection is active and established
|
||||
* wake up all peer handlers to push remaining local updates */
|
||||
for (st = ps->tables; st ; st = st->next) {
|
||||
for (st = peer->tables; st ; st = st->next) {
|
||||
if (st->last_pushed != st->table->localupdate) {
|
||||
appctx_wakeup(ps->appctx);
|
||||
appctx_wakeup(peer->appctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
|
||||
HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue