diff --git a/doc/configuration.txt b/doc/configuration.txt index 8a67f4d12..15f966858 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -2596,10 +2596,38 @@ maxlen including formatted header. If an event message is longer than , it will be truncated to this length. +server
[param*] + Used to configure a syslog tcp server to forward messages from ring buffer. + This supports for all "server" parameters found in 5.2 paragraph. Some of + these parameters are irrelevant for "ring" sections. Important point: there + is little reason to add more than one server to a ring, because all servers + will receive the exact same copy of the ring contents, and as such the ring + will progress at the speed of the slowest server. If one server does not + respond, it will prevent old messages from being purged and may block new + messages from being inserted into the ring. The proper way to send messages + to multiple servers is to use one distinct ring per log server, not to + attach multiple servers to the same ring. + size This is the optional size in bytes for the ring-buffer. Default value is set to BUFSIZE. +timeout connect + Set the maximum time to wait for a connection attempt to a server to succeed. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + +timeout server + Set the maximum time for pending data staying into output buffer. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + Example: global log ring@myring local7 @@ -2609,6 +2637,9 @@ size format rfc3164 maxlen 1200 size 32764 + timeout connect 5s + timeout server 10s + server mysyslogsrv 127.0.0.1:6514 4. Proxies diff --git a/include/common/hathreads.h b/include/common/hathreads.h index ae1009a9f..45ec1d2e0 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -610,6 +610,7 @@ enum lock_label { PROTO_LOCK, CKCH_LOCK, SNI_LOCK, + SFT_LOCK, /* sink forward target */ OTHER_LOCK, LOCK_LABELS }; @@ -727,6 +728,7 @@ static inline const char *lock_label(enum lock_label label) case PROTO_LOCK: return "PROTO"; case CKCH_LOCK: return "CKCH"; case SNI_LOCK: return "SNI"; + case SFT_LOCK: return "SFT"; case OTHER_LOCK: return "OTHER"; case LOCK_LABELS: break; /* keep compiler happy */ }; diff --git a/include/types/applet.h b/include/types/applet.h index 013c5023d..a4e22e336 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -178,6 +178,10 @@ struct appctx { struct ckch_store *new_ckchs; struct ckch_inst *next_ckchi; } ssl; + struct { + void *ptr; + } sft; /* sink forward target */ + /* NOTE: please add regular applet contexts (ie: not * CLI-specific ones) above, before "cli". */ diff --git a/include/types/sink.h b/include/types/sink.h index ef1109679..029c20dfe 100644 --- a/include/types/sink.h +++ b/include/types/sink.h @@ -49,6 +49,14 @@ enum sink_fmt { SINK_FMT_RFC5424, // extended syslog }; +struct sink_forward_target { + struct server *srv; // used server + struct appctx *appctx; // appctx of current session + size_t ofs; // ring buffer reader offset + __decl_hathreads(HA_SPINLOCK_T lock); // lock to protect current struct + struct sink_forward_target *next; +}; + /* describes the configuration and current state of an event sink */ struct sink { struct list sink_list; // position in the sink list @@ -57,6 +65,10 @@ struct sink { enum sink_fmt fmt; // format expected by the sink enum sink_type type; // type of storage uint32_t maxlen; // max message length (truncated above) + struct proxy* forward_px; // proxy used to forward + struct sink_forward_target *sft; // sink forward targets + struct task *forward_task; // task to handle forward targets conns + struct sig_handler *forward_sighandler; /* signal handler */ struct { __decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped struct ring *ring; // used by ring buffer and STRM sender diff --git a/src/sink.c b/src/sink.c index 50f035263..e7a0c0200 100644 --- a/src/sink.c +++ b/src/sink.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -57,7 +58,7 @@ static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt if (sink) goto end; - sink = malloc(sizeof(*sink)); + sink = calloc(1, sizeof(*sink)); if (!sink) goto end; @@ -316,6 +317,319 @@ static int cli_parse_show_events(char **args, char *payload, struct appctx *appc return ring_attach_cli(sink->ctx.ring, appctx); } +/* Pre-configures a ring proxy to emmit connections */ +void sink_setup_proxy(struct proxy *px) +{ + px->last_change = now.tv_sec; + px->cap = PR_CAP_FE | PR_CAP_BE; + px->maxconn = 0; + px->conn_retries = 1; + px->timeout.server = TICK_ETERNITY; + px->timeout.client = TICK_ETERNITY; + px->timeout.connect = TICK_ETERNITY; + px->accept = NULL; + px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; + px->bind_proc = 0; /* will be filled by users */ +} + +/* + * IO Handler to handle message push to syslog tcp server + */ +static void sink_forward_io_handler(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct stream *s = si_strm(si); + struct sink *sink = strm_fe(s)->parent; + struct sink_forward_target *sft = appctx->ctx.sft.ptr; + struct ring *ring = sink->ctx.ring; + struct buffer *buf = &ring->buf; + uint64_t msg_len; + size_t len, cnt, ofs; + int ret = 0; + + /* if stopping was requested, close immediatly */ + if (unlikely(stopping)) + goto close; + + /* for rex because it seems reset to timeout + * and we don't want expire on this case + * with a syslog server + */ + si_oc(si)->rex = TICK_ETERNITY; + /* rto should not change but it seems the case */ + si_oc(si)->rto = TICK_ETERNITY; + + /* an error was detected */ + if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) + goto close; + + /* con closed by server side */ + if ((si_oc(si)->flags & CF_SHUTW)) + goto close; + + /* if the connection is not established, inform the stream that we want + * to be notified whenever the connection completes. + */ + if (si_opposite(si)->state < SI_ST_EST) { + si_cant_get(si); + si_rx_conn_blk(si); + si_rx_endp_more(si); + return; + } + + HA_SPIN_LOCK(SFT_LOCK, &sft->lock); + if (appctx != sft->appctx) { + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + goto close; + } + ofs = sft->ofs; + + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + LIST_DEL_INIT(&appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + + HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); + + /* explanation for the initialization below: it would be better to do + * this in the parsing function but this would occasionally result in + * dropped events because we'd take a reference on the oldest message + * and keep it while being scheduled. Thus instead let's take it the + * first time we enter here so that we have a chance to pass many + * existing messages before grabbing a reference to a location. This + * value cannot be produced after initialization. + */ + if (unlikely(ofs == ~0)) { + ofs = 0; + + HA_ATOMIC_ADD(b_peek(buf, ofs), 1); + ofs += ring->ofs; + } + + /* we were already there, adjust the offset to be relative to + * the buffer's head and remove us from the counter. + */ + ofs -= ring->ofs; + BUG_ON(ofs >= buf->size); + HA_ATOMIC_SUB(b_peek(buf, ofs), 1); + + /* in this loop, ofs always points to the counter byte that precedes + * the message so that we can take our reference there if we have to + * stop before the end (ret=0). + */ + if (si_opposite(si)->state == SI_ST_EST) { + ret = 1; + while (ofs + 1 < b_data(buf)) { + cnt = 1; + len = b_peek_varint(buf, ofs + cnt, &msg_len); + if (!len) + break; + cnt += len; + BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); + + if (unlikely(msg_len + 1 > b_size(&trash))) { + /* too large a message to ever fit, let's skip it */ + ofs += cnt + msg_len; + continue; + } + + chunk_reset(&trash); + len = b_getblk(buf, trash.area, msg_len, ofs + cnt); + trash.data += len; + trash.area[trash.data++] = '\n'; + + if (ci_putchk(si_ic(si), &trash) == -1) { + si_rx_room_blk(si); + ret = 0; + break; + } + ofs += cnt + msg_len; + } + + HA_ATOMIC_ADD(b_peek(buf, ofs), 1); + ofs += ring->ofs; + sft->ofs = ofs; + } + HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); + + if (ret) { + /* let's be woken up once new data arrive */ + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); + LIST_ADDQ(&ring->waiters, &appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); + si_rx_endp_done(si); + } + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + + /* always drain data from server */ + co_skip(si_oc(si), si_oc(si)->output); + return; + +close: + si_shutw(si); + si_shutr(si); + si_ic(si)->flags |= CF_READ_NULL; +} + +void __sink_forward_session_deinit(struct sink_forward_target *sft) +{ + struct stream_interface *si; + struct stream *s; + struct sink *sink; + + if (!sft->appctx) + return; + + si = sft->appctx->owner; + if (!si) + return; + + s = si_strm(si); + if (!s) + return; + + sink = strm_fe(s)->parent; + if (!sink) + return; + + HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); + LIST_DEL_INIT(&sft->appctx->wait_entry); + HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); + + sft->appctx = NULL; + task_wakeup(sink->forward_task, TASK_WOKEN_MSG); +} + + +static void sink_forward_session_release(struct appctx *appctx) +{ + struct sink_forward_target *sft = appctx->ctx.peers.ptr; + + if (!sft) + return; + + HA_SPIN_LOCK(SFT_LOCK, &sft->lock); + if (sft->appctx == appctx) + __sink_forward_session_deinit(sft); + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); +} + +static struct applet sink_forward_applet = { + .obj_type = OBJ_TYPE_APPLET, + .name = "", /* used for logging */ + .fct = sink_forward_io_handler, + .release = sink_forward_session_release, +}; + +/* + * Create a new peer session in assigned state (connect will start automatically) + */ +static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft) +{ + struct proxy *p = sink->forward_px; + struct appctx *appctx; + struct session *sess; + struct stream *s; + + appctx = appctx_new(&sink_forward_applet, tid_bit); + if (!appctx) + goto out_close; + + appctx->ctx.sft.ptr = (void *)sft; + + sess = session_new(p, NULL, &appctx->obj_type); + if (!sess) { + ha_alert("out of memory in peer_session_create().\n"); + goto out_free_appctx; + } + + if ((s = stream_new(sess, &appctx->obj_type)) == NULL) { + ha_alert("Failed to initialize stream in peer_session_create().\n"); + goto out_free_sess; + } + + + s->target = &sft->srv->obj_type; + if (!sockaddr_alloc(&s->target_addr)) + goto out_free_strm; + *s->target_addr = sft->srv->addr; + s->flags = SF_ASSIGNED|SF_ADDR_SET; + s->si[1].flags |= SI_FL_NOLINGER; + + s->do_log = NULL; + s->uniq_id = 0; + + s->res.flags |= CF_READ_DONTWAIT; + /* for rto and rex to eternity to not expire on idle recv: + * We are using a syslog server. + */ + s->res.rto = TICK_ETERNITY; + s->res.rex = TICK_ETERNITY; + sft->appctx = appctx; + task_wakeup(s->task, TASK_WOKEN_INIT); + return appctx; + + /* Error unrolling */ + out_free_strm: + LIST_DEL(&s->list); + pool_free(pool_head_stream, s); + out_free_sess: + session_free(sess); + out_free_appctx: + appctx_free(appctx); + out_close: + return NULL; +} + +/* + * Task to handle connctions to forward servers + */ +static struct task *process_sink_forward(struct task * task, void *context, unsigned short state) +{ + struct sink *sink = (struct sink *)context; + struct sink_forward_target *sft = sink->sft; + + task->expire = TICK_ETERNITY; + + if (!stopping) { + while (sft) { + HA_SPIN_LOCK(SFT_LOCK, &sft->lock); + /* if appctx is NULL, start a new session */ + if (!sft->appctx) + sft->appctx = sink_forward_session_create(sink, sft); + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + sft = sft->next; + } + } + else { + while (sft) { + HA_SPIN_LOCK(SFT_LOCK, &sft->lock); + /* awake applet to perform a clean close */ + if (sft->appctx) + appctx_wakeup(sft->appctx); + HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); + sft = sft->next; + } + } + + return task; +} +/* + * Init task to manage connctions to forward servers + * + * returns 0 in case of error. + */ +int sink_init_forward(struct sink *sink) +{ + sink->forward_task = task_new(MAX_THREADS_MASK); + if (!sink->forward_task) + return 0; + + sink->forward_task->process = process_sink_forward; + sink->forward_task->context = (void *)sink; + sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0); + task_wakeup(sink->forward_task, TASK_WOKEN_INIT); + return 1; +} /* * Parse "ring" section and create corresponding sink buffer. * @@ -327,6 +641,7 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) int err_code = 0; const char *inv; size_t size = BUFSIZE; + struct proxy *p; if (strcmp(args[0], "ring") == 0) { /* new peers section */ if (!*args[1]) { @@ -354,6 +669,22 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) err_code |= ERR_ALERT | ERR_FATAL; goto err; } + + /* allocate new proxy to handle forwards */ + p = calloc(1, sizeof *p); + if (!p) { + ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_FATAL; + goto err; + } + + init_new_proxy(p); + sink_setup_proxy(p); + p->parent = cfg_sink; + p->id = strdup(args[1]); + p->conf.args.file = p->conf.file = strdup(file); + p->conf.args.line = p->conf.line = linenum; + cfg_sink->forward_px = p; } else if (strcmp(args[0], "size") == 0) { size = atol(args[1]); @@ -370,6 +701,52 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) goto err; } } + else if (strcmp(args[0],"server") == 0) { + err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0); + } + else if (strcmp(args[0],"timeout") == 0) { + if (!cfg_sink || !cfg_sink->forward_px) { + ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto err; + } + + if (strcmp(args[1], "connect") == 0 || + strcmp(args[1], "server") == 0) { + const char *res; + unsigned int tout; + + if (!*args[2]) { + ha_alert("parsing [%s:%d] : '%s %s' expects