Merge PR #60464 into main

* refs/pull/60464/head:
	mds: add or update MDS thread names
	log: cache recent threads up to a day
	common: cache pthread names
	log: concatenate thread names and print once per thread

Reviewed-by: Milind Changire <mchangir@redhat.com>
This commit is contained in:
Patrick Donnelly 2024-11-12 22:17:59 -05:00
commit 584a81c686
No known key found for this signature in database
GPG Key ID: FA47FD0B0367D313
34 changed files with 151 additions and 149 deletions

View File

@ -81,7 +81,7 @@ static void handle_mds_signal(int signum)
int main(int argc, const char **argv)
{
ceph_pthread_setname(pthread_self(), "ceph-mds");
ceph_pthread_setname("ceph-mds");
auto args = argv_to_vec(argc, argv);
if (args.empty()) {

View File

@ -41,7 +41,7 @@ static void usage()
*/
int main(int argc, const char **argv)
{
ceph_pthread_setname(pthread_self(), "ceph-mgr");
ceph_pthread_setname("ceph-mgr");
auto args = argv_to_vec(argc, argv);
if (args.empty()) {

View File

@ -250,7 +250,7 @@ int main(int argc, const char **argv)
{
// reset our process name, in case we did a respawn, so that it's not
// left as "exe".
ceph_pthread_setname(pthread_self(), "ceph-mon");
ceph_pthread_setname("ceph-mon");
int err;

View File

@ -45,7 +45,7 @@ static void usage()
*/
int main(int argc, const char **argv)
{
ceph_pthread_setname(pthread_self(), "ceph-nvmeof-monitor-client");
ceph_pthread_setname("ceph-nvmeof-monitor-client");
auto args = argv_to_vec(argc, argv);
if (args.empty()) {

View File

@ -290,6 +290,7 @@ SyntheticClient::SyntheticClient(StandaloneClient *client, int w)
void *synthetic_client_thread_entry(void *ptr)
{
ceph_pthread_setname("client");
SyntheticClient *sc = static_cast<SyntheticClient*>(ptr);
//int r =
sc->run();
@ -945,7 +946,6 @@ int SyntheticClient::start_thread()
pthread_create(&thread_id, NULL, synthetic_client_thread_entry, this);
ceph_assert(thread_id);
ceph_pthread_setname(thread_id, "client");
return 0;
}

View File

@ -83,7 +83,7 @@ void *Thread::entry_wrapper()
if (pid && cpuid >= 0)
_set_affinity(cpuid);
ceph_pthread_setname(pthread_self(), Thread::thread_name.c_str());
ceph_pthread_setname(thread_name.c_str());
return entry();
}
@ -154,7 +154,7 @@ int Thread::try_create(size_t stacksize)
void Thread::create(const char *name, size_t stacksize)
{
ceph_assert(strlen(name) < 16);
Thread::thread_name = name;
thread_name = name;
int ret = try_create(stacksize);
if (ret != 0) {
@ -203,24 +203,6 @@ int Thread::set_affinity(int id)
// Functions for std::thread
// =========================
void set_thread_name(std::thread& t, const std::string& s) {
int r = ceph_pthread_setname(t.native_handle(), s.c_str());
if (r != 0) {
throw std::system_error(r, std::generic_category());
}
}
std::string get_thread_name(const std::thread& t) {
std::string s(256, '\0');
int r = ceph_pthread_getname(const_cast<std::thread&>(t).native_handle(),
s.data(), s.length());
if (r != 0) {
throw std::system_error(r, std::generic_category());
}
s.resize(std::strlen(s.data()));
return s;
}
void kill(std::thread& t, int signal)
{
auto r = ceph_pthread_kill(t.native_handle(), signal);

View File

@ -27,7 +27,6 @@
#include "include/ceph_assert.h"
#include "include/compat.h"
#include "include/spinlock.h"
extern pid_t ceph_gettid();
@ -36,7 +35,7 @@ class Thread {
pthread_t thread_id;
pid_t pid;
int cpuid;
static inline thread_local std::string thread_name;
std::string thread_name;
void *entry_wrapper();
@ -64,15 +63,10 @@ class Thread {
int join(void **prval = 0);
int detach();
int set_affinity(int cpuid);
static const std::string get_thread_name() {
return Thread::thread_name;
}
};
// Functions for with std::thread
void set_thread_name(std::thread& t, const std::string& s);
std::string get_thread_name(const std::thread& t);
void kill(std::thread& t, int signal);
template<typename Fun, typename... Args>
@ -81,7 +75,7 @@ std::thread make_named_thread(std::string_view n,
Args&& ...args) {
return std::thread([n = std::string(n)](auto&& fun, auto&& ...args) {
ceph_pthread_setname(pthread_self(), n.data());
ceph_pthread_setname(n.data());
std::invoke(std::forward<Fun>(fun),
std::forward<Args>(args)...);
}, std::forward<Fun>(fun), std::forward<Args>(args)...);

View File

@ -44,8 +44,7 @@ namespace ceph {
g_assert_line = line;
g_assert_func = func;
g_assert_thread = (unsigned long long)pthread_self();
ceph_pthread_getname(pthread_self(), g_assert_thread_name,
sizeof(g_assert_thread_name));
ceph_pthread_getname(g_assert_thread_name, sizeof(g_assert_thread_name));
ostringstream tss;
tss << ceph_clock_now();
@ -122,8 +121,7 @@ namespace ceph {
g_assert_line = line;
g_assert_func = func;
g_assert_thread = (unsigned long long)pthread_self();
ceph_pthread_getname(pthread_self(), g_assert_thread_name,
sizeof(g_assert_thread_name));
ceph_pthread_getname(g_assert_thread_name, sizeof(g_assert_thread_name));
BufAppender ba(g_assert_msg, sizeof(g_assert_msg));
BackTrace *bt = new ClibBackTrace(1);
@ -168,8 +166,7 @@ namespace ceph {
g_assert_line = line;
g_assert_func = func;
g_assert_thread = (unsigned long long)pthread_self();
ceph_pthread_getname(pthread_self(), g_assert_thread_name,
sizeof(g_assert_thread_name));
ceph_pthread_getname(g_assert_thread_name, sizeof(g_assert_thread_name));
BackTrace *bt = new ClibBackTrace(1);
snprintf(g_assert_msg, sizeof(g_assert_msg),
@ -210,8 +207,7 @@ namespace ceph {
g_assert_line = line;
g_assert_func = func;
g_assert_thread = (unsigned long long)pthread_self();
ceph_pthread_getname(pthread_self(), g_assert_thread_name,
sizeof(g_assert_thread_name));
ceph_pthread_getname(g_assert_thread_name, sizeof(g_assert_thread_name));
BufAppender ba(g_assert_msg, sizeof(g_assert_msg));
BackTrace *bt = new ClibBackTrace(1);

View File

@ -98,6 +98,7 @@ class timer {
std::thread thread;
void timer_thread() {
ceph_pthread_setname("ceph_timer");
std::unique_lock l(lock);
while (!suspended) {
auto now = TC::now();
@ -155,7 +156,6 @@ class timer {
public:
timer() : suspended(false) {
thread = std::thread(&timer::timer_thread, this);
set_thread_name(thread, "ceph_timer");
}
// Create a suspended timer, jobs will be executed in order when

View File

@ -11,6 +11,7 @@
* Foundation. See file COPYING.
*
*/
#include "include/compat.h"
#include "common/code_environment.h"
@ -18,10 +19,6 @@
#include "acconfig.h"
#ifdef HAVE_PTHREAD_GETNAME_NP
#include <pthread.h>
#endif
#include <string.h>
code_environment_t g_code_env = CODE_ENVIRONMENT_UTILITY;
@ -57,7 +54,7 @@ int get_process_name(char *buf, int len)
}
// FIPS zeroization audit 20191115: this memset is not security related.
memset(buf, 0, len);
return pthread_getname_np(pthread_self(), buf, len);
return ceph_pthread_getname(buf, len);
}
#elif defined(HAVE_GETPROGNAME)

View File

@ -565,3 +565,66 @@ ssize_t get_self_exe_path(char* path, int buff_length) {
}
#endif /* _WIN32 */
static thread_local char cached_thread_name[256]{};
int ceph_pthread_setname(char const* name)
{
strncpy(cached_thread_name, name, sizeof cached_thread_name - 1);
#if defined(_WIN32) && defined(__clang__) && \
!defined(_LIBCPP_HAS_THREAD_API_PTHREAD)
// In this case, llvm doesn't use the pthread api for std::thread.
// We cannot use native_handle() with the pthread api, nor can we pass
// it to Windows API functions.
return 0;
#elif defined(HAVE_PTHREAD_SETNAME_NP)
#if defined(__APPLE__)
return pthread_setname_np(name);
#else
return pthread_setname_np(pthread_self(), name);
#endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
pthread_set_name_np(pthread_self(), name); \
return 0;
#else
return 0;
#endif
}
int ceph_pthread_getname(char* name, size_t len)
{
if (cached_thread_name[0]) {
if (len > 0) {
strncpy(name, cached_thread_name, len);
name[len-1] = 0;
}
return 0;
} else {
#if defined(_WIN32) && defined(__clang__) && \
!defined(_LIBCPP_HAS_THREAD_API_PTHREAD)
if (len > 0) {
strcpy(name, "");
}
return 0;
#elif defined(HAVE_PTHREAD_GETNAME_NP) || defined(HAVE_PTHREAD_GET_NAME_NP)
# if defined(HAVE_PTHREAD_GETNAME_NP)
int rc = pthread_getname_np(pthread_self(), cached_thread_name, sizeof cached_thread_name);
# else
int rc = pthread_get_name_np(pthread_self(), cached_thread_name, sizeof cached_thread_name);
# endif
if (rc == 0) {
strncpy(name, cached_thread_name, len);
name[len-1] = 0;
return 0;
} else {
return rc;
}
#else
if (len > 0) {
strcpy(name, "");
}
return 0;
#endif
}
}

View File

@ -99,6 +99,7 @@ ostream& ObjBencher::out(ostream& os)
}
void *ObjBencher::status_printer(void *_bencher) {
ceph_pthread_setname("OB::stat_print");
ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
bench_data& data = bencher->data;
Formatter *formatter = bencher->formatter;
@ -453,7 +454,6 @@ int ObjBencher::write_bench(int secondsToRun,
pthread_t print_thread;
pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
ceph_pthread_setname(print_thread, "write_stat");
std::unique_lock locker{lock};
data.finished = 0;
data.start_time = mono_clock::now();
@ -691,7 +691,6 @@ int ObjBencher::seq_read_bench(
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
ceph_pthread_setname(print_thread, "seq_read_stat");
mono_time finish_time = data.start_time + time_to_run;
//start initial reads
@ -903,7 +902,6 @@ int ObjBencher::rand_read_bench(
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
ceph_pthread_setname(print_thread, "rand_read_stat");
mono_time finish_time = data.start_time + time_to_run;
//start initial reads

View File

@ -27,7 +27,7 @@ ThreadPool::ThreadPool(size_t n_threads,
pin(*cpus);
}
block_sighup();
(void) pthread_setname_np(pthread_self(), "alien-store-tp");
(void) ceph_pthread_setname("alien-store-tp");
loop(queue_max_wait, i);
});
}

View File

@ -307,7 +307,7 @@ static void handle_oneshot_fatal_signal(int signum)
char buf[1024];
char pthread_name[16] = {0}; //limited by 16B include terminating null byte.
int r = ceph_pthread_getname(pthread_self(), pthread_name, sizeof(pthread_name));
int r = ceph_pthread_getname(pthread_name, sizeof(pthread_name));
(void)r;
#if defined(__sun)
char message[SIG2STR_MAX];

View File

@ -179,58 +179,10 @@ struct cpu_set_t;
#define MSG_DONTWAIT MSG_NONBLOCK
#endif
/* compiler warning free success noop */
#define pthread_setname_noop_helper(thread, name) ({ \
int __i = 0; \
__i; })
#define pthread_getname_noop_helper(thread, name, len) ({ \
if (name != NULL) \
*name = '\0'; \
0; })
#define pthread_kill_unsupported_helper(thread, signal) ({ \
int __i = -ENOTSUP; \
__i; })
#if defined(_WIN32) && defined(__clang__) && \
!defined(_LIBCPP_HAS_THREAD_API_PTHREAD)
// In this case, llvm doesn't use the pthread api for std::thread.
// We cannot use native_handle() with the pthread api, nor can we pass
// it to Windows API functions.
#define ceph_pthread_setname pthread_setname_noop_helper
#elif defined(HAVE_PTHREAD_SETNAME_NP)
#if defined(__APPLE__)
#define ceph_pthread_setname(thread, name) ({ \
int __result = 0; \
if (thread == pthread_self()) \
__result = pthread_setname_np(name); \
__result; })
#else
#define ceph_pthread_setname pthread_setname_np
#endif
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
/* Fix a small name diff and return 0 */
#define ceph_pthread_setname(thread, name) ({ \
pthread_set_name_np(thread, name); \
0; })
#else
#define ceph_pthread_setname pthread_setname_noop_helper
#endif
#if defined(_WIN32) && defined(__clang__) && \
!defined(_LIBCPP_HAS_THREAD_API_PTHREAD)
#define ceph_pthread_getname pthread_getname_noop_helper
#elif defined(HAVE_PTHREAD_GETNAME_NP)
#define ceph_pthread_getname pthread_getname_np
#elif defined(HAVE_PTHREAD_GET_NAME_NP)
#define ceph_pthread_getname(thread, name, len) ({ \
pthread_get_name_np(thread, name, len); \
0; })
#else
#define ceph_pthread_getname pthread_getname_noop_helper
#endif
#if defined(_WIN32) && defined(__clang__) && \
!defined(_LIBCPP_HAS_THREAD_API_PTHREAD)
#define ceph_pthread_kill pthread_kill_unsupported_helper
@ -244,6 +196,9 @@ int ceph_posix_fallocate(int fd, off_t offset, off_t len);
extern "C" {
#endif
int ceph_pthread_getname(char* name, size_t size);
int ceph_pthread_setname(const char* name);
int pipe_cloexec(int pipefd[2], int flags);
char *ceph_strerror_r(int errnum, char *buf, size_t buflen);
unsigned get_page_size();

View File

@ -24,6 +24,7 @@ namespace logging {
class Entry {
public:
using time = log_time;
using thread_name_t = std::array<char, 16>;
Entry() = delete;
Entry(short pr, short sub) :
@ -32,8 +33,7 @@ public:
m_prio(pr),
m_subsys(sub)
{
strncpy(m_thread_name, Thread::get_thread_name().data(), 16);
m_thread_name[15] = '\0';
ceph_pthread_getname(m_thread_name.data(), m_thread_name.size());
}
Entry(const Entry &) = default;
Entry& operator=(const Entry &) = default;
@ -47,7 +47,7 @@ public:
time m_stamp;
pthread_t m_thread;
short m_prio, m_subsys;
char m_thread_name[16];
thread_name_t m_thread_name{};
static log_clock& clock() {
static log_clock clock;

View File

@ -31,6 +31,7 @@
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <fmt/ranges.h>
#define MAX_LOG_BUF 65536
@ -372,6 +373,7 @@ void Log::_flush_logbuf()
void Log::_flush(EntryVector& t, bool crash)
{
auto now = mono_clock::now();
long len = 0;
if (t.empty()) {
assert(m_log_buf.empty());
@ -443,10 +445,29 @@ void Log::_flush(EntryVector& t, bool crash)
m_journald->log_entry(e);
}
{
auto [it, _] = m_recent_thread_names.try_emplace(e.m_thread, now, DEFAULT_MAX_THREAD_NAMES);
auto& [t, names] = it->second;
if (names.size() == 0 || names.front() != e.m_thread_name.data()) {
names.push_front(e.m_thread_name.data());
}
t = now;
}
m_recent.push_back(std::move(e));
}
t.clear();
for (auto it = m_recent_thread_names.begin(); it != m_recent_thread_names.end(); ) {
auto t = it->second.first;
auto since = now - t;
if (since > std::chrono::seconds(60*60*24)) {
it = m_recent_thread_names.erase(it);
} else {
++it;
}
}
_flush_logbuf();
}
@ -493,14 +514,10 @@ void Log::dump_recent()
_flush(m_flush, false);
_log_message("--- begin dump of recent events ---", true);
std::set<std::pair<pthread_t, const char *>> recent_pthread_ids;
{
EntryVector t;
t.insert(t.end(), std::make_move_iterator(m_recent.begin()), std::make_move_iterator(m_recent.end()));
m_recent.clear();
for (const auto& e : t) {
recent_pthread_ids.emplace(std::make_pair(e.m_thread, e.m_thread_name));
}
_flush(t, true);
}
@ -515,11 +532,15 @@ void Log::dump_recent()
m_stderr_log, m_stderr_crash), true);
_log_message("--- pthread ID / name mapping for recent threads ---", true);
for (auto& [pthread_id, pthread_name] : recent_pthread_ids)
for (const auto& [tid, t_names] : m_recent_thread_names)
{
[[maybe_unused]] auto [t, names] = t_names;
// we want the ID to be printed in the same format as we use for a log entry.
// The reason is easier grepping.
_log_message(fmt::format(" {:x} / {}", tid_to_int(pthread_id), pthread_name), true);
auto msg = fmt::format(" {:x} / {}",
tid_to_int(tid),
fmt::join(names, ", "));
_log_message(msg, true);
}
_log_message(fmt::format(" max_recent {:9}", m_recent.capacity()), true);

View File

@ -7,6 +7,7 @@
#include <boost/circular_buffer.hpp>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
@ -14,6 +15,7 @@
#include <string_view>
#include "common/Thread.h"
#include "common/ceph_time.h"
#include "common/likely.h"
#include "log/Entry.h"
@ -86,9 +88,14 @@ protected:
private:
using EntryRing = boost::circular_buffer<ConcreteEntry>;
using mono_clock = ceph::coarse_mono_clock;
using mono_time = ceph::coarse_mono_time;
using RecentThreadNames = std::map<pthread_t, std::pair<mono_time, boost::circular_buffer<std::string> > >;
static const std::size_t DEFAULT_MAX_NEW = 100;
static const std::size_t DEFAULT_MAX_RECENT = 10000;
static constexpr std::size_t DEFAULT_MAX_THREAD_NAMES = 4;
Log **m_indirect_this;
@ -102,6 +109,7 @@ private:
pthread_t m_queue_mutex_holder;
pthread_t m_flush_mutex_holder;
RecentThreadNames m_recent_thread_names; // protected by m_flush_mutex
EntryVector m_new; ///< new entries
EntryRing m_recent; ///< recent (less new) entries we've already written at low detail
EntryVector m_flush; ///< entries to be flushed (here to optimize heap allocations)

View File

@ -75,7 +75,7 @@ void Beacon::init(const MDSMap &mdsmap)
_notify_mdsmap(mdsmap);
sender = std::thread([this]() {
ceph_pthread_setname(pthread_self(), "beacon");
ceph_pthread_setname("mds-beacon");
std::unique_lock<std::mutex> lock(mutex);
bool sent;
while (!finished) {

View File

@ -14378,6 +14378,7 @@ bool MDCache::is_ready_to_trim_cache(void)
void MDCache::upkeep_main(void)
{
ceph_pthread_setname("mds-cache-trim");
std::unique_lock lock(upkeep_mutex);
// create a "memory model" for the upkeep thread. The object maintains

View File

@ -259,7 +259,7 @@ void MDLog::create(MDSContext *c)
logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos());
submit_thread.create("md_submit");
submit_thread.create("mds-log-submit");
}
void MDLog::open(MDSContext *c)
@ -268,9 +268,9 @@ void MDLog::open(MDSContext *c)
ceph_assert(!recovery_thread.is_started());
recovery_thread.set_completion(c);
recovery_thread.create("md_recov_open");
recovery_thread.create("mds-log-recvr");
submit_thread.create("md_submit");
submit_thread.create("mds-log-submit");
// either append() or replay() will follow.
}
@ -312,7 +312,7 @@ void MDLog::reopen(MDSContext *c)
recovery_thread.join();
recovery_thread.set_completion(new C_ReopenComplete(this, c));
recovery_thread.create("md_recov_reopen");
recovery_thread.create("mds-log-reopen");
}
void MDLog::append()
@ -663,6 +663,8 @@ bool MDLog::is_trim_slow() const {
}
void MDLog::log_trim_upkeep(void) {
ceph_pthread_setname("mds-log-trim");
dout(10) << dendl;
std::unique_lock mds_lock(mds->mds_lock);
@ -1014,7 +1016,7 @@ void MDLog::replay(MDSContext *c)
}
already_replayed = true;
replay_thread.create("md_log_replay");
replay_thread.create("mds-log-replay");
}

View File

@ -496,7 +496,7 @@ MDSRank::MDSRank(
objecter->unset_honor_pool_full();
finisher = new Finisher(cct, "MDSRank", "MR_Finisher");
finisher = new Finisher(cct, "MDSRank", "mds-rank-fin");
mdcache = new MDCache(this, purge_queue);
mdlog = new MDLog(this);
@ -581,7 +581,7 @@ void MDSRankDispatcher::init()
// who is interested in it.
handle_osd_map();
progress_thread.create("mds_rank_progr");
progress_thread.create("mds-rank-progr");
purge_queue.init();

View File

@ -73,6 +73,7 @@ int MetricAggregator::init() {
m_cct->get_perfcounters_collection()->add(m_perf_counters);
pinger = std::thread([this]() {
ceph_pthread_setname("mds-ping");
std::unique_lock locker(lock);
while (!stopping) {
ping_all_active_ranks();

View File

@ -51,6 +51,7 @@ void MetricsHandler::init() {
dout(10) << dendl;
updater = std::thread([this]() {
ceph_pthread_setname("mds-metrics");
std::unique_lock locker(lock);
while (!stopping) {
double after = g_conf().get_val<std::chrono::seconds>("mds_metrics_update_interval").count();

View File

@ -122,7 +122,7 @@ PurgeQueue::PurgeQueue(
cct(cct_),
rank(rank_),
metadata_pool(metadata_pool_),
finisher(cct, "PurgeQueue", "PQ_Finisher"),
finisher(cct, "PurgeQueue", "mds-pq-fin"),
timer(cct, lock),
filer(objecter_, &finisher),
objecter(objecter_),

View File

@ -30,7 +30,7 @@ class QuiesceAgent {
: quiesce_control(quiesce_control)
, stop_agent_thread(false)
, agent_thread(this) {
agent_thread.create("quiesce.agt");
agent_thread.create("mds-q-agt");
};
virtual ~QuiesceAgent() {

View File

@ -200,7 +200,7 @@ void QuiesceDbManager::update_membership(const QuiesceClusterMembership& new_mem
// start the thread
dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
db_thread_should_exit = false;
quiesce_db_thread.create("quiesce_db_mgr");
quiesce_db_thread.create("mds-q-db");
} else if (quiesce_db_thread.is_started()) {
submit_condition.notify_all();
}

View File

@ -352,7 +352,7 @@ class NetworkStack {
static constexpr int TASK_COMM_LEN = 16;
char tp_name[TASK_COMM_LEN];
sprintf(tp_name, "msgr-worker-%u", id);
ceph_pthread_setname(pthread_self(), tp_name);
ceph_pthread_setname(tp_name);
}
protected:

View File

@ -92,7 +92,6 @@ void RDMADispatcher::polling_start()
ceph_assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
ceph_pthread_setname(t.native_handle(), "rdma-polling");
}
void RDMADispatcher::polling_stop()
@ -263,6 +262,7 @@ int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
void RDMADispatcher::polling()
{
ceph_pthread_setname("rdma-polling");
static int MAX_COMPLETIONS = 32;
ibv_wc wc[MAX_COMPLETIONS];

View File

@ -21,6 +21,7 @@
#include "common/dout.h"
#include "rgw_url.h"
#include <chrono>
#include <fmt/format.h>
#define dout_subsys ceph_subsys_rgw_notification
@ -769,9 +770,10 @@ public:
});
// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
workers.emplace_back([this]() {
workers.emplace_back([this,worker_id]() {
const auto thread_name = fmt::format("notif-worker-{}", worker_id);
ceph_pthread_setname(thread_name.c_str());
try {
io_context.run();
} catch (const std::exception& err) {
@ -779,11 +781,6 @@ public:
throw err;
}
});
const auto thread_name = WORKER_THREAD_NAME+std::to_string(worker_id);
if (const auto rc = ceph_pthread_setname(workers.back().native_handle(), thread_name.c_str()); rc != 0) {
ldpp_dout(this, 1) << "ERROR: failed to set notification manager thread name to: " << thread_name
<< ". error: " << rc << dendl;
}
}
ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
}

View File

@ -650,6 +650,9 @@ private:
// (4) TODO reconnect on connection errors
// (5) TODO cleanup timedout callbacks
void run() noexcept {
// give the runner thread a name for easier debugging
ceph_pthread_setname("amqp_manager");
amqp_frame_t frame;
while (!stopped) {
@ -838,12 +841,6 @@ public:
// This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
// give the runner thread a name for easier debugging
const char* thread_name = "amqp_manager";
if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) {
ldout(cct, 1) << "ERROR: failed to set amqp manager thread name to: " << thread_name
<< ". error: " << rc << dendl;
}
}
// non copyable

View File

@ -503,6 +503,7 @@ private:
}
void run() noexcept {
ceph_pthread_setname("kafka_manager");
while (!stopped) {
// publish all messages in the queue
@ -575,12 +576,6 @@ public:
// This is to prevent rehashing so that iterators are not invalidated
// when a new connection is added.
connections.max_load_factor(10.0);
// give the runner thread a name for easier debugging
const char* thread_name = "kafka_manager";
if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) {
ldout(cct, 1) << "ERROR: failed to set kafka manager thread name to: " << thread_name
<< ". error: " << rc << dendl;
}
}
// non copyable

View File

@ -83,11 +83,6 @@ void Background::start() {
}
started = true;
runner = std::thread(&Background::run, this);
const char* thread_name = "lua_background";
if (const auto rc = ceph_pthread_setname(runner.native_handle(), thread_name); rc != 0) {
ldout(cct, 1) << "ERROR: failed to set lua background thread name to: " << thread_name
<< ". error: " << rc << dendl;
}
}
void Background::pause() {
@ -127,6 +122,7 @@ const BackgroundMapValue& Background::get_table_value(const std::string& key) co
//(2) Executes the script
//(3) Sleep (configurable)
void Background::run() {
ceph_pthread_setname("lua_background");
const DoutPrefixProvider* const dpp = &dp;
lua_state_guard lguard(cct->_conf->rgw_lua_max_memory_per_state, dpp);
auto L = lguard.get();

View File

@ -239,6 +239,7 @@ class ActiveRateLimiter : public DoutPrefix {
std::atomic_uint8_t current_active = 0;
std::shared_ptr<RateLimiter> ratelimit[2];
void replace_active() {
ceph_pthread_setname("ratelimit_gc");
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(cv_m);
while (!stopped) {
@ -286,8 +287,5 @@ class ActiveRateLimiter : public DoutPrefix {
void start() {
ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
runner = std::thread(&ActiveRateLimiter::replace_active, this);
if (const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc"); rc != 0) {
ldpp_dout(this, 1) << "ERROR: failed to set ratelimit_gc thread name. error: " << rc << dendl;
}
}
};