MEDIUM: thread/vars: Make vars thread-safe
A RW lock has been added to the vars structure to protect each list of variables. And a global RW lock is used to protect registered names. When a varibable is fetched, we duplicate sample data because the variable could be modified by another thread.
This commit is contained in:
parent
94b712337d
commit
e95f2c3ef5
|
@ -163,6 +163,7 @@ enum lock_label {
|
||||||
PATREF_LOCK,
|
PATREF_LOCK,
|
||||||
PATEXP_LOCK,
|
PATEXP_LOCK,
|
||||||
PATLRU_LOCK,
|
PATLRU_LOCK,
|
||||||
|
VARS_LOCK,
|
||||||
LOCK_LABELS
|
LOCK_LABELS
|
||||||
};
|
};
|
||||||
struct lock_stat {
|
struct lock_stat {
|
||||||
|
@ -250,7 +251,7 @@ static inline void show_lock_stats()
|
||||||
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
|
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
|
||||||
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
|
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
|
||||||
"APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
|
"APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
|
||||||
"PATREF", "PATEXP", "PATLRU" };
|
"PATREF", "PATEXP", "PATLRU", "VARS" };
|
||||||
int lbl;
|
int lbl;
|
||||||
|
|
||||||
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
#define _TYPES_VARS_H
|
#define _TYPES_VARS_H
|
||||||
|
|
||||||
#include <common/mini-clist.h>
|
#include <common/mini-clist.h>
|
||||||
|
#include <common/hathreads.h>
|
||||||
|
|
||||||
#include <types/sample.h>
|
#include <types/sample.h>
|
||||||
|
|
||||||
|
@ -17,6 +18,9 @@ struct vars {
|
||||||
struct list head;
|
struct list head;
|
||||||
enum vars_scope scope;
|
enum vars_scope scope;
|
||||||
unsigned int size;
|
unsigned int size;
|
||||||
|
#ifdef USE_THREAD
|
||||||
|
HA_RWLOCK_T rwlock;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
/* This struct describes a variable. */
|
/* This struct describes a variable. */
|
||||||
|
|
96
src/vars.c
96
src/vars.c
|
@ -31,6 +31,11 @@ static unsigned int var_sess_limit = 0;
|
||||||
static unsigned int var_txn_limit = 0;
|
static unsigned int var_txn_limit = 0;
|
||||||
static unsigned int var_reqres_limit = 0;
|
static unsigned int var_reqres_limit = 0;
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef USE_THREAD
|
||||||
|
HA_RWLOCK_T var_names_rwlock;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* This function adds or remove memory size from the accounting. The inner
|
/* This function adds or remove memory size from the accounting. The inner
|
||||||
* pointers may be null when setting the outer ones only.
|
* pointers may be null when setting the outer ones only.
|
||||||
*/
|
*/
|
||||||
|
@ -39,17 +44,17 @@ static void var_accounting_diff(struct vars *vars, struct session *sess, struct
|
||||||
switch (vars->scope) {
|
switch (vars->scope) {
|
||||||
case SCOPE_REQ:
|
case SCOPE_REQ:
|
||||||
case SCOPE_RES:
|
case SCOPE_RES:
|
||||||
strm->vars_reqres.size += size;
|
HA_ATOMIC_ADD(&strm->vars_reqres.size, size);
|
||||||
/* fall through */
|
/* fall through */
|
||||||
case SCOPE_TXN:
|
case SCOPE_TXN:
|
||||||
strm->vars_txn.size += size;
|
HA_ATOMIC_ADD(&strm->vars_txn.size, size);
|
||||||
/* fall through */
|
/* fall through */
|
||||||
case SCOPE_SESS:
|
case SCOPE_SESS:
|
||||||
sess->vars.size += size;
|
HA_ATOMIC_ADD(&sess->vars.size, size);
|
||||||
/* fall through */
|
/* fall through */
|
||||||
case SCOPE_PROC:
|
case SCOPE_PROC:
|
||||||
global.vars.size += size;
|
HA_ATOMIC_ADD(&global.vars.size, size);
|
||||||
var_global_size += size;
|
HA_ATOMIC_ADD(&var_global_size, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,9 +118,11 @@ void vars_prune(struct vars *vars, struct session *sess, struct stream *strm)
|
||||||
struct var *var, *tmp;
|
struct var *var, *tmp;
|
||||||
unsigned int size = 0;
|
unsigned int size = 0;
|
||||||
|
|
||||||
|
RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
list_for_each_entry_safe(var, tmp, &vars->head, l) {
|
list_for_each_entry_safe(var, tmp, &vars->head, l) {
|
||||||
size += var_clear(var);
|
size += var_clear(var);
|
||||||
}
|
}
|
||||||
|
RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
var_accounting_diff(vars, sess, strm, -size);
|
var_accounting_diff(vars, sess, strm, -size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,12 +134,15 @@ void vars_prune_per_sess(struct vars *vars)
|
||||||
struct var *var, *tmp;
|
struct var *var, *tmp;
|
||||||
unsigned int size = 0;
|
unsigned int size = 0;
|
||||||
|
|
||||||
|
RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
list_for_each_entry_safe(var, tmp, &vars->head, l) {
|
list_for_each_entry_safe(var, tmp, &vars->head, l) {
|
||||||
size += var_clear(var);
|
size += var_clear(var);
|
||||||
}
|
}
|
||||||
vars->size -= size;
|
RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
global.vars.size -= size;
|
|
||||||
var_global_size -= size;
|
HA_ATOMIC_SUB(&vars->size, size);
|
||||||
|
HA_ATOMIC_SUB(&global.vars.size, size);
|
||||||
|
HA_ATOMIC_SUB(&var_global_size, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function init a list of variabes. */
|
/* This function init a list of variabes. */
|
||||||
|
@ -141,6 +151,7 @@ void vars_init(struct vars *vars, enum vars_scope scope)
|
||||||
LIST_INIT(&vars->head);
|
LIST_INIT(&vars->head);
|
||||||
vars->scope = scope;
|
vars->scope = scope;
|
||||||
vars->size = 0;
|
vars->size = 0;
|
||||||
|
RWLOCK_INIT(&vars->rwlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function declares a new variable name. It returns a pointer
|
/* This function declares a new variable name. It returns a pointer
|
||||||
|
@ -160,11 +171,13 @@ static char *register_name(const char *name, int len, enum vars_scope *scope,
|
||||||
int i;
|
int i;
|
||||||
char **var_names2;
|
char **var_names2;
|
||||||
const char *tmp;
|
const char *tmp;
|
||||||
|
char *res = NULL;
|
||||||
|
|
||||||
/* Check length. */
|
/* Check length. */
|
||||||
if (len == 0) {
|
if (len == 0) {
|
||||||
memprintf(err, "Empty variable name cannot be accepted");
|
memprintf(err, "Empty variable name cannot be accepted");
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check scope. */
|
/* Check scope. */
|
||||||
|
@ -196,29 +209,42 @@ static char *register_name(const char *name, int len, enum vars_scope *scope,
|
||||||
else {
|
else {
|
||||||
memprintf(err, "invalid variable name '%s'. A variable name must be start by its scope. "
|
memprintf(err, "invalid variable name '%s'. A variable name must be start by its scope. "
|
||||||
"The scope can be 'proc', 'sess', 'txn', 'req' or 'res'", name);
|
"The scope can be 'proc', 'sess', 'txn', 'req' or 'res'", name);
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (alloc)
|
||||||
|
RWLOCK_WRLOCK(VARS_LOCK, &var_names_rwlock);
|
||||||
|
else
|
||||||
|
RWLOCK_RDLOCK(VARS_LOCK, &var_names_rwlock);
|
||||||
|
|
||||||
|
|
||||||
/* Look for existing variable name. */
|
/* Look for existing variable name. */
|
||||||
for (i = 0; i < var_names_nb; i++)
|
for (i = 0; i < var_names_nb; i++)
|
||||||
if (strncmp(var_names[i], name, len) == 0 && var_names[i][len] == '\0')
|
if (strncmp(var_names[i], name, len) == 0 && var_names[i][len] == '\0') {
|
||||||
return var_names[i];
|
res = var_names[i];
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
if (!alloc)
|
if (!alloc) {
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
/* Store variable name. If realloc fails, var_names remains valid */
|
/* Store variable name. If realloc fails, var_names remains valid */
|
||||||
var_names2 = realloc(var_names, (var_names_nb + 1) * sizeof(*var_names));
|
var_names2 = realloc(var_names, (var_names_nb + 1) * sizeof(*var_names));
|
||||||
if (!var_names2) {
|
if (!var_names2) {
|
||||||
memprintf(err, "out of memory error");
|
memprintf(err, "out of memory error");
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
var_names_nb++;
|
var_names_nb++;
|
||||||
var_names = var_names2;
|
var_names = var_names2;
|
||||||
var_names[var_names_nb - 1] = malloc(len + 1);
|
var_names[var_names_nb - 1] = malloc(len + 1);
|
||||||
if (!var_names[var_names_nb - 1]) {
|
if (!var_names[var_names_nb - 1]) {
|
||||||
memprintf(err, "out of memory error");
|
memprintf(err, "out of memory error");
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
memcpy(var_names[var_names_nb - 1], name, len);
|
memcpy(var_names[var_names_nb - 1], name, len);
|
||||||
var_names[var_names_nb - 1][len] = '\0';
|
var_names[var_names_nb - 1][len] = '\0';
|
||||||
|
@ -228,13 +254,20 @@ static char *register_name(const char *name, int len, enum vars_scope *scope,
|
||||||
while (*tmp) {
|
while (*tmp) {
|
||||||
if (!isalnum((int)(unsigned char)*tmp) && *tmp != '_' && *tmp != '.') {
|
if (!isalnum((int)(unsigned char)*tmp) && *tmp != '_' && *tmp != '.') {
|
||||||
memprintf(err, "invalid syntax at char '%s'", tmp);
|
memprintf(err, "invalid syntax at char '%s'", tmp);
|
||||||
return NULL;
|
res = NULL;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
tmp++;
|
tmp++;
|
||||||
}
|
}
|
||||||
|
res = var_names[var_names_nb - 1];
|
||||||
|
|
||||||
/* Return the result. */
|
end:
|
||||||
return var_names[var_names_nb - 1];
|
if (alloc)
|
||||||
|
RWLOCK_WRUNLOCK(VARS_LOCK, &var_names_rwlock);
|
||||||
|
else
|
||||||
|
RWLOCK_RDUNLOCK(VARS_LOCK, &var_names_rwlock);
|
||||||
|
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function returns an existing variable or returns NULL. */
|
/* This function returns an existing variable or returns NULL. */
|
||||||
|
@ -278,15 +311,23 @@ static int smp_fetch_var(const struct arg *args, struct sample *smp, const char
|
||||||
}
|
}
|
||||||
if (vars->scope != var_desc->scope)
|
if (vars->scope != var_desc->scope)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
RWLOCK_RDLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
var = var_get(vars, var_desc->name);
|
var = var_get(vars, var_desc->name);
|
||||||
|
|
||||||
/* check for the variable avalaibility */
|
/* check for the variable avalaibility */
|
||||||
if (!var)
|
if (!var) {
|
||||||
|
RWLOCK_RDUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* Copy sample. */
|
/* Duplicate the sample data because it could modified by another
|
||||||
|
* thread */
|
||||||
smp->data = var->data;
|
smp->data = var->data;
|
||||||
|
smp_dup(smp);
|
||||||
smp->flags |= SMP_F_CONST;
|
smp->flags |= SMP_F_CONST;
|
||||||
|
|
||||||
|
RWLOCK_RDUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,6 +425,7 @@ static int sample_store(struct vars *vars, const char *name, struct sample *smp)
|
||||||
static inline int sample_store_stream(const char *name, enum vars_scope scope, struct sample *smp)
|
static inline int sample_store_stream(const char *name, enum vars_scope scope, struct sample *smp)
|
||||||
{
|
{
|
||||||
struct vars *vars;
|
struct vars *vars;
|
||||||
|
int ret;
|
||||||
|
|
||||||
switch (scope) {
|
switch (scope) {
|
||||||
case SCOPE_PROC: vars = &global.vars; break;
|
case SCOPE_PROC: vars = &global.vars; break;
|
||||||
|
@ -395,7 +437,11 @@ static inline int sample_store_stream(const char *name, enum vars_scope scope, s
|
||||||
}
|
}
|
||||||
if (vars->scope != scope)
|
if (vars->scope != scope)
|
||||||
return 0;
|
return 0;
|
||||||
return sample_store(vars, name, smp);
|
|
||||||
|
RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
|
ret = sample_store(vars, name, smp);
|
||||||
|
RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns 0 if fails, else returns 1. Note that stream may be null for SCOPE_SESS. */
|
/* Returns 0 if fails, else returns 1. Note that stream may be null for SCOPE_SESS. */
|
||||||
|
@ -417,11 +463,13 @@ static inline int sample_clear_stream(const char *name, enum vars_scope scope, s
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
/* Look for existing variable name. */
|
/* Look for existing variable name. */
|
||||||
|
RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
var = var_get(vars, name);
|
var = var_get(vars, name);
|
||||||
if (var) {
|
if (var) {
|
||||||
size = var_clear(var);
|
size = var_clear(var);
|
||||||
var_accounting_diff(vars, smp->sess, smp->strm, -size);
|
var_accounting_diff(vars, smp->sess, smp->strm, -size);
|
||||||
}
|
}
|
||||||
|
RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -853,7 +901,7 @@ static struct cfg_kw_list cfg_kws = {{ },{
|
||||||
}};
|
}};
|
||||||
|
|
||||||
__attribute__((constructor))
|
__attribute__((constructor))
|
||||||
static void __http_protocol_init(void)
|
static void __vars_init(void)
|
||||||
{
|
{
|
||||||
var_pool = create_pool("vars", sizeof(struct var), MEM_F_SHARED);
|
var_pool = create_pool("vars", sizeof(struct var), MEM_F_SHARED);
|
||||||
|
|
||||||
|
@ -865,4 +913,6 @@ static void __http_protocol_init(void)
|
||||||
http_req_keywords_register(&http_req_kws);
|
http_req_keywords_register(&http_req_kws);
|
||||||
http_res_keywords_register(&http_res_kws);
|
http_res_keywords_register(&http_res_kws);
|
||||||
cfg_register_keywords(&cfg_kws);
|
cfg_register_keywords(&cfg_kws);
|
||||||
|
|
||||||
|
RWLOCK_INIT(&var_names_rwlock);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue