MEDIUM: ring: add server statement to forward messages from a ring

This patch adds new statement "server" into ring section, and the
related "timeout connect" and "timeout server".

server <name> <address> [param*]
  Used to configure a syslog tcp server to forward messages from ring buffer.
  This supports for all "server" parameters found in 5.2 paragraph.
  Some of these parameters are irrelevant for "ring" sections.

timeout connect <timeout>
  Set the maximum time to wait for a connection attempt to a server to succeed.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

timeout server <timeout>
  Set the maximum time for pending data staying into output buffer.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

  Example:
    global
        log ring@myring local7

    ring myring
        description "My local buffer"
        format rfc3164
        maxlen 1200
        size 32764
        timeout connect 5s
        timeout server 10s
        server mysyslogsrv 127.0.0.1:6514
This commit is contained in:
Emeric Brun 2020-05-28 11:13:15 +02:00 committed by Willy Tarreau
parent 9f2ff3a700
commit 494c505703
5 changed files with 465 additions and 2 deletions

View File

@ -2596,10 +2596,38 @@ maxlen <length>
including formatted header. If an event message is longer than
<length>, it will be truncated to this length.
server <name> <address> [param*]
Used to configure a syslog tcp server to forward messages from ring buffer.
This supports for all "server" parameters found in 5.2 paragraph. Some of
these parameters are irrelevant for "ring" sections. Important point: there
is little reason to add more than one server to a ring, because all servers
will receive the exact same copy of the ring contents, and as such the ring
will progress at the speed of the slowest server. If one server does not
respond, it will prevent old messages from being purged and may block new
messages from being inserted into the ring. The proper way to send messages
to multiple servers is to use one distinct ring per log server, not to
attach multiple servers to the same ring.
size <size>
This is the optional size in bytes for the ring-buffer. Default value is
set to BUFSIZE.
timeout connect <timeout>
Set the maximum time to wait for a connection attempt to a server to succeed.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
can be in any other unit if the number is suffixed by the unit,
as explained at the top of this document.
timeout server <timeout>
Set the maximum time for pending data staying into output buffer.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
can be in any other unit if the number is suffixed by the unit,
as explained at the top of this document.
Example:
global
log ring@myring local7
@ -2609,6 +2637,9 @@ size <size>
format rfc3164
maxlen 1200
size 32764
timeout connect 5s
timeout server 10s
server mysyslogsrv 127.0.0.1:6514
4. Proxies

View File

@ -610,6 +610,7 @@ enum lock_label {
PROTO_LOCK,
CKCH_LOCK,
SNI_LOCK,
SFT_LOCK, /* sink forward target */
OTHER_LOCK,
LOCK_LABELS
};
@ -727,6 +728,7 @@ static inline const char *lock_label(enum lock_label label)
case PROTO_LOCK: return "PROTO";
case CKCH_LOCK: return "CKCH";
case SNI_LOCK: return "SNI";
case SFT_LOCK: return "SFT";
case OTHER_LOCK: return "OTHER";
case LOCK_LABELS: break; /* keep compiler happy */
};

View File

@ -178,6 +178,10 @@ struct appctx {
struct ckch_store *new_ckchs;
struct ckch_inst *next_ckchi;
} ssl;
struct {
void *ptr;
} sft; /* sink forward target */
/* NOTE: please add regular applet contexts (ie: not
* CLI-specific ones) above, before "cli".
*/

View File

