MEDIUM: ring: add new srv statement to support octet counting forward

log-proto <logproto>
  The "log-proto" specifies the protocol used to forward event messages to
  a server configured in a ring section. Possible values are "legacy"
  and "octet-count" corresponding respectively to "Non-transparent-framing"
  and "Octet counting" in rfc6587. "legacy" is the default.

Notes: a separated io_handler was created to avoid per messages test
and to prepare code to set different log protocols such as
request- response based ones.
This commit is contained in:
Emeric Brun 2020-05-30 01:42:45 +02:00 committed by Willy Tarreau
parent 494c505703
commit 975564784f
4 changed files with 186 additions and 3 deletions

View File

@ -2606,7 +2606,8 @@ server <name> <address> [param*]
respond, it will prevent old messages from being purged and may block new
messages from being inserted into the ring. The proper way to send messages
to multiple servers is to use one distinct ring per log server, not to
attach multiple servers to the same ring.
attach multiple servers to the same ring. Note that specific server directive
"log-proto" is used to set the protocol used to send messages.
size <size>
This is the optional size in bytes for the ring-buffer. Default value is
@ -2639,7 +2640,7 @@ timeout server <timeout>
size 32764
timeout connect 5s
timeout server 10s
server mysyslogsrv 127.0.0.1:6514
server mysyslogsrv 127.0.0.1:6514 log-proto octet-count
4. Proxies
@ -13080,6 +13081,12 @@ downinter <delay>
global "spread-checks" keyword. This makes sense for instance when a lot
of backends use the same servers.
log-proto <logproto>
The "log-proto" specifies the protocol used to forward event messages to
a server configured in a ring section. Possible values are "legacy"
and "octet-count" corresponding respectively to "Non-transparent-framing"
and "Octet counting" in rfc6587. "legacy" is the default.
maxconn <maxconn>
The "maxconn" parameter specifies the maximal number of concurrent
connections that will be sent to this server. If the number of incoming

View File

@ -177,6 +177,12 @@ enum srv_initaddr {
#define SRV_SSL_O_EARLY_DATA 0x400 /* Allow using early data */
#endif
/* log servers ring's protocols options */
enum srv_log_proto {
SRV_LOG_PROTO_LEGACY, // messages on TCP separated by LF
SRV_LOG_PROTO_OCTET_COUNTING, // TCP frames: MSGLEN SP MSG
};
/* The server names dictionary */
extern struct dict server_name_dict;
@ -291,6 +297,7 @@ struct server {
char *hostname; /* server hostname */
struct sockaddr_storage init_addr; /* plain IP address specified on the init-addr line */
unsigned int init_addr_methods; /* initial address setting, 3-bit per method, ends at 0, enough to store 10 entries */
enum srv_log_proto log_proto; /* used proto to emmit messages on server lines from ring section */
#ifdef USE_OPENSSL
char *sni_expr; /* Temporary variable to store a sample expression for SNI */

View File

@ -2275,6 +2275,20 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr
newsrv->uweight = newsrv->iweight = w;
cur_arg += 2;
}
else if (!strcmp(args[cur_arg], "log-proto")) {
if (!strcmp(args[cur_arg + 1], "legacy"))
newsrv->log_proto = SRV_LOG_PROTO_LEGACY;
else if (!strcmp(args[cur_arg + 1], "octet-count"))
newsrv->log_proto = SRV_LOG_PROTO_OCTET_COUNTING;
else {
ha_alert("parsing [%s:%d]: '%s' expects one of 'legacy' or "
"'octet-count' but got '%s'\n",
file, linenum, args[cur_arg], args[cur_arg + 1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
}
cur_arg += 2;
}
else if (!strcmp(args[cur_arg], "minconn")) {
newsrv->minconn = atol(args[cur_arg + 1]);
cur_arg += 2;

View File

@ -470,6 +470,150 @@ close:
si_ic(si)->flags |= CF_READ_NULL;
}
/*
* IO Handler to handle message push to syslog tcp server
* using octet counting frames
*/
static void sink_forward_oc_io_handler(struct appctx *appctx)
{
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct sink *sink = strm_fe(s)->parent;
struct sink_forward_target *sft = appctx->ctx.sft.ptr;
struct ring *ring = sink->ctx.ring;
struct buffer *buf = &ring->buf;
uint64_t msg_len;
size_t len, cnt, ofs;
int ret = 0;
char *p;
/* if stopping was requested, close immediatly */
if (unlikely(stopping))
goto close;
/* for rex because it seems reset to timeout
* and we don't want expire on this case
* with a syslog server
*/
si_oc(si)->rex = TICK_ETERNITY;
/* rto should not change but it seems the case */
si_oc(si)->rto = TICK_ETERNITY;
/* an error was detected */
if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
goto close;
/* con closed by server side */
if ((si_oc(si)->flags & CF_SHUTW))
goto close;
/* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes.
*/
if (si_opposite(si)->state < SI_ST_EST) {
si_cant_get(si);
si_rx_conn_blk(si);
si_rx_endp_more(si);
return;
}
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
if (appctx != sft->appctx) {
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
goto close;
}
ofs = sft->ofs;
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_DEL_INIT(&appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* and keep it while being scheduled. Thus instead let's take it the
* first time we enter here so that we have a chance to pass many
* existing messages before grabbing a reference to a location. This
* value cannot be produced after initialization.
*/
if (unlikely(ofs == ~0)) {
ofs = 0;
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
ofs += ring->ofs;
}
/* we were already there, adjust the offset to be relative to
* the buffer's head and remove us from the counter.
*/
ofs -= ring->ofs;
BUG_ON(ofs >= buf->size);
HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
/* in this loop, ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
* stop before the end (ret=0).
*/
if (si_opposite(si)->state == SI_ST_EST) {
ret = 1;
while (ofs + 1 < b_data(buf)) {
cnt = 1;
len = b_peek_varint(buf, ofs + cnt, &msg_len);
if (!len)
break;
cnt += len;
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
chunk_reset(&trash);
p = ulltoa(msg_len, trash.area, b_size(&trash));
if (p) {
trash.data = (p - trash.area) + 1;
*p = ' ';
}
if (!p || (trash.data + msg_len > b_size(&trash))) {
/* too large a message to ever fit, let's skip it */
ofs += cnt + msg_len;
continue;
}
trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
if (ci_putchk(si_ic(si), &trash) == -1) {
si_rx_room_blk(si);
ret = 0;
break;
}
ofs += cnt + msg_len;
}
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
ofs += ring->ofs;
sft->ofs = ofs;
}
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
if (ret) {
/* let's be woken up once new data arrive */
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
si_rx_endp_done(si);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
/* always drain data from server */
co_skip(si_oc(si), si_oc(si)->output);
return;
close:
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
}
void __sink_forward_session_deinit(struct sink_forward_target *sft)
{
struct stream_interface *si;
@ -520,6 +664,13 @@ static struct applet sink_forward_applet = {
.release = sink_forward_session_release,
};
static struct applet sink_forward_oc_applet = {
.obj_type = OBJ_TYPE_APPLET,
.name = "<SINKFWDOC>", /* used for logging */
.fct = sink_forward_oc_io_handler,
.release = sink_forward_session_release,
};
/*
* Create a new peer session in assigned state (connect will start automatically)
*/
@ -529,8 +680,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
struct appctx *appctx;
struct session *sess;
struct stream *s;
struct applet *applet = &sink_forward_applet;
appctx = appctx_new(&sink_forward_applet, tid_bit);
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
applet = &sink_forward_oc_applet;
appctx = appctx_new(applet, tid_bit);
if (!appctx)
goto out_close;