MEDIUM: log: syslog TCP support on log forward section.

This patch re-introduce the "bind" statement on log forward
sections to handle syslog TCP listeners as defined in
rfc-6587.

As complement it introduce "maxconn", "backlog" and "timeout
client" statements to parameter those listeners.
This commit is contained in:
Emeric Brun 2020-10-05 14:39:35 +02:00 committed by Willy Tarreau
parent 6d75616951
commit cbb7bf7dd1
2 changed files with 303 additions and 5 deletions

View File

@ -2779,11 +2779,21 @@ haproxy will forward all received log messages to a log servers list.
log-forward <name>
Creates a new log forwarder proxy identified as <name>.
backlog <conns>
Give hints to the system about the approximate listen backlog desired size
on connections accept.
bind <addr> [param*]
Used to configure a stream log listener to receive messages to forward.
This supports for some of the "bind" parameters found in 5.1 paragraph.
Those listener support both "Octet Counting" and "Non-Transparent-Framing"
modes as defined in rfc-6587.
dgram-bind <addr> [param*]
Used to configure a UDP log listener to receive messages to forward. Only UDP
listeners are allowed. Addresses must be in IPv4 or IPv6 form,followed by a
port. This supports for some of the "bind" parameters found in 5.1 paragraph
among which "interface", "namespace" or "transparent", the other ones being
Used to configure a datagram log listener to receive messages to forward.
Addresses must be in IPv4 or IPv6 form,followed by a port. This supports
for some of the "bind" parameters found in 5.1 paragraph among which
"interface", "namespace" or "transparent", the other ones being
silently ignored as irrelevant for UDP/syslog case.
log global
@ -2812,7 +2822,8 @@ log <address> [len <length>] [format <format>] [sample <ranges>:<smp_size>]
server mysyslogsrv 127.0.0.1:514 log-proto octet-count
log-forward sylog-loadb
bind udp4@127.0.0.1:1514
dgram-bind 127.0.0.1:1514
bind 127.0.0.1:1514
# all messages on stderr
log global
# all messages on local tcp syslog server
@ -2823,6 +2834,13 @@ log <address> [len <length>] [format <format>] [sample <ranges>:<smp_size>]
log 127.0.0.1:10003 sample 3:4 local0
log 127.0.0.1:10004 sample 4:4 local0
maxconn <conns>
Fix the maximum number of concurrent connections on a log forwarder.
10 is the default.
timeout client <timeout>
Set the maximum inactivity time on the client side.
4. Proxies
----------

280
src/log.c
View File

