MINOR: spoa-server: Prepare responses

This patch adds SPOP responses managament. It provides SPOP
encoding primitives. It also move the example function
ip_reputation to this new behavior.
This commit is contained in:
Thierry FOURNIER 2018-02-23 18:24:10 +01:00 committed by Willy Tarreau
parent 8b9a73bac0
commit fbd3824868
2 changed files with 202 additions and 40 deletions

View File

@ -156,26 +156,30 @@ static void
check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
{
char str[INET_ADDRSTRLEN];
unsigned int score;
if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
return;
w->ip_score = random() % 100;
score = random() % 100;
set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score);
}
static void
check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
{
char str[INET6_ADDRSTRLEN];
unsigned int score;
if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
return;
w->ip_score = random() % 100;
score = random() % 100;
set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score);
}
static int
@ -700,6 +704,159 @@ error:
return -1;
}
/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
* the number of written bytes otherwise. */
static void prepare_agentack(struct worker *w)
{
w->ack_len = 0;
/* Frame type */
w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
/* No flags for now */
memset(w->ack + w->ack_len, 0, 4); /* No flags */
w->ack_len += 4;
/* Set stream-id and frame-id for ACK frames */
w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
}
static inline
int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
{
w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
w->ack[w->ack_len++] = 3; /* Number of args */
w->ack[w->ack_len++] = scope; /* Arg 1: the scope */
w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
return 1;
}
int set_var_null(struct worker *w,
const char *name, int name_len,
unsigned char scope)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
return 1;
}
int set_var_bool(struct worker *w,
const char *name, int name_len,
unsigned char scope, bool value)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
return 1;
}
static inline
int set_var_int(struct worker *w,
const char *name, int name_len,
unsigned char scope, int type, uint64_t value)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
return 1;
}
int set_var_uint32(struct worker *w,
const char *name, int name_len,
unsigned char scope, uint32_t value)
{
return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
}
int set_var_int32(struct worker *w,
const char *name, int name_len,
unsigned char scope, int32_t value)
{
return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
}
int set_var_uint64(struct worker *w,
const char *name, int name_len,
unsigned char scope, uint64_t value)
{
return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
}
int set_var_int64(struct worker *w,
const char *name, int name_len,
unsigned char scope, int64_t value)
{
return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
}
int set_var_ipv4(struct worker *w,
const char *name, int name_len,
unsigned char scope,
struct in_addr *ipv4)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
memcpy(w->ack+w->ack_len, ipv4, 4);
w->ack_len += 4;
return 1;
}
int set_var_ipv6(struct worker *w,
const char *name, int name_len,
unsigned char scope,
struct in6_addr *ipv6)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
memcpy(w->ack+w->ack_len, ipv6, 16);
w->ack_len += 16;
return 1;
}
static inline
int set_var_buf(struct worker *w,
const char *name, int name_len,
unsigned char scope, int type,
const char *str, int str_len)
{
if (!set_var_name(w, name, name_len, scope))
return 0;
w->ack[w->ack_len++] = type;
w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
return 1;
}
int set_var_string(struct worker *w,
const char *name, int name_len,
unsigned char scope,
const char *str, int strlen)
{
return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen);
}
int set_var_bin(struct worker *w,
const char *name, int name_len,
unsigned char scope,
const char *str, int strlen)
{
return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen);
}
/* This function is a little bit ugly,
* TODO: improve the response without copying the bufer
*/
static int commit_agentack(struct worker *w)
{
memcpy(w->buf, w->ack, w->ack_len);
w->len = w->ack_len;
return 1;
}
/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
* occurred, 0 if the frame must be skipped, otherwise the number of read
* bytes. */
@ -737,6 +894,9 @@ handle_hanotify(struct worker *w)
DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
w->stream_id, w->frame_id);
/* Prepara ack, if the processing fails tha ack will be cancelled */
prepare_agentack(w);
/* Loop on messages */
while (idx < w->len) {
char *str;
@ -840,39 +1000,6 @@ prepare_agenthello(struct worker *w)
return idx;
}
/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
* the number of written bytes otherwise. */
static int
prepare_agentack(struct worker *w)
{
int idx = 0;
/* Frame type */
w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
/* No flags for now */
memset(w->buf+idx, 0, 4); /* No flags */
idx += 4;
/* Set stream-id and frame-id for ACK frames */
idx += encode_spoe_varint(w->stream_id, w->buf+idx);
idx += encode_spoe_varint(w->frame_id, w->buf+idx);
/* Data */
if (w->ip_score == -1)
goto out;
w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
w->buf[idx++] = 3; /* Number of args */
w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
w->buf[idx++] = SPOE_DATA_T_UINT32;
idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
out:
w->len = idx;
return idx;
}
/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
* occurred, the number of written bytes otherwise. */
static int
@ -957,7 +1084,7 @@ notify_ack_roundtip(int sock, struct worker *w)
LOG("Failed to handle Haproxy NOTIFY frame");
goto error_or_quit;
}
if (prepare_agentack(w) < 0) {
if (commit_agentack(w) < 0) {
LOG("Failed to prepare Agent ACK frame");
goto error_or_quit;
}
@ -1022,7 +1149,6 @@ spoa_worker(void *data)
if (w.healthcheck == true)
goto close;
while (1) {
w.ip_score = -1;
if (notify_ack_roundtip(csock, &w) < 0)
break;
}

View File

@ -55,7 +55,8 @@ struct worker {
unsigned int stream_id;
unsigned int frame_id;
bool healthcheck;
int ip_score; /* -1 if unset, else between 0 and 100 */
char ack[MAX_FRAME_SIZE];
unsigned int ack_len;
};
struct chunk {
@ -106,6 +107,41 @@ extern pthread_key_t worker_id;
void ps_register(struct ps *ps);
void ps_register_message(struct ps *ps, const char *name, void *ref);
int set_var_null(struct worker *w,
const char *name, int name_len,
unsigned char scope);
int set_var_bool(struct worker *w,
const char *name, int name_len,
unsigned char scope, bool value);
int set_var_uint32(struct worker *w,
const char *name, int name_len,
unsigned char scope, uint32_t value);
int set_var_int32(struct worker *w,
const char *name, int name_len,
unsigned char scope, int32_t value);
int set_var_uint64(struct worker *w,
const char *name, int name_len,
unsigned char scope, uint64_t value);
int set_var_int64(struct worker *w,
const char *name, int name_len,
unsigned char scope, int64_t value);
int set_var_ipv4(struct worker *w,
const char *name, int name_len,
unsigned char scope,
struct in_addr *ipv4);
int set_var_ipv6(struct worker *w,
const char *name, int name_len,
unsigned char scope,
struct in6_addr *ipv6);
int set_var_string(struct worker *w,
const char *name, int name_len,
unsigned char scope,
const char *str, int strlen);
int set_var_bin(struct worker *w,
const char *name, int name_len,
unsigned char scope,
const char *str, int strlen);
#define LOG(fmt, args...) \
do { \
struct timeval now; \