MEDIUM: listener: switch bind_thread from global to group-local

This requires both adapting the parser and changing the algorithm used to
redispatch incoming traffic, so that local thread IDs may always
be used.

The internal structures now only reference thread group IDs and
group-local masks, which are compatible with those used by the
FD layer and the rest of the code.
Willy Tarreau 2022-06-28 08:30:43 +02:00
parent 6018c02c36
commit d0b73bca71
2 changed files with 23 additions and 18 deletions

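In a group-local scheme, the bit positions of a bind mask are interpreted relative to the owning thread group rather than to the whole process. Below is a minimal standalone sketch of that convention, assuming a simplified stand-in structure (tgroup_sketch) with only the fields this commit relies on; the real ha_tgroup_info entries carry more state.

#include <stdio.h>

/* Hypothetical, simplified stand-in for one ha_tgroup_info entry. */
struct tgroup_sketch {
	int base;                      /* global ID of the group's first thread */
	int count;                     /* number of threads in the group */
	unsigned long threads_enabled; /* group-local mask of enabled threads */
};

/* A group-local thread index maps to a global thread ID by adding the
 * group's base, which is what the listener code below does with "base + t". */
static int global_thread_id(const struct tgroup_sketch *tg, int local)
{
	return tg->base + local;
}

int main(void)
{
	/* Example: 8 threads split into two groups of 4. */
	struct tgroup_sketch groups[2] = {
		{ .base = 0, .count = 4, .threads_enabled = 0xF },
		{ .base = 4, .count = 4, .threads_enabled = 0xF },
	};

	/* Bit 1 of the second group's local mask refers to global thread 5. */
	printf("group 2, local thread 1 -> global thread %d\n",
	       global_thread_id(&groups[1], 1));
	return 0;
}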

@@ -2645,17 +2645,19 @@ int check_config_validity()
 curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
 free(err);
 cfgerr++;
-} else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+} else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
 unsigned long new_mask = 0;
+ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
 while (mask) {
-new_mask |= mask & all_threads_mask;
-mask >>= global.nbthread;
+new_mask |= mask & thr_mask;
+mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
 }
 bind_conf->bind_thread = new_mask;
-ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
-curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
 }
 /* apply thread masks and groups to all receivers */
@@ -4102,17 +4104,19 @@ int check_config_validity()
 curpeers->peers_fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
 free(err);
 cfgerr++;
-} else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+} else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
 unsigned long new_mask = 0;
+ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
 while (mask) {
-new_mask |= mask & all_threads_mask;
-mask >>= global.nbthread;
+new_mask |= mask & thr_mask;
+mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
 }
 bind_conf->bind_thread = new_mask;
-ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
-curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
 }
 /* apply thread masks and groups to all receivers */

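Both hunks above rely on the same remapping loop: when the configured mask shares no bit with the group's enabled threads, the excess bits are folded back in steps of the group size until every set bit lands on an existing thread. Here is a standalone sketch of that loop under illustrative names only (remap_to_group is not an HAProxy function):

#include <stdio.h>

/* Fold a thread mask back onto a group's enabled threads, mirroring the
 * "while (mask)" loop in the hunks above. */
static unsigned long remap_to_group(unsigned long mask,
                                    unsigned long threads_enabled,
                                    int group_size)
{
	unsigned long new_mask = 0;

	while (mask) {
		new_mask |= mask & threads_enabled;
		mask >>= group_size;
	}
	return new_mask;
}

int main(void)
{
	/* A mask aimed at threads 5..8 of a 4-thread group folds back onto
	 * threads 1..4: 0xf0 becomes 0xf. */
	printf("0x%lx\n", remap_to_group(0xF0UL, 0xFUL, 4));
	return 0;
}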

@@ -992,10 +992,11 @@ void listener_accept(struct listener *l)
 if (l->rx.flags & RX_F_LOCAL_ACCEPT)
 goto local_accept;
-mask = l->rx.bind_thread & all_threads_mask;
+mask = l->rx.bind_thread & tg->threads_enabled;
 if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
 struct accept_queue_ring *ring;
 unsigned int t, t0, t1, t2;
+int base = tg->base;
 /* The principle is that we have two running indexes,
 * each visiting in turn all threads bound to this
@@ -1042,11 +1043,11 @@ void listener_accept(struct listener *l)
 }
 /* now we have two distinct thread IDs belonging to the mask */
-q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
 if (q1 >= ACCEPT_QUEUE_SIZE)
 q1 -= ACCEPT_QUEUE_SIZE;
-q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
 if (q2 >= ACCEPT_QUEUE_SIZE)
 q2 -= ACCEPT_QUEUE_SIZE;
@@ -1062,8 +1063,8 @@ void listener_accept(struct listener *l)
 * than t2.
 */
-q1 += l->thr_conn[t1];
-q2 += l->thr_conn[t2];
+q1 += l->thr_conn[base + t1];
+q2 += l->thr_conn[base + t2];
 if (q1 - q2 < 0) {
 t = t1;
@@ -1092,16 +1093,16 @@ void listener_accept(struct listener *l)
 * performing model, likely due to better cache locality
 * when processing this loop.
 */
-ring = &accept_queue_rings[t];
+ring = &accept_queue_rings[base + t];
 if (accept_queue_push_mp(ring, cli_conn)) {
-_HA_ATOMIC_INC(&activity[t].accq_pushed);
+_HA_ATOMIC_INC(&activity[base + t].accq_pushed);
 tasklet_wakeup(ring->tasklet);
 continue;
 }
 /* If the ring is full we do a synchronous accept on
 * the local thread here.
 */
-_HA_ATOMIC_INC(&activity[t].accq_full);
+_HA_ATOMIC_INC(&activity[base + t].accq_full);
 }
 #endif // USE_THREAD
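Since accept_queue_rings[], activity[] and l->thr_conn[] stay indexed by global thread ID, every access made from a group-local index t now goes through the group's base offset, as the hunks above do with "base + t". A tiny sketch of that addressing pattern, with made-up counters (the pushes[] array here is illustrative, not HAProxy's activity structure):

#include <stdio.h>

#define NBTHREADS 8

/* Made-up global per-thread counters, laid out like HAProxy's per-thread
 * arrays: one slot per global thread ID. */
static unsigned int pushes[NBTHREADS];

/* With group-local thread indexes, the global slot is base + local. */
static void account_push(int tg_base, int local_thr)
{
	pushes[tg_base + local_thr]++;
}

int main(void)
{
	/* A group starting at global thread 4: its local thread 2 is
	 * global thread 6. */
	account_push(4, 2);
	printf("pushes on global thread 6: %u\n", pushes[6]);
	return 0;
}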