mirror of
http://git.haproxy.org/git/haproxy.git/
synced 2025-02-28 16:40:37 +00:00
MINOR: sink: now report the number of dropped events on output
The principle is that when emitting a message, if some dropped events were logged, we first attempt to report this counter before going further. This is done under an exclusive lock while all logs are produced under a shared lock. This ensures that the dropped line is accurately reported and doesn't accidently arrive after a later event.
This commit is contained in:
parent
9f830d7408
commit
8f24023ba0
@ -29,7 +29,46 @@ extern struct list sink_list;
|
|||||||
|
|
||||||
struct sink *sink_find(const char *name);
|
struct sink *sink_find(const char *name);
|
||||||
struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd);
|
struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd);
|
||||||
void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg);
|
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg);
|
||||||
|
int sink_announce_dropped(struct sink *sink);
|
||||||
|
|
||||||
|
|
||||||
|
/* tries to send <nmsg> message parts (up to 8, ignored above) from message
|
||||||
|
* array <msg> to sink <sink>. Formating according to the sink's preference is
|
||||||
|
* done here. Lost messages are accounted for in the sink's counter. If there
|
||||||
|
* were lost messages, an attempt is first made to indicate it.
|
||||||
|
*/
|
||||||
|
static inline void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
|
||||||
|
{
|
||||||
|
ssize_t sent;
|
||||||
|
|
||||||
|
if (unlikely(sink->ctx.dropped > 0)) {
|
||||||
|
/* We need to take an exclusive lock so that other producers
|
||||||
|
* don't do the same thing at the same time and above all we
|
||||||
|
* want to be sure others have finished sending their messages
|
||||||
|
* so that the dropped event arrives exactly at the right
|
||||||
|
* position.
|
||||||
|
*/
|
||||||
|
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.lock);
|
||||||
|
sent = sink_announce_dropped(sink);
|
||||||
|
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.lock);
|
||||||
|
|
||||||
|
if (!sent) {
|
||||||
|
/* we failed, we don't try to send our log as if it
|
||||||
|
* would pass by chance, we'd get disordered events.
|
||||||
|
*/
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &sink->ctx.lock);
|
||||||
|
sent = __sink_write(sink, msg, nmsg);
|
||||||
|
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &sink->ctx.lock);
|
||||||
|
|
||||||
|
fail:
|
||||||
|
if (unlikely(sent <= 0))
|
||||||
|
HA_ATOMIC_ADD(&sink->ctx.dropped, 1);
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* _PROTO_SINK_H */
|
#endif /* _PROTO_SINK_H */
|
||||||
|
|
||||||
|
@ -60,9 +60,9 @@ struct sink {
|
|||||||
uint8_t syslog_minlvl; // used by syslog & short formats
|
uint8_t syslog_minlvl; // used by syslog & short formats
|
||||||
uint32_t maxlen; // max message length (truncated above)
|
uint32_t maxlen; // max message length (truncated above)
|
||||||
struct {
|
struct {
|
||||||
|
__decl_hathreads(HA_RWLOCK_T lock); // shared/excl for dropped
|
||||||
struct ring *ring; // used by ring buffer and STRM sender
|
struct ring *ring; // used by ring buffer and STRM sender
|
||||||
unsigned int dropped; // dropped events since last one.
|
unsigned int dropped; // dropped events since last one.
|
||||||
__decl_hathreads(HA_RWLOCK_T lock); // used by some types
|
|
||||||
int fd; // fd num for FD type sink
|
int fd; // fd num for FD type sink
|
||||||
} ctx;
|
} ctx;
|
||||||
};
|
};
|
||||||
|
41
src/sink.c
41
src/sink.c
@ -145,14 +145,16 @@ struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt,
|
|||||||
|
|
||||||
/* tries to send <nmsg> message parts (up to 8, ignored above) from message
|
/* tries to send <nmsg> message parts (up to 8, ignored above) from message
|
||||||
* array <msg> to sink <sink>. Formating according to the sink's preference is
|
* array <msg> to sink <sink>. Formating according to the sink's preference is
|
||||||
* done here. Lost messages are accounted for in the sink's counter.
|
* done here. Lost messages are NOT accounted for. It is preferable to call
|
||||||
|
* sink_write() instead which will also try to emit the number of dropped
|
||||||
|
* messages when there are any. It returns >0 if it could write anything,
|
||||||
|
* <=0 otherwise.
|
||||||
*/
|
*/
|
||||||
void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
|
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
|
||||||
{
|
{
|
||||||
char short_hdr[4];
|
char short_hdr[4];
|
||||||
struct ist pfx[4];
|
struct ist pfx[4];
|
||||||
size_t npfx = 0;
|
size_t npfx = 0;
|
||||||
size_t sent = 0;
|
|
||||||
|
|
||||||
if (sink->fmt == SINK_FMT_SHORT) {
|
if (sink->fmt == SINK_FMT_SHORT) {
|
||||||
short_hdr[0] = '<';
|
short_hdr[0] = '<';
|
||||||
@ -165,17 +167,36 @@ void sink_write(struct sink *sink, const struct ist msg[], size_t nmsg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sink->type == SINK_TYPE_FD) {
|
if (sink->type == SINK_TYPE_FD) {
|
||||||
sent = fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
|
return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
|
||||||
/* sent > 0 if the message was delivered */
|
|
||||||
}
|
}
|
||||||
else if (sink->type == SINK_TYPE_BUFFER) {
|
else if (sink->type == SINK_TYPE_BUFFER) {
|
||||||
sent = ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
|
return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
|
||||||
/* sent > 0 if the message was delivered */
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* account for errors now */
|
/* Tries to emit a message indicating the number of dropped events. In case of
|
||||||
if (sent <= 0)
|
* success, the amount of drops is reduced by as much. It's supposed to be
|
||||||
HA_ATOMIC_ADD(&sink->ctx.dropped, 1);
|
* called under an exclusive lock on the sink to avoid multiple produces doing
|
||||||
|
* the same. On success, >0 is returned, otherwise <=0 on failure.
|
||||||
|
*/
|
||||||
|
int sink_announce_dropped(struct sink *sink)
|
||||||
|
{
|
||||||
|
unsigned int dropped;
|
||||||
|
struct buffer msg;
|
||||||
|
struct ist msgvec[1];
|
||||||
|
char logbuf[64];
|
||||||
|
|
||||||
|
while (unlikely((dropped = sink->ctx.dropped) > 0)) {
|
||||||
|
chunk_init(&msg, logbuf, sizeof(logbuf));
|
||||||
|
chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
|
||||||
|
msgvec[0] = ist2(msg.area, msg.data);
|
||||||
|
if (__sink_write(sink, msgvec, 1) <= 0)
|
||||||
|
return 0;
|
||||||
|
/* success! */
|
||||||
|
HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
|
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
|
||||||
|
Loading…
Reference in New Issue
Block a user