@ -3558,6 +3558,148 @@ out:
return;
}
/*
* IO Handler to handle message exchange with a syslog tcp client
*/
static void syslog_io_handler(struct appctx *appctx)
{
static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct proxy *frontend = strm_fe(s);
struct listener *l = strm_li(s);
struct buffer *buf = get_trash_chunk();
int max_accept;
int to_skip;
int facility;
int level;
char *message;
size_t size;
max_accept = l->maxaccept ? l->maxaccept : 1;
while (co_data(si_oc(si))) {
char c;
if (max_accept <= 0)
goto missing_budget;
max_accept--;
to_skip = co_getchar(si_oc(si), &c);
if (!to_skip)
goto missing_data;
else if (to_skip < 0)
goto cli_abort;
if (c == '<') {
/* rfc-6587, Non-Transparent-Framing: messages separated by
* a trailing LF or CR LF
*/
to_skip = co_getline(si_oc(si), buf->area, buf->size);
if (!to_skip)
goto missing_data;
else if (to_skip < 0)
goto cli_abort;
if (buf->area[to_skip - 1] != '\n')
goto parse_error;
buf->data = to_skip - 1;
/* according to rfc-6587, some devices adds CR before LF */
if (buf->data && buf->area[buf->data - 1] == '\r')
buf->data--;
}
else if ((unsigned char)(c - '1') <= 8) {
/* rfc-6587, Octet-Counting: message length in ASCII
* (first digit can not be ZERO), followed by a space
* and message length
*/
char *p = NULL;
int msglen;
to_skip = co_getword(si_oc(si), buf->area, buf->size, ' ');
if (!to_skip)
goto missing_data;
else if (to_skip < 0)
goto cli_abort;
if (buf->area[to_skip - 1] != ' ')
goto parse_error;
msglen = strtol(trash.area, &p, 10);
if (!msglen || p != &buf->area[to_skip - 1])
goto parse_error;
/* message seems too large */
if (msglen > buf->size)
goto parse_error;
msglen = co_getblk(si_oc(si), buf->area, msglen, to_skip);
if (!msglen)
goto missing_data;
else if (msglen < 0)
goto cli_abort;
buf->data = msglen;
to_skip += msglen;
}
else
goto parse_error;
co_skip(si_oc(si), to_skip);
/* update counters */
_HA_ATOMIC_ADD(&cum_log_messages, 1);
proxy_inc_fe_req_ctr(l, frontend);
parse_log_message(buf->area, buf->data, &level, &facility, metadata, &message, &size);
process_send_log(&frontend->logsrvs, level, facility, metadata, message, size);
}
missing_data:
/* we need more data to read */
si_oc(si)->flags |= CF_READ_DONTWAIT;
return;
missing_budget:
/* it may remain some stuff to do, let's retry later */
appctx_wakeup(appctx);
return;
parse_error:
if (l->counters)
_HA_ATOMIC_ADD(&l->counters->failed_req, 1);
_HA_ATOMIC_ADD(&frontend->fe_counters.failed_req, 1);
goto close;
cli_abort:
if (l->counters)
_HA_ATOMIC_ADD(&l->counters->cli_aborts, 1);
_HA_ATOMIC_ADD(&frontend->fe_counters.cli_aborts, 1);
close:
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
return;
}
static struct applet syslog_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<SYSLOG>", /* used for logging */
.fct = syslog_io_handler,
.release = NULL,
};
/*
* Parse "log-forward" section and create corresponding sink buffer.
*
@ -3610,9 +3752,113 @@ int cfg_parse_log_forward(const char *file, int linenum, char **args, int kwm)
px->conf.file = strdup(file);
px->conf.line = linenum;
px->mode = PR_MODE_SYSLOG;
px->last_change = now.tv_sec;
px->cap = PR_CAP_FE;
px->maxconn = 10;
px->timeout.client = TICK_ETERNITY;
px->accept = frontend_accept;
px->default_target = &syslog_applet.obj_type;
px->id = strdup(args[1]);
}
else if (!strcmp(args[0], "maxconn")) { /* maxconn */
if (warnifnotcap(cfg_log_forward, PR_CAP_FE, file, linenum, args[0], " Maybe you want 'fullconn' instead ?"))
err_code |= ERR_WARN;
if (*(args[1]) == 0) {
ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n", file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
cfg_log_forward->maxconn = atol(args[1]);
if (alertif_too_many_args(1, file, linenum, args, &err_code))
goto out;
}
else if (!strcmp(args[0], "backlog")) { /* backlog */
if (warnifnotcap(cfg_log_forward, PR_CAP_FE, file, linenum, args[0], NULL))
err_code |= ERR_WARN;
if (*(args[1]) == 0) {
ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n", file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
cfg_log_forward->backlog = atol(args[1]);
if (alertif_too_many_args(1, file, linenum, args, &err_code))
goto out;
}
else if (strcmp(args[0], "bind") == 0) {
int cur_arg;
static int kws_dumped;
struct bind_conf *bind_conf;
struct bind_kw *kw;
struct listener *l;
cur_arg = 1;
bind_conf = bind_conf_alloc(cfg_log_forward, file, linenum,
NULL, xprt_get(XPRT_RAW));
if (!bind_conf) {
ha_alert("parsing [%s:%d] : out of memory error.", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
if (!str2listener(args[1], cfg_log_forward, bind_conf, file, linenum, &errmsg)) {
if (errmsg && *errmsg) {
indent_msg(&errmsg, 2);
ha_alert("parsing [%s:%d] : '%s %s' : %s\n", file, linenum, args[0], args[1], errmsg);
}
else {
ha_alert("parsing [%s:%d] : '%s %s' : error encountered while parsing listening address %s.\n",
file, linenum, args[0], args[1], args[2]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
}
list_for_each_entry(l, &bind_conf->listeners, by_bind) {
l->maxaccept = global.tune.maxaccept ? global.tune.maxaccept : 64;
l->accept = session_accept_fd;
l->analysers |= cfg_log_forward->fe_req_ana;
l->default_target = cfg_log_forward->default_target;
global.maxsock++;
}
cur_arg++;
while (*args[cur_arg] && (kw = bind_find_kw(args[cur_arg]))) {
int ret;
ret = kw->parse(args, cur_arg, cfg_log_forward, bind_conf, &errmsg);
err_code |= ret;
if (ret) {
if (errmsg && *errmsg) {
indent_msg(&errmsg, 2);
ha_alert("parsing [%s:%d] : %s\n", file, linenum, errmsg);
}
else
ha_alert("parsing [%s:%d]: error encountered while processing '%s'\n",
file, linenum, args[cur_arg]);
if (ret & ERR_FATAL)
goto out;
}
cur_arg += 1 + kw->skip;
}
if (*args[cur_arg] != 0) {
char *kws = NULL;
if (!kws_dumped) {
kws_dumped = 1;
bind_dump_kws(&kws);
indent_msg(&kws, 4);
}
ha_alert("parsing [%s:%d] : unknown keyword '%s' in '%s' section.%s%s\n",
file, linenum, args[cur_arg], cursection,
kws ? " Registered keywords :" : "", kws ? kws: "");
free(kws);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
}
else if (strcmp(args[0], "dgram-bind") == 0) {
int cur_arg;
static int kws_dumped;
@ -3685,6 +3931,40 @@ int cfg_parse_log_forward(const char *file, int linenum, char **args, int kwm)
goto out;
}
}
else if (strcmp(args[0], "timeout") == 0) {
const char *res;
unsigned timeout;
if (strcmp(args[1], "client") != 0) {
ha_alert("parsing [%s:%d] : unknown keyword '%s %s' in log-forward section.\n", file, linenum, args[0], args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
if (*args[2] == 0) {
ha_alert("parsing [%s:%d] : missing timeout client value.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
if (res == PARSE_TIME_OVER) {
memprintf(&errmsg, "timer overflow in argument '%s' to 'timeout client' (maximum value is 2147483647 ms or ~24.8 days)", args[2]);
}
else if (res == PARSE_TIME_UNDER) {
memprintf(&errmsg, "timer underflow in argument '%s' to 'timeout client' (minimum non-null value is 1 ms)", args[2]);
}
else if (res) {
memprintf(&errmsg, "unexpected character '%c' in 'timeout client'", *res);
return -1;
}
if (res) {
ha_alert("parsing [%s:%d] : %s : %s\n", file, linenum, args[0], errmsg);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
cfg_log_forward->timeout.client = MS_TO_TICKS(timeout);
}
else {
ha_alert("parsing [%s:%d] : unknown keyword '%s' in log-forward section.\n", file, linenum, args[0]);
err_code |= ERR_ALERT | ERR_ABORT;