haproxy/contrib/spoa_example/spoa.c
Christopher Faulet b47e438593 BUG/MINOR: contrib/spoa_example: Don't reset the status code during disconnect
When the connection is closed by HAProxy, the status code provided in the
DISCONNECT frame is lost. By retransmitting it in the agent's reply, we are sure
to have it in the SPOE logs.

This patch may be backported in 1.8.
2018-06-04 17:34:50 +02:00

1904 lines
48 KiB
C

/*
* A Random IP reputation service acting as a Stream Processing Offload Agent
*
* This is a very simple service that implement a "random" ip reputation
* service. It will return random scores for all checked IP addresses. It only
* shows you how to implement a ip reputation service or such kind of services
* using the SPOE.
*
* Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version
* 2 of the License, or (at your option) any later version.
*
*/
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <err.h>
#include <ctype.h>
#include <pthread.h>
#include <event2/util.h>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/thread.h>
#include <mini-clist.h>
#include <spoe_types.h>
#include <spop_functions.h>
#define DEFAULT_PORT 12345
#define CONNECTION_BACKLOG 10
#define NUM_WORKERS 10
#define MAX_FRAME_SIZE 16384
#define SPOP_VERSION "2.0"
#define SLEN(str) (sizeof(str)-1)
#define LOG(worker, fmt, args...) \
do { \
struct timeval now; \
\
gettimeofday(&now, NULL); \
fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \
now.tv_sec, now.tv_usec, (worker)->id, ##args); \
} while (0)
#define DEBUG(x...) \
do { \
if (debug) \
LOG(x); \
} while (0)
enum spoa_state {
SPOA_ST_CONNECTING = 0,
SPOA_ST_PROCESSING,
SPOA_ST_DISCONNECTING,
};
enum spoa_frame_type {
SPOA_FRM_T_UNKNOWN = 0,
SPOA_FRM_T_HAPROXY,
SPOA_FRM_T_AGENT,
};
struct spoe_engine {
char *id;
struct list processing_frames;
struct list outgoing_frames;
struct list clients;
struct list list;
};
struct spoe_frame {
enum spoa_frame_type type;
char *buf;
unsigned int offset;
unsigned int len;
unsigned int stream_id;
unsigned int frame_id;
unsigned int flags;
bool hcheck; /* true is the CONNECT frame is a healthcheck */
bool fragmented; /* true if the frame is fragmented */
int ip_score; /* -1 if unset, else between 0 and 100 */
struct event process_frame_event;
struct worker *worker;
struct spoe_engine *engine;
struct client *client;
struct list list;
char *frag_buf; /* used to accumulate payload of a fragmented frame */
unsigned int frag_len;
char data[0];
};
struct client {
int fd;
unsigned long id;
enum spoa_state state;
struct event read_frame_event;
struct event write_frame_event;
struct spoe_frame *incoming_frame;
struct spoe_frame *outgoing_frame;
struct list processing_frames;
struct list outgoing_frames;
unsigned int max_frame_size;
int status_code;
char *engine_id;
struct spoe_engine *engine;
bool pipelining;
bool async;
bool fragmentation;
struct worker *worker;
struct list by_worker;
struct list by_engine;
};
struct worker {
pthread_t thread;
int id;
struct event_base *base;
struct event *monitor_event;
struct list engines;
unsigned int nbclients;
struct list clients;
struct list frames;
unsigned int nbframes;
};
/* Globals */
static struct worker *workers = NULL;
static struct worker null_worker = { .id = 0 };
static unsigned long clicount = 0;
static int server_port = DEFAULT_PORT;
static int num_workers = NUM_WORKERS;
static unsigned int max_frame_size = MAX_FRAME_SIZE;
struct timeval processing_delay = {0, 0};
static bool debug = false;
static bool pipelining = false;
static bool async = false;
static bool fragmentation = false;
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
[SPOE_FRM_ERR_NONE] = "normal",
[SPOE_FRM_ERR_IO] = "I/O error",
[SPOE_FRM_ERR_TOUT] = "a timeout occurred",
[SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
[SPOE_FRM_ERR_INVALID] = "invalid frame received",
[SPOE_FRM_ERR_NO_VSN] = "version value not found",
[SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
[SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
[SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
[SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
[SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames",
[SPOE_FRM_ERR_FRAMEID_NOTFOUND] = "frame-id not found",
[SPOE_FRM_ERR_RES] = "resource allocation error",
[SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
static void signal_cb(evutil_socket_t, short, void *);
static void accept_cb(evutil_socket_t, short, void *);
static void worker_monitor_cb(evutil_socket_t, short, void *);
static void process_frame_cb(evutil_socket_t, short, void *);
static void read_frame_cb(evutil_socket_t, short, void *);
static void write_frame_cb(evutil_socket_t, short, void *);
static void use_spoe_engine(struct client *);
static void unuse_spoe_engine(struct client *);
static void release_frame(struct spoe_frame *);
static void release_client(struct client *);
static void
check_ipv4_reputation(struct spoe_frame *frame, struct in_addr *ipv4)
{
char str[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
return;
frame->ip_score = random() % 100;
DEBUG(frame->worker, "IP score for %.*s is %d",
INET_ADDRSTRLEN, str, frame->ip_score);
}
static void
check_ipv6_reputation(struct spoe_frame *frame, struct in6_addr *ipv6)
{
char str[INET6_ADDRSTRLEN];
if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
return;
frame->ip_score = random() % 100;
DEBUG(frame->worker, "IP score for %.*s is %d",
INET6_ADDRSTRLEN, str, frame->ip_score);
}
/* Check the protocol version. It returns -1 if an error occurred, the number of
* read bytes otherwise. */
static int
check_proto_version(struct spoe_frame *frame, char **buf, char *end)
{
char *str, *p = *buf;
uint64_t sz;
int ret;
/* Get the list of all supported versions by HAProxy */
if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
return -1;
ret = spoe_decode_buffer(&p, end, &str, &sz);
if (ret == -1 || !str)
return -1;
DEBUG(frame->worker, "<%lu> Supported versions : %.*s",
frame->client->id, (int)sz, str);
/* TODO: Find the right verion in supported ones */
ret = (p - *buf);
*buf = p;
return ret;
}
/* Check max frame size value. It returns -1 if an error occurred, the number of
* read bytes otherwise. */
static int
check_max_frame_size(struct spoe_frame *frame, char **buf, char *end)
{
char *p = *buf;
uint64_t sz;
int type, ret;
/* Get the max-frame-size value of HAProxy */
type = *p++;
if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
return -1;
if (decode_varint(&p, end, &sz) == -1)
return -1;
/* Keep the lower value */
if (sz < frame->client->max_frame_size)
frame->client->max_frame_size = sz;
DEBUG(frame->worker, "<%lu> HAProxy maximum frame size : %u",
frame->client->id, (unsigned int)sz);
ret = (p - *buf);
*buf = p;
return ret;
}
/* Check healthcheck value. It returns -1 if an error occurred, the number of
* read bytes otherwise. */
static int
check_healthcheck(struct spoe_frame *frame, char **buf, char *end)
{
char *p = *buf;
int type, ret;
/* Get the "healthcheck" value */
type = *p++;
if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL)
return -1;
frame->hcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
DEBUG(frame->worker, "<%lu> HELLO healthcheck : %s",
frame->client->id, (frame->hcheck ? "true" : "false"));
ret = (p - *buf);
*buf = p;
return ret;
}
/* Check capabilities value. It returns -1 if an error occurred, the number of
* read bytes otherwise. */
static int
check_capabilities(struct spoe_frame *frame, char **buf, char *end)
{
struct client *client = frame->client;
char *str, *p = *buf;
uint64_t sz;
int ret;
if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
return -1;
if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
return -1;
if (str == NULL) /* this is not an error */
goto end;
DEBUG(frame->worker, "<%lu> HAProxy capabilities : %.*s",
client->id, (int)sz, str);
while (sz) {
char *delim;
/* Skip leading spaces */
for (; isspace(*str) && sz; sz--);
if (sz >= 10 && !strncmp(str, "pipelining", 10)) {
str += 10; sz -= 10;
if (!sz || isspace(*str) || *str == ',') {
DEBUG(frame->worker,
"<%lu> HAProxy supports frame pipelining",
client->id);
client->pipelining = true;
}
}
else if (sz >= 5 && !strncmp(str, "async", 5)) {
str += 5; sz -= 5;
if (!sz || isspace(*str) || *str == ',') {
DEBUG(frame->worker,
"<%lu> HAProxy supports asynchronous frame",
client->id);
client->async = true;
}
}
else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
str += 13; sz -= 13;
if (!sz || isspace(*str) || *str == ',') {
DEBUG(frame->worker,
"<%lu> HAProxy supports fragmented frame",
client->id);
client->fragmentation = true;
}
}
if (!sz || (delim = memchr(str, ',', sz)) == NULL)
break;
delim++;
sz -= (delim - str);
str = delim;
}
end:
ret = (p - *buf);
*buf = p;
return ret;
}
/* Check engine-id value. It returns -1 if an error occurred, the number of
* read bytes otherwise. */
static int
check_engine_id(struct spoe_frame *frame, char **buf, char *end)
{
struct client *client = frame->client;
char *str, *p = *buf;
uint64_t sz;
int ret;
if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
return -1;
if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
return -1;
if (str == NULL) /* this is not an error */
goto end;
if (client->engine != NULL)
goto end;
DEBUG(frame->worker, "<%lu> HAProxy engine id : %.*s",
client->id, (int)sz, str);
client->engine_id = strndup(str, (int)sz);
end:
ret = (p - *buf);
*buf = p;
return ret;
}
static int
acc_payload(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *buf;
size_t len = frame->len - frame->offset;
int ret = frame->offset;
/* No need to accumulation payload */
if (frame->fragmented == false)
return ret;
buf = realloc(frame->frag_buf, frame->frag_len + len);
if (buf == NULL) {
client->status_code = SPOE_FRM_ERR_RES;
return -1;
}
memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
frame->frag_buf = buf;
frame->frag_len += len;
if (!(frame->flags & SPOE_FRM_FL_FIN)) {
/* Wait for next parts */
frame->buf = (char *)(frame->data);
frame->offset = 0;
frame->len = 0;
frame->flags = 0;
return 1;
}
frame->buf = frame->frag_buf;
frame->len = frame->frag_len;
frame->offset = 0;
return ret;
}
/* Check disconnect status code. It returns -1 if an error occurred, the number
* of read bytes otherwise. */
static int
check_discon_status_code(struct spoe_frame *frame, char **buf, char *end)
{
char *p = *buf;
uint64_t sz;
int type, ret;
/* Get the "status-code" value */
type = *p++;
if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
(type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
return -1;
if (decode_varint(&p, end, &sz) == -1)
return -1;
frame->client->status_code = (unsigned int)sz;
DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
frame->client->id, frame->client->status_code);
ret = (p - *buf);
*buf = p;
return ret;
}
/* Check the disconnect message. It returns -1 if an error occurred, the number
* of read bytes otherwise. */
static int
check_discon_message(struct spoe_frame *frame, char **buf, char *end)
{
char *str, *p = *buf;
uint64_t sz;
int ret;
/* Get the "message" value */
if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
return -1;
ret = spoe_decode_buffer(&p, end, &str, &sz);
if (ret == -1 || !str)
return -1;
DEBUG(frame->worker, "<%lu> Disconnect message : %.*s",
frame->client->id, (int)sz, str);
ret = (p - *buf);
*buf = p;
return ret;
}
/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
* occurred, otherwise the number of read bytes. HELLO frame cannot be
* ignored and having another frame than a HELLO frame is an error. */
static int
handle_hahello(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
p = frame->buf;
end = frame->buf + frame->len;
/* Check frame type: we really want a HELLO frame */
if (*p++ != SPOE_FRM_T_HAPROXY_HELLO)
goto error;
DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
/* Retrieve flags */
memcpy((char *)&(frame->flags), p, 4);
frame->flags = ntohl(frame->flags);
p += 4;
/* Fragmentation is not supported for HELLO frame */
if (!(frame->flags & SPOE_FRM_FL_FIN)) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
}
/* stream-id and frame-id must be cleared */
if (*p != 0 || *(p+1) != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
p += 2;
/* Loop on K/V items */
while (p < end) {
char *str;
uint64_t sz;
/* Decode the item name */
spoe_decode_buffer(&p, end, &str, &sz);
if (!str) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
/* Check "supported-versions" K/V item */
if (!memcmp(str, "supported-versions", sz)) {
if (check_proto_version(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
/* Check "max-frame-size" K/V item */
else if (!memcmp(str, "max-frame-size", sz)) {
if (check_max_frame_size(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
/* Check "healthcheck" K/V item */
else if (!memcmp(str, "healthcheck", sz)) {
if (check_healthcheck(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
/* Check "capabilities" K/V item */
else if (!memcmp(str, "capabilities", sz)) {
if (check_capabilities(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
/* Check "engine-id" K/V item */
else if (!memcmp(str, "engine-id", sz)) {
if (check_engine_id(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
else {
DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
client->id, (int)sz, str);
/* Silently ignore unknown item */
if (spoe_skip_data(&p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
}
if (async == false || client->engine_id == NULL)
client->async = false;
if (pipelining == false)
client->pipelining = false;
if (client->async == true)
use_spoe_engine(client);
return (p - frame->buf);
error:
return -1;
}
/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
* occurred, otherwise the number of read bytes. DISCONNECT frame cannot be
* ignored and having another frame than a DISCONNECT frame is an error.*/
static int
handle_hadiscon(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
p = frame->buf;
end = frame->buf + frame->len;
/* Check frame type: we really want a DISCONNECT frame */
if (*p++ != SPOE_FRM_T_HAPROXY_DISCON)
goto error;
DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
/* Retrieve flags */
memcpy((char *)&(frame->flags), p, 4);
frame->flags = ntohl(frame->flags);
p += 4;
/* Fragmentation is not supported for DISCONNECT frame */
if (!(frame->flags & SPOE_FRM_FL_FIN)) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
}
/* stream-id and frame-id must be cleared */
if (*p != 0 || *(p+1) != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
p += 2;
client->status_code = SPOE_FRM_ERR_NONE;
/* Loop on K/V items */
while (p < end) {
char *str;
uint64_t sz;
/* Decode item key */
spoe_decode_buffer(&p, end, &str, &sz);
if (!str) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
/* Check "status-code" K/V item */
if (!memcmp(str, "status-code", sz)) {
if (check_discon_status_code(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
/* Check "message" K/V item */
else if (!memcmp(str, "message", sz)) {
if (check_discon_message(frame, &p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
else {
DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
client->id, (int)sz, str);
/* Silently ignore unknown item */
if (spoe_skip_data(&p, end) == -1) {
client->status_code = SPOE_FRM_ERR_INVALID;
goto error;
}
}
}
return (p - frame->buf);
error:
return -1;
}
/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
* occurred, 0 if it must be must be ignored, otherwise the number of read
* bytes. */
static int
handle_hanotify(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
uint64_t stream_id, frame_id;
p = frame->buf;
end = frame->buf + frame->len;
/* Check frame type */
if (*p++ != SPOE_FRM_T_HAPROXY_NOTIFY)
goto ignore;
DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
/* Retrieve flags */
memcpy((char *)&(frame->flags), p, 4);
frame->flags = ntohl(frame->flags);
p += 4;
/* Fragmentation is not supported */
if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
}
/* Read the stream-id and frame-id */
if (decode_varint(&p, end, &stream_id) == -1)
goto ignore;
if (decode_varint(&p, end, &frame_id) == -1)
goto ignore;
frame->stream_id = (unsigned int)stream_id;
frame->frame_id = (unsigned int)frame_id;
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s frame received"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
frame->frag_len, frame->len, p - frame->buf);
frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
frame->offset = (p - frame->buf);
return acc_payload(frame);
ignore:
return 0;
error:
return -1;
}
/* Decode next part of a fragmented frame received from HAProxy. It returns -1
* if an error occurred, 0 if it must be must be ignored, otherwise the number
* of read bytes. */
static int
handle_hafrag(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
uint64_t stream_id, frame_id;
p = frame->buf;
end = frame->buf + frame->len;
/* Check frame type */
if (*p++ != SPOE_FRM_T_UNSET)
goto ignore;
DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
/* Fragmentation is not supported */
if (fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
}
/* Retrieve flags */
memcpy((char *)&(frame->flags), p, 4);
frame->flags = ntohl(frame->flags);
p+= 4;
/* Read the stream-id and frame-id */
if (decode_varint(&p, end, &stream_id) == -1)
goto ignore;
if (decode_varint(&p, end, &frame_id) == -1)
goto ignore;
if (frame->fragmented == false ||
frame->stream_id != (unsigned int)stream_id ||
frame->frame_id != (unsigned int)frame_id) {
client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
goto error;
}
if (frame->flags & SPOE_FRM_FL_ABRT) {
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - Abort processing of a fragmented frame"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
frame->frag_len, frame->len, p - frame->buf);
goto ignore;
}
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
" - %s fragment of a fragmented frame received"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
(frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
frame->frag_len, frame->len, p - frame->buf);
frame->offset = (p - frame->buf);
return acc_payload(frame);
ignore:
return 0;
error:
return -1;
}
/* Encode a HELLO frame to send it to HAProxy. It returns the number of written
* bytes. */
static int
prepare_agenthello(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
char capabilities[64];
int n;
unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
p = frame->buf;
end = frame->buf+max_frame_size;
/* Frame Type */
*p++ = SPOE_FRM_T_AGENT_HELLO;
/* Set flags */
flags = htonl(flags);
memcpy(p, (char *)&flags, 4);
p += 4;
/* No stream-id and frame-id for HELLO frames */
*p++ = 0;
*p++ = 0;
/* "version" K/V item */
spoe_encode_buffer("version", 7, &p, end);
*p++ = SPOE_DATA_T_STR;
spoe_encode_buffer(SPOP_VERSION, SLEN(SPOP_VERSION), &p, end);
DEBUG(frame->worker, "<%lu> Agent version : %s",
client->id, SPOP_VERSION);
/* "max-frame-size" K/V item */
spoe_encode_buffer("max-frame-size", 14, &p ,end);
*p++ = SPOE_DATA_T_UINT32;
encode_varint(client->max_frame_size, &p, end);
DEBUG(frame->worker, "<%lu> Agent maximum frame size : %u",
client->id, client->max_frame_size);
/* "capabilities" K/V item */
spoe_encode_buffer("capabilities", 12, &p, end);
*p++ = SPOE_DATA_T_STR;
memset(capabilities, 0, sizeof(capabilities));
n = 0;
/* 1. Fragmentation capability ? */
if (fragmentation == true) {
memcpy(capabilities, "fragmentation", 13);
n += 13;
}
/* 2. Pipelining capability ? */
if (client->pipelining == true) {
if (n) capabilities[n++] = ',';
memcpy(capabilities + n, "pipelining", 10);
n += 10;
}
/* 3. Async capability ? */
if (client->async == true) {
if (n) capabilities[n++] = ',';
memcpy(capabilities + n, "async", 5);
n += 5;
}
spoe_encode_buffer(capabilities, n, &p, end);
DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
client->id, n, capabilities);
frame->len = (p - frame->buf);
return frame->len;
}
/* Encode a DISCONNECT frame to send it to HAProxy. It returns the number of
* written bytes. */
static int
prepare_agentdicon(struct spoe_frame *frame)
{
struct client *client = frame->client;
char *p, *end;
const char *reason;
int rlen;
unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
p = frame->buf;
end = frame->buf+max_frame_size;
if (client->status_code >= SPOE_FRM_ERRS)
client->status_code = SPOE_FRM_ERR_UNKNOWN;
reason = spoe_frm_err_reasons[client->status_code];
rlen = strlen(reason);
/* Frame type */
*p++ = SPOE_FRM_T_AGENT_DISCON;
/* Set flags */
flags = htonl(flags);
memcpy(p, (char *)&flags, 4);
p += 4;
/* No stream-id and frame-id for DISCONNECT frames */
*p++ = 0;
*p++ = 0;
/* There are 2 mandatory items: "status-code" and "message" */
/* "status-code" K/V item */
spoe_encode_buffer("status-code", 11, &p, end);
*p++ = SPOE_DATA_T_UINT32;
encode_varint(client->status_code, &p, end);
DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
client->id, client->status_code);
/* "message" K/V item */
spoe_encode_buffer("message", 7, &p, end);
*p++ = SPOE_DATA_T_STR;
spoe_encode_buffer(reason, rlen, &p, end);
DEBUG(frame->worker, "<%lu> Disconnect message : %s",
client->id, reason);
frame->len = (p - frame->buf);
return frame->len;
}
/* Encode a ACK frame to send it to HAProxy. It returns the number of written
* bytes. */
static int
prepare_agentack(struct spoe_frame *frame)
{
char *p, *end;
unsigned int flags = SPOE_FRM_FL_FIN;
/* Be careful here, in async mode, frame->client can be NULL */
DEBUG(frame->worker, "Encode Agent ACK frame");
frame->type = SPOA_FRM_T_AGENT;
p = frame->buf;
end = frame->buf+max_frame_size;
/* Frame type */
*p++ = SPOE_FRM_T_AGENT_ACK;
/* Set flags */
flags = htonl(flags);
memcpy(p, (char *)&flags, 4);
p += 4;
/* Set stream-id and frame-id for ACK frames */
encode_varint(frame->stream_id, &p, end);
encode_varint(frame->frame_id, &p, end);
DEBUG(frame->worker, "STREAM-ID=%u - FRAME-ID=%u",
frame->stream_id, frame->frame_id);
frame->len = (p - frame->buf);
return frame->len;
}
static int
create_server_socket(void)
{
struct sockaddr_in listen_addr;
int fd, yes = 1;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
LOG(&null_worker, "Failed to create service socket : %m");
return -1;
}
memset(&listen_addr, 0, sizeof(listen_addr));
listen_addr.sin_family = AF_INET;
listen_addr.sin_addr.s_addr = INADDR_ANY;
listen_addr.sin_port = htons(server_port);
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0 ||
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) < 0) {
LOG(&null_worker, "Failed to set option on server socket : %m");
return -1;
}
if (bind(fd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) < 0) {
LOG(&null_worker, "Failed to bind server socket : %m");
return -1;
}
if (listen(fd, CONNECTION_BACKLOG) < 0) {
LOG(&null_worker, "Failed to listen on server socket : %m");
return -1;
}
return fd;
}
static void
release_frame(struct spoe_frame *frame)
{
struct worker *worker;
if (frame == NULL)
return;
if (event_pending(&frame->process_frame_event, EV_TIMEOUT, NULL))
event_del(&frame->process_frame_event);
worker = frame->worker;
LIST_DEL(&frame->list);
if (frame->frag_buf)
free(frame->frag_buf);
memset(frame, 0, sizeof(*frame)+max_frame_size+4);
LIST_ADDQ(&worker->frames, &frame->list);
}
static void
release_client(struct client *c)
{
struct spoe_frame *frame, *back;
if (c == NULL)
return;
DEBUG(c->worker, "<%lu> Release client", c->id);
LIST_DEL(&c->by_worker);
c->worker->nbclients--;
unuse_spoe_engine(c);
free(c->engine_id);
if (event_pending(&c->read_frame_event, EV_READ, NULL))
event_del(&c->read_frame_event);
if (event_pending(&c->write_frame_event, EV_WRITE, NULL))
event_del(&c->write_frame_event);
release_frame(c->incoming_frame);
release_frame(c->outgoing_frame);
list_for_each_entry_safe(frame, back, &c->processing_frames, list) {
release_frame(frame);
}
list_for_each_entry_safe(frame, back, &c->outgoing_frames, list) {
release_frame(frame);
}
if (c->fd >= 0)
close(c->fd);
free(c);
}
static void
reset_frame(struct spoe_frame *frame)
{
if (frame == NULL)
return;
if (frame->frag_buf)
free(frame->frag_buf);
frame->type = SPOA_FRM_T_UNKNOWN;
frame->buf = (char *)(frame->data);
frame->offset = 0;
frame->len = 0;
frame->stream_id = 0;
frame->frame_id = 0;
frame->flags = 0;
frame->hcheck = false;
frame->fragmented = false;
frame->ip_score = -1;
frame->frag_buf = NULL;
frame->frag_len = 0;
LIST_INIT(&frame->list);
}
static void
use_spoe_engine(struct client *client)
{
struct spoe_engine *eng;
if (client->engine_id == NULL)
return;
list_for_each_entry(eng, &client->worker->engines, list) {
if (!strcmp(eng->id, client->engine_id))
goto end;
}
if ((eng = malloc(sizeof(*eng))) == NULL) {
client->async = false;
return;
}
eng->id = strdup(client->engine_id);
LIST_INIT(&eng->clients);
LIST_INIT(&eng->processing_frames);
LIST_INIT(&eng->outgoing_frames);
LIST_ADDQ(&client->worker->engines, &eng->list);
LOG(client->worker, "Add new SPOE engine '%s'", eng->id);
end:
client->engine = eng;
LIST_ADDQ(&eng->clients, &client->by_engine);
}
static void
unuse_spoe_engine(struct client *client)
{
struct spoe_engine *eng;
struct spoe_frame *frame, *back;
if (client == NULL || client->engine == NULL)
return;
eng = client->engine;
client->engine = NULL;
LIST_DEL(&client->by_engine);
if (!LIST_ISEMPTY(&eng->clients))
return;
LOG(client->worker, "Remove SPOE engine '%s'", eng->id);
LIST_DEL(&eng->list);
list_for_each_entry_safe(frame, back, &eng->processing_frames, list) {
release_frame(frame);
}
list_for_each_entry_safe(frame, back, &eng->outgoing_frames, list) {
release_frame(frame);
}
free(eng->id);
free(eng);
}
static struct spoe_frame *
acquire_incoming_frame(struct client *client)
{
struct spoe_frame *frame;
frame = client->incoming_frame;
if (frame != NULL)
return frame;
if (LIST_ISEMPTY(&client->worker->frames)) {
if ((frame = calloc(1, sizeof(*frame)+max_frame_size+4)) == NULL) {
LOG(client->worker, "Failed to allocate new frame : %m");
return NULL;
}
}
else {
frame = LIST_NEXT(&client->worker->frames, typeof(frame), list);
LIST_DEL(&frame->list);
}
reset_frame(frame);
frame->worker = client->worker;
frame->engine = client->engine;
frame->client = client;
if (event_assign(&frame->process_frame_event, client->worker->base, -1,
EV_TIMEOUT|EV_PERSIST, process_frame_cb, frame) < 0) {
LOG(client->worker, "Failed to create frame event");
return NULL;
}
client->incoming_frame = frame;
return frame;
}
static struct spoe_frame *
acquire_outgoing_frame(struct client *client)
{
struct spoe_engine *engine = client->engine;
struct spoe_frame *frame = NULL;
if (client->outgoing_frame != NULL)
frame = client->outgoing_frame;
else if (!LIST_ISEMPTY(&client->outgoing_frames)) {
frame = LIST_NEXT(&client->outgoing_frames, typeof(frame), list);
LIST_DEL(&frame->list);
client->outgoing_frame = frame;
}
else if (engine!= NULL && !LIST_ISEMPTY(&engine->outgoing_frames)) {
frame = LIST_NEXT(&engine->outgoing_frames, typeof(frame), list);
LIST_DEL(&frame->list);
client->outgoing_frame = frame;
}
return frame;
}
static void
write_frame(struct client *client, struct spoe_frame *frame)
{
uint32_t netint;
LIST_DEL(&frame->list);
frame->buf = (char *)(frame->data);
frame->offset = 0;
netint = htonl(frame->len);
memcpy(frame->buf, &netint, 4);
if (client != NULL) { /* HELLO or DISCONNECT frames */
event_add(&client->write_frame_event, NULL);
/* Try to process the frame as soon as possible, and always
* attach it to the client */
if (client->async || client->pipelining) {
if (client->outgoing_frame == NULL)
client->outgoing_frame = frame;
else
LIST_ADD(&client->outgoing_frames, &frame->list);
}
else {
client->outgoing_frame = frame;
event_del(&client->read_frame_event);
}
}
else { /* for all other frames */
if (frame->client == NULL) { /* async mode ! */
LIST_ADDQ(&frame->engine->outgoing_frames, &frame->list);
list_for_each_entry(client, &frame->engine->clients, by_engine)
event_add(&client->write_frame_event, NULL);
}
else if (frame->client->pipelining) {
LIST_ADDQ(&frame->client->outgoing_frames, &frame->list);
event_add(&frame->client->write_frame_event, NULL);
}
else {
frame->client->outgoing_frame = frame;
event_add(&frame->client->write_frame_event, NULL);
event_del(&frame->client->read_frame_event);
}
}
}
static void
process_incoming_frame(struct spoe_frame *frame)
{
struct client *client = frame->client;
if (event_add(&frame->process_frame_event, &processing_delay) < 0) {
LOG(client->worker, "Failed to process incoming frame");
release_frame(frame);
return;
}
if (client->async) {
frame->client = NULL;
LIST_ADDQ(&frame->engine->processing_frames, &frame->list);
}
else if (client->pipelining)
LIST_ADDQ(&client->processing_frames, &frame->list);
else
event_del(&client->read_frame_event);
}
static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = user_data;
int i;
DEBUG(&null_worker, "Stopping the server");
event_base_loopbreak(base);
DEBUG(&null_worker, "Main event loop stopped");
for (i = 0; i < num_workers; i++) {
event_base_loopbreak(workers[i].base);
DEBUG(&null_worker, "Event loop stopped for worker %02d",
workers[i].id);
}
}
static void
worker_monitor_cb(evutil_socket_t fd, short events, void *arg)
{
struct worker *worker = arg;
LOG(worker, "%u clients connected (%u frames)", worker->nbclients, worker->nbframes);
}
static void
process_frame_cb(evutil_socket_t fd, short events, void *arg)
{
struct spoe_frame *frame = arg;
char *p, *end;
int ret;
DEBUG(frame->worker,
"Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
frame->stream_id, frame->frame_id, frame->len - frame->offset);
p = frame->buf + frame->offset;
end = frame->buf + frame->len;
/* Loop on messages */
while (p < end) {
char *str;
uint64_t sz;
int nbargs;
/* Decode the message name */
spoe_decode_buffer(&p, end, &str, &sz);
if (!str)
goto stop_processing;
DEBUG(frame->worker, "Process SPOE Message '%.*s'", (int)sz, str);
nbargs = (unsigned char)*p++; /* Get the number of arguments */
frame->offset = (p - frame->buf); /* Save index to handle errors and skip args */
if (!memcmp(str, "check-client-ip", sz)) {
union spoe_data data;
enum spoe_data_type type;
if (nbargs != 1)
goto skip_message;
if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
goto stop_processing;
if (spoe_decode_data(&p, end, &data, &type) == -1)
goto skip_message;
frame->worker->nbframes++;
if (type == SPOE_DATA_T_IPV4)
check_ipv4_reputation(frame, &data.ipv4);
if (type == SPOE_DATA_T_IPV6)
check_ipv6_reputation(frame, &data.ipv6);
}
else {
skip_message:
p = frame->buf + frame->offset; /* Restore index */
while (nbargs-- > 0) {
/* Silently ignore argument: its name and its value */
if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
goto stop_processing;
if (spoe_skip_data(&p, end) == -1)
goto stop_processing;
}
}
}
stop_processing:
/* Prepare agent ACK frame */
frame->buf = (char *)(frame->data) + 4;
frame->offset = 0;
frame->len = 0;
frame->flags = 0;
ret = prepare_agentack(frame);
p = frame->buf + ret;
end = frame->buf+max_frame_size;
if (frame->ip_score != -1) {
DEBUG(frame->worker, "Add action : set variable ip_scode=%u",
frame->ip_score);
*p++ = SPOE_ACT_T_SET_VAR; /* Action type */
*p++ = 3; /* Number of args */
*p++ = SPOE_SCOPE_SESS; /* Arg 1: the scope */
spoe_encode_buffer("ip_score", 8, &p, end); /* Arg 2: variable name */
*p++ = SPOE_DATA_T_UINT32;
encode_varint(frame->ip_score, &p, end); /* Arg 3: variable value */
frame->len = (p - frame->buf);
}
write_frame(NULL, frame);
}
static void
read_frame_cb(evutil_socket_t fd, short events, void *arg)
{
struct client *client = arg;
struct spoe_frame *frame;
uint32_t netint;
int n;
DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
if ((frame = acquire_incoming_frame(client)) == NULL)
goto close;
frame->type = SPOA_FRM_T_HAPROXY;
if (frame->buf == (char *)(frame->data)) {
/* Read the frame length: frame->buf points on length part (frame->data) */
n = read(client->fd, frame->buf+frame->offset, 4-frame->offset);
if (n <= 0) {
if (n < 0)
LOG(client->worker, "Failed to read frame length : %m");
goto close;
}
frame->offset += n;
if (frame->offset != 4)
return;
memcpy(&netint, frame->buf, 4);
frame->buf += 4;
frame->offset = 0;
frame->len = ntohl(netint);
}
/* Read the frame: frame->buf points on frame part (frame->data+4)*/
n = read(client->fd, frame->buf + frame->offset,
frame->len - frame->offset);
if (n <= 0) {
if (n < 0) {
LOG(client->worker, "Frame to read frame : %m");
goto close;
}
return;
}
frame->offset += n;
if (frame->offset != frame->len)
return;
frame->offset = 0;
DEBUG(client->worker, "<%lu> New Frame of %u bytes received",
client->id, frame->len);
switch (client->state) {
case SPOA_ST_CONNECTING:
if (handle_hahello(frame) < 0) {
LOG(client->worker, "Failed to decode HELLO frame");
goto disconnect;
}
prepare_agenthello(frame);
goto write_frame;
case SPOA_ST_PROCESSING:
if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
if (frame->buf[0] == SPOE_FRM_T_UNSET)
n = handle_hafrag(frame);
else
n = handle_hanotify(frame);
if (n < 0) {
LOG(client->worker, "Failed to decode frame: %s",
spoe_frm_err_reasons[client->status_code]);
goto disconnect;
}
else if (n == 0) {
LOG(client->worker, "Ignore invalid/unknown/aborted frame");
goto ignore_frame;
}
else if (n == 1)
goto noop;
else
goto process_frame;
case SPOA_ST_DISCONNECTING:
disconnecting:
if (handle_hadiscon(frame) < 0) {
LOG(client->worker, "Failed to decode DISCONNECT frame");
goto disconnect;
}
if (client->status_code != SPOE_FRM_ERR_NONE)
LOG(client->worker, "<%lu> Peer closed connection: %s",
client->id, spoe_frm_err_reasons[client->status_code]);
goto disconnect;
}
noop:
return;
ignore_frame:
reset_frame(frame);
return;
process_frame:
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;
write_frame:
write_frame(client, frame);
client->incoming_frame = NULL;
return;
disconnect:
client->state = SPOA_ST_DISCONNECTING;
if (prepare_agentdicon(frame) < 0) {
LOG(client->worker, "Failed to encode DISCONNECT frame");
goto close;
}
goto write_frame;
close:
release_client(client);
}
static void
write_frame_cb(evutil_socket_t fd, short events, void *arg)
{
struct client *client = arg;
struct spoe_frame *frame;
int n;
DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
if ((frame = acquire_outgoing_frame(client)) == NULL) {
event_del(&client->write_frame_event);
return;
}
if (frame->buf == (char *)(frame->data)) {
/* Write the frame length: frame->buf points on length part (frame->data) */
n = write(client->fd, frame->buf+frame->offset, 4-frame->offset);
if (n <= 0) {
if (n < 0)
LOG(client->worker, "Failed to write frame length : %m");
goto close;
}
frame->offset += n;
if (frame->offset != 4)
return;
frame->buf += 4;
frame->offset = 0;
}
/* Write the frame: frame->buf points on frame part (frame->data+4)*/
n = write(client->fd, frame->buf + frame->offset,
frame->len - frame->offset);
if (n <= 0) {
if (n < 0) {
LOG(client->worker, "Failed to write frame : %m");
goto close;
}
return;
}
frame->offset += n;
if (frame->offset != frame->len)
return;
DEBUG(client->worker, "<%lu> Frame of %u bytes send",
client->id, frame->len);
switch (client->state) {
case SPOA_ST_CONNECTING:
if (frame->hcheck == true) {
DEBUG(client->worker,
"<%lu> Close client after healthcheck",
client->id);
goto close;
}
client->state = SPOA_ST_PROCESSING;
break;
case SPOA_ST_PROCESSING:
break;
case SPOA_ST_DISCONNECTING:
goto close;
}
release_frame(frame);
client->outgoing_frame = NULL;
if (!client->async && !client->pipelining) {
event_del(&client->write_frame_event);
event_add(&client->read_frame_event, NULL);
}
return;
close:
release_client(client);
}
static void
accept_cb(int listener, short event, void *arg)
{
struct worker *worker;
struct client *client;
int fd;
worker = &workers[clicount++ % num_workers];
if ((fd = accept(listener, NULL, NULL)) < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK)
LOG(worker, "Failed to accept client connection : %m");
return;
}
DEBUG(&null_worker,
"<%lu> New Client connection accepted and assigned to worker %02d",
clicount, worker->id);
if (evutil_make_socket_nonblocking(fd) < 0) {
LOG(&null_worker, "Failed to set client socket to non-blocking : %m");
close(fd);
return;
}
if ((client = calloc(1, sizeof(*client))) == NULL) {
LOG(&null_worker, "Failed to allocate memory for client state : %m");
close(fd);
return;
}
client->id = clicount;
client->fd = fd;
client->worker = worker;
client->state = SPOA_ST_CONNECTING;
client->status_code = SPOE_FRM_ERR_NONE;
client->max_frame_size = max_frame_size;
client->engine = NULL;
client->pipelining = false;
client->async = false;
client->incoming_frame = NULL;
client->outgoing_frame = NULL;
LIST_INIT(&client->processing_frames);
LIST_INIT(&client->outgoing_frames);
LIST_ADDQ(&worker->clients, &client->by_worker);
worker->nbclients++;
if (event_assign(&client->read_frame_event, worker->base, fd,
EV_READ|EV_PERSIST, read_frame_cb, client) < 0 ||
event_assign(&client->write_frame_event, worker->base, fd,
EV_WRITE|EV_PERSIST, write_frame_cb, client) < 0) {
LOG(&null_worker, "Failed to create client events");
release_client(client);
return;
}
event_add(&client->read_frame_event, NULL);
}
static void *
worker_function(void *data)
{
struct client *client, *cback;
struct spoe_frame *frame, *fback;
struct worker *worker = data;
DEBUG(worker, "Worker ready to process client messages");
event_base_dispatch(worker->base);
list_for_each_entry_safe(client, cback, &worker->clients, by_worker) {
release_client(client);
}
list_for_each_entry_safe(frame, fback, &worker->frames, list) {
LIST_DEL(&frame->list);
free(frame);
}
event_free(worker->monitor_event);
event_base_free(worker->base);
DEBUG(worker, "Worker is stopped");
pthread_exit(&null_worker);
}
static int
parse_processing_delay(const char *str)
{
unsigned long value;
value = 0;
while (1) {
unsigned int j;
j = *str - '0';
if (j > 9)
break;
str++;
value *= 10;
value += j;
}
switch (*str) {
case '\0': /* no unit = millisecond */
value *= 1000;
break;
case 's': /* second */
value *= 1000000;
str++;
break;
case 'm': /* millisecond : "ms" */
if (str[1] != 's')
return -1;
value *= 1000;
str += 2;
break;
case 'u': /* microsecond : "us" */
if (str[1] != 's')
return -1;
str += 2;
break;
default:
return -1;
}
if (*str)
return -1;
processing_delay.tv_sec = (time_t)(value / 1000000);
processing_delay.tv_usec = (suseconds_t)(value % 1000000);
return 0;
}
static void
usage(char *prog)
{
fprintf(stderr,
"Usage : %s [OPTION]...\n"
" -h Print this message\n"
" -d Enable the debug mode\n"
" -m <max-frame-size> Specify the maximum frame size (default : %u)\n"
" -p <port> Specify the port to listen on (default : %d)\n"
" -n <num-workers> Specify the number of workers (default : %d)\n"
" -c <capability> Enable the support of the specified capability\n"
" -t <time> Set a delay to process a message (default: 0)\n"
" The value is specified in milliseconds by default,\n"
" but can be in any other unit if the number is suffixed\n"
" by a unit (us, ms, s)\n"
"\n"
" Supported capabilities: fragmentation, pipelining, async\n",
prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
}
int
main(int argc, char **argv)
{
struct event_base *base = NULL;
struct event *signal_event = NULL, *accept_event = NULL;
int opt, i, fd = -1;
// TODO: add '-t <processing-time>' option
while ((opt = getopt(argc, argv, "hdm:n:p:c:t:")) != -1) {
switch (opt) {
case 'h':
usage(argv[0]);
return EXIT_SUCCESS;
case 'd':
debug = true;
break;
case 'm':
max_frame_size = atoi(optarg);
break;
case 'n':
num_workers = atoi(optarg);
break;
case 'p':
server_port = atoi(optarg);
break;
case 'c':
if (!strcmp(optarg, "pipelining"))
pipelining = true;
else if (!strcmp(optarg, "async"))
async = true;
else if (!strcmp(optarg, "fragmentation"))
fragmentation = true;
else
fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
break;
case 't':
if (!parse_processing_delay(optarg))
break;
fprintf(stderr, "%s: failed to parse time '%s'.\n", argv[0], optarg);
fprintf(stderr, "Try '%s -h' for more information.\n", argv[0]);
return EXIT_FAILURE;
default:
usage(argv[0]);
return EXIT_FAILURE;
}
}
if (num_workers <= 0) {
LOG(&null_worker, "%s : Invalid number of workers '%d'\n",
argv[0], num_workers);
goto error;
}
if (server_port <= 0) {
LOG(&null_worker, "%s : Invalid port '%d'\n",
argv[0], server_port);
goto error;
}
if (evthread_use_pthreads() < 0) {
LOG(&null_worker, "No pthreads support for libevent");
goto error;
}
if ((base = event_base_new()) == NULL) {
LOG(&null_worker, "Failed to initialize libevent : %m");
goto error;
}
signal(SIGPIPE, SIG_IGN);
if ((fd = create_server_socket()) < 0) {
LOG(&null_worker, "Failed to create server socket");
goto error;
}
if (evutil_make_socket_nonblocking(fd) < 0) {
LOG(&null_worker, "Failed to set server socket to non-blocking");
goto error;
}
if ((workers = calloc(num_workers, sizeof(*workers))) == NULL) {
LOG(&null_worker, "Failed to set allocate memory for workers");
goto error;
}
for (i = 0; i < num_workers; ++i) {
struct worker *w = &workers[i];
w->id = i+1;
w->nbclients = 0;
w->nbframes = 0;
LIST_INIT(&w->engines);
LIST_INIT(&w->clients);
LIST_INIT(&w->frames);
if ((w->base = event_base_new()) == NULL) {
LOG(&null_worker,
"Failed to initialize libevent for worker %02d : %m",
w->id);
goto error;
}
w->monitor_event = event_new(w->base, fd, EV_PERSIST,
worker_monitor_cb, (void *)w);
if (w->monitor_event == NULL ||
event_add(w->monitor_event, (struct timeval[]){{5,0}}) < 0) {
LOG(&null_worker,
"Failed to create monitor event for worker %02d",
w->id);
goto error;
}
if (pthread_create(&w->thread, NULL, worker_function, (void *)w)) {
LOG(&null_worker,
"Failed to start thread for worker %02d : %m",
w->id);
}
DEBUG(&null_worker, "Worker %02d initialized", w->id);
}
accept_event = event_new(base, fd, EV_READ|EV_PERSIST, accept_cb,
(void *)base);
if (accept_event == NULL || event_add(accept_event, NULL) < 0) {
LOG(&null_worker, "Failed to create accept event : %m");
}
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
if (signal_event == NULL || event_add(signal_event, NULL) < 0) {
LOG(&null_worker, "Failed to create signal event : %m");
}
DEBUG(&null_worker,
"Server is ready"
" [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
(fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
(debug?"true":"false"), max_frame_size);
event_base_dispatch(base);
for (i = 0; i < num_workers; i++) {
struct worker *w = &workers[i];
pthread_join(w->thread, NULL);
DEBUG(&null_worker, "Worker %02d terminated", w->id);
}
free(workers);
event_free(signal_event);
event_free(accept_event);
event_base_free(base);
close(fd);
return EXIT_SUCCESS;
error:
if (workers != NULL)
free(workers);
if (signal_event != NULL)
event_free(signal_event);
if (accept_event != NULL)
event_free(accept_event);
if (base != NULL)
event_base_free(base);
if (fd != -1)
close(fd);
return EXIT_FAILURE;
}