@ -49,6 +49,14 @@ enum sink_fmt {
SINK_FMT_RFC5424, // extended syslog
};
struct sink_forward_target {
struct server *srv; // used server
struct appctx *appctx; // appctx of current session
size_t ofs; // ring buffer reader offset
__decl_hathreads(HA_SPINLOCK_T lock); // lock to protect current struct
struct sink_forward_target *next;
};
/* describes the configuration and current state of an event sink */
struct sink {
struct list sink_list; // position in the sink list
@ -57,6 +65,10 @@ struct sink {
enum sink_fmt fmt; // format expected by the sink
enum sink_type type; // type of storage
uint32_t maxlen; // max message length (truncated above)
struct proxy* forward_px; // proxy used to forward
struct sink_forward_target *sft; // sink forward targets
struct task *forward_task; // task to handle forward targets conns
struct sig_handler *forward_sighandler; /* signal handler */
struct {
__decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped
struct ring *ring; // used by ring buffer and STRM sender

View File

@ -27,6 +27,7 @@
#include <proto/cli.h>
#include <proto/log.h>
#include <proto/ring.h>
#include <proto/signal.h>
#include <proto/sink.h>
#include <proto/stream_interface.h>
@ -57,7 +58,7 @@ static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt
if (sink)
goto end;
sink = malloc(sizeof(*sink));
sink = calloc(1, sizeof(*sink));
if (!sink)
goto end;
@ -316,6 +317,319 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc
return ring_attach_cli(sink->ctx.ring, appctx);
}
/* Pre-configures a ring proxy to emmit connections */
void sink_setup_proxy(struct proxy *px)
{
px->last_change = now.tv_sec;
px->cap = PR_CAP_FE | PR_CAP_BE;
px->maxconn = 0;
px->conn_retries = 1;
px->timeout.server = TICK_ETERNITY;
px->timeout.client = TICK_ETERNITY;
px->timeout.connect = TICK_ETERNITY;
px->accept = NULL;
px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
px->bind_proc = 0; /* will be filled by users */
}
/*
* IO Handler to handle message push to syslog tcp server
*/
static void sink_forward_io_handler(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct sink *sink = strm_fe(s)->parent;
struct sink_forward_target *sft = appctx->ctx.sft.ptr;
struct ring *ring = sink->ctx.ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
size_t len, cnt, ofs;
int ret = 0;
/* if stopping was requested, close immediatly */
if (unlikely(stopping))
goto close;
/* for rex because it seems reset to timeout
* and we don't want expire on this case
* with a syslog server
*/
si_oc(si)->rex = TICK_ETERNITY;
/* rto should not change but it seems the case */
si_oc(si)->rto = TICK_ETERNITY;
/* an error was detected */
if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
goto close;
/* con closed by server side */
if ((si_oc(si)->flags & CF_SHUTW))
goto close;
/* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes.
*/
if (si_opposite(si)->state < SI_ST_EST) {
si_cant_get(si);
si_rx_conn_blk(si);
si_rx_endp_more(si);
return;
}
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
if (appctx != sft->appctx) {
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
goto close;
}
ofs = sft->ofs;
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_DEL_INIT(&appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location. This
* value cannot be produced after initialization.
*/
if (unlikely(ofs == ~0)) {
ofs = 0;
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
ofs += ring->ofs;
}
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size);
HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
/* in this loop, ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
*/
if (si_opposite(si)->state == SI_ST_EST) {
ret = 1;
while (ofs + 1 < b_data(buf)) {
cnt = 1;
len = b_peek_varint(buf, ofs + cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
if (unlikely(msg_len + 1 > b_size(&trash))) {
/* too large a message to ever fit, let's skip it */
ofs += cnt + msg_len;
continue;
}
chunk_reset(&trash);
len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
trash.data += len;
trash.area[trash.data++] = '\n';
if (ci_putchk(si_ic(si), &trash) == -1) {
si_rx_room_blk(si);
ret = 0;
break;
}
ofs += cnt + msg_len;
}
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
ofs += ring->ofs;
sft->ofs = ofs;
}
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
if (ret) {
/* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
si_rx_endp_done(si);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
/* always drain data from server */
co_skip(si_oc(si), si_oc(si)->output);
return;
close:
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
}
void __sink_forward_session_deinit(struct sink_forward_target *sft)
{
struct stream_interface *si;
struct stream *s;
struct sink *sink;
if (!sft->appctx)
return;
si = sft->appctx->owner;
if (!si)
return;
s = si_strm(si);
if (!s)
return;
sink = strm_fe(s)->parent;
if (!sink)
return;
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
LIST_DEL_INIT(&sft->appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
sft->appctx = NULL;
task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
}
static void sink_forward_session_release(struct appctx *appctx)
{
struct sink_forward_target *sft = appctx->ctx.peers.ptr;
if (!sft)
return;
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
if (sft->appctx == appctx)
__sink_forward_session_deinit(sft);
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
}
static struct applet sink_forward_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<SINKFWD>", /* used for logging */
.fct = sink_forward_io_handler,
.release = sink_forward_session_release,
};
/*
* Create a new peer session in assigned state (connect will start automatically)
*/
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
{
struct proxy *p = sink->forward_px;
struct appctx *appctx;
struct session *sess;
struct stream *s;
appctx = appctx_new(&sink_forward_applet, tid_bit);
if (!appctx)
goto out_close;
appctx->ctx.sft.ptr = (void *)sft;
sess = session_new(p, NULL, &appctx->obj_type);
if (!sess) {
ha_alert("out of memory in peer_session_create().\n");
goto out_free_appctx;
}
if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n");
goto out_free_sess;
}
s->target = &sft->srv->obj_type;
if (!sockaddr_alloc(&s->target_addr))
goto out_free_strm;
*s->target_addr = sft->srv->addr;
s->flags = SF_ASSIGNED|SF_ADDR_SET;
s->si[1].flags |= SI_FL_NOLINGER;
s->do_log = NULL;
s->uniq_id = 0;
s->res.flags |= CF_READ_DONTWAIT;
/* for rto and rex to eternity to not expire on idle recv:
* We are using a syslog server.
*/
s->res.rto = TICK_ETERNITY;
s->res.rex = TICK_ETERNITY;
sft->appctx = appctx;
task_wakeup(s->task, TASK_WOKEN_INIT);
return appctx;
/* Error unrolling */
out_free_strm:
LIST_DEL(&s->list);
pool_free(pool_head_stream, s);
out_free_sess:
session_free(sess);
out_free_appctx:
appctx_free(appctx);
out_close:
return NULL;
}
/*
* Task to handle connctions to forward servers
*/
static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
{
struct sink *sink = (struct sink *)context;
struct sink_forward_target *sft = sink->sft;
task->expire = TICK_ETERNITY;
if (!stopping) {
while (sft) {
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
/* if appctx is NULL, start a new session */
if (!sft->appctx)
sft->appctx = sink_forward_session_create(sink, sft);
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
sft = sft->next;
}
}
else {
while (sft) {
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
/* awake applet to perform a clean close */
if (sft->appctx)
appctx_wakeup(sft->appctx);
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
sft = sft->next;
}
}
return task;
}
/*
* Init task to manage connctions to forward servers
*
* returns 0 in case of error.
*/
int sink_init_forward(struct sink *sink)
{
sink->forward_task = task_new(MAX_THREADS_MASK);
if (!sink->forward_task)
return 0;
sink->forward_task->process = process_sink_forward;
sink->forward_task->context = (void *)sink;
sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
return 1;
}
/*
* Parse "ring" section and create corresponding sink buffer.
*
@ -327,6 +641,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
int err_code = 0;
const char *inv;
size_t size = BUFSIZE;
struct proxy *p;
if (strcmp(args[0], "ring") == 0) { /* new peers section */
if (!*args[1]) {
@ -354,6 +669,22 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
/* allocate new proxy to handle forwards */
p = calloc(1, sizeof *p);
if (!p) {
ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
init_new_proxy(p);
sink_setup_proxy(p);
p->parent = cfg_sink;
p->id = strdup(args[1]);
p->conf.args.file = p->conf.file = strdup(file);
p->conf.args.line = p->conf.line = linenum;
cfg_sink->forward_px = p;
}
else if (strcmp(args[0], "size") == 0) {
size = atol(args[1]);
@ -370,6 +701,52 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
goto err;
}
}
else if (strcmp(args[0],"server") == 0) {
err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
}
else if (strcmp(args[0],"timeout") == 0) {
if (!cfg_sink || !cfg_sink->forward_px) {
ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (strcmp(args[1], "connect") == 0 ||
strcmp(args[1], "server") == 0) {
const char *res;
unsigned int tout;
if (!*args[2]) {
ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
file, linenum, args[0], args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
if (res == PARSE_TIME_OVER) {
ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
file, linenum, args[2], args[0], args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
else if (res == PARSE_TIME_UNDER) {
ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
file, linenum, args[2], args[0], args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
else if (res) {
ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
file, linenum, *res, args[0], args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (args[1][2] == 'c')
cfg_sink->forward_px->timeout.connect = tout;
else
cfg_sink->forward_px->timeout.server = tout;
}
}
else if (strcmp(args[0],"format") == 0) {
if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
@ -456,6 +833,7 @@ err:
int cfg_post_parse_ring()
{
int err_code = 0;
struct server *srv;
if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
@ -464,8 +842,44 @@ int cfg_post_parse_ring()
cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
err_code |= ERR_ALERT;
}
}
/* prepare forward server descriptors */
if (cfg_sink->forward_px) {
srv = cfg_sink->forward_px->srv;
while (srv) {
struct sink_forward_target *sft;
/* init ssl if needed */
if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
}
}
/* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft));
if (!sft) {
ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
break;
}
sft->srv = srv;
sft->appctx = NULL;
sft->ofs = ~0; /* init ring offset */
sft->next = cfg_sink->sft;
HA_SPIN_INIT(&sft->lock);
/* mark server attached to the ring */
if (!ring_attach(cfg_sink->ctx.ring)) {
ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
}
cfg_sink->sft = sft;
srv = srv->next;
}
sink_init_forward(cfg_sink);
}
}
cfg_sink = NULL;
return err_code;