mon/LogMonitor: factor logging to file out of update_from_paxos

Move this into a helper.  Keep an fd open for each channel so that we
can more efficiently log individual LogEntry's.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2021-06-22 16:24:35 -04:00
parent f39f01c4d2
commit 7c84e06e6f
3 changed files with 100 additions and 89 deletions

View File

@ -246,8 +246,6 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
return;
ceph_assert(version >= summary.version);
map<string,bufferlist> channel_blog;
version_t latest_full = get_version_latest_full();
dout(10) << __func__ << " latest full " << latest_full << dendl;
if ((latest_full > 0) && (latest_full > summary.version)) {
@ -273,69 +271,10 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
while (!p.end()) {
LogEntry le;
le.decode(p);
dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
string channel = le.channel;
if (channel.empty()) // keep retrocompatibility
channel = CLOG_CHANNEL_CLUSTER;
if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
cerr << channel << " " << le << std::endl;
}
if (channels.do_log_to_syslog(channel)) {
string level = channels.get_level(channel);
string facility = channels.get_facility(channel);
if (level.empty() || facility.empty()) {
derr << __func__ << " unable to log to syslog -- level or facility"
<< " not defined (level: " << level << ", facility: "
<< facility << ")" << dendl;
continue;
}
le.log_to_syslog(channels.get_level(channel),
channels.get_facility(channel));
}
if (channels.do_log_to_graylog(channel)) {
ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
if (graylog) {
graylog->log_log_entry(&le);
}
dout(7) << "graylog: " << channel << " " << graylog
<< " host:" << channels.log_to_graylog_host << dendl;
}
if (channels.do_log_to_journald(channel)) {
auto &journald = channels.get_journald();
journald.log_log_entry(le);
dout(7) << "journald: " << channel << dendl;
}
if (g_conf()->mon_cluster_log_to_file) {
string log_file = channels.get_log_file(channel);
dout(20) << __func__ << " logging for channel '" << channel
<< "' to file '" << log_file << "'" << dendl;
if (!log_file.empty()) {
string log_file_level = channels.get_log_file_level(channel);
if (log_file_level.empty()) {
dout(1) << __func__ << " warning: log file level not defined for"
<< " channel '" << channel << "' yet a log file is --"
<< " will assume lowest level possible" << dendl;
}
int min = string_to_syslog_level(log_file_level);
int l = clog_type_to_syslog_level(le.prio);
if (l <= min) {
stringstream ss;
ss << le << "\n";
// init entry if DNE
bufferlist &blog = channel_blog[channel];
blog.append(ss.str());
}
}
}
dout(7) << "update_from_paxos applying incremental log "
<< summary.version+1 << " " << le << dendl;
log_external(le);
summary.add(le);
}
@ -343,36 +282,95 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap)
summary.prune(g_conf()->mon_log_max_summary);
}
dout(15) << __func__ << " logging for "
<< channel_blog.size() << " channels" << dendl;
for(map<string,bufferlist>::iterator p = channel_blog.begin();
p != channel_blog.end(); ++p) {
if (!p->second.length()) {
dout(15) << __func__ << " channel '" << p->first
<< "': nothing to log" << dendl;
continue;
}
check_subs();
}
dout(15) << __func__ << " channel '" << p->first
<< "' logging " << p->second.length() << " bytes" << dendl;
string log_file = channels.get_log_file(p->first);
void LogMonitor::log_external(const LogEntry& le)
{
string channel = le.channel;
if (channel.empty()) { // keep retrocompatibility
channel = CLOG_CHANNEL_CLUSTER;
}
int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
if (fd < 0) {
int err = -errno;
dout(1) << "unable to write to '" << log_file << "' for channel '"
<< p->first << "': " << cpp_strerror(err) << dendl;
if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
cerr << channel << " " << le << std::endl;
}
if (channels.do_log_to_syslog(channel)) {
string level = channels.get_level(channel);
string facility = channels.get_facility(channel);
if (level.empty() || facility.empty()) {
derr << __func__ << " unable to log to syslog -- level or facility"
<< " not defined (level: " << level << ", facility: "
<< facility << ")" << dendl;
} else {
int err = p->second.write_fd(fd);
if (err < 0) {
dout(1) << "error writing to '" << log_file << "' for channel '"
<< p->first << ": " << cpp_strerror(err) << dendl;
}
VOID_TEMP_FAILURE_RETRY(::close(fd));
le.log_to_syslog(channels.get_level(channel),
channels.get_facility(channel));
}
}
check_subs();
if (channels.do_log_to_graylog(channel)) {
ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
if (graylog) {
graylog->log_log_entry(&le);
}
dout(7) << "graylog: " << channel << " " << graylog
<< " host:" << channels.log_to_graylog_host << dendl;
}
if (channels.do_log_to_journald(channel)) {
auto &journald = channels.get_journald();
journald.log_log_entry(le);
dout(7) << "journald: " << channel << dendl;
}
if (g_conf()->mon_cluster_log_to_file) {
auto p = channel_fds.find(channel);
int fd;
if (p == channel_fds.end()) {
string log_file = channels.get_log_file(channel);
dout(20) << __func__ << " logging for channel '" << channel
<< "' to file '" << log_file << "'" << dendl;
if (log_file.empty()) {
// do not log this channel
fd = -1;
} else {
fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
if (fd < 0) {
int err = -errno;
dout(1) << "unable to write to '" << log_file << "' for channel '"
<< p->first << "': " << cpp_strerror(err) << dendl;
}
channel_fds[channel] = fd;
}
} else {
fd = p->second;
}
if (fd >= 0) {
fmt::format_to(file_log_buffer, "{}\n", le);
int err = safe_write(fd, file_log_buffer.data(), file_log_buffer.size());
file_log_buffer.clear();
if (err < 0) {
dout(1) << "error writing to '" << channels.get_log_file(channel)
<< "' for channel '" << channel
<< ": " << cpp_strerror(err) << dendl;
::close(fd);
channel_fds.erase(channel);
}
}
}
}
void LogMonitor::log_external_close_fds()
{
for (auto& [channel, fd] : channel_fds) {
if (fd >= 0) {
dout(10) << __func__ << " closing " << channel << " (" << fd << ")" << dendl;
::close(fd);
}
}
channel_fds.clear();
}
void LogMonitor::create_pending()
@ -972,6 +970,7 @@ void LogMonitor::update_log_channels()
}
channels.expand_channel_meta();
log_external_close_fds();
}
@ -986,7 +985,8 @@ void LogMonitor::handle_conf_change(const ConfigProxy& conf,
changed.count("mon_cluster_log_to_graylog") ||
changed.count("mon_cluster_log_to_graylog_host") ||
changed.count("mon_cluster_log_to_graylog_port") ||
changed.count("mon_cluster_log_to_journald")) {
changed.count("mon_cluster_log_to_journald") ||
changed.count("mon_cluster_log_to_file")) {
update_log_channels();
}
}

View File

@ -18,6 +18,9 @@
#include <map>
#include <set>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include "include/types.h"
#include "PaxosService.h"
@ -42,6 +45,10 @@ private:
std::multimap<utime_t,LogEntry> pending_log;
LogSummary pending_summary, summary;
std::map<std::string, int> channel_fds;
fmt::memory_buffer file_log_buffer;
struct log_channel_info {
std::map<std::string,std::string> log_to_syslog;
@ -159,6 +166,9 @@ private:
void check_subs();
void check_sub(Subscription *s);
void log_external_close_fds();
void log_external(const LogEntry& le);
/**
* translate log sub name ('log-info') to integer id
*

View File

@ -611,6 +611,7 @@ const char** Monitor::get_tracked_conf_keys() const
"clog_to_graylog",
"clog_to_graylog_host",
"clog_to_graylog_port",
"mon_cluster_log_to_file",
"host",
"fsid",
// periodic health to clog