Mirror of http://git.haproxy.org/git/haproxy.git/ (synced 2025-03-05 02:49:01 +00:00)
MEDIUM: threads/listeners: Make listeners thread-safe
First, we use atomic operations to update the jobs/totalconn/actconn variables, the listener's nbconn variable and the listener's counters. Then we add a lock on listeners to protect access to their information. And finally, the listener queues (global and per proxy) are also protected by a lock. Because accesses to these queues are unusual (infrequent), we use the same lock for all of them instead of one lock for the global queue and one lock per proxy for the others.
commit 8d8aa0d681 (parent b79a94c9f3)
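To make the locking scheme easier to picture, here is a minimal standalone sketch of the pattern described in the commit message above. It is not HAProxy code: toy_listener, toy_accept_one and the pthread/stdatomic primitives are illustrative stand-ins. C11 atomics play the role of HA_ATOMIC_ADD/HA_ATOMIC_SUB, the per-listener spinlock stands in for the new HA_SPINLOCK_T lock field guarded by SPIN_LOCK/SPIN_TRYLOCK, and the single lq_lock models the one lock shared by the global and per-proxy listener queues.

/* Sketch only (assumed names, not from the commit): same structure as the
 * patch -- atomic counters, a per-listener lock, one shared queue lock.
 */
#include <pthread.h>
#include <stdatomic.h>

static atomic_int actconn;              /* shared counter, atomic updates only */
static pthread_spinlock_t lq_lock;      /* single lock for every listener queue */

struct toy_listener {
	pthread_spinlock_t lock;        /* protects this listener's own state */
	atomic_int nbconn;
	int maxconn;
	int queued;                     /* stand-in for sitting on a wait queue */
};

static void toy_init_once(void)
{
	/* shared queue lock is initialized once, like SPIN_INIT(&lq_lock) in __listener_init() */
	pthread_spin_init(&lq_lock, PTHREAD_PROCESS_PRIVATE);
}

static void toy_listener_init(struct toy_listener *l)
{
	/* each listener gets its own lock, like SPIN_INIT(&l->lock) in create_listeners() */
	pthread_spin_init(&l->lock, PTHREAD_PROCESS_PRIVATE);
}

static void toy_accept_one(struct toy_listener *l)
{
	if (pthread_spin_trylock(&l->lock))
		return;                 /* another thread owns this listener, like SPIN_TRYLOCK */

	atomic_fetch_add(&actconn, 1);  /* like HA_ATOMIC_ADD(&actconn, 1) */
	if (atomic_fetch_add(&l->nbconn, 1) + 1 >= l->maxconn) {
		/* queue manipulation happens under the shared queue lock */
		pthread_spin_lock(&lq_lock);
		l->queued = 1;
		pthread_spin_unlock(&lq_lock);
	}
	pthread_spin_unlock(&l->lock);
}

Using one lq_lock for every queue mirrors the trade-off stated in the message: queue operations are rare, so a single shared lock is simpler and contention on it is not a concern.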
@@ -651,6 +651,7 @@ static inline void b_reset(struct buffer *buf)
 	buf->o = 0;
 	buf->i = 0;
 	buf->p = buf->data;
+
 }

 /* Allocates a buffer and replaces *buf with this buffer. If no memory is
@@ -145,6 +145,8 @@ enum lock_label {
 	TASK_RQ_LOCK,
 	TASK_WQ_LOCK,
 	POOL_LOCK,
+	LISTENER_LOCK,
+	LISTENER_QUEUE_LOCK,
 	SIGNALS_LOCK,
 	LOCK_LABELS
 };
@@ -230,7 +232,7 @@ static inline void show_lock_stats()
 {
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
 	                                   "TASK_RQ", "TASK_WQ", "POOL",
-	                                   "SIGNALS" };
+	                                   "LISTENER", "LISTENER_QUEUE", "SIGNALS" };
 	int lbl;

 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
@@ -112,7 +112,7 @@ static void inline proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe)
 {
 	fe->fe_counters.cum_conn++;
 	if (l->counters)
-		l->counters->cum_conn++;
+		HA_ATOMIC_ADD(&l->counters->cum_conn, 1);

 	update_freq_ctr(&fe->fe_conn_per_sec, 1);
 	if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max)
@@ -124,7 +124,7 @@ static void inline proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe)
 {
 	fe->fe_counters.cum_sess++;
 	if (l->counters)
-		l->counters->cum_sess++;
+		HA_ATOMIC_ADD(&l->counters->cum_sess, 1);
 	update_freq_ctr(&fe->fe_sess_per_sec, 1);
 	if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max)
 		fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr;
@@ -32,6 +32,8 @@

 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
+
 #include <types/obj_type.h>
 #include <eb32tree.h>

@@ -198,6 +200,10 @@ struct listener {
 	int tcp_ut;              /* for TCP, user timeout */
 	char *interface;         /* interface name or NULL */

+#ifdef USE_THREAD
+	HA_SPINLOCK_T lock;
+#endif
+
 	const struct netns_entry *netns; /* network namespace of the listener*/

 	struct list by_fe;       /* chaining in frontend's list of listeners */
src/listener.c (179 changed lines)
@@ -38,6 +38,11 @@
 #include <proto/stream.h>
 #include <proto/task.h>

+#ifdef USE_THREAD
+/* listner_queue lock (same for global and per proxy queues) */
+static HA_SPINLOCK_T lq_lock;
+#endif
+
 /* List head of all known bind keywords */
 static struct bind_kw_list bind_keywords = {
 	.list = LIST_HEAD_INIT(bind_keywords.list)
@@ -53,6 +58,7 @@ struct xfer_sock_list *xfer_sock_list = NULL;
  */
 static void enable_listener(struct listener *listener)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state == LI_LISTEN) {
 		if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
 		    listener->bind_conf->bind_proc &&
@@ -75,6 +81,7 @@ static void enable_listener(struct listener *listener)
 			listener->state = LI_FULL;
 		}
 	}
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }

 /* This function removes the specified listener's file descriptor from the
@@ -83,13 +90,19 @@ static void enable_listener(struct listener *listener)
  */
 static void disable_listener(struct listener *listener)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state < LI_READY)
-		return;
+		goto end;
 	if (listener->state == LI_READY)
 		fd_stop_recv(listener->fd);
-	if (listener->state == LI_LIMITED)
+	if (listener->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&listener->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}
 	listener->state = LI_LISTEN;
+ end:
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }

 /* This function tries to temporarily disable a listener, depending on the OS
@@ -101,8 +114,12 @@ static void disable_listener(struct listener *listener)
  */
 int pause_listener(struct listener *l)
 {
+	int ret = 1;
+
+	SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
 	if (l->state <= LI_ZOMBIE)
-		return 1;
+		goto end;

 	if (l->proto->pause) {
 		/* Returns < 0 in case of failure, 0 if the listener
@@ -110,18 +127,25 @@ int pause_listener(struct listener *l)
 		 */
 		int ret = l->proto->pause(l);

-		if (ret < 0)
-			return 0;
+		if (ret < 0) {
+			ret = 0;
+			goto end;
+		}
 		else if (ret == 0)
-			return 1;
+			goto end;
 	}

-	if (l->state == LI_LIMITED)
+	if (l->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&l->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}

 	fd_stop_recv(l->fd);
 	l->state = LI_PAUSED;
-	return 1;
+ end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+	return ret;
 }

 /* This function tries to resume a temporarily disabled listener. Paused, full,
@@ -134,12 +158,16 @@ int pause_listener(struct listener *l)
  * stopped it. If the resume fails, 0 is returned and an error might be
  * displayed.
  */
-int resume_listener(struct listener *l)
+static int __resume_listener(struct listener *l)
 {
+	int ret = 1;
+
+	SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
 	if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
 	    l->bind_conf->bind_proc &&
 	    !(l->bind_conf->bind_proc & (1UL << (relative_pid - 1))))
-		return 1;
+		goto end;

 	if (l->state == LI_ASSIGNED) {
 		char msg[100];
@@ -151,42 +179,66 @@ int resume_listener(struct listener *l)
 		else if (err & ERR_WARN)
 			Warning("Resuming listener: %s\n", msg);

-		if (err & (ERR_FATAL | ERR_ABORT))
-			return 0;
+		if (err & (ERR_FATAL | ERR_ABORT)) {
+			ret = 0;
+			goto end;
+		}
 	}

-	if (l->state < LI_PAUSED || l->state == LI_ZOMBIE)
-		return 0;
+	if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) {
+		ret = 0;
+		goto end;
+	}

 	if (l->proto->sock_prot == IPPROTO_TCP &&
 	    l->state == LI_PAUSED &&
-	    listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0)
-		return 0;
+	    listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) {
+		ret = 0;
+		goto end;
+	}

 	if (l->state == LI_READY)
-		return 1;
+		goto end;

 	if (l->state == LI_LIMITED)
 		LIST_DEL(&l->wait_queue);

 	if (l->nbconn >= l->maxconn) {
 		l->state = LI_FULL;
-		return 1;
+		goto end;
 	}

 	fd_want_recv(l->fd);
 	l->state = LI_READY;
-	return 1;
+ end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+	return ret;
 }

+int resume_listener(struct listener *l)
+{
+	int ret;
+
+	SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	ret = __resume_listener(l);
+	SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	return ret;
+}
+
 /* Marks a ready listener as full so that the stream code tries to re-enable
  * it upon next close() using resume_listener().
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ * locked.
  */
 static void listener_full(struct listener *l)
 {
 	if (l->state >= LI_READY) {
-		if (l->state == LI_LIMITED)
+		if (l->state == LI_LIMITED) {
+			SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 			LIST_DEL(&l->wait_queue);
+			SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+		}

 		fd_stop_recv(l->fd);
 		l->state = LI_FULL;
@@ -195,11 +247,16 @@ static void listener_full(struct listener *l)

 /* Marks a ready listener as limited so that we only try to re-enable it when
  * resources are free again. It will be queued into the specified queue.
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ * locked.
  */
 static void limit_listener(struct listener *l, struct list *list)
 {
 	if (l->state == LI_READY) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_ADDQ(list, &l->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		fd_stop_recv(l->fd);
 		l->state = LI_LIMITED;
 	}
@@ -239,22 +296,28 @@ void dequeue_all_listeners(struct list *list)
 {
 	struct listener *listener, *l_back;

+	SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 	list_for_each_entry_safe(listener, l_back, list, wait_queue) {
 		/* This cannot fail because the listeners are by definition in
 		 * the LI_LIMITED state. The function also removes the entry
 		 * from the queue.
 		 */
-		resume_listener(listener);
+		__resume_listener(listener);
 	}
+	SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 }

 static int do_unbind_listener(struct listener *listener, int do_close)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state == LI_READY)
 		fd_stop_recv(listener->fd);

-	if (listener->state == LI_LIMITED)
+	if (listener->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&listener->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}

 	if (listener->state >= LI_PAUSED) {
 		if (do_close) {
@@ -265,6 +328,7 @@ static int do_unbind_listener(struct listener *listener, int do_close)
 		fd_remove(listener->fd);
 		listener->state = LI_ASSIGNED;
 	}
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 	return ERR_NONE;
 }

@@ -335,8 +399,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,

 		proto->add(l, port);

-		jobs++;
-		listeners++;
+		SPIN_INIT(&l->lock);
+		HA_ATOMIC_ADD(&jobs, 1);
+		HA_ATOMIC_ADD(&listeners, 1);
 	}
 	return 1;
 }
@@ -351,11 +416,14 @@ void delete_listener(struct listener *listener)
 {
 	if (listener->state != LI_ASSIGNED)
 		return;
+
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	listener->state = LI_INIT;
 	LIST_DEL(&listener->proto_list);
 	listener->proto->nb_listeners--;
-	listeners--;
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
+	HA_ATOMIC_SUB(&listeners, 1);
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }

 /* This function is called on a read event from a listening socket, corresponding
@@ -374,9 +442,12 @@ void listener_accept(int fd)
 	static int accept4_broken;
 #endif

+	if (SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
+		return;
+
 	if (unlikely(l->nbconn >= l->maxconn)) {
 		listener_full(l);
-		return;
+		goto end;
 	}

 	if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
@@ -425,7 +496,7 @@ void listener_accept(int fd)
 			/* frontend accept rate limit was reached */
 			limit_listener(l, &p->listener_queue);
 			task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
-			return;
+			goto end;
 		}

 		if (max_accept > max)
@@ -440,16 +511,17 @@ void listener_accept(int fd)
 	while (max_accept--) {
 		struct sockaddr_storage addr;
 		socklen_t laddr = sizeof(addr);
+		unsigned int count;

 		if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			return;
+			goto end;
 		}

 		if (unlikely(p && p->feconn >= p->maxconn)) {
 			limit_listener(l, &p->listener_queue);
-			return;
+			goto end;
 		}

 #ifdef USE_ACCEPT4
@@ -477,7 +549,7 @@ void listener_accept(int fd)
 				goto transient_error;
 			}
 			fd_cant_recv(fd);
-			return; /* nothing more to accept */
+			goto end; /* nothing more to accept */
 		case EINVAL:
 			/* might be trying to accept on a shut fd (eg: soft stop) */
 			goto transient_error;
@@ -516,23 +588,19 @@ void listener_accept(int fd)
 			close(cfd);
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			return;
+			goto end;
 		}

 		/* increase the per-process number of cumulated connections */
 		if (!(l->options & LI_O_UNLIMITED)) {
-			update_freq_ctr(&global.conn_per_sec, 1);
-			if (global.conn_per_sec.curr_ctr > global.cps_max)
-				global.cps_max = global.conn_per_sec.curr_ctr;
-			actconn++;
+			count = update_freq_ctr(&global.conn_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
+			HA_ATOMIC_ADD(&actconn, 1);
 		}

-		l->nbconn++;
-
-		if (l->counters) {
-			if (l->nbconn > l->counters->conn_max)
-				l->counters->conn_max = l->nbconn;
-		}
+		count = HA_ATOMIC_ADD(&l->nbconn, 1);
+		if (l->counters)
+			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);

 		ret = l->accept(l, cfd, &addr);
 		if (unlikely(ret <= 0)) {
@@ -542,8 +610,8 @@ void listener_accept(int fd)
 			 * listener (ret < 0).
 			 */
 			if (!(l->options & LI_O_UNLIMITED))
-				actconn--;
-			l->nbconn--;
+				HA_ATOMIC_SUB(&actconn, 1);
+			HA_ATOMIC_SUB(&l->nbconn, 1);
 			if (ret == 0) /* successful termination */
 				continue;

@@ -552,21 +620,18 @@ void listener_accept(int fd)

 		if (l->nbconn >= l->maxconn) {
 			listener_full(l);
-			return;
+			goto end;
 		}

 		/* increase the per-process number of cumulated connections */
 		if (!(l->options & LI_O_UNLIMITED)) {
-			update_freq_ctr(&global.sess_per_sec, 1);
-			if (global.sess_per_sec.curr_ctr > global.sps_max)
-				global.sps_max = global.sess_per_sec.curr_ctr;
+			count = update_freq_ctr(&global.sess_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
 		}
 #ifdef USE_OPENSSL
 		if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) {
-
-			update_freq_ctr(&global.ssl_per_sec, 1);
-			if (global.ssl_per_sec.curr_ctr > global.ssl_max)
-				global.ssl_max = global.ssl_per_sec.curr_ctr;
+			count = update_freq_ctr(&global.ssl_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.ssl_max, count);
 		}
 #endif

@@ -575,7 +640,7 @@ void listener_accept(int fd)
 	/* we've exhausted max_accept, so there is no need to poll again */
  stop:
 	fd_done_recv(fd);
-	return;
+	goto end;

  transient_error:
 	/* pause the listener and try again in 100 ms */
@@ -584,7 +649,8 @@ void listener_accept(int fd)
  wait_expire:
 	limit_listener(l, &global_listener_queue);
 	task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
-	return;
+ end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
 }

 /* Notify the listener that a connection initiated from it was released. This
@@ -596,8 +662,8 @@ void listener_release(struct listener *l)
 	struct proxy *fe = l->bind_conf->frontend;

 	if (!(l->options & LI_O_UNLIMITED))
-		actconn--;
-	l->nbconn--;
+		HA_ATOMIC_SUB(&actconn, 1);
+	HA_ATOMIC_SUB(&l->nbconn, 1);
 	if (l->state == LI_FULL)
 		resume_listener(l);

@@ -946,6 +1012,7 @@ static void __listener_init(void)
 	sample_register_fetches(&smp_kws);
 	acl_register_keywords(&acl_kws);
 	bind_register_keywords(&bind_kws);
+	SPIN_INIT(&lq_lock);
 }

 /*
@@ -24,6 +24,7 @@
 #include <common/config.h>
 #include <common/time.h>
 #include <common/standard.h>
+#include <common/hathreads.h>

 #include <types/global.h>
 #include <types/listener.h>
@@ -1974,7 +1975,7 @@ static struct task *process_peer_sync(struct task * task)
 			/* We've just recieved the signal */
 			if (!(peers->flags & PEERS_F_DONOTSTOP)) {
 				/* add DO NOT STOP flag if not present */
-				jobs++;
+				HA_ATOMIC_ADD(&jobs, 1);
 				peers->flags |= PEERS_F_DONOTSTOP;
 				ps = peers->local;
 				for (st = ps->tables; st ; st = st->next)
@@ -1994,7 +1995,7 @@ static struct task *process_peer_sync(struct task * task)
 				if (ps->flags & PEER_F_TEACH_COMPLETE) {
 					if (peers->flags & PEERS_F_DONOTSTOP) {
 						/* resync of new process was complete, current process can die now */
-						jobs--;
+						HA_ATOMIC_ADD(&jobs, 1);
 						peers->flags &= ~PEERS_F_DONOTSTOP;
 						for (st = ps->tables; st ; st = st->next)
 							st->table->syncing--;
@@ -2018,7 +2019,7 @@ static struct task *process_peer_sync(struct task * task)
 					/* Other error cases */
 					if (peers->flags & PEERS_F_DONOTSTOP) {
 						/* unable to resync new process, current process can die now */
-						jobs--;
+						HA_ATOMIC_SUB(&jobs, 1);
 						peers->flags &= ~PEERS_F_DONOTSTOP;
 						for (st = ps->tables; st ; st = st->next)
 							st->table->syncing--;
@@ -1744,7 +1744,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
 		proxy_inc_fe_req_ctr(sess->fe);
 		sess->fe->fe_counters.failed_req++;
 		if (sess->listener->counters)
-			sess->listener->counters->failed_req++;
+			HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 		if (!(s->flags & SF_FINST_MASK))
 			s->flags |= SF_FINST_R;
@@ -1777,7 +1777,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
 		proxy_inc_fe_req_ctr(sess->fe);
 		sess->fe->fe_counters.failed_req++;
 		if (sess->listener->counters)
-			sess->listener->counters->failed_req++;
+			HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 		if (!(s->flags & SF_FINST_MASK))
 			s->flags |= SF_FINST_R;
@@ -1807,7 +1807,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
 		proxy_inc_fe_req_ctr(sess->fe);
 		sess->fe->fe_counters.failed_req++;
 		if (sess->listener->counters)
-			sess->listener->counters->failed_req++;
+			HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 		if (!(s->flags & SF_FINST_MASK))
 			s->flags |= SF_FINST_R;
@@ -2177,7 +2177,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)

 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 return_prx_cond:
 	if (!(s->flags & SF_ERR_MASK))
@@ -3545,7 +3545,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
 	if (sess->fe != s->be)
 		s->be->be_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 	goto done_without_exp;

 deny:	/* this request was blocked (denied) */
@@ -3564,7 +3564,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
 	if (sess->fe != s->be)
 		s->be->be_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 	goto return_prx_cond;

 return_bad_req:
@@ -3583,7 +3583,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s

 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 return_prx_cond:
 	if (!(s->flags & SF_ERR_MASK))
@@ -3921,7 +3921,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)

 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 	if (!(s->flags & SF_ERR_MASK))
 		s->flags |= SF_ERR_PRXCOND;
@@ -4127,7 +4127,7 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit
 		req->analysers &= AN_REQ_FLT_END;
 		sess->fe->fe_counters.failed_req++;
 		if (sess->listener->counters)
-			sess->listener->counters->failed_req++;
+			HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 		return 0;
 	}

@@ -4912,7 +4912,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
 return_bad_req: /* let's centralize all bad requests */
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);

 return_bad_req_stats_ok:
 	txn->req.err_state = txn->req.msg_state;
@@ -5700,7 +5700,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
 		s->be->be_counters.denied_resp++;
 		sess->fe->fe_counters.denied_resp++;
 		if (sess->listener->counters)
-			sess->listener->counters->denied_resp++;
+			HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);

 		goto return_srv_prx_502;
 	}
@@ -5850,7 +5850,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
 			s->be->be_counters.denied_resp++;
 			sess->fe->fe_counters.denied_resp++;
 			if (sess->listener->counters)
-				sess->listener->counters->denied_resp++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);

 			Alert("Blocking cacheable cookie in response from instance %s, server %s.\n",
 			      s->be->id, objt_server(s->target) ? objt_server(s->target)->id : "<dispatch>");
@@ -1377,7 +1377,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct

 	sess->fe->fe_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);

 	return ACT_RET_STOP;
 }
@@ -57,8 +57,8 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
 			fe->fe_counters.conn_max = fe->feconn;
 		if (li)
 			proxy_inc_fe_conn_ctr(li, fe);
-		totalconn++;
-		jobs++;
+		HA_ATOMIC_ADD(&totalconn, 1);
+		HA_ATOMIC_ADD(&jobs, 1);
 	}
 	return sess;
 }
@@ -69,7 +69,7 @@ void session_free(struct session *sess)
 	session_store_counters(sess);
 	vars_prune_per_sess(&sess->vars);
 	pool_free2(pool2_session, sess);
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
 }

 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -427,7 +427,7 @@ static void ssl_async_fd_free(int fd)
 	/* Now we can safely call SSL_free, no more pending job in engines */
 	SSL_free(ssl);
 	sslconns--;
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
 }
 /*
  * function used to manage a returned SSL_ERROR_WANT_ASYNC
@@ -5487,7 +5487,7 @@ static void ssl_sock_close(struct connection *conn) {
 			fd_cant_recv(afd);
 		}
 		conn->xprt_ctx = NULL;
-		jobs++;
+		HA_ATOMIC_ADD(&jobs, 1);
 		return;
 	}
 	/* Else we can remove the fds from the fdtab
@@ -19,6 +19,7 @@
 #include <common/buffer.h>
 #include <common/debug.h>
 #include <common/memory.h>
+#include <common/hathreads.h>

 #include <types/applet.h>
 #include <types/capture.h>
@@ -477,7 +478,7 @@ void stream_process_counters(struct stream *s)
 		objt_server(s->target)->counters.bytes_in += bytes;

 	if (sess->listener && sess->listener->counters)
-		sess->listener->counters->bytes_in += bytes;
+		HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);

 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &s->stkctr[i];
@@ -514,7 +515,7 @@ void stream_process_counters(struct stream *s)
 		objt_server(s->target)->counters.bytes_out += bytes;

 	if (sess->listener && sess->listener->counters)
-		sess->listener->counters->bytes_out += bytes;
+		HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);

 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &s->stkctr[i];
@@ -986,7 +987,7 @@ static void sess_set_term_flags(struct stream *s)

 		strm_fe(s)->fe_counters.failed_req++;
 		if (strm_li(s) && strm_li(s)->counters)
-			strm_li(s)->counters->failed_req++;
+			HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);

 		s->flags |= SF_FINST_R;
 	}
@@ -169,7 +169,7 @@ resume_execution:
 			s->be->be_counters.denied_req++;
 			sess->fe->fe_counters.denied_req++;
 			if (sess->listener && sess->listener->counters)
-				sess->listener->counters->denied_req++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);

 			if (!(s->flags & SF_ERR_MASK))
 				s->flags |= SF_ERR_PRXCOND;
@@ -347,7 +347,7 @@ resume_execution:
 			s->be->be_counters.denied_resp++;
 			sess->fe->fe_counters.denied_resp++;
 			if (sess->listener && sess->listener->counters)
-				sess->listener->counters->denied_resp++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);

 			if (!(s->flags & SF_ERR_MASK))
 				s->flags |= SF_ERR_PRXCOND;
@@ -429,7 +429,7 @@ int tcp_exec_l4_rules(struct session *sess)
 		else if (rule->action == ACT_ACTION_DENY) {
 			sess->fe->fe_counters.denied_conn++;
 			if (sess->listener && sess->listener->counters)
-				sess->listener->counters->denied_conn++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1);

 			result = 0;
 			break;
@@ -516,7 +516,7 @@ int tcp_exec_l5_rules(struct session *sess)
 		else if (rule->action == ACT_ACTION_DENY) {
 			sess->fe->fe_counters.denied_sess++;
 			if (sess->listener && sess->listener->counters)
-				sess->listener->counters->denied_sess++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1);

 			result = 0;
 			break;