/* * Event sink management * * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation, version 2.1 * exclusively. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include #include #include #include #include #include #include #include struct list sink_list = LIST_HEAD_INIT(sink_list); struct sink *cfg_sink; struct sink *sink_find(const char *name) { struct sink *sink; list_for_each_entry(sink, &sink_list, sink_list) if (strcmp(sink->name, name) == 0) return sink; return NULL; } /* creates a new sink and adds it to the list, it's still generic and not fully * initialized. Returns NULL on allocation failure. If another one already * exists with the same name, it will be returned. The caller can detect it as * a newly created one has type SINK_TYPE_NEW. */ static struct sink *__sink_new(const char *name, const char *desc, int fmt) { struct sink *sink; sink = sink_find(name); if (sink) goto end; sink = calloc(1, sizeof(*sink)); if (!sink) goto end; sink->name = strdup(name); if (!sink->name) goto err; sink->desc = strdup(desc); if (!sink->desc) goto err; sink->fmt = fmt; sink->type = SINK_TYPE_NEW; sink->maxlen = BUFSIZE; /* address will be filled by the caller if needed */ sink->ctx.fd = -1; sink->ctx.dropped = 0; HA_RWLOCK_INIT(&sink->ctx.lock); LIST_ADDQ(&sink_list, &sink->sink_list); end: return sink; err: free(sink->name); sink->name = NULL; free(sink->desc); sink->desc = NULL; free(sink); sink = NULL; return NULL; } /* creates a sink called of type FD associated to fd , format , * and description . Returns NULL on allocation failure or conflict. * Perfect duplicates are merged (same type, fd, and name). */ struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd) { struct sink *sink; sink = __sink_new(name, desc, fmt); if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd)) goto end; if (sink->type != SINK_TYPE_NEW) { sink = NULL; goto end; } sink->type = SINK_TYPE_FD; sink->ctx.fd = fd; end: return sink; } /* creates a sink called of type BUF of size , format , * and description . Returns NULL on allocation failure or conflict. * Perfect duplicates are merged (same type and name). If sizes differ, the * largest one is kept. */ struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size) { struct sink *sink; sink = __sink_new(name, desc, fmt); if (!sink) goto fail; if (sink->type == SINK_TYPE_BUFFER) { /* such a buffer already exists, we may have to resize it */ if (!ring_resize(sink->ctx.ring, size)) goto fail; goto end; } if (sink->type != SINK_TYPE_NEW) { /* already exists of another type */ goto fail; } sink->ctx.ring = ring_new(size); if (!sink->ctx.ring) { LIST_DEL(&sink->sink_list); free(sink->name); free(sink->desc); free(sink); goto fail; } sink->type = SINK_TYPE_BUFFER; end: return sink; fail: return NULL; } /* tries to send message parts (up to 8, ignored above) from message * array to sink . Formatting according to the sink's preference is * done here. Lost messages are NOT accounted for. It is preferable to call * sink_write() instead which will also try to emit the number of dropped * messages when there are any. It returns >0 if it could write anything, * <=0 otherwise. */ ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg, int level, int facility, struct ist *metadata) { struct ist *pfx = NULL; size_t npfx = 0; if (sink->fmt == LOG_FORMAT_RAW) goto send; pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx); send: if (sink->type == SINK_TYPE_FD) { return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1); } else if (sink->type == SINK_TYPE_BUFFER) { return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg); } return 0; } /* Tries to emit a message indicating the number of dropped events. In case of * success, the amount of drops is reduced by as much. It's supposed to be * called under an exclusive lock on the sink to avoid multiple produces doing * the same. On success, >0 is returned, otherwise <=0 on failure. */ int sink_announce_dropped(struct sink *sink, int facility) { static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS]; static THREAD_LOCAL pid_t curr_pid; static THREAD_LOCAL char pidstr[16]; unsigned int dropped; struct buffer msg; struct ist msgvec[1]; char logbuf[64]; while (unlikely((dropped = sink->ctx.dropped) > 0)) { chunk_init(&msg, logbuf, sizeof(logbuf)); chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); msgvec[0] = ist2(msg.area, msg.data); if (!metadata[LOG_META_HOST].len) { if (global.log_send_hostname) metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname)); } if (!metadata[LOG_META_TAG].len) metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data); if (unlikely(curr_pid != getpid())) metadata[LOG_META_PID].len = 0; if (!metadata[LOG_META_PID].len) { curr_pid = getpid(); ltoa_o(curr_pid, pidstr, sizeof(pidstr)); metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr)); } if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0) return 0; /* success! */ HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); } return 1; } /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */ static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private) { struct sink *sink; int arg; args++; // make args[1] the 1st arg if (!*args[1]) { /* no arg => report the list of supported sink */ chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n"); list_for_each_entry(sink, &sink_list, sink_list) { chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n", sink->name, sink->type == SINK_TYPE_NEW ? "init" : sink->type == SINK_TYPE_FD ? "fd" : sink->type == SINK_TYPE_BUFFER ? "buffer" : "?", sink->ctx.dropped, sink->desc); } trash.area[trash.data] = 0; return cli_msg(appctx, LOG_WARNING, trash.area); } if (!cli_has_level(appctx, ACCESS_LVL_OPER)) return 1; sink = sink_find(args[1]); if (!sink) return cli_err(appctx, "No such event sink"); if (sink->type != SINK_TYPE_BUFFER) return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink"); for (arg = 2; *args[arg]; arg++) { if (strcmp(args[arg], "-w") == 0) appctx->ctx.cli.i0 |= 1; // wait mode else if (strcmp(args[arg], "-n") == 0) appctx->ctx.cli.i0 |= 2; // seek to new else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0) appctx->ctx.cli.i0 |= 3; // seek to new + wait else return cli_err(appctx, "unknown option"); } return ring_attach_cli(sink->ctx.ring, appctx); } /* Pre-configures a ring proxy to emit connections */ void sink_setup_proxy(struct proxy *px) { px->last_change = now.tv_sec; px->cap = PR_CAP_FE | PR_CAP_BE; px->maxconn = 0; px->conn_retries = 1; px->timeout.server = TICK_ETERNITY; px->timeout.client = TICK_ETERNITY; px->timeout.connect = TICK_ETERNITY; px->accept = NULL; px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; px->bind_proc = 0; /* will be filled by users */ } /* * IO Handler to handle message push to syslog tcp server */ static void sink_forward_io_handler(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); struct sink *sink = strm_fe(s)->parent; struct sink_forward_target *sft = appctx->ctx.sft.ptr; struct ring *ring = sink->ctx.ring; struct buffer *buf = &ring->buf; uint64_t msg_len; size_t len, cnt, ofs; int ret = 0; /* if stopping was requested, close immediately */ if (unlikely(stopping)) goto close; /* for rex because it seems reset to timeout * and we don't want expire on this case * with a syslog server */ si_oc(si)->rex = TICK_ETERNITY; /* rto should not change but it seems the case */ si_oc(si)->rto = TICK_ETERNITY; /* an error was detected */ if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) goto close; /* con closed by server side */ if ((si_oc(si)->flags & CF_SHUTW)) goto close; /* if the connection is not established, inform the stream that we want * to be notified whenever the connection completes. */ if (si_opposite(si)->state < SI_ST_EST) { si_cant_get(si); si_rx_conn_blk(si); si_rx_endp_more(si); return; } HA_SPIN_LOCK(SFT_LOCK, &sft->lock); if (appctx != sft->appctx) { HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); goto close; } ofs = sft->ofs; HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_DEL_INIT(&appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in * dropped events because we'd take a reference on the oldest message * and keep it while being scheduled. Thus instead let's take it the * first time we enter here so that we have a chance to pass many * existing messages before grabbing a reference to a location. This * value cannot be produced after initialization. */ if (unlikely(ofs == ~0)) { ofs = 0; HA_ATOMIC_ADD(b_peek(buf, ofs), 1); ofs += ring->ofs; } /* in this loop, ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to * stop before the end (ret=0). */ if (si_opposite(si)->state == SI_ST_EST) { /* we were already there, adjust the offset to be relative to * the buffer's head and remove us from the counter. */ ofs -= ring->ofs; BUG_ON(ofs >= buf->size); HA_ATOMIC_SUB(b_peek(buf, ofs), 1); ret = 1; while (ofs + 1 < b_data(buf)) { cnt = 1; len = b_peek_varint(buf, ofs + cnt, &msg_len); if (!len) break; cnt += len; BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); if (unlikely(msg_len + 1 > b_size(&trash))) { /* too large a message to ever fit, let's skip it */ ofs += cnt + msg_len; continue; } chunk_reset(&trash); len = b_getblk(buf, trash.area, msg_len, ofs + cnt); trash.data += len; trash.area[trash.data++] = '\n'; if (ci_putchk(si_ic(si), &trash) == -1) { si_rx_room_blk(si); ret = 0; break; } ofs += cnt + msg_len; } HA_ATOMIC_ADD(b_peek(buf, ofs), 1); ofs += ring->ofs; sft->ofs = ofs; } HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); if (ret) { /* let's be woken up once new data arrive */ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_ADDQ(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); si_rx_endp_done(si); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); /* always drain data from server */ co_skip(si_oc(si), si_oc(si)->output); return; close: si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; } /* * IO Handler to handle message push to syslog tcp server * using octet counting frames */ static void sink_forward_oc_io_handler(struct appctx *appctx) { struct stream_interface *si = appctx->owner; struct stream *s = si_strm(si); struct sink *sink = strm_fe(s)->parent; struct sink_forward_target *sft = appctx->ctx.sft.ptr; struct ring *ring = sink->ctx.ring; struct buffer *buf = &ring->buf; uint64_t msg_len; size_t len, cnt, ofs; int ret = 0; char *p; /* if stopping was requested, close immediately */ if (unlikely(stopping)) goto close; /* for rex because it seems reset to timeout * and we don't want expire on this case * with a syslog server */ si_oc(si)->rex = TICK_ETERNITY; /* rto should not change but it seems the case */ si_oc(si)->rto = TICK_ETERNITY; /* an error was detected */ if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) goto close; /* con closed by server side */ if ((si_oc(si)->flags & CF_SHUTW)) goto close; /* if the connection is not established, inform the stream that we want * to be notified whenever the connection completes. */ if (si_opposite(si)->state < SI_ST_EST) { si_cant_get(si); si_rx_conn_blk(si); si_rx_endp_more(si); return; } HA_SPIN_LOCK(SFT_LOCK, &sft->lock); if (appctx != sft->appctx) { HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); goto close; } ofs = sft->ofs; HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_DEL_INIT(&appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); /* explanation for the initialization below: it would be better to do * this in the parsing function but this would occasionally result in * dropped events because we'd take a reference on the oldest message * and keep it while being scheduled. Thus instead let's take it the * first time we enter here so that we have a chance to pass many * existing messages before grabbing a reference to a location. This * value cannot be produced after initialization. */ if (unlikely(ofs == ~0)) { ofs = 0; HA_ATOMIC_ADD(b_peek(buf, ofs), 1); ofs += ring->ofs; } /* in this loop, ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to * stop before the end (ret=0). */ if (si_opposite(si)->state == SI_ST_EST) { /* we were already there, adjust the offset to be relative to * the buffer's head and remove us from the counter. */ ofs -= ring->ofs; BUG_ON(ofs >= buf->size); HA_ATOMIC_SUB(b_peek(buf, ofs), 1); ret = 1; while (ofs + 1 < b_data(buf)) { cnt = 1; len = b_peek_varint(buf, ofs + cnt, &msg_len); if (!len) break; cnt += len; BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); chunk_reset(&trash); p = ulltoa(msg_len, trash.area, b_size(&trash)); if (p) { trash.data = (p - trash.area) + 1; *p = ' '; } if (!p || (trash.data + msg_len > b_size(&trash))) { /* too large a message to ever fit, let's skip it */ ofs += cnt + msg_len; continue; } trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); if (ci_putchk(si_ic(si), &trash) == -1) { si_rx_room_blk(si); ret = 0; break; } ofs += cnt + msg_len; } HA_ATOMIC_ADD(b_peek(buf, ofs), 1); ofs += ring->ofs; sft->ofs = ofs; } HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); if (ret) { /* let's be woken up once new data arrive */ HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); LIST_ADDQ(&ring->waiters, &appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); si_rx_endp_done(si); } HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); /* always drain data from server */ co_skip(si_oc(si), si_oc(si)->output); return; close: si_shutw(si); si_shutr(si); si_ic(si)->flags |= CF_READ_NULL; } void __sink_forward_session_deinit(struct sink_forward_target *sft) { struct stream_interface *si; struct stream *s; struct sink *sink; if (!sft->appctx) return; si = sft->appctx->owner; if (!si) return; s = si_strm(si); if (!s) return; sink = strm_fe(s)->parent; if (!sink) return; HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); LIST_DEL_INIT(&sft->appctx->wait_entry); HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); sft->appctx = NULL; task_wakeup(sink->forward_task, TASK_WOKEN_MSG); } static void sink_forward_session_release(struct appctx *appctx) { struct sink_forward_target *sft = appctx->ctx.peers.ptr; if (!sft) return; HA_SPIN_LOCK(SFT_LOCK, &sft->lock); if (sft->appctx == appctx) __sink_forward_session_deinit(sft); HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); } static struct applet sink_forward_applet = { .obj_type = OBJ_TYPE_APPLET, .name = "", /* used for logging */ .fct = sink_forward_io_handler, .release = sink_forward_session_release, }; static struct applet sink_forward_oc_applet = { .obj_type = OBJ_TYPE_APPLET, .name = "", /* used for logging */ .fct = sink_forward_oc_io_handler, .release = sink_forward_session_release, }; /* * Create a new peer session in assigned state (connect will start automatically) */ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft) { struct proxy *p = sink->forward_px; struct appctx *appctx; struct session *sess; struct stream *s; struct applet *applet = &sink_forward_applet; if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) applet = &sink_forward_oc_applet; appctx = appctx_new(applet, tid_bit); if (!appctx) goto out_close; appctx->ctx.sft.ptr = (void *)sft; sess = session_new(p, NULL, &appctx->obj_type); if (!sess) { ha_alert("out of memory in peer_session_create().\n"); goto out_free_appctx; } if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { ha_alert("Failed to initialize stream in peer_session_create().\n"); goto out_free_sess; } s->target = &sft->srv->obj_type; if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr))) goto out_free_strm; s->flags = SF_ASSIGNED|SF_ADDR_SET; s->si[1].flags |= SI_FL_NOLINGER; s->do_log = NULL; s->uniq_id = 0; s->res.flags |= CF_READ_DONTWAIT; /* for rto and rex to eternity to not expire on idle recv: * We are using a syslog server. */ s->res.rto = TICK_ETERNITY; s->res.rex = TICK_ETERNITY; sft->appctx = appctx; task_wakeup(s->task, TASK_WOKEN_INIT); return appctx; /* Error unrolling */ out_free_strm: LIST_DEL(&s->list); pool_free(pool_head_stream, s); out_free_sess: session_free(sess); out_free_appctx: appctx_free(appctx); out_close: return NULL; } /* * Task to handle connctions to forward servers */ static struct task *process_sink_forward(struct task * task, void *context, unsigned short state) { struct sink *sink = (struct sink *)context; struct sink_forward_target *sft = sink->sft; task->expire = TICK_ETERNITY; if (!stopping) { while (sft) { HA_SPIN_LOCK(SFT_LOCK, &sft->lock); /* if appctx is NULL, start a new session */ if (!sft->appctx) sft->appctx = sink_forward_session_create(sink, sft); HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); sft = sft->next; } } else { while (sft) { HA_SPIN_LOCK(SFT_LOCK, &sft->lock); /* awake applet to perform a clean close */ if (sft->appctx) appctx_wakeup(sft->appctx); HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); sft = sft->next; } } return task; } /* * Init task to manage connctions to forward servers * * returns 0 in case of error. */ int sink_init_forward(struct sink *sink) { sink->forward_task = task_new(MAX_THREADS_MASK); if (!sink->forward_task) return 0; sink->forward_task->process = process_sink_forward; sink->forward_task->context = (void *)sink; sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0); task_wakeup(sink->forward_task, TASK_WOKEN_INIT); return 1; } /* * Parse "ring" section and create corresponding sink buffer. * * The function returns 0 in success case, otherwise, it returns error * flags. */ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) { int err_code = 0; const char *inv; size_t size = BUFSIZE; struct proxy *p; if (strcmp(args[0], "ring") == 0) { /* new peers section */ if (!*args[1]) { ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } inv = invalid_char(args[1]); if (inv) { ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (sink_find(args[1])) { ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size); if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) { ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } /* allocate new proxy to handle forwards */ p = calloc(1, sizeof *p); if (!p) { ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum); err_code |= ERR_ALERT | ERR_FATAL; goto err; } init_new_proxy(p); sink_setup_proxy(p); p->parent = cfg_sink; p->id = strdup(args[1]); p->conf.args.file = p->conf.file = strdup(file); p->conf.args.line = p->conf.line = linenum; cfg_sink->forward_px = p; } else if (strcmp(args[0], "size") == 0) { size = atol(args[1]); if (!size) { ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER) || !ring_resize(cfg_sink->ctx.ring, size)) { ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } } else if (strcmp(args[0],"server") == 0) { err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1); } else if (strcmp(args[0],"timeout") == 0) { if (!cfg_sink || !cfg_sink->forward_px) { ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]); err_code |= ERR_ALERT | ERR_FATAL; goto err; } if (strcmp(args[1], "connect") == 0 || strcmp(args[1], "server") == 0) { const char *res; unsigned int tout; if (!*args[2]) { ha_alert("parsing [%s:%d] : '%s %s' expects