diff --git a/src/cfgparse.c b/src/cfgparse.c index 8fb866b782..2bd498ac76 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -2645,17 +2645,19 @@ int check_config_validity() curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line); free(err); cfgerr++; - } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) { + } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) { unsigned long new_mask = 0; + ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled; while (mask) { - new_mask |= mask & all_threads_mask; - mask >>= global.nbthread; + new_mask |= mask & thr_mask; + mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count; } bind_conf->bind_thread = new_mask; - ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n", - curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask); + ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n", + curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line, + bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask); } /* apply thread masks and groups to all receivers */ @@ -4102,17 +4104,19 @@ int check_config_validity() curpeers->peers_fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line); free(err); cfgerr++; - } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) { + } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) { unsigned long new_mask = 0; + ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled; while (mask) { - new_mask |= mask & all_threads_mask; - mask >>= global.nbthread; + new_mask |= mask & thr_mask; + mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count; } bind_conf->bind_thread = new_mask; - ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n", - curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask); + ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n", + curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line, + bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask); } /* apply thread masks and groups to all receivers */ diff --git a/src/listener.c b/src/listener.c index 5b91faf3ea..6f8d4ad620 100644 --- a/src/listener.c +++ b/src/listener.c @@ -992,10 +992,11 @@ void listener_accept(struct listener *l) if (l->rx.flags & RX_F_LOCAL_ACCEPT) goto local_accept; - mask = l->rx.bind_thread & all_threads_mask; + mask = l->rx.bind_thread & tg->threads_enabled; if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) { struct accept_queue_ring *ring; unsigned int t, t0, t1, t2; + int base = tg->base; /* The principle is that we have two running indexes, * each visiting in turn all threads bound to this @@ -1042,11 +1043,11 @@ void listener_accept(struct listener *l) } /* now we have two distinct thread IDs belonging to the mask */ - q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE; + q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE; if (q1 >= ACCEPT_QUEUE_SIZE) q1 -= ACCEPT_QUEUE_SIZE; - q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE; + q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE; if (q2 >= ACCEPT_QUEUE_SIZE) q2 -= ACCEPT_QUEUE_SIZE; @@ -1062,8 +1063,8 @@ void listener_accept(struct listener *l) * than t2. */ - q1 += l->thr_conn[t1]; - q2 += l->thr_conn[t2]; + q1 += l->thr_conn[base + t1]; + q2 += l->thr_conn[base + t2]; if (q1 - q2 < 0) { t = t1; @@ -1092,16 +1093,16 @@ void listener_accept(struct listener *l) * performing model, likely due to better cache locality * when processing this loop. */ - ring = &accept_queue_rings[t]; + ring = &accept_queue_rings[base + t]; if (accept_queue_push_mp(ring, cli_conn)) { - _HA_ATOMIC_INC(&activity[t].accq_pushed); + _HA_ATOMIC_INC(&activity[base + t].accq_pushed); tasklet_wakeup(ring->tasklet); continue; } /* If the ring is full we do a synchronous accept on * the local thread here. */ - _HA_ATOMIC_INC(&activity[t].accq_full); + _HA_ATOMIC_INC(&activity[base + t].accq_full); } #endif // USE_THREAD