diff --git a/include/types/peers.h b/include/types/peers.h index 625321f639..105dffb04b 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -63,7 +63,6 @@ struct peer { unsigned int statuscode; /* current/last session status code */ unsigned int reconnect; /* next connect timer */ unsigned int confirm; /* confirm message counter */ - struct stream *stream; /* current transport stream */ struct appctx *appctx; /* the appctx running it */ struct shared_table *remote_table; struct shared_table *last_local_table; diff --git a/src/peers.c b/src/peers.c index 6b8f7f9ef3..1a80ab34a4 100644 --- a/src/peers.c +++ b/src/peers.c @@ -485,10 +485,9 @@ static void peer_session_release(struct appctx *appctx) /* peer session identified */ if (peer) { - if (peer->stream == s) { + if (peer->appctx == appctx) { /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = NULL; - peer->stream = NULL; peer->appctx = NULL; if (peer->flags & PEER_F_LEARN_ASSIGN) { /* unassign current peer for learning */ @@ -662,7 +661,7 @@ static void peer_io_handler(struct appctx *appctx) goto switchstate; } - if (curpeer->stream && curpeer->stream != s) { + if (curpeer->appctx && curpeer->appctx != appctx) { if (curpeer->local) { /* Local connection, reply a retry */ appctx->st0 = PEER_SESS_ST_EXIT; @@ -679,7 +678,6 @@ static void peer_io_handler(struct appctx *appctx) curpeer->flags &= ~PEER_F_DWNGRD; } } - curpeer->stream = s; curpeer->appctx = appctx; appctx->ctx.peers.ptr = curpeer; appctx->st0 = PEER_SESS_ST_SENDSUCCESS; @@ -1748,7 +1746,7 @@ void peers_setup_frontend(struct proxy *fe) /* * Create a new peer session in assigned state (connect will start automatically) */ -static struct stream *peer_session_create(struct peers *peers, struct peer *peer) +static struct appctx *peer_session_create(struct peers *peers, struct peer *peer) { struct listener *l = LIST_NEXT(&peers->peers_fe->conf.listeners, struct listener *, by_fe); struct proxy *p = l->frontend; /* attached frontend */ @@ -1822,8 +1820,7 @@ static struct stream *peer_session_create(struct peers *peers, struct peer *peer totalconn++; peer->appctx = appctx; - peer->stream = s; - return s; + return appctx; /* Error unrolling */ out_free_strm: @@ -1882,20 +1879,20 @@ static struct task *process_peer_sync(struct task * task) for (ps = peers->remote; ps; ps = ps->next) { /* For each remote peers */ if (!ps->local) { - if (!ps->stream) { - /* no active stream */ + if (!ps->appctx) { + /* no active peer connection */ if (ps->statuscode == 0 || ((ps->statuscode == PEER_SESS_SC_CONNECTCODE || ps->statuscode == PEER_SESS_SC_SUCCESSCODE || ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) && tick_is_expired(ps->reconnect, now_ms))) { /* connection never tried - * or previous stream established with success - * or previous stream failed during connection + * or previous peer connection established with success + * or previous peer connection failed while connecting * and reconnection timer is expired */ /* retry a connect */ - ps->stream = peer_session_create(peers, ps); + ps->appctx = peer_session_create(peers, ps); } else if (!tick_is_expired(ps->reconnect, now_ms)) { /* If previous session failed during connection @@ -1905,9 +1902,9 @@ static struct task *process_peer_sync(struct task * task) task->expire = tick_first(task->expire, ps->reconnect); } /* else do nothing */ - } /* !ps->stream */ + } /* !ps->appctx */ else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) { - /* current stream is active and established */ + /* current peer connection is active and established */ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) && !(peers->flags & PEERS_F_RESYNC_ASSIGN) && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) { @@ -1919,14 +1916,14 @@ static struct task *process_peer_sync(struct task * task) ps->flags |= PEER_F_LEARN_ASSIGN; peers->flags |= PEERS_F_RESYNC_ASSIGN; - /* awake peer stream task to handle a request of resync */ + /* wake up peer handler to handle a request of resync */ appctx_wakeup(ps->appctx); } else { /* Awake session if there is data to push */ for (st = ps->tables; st ; st = st->next) { if ((int)(st->last_pushed - st->table->localupdate) < 0) { - /* awake peer stream task to push local updates */ + /* wake up the peer handler to push local updates */ appctx_wakeup(ps->appctx); break; } @@ -1970,9 +1967,8 @@ static struct task *process_peer_sync(struct task * task) /* disconnect all connected peers */ for (ps = peers->remote; ps; ps = ps->next) { - if (ps->stream) { + if (ps->appctx) { peer_session_forceshutdown(ps->appctx); - ps->stream = NULL; ps->appctx = NULL; } } @@ -1988,15 +1984,15 @@ static struct task *process_peer_sync(struct task * task) st->table->syncing--; } } - else if (!ps->stream) { - /* If stream is not active */ + else if (!ps->appctx) { + /* If there's no active peer connection */ if (ps->statuscode == 0 || ps->statuscode == PEER_SESS_SC_SUCCESSCODE || ps->statuscode == PEER_SESS_SC_CONNECTEDCODE || ps->statuscode == PEER_SESS_SC_TRYAGAIN) { /* connection never tried - * or previous stream was successfully established - * or previous stream tcp connect success but init state incomplete + * or previous peer connection was successfully established + * or previous tcp connect succeeded but init state incomplete * or during previous connect, peer replies a try again statuscode */ /* connect to the peer */ @@ -2014,11 +2010,10 @@ static struct task *process_peer_sync(struct task * task) } } else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) { - /* current stream active and established - awake stream to push remaining local updates */ + /* current peer connection is active and established + * wake up all peer handlers to push remaining local updates */ for (st = ps->tables; st ; st = st->next) { if ((int)(st->last_pushed - st->table->localupdate) < 0) { - /* awake peer stream task to push local updates */ appctx_wakeup(ps->appctx); break; }