From f95b111ddeba3be9c7e7f7e0e79961140ccabba3 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Wed, 21 Dec 2016 08:58:16 +0100 Subject: [PATCH] MINOR: spoe: Add support for pipelining/async capabilities in the SPOA example Now, we can use the option '-c' to enable the support of a capability. By default, all capabilities are disabled. For example: $> ./spoa -c async -c pipelining In addition, it is also possible to set the maximum frame size supported by your agent (-m) and to add a delay in frames processing (-t). --- contrib/spoa_example/Makefile | 8 +- contrib/spoa_example/spoa.c | 1912 ++++++++++++++++++++++++--------- 2 files changed, 1384 insertions(+), 536 deletions(-) diff --git a/contrib/spoa_example/Makefile b/contrib/spoa_example/Makefile index e6b7c534f0..aee6139c8a 100644 --- a/contrib/spoa_example/Makefile +++ b/contrib/spoa_example/Makefile @@ -6,13 +6,15 @@ CC = gcc LD = $(CC) CFLAGS = -g -O2 -Wall -Werror -pthread -LDFLAGS = -lpthread +LDFLAGS = -lpthread -levent -levent_pthreads +INCS += -I../../include +LIBS = OBJS = spoa.o spoa: $(OBJS) - $(LD) $(LDFLAGS) -o $@ $^ + $(LD) $(LDFLAGS) $(LIBS) -o $@ $^ install: spoa install spoa $(DESTDIR)$(BINDIR) @@ -21,4 +23,4 @@ clean: rm -f spoa $(OBJS) %.o: %.c - $(CC) $(CFLAGS) -c -o $@ $< + $(CC) $(CFLAGS) $(INCS) -c -o $@ $< diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c index ce59c04a08..5c3a4538e2 100644 --- a/contrib/spoa_example/spoa.c +++ b/contrib/spoa_example/spoa.c @@ -14,39 +14,45 @@ * 2 of the License, or (at your option) any later version. * */ -#include +#include #include #include #include -#include +#include +#include #include -#include -#include -#include -#include #include -#include -#include +#include +#include +#include -#define DEFAULT_PORT 12345 -#define NUM_WORKERS 5 -#define MAX_FRAME_SIZE 16384 -#define SPOP_VERSION "1.0" -#define SPOA_CAPABILITIES "" +#include + +#include +#include +#include +#include + +#include + +#define DEFAULT_PORT 12345 +#define CONNECTION_BACKLOG 10 +#define NUM_WORKERS 10 +#define MAX_FRAME_SIZE 16384 +#define SPOP_VERSION "1.0" #define SLEN(str) (sizeof(str)-1) -#define LOG(fmt, args...) \ - do { \ - struct timeval now; \ - int wid = *((int*)pthread_getspecific(worker_id)); \ - \ - gettimeofday(&now, NULL); \ - fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \ - now.tv_sec, now.tv_usec, wid, ##args); \ +#define LOG(worker, fmt, args...) \ + do { \ + struct timeval now; \ + \ + gettimeofday(&now, NULL); \ + fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \ + now.tv_sec, now.tv_usec, (worker)->id, ##args); \ } while (0) -#define DEBUG(x...) \ +#define DEBUG(x...) \ do { \ if (debug) \ LOG(x); \ @@ -113,6 +119,17 @@ enum spoe_vars_scope { SPOE_SCOPE_RES, /* <=> SCOPE_RES */ }; +enum spoa_state { + SPOA_ST_CONNECTING = 0, + SPOA_ST_PROCESSING, + SPOA_ST_DISCONNECTING, +}; + +enum spoa_frame_type { + SPOA_FRM_T_UNKNOWN = 0, + SPOA_FRM_T_HAPROXY, + SPOA_FRM_T_AGENT, +}; /* Masks to get data type or flags value */ #define SPOE_DATA_T_MASK 0x0F @@ -121,35 +138,82 @@ enum spoe_vars_scope { /* Flags to set Boolean values */ #define SPOE_DATA_FL_FALSE 0x00 #define SPOE_DATA_FL_TRUE 0x10 -static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { - [SPOE_FRM_ERR_NONE] = "normal", - [SPOE_FRM_ERR_IO] = "I/O error", - [SPOE_FRM_ERR_TOUT] = "a timeout occurred", - [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", - [SPOE_FRM_ERR_INVALID] = "invalid frame received", - [SPOE_FRM_ERR_NO_VSN] = "version value not found", - [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", - [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", - [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", - [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", - [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", + +struct spoe_engine { + char *id; + + struct list processing_frames; + struct list outgoing_frames; + + struct list clients; + struct list list; +}; + +struct spoe_frame { + enum spoa_frame_type type; + char *buf; + unsigned int offset; + unsigned int len; + + unsigned int stream_id; + unsigned int frame_id; + bool hcheck; /* true is the CONNECT frame is a healthcheck */ + int ip_score; /* -1 if unset, else between 0 and 100 */ + + struct event process_frame_event; + struct worker *worker; + struct spoe_engine *engine; + struct client *client; + struct list list; + + char data[0]; +}; + +struct client { + int fd; + unsigned long id; + enum spoa_state state; + + struct event read_frame_event; + struct event write_frame_event; + + struct spoe_frame *incoming_frame; + struct spoe_frame *outgoing_frame; + + struct list processing_frames; + struct list outgoing_frames; + + unsigned int max_frame_size; + int status_code; + + char *engine_id; + struct spoe_engine *engine; + bool pipelining; + bool async; + + struct worker *worker; + struct list by_worker; + struct list by_engine; }; struct worker { - unsigned int id; - char buf[MAX_FRAME_SIZE]; - unsigned int len; - unsigned int size; - int status_code; - unsigned int stream_id; - unsigned int frame_id; - bool healthcheck; - int ip_score; /* -1 if unset, else between 0 and 100 */ + pthread_t thread; + int id; + struct event_base *base; + struct event *monitor_event; + + struct list engines; + + unsigned int nbclients; + struct list clients; + + struct list frames; }; + struct chunk { - char *str; /* beginning of the string itself. Might not be 0-terminated */ - int len; /* current size of the string from first to last char */ + char *str; /* beginning of the string itself. Might not be 0-terminated */ + int len; /* current size of the string from first to last char */ }; union spoe_value { @@ -169,139 +233,73 @@ struct spoe_data { union spoe_value u; /* spoe data value */ }; -static bool debug = false; -static pthread_key_t worker_id; +/* Globals */ +static struct worker *workers = NULL; +static struct worker null_worker = { .id = 0 }; +static unsigned long clicount = 0; +static int server_port = DEFAULT_PORT; +static int num_workers = NUM_WORKERS; +static unsigned int max_frame_size = MAX_FRAME_SIZE; +struct timeval processing_delay = {0, 0}; +static bool debug = false; +static bool pipelining = false; +static bool async = false; + + +static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { + [SPOE_FRM_ERR_NONE] = "normal", + [SPOE_FRM_ERR_IO] = "I/O error", + [SPOE_FRM_ERR_TOUT] = "a timeout occurred", + [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", + [SPOE_FRM_ERR_INVALID] = "invalid frame received", + [SPOE_FRM_ERR_NO_VSN] = "version value not found", + [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", + [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", + [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", + [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", + [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", +}; + +static void signal_cb(evutil_socket_t, short, void *); +static void accept_cb(evutil_socket_t, short, void *); +static void worker_monitor_cb(evutil_socket_t, short, void *); +static void process_frame_cb(evutil_socket_t, short, void *); +static void read_frame_cb(evutil_socket_t, short, void *); +static void write_frame_cb(evutil_socket_t, short, void *); + +static void use_spoe_engine(struct client *); +static void unuse_spoe_engine(struct client *); +static void release_frame(struct spoe_frame *); +static void release_client(struct client *); static void -check_ipv4_reputation(struct worker *w, struct in_addr *ipv4) +check_ipv4_reputation(struct spoe_frame *frame, struct in_addr *ipv4) { char str[INET_ADDRSTRLEN]; if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL) return; - w->ip_score = random() % 100; + frame->ip_score = random() % 100; - DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score); + DEBUG(frame->worker, "IP score for %.*s is %d", + INET_ADDRSTRLEN, str, frame->ip_score); } static void -check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6) +check_ipv6_reputation(struct spoe_frame *frame, struct in6_addr *ipv6) { char str[INET6_ADDRSTRLEN]; if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL) return; - w->ip_score = random() % 100; + frame->ip_score = random() % 100; - DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score); + DEBUG(frame->worker, "IP score for %.*s is %d", + INET6_ADDRSTRLEN, str, frame->ip_score); } -static int -do_read(int sock, void *buf, int read_len) -{ - fd_set readfds; - int n = 0, total = 0, bytesleft = read_len; - - FD_ZERO(&readfds); - FD_SET(sock, &readfds); - - while (total < read_len) { - if (select(FD_SETSIZE, &readfds, NULL, NULL, NULL) == -1) - return -1; - if (!FD_ISSET(sock, &readfds)) - return -1; - - n = read(sock, buf + total, bytesleft); - if (n <= 0) - break; - - total += n; - bytesleft -= n; - } - - return (n == -1) ? -1 : total; -} - -static int -do_write(int sock, void *buf, int write_len) -{ - fd_set writefds; - int n = 0, total = 0, bytesleft = write_len; - - FD_ZERO(&writefds); - FD_SET(sock, &writefds); - - while (total < write_len) { - if (select(FD_SETSIZE, NULL, &writefds, NULL, NULL) == -1) - return -1; - if (!FD_ISSET(sock, &writefds)) - return -1; - - n = write(sock, buf + total, bytesleft); - if (n <= 0) - break; - - total += n; - bytesleft -= n; - } - - return (n == -1) ? -1 : total; -} - -/* Receive a frame sent by HAProxy. It returns -1 if an error occurred, - * otherwise the number of read bytes.*/ -static int -read_frame(int sock, struct worker *w) -{ - uint32_t netint; - unsigned int framesz; - - /* Read the frame size, on 4 bytes */ - if (do_read(sock, &netint, sizeof(netint)) != 4) { - w->status_code = SPOE_FRM_ERR_IO; - return -1; - } - - /* Check it against the max size */ - framesz = ntohl(netint); - if (framesz > w->size) { - w->status_code = SPOE_FRM_ERR_TOO_BIG; - return -1; - } - - /* Read the frame */ - if (do_read(sock, w->buf, framesz) != framesz) { - w->status_code = SPOE_FRM_ERR_IO; - return -1; - } - - w->len = framesz; - return framesz; -} - -/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the - * number of written bytes. */ -static int -write_frame(int sock, struct worker *w) -{ - uint32_t netint; - - /* Write the frame size, on 4 bytes */ - netint = htonl(w->len); - if (do_write(sock, &netint, sizeof(netint)) != 4) { - w->status_code = SPOE_FRM_ERR_IO; - return -1; - } - - /* Write the frame */ - if (do_write(sock, w->buf, w->len) != w->len) { - w->status_code = SPOE_FRM_ERR_IO; - return -1; - } - return w->len; -} /* Encode a variable-length integer. This function never fails and returns the * number of written bytes. */ @@ -391,7 +389,7 @@ decode_spoe_string(char *buf, char *end, char **str, uint64_t *len) *str = buf+idx; return (idx + *len); -error: + error: return -1; } @@ -530,21 +528,20 @@ decode_spoe_data(char *frame, char *end, struct spoe_data *data) /* Check the protocol version. It returns -1 if an error occurred, the number of * read bytes otherwise. */ static int -check_proto_version(struct worker *w, int idx) +check_proto_version(struct spoe_frame *frame, int idx) { - char *str; - uint64_t sz; + char *str; + uint64_t sz; /* Get the list of all supported versions by HAProxy */ - if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { - w->status_code = SPOE_FRM_ERR_INVALID; + if ((frame->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) return -1; - } - idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz); - if (str == NULL) { - w->status_code = SPOE_FRM_ERR_INVALID; + idx += decode_spoe_string(frame->buf+idx, frame->buf+frame->len, &str, &sz); + if (str == NULL) return -1; - } + + DEBUG(frame->worker, "<%lu> Supported versions : %.*s", + frame->client->id, (int)sz, str); /* TODO: Find the right verion in supported ones */ @@ -554,29 +551,28 @@ check_proto_version(struct worker *w, int idx) /* Check max frame size value. It returns -1 if an error occurred, the number of * read bytes otherwise. */ static int -check_max_frame_size(struct worker *w, int idx) +check_max_frame_size(struct spoe_frame *frame, int idx) { uint64_t sz; int type, i; /* Get the max-frame-size value of HAProxy */ - type = w->buf[idx++]; + type = frame->buf[idx++]; if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) { - w->status_code = SPOE_FRM_ERR_INVALID; + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) return -1; - } - if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; + if ((i = decode_spoe_varint(frame->buf+idx, frame->buf+frame->len, &sz)) == -1) return -1; - } idx += i; /* Keep the lower value */ - if (sz < w->size) - w->size = sz; + if (sz < frame->client->max_frame_size) + frame->client->max_frame_size = sz; + + DEBUG(frame->worker, "<%lu> HAProxy maximum frame size : %u", + frame->client->id, (unsigned int)sz); return idx; } @@ -584,493 +580,1268 @@ check_max_frame_size(struct worker *w, int idx) /* Check healthcheck value. It returns -1 if an error occurred, the number of * read bytes otherwise. */ static int -check_healthcheck(struct worker *w, int idx) +check_healthcheck(struct spoe_frame *frame, int idx) { int type; - /* Get the "healthcheck" value of HAProxy */ - type = w->buf[idx++]; - if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) { - w->status_code = SPOE_FRM_ERR_INVALID; + /* Get the "healthcheck" value */ + type = frame->buf[idx++]; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) return -1; - } - w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE); + frame->hcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE); + + DEBUG(frame->worker, "<%lu> HELLO healthcheck : %s", + frame->client->id, (frame->hcheck ? "true" : "false")); return idx; } +/* Check capabilities value. It returns -1 if an error occurred, the number of + * read bytes otherwise. */ +static int +check_capabilities(struct spoe_frame *frame, int idx) +{ + struct client *client = frame->client; + char *str; + uint64_t sz; + int i; + + if ((frame->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) + return -1; + idx += decode_spoe_string(frame->buf+idx, frame->buf+frame->len, &str, &sz); + if (str == NULL) /* this is not an error */ + return idx; + + DEBUG(frame->worker, "<%lu> HAProxy capabilities : %.*s", + client->id, (int)sz, str); + + i = 0; + while (i < sz) { + char *delim; + + /* Skip leading spaces */ + for (; isspace(str[i]) && i < sz; i++); + + if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) { + i += 10; + if (sz == i || isspace(str[i]) || str[i] == ',') { + DEBUG(frame->worker, + "<%lu> HAProxy supports frame pipelining", + client->id); + client->pipelining = true; + } + + } + else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) { + i += 5; + if (sz == i || isspace(str[i]) || str[i] == ',') { + DEBUG(frame->worker, + "<%lu> HAProxy supports asynchronous frame", + client->id); + client->async = true; + } + } + + if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL) + break; + i = (delim - str) + 1; + } + + return idx; +} + +/* Check engine-id value. It returns -1 if an error occurred, the number of + * read bytes otherwise. */ +static int +check_engine_id(struct spoe_frame *frame, int idx) +{ + struct client *client = frame->client; + char *str; + uint64_t sz; + + if ((frame->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) + return -1; + + idx += decode_spoe_string(frame->buf+idx, frame->buf+frame->len, &str, &sz); + if (str == NULL) /* this is not an error */ + return idx; + + if (client->engine != NULL) + return idx; + + DEBUG(frame->worker, "<%lu> HAProxy engine id : %.*s", + client->id, (int)sz, str); + + client->engine_id = strndup(str, (int)sz); + return idx; +} + +/* Check disconnect status code. It returns -1 if an error occurred, the number + * of read bytes otherwise. */ +static int +check_discon_status_code(struct spoe_frame *frame, int idx) +{ + uint64_t sz; + int type, i; + + /* Get the "status-code" value */ + type = frame->buf[idx++]; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) + return -1; + if ((i = decode_spoe_varint(frame->buf+idx, frame->buf+frame->len, &sz)) == -1) + return -1; + idx += i; + + frame->client->status_code = (unsigned int)sz; + + DEBUG(frame->worker, "<%lu> Disconnect status code : %u", + frame->client->id, frame->client->status_code); + + return idx; +} + +/* Check the disconnect message. It returns -1 if an error occurred, the number + * of read bytes otherwise. */ +static int +check_discon_message(struct spoe_frame *frame, int idx) +{ + char *str; + uint64_t sz; + + /* Get the "message" value */ + if ((frame->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) + return -1; + idx += decode_spoe_string(frame->buf+idx, frame->buf+frame->len, &str, &sz); + if (str == NULL) + return -1; + + DEBUG(frame->worker, "<%lu> Disconnect message : %.*s", + frame->client->id, (int)sz, str); + + return idx; +} /* Decode a HELLO frame received from HAProxy. It returns -1 if an error - * occurred, 0 if the frame must be skipped, otherwise the number of read - * bytes. */ + * occurred, otherwise the number of read bytes. HELLO frame cannot be + * ignored and having another frame than a HELLO frame is an error. */ static int -handle_hahello(struct worker *w) +handle_hahello(struct spoe_frame *frame) { - char *end = w->buf+w->len; - int i, idx = 0; + struct client *client = frame->client; + char *buf = frame->buf; + char *end = frame->buf + frame->len; + int i, idx = 0; - /* Check frame type */ - if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO) - goto skip; + /* Check frame type: we really want a HELLO frame */ + if (buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO) + goto error; + + DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id); /* Skip flags */ idx += 4; /* stream-id and frame-id must be cleared */ - if (w->buf[idx] != 0 || w->buf[idx+1] != 0) { - w->status_code = SPOE_FRM_ERR_INVALID; + if (buf[idx] != 0 || buf[idx+1] != 0) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; } idx += 2; /* Loop on K/V items */ - while (idx < w->len) { + while (buf+idx < end) { char *str; uint64_t sz; /* Decode the item name */ - idx += decode_spoe_string(w->buf+idx, end, &str, &sz); + idx += decode_spoe_string(buf+idx, end, &str, &sz); if (str == NULL) { - w->status_code = SPOE_FRM_ERR_INVALID; + client->status_code = SPOE_FRM_ERR_INVALID; goto error; } /* Check "supported-versions" K/V item */ if (!memcmp(str, "supported-versions", sz)) { - if ((i = check_proto_version(w, idx)) == -1) + if ((i = check_proto_version(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; + } idx = i; } - /* Check "max-frame-size" K/V item "*/ + /* Check "max-frame-size" K/V item */ else if (!memcmp(str, "max-frame-size", sz)) { - if ((i = check_max_frame_size(w, idx)) == -1) + if ((i = check_max_frame_size(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; + } idx = i; } - /* Check "healthcheck" K/V item "*/ + /* Check "healthcheck" K/V item */ else if (!memcmp(str, "healthcheck", sz)) { - if ((i = check_healthcheck(w, idx)) == -1) + if ((i = check_healthcheck(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; + } + idx = i; + } + /* Check "capabilities" K/V item */ + else if (!memcmp(str, "capabilities", sz)) { + if ((i = check_capabilities(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; + goto error; + } + idx = i; + } + /* Check "engine-id" K/V item */ + else if (!memcmp(str, "engine-id", sz)) { + if ((i = check_engine_id(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; + goto error; + } idx = i; } - /* Skip "capabilities" K/V item for now */ else { + DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s", + client->id, (int)sz, str); + /* Silently ignore unknown item */ - if ((i = skip_spoe_data(w->buf+idx, end)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; + if ((i = skip_spoe_data(buf+idx, end)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; } idx += i; } } + if (async == false || client->engine_id == NULL) + client->async = false; + if (pipelining == false) + client->pipelining = false; + + if (client->async == true) + use_spoe_engine(client); + return idx; -skip: - return 0; -error: + + error: return -1; } /* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error - * occurred, 0 if the frame must be skipped, otherwise the number of read - * bytes. */ + * occurred, otherwise the number of read bytes. DISCONNECT frame cannot be + * ignored and having another frame than a DISCONNECT frame is an error.*/ static int -handle_hadiscon(struct worker *w) +handle_hadiscon(struct spoe_frame *frame) { - char *end = w->buf+w->len; - int i, idx = 0; + struct client *client = frame->client; + char *buf = frame->buf; + char *end = frame->buf + frame->len; + int i, idx = 0; - /* Check frame type */ - if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON) - goto skip; + /* Check frame type: we really want a DISCONNECT frame */ + if (buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON) + goto error; + + DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id); /* Skip flags */ idx += 4; /* stream-id and frame-id must be cleared */ - if (w->buf[idx] != 0 || w->buf[idx+1] != 0) { - w->status_code = SPOE_FRM_ERR_INVALID; + if (buf[idx] != 0 || buf[idx+1] != 0) { + client->status_code = SPOE_FRM_ERR_INVALID; goto error; } idx += 2; + client->status_code = SPOE_FRM_ERR_NONE; + /* Loop on K/V items */ - while (idx < w->len) { + while (buf+idx < end) { char *str; uint64_t sz; /* Decode item key */ - idx += decode_spoe_string(w->buf+idx, end, &str, &sz); + idx += decode_spoe_string(buf+idx, end, &str, &sz); if (str == NULL) { - w->status_code = SPOE_FRM_ERR_INVALID; + client->status_code = SPOE_FRM_ERR_INVALID; goto error; } - /* Silently ignore unknown item */ - if ((i = skip_spoe_data(w->buf+idx, end)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; + + /* Check "status-code" K/V item */ + if (!memcmp(str, "status-code", sz)) { + if ((i = check_discon_status_code(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; + goto error; + } + idx = i; + } + /* Check "message" K/V item */ + else if (!memcmp(str, "message", sz)) { + if ((i = check_discon_message(frame, idx)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; + goto error; + } + idx = i; + } + else { + DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s", + client->id, (int)sz, str); + + /* Silently ignore unknown item */ + if ((i = skip_spoe_data(buf+idx, end)) == -1) { + client->status_code = SPOE_FRM_ERR_INVALID; + goto error; + } + idx += i; } - idx += i; } - w->status_code = SPOE_FRM_ERR_NONE; return idx; -skip: - return 0; -error: + + error: return -1; } /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error - * occurred, 0 if the frame must be skipped, otherwise the number of read - * bytes. */ + * occurred or if the frame must be ignored, 0 if the frame must be ack without + * any processing, otherwise the number of read bytes (always > 0). */ static int -handle_hanotify(struct worker *w) +handle_hanotify(struct spoe_frame *frame) { - char *end = w->buf+w->len; - uint64_t stream_id, frame_id; - int nbargs, i, idx = 0; + struct client *client = frame->client; + char *buf = frame->buf; + char *end = frame->buf + frame->len; + uint64_t stream_id, frame_id; + int i, idx = 0; /* Check frame type */ - if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY) - goto skip; + if (buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY) + goto ignore; + + DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id); /* Skip flags */ idx += 4; /* Read the stream-id */ - if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1) + goto ignore; idx += i; /* Read the frame-id */ - if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + if ((i = decode_spoe_varint(buf+idx, end, &frame_id)) == -1) + goto ignore; idx += i; - w->stream_id = (unsigned int)stream_id; - w->frame_id = (unsigned int)frame_id; + frame->stream_id = (unsigned int)stream_id; + frame->frame_id = (unsigned int)frame_id; - DEBUG("Notify frame received: stream-id=%u - frame-id=%u", - w->stream_id, w->frame_id); + DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u", + client->id, frame->stream_id, frame->frame_id); + + if (buf + idx == end) { + return 0; + } + + frame->offset = idx; + return idx; + + ignore: + return -1; +} + +/* Encode a HELLO frame to send it to HAProxy. It returns the number of written + * bytes. */ +static int +prepare_agenthello(struct spoe_frame *frame) +{ + struct client *client = frame->client; + char *buf = frame->buf; + int idx = 0; + + DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id); + frame->type = SPOA_FRM_T_AGENT; + + /* Frame Type */ + buf[idx++] = SPOE_FRM_T_AGENT_HELLO; + + /* No flags for now */ + memset(buf+idx, 0, 4); /* No flags */ + idx += 4; + + /* No stream-id and frame-id for HELLO frames */ + buf[idx++] = 0; + buf[idx++] = 0; + + /* "version" K/V item */ + idx += encode_spoe_string("version", 7, buf+idx); + buf[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), buf+idx); + DEBUG(frame->worker, "<%lu> Agent version : %s", + client->id, SPOP_VERSION); + + + /* "max-frame-size" K/V item */ + idx += encode_spoe_string("max-frame-size", 14, buf+idx); + buf[idx++] = SPOE_DATA_T_UINT32; + idx += encode_spoe_varint(client->max_frame_size, buf+idx); + DEBUG(frame->worker, "<%lu> Agent maximum frame size : %u", + client->id, client->max_frame_size); + + /* "capabilities" K/V item */ + idx += encode_spoe_string("capabilities", 12, buf+idx); + buf[idx++] = SPOE_DATA_T_STR; + if (client->pipelining == true && client->async == true) + idx += encode_spoe_string("pipelining,async", 16, buf+idx); + else if (client->pipelining == true) + idx += encode_spoe_string("pipelining", 10, buf+idx); + else if (client->async == true) + idx += encode_spoe_string("async", 5, buf+idx); + else + idx += encode_spoe_string(NULL, 0, buf+idx); + + DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s", + client->id, (client->pipelining?"pipelining":""), + (client->async?"async":"")); + + frame->len = idx; + return idx; +} + +/* Encode a DISCONNECT frame to send it to HAProxy. It returns the number of + * written bytes. */ +static int +prepare_agentdicon(struct spoe_frame *frame) +{ + struct client *client = frame->client; + char *buf = frame->buf; + const char *reason; + int rlen, idx = 0; + + DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id); + frame->type = SPOA_FRM_T_AGENT; + + if (client->status_code >= SPOE_FRM_ERRS) + client->status_code = SPOE_FRM_ERR_UNKNOWN; + reason = spoe_frm_err_reasons[client->status_code]; + rlen = strlen(reason); + + /* Frame type */ + buf[idx++] = SPOE_FRM_T_AGENT_DISCON; + + /* No flags for now */ + memset(buf+idx, 0, 4); + idx += 4; + + /* No stream-id and frame-id for DISCONNECT frames */ + buf[idx++] = 0; + buf[idx++] = 0; + + /* There are 2 mandatory items: "status-code" and "message" */ + + /* "status-code" K/V item */ + idx += encode_spoe_string("status-code", 11, buf+idx); + buf[idx++] = SPOE_DATA_T_UINT32; + idx += encode_spoe_varint(client->status_code, buf+idx); + DEBUG(frame->worker, "<%lu> Disconnect status code : %u", + client->id, client->status_code); + + /* "message" K/V item */ + idx += encode_spoe_string("message", 7, buf+idx); + buf[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(reason, rlen, buf+idx); + DEBUG(frame->worker, "<%lu> Disconnect message : %s", + client->id, reason); + + frame->len = idx; + return idx; +} + +/* Encode a ACK frame to send it to HAProxy. It returns the number of written + * bytes. */ +static int +prepare_agentack(struct spoe_frame *frame) +{ + char *buf = frame->buf; + int idx = 0; + + /* Be careful here, in async mode, frame->client can be NULL */ + + DEBUG(frame->worker, "Encode Agent ACK frame"); + frame->type = SPOA_FRM_T_AGENT; + + /* Frame type */ + buf[idx++] = SPOE_FRM_T_AGENT_ACK; + + /* No flags for now */ + memset(buf+idx, 0, 4); /* No flags */ + idx += 4; + + /* Set stream-id and frame-id for ACK frames */ + idx += encode_spoe_varint(frame->stream_id, buf+idx); + idx += encode_spoe_varint(frame->frame_id, buf+idx); + + DEBUG(frame->worker, "STREAM-ID=%u - FRAME-ID=%u", + frame->stream_id, frame->frame_id); + + frame->len = idx; + return idx; +} + +static int +create_server_socket(void) +{ + struct sockaddr_in listen_addr; + int fd, yes = 1; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + LOG(&null_worker, "Failed to create service socket : %m"); + return -1; + } + + memset(&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = INADDR_ANY; + listen_addr.sin_port = htons(server_port); + + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0 || + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) < 0) { + LOG(&null_worker, "Failed to set option on server socket : %m"); + return -1; + } + + if (bind(fd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) < 0) { + LOG(&null_worker, "Failed to bind server socket : %m"); + return -1; + } + + if (listen(fd, CONNECTION_BACKLOG) < 0) { + LOG(&null_worker, "Failed to listen on server socket : %m"); + return -1; + } + + return fd; +} + +static void +release_frame(struct spoe_frame *frame) +{ + struct worker *worker; + + if (frame == NULL) + return; + + if (event_pending(&frame->process_frame_event, EV_TIMEOUT, NULL)) + event_del(&frame->process_frame_event); + + worker = frame->worker; + LIST_DEL(&frame->list); + memset(frame, 0, sizeof(*frame)+max_frame_size+4); + LIST_ADDQ(&worker->frames, &frame->list); +} + +static void +release_client(struct client *c) +{ + struct spoe_frame *frame, *back; + + if (c == NULL) + return; + + DEBUG(c->worker, "<%lu> Release client", c->id); + + LIST_DEL(&c->by_worker); + c->worker->nbclients--; + + unuse_spoe_engine(c); + free(c->engine_id); + + if (event_pending(&c->read_frame_event, EV_READ, NULL)) + event_del(&c->read_frame_event); + if (event_pending(&c->write_frame_event, EV_WRITE, NULL)) + event_del(&c->write_frame_event); + + release_frame(c->incoming_frame); + release_frame(c->outgoing_frame); + list_for_each_entry_safe(frame, back, &c->processing_frames, list) { + release_frame(frame); + } + list_for_each_entry_safe(frame, back, &c->outgoing_frames, list) { + release_frame(frame); + } + + if (c->fd >= 0) + close(c->fd); + + free(c); +} + +static void +reset_frame(struct spoe_frame *frame) +{ + if (frame == NULL) + return; + + frame->type = SPOA_FRM_T_UNKNOWN; + frame->buf = (char *)(frame->data); + frame->offset = 0; + frame->len = 0; + frame->stream_id = 0; + frame->frame_id = 0; + frame->hcheck = false; + frame->ip_score = -1; + LIST_INIT(&frame->list); +} + +static void +use_spoe_engine(struct client *client) +{ + struct spoe_engine *eng; + + if (client->engine_id == NULL) + return; + + list_for_each_entry(eng, &client->worker->engines, list) { + if (!strcmp(eng->id, client->engine_id)) + goto end; + } + + if ((eng = malloc(sizeof(*eng))) == NULL) { + client->async = false; + return; + } + + eng->id = strdup(client->engine_id); + LIST_INIT(&eng->clients); + LIST_INIT(&eng->processing_frames); + LIST_INIT(&eng->outgoing_frames); + LIST_ADDQ(&client->worker->engines, &eng->list); + LOG(client->worker, "Add new SPOE engine '%s'", eng->id); + + end: + client->engine = eng; + LIST_ADDQ(&eng->clients, &client->by_engine); +} + +static void +unuse_spoe_engine(struct client *client) +{ + struct spoe_engine *eng; + struct spoe_frame *frame, *back; + + if (client == NULL || client->engine == NULL) + return; + + eng = client->engine; + client->engine = NULL; + LIST_DEL(&client->by_engine); + if (!LIST_ISEMPTY(&eng->clients)) + return; + + LOG(client->worker, "Remove SPOE engine '%s'", eng->id); + LIST_DEL(&eng->list); + + list_for_each_entry_safe(frame, back, &eng->processing_frames, list) { + release_frame(frame); + } + list_for_each_entry_safe(frame, back, &eng->outgoing_frames, list) { + release_frame(frame); + } + free(eng->id); + free(eng); +} + + +static struct spoe_frame * +acquire_incoming_frame(struct client *client) +{ + struct spoe_frame *frame; + + frame = client->incoming_frame; + if (frame != NULL) + return frame; + + if (LIST_ISEMPTY(&client->worker->frames)) { + if ((frame = calloc(1, sizeof(*frame)+max_frame_size+4)) == NULL) { + LOG(client->worker, "Failed to allocate new frame : %m"); + return NULL; + } + } + else { + frame = LIST_NEXT(&client->worker->frames, typeof(frame), list); + LIST_DEL(&frame->list); + } + + reset_frame(frame); + frame->worker = client->worker; + frame->engine = client->engine; + frame->client = client; + + if (event_assign(&frame->process_frame_event, client->worker->base, -1, + EV_TIMEOUT|EV_PERSIST, process_frame_cb, frame) < 0) { + LOG(client->worker, "Failed to create frame event"); + return NULL; + } + + client->incoming_frame = frame; + return frame; +} + +static struct spoe_frame * +acquire_outgoing_frame(struct client *client) +{ + struct spoe_engine *engine = client->engine; + struct spoe_frame *frame = NULL; + + if (client->outgoing_frame != NULL) + frame = client->outgoing_frame; + else if (!LIST_ISEMPTY(&client->outgoing_frames)) { + frame = LIST_NEXT(&client->outgoing_frames, typeof(frame), list); + LIST_DEL(&frame->list); + client->outgoing_frame = frame; + } + else if (engine!= NULL && !LIST_ISEMPTY(&engine->outgoing_frames)) { + frame = LIST_NEXT(&engine->outgoing_frames, typeof(frame), list); + LIST_DEL(&frame->list); + client->outgoing_frame = frame; + } + return frame; +} + +static void +write_frame(struct client *client, struct spoe_frame *frame) +{ + uint32_t netint; + + LIST_DEL(&frame->list); + + frame->buf = (char *)(frame->data); + frame->offset = 0; + netint = htonl(frame->len); + memcpy(frame->buf, &netint, 4); + + if (client != NULL) { /* HELLO or DISCONNECT frames */ + event_add(&client->write_frame_event, NULL); + + /* Try to process the frame as soon as possible, and always + * attach it to the client */ + if (client->async || client->pipelining) { + if (client->outgoing_frame == NULL) + client->outgoing_frame = frame; + else + LIST_ADD(&client->outgoing_frames, &frame->list); + } + else { + client->outgoing_frame = frame; + event_del(&client->read_frame_event); + } + } + else { /* for all other frames */ + if (frame->client == NULL) { /* async mode ! */ + LIST_ADDQ(&frame->engine->outgoing_frames, &frame->list); + list_for_each_entry(client, &frame->engine->clients, by_engine) + event_add(&client->write_frame_event, NULL); + } + else if (frame->client->pipelining) { + LIST_ADDQ(&frame->client->outgoing_frames, &frame->list); + event_add(&frame->client->write_frame_event, NULL); + } + else { + frame->client->outgoing_frame = frame; + event_add(&frame->client->write_frame_event, NULL); + event_del(&frame->client->read_frame_event); + } + } +} + +static void +process_incoming_frame(struct spoe_frame *frame) +{ + struct client *client = frame->client; + + if (event_add(&frame->process_frame_event, &processing_delay) < 0) { + LOG(client->worker, "Failed to process incoming frame"); + release_frame(frame); + return; + } + + if (client->async) { + frame->client = NULL; + LIST_ADDQ(&frame->engine->processing_frames, &frame->list); + } + else if (client->pipelining) + LIST_ADDQ(&client->processing_frames, &frame->list); + else + event_del(&client->read_frame_event); +} + +static void +signal_cb(evutil_socket_t sig, short events, void *user_data) +{ + struct event_base *base = user_data; + int i; + + DEBUG(&null_worker, "Stopping the server"); + + event_base_loopbreak(base); + DEBUG(&null_worker, "Main event loop stopped"); + + for (i = 0; i < num_workers; i++) { + event_base_loopbreak(workers[i].base); + DEBUG(&null_worker, "Event loop stopped for worker %02d", + workers[i].id); + } +} + +static void +worker_monitor_cb(evutil_socket_t fd, short events, void *arg) +{ + struct worker *worker = arg; + + LOG(worker, "%u clients connected", worker->nbclients); +} + +static void +process_frame_cb(evutil_socket_t fd, short events, void *arg) +{ + struct spoe_frame *frame = arg; + char *buf = frame->buf; + char *end = frame->buf + frame->len; + int idx = frame->offset; + + DEBUG(frame->worker, + "Process frame messages : STREAM-ID=%u - FRAME-ID=%u", + frame->stream_id, frame->frame_id); /* Loop on messages */ - while (idx < w->len) { + while (buf+idx < end) { char *str; uint64_t sz; + int nbargs, i; /* Decode the message name */ - idx += decode_spoe_string(w->buf+idx, end, &str, &sz); - if (str == NULL) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - DEBUG(" Message '%.*s' received", (int)sz, str); + idx += decode_spoe_string(buf+idx, end, &str, &sz); + if (str == NULL) + goto stop_processing; - nbargs = w->buf[idx++]; + DEBUG(frame->worker, "Process SPOE Message '%.*s'", (int)sz, str); + + nbargs = buf[idx++]; /* Get the number of arguments */ + frame->offset = idx; /* Save index to handle errors and skip args */ if (!memcmp(str, "check-client-ip", sz)) { struct spoe_data data; memset(&data, 0, sizeof(data)); - if (nbargs != 1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + if (nbargs != 1) + goto skip_message; + + if ((i = decode_spoe_string(buf+idx, end, &str, &sz)) == -1) + goto stop_processing; idx += i; - if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + + if ((i = decode_spoe_data(buf+idx, end, &data)) == -1) + goto skip_message; idx += i; + if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4) - check_ipv4_reputation(w, &data.u.ipv4); + check_ipv4_reputation(frame, &data.u.ipv4); else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6) - check_ipv6_reputation(w, &data.u.ipv6); - else { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + check_ipv6_reputation(frame, &data.u.ipv6); } else { + skip_message: + idx = frame->offset; /* Restore index */ + while (nbargs-- > 0) { /* Silently ignore argument: its name and its value */ - if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + if ((i = decode_spoe_string(buf+idx, end, &str, &sz)) == -1) + goto stop_processing; idx += i; - if ((i = skip_spoe_data(w->buf+idx, end)) == -1) { - w->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } + if ((i = skip_spoe_data(buf+idx, end)) == -1) + goto stop_processing; idx += i; } } } - return idx; -skip: - return 0; -error: - return -1; + stop_processing: + /* Prepare agent ACK frame */ + frame->offset = 0; + idx = prepare_agentack(frame); + + if (frame->ip_score != -1) { + DEBUG(frame->worker, "Add action : set variable ip_scode=%u", + frame->ip_score); + + buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */ + buf[idx++] = 3; /* Number of args */ + buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */ + idx += encode_spoe_string("ip_score", 8, buf+idx); /* Arg 2: variable name */ + buf[idx++] = SPOE_DATA_T_UINT32; + idx += encode_spoe_varint(frame->ip_score, buf+idx); /* Arg 3: variable value */ + frame->len = idx; + } + write_frame(NULL, frame); } -/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error - * occurred, the number of written bytes otherwise. */ -static int -prepare_agenthello(struct worker *w) +static void +read_frame_cb(evutil_socket_t fd, short events, void *arg) { - int idx = 0; + struct client *client = arg; + struct spoe_frame *frame; + uint32_t netint; + int n; - /* Frame Type */ - w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO; + DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__); + if ((frame = acquire_incoming_frame(client)) == NULL) + goto close; - /* No flags for now */ - memset(w->buf+idx, 0, 4); /* No flags */ - idx += 4; + frame->type = SPOA_FRM_T_HAPROXY; + if (frame->buf == (char *)(frame->data)) { + /* Read the frame length: frame->buf points on length part (frame->data) */ + n = read(client->fd, frame->buf+frame->offset, 4-frame->offset); + if (n <= 0) { + if (n < 0) + LOG(client->worker, "Failed to read frame length : %m"); + goto close; + } + frame->offset += n; + if (frame->offset != 4) + return; + memcpy(&netint, frame->buf, 4); + frame->buf += 4; + frame->offset = 0; + frame->len = ntohl(netint); + } - /* No stream-id and frame-id for HELLO frames */ - w->buf[idx++] = 0; - w->buf[idx++] = 0; + /* Read the frame: frame->buf points on frame part (frame->data+4)*/ + n = read(client->fd, frame->buf + frame->offset, + frame->len - frame->offset); + if (n <= 0) { + if (n < 0) { + LOG(client->worker, "Frame to read frame : %m"); + goto close; + } + return; + } + frame->offset += n; + if (frame->offset != frame->len) + return; + frame->offset = 0; - /* "version" K/V item */ - idx += encode_spoe_string("version", 7, w->buf+idx); - w->buf[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx); + DEBUG(client->worker, "<%lu> New Frame of %u bytes received", + client->id, frame->len); - /* "max-frame-size" K/V item */ - idx += encode_spoe_string("max-frame-size", 14, w->buf+idx); - w->buf[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(w->size, w->buf+idx); + switch (client->state) { + case SPOA_ST_CONNECTING: + if (handle_hahello(frame) < 0) { + LOG(client->worker, "Failed to decode HELLO frame"); + goto disconnect; + } + prepare_agenthello(frame); + goto write_frame; - /* "capabilities" K/V item */ - idx += encode_spoe_string("capabilities", 12, w->buf+idx); - w->buf[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx); + case SPOA_ST_PROCESSING: + n = handle_hanotify(frame); + if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) { + client->state = SPOA_ST_DISCONNECTING; + goto disconnecting; + } + if (n < 0) { + LOG(client->worker, "Ignore invalid or unknown frame"); + goto ignore_frame; + } + if (n == 0) { + DEBUG(client->worker, "<%lu> No message found, ack it now", + client->id); + prepare_agentack(frame); + goto write_frame; + } + else + goto process_frame; - w->len = idx; - return idx; + case SPOA_ST_DISCONNECTING: + disconnecting: + if (handle_hadiscon(frame) < 0) { + LOG(client->worker, "Failed to decode DISCONNECT frame"); + goto disconnect; + } + if (client->status_code != SPOE_FRM_ERR_NONE) + LOG(client->worker, "<%lu> Peer closed connection: %s", + client->id, spoe_frm_err_reasons[client->status_code]); + client->status_code = SPOE_FRM_ERR_NONE; + goto disconnect; + } + + ignore_frame: + reset_frame(frame); + return; + + process_frame: + process_incoming_frame(frame); + client->incoming_frame = NULL; + return; + + write_frame: + write_frame(client, frame); + client->incoming_frame = NULL; + return; + + disconnect: + client->state = SPOA_ST_DISCONNECTING; + if (prepare_agentdicon(frame) < 0) { + LOG(client->worker, "Failed to encode DISCONNECT frame"); + goto close; + } + goto write_frame; + + close: + release_client(client); } -/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred, - * the number of written bytes otherwise. */ -static int -prepare_agentack(struct worker *w) +static void +write_frame_cb(evutil_socket_t fd, short events, void *arg) { - int idx = 0; + struct client *client = arg; + struct spoe_frame *frame; + int n; - /* Frame type */ - w->buf[idx++] = SPOE_FRM_T_AGENT_ACK; + DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__); + if ((frame = acquire_outgoing_frame(client)) == NULL) { + event_del(&client->write_frame_event); + return; + } - /* No flags for now */ - memset(w->buf+idx, 0, 4); /* No flags */ - idx += 4; + if (frame->buf == (char *)(frame->data)) { + /* Write the frame length: frame->buf points on length part (frame->data) */ + n = write(client->fd, frame->buf+frame->offset, 4-frame->offset); + if (n <= 0) { + if (n < 0) + LOG(client->worker, "Failed to write frame length : %m"); + goto close; + } + frame->offset += n; + if (frame->offset != 4) + return; + frame->buf += 4; + frame->offset = 0; + } - /* Set stream-id and frame-id for ACK frames */ - idx += encode_spoe_varint(w->stream_id, w->buf+idx); - idx += encode_spoe_varint(w->frame_id, w->buf+idx); + /* Write the frame: frame->buf points on frame part (frame->data+4)*/ + n = write(client->fd, frame->buf + frame->offset, + frame->len - frame->offset); + if (n <= 0) { + if (n < 0) { + LOG(client->worker, "Failed to write frame : %m"); + goto close; + } + return; + } + frame->offset += n; + if (frame->offset != frame->len) + return; - /* Data */ - if (w->ip_score == -1) - goto out; + DEBUG(client->worker, "<%lu> Frame of %u bytes send", + client->id, frame->len); - w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */ - w->buf[idx++] = 3; /* Number of args */ - w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */ - idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */ - w->buf[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */ -out: - w->len = idx; - return idx; + switch (client->state) { + case SPOA_ST_CONNECTING: + if (frame->hcheck == true) { + DEBUG(client->worker, + "<%lu> Close client after healthcheck", + client->id); + goto close; + } + client->state = SPOA_ST_PROCESSING; + break; + + case SPOA_ST_PROCESSING: + break; + + case SPOA_ST_DISCONNECTING: + goto close; + } + + release_frame(frame); + client->outgoing_frame = NULL; + if (!client->async && !client->pipelining) { + event_del(&client->write_frame_event); + event_add(&client->read_frame_event, NULL); + } + return; + + close: + release_client(client); } -/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error - * occurred, the number of written bytes otherwise. */ -static int -prepare_agentdicon(struct worker *w) +static void +accept_cb(int listener, short event, void *arg) { - const char *reason; - int rlen, idx = 0; + struct worker *worker; + struct client *client; + int fd; - if (w->status_code >= SPOE_FRM_ERRS) - w->status_code = SPOE_FRM_ERR_UNKNOWN; - reason = spoe_frm_err_reasons[w->status_code]; - rlen = strlen(reason); + worker = &workers[clicount++ % num_workers]; - /* Frame type */ - w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON; - - /* No flags for now */ - memset(w->buf+idx, 0, 4); - idx += 4; - - /* No stream-id and frame-id for DISCONNECT frames */ - w->buf[idx++] = 0; - w->buf[idx++] = 0; - - /* There are 2 mandatory items: "status-code" and "message" */ - - /* "status-code" K/V item */ - idx += encode_spoe_string("status-code", 11, w->buf+idx); - w->buf[idx++] = SPOE_DATA_T_UINT32; - idx += encode_spoe_varint(w->status_code, w->buf+idx); - - /* "message" K/V item */ - idx += encode_spoe_string("message", 7, w->buf+idx); - w->buf[idx++] = SPOE_DATA_T_STR; - idx += encode_spoe_string(reason, rlen, w->buf+idx); - - w->len = idx; - return idx; -} - -static int -hello_handshake(int sock, struct worker *w) -{ - if (read_frame(sock, w) < 0) { - LOG("Failed to read Haproxy HELLO frame"); - goto error; + if ((fd = accept(listener, NULL, NULL)) < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) + LOG(worker, "Failed to accept client connection : %m"); + return; } - if (handle_hahello(w) < 0) { - LOG("Failed to handle Haproxy HELLO frame"); - goto error; - } - if (prepare_agenthello(w) < 0) { - LOG("Failed to prepare Agent HELLO frame"); - goto error; - } - if (write_frame(sock, w) < 0) { - LOG("Failed to write Agent frame"); - goto error; - } - DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s", - SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false")); - return 0; -error: - return -1; -} -static int -notify_ack_roundtip(int sock, struct worker *w) -{ - if (read_frame(sock, w) < 0) { - LOG("Failed to read Haproxy NOTIFY frame"); - goto error_or_quit; + DEBUG(&null_worker, + "<%lu> New Client connection accepted and assigned to worker %02d", + clicount, worker->id); + + if (evutil_make_socket_nonblocking(fd) < 0) { + LOG(&null_worker, "Failed to set client socket to non-blocking : %m"); + close(fd); + return; } - if (handle_hadiscon(w) != 0) { - if (w->status_code != SPOE_FRM_ERR_NONE) - LOG("Failed to handle Haproxy DISCONNECT frame"); - DEBUG("Disconnect frame received: reason=%s", - spoe_frm_err_reasons[w->status_code]); - goto error_or_quit; + + if ((client = calloc(1, sizeof(*client))) == NULL) { + LOG(&null_worker, "Failed to allocate memory for client state : %m"); + close(fd); + return; } - if (handle_hanotify(w) < 0) { - LOG("Failed to handle Haproxy NOTIFY frame"); - goto error_or_quit; + + client->id = clicount; + client->fd = fd; + client->worker = worker; + client->state = SPOA_ST_CONNECTING; + client->status_code = SPOE_FRM_ERR_NONE; + client->max_frame_size = max_frame_size; + client->engine = NULL; + client->pipelining = false; + client->async = false; + client->incoming_frame = NULL; + client->outgoing_frame = NULL; + LIST_INIT(&client->processing_frames); + LIST_INIT(&client->outgoing_frames); + + LIST_ADDQ(&worker->clients, &client->by_worker); + + worker->nbclients++; + + if (event_assign(&client->read_frame_event, worker->base, fd, + EV_READ|EV_PERSIST, read_frame_cb, client) < 0 || + event_assign(&client->write_frame_event, worker->base, fd, + EV_WRITE|EV_PERSIST, write_frame_cb, client) < 0) { + LOG(&null_worker, "Failed to create client events"); + release_client(client); + return; } - if (prepare_agentack(w) < 0) { - LOG("Failed to prepare Agent ACK frame"); - goto error_or_quit; - } - if (write_frame(sock, w) < 0) { - LOG("Failed to write Agent ACK frame"); - goto error_or_quit; - } - DEBUG("Ack frame sent: stream-id=%u - frame-id=%u", - w->stream_id, w->frame_id); - return 0; -error_or_quit: - return -1; + event_add(&client->read_frame_event, NULL); } static void * -worker(void *data) +worker_function(void *data) { - struct worker w; - struct sockaddr_in client; - int *info = (int *)data; - int csock, lsock = info[0]; + struct client *client, *cback; + struct spoe_frame *frame, *fback; + struct worker *worker = data; - signal(SIGPIPE, SIG_IGN); - pthread_setspecific(worker_id, &info[1]); + DEBUG(worker, "Worker ready to process client messages"); + event_base_dispatch(worker->base); - while (1) { - socklen_t sz = sizeof(client); - - if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) { - LOG("Failed to accept client connection: %m"); - goto out; - } - memset(&w, 0, sizeof(w)); - w.id = info[1]; - w.size = MAX_FRAME_SIZE; - - DEBUG("New connection from HAProxy accepted"); - - if (hello_handshake(csock, &w) < 0) - goto disconnect; - if (w.healthcheck == true) - goto close; - while (1) { - w.ip_score = -1; - if (notify_ack_roundtip(csock, &w) < 0) - break; - } - - disconnect: - if (w.status_code == SPOE_FRM_ERR_IO) { - LOG("Close the client socket because of I/O errors"); - goto close; - } - if (prepare_agentdicon(&w) < 0) { - LOG("Failed to prepare Agent DISCONNECT frame"); - goto close; - } - if (write_frame(csock, &w) < 0) { - LOG("Failed to write Agent DISCONNECT frame"); - goto close; - } - DEBUG("Disconnect frame sent: reason=%s", - spoe_frm_err_reasons[w.status_code]); - - close: - close(csock); + list_for_each_entry_safe(client, cback, &worker->clients, by_worker) { + release_client(client); } -out: - free(info); - pthread_exit(NULL); + list_for_each_entry_safe(frame, fback, &worker->frames, list) { + LIST_DEL(&frame->list); + free(frame); + } + + event_free(worker->monitor_event); + event_base_free(worker->base); + DEBUG(worker, "Worker is stopped"); + pthread_exit(&null_worker); } + +static int +parse_processing_delay(const char *str) +{ + unsigned long value; + + value = 0; + while (1) { + unsigned int j; + + j = *str - '0'; + if (j > 9) + break; + str++; + value *= 10; + value += j; + } + + switch (*str) { + case '\0': /* no unit = millisecond */ + value *= 1000; + break; + case 's': /* second */ + value *= 1000000; + str++; + break; + case 'm': /* millisecond : "ms" */ + if (str[1] != 's') + return -1; + value *= 1000; + str += 2; + break; + case 'u': /* microsecond : "us" */ + if (str[1] != 's') + return -1; + str += 2; + break; + default: + return -1; + } + if (*str) + return -1; + + processing_delay.tv_sec = (time_t)(value / 1000000); + processing_delay.tv_usec = (suseconds_t)(value % 1000000); + return 0; +} + + static void usage(char *prog) { - fprintf(stderr, "Usage: %s [-h] [-d] [-p ] [-n ]\n", prog); - fprintf(stderr, " -h Print this message\n"); - fprintf(stderr, " -d Enable the debug mode\n"); - fprintf(stderr, " -p Specify the port to listen on (default: 12345)\n"); - fprintf(stderr, " -n Specify the number of workers (default: 5)\n"); + fprintf(stderr, + "Usage : %s [OPTION]...\n" + " -h Print this message\n" + " -d Enable the debug mode\n" + " -m Specify the maximum frame size (default : %u)\n" + " -p Specify the port to listen on (default : %d)\n" + " -n Specify the number of workers (default : %d)\n" + " -c Enable the support of the specified capability\n" + " -t