Merge pull request #48623 from batrick/i57923

log: use non-blocking atomic writes to stderr fifos

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Yuri Weinstein 2022-12-05 09:13:49 -08:00 committed by GitHub
commit 3d3b96a092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 297 additions and 62 deletions

View File

@ -18,10 +18,14 @@
#include "LogClock.h"
#include "SubsystemMap.h"
#include <boost/container/vector.hpp>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <syslog.h>
#include <algorithm>
#include <iostream>
#include <set>
@ -49,6 +53,7 @@ Log::Log(const SubsystemMap *s)
m_recent(DEFAULT_MAX_RECENT)
{
m_log_buf.reserve(MAX_LOG_BUF);
_configure_stderr();
}
Log::~Log()
@ -58,8 +63,40 @@ Log::~Log()
}
ceph_assert(!is_started());
if (m_fd >= 0)
if (m_fd >= 0) {
VOID_TEMP_FAILURE_RETRY(::close(m_fd));
m_fd = -1;
}
}
void Log::_configure_stderr()
{
struct stat info;
if (int rc = fstat(m_fd_stderr, &info); rc == -1) {
std::cerr << "failed to stat stderr: " << cpp_strerror(errno) << std::endl;
return;
}
if (S_ISFIFO(info.st_mode)) {
/* Set O_NONBLOCK on FIFO stderr file. We want to ensure atomic debug log
* writes so they do not get partially read by e.g. buggy container
* runtimes. See also IEEE Std 1003.1-2017 and Log::_log_stderr below.
*/
int flags = fcntl(m_fd_stderr, F_GETFL);
if (flags == -1) {
std::cerr << "failed to get fcntl flags for stderr: " << cpp_strerror(errno) << std::endl;
return;
}
if (!(flags & O_NONBLOCK)) {
flags |= O_NONBLOCK;
flags = fcntl(m_fd_stderr, F_SETFL, flags);
if (flags == -1) {
std::cerr << "failed to set fcntl flags for stderr: " << cpp_strerror(errno) << std::endl;
return;
}
}
do_stderr_poll = true;
}
}
@ -116,8 +153,10 @@ void Log::reopen_log_file()
return;
}
m_flush_mutex_holder = pthread_self();
if (m_fd >= 0)
if (m_fd >= 0) {
VOID_TEMP_FAILURE_RETRY(::close(m_fd));
m_fd = -1;
}
if (m_log_file.length()) {
m_fd = ::open(m_log_file.c_str(), O_CREAT|O_WRONLY|O_APPEND|O_CLOEXEC, 0644);
if (m_fd >= 0 && (m_uid || m_gid)) {
@ -127,8 +166,6 @@ void Log::reopen_log_file()
<< std::endl;
}
}
} else {
m_fd = -1;
}
m_flush_mutex_holder = 0;
}
@ -258,6 +295,69 @@ void Log::_log_safe_write(std::string_view sv)
}
}
void Log::set_stderr_fd(int fd)
{
m_fd_stderr = fd;
_configure_stderr();
}
void Log::_log_stderr(std::string_view strv)
{
if (do_stderr_poll) {
auto& prefix = m_log_stderr_prefix;
size_t const len = prefix.size() + strv.size();
boost::container::small_vector<char, PIPE_BUF> buf;
buf.resize(len+1, '\0');
memcpy(buf.data(), prefix.c_str(), prefix.size());
memcpy(buf.data()+prefix.size(), strv.data(), strv.size());
char const* const start = buf.data();
char const* current = start;
while ((size_t)(current-start) < len) {
auto chunk = std::min<ssize_t>(PIPE_BUF, len-(ssize_t)(current-start));
while (1) {
ssize_t rc = write(m_fd_stderr, current, chunk);
if (rc == chunk) {
current += chunk;
break;
} else if (rc > 0) {
/* According to IEEE Std 1003.1-2017, this cannot happen:
*
* Write requests to a pipe or FIFO shall be handled in the same way as a regular file with the following exceptions:
* ...
* If the O_NONBLOCK flag is set ...
* ...
* A write request for {PIPE_BUF} or fewer bytes shall have the
* following effect: if there is sufficient space available in
* the pipe, write() shall transfer all the data and return the
* number of bytes requested. Otherwise, write() shall transfer
* no data and return -1 with errno set to [EAGAIN].
*
* In any case, handle misbehavior gracefully by incrementing current.
*/
current += rc;
break;
} else if (rc == -1) {
if (errno == EAGAIN) {
struct pollfd pfd[1];
pfd[0].fd = m_fd_stderr;
pfd[0].events = POLLOUT;
poll(pfd, 1, -1);
/* ignore errors / success, just retry the write */
} else if (errno == EINTR) {
continue;
} else {
/* some other kind of error, no point logging if stderr writes fail */
return;
}
}
}
}
} else {
fmt::print(std::cerr, "{}{}", m_log_stderr_prefix, strv);
}
}
void Log::_flush_logbuf()
{
if (m_log_buf.size()) {
@ -317,7 +417,7 @@ void Log::_flush(EntryVector& t, bool crash)
pos[used++] = '\n';
if (do_stderr) {
fmt::print(std::cerr, "{}{}", m_log_stderr_prefix, std::string_view(pos, used));
_log_stderr(std::string_view(pos, used));
}
if (do_fd) {

View File

@ -18,6 +18,8 @@
#include "log/Entry.h"
#include <unistd.h>
struct uuid_d;
namespace ceph {
@ -29,63 +31,6 @@ class SubsystemMap;
class Log : private Thread
{
protected:
using EntryVector = std::vector<ConcreteEntry>;
private:
using EntryRing = boost::circular_buffer<ConcreteEntry>;
static const std::size_t DEFAULT_MAX_NEW = 100;
static const std::size_t DEFAULT_MAX_RECENT = 10000;
Log **m_indirect_this;
const SubsystemMap *m_subs;
std::mutex m_queue_mutex;
std::mutex m_flush_mutex;
std::condition_variable m_cond_loggers;
std::condition_variable m_cond_flusher;
pthread_t m_queue_mutex_holder;
pthread_t m_flush_mutex_holder;
EntryVector m_new; ///< new entries
EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
std::string m_log_file;
int m_fd = -1;
uid_t m_uid = 0;
gid_t m_gid = 0;
int m_fd_last_error = 0; ///< last error we say writing to fd (if any)
int m_syslog_log = -2, m_syslog_crash = -2;
int m_stderr_log = -1, m_stderr_crash = -1;
int m_graylog_log = -3, m_graylog_crash = -3;
int m_journald_log = -3, m_journald_crash = -3;
std::string m_log_stderr_prefix;
std::shared_ptr<Graylog> m_graylog;
std::unique_ptr<JournaldLogger> m_journald;
std::vector<char> m_log_buf;
bool m_stop = false;
std::size_t m_max_new = DEFAULT_MAX_NEW;
bool m_inject_segv = false;
void *entry() override;
void _log_safe_write(std::string_view sv);
void _flush_logbuf();
void _log_message(std::string_view s, bool crash);
protected:
virtual void _flush(EntryVector& q, bool crash);
public:
using Thread::is_started;
@ -101,6 +46,7 @@ public:
void reopen_log_file();
void chown_log_file(uid_t uid, gid_t gid);
void set_log_stderr_prefix(std::string_view p);
void set_stderr_fd(int fd);
void flush();
@ -132,6 +78,72 @@ public:
/// induce a segv on the next log event
void inject_segv();
void reset_segv();
protected:
using EntryVector = std::vector<ConcreteEntry>;
virtual void _flush(EntryVector& q, bool crash);
private:
using EntryRing = boost::circular_buffer<ConcreteEntry>;
static const std::size_t DEFAULT_MAX_NEW = 100;
static const std::size_t DEFAULT_MAX_RECENT = 10000;
Log **m_indirect_this;
const SubsystemMap *m_subs;
std::mutex m_queue_mutex;
std::mutex m_flush_mutex;
std::condition_variable m_cond_loggers;
std::condition_variable m_cond_flusher;
pthread_t m_queue_mutex_holder;
pthread_t m_flush_mutex_holder;
EntryVector m_new; ///< new entries
EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)
std::string m_log_file;
int m_fd = -1;
uid_t m_uid = 0;
gid_t m_gid = 0;
int m_fd_stderr = STDERR_FILENO;
int m_fd_last_error = 0; ///< last error we say writing to fd (if any)
int m_syslog_log = -2, m_syslog_crash = -2;
int m_stderr_log = -1, m_stderr_crash = -1;
int m_graylog_log = -3, m_graylog_crash = -3;
int m_journald_log = -3, m_journald_crash = -3;
std::string m_log_stderr_prefix;
bool do_stderr_poll = false;
std::shared_ptr<Graylog> m_graylog;
std::unique_ptr<JournaldLogger> m_journald;
std::vector<char> m_log_buf;
bool m_stop = false;
std::size_t m_max_new = DEFAULT_MAX_NEW;
bool m_inject_segv = false;
void *entry() override;
void _log_safe_write(std::string_view sv);
void _flush_logbuf();
void _log_message(std::string_view s, bool crash);
void _configure_stderr();
void _log_stderr(std::string_view strv);
};
}

View File

@ -10,6 +10,10 @@
#include "global/global_context.h"
#include "common/dout.h"
#include <unistd.h>
#include <limits.h>
using namespace std;
using namespace ceph::logging;
@ -166,6 +170,125 @@ TEST(Log, ManyGather)
log.stop();
}
static void readpipe(int fd, int verify)
{
while (1) {
/* Use larger buffer on receiver as Linux will allow pipes buffers to
* exceed PIPE_BUF. We can't avoid tearing due to small read buffers from
* the Ceph side.
*/
char buf[65536] = "";
int rc = read(fd, buf, (sizeof buf) - 1);
if (rc == 0) {
_exit(0);
} else if (rc == -1) {
_exit(1);
} else if (rc > 0) {
if (verify) {
char* p = strrchr(buf, '\n');
/* verify no torn writes */
if (p == NULL) {
_exit(2);
} else if (p[1] != '\0') {
write(2, buf, strlen(buf));
_exit(3);
}
}
} else _exit(100);
usleep(500);
}
}
TEST(Log, StderrPipeAtomic)
{
int pfd[2] = {-1, -1};
int rc = pipe(pfd);
ASSERT_EQ(rc, 0);
pid_t pid = fork();
if (pid == 0) {
close(pfd[1]);
readpipe(pfd[0], 1);
} else if (pid == (pid_t)-1) {
ASSERT_EQ(0, 1);
}
close(pfd[0]);
SubsystemMap subs;
subs.set_log_level(1, 20);
subs.set_gather_level(1, 10);
Log log(&subs);
log.start();
log.set_log_file("");
log.reopen_log_file();
log.set_stderr_fd(pfd[1]);
log.set_stderr_level(1, 20);
/* -128 for prefix space */
for (int i = 0; i < PIPE_BUF-128; i++) {
MutableEntry e(1, 1);
auto& s = e.get_ostream();
for (int j = 0; j < i; j++) {
char c = 'a';
c += (j % 26);
s << c;
}
log.submit_entry(std::move(e));
}
log.flush();
log.stop();
close(pfd[1]);
int status;
pid_t waited = waitpid(pid, &status, 0);
ASSERT_EQ(pid, waited);
ASSERT_NE(WIFEXITED(status), 0);
ASSERT_EQ(WEXITSTATUS(status), 0);
}
TEST(Log, StderrPipeBig)
{
int pfd[2] = {-1, -1};
int rc = pipe(pfd);
ASSERT_EQ(rc, 0);
pid_t pid = fork();
if (pid == 0) {
/* no verification as some reads will be torn due to size > PIPE_BUF */
close(pfd[1]);
readpipe(pfd[0], 0);
} else if (pid == (pid_t)-1) {
ASSERT_EQ(0, 1);
}
close(pfd[0]);
SubsystemMap subs;
subs.set_log_level(1, 20);
subs.set_gather_level(1, 10);
Log log(&subs);
log.start();
log.set_log_file("");
log.reopen_log_file();
log.set_stderr_fd(pfd[1]);
log.set_stderr_level(1, 20);
/* -128 for prefix space */
for (int i = 0; i < PIPE_BUF*2; i++) {
MutableEntry e(1, 1);
auto& s = e.get_ostream();
for (int j = 0; j < i; j++) {
char c = 'a';
c += (j % 26);
s << c;
}
log.submit_entry(std::move(e));
}
log.flush();
log.stop();
close(pfd[1]);
int status;
pid_t waited = waitpid(pid, &status, 0);
ASSERT_EQ(pid, waited);
ASSERT_NE(WIFEXITED(status), 0);
ASSERT_EQ(WEXITSTATUS(status), 0);
}
void do_segv()
{
SubsystemMap subs;