mirror of
https://github.com/ceph/ceph
synced 2025-01-02 17:12:31 +00:00
Merge pull request #1998 from ceph/wip-disk-ioprio
osd: allow io priority for (background) disk threadpool to be set Backport: firefly, dumpling Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
commit
235f4ca6dc
@ -13,6 +13,7 @@ libcommon_la_SOURCES = \
|
||||
common/admin_socket_client.cc \
|
||||
common/cmdparse.cc \
|
||||
common/escape.c \
|
||||
common/io_priority.cc \
|
||||
common/Clock.cc \
|
||||
common/Throttle.cc \
|
||||
common/Timer.cc \
|
||||
@ -177,6 +178,7 @@ noinst_HEADERS += \
|
||||
common/arch.h \
|
||||
common/armor.h \
|
||||
common/common_init.h \
|
||||
common/io_priority.h \
|
||||
common/pipe.h \
|
||||
common/code_environment.h \
|
||||
common/signal.h \
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "common/code_environment.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/signal.h"
|
||||
#include "common/io_priority.h"
|
||||
|
||||
#include <dirent.h>
|
||||
#include <errno.h>
|
||||
@ -29,7 +30,10 @@
|
||||
|
||||
|
||||
Thread::Thread()
|
||||
: thread_id(0)
|
||||
: thread_id(0),
|
||||
pid(0),
|
||||
ioprio_class(-1),
|
||||
ioprio_priority(-1)
|
||||
{
|
||||
}
|
||||
|
||||
@ -38,10 +42,24 @@ Thread::~Thread()
|
||||
}
|
||||
|
||||
void *Thread::_entry_func(void *arg) {
|
||||
void *r = ((Thread*)arg)->entry();
|
||||
void *r = ((Thread*)arg)->entry_wrapper();
|
||||
return r;
|
||||
}
|
||||
|
||||
void *Thread::entry_wrapper()
|
||||
{
|
||||
int p = ceph_gettid(); // may return -ENOSYS on other platforms
|
||||
if (p > 0)
|
||||
pid = p;
|
||||
if (ioprio_class >= 0 &&
|
||||
ioprio_priority >= 0) {
|
||||
ceph_ioprio_set(IOPRIO_WHO_PROCESS,
|
||||
pid,
|
||||
IOPRIO_PRIO_VALUE(ioprio_class, ioprio_priority));
|
||||
}
|
||||
return entry();
|
||||
}
|
||||
|
||||
const pthread_t &Thread::get_thread_id()
|
||||
{
|
||||
return thread_id;
|
||||
@ -128,3 +146,15 @@ int Thread::detach()
|
||||
{
|
||||
return pthread_detach(thread_id);
|
||||
}
|
||||
|
||||
int Thread::set_ioprio(int cls, int prio)
|
||||
{
|
||||
// fixme, maybe: this can race with create()
|
||||
ioprio_class = cls;
|
||||
ioprio_priority = prio;
|
||||
if (pid && cls >= 0 && prio >= 0)
|
||||
return ceph_ioprio_set(IOPRIO_WHO_PROCESS,
|
||||
pid,
|
||||
IOPRIO_PRIO_VALUE(cls, prio));
|
||||
return 0;
|
||||
}
|
||||
|
@ -21,6 +21,10 @@
|
||||
class Thread {
|
||||
private:
|
||||
pthread_t thread_id;
|
||||
pid_t pid;
|
||||
int ioprio_class, ioprio_priority;
|
||||
|
||||
void *entry_wrapper();
|
||||
|
||||
public:
|
||||
Thread(const Thread& other);
|
||||
@ -44,6 +48,7 @@ class Thread {
|
||||
void create(size_t stacksize = 0);
|
||||
int join(void **prval = 0);
|
||||
int detach();
|
||||
int set_ioprio(int cls, int prio);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
#include "include/types.h"
|
||||
#include "include/utime.h"
|
||||
#include "common/errno.h"
|
||||
#include "WorkQueue.h"
|
||||
|
||||
#include "common/config.h"
|
||||
@ -33,6 +34,8 @@ ThreadPool::ThreadPool(CephContext *cct_, string nm, int n, const char *option)
|
||||
_stop(false),
|
||||
_pause(0),
|
||||
_draining(0),
|
||||
ioprio_class(-1),
|
||||
ioprio_priority(-1),
|
||||
_num_threads(n),
|
||||
last_work_queue(0),
|
||||
processing(0)
|
||||
@ -156,6 +159,11 @@ void ThreadPool::start_threads()
|
||||
WorkThread *wt = new WorkThread(this);
|
||||
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
|
||||
_threads.insert(wt);
|
||||
|
||||
int r = wt->set_ioprio(ioprio_class, ioprio_priority);
|
||||
if (r < 0)
|
||||
lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
|
||||
|
||||
wt->create();
|
||||
}
|
||||
}
|
||||
@ -255,6 +263,20 @@ void ThreadPool::drain(WorkQueue_* wq)
|
||||
_lock.Unlock();
|
||||
}
|
||||
|
||||
void ThreadPool::set_ioprio(int cls, int priority)
|
||||
{
|
||||
Mutex::Locker l(_lock);
|
||||
ioprio_class = cls;
|
||||
ioprio_priority = priority;
|
||||
for (set<WorkThread*>::iterator p = _threads.begin();
|
||||
p != _threads.end();
|
||||
++p) {
|
||||
int r = (*p)->set_ioprio(cls, priority);
|
||||
if (r < 0)
|
||||
lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
ShardedThreadPool::ShardedThreadPool(CephContext *pcct_, string nm,
|
||||
uint32_t pnum_threads): cct(pcct_),name(nm),lockname(nm + "::lock"),
|
||||
shardedpool_lock(lockname.c_str()),num_threads(pnum_threads),stop_threads(0),
|
||||
|
@ -33,6 +33,7 @@ class ThreadPool : public md_config_obs_t {
|
||||
int _pause;
|
||||
int _draining;
|
||||
Cond _wait_cond;
|
||||
int ioprio_class, ioprio_priority;
|
||||
|
||||
public:
|
||||
class TPHandle {
|
||||
@ -388,6 +389,9 @@ public:
|
||||
void unpause();
|
||||
/// wait for all work to complete
|
||||
void drain(WorkQueue_* wq = 0);
|
||||
|
||||
/// set io priority
|
||||
void set_ioprio(int cls, int priority);
|
||||
};
|
||||
|
||||
class GenContextWQ :
|
||||
|
@ -467,6 +467,8 @@ OPTION(osd_peering_wq_batch_size, OPT_U64, 20)
|
||||
OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
|
||||
OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
|
||||
OPTION(osd_disk_threads, OPT_INT, 1)
|
||||
OPTION(osd_disk_thread_ioprio_class, OPT_STR, "") // rt realtime be besteffort best effort idle
|
||||
OPTION(osd_disk_thread_ioprio_priority, OPT_INT, -1) // 0-7
|
||||
OPTION(osd_recovery_threads, OPT_INT, 1)
|
||||
OPTION(osd_recover_clone_overlap, OPT_BOOL, true) // preserve clone_overlap during recovery/migration
|
||||
OPTION(osd_op_num_threads_per_shard, OPT_INT, 2)
|
||||
|
54
src/common/io_priority.cc
Normal file
54
src/common/io_priority.cc
Normal file
@ -0,0 +1,54 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2012 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/syscall.h> /* For SYS_xxx definitions */
|
||||
#include <algorithm>
|
||||
#include <errno.h>
|
||||
|
||||
#include "common/errno.h"
|
||||
#include "io_priority.h"
|
||||
|
||||
pid_t ceph_gettid(void)
|
||||
{
|
||||
#ifdef __linux__
|
||||
return syscall(SYS_gettid);
|
||||
#else
|
||||
return -ENOSYS;
|
||||
#endif
|
||||
}
|
||||
|
||||
int ceph_ioprio_set(int whence, int who, int ioprio)
|
||||
{
|
||||
#ifdef __linux__
|
||||
return syscall(SYS_ioprio_set, whence, who, ioprio);
|
||||
#else
|
||||
return -ENOSYS;
|
||||
#endif
|
||||
}
|
||||
|
||||
int ceph_ioprio_string_to_class(const std::string& s)
|
||||
{
|
||||
std::string l;
|
||||
std::transform(s.begin(), s.end(), l.begin(), ::tolower);
|
||||
|
||||
if (l == "idle")
|
||||
return IOPRIO_CLASS_IDLE;
|
||||
if (l == "be" || l == "besteffort" || l == "best effort")
|
||||
return IOPRIO_CLASS_BE;
|
||||
if (l == "rt" || l == "realtime" || l == "real time")
|
||||
return IOPRIO_CLASS_RT;
|
||||
return -EINVAL;
|
||||
}
|
44
src/common/io_priority.h
Normal file
44
src/common/io_priority.h
Normal file
@ -0,0 +1,44 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2012 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef CEPH_COMMON_IO_PRIORITY_H
|
||||
#define CEPH_COMMON_IO_PRIORITY_H
|
||||
|
||||
#include <string>
|
||||
|
||||
extern pid_t ceph_gettid();
|
||||
|
||||
#ifndef IOPRIO_WHO_PROCESS
|
||||
# define IOPRIO_WHO_PROCESS 1
|
||||
#endif
|
||||
#ifndef IOPRIO_PRIO_VALUE
|
||||
# define IOPRIO_CLASS_SHIFT 13
|
||||
# define IOPRIO_PRIO_VALUE(class, data) \
|
||||
(((class) << IOPRIO_CLASS_SHIFT) | (data))
|
||||
#endif
|
||||
#ifndef IOPRIO_CLASS_RT
|
||||
# define IOPRIO_CLASS_RT 1
|
||||
#endif
|
||||
#ifndef IOPRIO_CLASS_BE
|
||||
# define IOPRIO_CLASS_BE 2
|
||||
#endif
|
||||
#ifndef IOPRIO_CLASS_IDLE
|
||||
# define IOPRIO_CLASS_IDLE 3
|
||||
#endif
|
||||
|
||||
extern int ceph_ioprio_set(int whence, int who, int ioprio);
|
||||
|
||||
extern int ceph_ioprio_string_to_class(const std::string& s);
|
||||
|
||||
#endif
|
@ -42,6 +42,7 @@
|
||||
|
||||
#include "common/ceph_argparse.h"
|
||||
#include "common/version.h"
|
||||
#include "common/io_priority.h"
|
||||
|
||||
#include "os/ObjectStore.h"
|
||||
|
||||
@ -1291,6 +1292,8 @@ int OSD::init()
|
||||
disk_tp.start();
|
||||
command_tp.start();
|
||||
|
||||
set_disk_tp_priority();
|
||||
|
||||
// start the heartbeat
|
||||
heartbeat_thread.create();
|
||||
|
||||
@ -8249,6 +8252,8 @@ const char** OSD::get_tracked_conf_keys() const
|
||||
"osd_map_cache_size",
|
||||
"osd_map_max_advance",
|
||||
"osd_pg_epoch_persisted_max_stale",
|
||||
"osd_disk_thread_ioprio_class",
|
||||
"osd_disk_thread_ioprio_priority",
|
||||
NULL
|
||||
};
|
||||
return KEYS;
|
||||
@ -8275,6 +8280,10 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
|
||||
op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
|
||||
cct->_conf->osd_op_history_duration);
|
||||
}
|
||||
if (changed.count("osd_disk_thread_ioprio_class") ||
|
||||
changed.count("osd_disk_thread_ioprio_priority")) {
|
||||
set_disk_tp_priority();
|
||||
}
|
||||
|
||||
check_config();
|
||||
}
|
||||
@ -8294,6 +8303,17 @@ void OSD::check_config()
|
||||
}
|
||||
}
|
||||
|
||||
void OSD::set_disk_tp_priority()
|
||||
{
|
||||
dout(10) << __func__
|
||||
<< " class " << cct->_conf->osd_disk_thread_ioprio_class
|
||||
<< " priority " << cct->_conf->osd_disk_thread_ioprio_priority
|
||||
<< dendl;
|
||||
int cls =
|
||||
ceph_ioprio_string_to_class(cct->_conf->osd_disk_thread_ioprio_class);
|
||||
disk_tp.set_ioprio(cls, cct->_conf->osd_disk_thread_ioprio_priority);
|
||||
}
|
||||
|
||||
// --------------------------------
|
||||
|
||||
int OSD::init_op_flags(OpRequestRef op)
|
||||
|
@ -1090,6 +1090,8 @@ private:
|
||||
|
||||
bool paused_recovery;
|
||||
|
||||
void set_disk_tp_priority();
|
||||
|
||||
// -- sessions --
|
||||
public:
|
||||
struct Session : public RefCountedObject {
|
||||
|
Loading…
Reference in New Issue
Block a user