diff --git a/.gitignore b/.gitignore index 236d125951..ecdd195be7 100644 --- a/.gitignore +++ b/.gitignore @@ -43,7 +43,6 @@ /admin/iprange/ip6range /admin/iprange/iprange /admin/systemd/haproxy.service -/contrib/spoa_example/spoa dev/base64/base64rev-gen dev/flags/flags dev/poll/poll diff --git a/MAINTAINERS b/MAINTAINERS index 878d7e3d16..ad9a8a055f 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -131,8 +131,7 @@ Files: addons/wurfl, doc/WURFL-device-detection.txt SPOE Maintainer: Christopher Faulet -Files: src/flt_spoe.c, include/haproxy/spoe*.h -Files: contrib/spoa_example, doc/SPOE.txt +Files: src/flt_spoe.c, include/haproxy/spoe*.h, doc/SPOE.txt SSL Maintainer: Emeric Brun diff --git a/contrib/spoa_example/Makefile b/contrib/spoa_example/Makefile deleted file mode 100644 index a1c1eb5336..0000000000 --- a/contrib/spoa_example/Makefile +++ /dev/null @@ -1,25 +0,0 @@ -DESTDIR = -PREFIX = /usr/local -BINDIR = $(PREFIX)/bin - -CC = gcc -LD = $(CC) - -CFLAGS = -g -O2 -Wall -Werror -pthread -INCS += -I./include -LIBS = -lpthread -levent -levent_pthreads - -OBJS = spoa.o - - -spoa: $(OBJS) - $(LD) $(LDFLAGS) -o $@ $^ $(LIBS) - -install: spoa - install spoa $(DESTDIR)$(BINDIR) - -clean: - rm -f spoa $(OBJS) - -%.o: %.c - $(CC) $(CFLAGS) $(INCS) -c -o $@ $< diff --git a/contrib/spoa_example/README b/contrib/spoa_example/README deleted file mode 100644 index 7e376ee1d6..0000000000 --- a/contrib/spoa_example/README +++ /dev/null @@ -1,88 +0,0 @@ -A Random IP reputation service acting as a Stream Processing Offload Agent --------------------------------------------------------------------------- - -This is a very simple service that implement a "random" ip reputation -service. It will return random scores for all checked IP addresses. It only -shows you how to implement a ip reputation service or such kind of services -using the SPOE. - - - Start the service ---------------------- - -After you have compiled it, to start the service, you just need to use "spoa" -binary: - - $> ./spoa -h - Usage: ./spoa [-h] [-d] [-p ] [-n ] - -h Print this message - -d Enable the debug mode - -p Specify the port to listen on (default: 12345) - -n Specify the number of workers (default: 5) - -Note: A worker is a thread. - - - Configure a SPOE to use the service ---------------------------------------- - -All information about SPOE configuration can be found in "doc/SPOE.txt". Here is -the configuration template to use for your SPOE: - - [ip-reputation] - - spoe-agent iprep-agent - messages check-client-ip - - option var-prefix iprep - - timeout hello 100ms - timeout idle 30s - timeout processing 15ms - - use-backend iprep-backend - - spoe-message check-client-ip - args src - event on-client-session - - -The engine is in the scope "ip-reputation". So to enable it, you must set the -following line in a frontend/listener section: - - frontend my-front - ... - filter spoe engine ip-reputation config /path/spoe-ip-reputation.conf - .... - -where "/path/spoe-ip-reputation.conf" is the path to your SPOE configuration -file. The engine name is important here, it must be the same than the one used -in the SPOE configuration file. - -IMPORTANT NOTE: - Because we want to send a message on the "on-client-session" event, this - SPOE must be attached to a proxy with the frontend capability. If it is - declared in a backend section, it will have no effet. - - -Because, in SPOE configuration file, we declare to use the backend -"iprep-backend" to communicate with the service, you must define it in HAProxy -configuration. For example: - - backend iprep-backend - mode tcp - timeout server 1m - server iprep-srv 127.0.0.1:12345 check maxconn 5 - - -In reply to the "check-client-ip" message, this service will set the variable -"ip_score" for the session, an integer between 0 and 100. If unchanged, the -variable prefix is "iprep". So the full variable name will be -"sess.iprep.ip_score". - -You can use it in ACLs to experiment the SPOE feature. For example: - - tcp-request content reject if { var(sess.iprep.ip_score) -m int lt 20 } - -With this rule, all IP address with a score lower than 20 will be rejected -(Remember, this score is random). diff --git a/contrib/spoa_example/include/mini-clist.h b/contrib/spoa_example/include/mini-clist.h deleted file mode 100644 index bb0d84b053..0000000000 --- a/contrib/spoa_example/include/mini-clist.h +++ /dev/null @@ -1,95 +0,0 @@ -#ifndef _COMMON_MINI_CLIST_H -#define _COMMON_MINI_CLIST_H - -/* these are circular or bidirectionnal lists only. Each list pointer points to - * another list pointer in a structure, and not the structure itself. The - * pointer to the next element MUST be the first one so that the list is easily - * cast as a single linked list or pointer. - */ -struct list { - struct list *n; /* next */ - struct list *p; /* prev */ -}; - -/* First undefine some macros which happen to also be defined on OpenBSD, - * in sys/queue.h, used by sys/event.h - */ -#undef LIST_HEAD -#undef LIST_INIT -#undef LIST_NEXT - -/* ILH = Initialized List Head : used to prevent gcc from moving an empty - * list to BSS. Some older version tend to trim all the array and cause - * corruption. - */ -#define ILH { .n = (struct list *)1, .p = (struct list *)2 } - -#define LIST_HEAD(a) ((void *)(&(a))) - -#define LIST_INIT(l) ((l)->n = (l)->p = (l)) - -#define LIST_HEAD_INIT(l) { &l, &l } - -/* adds an element at the beginning of a list ; returns the element */ -#define LIST_INSERT(lh, el) ({ (el)->n = (lh)->n; (el)->n->p = (lh)->n = (el); (el)->p = (lh); (el); }) - -/* adds an element at the end of a list ; returns the element */ -#define LIST_APPEND(lh, el) ({ (el)->p = (lh)->p; (el)->p->n = (lh)->p = (el); (el)->n = (lh); (el); }) - -/* removes an element from a list and returns it */ -#define LIST_DELETE(el) ({ typeof(el) __ret = (el); (el)->n->p = (el)->p; (el)->p->n = (el)->n; (__ret); }) - -/* returns a pointer of type to a structure containing a list head called - * at address . Note that can be the result of a function or macro - * since it's used only once. - * Example: LIST_ELEM(cur_node->args.next, struct node *, args) - */ -#define LIST_ELEM(lh, pt, el) ((pt)(((const char *)(lh)) - ((size_t)&((pt)NULL)->el))) - -/* checks if the list head is empty or not */ -#define LIST_ISEMPTY(lh) ((lh)->n == (lh)) - -/* returns a pointer of type to a structure following the element - * which contains list head , which is known as element in - * struct pt. - * Example: LIST_NEXT(args, struct node *, list) - */ -#define LIST_NEXT(lh, pt, el) (LIST_ELEM((lh)->n, pt, el)) - - -/* returns a pointer of type to a structure preceding the element - * which contains list head , which is known as element in - * struct pt. - */ -#undef LIST_PREV -#define LIST_PREV(lh, pt, el) (LIST_ELEM((lh)->p, pt, el)) - -/* - * Simpler FOREACH_ITEM macro inspired from Linux sources. - * Iterates through a list of items of type "typeof(*item)" which are - * linked via a "struct list" member named . A pointer to the head of - * the list is passed in . No temporary variable is needed. Note - * that must not be modified during the loop. - * Example: list_for_each_entry(cur_acl, known_acl, list) { ... }; - */ -#define list_for_each_entry(item, list_head, member) \ - for (item = LIST_ELEM((list_head)->n, typeof(item), member); \ - &item->member != (list_head); \ - item = LIST_ELEM(item->member.n, typeof(item), member)) - -/* - * Simpler FOREACH_ITEM_SAFE macro inspired from Linux sources. - * Iterates through a list of items of type "typeof(*item)" which are - * linked via a "struct list" member named . A pointer to the head of - * the list is passed in . A temporary variable of same type - * as is needed so that may safely be deleted if needed. - * Example: list_for_each_entry_safe(cur_acl, tmp, known_acl, list) { ... }; - */ -#define list_for_each_entry_safe(item, back, list_head, member) \ - for (item = LIST_ELEM((list_head)->n, typeof(item), member), \ - back = LIST_ELEM(item->member.n, typeof(item), member); \ - &item->member != (list_head); \ - item = back, back = LIST_ELEM(back->member.n, typeof(back), member)) - - -#endif /* _COMMON_MINI_CLIST_H */ diff --git a/contrib/spoa_example/include/spoe_types.h b/contrib/spoa_example/include/spoe_types.h deleted file mode 100644 index a986227cbe..0000000000 --- a/contrib/spoa_example/include/spoe_types.h +++ /dev/null @@ -1,223 +0,0 @@ -/* - * include/spoe_types.h - * Macros, variables and structures for the SPOE filter. - * - * Copyright (C) 2017 HAProxy Technologies, Christopher Faulet - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation, version 2.1 - * exclusively. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - */ - -#ifndef _SPOE_TYPES_H -#define _SPOE_TYPES_H - -#include - -// Taken from HAProxy's defaults.h -/* Maximum host name length */ -#ifndef MAX_HOSTNAME_LEN -#if MAXHOSTNAMELEN -#define MAX_HOSTNAME_LEN MAXHOSTNAMELEN -#else -#define MAX_HOSTNAME_LEN 64 -#endif // MAXHOSTNAMELEN -#endif // MAX_HOSTNAME_LEN - -/* Flags set on the SPOE agent */ -#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */ -#define SPOE_FL_PIPELINING 0x00000002 /* Set when SPOE agent supports pipelining (set by default) */ -#define SPOE_FL_ASYNC 0x00000004 /* Set when SPOE agent supports async (set by default) */ -#define SPOE_FL_SND_FRAGMENTATION 0x00000008 /* Set when SPOE agent supports sending fragmented payload */ -#define SPOE_FL_RCV_FRAGMENTATION 0x00000010 /* Set when SPOE agent supports receiving fragmented payload */ - -/* Flags set on the SPOE context */ -#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */ -#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */ -#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */ -#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */ -#define SPOE_CTX_FL_FRAGMENTED 0x00000010 /* Set when a fragmented frame is processing */ - -#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS) - -/* Flags set on the SPOE applet */ -#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */ -#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */ -#define SPOE_APPCTX_FL_FRAGMENTATION 0x00000004 /* Set if fragmentation is supported */ -#define SPOE_APPCTX_FL_PERSIST 0x00000008 /* Set if the applet is persistent */ - -#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */ -#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */ - -/* Flags set on the SPOE frame */ -#define SPOE_FRM_FL_FIN 0x00000001 -#define SPOE_FRM_FL_ABRT 0x00000002 - -/* All supported SPOE actions */ -enum spoe_action_type { - SPOE_ACT_T_SET_VAR = 1, - SPOE_ACT_T_UNSET_VAR, - SPOE_ACT_TYPES, -}; - -/* All supported SPOE events */ -enum spoe_event { - SPOE_EV_NONE = 0, - - /* Request events */ - SPOE_EV_ON_CLIENT_SESS = 1, - SPOE_EV_ON_TCP_REQ_FE, - SPOE_EV_ON_TCP_REQ_BE, - SPOE_EV_ON_HTTP_REQ_FE, - SPOE_EV_ON_HTTP_REQ_BE, - - /* Response events */ - SPOE_EV_ON_SERVER_SESS, - SPOE_EV_ON_TCP_RSP, - SPOE_EV_ON_HTTP_RSP, - - SPOE_EV_EVENTS -}; - -/* Errors triggered by streams */ -enum spoe_context_error { - SPOE_CTX_ERR_NONE = 0, - SPOE_CTX_ERR_TOUT, - SPOE_CTX_ERR_RES, - SPOE_CTX_ERR_TOO_BIG, - SPOE_CTX_ERR_FRAG_FRAME_ABRT, - SPOE_CTX_ERR_UNKNOWN = 255, - SPOE_CTX_ERRS, -}; - -/* Errors triggered by SPOE applet */ -enum spoe_frame_error { - SPOE_FRM_ERR_NONE = 0, - SPOE_FRM_ERR_IO, - SPOE_FRM_ERR_TOUT, - SPOE_FRM_ERR_TOO_BIG, - SPOE_FRM_ERR_INVALID, - SPOE_FRM_ERR_NO_VSN, - SPOE_FRM_ERR_NO_FRAME_SIZE, - SPOE_FRM_ERR_NO_CAP, - SPOE_FRM_ERR_BAD_VSN, - SPOE_FRM_ERR_BAD_FRAME_SIZE, - SPOE_FRM_ERR_FRAG_NOT_SUPPORTED, - SPOE_FRM_ERR_INTERLACED_FRAMES, - SPOE_FRM_ERR_FRAMEID_NOTFOUND, - SPOE_FRM_ERR_RES, - SPOE_FRM_ERR_UNKNOWN = 99, - SPOE_FRM_ERRS, -}; - -/* Scopes used for variables set by agents. It is a way to be agnotic to vars - * scope. */ -enum spoe_vars_scope { - SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */ - SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */ - SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */ - SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */ - SPOE_SCOPE_RES, /* <=> SCOPE_RES */ -}; - - -/* Describe an argument that will be linked to a message. It is a sample fetch, - * with an optional name. */ -struct spoe_arg { - char *name; /* Name of the argument, may be NULL */ - unsigned int name_len; /* The name length, 0 if NULL */ - struct sample_expr *expr; /* Sample expression */ - struct list list; /* Used to chain SPOE args */ -}; - -/* Used during the config parsing only because, when a SPOE agent section is - * parsed, messages can be undefined. */ -struct spoe_msg_placeholder { - char *id; /* SPOE message placeholder id */ - struct list list; /* Use to chain SPOE message placeholders */ -}; - -/* Describe a message that will be sent in a NOTIFY frame. A message has a name, - * an argument list (see above) and it is linked to a specific event. */ -struct spoe_message { - char *id; /* SPOE message id */ - unsigned int id_len; /* The message id length */ - struct spoe_agent *agent; /* SPOE agent owning this SPOE message */ - struct { - char *file; /* file where the SPOE message appears */ - int line; /* line where the SPOE message appears */ - } conf; /* config information */ - unsigned int nargs; /* # of arguments */ - struct list args; /* Arguments added when the SPOE messages is sent */ - struct list list; /* Used to chain SPOE messages */ - - enum spoe_event event; /* SPOE_EV_* */ -}; - -enum spoe_frame_type { - SPOE_FRM_T_UNSET = 0, - - /* Frames sent by HAProxy */ - SPOE_FRM_T_HAPROXY_HELLO = 1, - SPOE_FRM_T_HAPROXY_DISCON, - SPOE_FRM_T_HAPROXY_NOTIFY, - - /* Frames sent by the agents */ - SPOE_FRM_T_AGENT_HELLO = 101, - SPOE_FRM_T_AGENT_DISCON, - SPOE_FRM_T_AGENT_ACK -}; - -/* All supported data types */ -enum spoe_data_type { - SPOE_DATA_T_NULL = 0, - SPOE_DATA_T_BOOL, - SPOE_DATA_T_INT32, - SPOE_DATA_T_UINT32, - SPOE_DATA_T_INT64, - SPOE_DATA_T_UINT64, - SPOE_DATA_T_IPV4, - SPOE_DATA_T_IPV6, - SPOE_DATA_T_STR, - SPOE_DATA_T_BIN, - SPOE_DATA_TYPES -}; - -/* a memory block of arbitrary size, or a string */ -struct chunk { - char *ptr; - size_t len; -}; - -/* all data types that may be encoded/decoded for each spoe_data_type */ -union spoe_data { - bool boolean; - int32_t int32; - uint32_t uint32; - int64_t int64; - uint64_t uint64; - struct in_addr ipv4; - struct in6_addr ipv6; - struct chunk chk; /* types STR and BIN */ -}; - -/* Masks to get data type or flags value */ -#define SPOE_DATA_T_MASK 0x0F -#define SPOE_DATA_FL_MASK 0xF0 - -/* Flags to set Boolean values */ -#define SPOE_DATA_FL_FALSE 0x00 -#define SPOE_DATA_FL_TRUE 0x10 - - -#endif /* _TYPES_SPOE_H */ diff --git a/contrib/spoa_example/include/spop_functions.h b/contrib/spoa_example/include/spop_functions.h deleted file mode 100644 index 669038e6a1..0000000000 --- a/contrib/spoa_example/include/spop_functions.h +++ /dev/null @@ -1,430 +0,0 @@ -#ifndef _SPOP_FUNCTIONS_H -#define _SPOP_FUNCTIONS_H - -#include -#include -#include - - -#ifndef MIN -#define MIN(a, b) (((a) < (b)) ? (a) : (b)) -#endif - -#ifndef MAX -#define MAX(a, b) (((a) > (b)) ? (a) : (b)) -#endif - - -/* Encode the integer into a varint (variable-length integer). The encoded - * value is copied in <*buf>. Here is the encoding format: - * - * 0 <= X < 240 : 1 byte (7.875 bits) [ XXXX XXXX ] - * 240 <= X < 2288 : 2 bytes (11 bits) [ 1111 XXXX ] [ 0XXX XXXX ] - * 2288 <= X < 264432 : 3 bytes (18 bits) [ 1111 XXXX ] [ 1XXX XXXX ] [ 0XXX XXXX ] - * 264432 <= X < 33818864 : 4 bytes (25 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*2 [ 0XXX XXXX ] - * 33818864 <= X < 4328786160 : 5 bytes (32 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*3 [ 0XXX XXXX ] - * ... - * - * On success, it returns the number of written bytes and <*buf> is moved after - * the encoded value. Otherwise, it returns -1. */ -static inline int -encode_varint(uint64_t i, char **buf, char *end) -{ - unsigned char *p = (unsigned char *)*buf; - int r; - - if (p >= (unsigned char *)end) - return -1; - - if (i < 240) { - *p++ = i; - *buf = (char *)p; - return 1; - } - - *p++ = (unsigned char)i | 240; - i = (i - 240) >> 4; - while (i >= 128) { - if (p >= (unsigned char *)end) - return -1; - *p++ = (unsigned char)i | 128; - i = (i - 128) >> 7; - } - - if (p >= (unsigned char *)end) - return -1; - *p++ = (unsigned char)i; - - r = ((char *)p - *buf); - *buf = (char *)p; - return r; -} - -/* Decode a varint from <*buf> and save the decoded value in <*i>. See - * 'spoe_encode_varint' for details about varint. - * On success, it returns the number of read bytes and <*buf> is moved after the - * varint. Otherwise, it returns -1. */ -static inline int -decode_varint(char **buf, char *end, uint64_t *i) -{ - unsigned char *p = (unsigned char *)*buf; - int r; - - if (p >= (unsigned char *)end) - return -1; - - *i = *p++; - if (*i < 240) { - *buf = (char *)p; - return 1; - } - - r = 4; - do { - if (p >= (unsigned char *)end) - return -1; - *i += (uint64_t)*p << r; - r += 7; - } while (*p++ >= 128); - - r = ((char *)p - *buf); - *buf = (char *)p; - return r; -} - -/* Encode a buffer. Its length is encoded as a varint, followed by a copy - * of . It must have enough space in <*buf> to encode the buffer, else an - * error is triggered. - * On success, it returns and <*buf> is moved after the encoded value. If - * an error occurred, it returns -1. */ -static inline int -spoe_encode_buffer(const char *str, size_t len, char **buf, char *end) -{ - char *p = *buf; - int ret; - - if (p >= end) - return -1; - - if (!len) { - *p++ = 0; - *buf = p; - return 0; - } - - ret = encode_varint(len, &p, end); - if (ret == -1 || p + len > end) - return -1; - - memcpy(p, str, len); - *buf = p + len; - return len; -} - -/* Encode a buffer, possibly partially. It does the same thing than - * 'spoe_encode_buffer', but if there is not enough space, it does not fail. - * On success, it returns the number of copied bytes and <*buf> is moved after - * the encoded value. If an error occurred, it returns -1. */ -static inline int -spoe_encode_frag_buffer(const char *str, size_t len, char **buf, char *end) -{ - char *p = *buf; - int ret; - - if (p >= end) - return -1; - - if (!len) { - *p++ = 0; - *buf = p; - return 0; - } - - ret = encode_varint(len, &p, end); - if (ret == -1 || p >= end) - return -1; - - ret = (p+len < end) ? len : (end - p); - memcpy(p, str, ret); - *buf = p + ret; - return ret; -} - -/* Decode a buffer. The buffer length is decoded and saved in <*len>. <*str> - * points on the first byte of the buffer. - * On success, it returns the buffer length and <*buf> is moved after the - * encoded buffer. Otherwise, it returns -1. */ -static inline int -spoe_decode_buffer(char **buf, char *end, char **str, uint64_t *len) -{ - char *p = *buf; - uint64_t sz; - int ret; - - *str = NULL; - *len = 0; - - ret = decode_varint(&p, end, &sz); - if (ret == -1 || p + sz > end) - return -1; - - *str = p; - *len = sz; - *buf = p + sz; - return sz; -} - -/* Encode a typed data using value in and type . On success, it - * returns the number of copied bytes and <*buf> is moved after the encoded - * value. If an error occurred, it returns -1. - * - * If the value is too big to be encoded, depending on its type, then encoding - * failed or the value is partially encoded. Only strings and binaries can be - * partially encoded. In this case, the offset <*off> is updated to known how - * many bytes has been encoded. If <*off> is zero at the end, it means that all - * data has been encoded. */ -static inline int -spoe_encode_data(union spoe_data *data, enum spoe_data_type type, unsigned int *off, char **buf, char *end) -{ - char *p = *buf; - int ret; - - if (p >= end) - return -1; - - if (data == NULL) { - *p++ = SPOE_DATA_T_NULL; - goto end; - } - - *p++ = type; - switch (type) { - case SPOE_DATA_T_BOOL: - p[-1] |= (data->boolean ? SPOE_DATA_FL_TRUE : SPOE_DATA_FL_FALSE); - break; - - case SPOE_DATA_T_INT32: - if (encode_varint(data->int32, &p, end) == -1) - return -1; - break; - - case SPOE_DATA_T_UINT32: - if (encode_varint(data->uint32, &p, end) == -1) - return -1; - break; - - case SPOE_DATA_T_INT64: - if (encode_varint(data->int64, &p, end) == -1) - return -1; - break; - - case SPOE_DATA_T_UINT64: - if (encode_varint(data->uint64, &p, end) == -1) - return -1; - break; - - case SPOE_DATA_T_IPV4: - if (p + 4 > end) - return -1; - memcpy(p, &data->ipv4, 4); - p += 4; - break; - - case SPOE_DATA_T_IPV6: - if (p + 16 > end) - return -1; - memcpy(p, &data->ipv6, 16); - p += 16; - break; - - case SPOE_DATA_T_STR: - case SPOE_DATA_T_BIN: { - /* Here, we need to know if the sample has already been - * partially encoded. If yes, we only need to encode the - * remaining, <*off> reprensenting the number of bytes - * already encoded. */ - if (!*off) { - /* First evaluation of the sample : encode the - * type (string or binary), the buffer length - * (as a varint) and at least 1 byte of the - * buffer. */ - ret = spoe_encode_frag_buffer(data->chk.ptr, data->chk.len, &p, end); - if (ret == -1) - return -1; - } - else { - /* The sample has been fragmented, encode remaining data */ - ret = MIN(data->chk.len - *off, end - p); - memcpy(p, data->chk.ptr + *off, ret); - p += ret; - } - /* Now update <*off> */ - if (ret + *off != data->chk.len) - *off += ret; - else - *off = 0; - break; - } - /* - case SMP_T_METH: { - char *m; - size_t len; - - *p++ = SPOE_DATA_T_STR; - switch (smp->data.u.meth.meth) { - case HTTP_METH_OPTIONS: m = "OPTIONS"; len = 7; break; - case HTTP_METH_GET : m = "GET"; len = 3; break; - case HTTP_METH_HEAD : m = "HEAD"; len = 4; break; - case HTTP_METH_POST : m = "POST"; len = 4; break; - case HTTP_METH_PUT : m = "PUT"; len = 3; break; - case HTTP_METH_DELETE : m = "DELETE"; len = 6; break; - case HTTP_METH_TRACE : m = "TRACE"; len = 5; break; - case HTTP_METH_CONNECT: m = "CONNECT"; len = 7; break; - - default : - m = smp->data.u.meth.str.str; - len = smp->data.u.meth.str.len; - } - if (spoe_encode_buffer(m, len, &p, end) == -1) - return -1; - break; - } - */ - - default: - /* send type NULL for unknown types */ - p[-1] = SPOE_DATA_T_NULL; - break; - } - - end: - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number - * of skipped bytes is returned and the <*buf> is moved after skipped data. - * - * A types data is composed of a type (1 byte) and corresponding data: - * - boolean: non additional data (0 bytes) - * - integers: a variable-length integer (see decode_varint) - * - ipv4: 4 bytes - * - ipv6: 16 bytes - * - binary and string: a buffer prefixed by its size, a variable-length - * integer (see spoe_decode_buffer) */ -static inline int -spoe_skip_data(char **buf, char *end) -{ - char *str, *p = *buf; - int type, ret; - uint64_t v, sz; - - if (p >= end) - return -1; - - type = *p++; - switch (type & SPOE_DATA_T_MASK) { - case SPOE_DATA_T_BOOL: - break; - case SPOE_DATA_T_INT32: - case SPOE_DATA_T_INT64: - case SPOE_DATA_T_UINT32: - case SPOE_DATA_T_UINT64: - if (decode_varint(&p, end, &v) == -1) - return -1; - break; - case SPOE_DATA_T_IPV4: - if (p+4 > end) - return -1; - p += 4; - break; - case SPOE_DATA_T_IPV6: - if (p+16 > end) - return -1; - p += 16; - break; - case SPOE_DATA_T_STR: - case SPOE_DATA_T_BIN: - /* All the buffer must be skipped */ - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - return -1; - break; - } - - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Decode a typed data and fill . If an error occurred, -1 is returned, - * otherwise the number of read bytes is returned and <*buf> is moved after the - * decoded data. See spoe_skip_data for details. */ -static inline int -spoe_decode_data(char **buf, char *end, union spoe_data *data, enum spoe_data_type *type) -{ - char *str, *p = *buf; - int v, r = 0; - uint64_t sz; - - if (p >= end) - return -1; - - v = *p++; - *type = v & SPOE_DATA_T_MASK; - - switch (*type) { - case SPOE_DATA_T_BOOL: - data->boolean = ((v & SPOE_DATA_FL_MASK) == SPOE_DATA_FL_TRUE); - break; - case SPOE_DATA_T_INT32: - if (decode_varint(&p, end, &sz) == -1) - return -1; - data->int32 = sz; - break; - case SPOE_DATA_T_INT64: - if (decode_varint(&p, end, &sz) == -1) - return -1; - data->int64 = sz; - break; - case SPOE_DATA_T_UINT32: - if (decode_varint(&p, end, &sz) == -1) - return -1; - data->uint32 = sz; - break; - case SPOE_DATA_T_UINT64: - if (decode_varint(&p, end, &sz) == -1) - return -1; - data->uint64 = sz; - break; - case SPOE_DATA_T_IPV4: - if (p+4 > end) - return -1; - memcpy(&data->ipv4, p, 4); - p += 4; - break; - case SPOE_DATA_T_IPV6: - if (p+16 > end) - return -1; - memcpy(&data->ipv6, p, 16); - p += 16; - break; - case SPOE_DATA_T_STR: - case SPOE_DATA_T_BIN: - /* All the buffer must be decoded */ - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - return -1; - data->chk.ptr = str; - data->chk.len = sz; - break; - default: /* SPOE_DATA_T_NULL, unknown */ - break; - } - - r = (p - *buf); - *buf = p; - return r; -} - - -#endif diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c deleted file mode 100644 index ed22813a18..0000000000 --- a/contrib/spoa_example/spoa.c +++ /dev/null @@ -1,1904 +0,0 @@ -/* - * A Random IP reputation service acting as a Stream Processing Offload Agent - * - * This is a very simple service that implement a "random" ip reputation - * service. It will return random scores for all checked IP addresses. It only - * shows you how to implement a ip reputation service or such kind of services - * using the SPOE. - * - * Copyright 2016 HAProxy Technologies, Christopher Faulet - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version - * 2 of the License, or (at your option) any later version. - * - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include -#include -#include - -#include -#include -#include - -#define DEFAULT_PORT 12345 -#define CONNECTION_BACKLOG 10 -#define NUM_WORKERS 10 -#define MAX_FRAME_SIZE 16384 -#define SPOP_VERSION "2.0" - -#define SLEN(str) (sizeof(str)-1) - -#define LOG(worker, fmt, args...) \ - do { \ - struct timeval now; \ - \ - gettimeofday(&now, NULL); \ - fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \ - now.tv_sec, now.tv_usec, (worker)->id, ##args); \ - } while (0) - -#define DEBUG(x...) \ - do { \ - if (debug) \ - LOG(x); \ - } while (0) - - -enum spoa_state { - SPOA_ST_CONNECTING = 0, - SPOA_ST_PROCESSING, - SPOA_ST_DISCONNECTING, -}; - -enum spoa_frame_type { - SPOA_FRM_T_UNKNOWN = 0, - SPOA_FRM_T_HAPROXY, - SPOA_FRM_T_AGENT, -}; - -struct spoe_engine { - char *id; - - struct list processing_frames; - struct list outgoing_frames; - - struct list clients; - struct list list; -}; - -struct spoe_frame { - enum spoa_frame_type type; - char *buf; - unsigned int offset; - unsigned int len; - - unsigned int stream_id; - unsigned int frame_id; - unsigned int flags; - bool hcheck; /* true is the CONNECT frame is a healthcheck */ - bool fragmented; /* true if the frame is fragmented */ - int ip_score; /* -1 if unset, else between 0 and 100 */ - - struct event process_frame_event; - struct worker *worker; - struct spoe_engine *engine; - struct client *client; - struct list list; - - char *frag_buf; /* used to accumulate payload of a fragmented frame */ - unsigned int frag_len; - - char data[0]; -}; - -struct client { - int fd; - unsigned long id; - enum spoa_state state; - - struct event read_frame_event; - struct event write_frame_event; - - struct spoe_frame *incoming_frame; - struct spoe_frame *outgoing_frame; - - struct list processing_frames; - struct list outgoing_frames; - - unsigned int max_frame_size; - int status_code; - - char *engine_id; - struct spoe_engine *engine; - bool pipelining; - bool async; - bool fragmentation; - - struct worker *worker; - struct list by_worker; - struct list by_engine; -}; - -struct worker { - pthread_t thread; - int id; - struct event_base *base; - struct event *monitor_event; - - struct list engines; - - unsigned int nbclients; - struct list clients; - - struct list frames; - unsigned int nbframes; -}; - - -/* Globals */ -static struct worker *workers = NULL; -static struct worker null_worker = { .id = 0 }; -static unsigned long clicount = 0; -static int server_port = DEFAULT_PORT; -static int num_workers = NUM_WORKERS; -static unsigned int max_frame_size = MAX_FRAME_SIZE; -struct timeval processing_delay = {0, 0}; -static bool debug = false; -static bool pipelining = false; -static bool async = false; -static bool fragmentation = false; - - -static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { - [SPOE_FRM_ERR_NONE] = "normal", - [SPOE_FRM_ERR_IO] = "I/O error", - [SPOE_FRM_ERR_TOUT] = "a timeout occurred", - [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", - [SPOE_FRM_ERR_INVALID] = "invalid frame received", - [SPOE_FRM_ERR_NO_VSN] = "version value not found", - [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", - [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", - [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", - [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", - [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported", - [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames", - [SPOE_FRM_ERR_FRAMEID_NOTFOUND] = "frame-id not found", - [SPOE_FRM_ERR_RES] = "resource allocation error", - [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", -}; - -static void signal_cb(evutil_socket_t, short, void *); -static void accept_cb(evutil_socket_t, short, void *); -static void worker_monitor_cb(evutil_socket_t, short, void *); -static void process_frame_cb(evutil_socket_t, short, void *); -static void read_frame_cb(evutil_socket_t, short, void *); -static void write_frame_cb(evutil_socket_t, short, void *); - -static void use_spoe_engine(struct client *); -static void unuse_spoe_engine(struct client *); -static void release_frame(struct spoe_frame *); -static void release_client(struct client *); - -static void -check_ipv4_reputation(struct spoe_frame *frame, struct in_addr *ipv4) -{ - char str[INET_ADDRSTRLEN]; - - if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL) - return; - - frame->ip_score = random() % 100; - - DEBUG(frame->worker, "IP score for %.*s is %d", - INET_ADDRSTRLEN, str, frame->ip_score); -} - -static void -check_ipv6_reputation(struct spoe_frame *frame, struct in6_addr *ipv6) -{ - char str[INET6_ADDRSTRLEN]; - - if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL) - return; - - frame->ip_score = random() % 100; - - DEBUG(frame->worker, "IP score for %.*s is %d", - INET6_ADDRSTRLEN, str, frame->ip_score); -} - - -/* Check the protocol version. It returns -1 if an error occurred, the number of - * read bytes otherwise. */ -static int -check_proto_version(struct spoe_frame *frame, char **buf, char *end) -{ - char *str, *p = *buf; - uint64_t sz; - int ret; - - /* Get the list of all supported versions by HAProxy */ - if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) - return -1; - ret = spoe_decode_buffer(&p, end, &str, &sz); - if (ret == -1 || !str) - return -1; - - DEBUG(frame->worker, "<%lu> Supported versions : %.*s", - frame->client->id, (int)sz, str); - - /* TODO: Find the right version in supported ones */ - - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Check max frame size value. It returns -1 if an error occurred, the number of - * read bytes otherwise. */ -static int -check_max_frame_size(struct spoe_frame *frame, char **buf, char *end) -{ - char *p = *buf; - uint64_t sz; - int type, ret; - - /* Get the max-frame-size value of HAProxy */ - type = *p++; - if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) - return -1; - if (decode_varint(&p, end, &sz) == -1) - return -1; - - /* Keep the lower value */ - if (sz < frame->client->max_frame_size) - frame->client->max_frame_size = sz; - - DEBUG(frame->worker, "<%lu> HAProxy maximum frame size : %u", - frame->client->id, (unsigned int)sz); - - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Check healthcheck value. It returns -1 if an error occurred, the number of - * read bytes otherwise. */ -static int -check_healthcheck(struct spoe_frame *frame, char **buf, char *end) -{ - char *p = *buf; - int type, ret; - - /* Get the "healthcheck" value */ - type = *p++; - if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) - return -1; - frame->hcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE); - - DEBUG(frame->worker, "<%lu> HELLO healthcheck : %s", - frame->client->id, (frame->hcheck ? "true" : "false")); - - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Check capabilities value. It returns -1 if an error occurred, the number of - * read bytes otherwise. */ -static int -check_capabilities(struct spoe_frame *frame, char **buf, char *end) -{ - struct client *client = frame->client; - char *str, *p = *buf; - uint64_t sz; - int ret; - - if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) - return -1; - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - return -1; - if (str == NULL) /* this is not an error */ - goto end; - - DEBUG(frame->worker, "<%lu> HAProxy capabilities : %.*s", - client->id, (int)sz, str); - - while (sz) { - char *delim; - - /* Skip leading spaces */ - for (; isspace(*str) && sz; sz--); - - if (sz >= 10 && !strncmp(str, "pipelining", 10)) { - str += 10; sz -= 10; - if (!sz || isspace(*str) || *str == ',') { - DEBUG(frame->worker, - "<%lu> HAProxy supports frame pipelining", - client->id); - client->pipelining = true; - } - } - else if (sz >= 5 && !strncmp(str, "async", 5)) { - str += 5; sz -= 5; - if (!sz || isspace(*str) || *str == ',') { - DEBUG(frame->worker, - "<%lu> HAProxy supports asynchronous frame", - client->id); - client->async = true; - } - } - else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) { - str += 13; sz -= 13; - if (!sz || isspace(*str) || *str == ',') { - DEBUG(frame->worker, - "<%lu> HAProxy supports fragmented frame", - client->id); - client->fragmentation = true; - } - } - - if (!sz || (delim = memchr(str, ',', sz)) == NULL) - break; - delim++; - sz -= (delim - str); - str = delim; - } - end: - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Check engine-id value. It returns -1 if an error occurred, the number of - * read bytes otherwise. */ -static int -check_engine_id(struct spoe_frame *frame, char **buf, char *end) -{ - struct client *client = frame->client; - char *str, *p = *buf; - uint64_t sz; - int ret; - - if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) - return -1; - - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - return -1; - if (str == NULL) /* this is not an error */ - goto end; - - if (client->engine != NULL) - goto end; - - DEBUG(frame->worker, "<%lu> HAProxy engine id : %.*s", - client->id, (int)sz, str); - - client->engine_id = strndup(str, (int)sz); - end: - ret = (p - *buf); - *buf = p; - return ret; -} - -static int -acc_payload(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *buf; - size_t len = frame->len - frame->offset; - int ret = frame->offset; - - /* No need to accumulation payload */ - if (frame->fragmented == false) - return ret; - - buf = realloc(frame->frag_buf, frame->frag_len + len); - if (buf == NULL) { - client->status_code = SPOE_FRM_ERR_RES; - return -1; - } - memcpy(buf + frame->frag_len, frame->buf + frame->offset, len); - frame->frag_buf = buf; - frame->frag_len += len; - - if (!(frame->flags & SPOE_FRM_FL_FIN)) { - /* Wait for next parts */ - frame->buf = (char *)(frame->data); - frame->offset = 0; - frame->len = 0; - frame->flags = 0; - return 1; - } - - frame->buf = frame->frag_buf; - frame->len = frame->frag_len; - frame->offset = 0; - return ret; -} - -/* Check disconnect status code. It returns -1 if an error occurred, the number - * of read bytes otherwise. */ -static int -check_discon_status_code(struct spoe_frame *frame, char **buf, char *end) -{ - char *p = *buf; - uint64_t sz; - int type, ret; - - /* Get the "status-code" value */ - type = *p++; - if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && - (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) - return -1; - if (decode_varint(&p, end, &sz) == -1) - return -1; - - frame->client->status_code = (unsigned int)sz; - - DEBUG(frame->worker, "<%lu> Disconnect status code : %u", - frame->client->id, frame->client->status_code); - - ret = (p - *buf); - *buf = p; - return ret; -} - -/* Check the disconnect message. It returns -1 if an error occurred, the number - * of read bytes otherwise. */ -static int -check_discon_message(struct spoe_frame *frame, char **buf, char *end) -{ - char *str, *p = *buf; - uint64_t sz; - int ret; - - /* Get the "message" value */ - if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) - return -1; - ret = spoe_decode_buffer(&p, end, &str, &sz); - if (ret == -1 || !str) - return -1; - - DEBUG(frame->worker, "<%lu> Disconnect message : %.*s", - frame->client->id, (int)sz, str); - - ret = (p - *buf); - *buf = p; - return ret; -} - - - -/* Decode a HELLO frame received from HAProxy. It returns -1 if an error - * occurred, otherwise the number of read bytes. HELLO frame cannot be - * ignored and having another frame than a HELLO frame is an error. */ -static int -handle_hahello(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - - p = frame->buf; - end = frame->buf + frame->len; - - /* Check frame type: we really want a HELLO frame */ - if (*p++ != SPOE_FRM_T_HAPROXY_HELLO) - goto error; - - DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id); - - /* Retrieve flags */ - memcpy((char *)&(frame->flags), p, 4); - frame->flags = ntohl(frame->flags); - p += 4; - - /* Fragmentation is not supported for HELLO frame */ - if (!(frame->flags & SPOE_FRM_FL_FIN)) { - client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - goto error; - } - - /* stream-id and frame-id must be cleared */ - if (*p != 0 || *(p+1) != 0) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - p += 2; - - /* Loop on K/V items */ - while (p < end) { - char *str; - uint64_t sz; - - /* Decode the item name */ - spoe_decode_buffer(&p, end, &str, &sz); - if (!str) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - - /* Check "supported-versions" K/V item */ - if (!memcmp(str, "supported-versions", sz)) { - if (check_proto_version(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - /* Check "max-frame-size" K/V item */ - else if (!memcmp(str, "max-frame-size", sz)) { - if (check_max_frame_size(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - /* Check "healthcheck" K/V item */ - else if (!memcmp(str, "healthcheck", sz)) { - if (check_healthcheck(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - /* Check "capabilities" K/V item */ - else if (!memcmp(str, "capabilities", sz)) { - if (check_capabilities(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - /* Check "engine-id" K/V item */ - else if (!memcmp(str, "engine-id", sz)) { - if (check_engine_id(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - else { - DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s", - client->id, (int)sz, str); - - /* Silently ignore unknown item */ - if (spoe_skip_data(&p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - } - - if (async == false || client->engine_id == NULL) - client->async = false; - if (pipelining == false) - client->pipelining = false; - - if (client->async == true) - use_spoe_engine(client); - - return (p - frame->buf); - error: - return -1; -} - -/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error - * occurred, otherwise the number of read bytes. DISCONNECT frame cannot be - * ignored and having another frame than a DISCONNECT frame is an error.*/ -static int -handle_hadiscon(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - - p = frame->buf; - end = frame->buf + frame->len; - - /* Check frame type: we really want a DISCONNECT frame */ - if (*p++ != SPOE_FRM_T_HAPROXY_DISCON) - goto error; - - DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id); - - /* Retrieve flags */ - memcpy((char *)&(frame->flags), p, 4); - frame->flags = ntohl(frame->flags); - p += 4; - - /* Fragmentation is not supported for DISCONNECT frame */ - if (!(frame->flags & SPOE_FRM_FL_FIN)) { - client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - goto error; - } - - /* stream-id and frame-id must be cleared */ - if (*p != 0 || *(p+1) != 0) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - p += 2; - - client->status_code = SPOE_FRM_ERR_NONE; - - /* Loop on K/V items */ - while (p < end) { - char *str; - uint64_t sz; - - /* Decode item key */ - spoe_decode_buffer(&p, end, &str, &sz); - if (!str) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - - /* Check "status-code" K/V item */ - if (!memcmp(str, "status-code", sz)) { - if (check_discon_status_code(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - /* Check "message" K/V item */ - else if (!memcmp(str, "message", sz)) { - if (check_discon_message(frame, &p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - else { - DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s", - client->id, (int)sz, str); - - /* Silently ignore unknown item */ - if (spoe_skip_data(&p, end) == -1) { - client->status_code = SPOE_FRM_ERR_INVALID; - goto error; - } - } - } - - return (p - frame->buf); - error: - return -1; -} - -/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error - * occurred, 0 if it must be must be ignored, otherwise the number of read - * bytes. */ -static int -handle_hanotify(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - uint64_t stream_id, frame_id; - - p = frame->buf; - end = frame->buf + frame->len; - - /* Check frame type */ - if (*p++ != SPOE_FRM_T_HAPROXY_NOTIFY) - goto ignore; - - DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id); - - /* Retrieve flags */ - memcpy((char *)&(frame->flags), p, 4); - frame->flags = ntohl(frame->flags); - p += 4; - - /* Fragmentation is not supported */ - if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) { - client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - goto error; - } - - /* Read the stream-id and frame-id */ - if (decode_varint(&p, end, &stream_id) == -1) - goto ignore; - if (decode_varint(&p, end, &frame_id) == -1) - goto ignore; - - frame->stream_id = (unsigned int)stream_id; - frame->frame_id = (unsigned int)frame_id; - - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - %s frame received" - " - frag_len=%u - len=%u - offset=%ld", - client->id, frame->stream_id, frame->frame_id, - (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented", - frame->frag_len, frame->len, p - frame->buf); - - frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN); - frame->offset = (p - frame->buf); - return acc_payload(frame); - - ignore: - return 0; - - error: - return -1; -} - -/* Decode next part of a fragmented frame received from HAProxy. It returns -1 - * if an error occurred, 0 if it must be must be ignored, otherwise the number - * of read bytes. */ -static int -handle_hafrag(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - uint64_t stream_id, frame_id; - - p = frame->buf; - end = frame->buf + frame->len; - - /* Check frame type */ - if (*p++ != SPOE_FRM_T_UNSET) - goto ignore; - - DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id); - - /* Fragmentation is not supported */ - if (fragmentation == false) { - client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED; - goto error; - } - - /* Retrieve flags */ - memcpy((char *)&(frame->flags), p, 4); - frame->flags = ntohl(frame->flags); - p+= 4; - - /* Read the stream-id and frame-id */ - if (decode_varint(&p, end, &stream_id) == -1) - goto ignore; - if (decode_varint(&p, end, &frame_id) == -1) - goto ignore; - - if (frame->fragmented == false || - frame->stream_id != (unsigned int)stream_id || - frame->frame_id != (unsigned int)frame_id) { - client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES; - goto error; - } - - if (frame->flags & SPOE_FRM_FL_ABRT) { - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - Abort processing of a fragmented frame" - " - frag_len=%u - len=%u - offset=%ld", - client->id, frame->stream_id, frame->frame_id, - frame->frag_len, frame->len, p - frame->buf); - goto ignore; - } - - DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u" - " - %s fragment of a fragmented frame received" - " - frag_len=%u - len=%u - offset=%ld", - client->id, frame->stream_id, frame->frame_id, - (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next", - frame->frag_len, frame->len, p - frame->buf); - - frame->offset = (p - frame->buf); - return acc_payload(frame); - - ignore: - return 0; - - error: - return -1; -} - -/* Encode a HELLO frame to send it to HAProxy. It returns the number of written - * bytes. */ -static int -prepare_agenthello(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - char capabilities[64]; - int n; - unsigned int flags = SPOE_FRM_FL_FIN; - - DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id); - frame->type = SPOA_FRM_T_AGENT; - - p = frame->buf; - end = frame->buf+max_frame_size; - - /* Frame Type */ - *p++ = SPOE_FRM_T_AGENT_HELLO; - - /* Set flags */ - flags = htonl(flags); - memcpy(p, (char *)&flags, 4); - p += 4; - - /* No stream-id and frame-id for HELLO frames */ - *p++ = 0; - *p++ = 0; - - /* "version" K/V item */ - spoe_encode_buffer("version", 7, &p, end); - *p++ = SPOE_DATA_T_STR; - spoe_encode_buffer(SPOP_VERSION, SLEN(SPOP_VERSION), &p, end); - DEBUG(frame->worker, "<%lu> Agent version : %s", - client->id, SPOP_VERSION); - - - /* "max-frame-size" K/V item */ - spoe_encode_buffer("max-frame-size", 14, &p ,end); - *p++ = SPOE_DATA_T_UINT32; - encode_varint(client->max_frame_size, &p, end); - DEBUG(frame->worker, "<%lu> Agent maximum frame size : %u", - client->id, client->max_frame_size); - - /* "capabilities" K/V item */ - spoe_encode_buffer("capabilities", 12, &p, end); - *p++ = SPOE_DATA_T_STR; - - memset(capabilities, 0, sizeof(capabilities)); - n = 0; - - /* 1. Fragmentation capability ? */ - if (fragmentation == true) { - memcpy(capabilities, "fragmentation", 13); - n += 13; - } - /* 2. Pipelining capability ? */ - if (client->pipelining == true) { - if (n) capabilities[n++] = ','; - memcpy(capabilities + n, "pipelining", 10); - n += 10; - } - /* 3. Async capability ? */ - if (client->async == true) { - if (n) capabilities[n++] = ','; - memcpy(capabilities + n, "async", 5); - n += 5; - } - spoe_encode_buffer(capabilities, n, &p, end); - - DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s", - client->id, n, capabilities); - - frame->len = (p - frame->buf); - return frame->len; -} - -/* Encode a DISCONNECT frame to send it to HAProxy. It returns the number of - * written bytes. */ -static int -prepare_agentdicon(struct spoe_frame *frame) -{ - struct client *client = frame->client; - char *p, *end; - const char *reason; - int rlen; - unsigned int flags = SPOE_FRM_FL_FIN; - - DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id); - frame->type = SPOA_FRM_T_AGENT; - - p = frame->buf; - end = frame->buf+max_frame_size; - - if (client->status_code >= SPOE_FRM_ERRS) - client->status_code = SPOE_FRM_ERR_UNKNOWN; - reason = spoe_frm_err_reasons[client->status_code]; - rlen = strlen(reason); - - /* Frame type */ - *p++ = SPOE_FRM_T_AGENT_DISCON; - - /* Set flags */ - flags = htonl(flags); - memcpy(p, (char *)&flags, 4); - p += 4; - - /* No stream-id and frame-id for DISCONNECT frames */ - *p++ = 0; - *p++ = 0; - - /* There are 2 mandatory items: "status-code" and "message" */ - - /* "status-code" K/V item */ - spoe_encode_buffer("status-code", 11, &p, end); - *p++ = SPOE_DATA_T_UINT32; - encode_varint(client->status_code, &p, end); - DEBUG(frame->worker, "<%lu> Disconnect status code : %u", - client->id, client->status_code); - - /* "message" K/V item */ - spoe_encode_buffer("message", 7, &p, end); - *p++ = SPOE_DATA_T_STR; - spoe_encode_buffer(reason, rlen, &p, end); - DEBUG(frame->worker, "<%lu> Disconnect message : %s", - client->id, reason); - - frame->len = (p - frame->buf); - return frame->len; -} - -/* Encode a ACK frame to send it to HAProxy. It returns the number of written - * bytes. */ -static int -prepare_agentack(struct spoe_frame *frame) -{ - char *p, *end; - unsigned int flags = SPOE_FRM_FL_FIN; - - /* Be careful here, in async mode, frame->client can be NULL */ - - DEBUG(frame->worker, "Encode Agent ACK frame"); - frame->type = SPOA_FRM_T_AGENT; - - p = frame->buf; - end = frame->buf+max_frame_size; - - /* Frame type */ - *p++ = SPOE_FRM_T_AGENT_ACK; - - /* Set flags */ - flags = htonl(flags); - memcpy(p, (char *)&flags, 4); - p += 4; - - /* Set stream-id and frame-id for ACK frames */ - encode_varint(frame->stream_id, &p, end); - encode_varint(frame->frame_id, &p, end); - - DEBUG(frame->worker, "STREAM-ID=%u - FRAME-ID=%u", - frame->stream_id, frame->frame_id); - - frame->len = (p - frame->buf); - return frame->len; -} - -static int -create_server_socket(void) -{ - struct sockaddr_in listen_addr; - int fd, yes = 1; - - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - LOG(&null_worker, "Failed to create service socket : %m"); - return -1; - } - - memset(&listen_addr, 0, sizeof(listen_addr)); - listen_addr.sin_family = AF_INET; - listen_addr.sin_addr.s_addr = INADDR_ANY; - listen_addr.sin_port = htons(server_port); - - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0 || - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) < 0) { - LOG(&null_worker, "Failed to set option on server socket : %m"); - return -1; - } - - if (bind(fd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) < 0) { - LOG(&null_worker, "Failed to bind server socket : %m"); - return -1; - } - - if (listen(fd, CONNECTION_BACKLOG) < 0) { - LOG(&null_worker, "Failed to listen on server socket : %m"); - return -1; - } - - return fd; -} - -static void -release_frame(struct spoe_frame *frame) -{ - struct worker *worker; - - if (frame == NULL) - return; - - if (event_pending(&frame->process_frame_event, EV_TIMEOUT, NULL)) - event_del(&frame->process_frame_event); - - worker = frame->worker; - LIST_DELETE(&frame->list); - if (frame->frag_buf) - free(frame->frag_buf); - memset(frame, 0, sizeof(*frame)+max_frame_size+4); - LIST_APPEND(&worker->frames, &frame->list); -} - -static void -release_client(struct client *c) -{ - struct spoe_frame *frame, *back; - - if (c == NULL) - return; - - DEBUG(c->worker, "<%lu> Release client", c->id); - - LIST_DELETE(&c->by_worker); - c->worker->nbclients--; - - unuse_spoe_engine(c); - free(c->engine_id); - - if (event_pending(&c->read_frame_event, EV_READ, NULL)) - event_del(&c->read_frame_event); - if (event_pending(&c->write_frame_event, EV_WRITE, NULL)) - event_del(&c->write_frame_event); - - release_frame(c->incoming_frame); - release_frame(c->outgoing_frame); - list_for_each_entry_safe(frame, back, &c->processing_frames, list) { - release_frame(frame); - } - list_for_each_entry_safe(frame, back, &c->outgoing_frames, list) { - release_frame(frame); - } - - if (c->fd >= 0) - close(c->fd); - - free(c); -} - -static void -reset_frame(struct spoe_frame *frame) -{ - if (frame == NULL) - return; - - if (frame->frag_buf) - free(frame->frag_buf); - - frame->type = SPOA_FRM_T_UNKNOWN; - frame->buf = (char *)(frame->data); - frame->offset = 0; - frame->len = 0; - frame->stream_id = 0; - frame->frame_id = 0; - frame->flags = 0; - frame->hcheck = false; - frame->fragmented = false; - frame->ip_score = -1; - frame->frag_buf = NULL; - frame->frag_len = 0; - LIST_INIT(&frame->list); -} - -static void -use_spoe_engine(struct client *client) -{ - struct spoe_engine *eng; - - if (client->engine_id == NULL) - return; - - list_for_each_entry(eng, &client->worker->engines, list) { - if (strcmp(eng->id, client->engine_id) == 0) - goto end; - } - - if ((eng = malloc(sizeof(*eng))) == NULL) { - client->async = false; - return; - } - - eng->id = strdup(client->engine_id); - LIST_INIT(&eng->clients); - LIST_INIT(&eng->processing_frames); - LIST_INIT(&eng->outgoing_frames); - LIST_APPEND(&client->worker->engines, &eng->list); - LOG(client->worker, "Add new SPOE engine '%s'", eng->id); - - end: - client->engine = eng; - LIST_APPEND(&eng->clients, &client->by_engine); -} - -static void -unuse_spoe_engine(struct client *client) -{ - struct spoe_engine *eng; - struct spoe_frame *frame, *back; - - if (client == NULL || client->engine == NULL) - return; - - eng = client->engine; - client->engine = NULL; - LIST_DELETE(&client->by_engine); - if (!LIST_ISEMPTY(&eng->clients)) - return; - - LOG(client->worker, "Remove SPOE engine '%s'", eng->id); - LIST_DELETE(&eng->list); - - list_for_each_entry_safe(frame, back, &eng->processing_frames, list) { - release_frame(frame); - } - list_for_each_entry_safe(frame, back, &eng->outgoing_frames, list) { - release_frame(frame); - } - free(eng->id); - free(eng); -} - - -static struct spoe_frame * -acquire_incoming_frame(struct client *client) -{ - struct spoe_frame *frame; - - frame = client->incoming_frame; - if (frame != NULL) - return frame; - - if (LIST_ISEMPTY(&client->worker->frames)) { - if ((frame = calloc(1, sizeof(*frame)+max_frame_size+4)) == NULL) { - LOG(client->worker, "Failed to allocate new frame : %m"); - return NULL; - } - } - else { - frame = LIST_NEXT(&client->worker->frames, typeof(frame), list); - LIST_DELETE(&frame->list); - } - - reset_frame(frame); - frame->worker = client->worker; - frame->engine = client->engine; - frame->client = client; - - if (event_assign(&frame->process_frame_event, client->worker->base, -1, - EV_TIMEOUT|EV_PERSIST, process_frame_cb, frame) < 0) { - LOG(client->worker, "Failed to create frame event"); - return NULL; - } - - client->incoming_frame = frame; - return frame; -} - -static struct spoe_frame * -acquire_outgoing_frame(struct client *client) -{ - struct spoe_engine *engine = client->engine; - struct spoe_frame *frame = NULL; - - if (client->outgoing_frame != NULL) - frame = client->outgoing_frame; - else if (!LIST_ISEMPTY(&client->outgoing_frames)) { - frame = LIST_NEXT(&client->outgoing_frames, typeof(frame), list); - LIST_DELETE(&frame->list); - client->outgoing_frame = frame; - } - else if (engine!= NULL && !LIST_ISEMPTY(&engine->outgoing_frames)) { - frame = LIST_NEXT(&engine->outgoing_frames, typeof(frame), list); - LIST_DELETE(&frame->list); - client->outgoing_frame = frame; - } - return frame; -} - -static void -write_frame(struct client *client, struct spoe_frame *frame) -{ - uint32_t netint; - - LIST_DELETE(&frame->list); - - frame->buf = (char *)(frame->data); - frame->offset = 0; - netint = htonl(frame->len); - memcpy(frame->buf, &netint, 4); - - if (client != NULL) { /* HELLO or DISCONNECT frames */ - event_add(&client->write_frame_event, NULL); - - /* Try to process the frame as soon as possible, and always - * attach it to the client */ - if (client->async || client->pipelining) { - if (client->outgoing_frame == NULL) - client->outgoing_frame = frame; - else - LIST_INSERT(&client->outgoing_frames, &frame->list); - } - else { - client->outgoing_frame = frame; - event_del(&client->read_frame_event); - } - } - else { /* for all other frames */ - if (frame->client == NULL) { /* async mode ! */ - LIST_APPEND(&frame->engine->outgoing_frames, &frame->list); - list_for_each_entry(client, &frame->engine->clients, by_engine) - event_add(&client->write_frame_event, NULL); - } - else if (frame->client->pipelining) { - LIST_APPEND(&frame->client->outgoing_frames, &frame->list); - event_add(&frame->client->write_frame_event, NULL); - } - else { - frame->client->outgoing_frame = frame; - event_add(&frame->client->write_frame_event, NULL); - event_del(&frame->client->read_frame_event); - } - } -} - -static void -process_incoming_frame(struct spoe_frame *frame) -{ - struct client *client = frame->client; - - if (event_add(&frame->process_frame_event, &processing_delay) < 0) { - LOG(client->worker, "Failed to process incoming frame"); - release_frame(frame); - return; - } - - if (client->async) { - frame->client = NULL; - LIST_APPEND(&frame->engine->processing_frames, &frame->list); - } - else if (client->pipelining) - LIST_APPEND(&client->processing_frames, &frame->list); - else - event_del(&client->read_frame_event); -} - -static void -signal_cb(evutil_socket_t sig, short events, void *user_data) -{ - struct event_base *base = user_data; - int i; - - DEBUG(&null_worker, "Stopping the server"); - - event_base_loopbreak(base); - DEBUG(&null_worker, "Main event loop stopped"); - - for (i = 0; i < num_workers; i++) { - event_base_loopbreak(workers[i].base); - DEBUG(&null_worker, "Event loop stopped for worker %02d", - workers[i].id); - } -} - -static void -worker_monitor_cb(evutil_socket_t fd, short events, void *arg) -{ - struct worker *worker = arg; - - LOG(worker, "%u clients connected (%u frames)", worker->nbclients, worker->nbframes); -} - -static void -process_frame_cb(evutil_socket_t fd, short events, void *arg) -{ - struct spoe_frame *frame = arg; - char *p, *end; - int ret; - - DEBUG(frame->worker, - "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes", - frame->stream_id, frame->frame_id, frame->len - frame->offset); - - p = frame->buf + frame->offset; - end = frame->buf + frame->len; - - /* Loop on messages */ - while (p < end) { - char *str; - uint64_t sz; - int nbargs; - - /* Decode the message name */ - spoe_decode_buffer(&p, end, &str, &sz); - if (!str) - goto stop_processing; - - DEBUG(frame->worker, "Process SPOE Message '%.*s'", (int)sz, str); - - nbargs = (unsigned char)*p++; /* Get the number of arguments */ - frame->offset = (p - frame->buf); /* Save index to handle errors and skip args */ - if (!memcmp(str, "check-client-ip", sz)) { - union spoe_data data; - enum spoe_data_type type; - - if (nbargs != 1) - goto skip_message; - - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - goto stop_processing; - if (spoe_decode_data(&p, end, &data, &type) == -1) - goto skip_message; - frame->worker->nbframes++; - if (type == SPOE_DATA_T_IPV4) - check_ipv4_reputation(frame, &data.ipv4); - if (type == SPOE_DATA_T_IPV6) - check_ipv6_reputation(frame, &data.ipv6); - } - else { - skip_message: - p = frame->buf + frame->offset; /* Restore index */ - - while (nbargs-- > 0) { - /* Silently ignore argument: its name and its value */ - if (spoe_decode_buffer(&p, end, &str, &sz) == -1) - goto stop_processing; - if (spoe_skip_data(&p, end) == -1) - goto stop_processing; - } - } - } - - stop_processing: - /* Prepare agent ACK frame */ - frame->buf = (char *)(frame->data) + 4; - frame->offset = 0; - frame->len = 0; - frame->flags = 0; - - ret = prepare_agentack(frame); - p = frame->buf + ret; - end = frame->buf+max_frame_size; - - if (frame->ip_score != -1) { - DEBUG(frame->worker, "Add action : set variable ip_score=%u", - frame->ip_score); - - *p++ = SPOE_ACT_T_SET_VAR; /* Action type */ - *p++ = 3; /* Number of args */ - *p++ = SPOE_SCOPE_SESS; /* Arg 1: the scope */ - spoe_encode_buffer("ip_score", 8, &p, end); /* Arg 2: variable name */ - *p++ = SPOE_DATA_T_UINT32; - encode_varint(frame->ip_score, &p, end); /* Arg 3: variable value */ - frame->len = (p - frame->buf); - } - write_frame(NULL, frame); -} - -static void -read_frame_cb(evutil_socket_t fd, short events, void *arg) -{ - struct client *client = arg; - struct spoe_frame *frame; - uint32_t netint; - int n; - - DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__); - if ((frame = acquire_incoming_frame(client)) == NULL) - goto close; - - frame->type = SPOA_FRM_T_HAPROXY; - if (frame->buf == (char *)(frame->data)) { - /* Read the frame length: frame->buf points on length part (frame->data) */ - n = read(client->fd, frame->buf+frame->offset, 4-frame->offset); - if (n <= 0) { - if (n < 0) - LOG(client->worker, "Failed to read frame length : %m"); - goto close; - } - frame->offset += n; - if (frame->offset != 4) - return; - memcpy(&netint, frame->buf, 4); - frame->buf += 4; - frame->offset = 0; - frame->len = ntohl(netint); - } - - /* Read the frame: frame->buf points on frame part (frame->data+4)*/ - n = read(client->fd, frame->buf + frame->offset, - frame->len - frame->offset); - if (n <= 0) { - if (n < 0) { - LOG(client->worker, "Failed to read frame : %m"); - goto close; - } - return; - } - frame->offset += n; - if (frame->offset != frame->len) - return; - frame->offset = 0; - - DEBUG(client->worker, "<%lu> New Frame of %u bytes received", - client->id, frame->len); - - switch (client->state) { - case SPOA_ST_CONNECTING: - if (handle_hahello(frame) < 0) { - LOG(client->worker, "Failed to decode HELLO frame"); - goto disconnect; - } - prepare_agenthello(frame); - goto write_frame; - - case SPOA_ST_PROCESSING: - if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) { - client->state = SPOA_ST_DISCONNECTING; - goto disconnecting; - } - if (frame->buf[0] == SPOE_FRM_T_UNSET) - n = handle_hafrag(frame); - else - n = handle_hanotify(frame); - - if (n < 0) { - LOG(client->worker, "Failed to decode frame: %s", - spoe_frm_err_reasons[client->status_code]); - goto disconnect; - } - else if (n == 0) { - LOG(client->worker, "Ignore invalid/unknown/aborted frame"); - goto ignore_frame; - } - else if (n == 1) - goto noop; - else - goto process_frame; - - case SPOA_ST_DISCONNECTING: - disconnecting: - if (handle_hadiscon(frame) < 0) { - LOG(client->worker, "Failed to decode DISCONNECT frame"); - goto disconnect; - } - if (client->status_code != SPOE_FRM_ERR_NONE) - LOG(client->worker, "<%lu> Peer closed connection: %s", - client->id, spoe_frm_err_reasons[client->status_code]); - goto disconnect; - } - - noop: - return; - - ignore_frame: - reset_frame(frame); - return; - - process_frame: - process_incoming_frame(frame); - client->incoming_frame = NULL; - return; - - write_frame: - write_frame(client, frame); - client->incoming_frame = NULL; - return; - - disconnect: - client->state = SPOA_ST_DISCONNECTING; - if (prepare_agentdicon(frame) < 0) { - LOG(client->worker, "Failed to encode DISCONNECT frame"); - goto close; - } - goto write_frame; - - close: - release_client(client); -} - -static void -write_frame_cb(evutil_socket_t fd, short events, void *arg) -{ - struct client *client = arg; - struct spoe_frame *frame; - int n; - - DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__); - if ((frame = acquire_outgoing_frame(client)) == NULL) { - event_del(&client->write_frame_event); - return; - } - - if (frame->buf == (char *)(frame->data)) { - /* Write the frame length: frame->buf points on length part (frame->data) */ - n = write(client->fd, frame->buf+frame->offset, 4-frame->offset); - if (n <= 0) { - if (n < 0) - LOG(client->worker, "Failed to write frame length : %m"); - goto close; - } - frame->offset += n; - if (frame->offset != 4) - return; - frame->buf += 4; - frame->offset = 0; - } - - /* Write the frame: frame->buf points on frame part (frame->data+4)*/ - n = write(client->fd, frame->buf + frame->offset, - frame->len - frame->offset); - if (n <= 0) { - if (n < 0) { - LOG(client->worker, "Failed to write frame : %m"); - goto close; - } - return; - } - frame->offset += n; - if (frame->offset != frame->len) - return; - - DEBUG(client->worker, "<%lu> Frame of %u bytes sent", - client->id, frame->len); - - switch (client->state) { - case SPOA_ST_CONNECTING: - if (frame->hcheck == true) { - DEBUG(client->worker, - "<%lu> Close client after healthcheck", - client->id); - goto close; - } - client->state = SPOA_ST_PROCESSING; - break; - - case SPOA_ST_PROCESSING: - break; - - case SPOA_ST_DISCONNECTING: - goto close; - } - - release_frame(frame); - client->outgoing_frame = NULL; - if (!client->async && !client->pipelining) { - event_del(&client->write_frame_event); - event_add(&client->read_frame_event, NULL); - } - return; - - close: - release_client(client); -} - -static void -accept_cb(int listener, short event, void *arg) -{ - struct worker *worker; - struct client *client; - int fd; - - worker = &workers[clicount++ % num_workers]; - - if ((fd = accept(listener, NULL, NULL)) < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) - LOG(worker, "Failed to accept client connection : %m"); - return; - } - - DEBUG(&null_worker, - "<%lu> New Client connection accepted and assigned to worker %02d", - clicount, worker->id); - - if (evutil_make_socket_nonblocking(fd) < 0) { - LOG(&null_worker, "Failed to set client socket to non-blocking : %m"); - close(fd); - return; - } - - if ((client = calloc(1, sizeof(*client))) == NULL) { - LOG(&null_worker, "Failed to allocate memory for client state : %m"); - close(fd); - return; - } - - client->id = clicount; - client->fd = fd; - client->worker = worker; - client->state = SPOA_ST_CONNECTING; - client->status_code = SPOE_FRM_ERR_NONE; - client->max_frame_size = max_frame_size; - client->engine = NULL; - client->pipelining = false; - client->async = false; - client->incoming_frame = NULL; - client->outgoing_frame = NULL; - LIST_INIT(&client->processing_frames); - LIST_INIT(&client->outgoing_frames); - - LIST_APPEND(&worker->clients, &client->by_worker); - - worker->nbclients++; - - if (event_assign(&client->read_frame_event, worker->base, fd, - EV_READ|EV_PERSIST, read_frame_cb, client) < 0 || - event_assign(&client->write_frame_event, worker->base, fd, - EV_WRITE|EV_PERSIST, write_frame_cb, client) < 0) { - LOG(&null_worker, "Failed to create client events"); - release_client(client); - return; - } - event_add(&client->read_frame_event, NULL); -} - -static void * -worker_function(void *data) -{ - struct client *client, *cback; - struct spoe_frame *frame, *fback; - struct worker *worker = data; - - DEBUG(worker, "Worker ready to process client messages"); - event_base_dispatch(worker->base); - - list_for_each_entry_safe(client, cback, &worker->clients, by_worker) { - release_client(client); - } - - list_for_each_entry_safe(frame, fback, &worker->frames, list) { - LIST_DELETE(&frame->list); - free(frame); - } - - event_free(worker->monitor_event); - event_base_free(worker->base); - DEBUG(worker, "Worker is stopped"); - pthread_exit(&null_worker); -} - - -static int -parse_processing_delay(const char *str) -{ - unsigned long value; - - value = 0; - while (1) { - unsigned int j; - - j = *str - '0'; - if (j > 9) - break; - str++; - value *= 10; - value += j; - } - - switch (*str) { - case '\0': /* no unit = millisecond */ - value *= 1000; - break; - case 's': /* second */ - value *= 1000000; - str++; - break; - case 'm': /* millisecond : "ms" */ - if (str[1] != 's') - return -1; - value *= 1000; - str += 2; - break; - case 'u': /* microsecond : "us" */ - if (str[1] != 's') - return -1; - str += 2; - break; - default: - return -1; - } - if (*str) - return -1; - - processing_delay.tv_sec = (time_t)(value / 1000000); - processing_delay.tv_usec = (suseconds_t)(value % 1000000); - return 0; -} - - -static void -usage(char *prog) -{ - fprintf(stderr, - "Usage : %s [OPTION]...\n" - " -h Print this message\n" - " -d Enable the debug mode\n" - " -m Specify the maximum frame size (default : %u)\n" - " -p Specify the port to listen on (default : %d)\n" - " -n Specify the number of workers (default : %d)\n" - " -c Enable the support of the specified capability\n" - " -t