mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
Merge pull request #36551 from CongMinYin/immutable_object_cache_throttle
tools: add throttle mechanism to immutable object cache
Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
commit
6f4e3b0dd1
@ -369,7 +369,7 @@ class ClusterConfigurationTest(DashboardTestCase):
|
||||
self.assertIn('type', data)
|
||||
self.assertIn('desc', data)
|
||||
self.assertIn(data['type'], ['str', 'bool', 'float', 'int', 'size', 'uint', 'addr',
|
||||
'addrvec', 'uuid', 'secs'])
|
||||
'addrvec', 'uuid', 'secs', 'millisecs'])
|
||||
|
||||
if 'value' in data:
|
||||
self.assertIn('source', data)
|
||||
|
@ -49,6 +49,9 @@ public:
|
||||
void operator()(const std::chrono::seconds v) const {
|
||||
out << v.count();
|
||||
}
|
||||
void operator()(const std::chrono::milliseconds v) const {
|
||||
out << v.count();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -203,6 +206,13 @@ int Option::parse_value(
|
||||
*error_message = e.what();
|
||||
return -EINVAL;
|
||||
}
|
||||
} else if (type == Option::TYPE_MILLISECS) {
|
||||
try {
|
||||
*out = boost::lexical_cast<uint64_t>(val);
|
||||
} catch (const boost::bad_lexical_cast& e) {
|
||||
*error_message = e.what();
|
||||
return -EINVAL;
|
||||
}
|
||||
} else {
|
||||
ceph_abort();
|
||||
}
|
||||
@ -7894,6 +7904,37 @@ static std::vector<Option> get_immutable_object_cache_options() {
|
||||
Option("immutable_object_cache_watermark", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0.1)
|
||||
.set_description("immutable object cache water mark"),
|
||||
|
||||
Option("immutable_object_cache_qos_schedule_tick_min", Option::TYPE_MILLISECS, Option::LEVEL_ADVANCED)
|
||||
.set_default(50)
|
||||
.set_min(1)
|
||||
.set_description("minimum schedule tick for immutable object cache"),
|
||||
|
||||
Option("immutable_object_cache_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("the desired immutable object cache IO operations limit per second"),
|
||||
|
||||
Option("immutable_object_cache_qos_iops_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("the desired burst limit of immutable object cache IO operations"),
|
||||
|
||||
Option("immutable_object_cache_qos_iops_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
|
||||
.set_default(1)
|
||||
.set_min(1)
|
||||
.set_description("the desired burst duration in seconds of immutable object cache IO operations"),
|
||||
|
||||
Option("immutable_object_cache_qos_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("the desired immutable object cache IO bytes limit per second"),
|
||||
|
||||
Option("immutable_object_cache_qos_bps_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||||
.set_default(0)
|
||||
.set_description("the desired burst limit of immutable object cache IO bytes"),
|
||||
|
||||
Option("immutable_object_cache_qos_bps_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
|
||||
.set_default(1)
|
||||
.set_min(1)
|
||||
.set_description("the desired burst duration in seconds of immutable object cache IO bytes"),
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ struct Option {
|
||||
TYPE_UUID = 7,
|
||||
TYPE_SIZE = 8,
|
||||
TYPE_SECS = 9,
|
||||
TYPE_MILLISECS = 10,
|
||||
};
|
||||
|
||||
static const char *type_to_c_type_str(type_t t) {
|
||||
@ -37,6 +38,7 @@ struct Option {
|
||||
case TYPE_UUID: return "uuid_d";
|
||||
case TYPE_SIZE: return "size_t";
|
||||
case TYPE_SECS: return "secs";
|
||||
case TYPE_MILLISECS: return "millisecs";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
@ -52,6 +54,7 @@ struct Option {
|
||||
case TYPE_UUID: return "uuid";
|
||||
case TYPE_SIZE: return "size";
|
||||
case TYPE_SECS: return "secs";
|
||||
case TYPE_MILLISECS: return "millisecs";
|
||||
default: return "unknown";
|
||||
}
|
||||
}
|
||||
@ -86,6 +89,9 @@ struct Option {
|
||||
if (s == "secs") {
|
||||
return TYPE_SECS;
|
||||
}
|
||||
if (s == "millisecs") {
|
||||
return TYPE_MILLISECS;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -140,6 +146,7 @@ struct Option {
|
||||
entity_addr_t,
|
||||
entity_addrvec_t,
|
||||
std::chrono::seconds,
|
||||
std::chrono::milliseconds,
|
||||
size_t,
|
||||
uuid_d>;
|
||||
const std::string name;
|
||||
@ -215,6 +222,8 @@ struct Option {
|
||||
value = size_t{0}; break;
|
||||
case TYPE_SECS:
|
||||
value = std::chrono::seconds{0}; break;
|
||||
case TYPE_MILLISECS:
|
||||
value = std::chrono::milliseconds{0}; break;
|
||||
default:
|
||||
ceph_abort();
|
||||
}
|
||||
@ -265,6 +274,8 @@ struct Option {
|
||||
v = size_t{static_cast<std::size_t>(new_value)}; break;
|
||||
case TYPE_SECS:
|
||||
v = std::chrono::seconds{new_value}; break;
|
||||
case TYPE_MILLISECS:
|
||||
v = std::chrono::milliseconds{new_value}; break;
|
||||
default:
|
||||
std::cerr << "Bad type in set_value: " << name << ": "
|
||||
<< typeid(T).name() << std::endl;
|
||||
@ -377,10 +388,11 @@ struct Option {
|
||||
{
|
||||
return
|
||||
(has_flag(FLAG_RUNTIME)
|
||||
|| (!has_flag(FLAG_MGR)
|
||||
&& (type == TYPE_BOOL || type == TYPE_INT
|
||||
|| type == TYPE_UINT || type == TYPE_FLOAT
|
||||
|| type == TYPE_SIZE || type == TYPE_SECS)))
|
||||
|| (!has_flag(FLAG_MGR)
|
||||
&& (type == TYPE_BOOL || type == TYPE_INT
|
||||
|| type == TYPE_UINT || type == TYPE_FLOAT
|
||||
|| type == TYPE_SIZE || type == TYPE_SECS
|
||||
|| type == TYPE_MILLISECS)))
|
||||
&& !has_flag(FLAG_STARTUP)
|
||||
&& !has_flag(FLAG_CLUSTER_CREATE)
|
||||
&& !has_flag(FLAG_CREATE);
|
||||
|
@ -102,6 +102,7 @@ bool ParentCacheObjectDispatch<I>::read(
|
||||
m_cache_client->lookup_object(m_image_ctx->data_ctx.get_namespace(),
|
||||
m_image_ctx->data_ctx.get_id(),
|
||||
io_context->read_snap().value_or(CEPH_NOSNAP),
|
||||
m_image_ctx->layout.object_size,
|
||||
oid, std::move(ctx));
|
||||
return true;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ PyObject *get_python_typed_option_value(
|
||||
case Option::TYPE_SIZE:
|
||||
return PyLong_FromString((char *)value.c_str(), nullptr, 0);
|
||||
case Option::TYPE_SECS:
|
||||
case Option::TYPE_MILLISECS:
|
||||
case Option::TYPE_FLOAT:
|
||||
{
|
||||
PyObject *s = PyUnicode_FromString(value.c_str());
|
||||
|
@ -24,8 +24,8 @@ class MockCacheClient {
|
||||
MOCK_METHOD0(stop, void());
|
||||
MOCK_METHOD0(connect, int());
|
||||
MOCK_METHOD1(connect, void(Context*));
|
||||
MOCK_METHOD5(lookup_object, void(std::string, uint64_t, uint64_t, std::string,
|
||||
CacheGenContextURef));
|
||||
MOCK_METHOD6(lookup_object, void(std::string, uint64_t, uint64_t, uint64_t,
|
||||
std::string, CacheGenContextURef));
|
||||
MOCK_METHOD1(register_client, int(Context*));
|
||||
};
|
||||
|
||||
|
@ -122,7 +122,7 @@ public:
|
||||
usleep(1);
|
||||
}
|
||||
|
||||
m_cache_client->lookup_object("pool_nspace", 1, 2, "object_name", std::move(ctx));
|
||||
m_cache_client->lookup_object("pool_nspace", 1, 2, 3, "object_name", std::move(ctx));
|
||||
m_send_request_index++;
|
||||
}
|
||||
m_wait_event.wait();
|
||||
@ -135,7 +135,7 @@ public:
|
||||
hit = ack->type == RBDSC_READ_REPLY;
|
||||
m_wait_event.signal();
|
||||
});
|
||||
m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, std::move(ctx));
|
||||
m_cache_client->lookup_object(pool_nspace, 1, 2, 3, object_id, std::move(ctx));
|
||||
m_wait_event.wait();
|
||||
return hit;
|
||||
}
|
||||
|
@ -16,10 +16,11 @@ TEST(test_for_message, test_1)
|
||||
uint64_t read_len = 333333UL;
|
||||
uint64_t pool_id = 444444UL;
|
||||
uint64_t snap_id = 555555UL;
|
||||
uint64_t object_size = 666666UL;
|
||||
|
||||
// ObjectRequest --> bufferlist
|
||||
ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len,
|
||||
pool_id, snap_id, oid_name, pool_nspace);
|
||||
pool_id, snap_id, object_size, oid_name, pool_nspace);
|
||||
req->encode();
|
||||
auto payload_bl = req->get_payload_bufferlist();
|
||||
|
||||
@ -40,8 +41,9 @@ TEST(test_for_message, test_1)
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->read_len, 333333UL);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_id, 444444UL);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->snap_id, 555555UL);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->oid, oid_name);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace);
|
||||
ASSERT_EQ(((ObjectCacheReadData*)req_decode)->object_size, 666666UL);
|
||||
|
||||
delete req;
|
||||
delete req_decode;
|
||||
|
@ -126,7 +126,7 @@ public:
|
||||
});
|
||||
m_send_request_index++;
|
||||
// here just for concurrently testing register + lookup, so fix object id.
|
||||
m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, "1234", std::move(ctx));
|
||||
m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx));
|
||||
}
|
||||
|
||||
if (is_last) {
|
||||
|
@ -62,7 +62,8 @@ public:
|
||||
m_object_cache_store = new ObjectCacheStore(m_ceph_context);
|
||||
}
|
||||
|
||||
void init_object_cache_store(std::string pool_name, std::string vol_name, uint64_t vol_size, bool reset) {
|
||||
void init_object_cache_store(std::string pool_name, std::string vol_name,
|
||||
uint64_t vol_size, bool reset) {
|
||||
ASSERT_EQ(0, m_object_cache_store->init(reset));
|
||||
ASSERT_EQ(0, m_object_cache_store->init_cache());
|
||||
}
|
||||
@ -71,10 +72,11 @@ public:
|
||||
ASSERT_EQ(0, m_object_cache_store->shutdown());
|
||||
}
|
||||
|
||||
void lookup_object_cache_store(std::string pool_name, std::string vol_name, std::string obj_name, int& ret) {
|
||||
void lookup_object_cache_store(std::string pool_name, std::string vol_name,
|
||||
std::string obj_name, int& ret) {
|
||||
std::string cache_path;
|
||||
ret = m_object_cache_store->lookup_object(pool_name, 1, 2, obj_name, true,
|
||||
cache_path);
|
||||
ret = m_object_cache_store->lookup_object(pool_name, 1, 2, 3,
|
||||
obj_name, true, cache_path);
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
|
@ -105,8 +105,8 @@ public :
|
||||
void expect_cache_lookup_object(MockParentImageCache& mparent_image_cache,
|
||||
const std::string &cache_path) {
|
||||
EXPECT_CALL(*(mparent_image_cache.get_cache_client()),
|
||||
lookup_object(_, _, _, _, _))
|
||||
.WillOnce(WithArg<4>(Invoke([cache_path](CacheGenContextURef on_finish) {
|
||||
lookup_object(_, _, _, _, _, _))
|
||||
.WillOnce(WithArg<5>(Invoke([cache_path](CacheGenContextURef on_finish) {
|
||||
auto ack = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, 0, cache_path);
|
||||
on_finish.release()->complete(ack);
|
||||
})));
|
||||
|
@ -113,12 +113,13 @@ namespace immutable_obj_cache {
|
||||
}
|
||||
|
||||
void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
|
||||
uint64_t snap_id, std::string oid,
|
||||
uint64_t snap_id, uint64_t object_size,
|
||||
std::string oid,
|
||||
CacheGenContextURef&& on_finish) {
|
||||
ldout(m_cct, 20) << dendl;
|
||||
ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
|
||||
++m_sequence_id, 0, 0,
|
||||
pool_id, snap_id, oid, pool_nspace);
|
||||
++m_sequence_id, 0, 0, pool_id,
|
||||
snap_id, object_size, oid, pool_nspace);
|
||||
req->process_msg = std::move(on_finish);
|
||||
req->encode();
|
||||
|
||||
|
@ -32,7 +32,7 @@ class CacheClient {
|
||||
int connect();
|
||||
void connect(Context* on_finish);
|
||||
void lookup_object(std::string pool_nspace, uint64_t pool_id,
|
||||
uint64_t snap_id, std::string oid,
|
||||
uint64_t snap_id, uint64_t object_size, std::string oid,
|
||||
CacheGenContextURef&& on_finish);
|
||||
int register_client(Context* on_finish);
|
||||
|
||||
|
@ -25,7 +25,6 @@ CacheController::~CacheController() {
|
||||
|
||||
int CacheController::init() {
|
||||
ldout(m_cct, 20) << dendl;
|
||||
|
||||
m_object_cache_store = new ObjectCacheStore(m_cct);
|
||||
// TODO(dehao): make this configurable
|
||||
int r = m_object_cache_store->init(true);
|
||||
@ -118,8 +117,8 @@ void CacheController::handle_request(CacheSession* session,
|
||||
bool return_dne_path = session->client_version().empty();
|
||||
int ret = m_object_cache_store->lookup_object(
|
||||
req_read_data->pool_namespace, req_read_data->pool_id,
|
||||
req_read_data->snap_id, req_read_data->oid, return_dne_path,
|
||||
cache_path);
|
||||
req_read_data->snap_id, req_read_data->object_size,
|
||||
req_read_data->oid, return_dne_path, cache_path);
|
||||
ObjectCacheRequest* reply = nullptr;
|
||||
if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) {
|
||||
reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
|
||||
|
@ -21,6 +21,30 @@ namespace fs = std::experimental::filesystem;
|
||||
namespace ceph {
|
||||
namespace immutable_obj_cache {
|
||||
|
||||
namespace {
|
||||
|
||||
class SafeTimerSingleton : public SafeTimer {
|
||||
public:
|
||||
ceph::mutex lock = ceph::make_mutex
|
||||
("ceph::immutable_object_cache::SafeTimerSingleton::lock");
|
||||
|
||||
explicit SafeTimerSingleton(CephContext *cct)
|
||||
: SafeTimer(cct, lock, true) {
|
||||
init();
|
||||
}
|
||||
~SafeTimerSingleton() {
|
||||
std::lock_guard locker{lock};
|
||||
shutdown();
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
// Bit flags naming the throttle kinds. They are OR-ed into
// m_qos_enabled_flag and also used as keys of m_throttles.
enum ThrottleTargetCode {
  ROC_QOS_IOPS_THROTTLE = 1 << 0,  // limit on IO operations
  ROC_QOS_BPS_THROTTLE  = 1 << 1   // limit on IO bytes
};
|
||||
|
||||
ObjectCacheStore::ObjectCacheStore(CephContext *cct)
|
||||
: m_cct(cct), m_rados(new librados::Rados()) {
|
||||
|
||||
@ -40,12 +64,44 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct)
|
||||
uint64_t max_inflight_ops =
|
||||
m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
|
||||
|
||||
uint64_t limit = 0;
|
||||
if ((limit = m_cct->_conf.get_val<uint64_t>
|
||||
("immutable_object_cache_qos_iops_limit")) != 0) {
|
||||
apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE,
|
||||
m_cct->_conf.get_val<std::chrono::milliseconds>
|
||||
("immutable_object_cache_qos_schedule_tick_min"),
|
||||
limit,
|
||||
m_cct->_conf.get_val<uint64_t>
|
||||
("immutable_object_cache_qos_iops_burst"),
|
||||
m_cct->_conf.get_val<std::chrono::seconds>
|
||||
("immutable_object_cache_qos_iops_burst_seconds"));
|
||||
}
|
||||
if ((limit = m_cct->_conf.get_val<uint64_t>
|
||||
("immutable_object_cache_qos_bps_limit")) != 0) {
|
||||
apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE,
|
||||
m_cct->_conf.get_val<std::chrono::milliseconds>
|
||||
("immutable_object_cache_qos_schedule_tick_min"),
|
||||
limit,
|
||||
m_cct->_conf.get_val<uint64_t>
|
||||
("immutable_object_cache_qos_bps_burst"),
|
||||
m_cct->_conf.get_val<std::chrono::seconds>
|
||||
("immutable_object_cache_qos_bps_burst_seconds"));
|
||||
}
|
||||
|
||||
m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
|
||||
cache_watermark);
|
||||
}
|
||||
|
||||
// Frees the policy object and every throttle whose flag is set in
// m_qos_enabled_flag.
ObjectCacheStore::~ObjectCacheStore() {
  delete m_policy;

  // Same order as the flags are declared: IOPS first, then BPS.
  const uint64_t throttle_kinds[] = {ROC_QOS_IOPS_THROTTLE,
                                     ROC_QOS_BPS_THROTTLE};
  for (const uint64_t kind : throttle_kinds) {
    if ((m_qos_enabled_flag & kind) == 0) {
      continue;  // this throttle was never enabled, nothing to free
    }
    // An enabled flag must always have a throttle registered for it.
    ceph_assert(m_throttles[kind] != nullptr);
    delete m_throttles[kind];
  }
}
|
||||
|
||||
int ObjectCacheStore::init(bool reset) {
|
||||
@ -97,9 +153,8 @@ int ObjectCacheStore::init_cache() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ObjectCacheStore::do_promote(std::string pool_nspace,
|
||||
uint64_t pool_id, uint64_t snap_id,
|
||||
std::string object_name) {
|
||||
int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
|
||||
uint64_t snap_id, std::string object_name) {
|
||||
ldout(m_cct, 20) << "to promote object: " << object_name
|
||||
<< " from pool id: " << pool_id
|
||||
<< " namespace: " << pool_nspace
|
||||
@ -183,8 +238,8 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObjectCacheStore::lookup_object(std::string pool_nspace,
|
||||
uint64_t pool_id, uint64_t snap_id,
|
||||
int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
|
||||
uint64_t snap_id, uint64_t object_size,
|
||||
std::string object_name,
|
||||
bool return_dne_path,
|
||||
std::string& target_cache_file_path) {
|
||||
@ -198,9 +253,13 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace,
|
||||
|
||||
switch (ret) {
|
||||
case OBJ_CACHE_NONE: {
|
||||
pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
|
||||
if (pret < 0) {
|
||||
lderr(m_cct) << "fail to start promote" << dendl;
|
||||
if (take_token_from_throttle(object_size, 1)) {
|
||||
pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
|
||||
if (pret < 0) {
|
||||
lderr(m_cct) << "fail to start promote" << dendl;
|
||||
}
|
||||
} else {
|
||||
m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -307,5 +366,95 @@ std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
|
||||
return m_cache_root_dir + cache_file_dir + cache_file_name;
|
||||
}
|
||||
|
||||
void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
|
||||
m_io_throttled = false;
|
||||
std::lock_guard lock(m_throttle_lock);
|
||||
if (type & ROC_QOS_IOPS_THROTTLE){
|
||||
m_iops_tokens += tokens;
|
||||
} else if (type & ROC_QOS_BPS_THROTTLE){
|
||||
m_bps_tokens += tokens;
|
||||
} else {
|
||||
lderr(m_cct) << "unknow throttle type." << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
|
||||
uint64_t object_num) {
|
||||
if (m_io_throttled == true) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int flag = 0;
|
||||
bool wait = false;
|
||||
if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
|
||||
std::lock_guard lock(m_throttle_lock);
|
||||
if (object_num > m_iops_tokens) {
|
||||
wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
|
||||
&ObjectCacheStore::handle_throttle_ready, object_num,
|
||||
ROC_QOS_IOPS_THROTTLE);
|
||||
} else {
|
||||
m_iops_tokens -= object_num;
|
||||
flag = 1;
|
||||
}
|
||||
}
|
||||
if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
|
||||
std::lock_guard lock(m_throttle_lock);
|
||||
if (object_size > m_bps_tokens) {
|
||||
wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
|
||||
&ObjectCacheStore::handle_throttle_ready, object_size,
|
||||
ROC_QOS_BPS_THROTTLE);
|
||||
} else {
|
||||
m_bps_tokens -= object_size;
|
||||
}
|
||||
}
|
||||
|
||||
if (wait) {
|
||||
m_io_throttled = true;
|
||||
// when passing iops throttle, but limit in bps throttle, recovery
|
||||
if (flag == 1) {
|
||||
std::lock_guard lock(m_throttle_lock);
|
||||
m_iops_tokens += object_num;
|
||||
}
|
||||
}
|
||||
|
||||
return !wait;
|
||||
}
|
||||
|
||||
static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {
|
||||
{ ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
|
||||
{ ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
|
||||
};
|
||||
|
||||
void ObjectCacheStore::apply_qos_tick_and_limit(
|
||||
const uint64_t flag,
|
||||
std::chrono::milliseconds min_tick,
|
||||
uint64_t limit,
|
||||
uint64_t burst,
|
||||
std::chrono::seconds burst_seconds) {
|
||||
SafeTimerSingleton* safe_timer_singleton = nullptr;
|
||||
TokenBucketThrottle* throttle = nullptr;
|
||||
safe_timer_singleton =
|
||||
&m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
|
||||
"tools::immutable_object_cache", false, m_cct);
|
||||
SafeTimer* timer = safe_timer_singleton;
|
||||
ceph::mutex* timer_lock = &safe_timer_singleton->lock;
|
||||
m_qos_enabled_flag |= flag;
|
||||
auto throttle_flags_it = THROTTLE_FLAGS.find(flag);
|
||||
ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end());
|
||||
throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second,
|
||||
0, 0, timer, timer_lock);
|
||||
throttle->set_schedule_tick_min(min_tick.count());
|
||||
int ret = throttle->set_limit(limit, burst, burst_seconds.count());
|
||||
if (ret < 0) {
|
||||
lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: "
|
||||
<< "burst(" << burst << ") is less than "
|
||||
<< "limit(" << limit << ")" << dendl;
|
||||
throttle->set_limit(limit, 0, 1);
|
||||
}
|
||||
|
||||
ceph_assert(m_throttles.find(flag) == m_throttles.end());
|
||||
m_throttles.insert({flag, throttle});
|
||||
}
|
||||
|
||||
} // namespace immutable_obj_cache
|
||||
} // namespace ceph
|
||||
|
@ -6,6 +6,8 @@
|
||||
|
||||
#include "common/ceph_context.h"
|
||||
#include "common/ceph_mutex.h"
|
||||
#include "common/Throttle.h"
|
||||
#include "common/Cond.h"
|
||||
#include "include/rados/librados.hpp"
|
||||
|
||||
#include "SimplePolicy.h"
|
||||
@ -30,11 +32,16 @@ class ObjectCacheStore {
|
||||
int init_cache();
|
||||
int lookup_object(std::string pool_nspace,
|
||||
uint64_t pool_id, uint64_t snap_id,
|
||||
uint64_t object_size,
|
||||
std::string object_name,
|
||||
bool return_dne_path,
|
||||
std::string& target_cache_file_path);
|
||||
|
||||
private:
|
||||
enum ThrottleTypeCode {
|
||||
THROTTLE_CODE_BYTE,
|
||||
THROTTLE_CODE_OBJECT
|
||||
};
|
||||
|
||||
std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
|
||||
uint64_t snap_id, std::string oid);
|
||||
std::string get_cache_file_path(std::string cache_file_name,
|
||||
@ -48,6 +55,13 @@ class ObjectCacheStore {
|
||||
int handle_promote_callback(int, bufferlist*, std::string);
|
||||
int do_evict(std::string cache_file);
|
||||
|
||||
bool take_token_from_throttle(uint64_t object_size, uint64_t object_num);
|
||||
void handle_throttle_ready(uint64_t tokens, uint64_t type);
|
||||
void apply_qos_tick_and_limit(const uint64_t flag,
|
||||
std::chrono::milliseconds min_tick,
|
||||
uint64_t limit, uint64_t burst,
|
||||
std::chrono::seconds burst_seconds);
|
||||
|
||||
CephContext *m_cct;
|
||||
RadosRef m_rados;
|
||||
std::map<uint64_t, librados::IoCtx> m_ioctx_map;
|
||||
@ -55,6 +69,14 @@ class ObjectCacheStore {
|
||||
ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock");
|
||||
Policy* m_policy;
|
||||
std::string m_cache_root_dir;
|
||||
// throttle mechanism
|
||||
uint64_t m_qos_enabled_flag{0};
|
||||
std::map<uint64_t, TokenBucketThrottle*> m_throttles;
|
||||
bool m_io_throttled{false};
|
||||
// Mutex taken by the throttle token bookkeeping
// (handle_throttle_ready / take_token_from_throttle).
ceph::mutex m_throttle_lock =
  ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");
|
||||
uint64_t m_iops_tokens{0};
|
||||
uint64_t m_bps_tokens{0};
|
||||
};
|
||||
|
||||
} // namespace immutable_obj_cache
|
||||
|
@ -17,7 +17,7 @@ ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
|
||||
ObjectCacheRequest::~ObjectCacheRequest() {}
|
||||
|
||||
void ObjectCacheRequest::encode() {
|
||||
ENCODE_START(1, 1, payload);
|
||||
ENCODE_START(2, 1, payload);
|
||||
ceph::encode(type, payload);
|
||||
ceph::encode(seq, payload);
|
||||
if (!payload_empty()) {
|
||||
@ -28,11 +28,11 @@ void ObjectCacheRequest::encode() {
|
||||
|
||||
void ObjectCacheRequest::decode(bufferlist& bl) {
|
||||
auto i = bl.cbegin();
|
||||
DECODE_START(1, i);
|
||||
DECODE_START(2, i);
|
||||
ceph::decode(type, i);
|
||||
ceph::decode(seq, i);
|
||||
if (!payload_empty()) {
|
||||
decode_payload(i);
|
||||
decode_payload(i, struct_v);
|
||||
}
|
||||
DECODE_FINISH(i);
|
||||
}
|
||||
@ -52,7 +52,8 @@ void ObjectCacheRegData::encode_payload() {
|
||||
ceph::encode(version, payload);
|
||||
}
|
||||
|
||||
void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {
|
||||
void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i,
|
||||
__u8 encode_version) {
|
||||
if (i.end()) {
|
||||
return;
|
||||
}
|
||||
@ -67,17 +68,19 @@ ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
|
||||
|
||||
void ObjectCacheRegReplyData::encode_payload() {}
|
||||
|
||||
void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
|
||||
void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl,
|
||||
__u8 encode_version) {}
|
||||
|
||||
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
|
||||
uint64_t read_offset,
|
||||
uint64_t read_len,
|
||||
uint64_t pool_id, uint64_t snap_id,
|
||||
uint64_t object_size,
|
||||
std::string oid,
|
||||
std::string pool_namespace)
|
||||
: ObjectCacheRequest(t, s), read_offset(read_offset),
|
||||
read_len(read_len), pool_id(pool_id), snap_id(snap_id),
|
||||
oid(oid), pool_namespace(pool_namespace)
|
||||
object_size(object_size), oid(oid), pool_namespace(pool_namespace)
|
||||
{}
|
||||
|
||||
ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
|
||||
@ -92,15 +95,20 @@ void ObjectCacheReadData::encode_payload() {
|
||||
ceph::encode(snap_id, payload);
|
||||
ceph::encode(oid, payload);
|
||||
ceph::encode(pool_namespace, payload);
|
||||
ceph::encode(object_size, payload);
|
||||
}
|
||||
|
||||
void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
|
||||
void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i,
|
||||
__u8 encode_version) {
|
||||
ceph::decode(read_offset, i);
|
||||
ceph::decode(read_len, i);
|
||||
ceph::decode(pool_id, i);
|
||||
ceph::decode(snap_id, i);
|
||||
ceph::decode(oid, i);
|
||||
ceph::decode(pool_namespace, i);
|
||||
if (encode_version >= 2) {
|
||||
ceph::decode(object_size, i);
|
||||
}
|
||||
}
|
||||
|
||||
ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
|
||||
@ -115,7 +123,8 @@ void ObjectCacheReadReplyData::encode_payload() {
|
||||
ceph::encode(cache_path, payload);
|
||||
}
|
||||
|
||||
void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) {
|
||||
void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i,
|
||||
__u8 encode_version) {
|
||||
ceph::decode(cache_path, i);
|
||||
}
|
||||
|
||||
@ -127,7 +136,8 @@ ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
|
||||
|
||||
void ObjectCacheReadRadosData::encode_payload() {}
|
||||
|
||||
void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
|
||||
void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i,
|
||||
__u8 encode_version) {}
|
||||
|
||||
ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
|
||||
ObjectCacheRequest* req = nullptr;
|
||||
|
@ -50,7 +50,8 @@ class ObjectCacheRequest {
|
||||
bufferlist get_payload_bufferlist() { return payload; }
|
||||
|
||||
virtual void encode_payload() = 0;
|
||||
virtual void decode_payload(bufferlist::const_iterator bl_it) = 0;
|
||||
virtual void decode_payload(bufferlist::const_iterator bl_it,
|
||||
__u8 encode_version) = 0;
|
||||
virtual uint16_t get_request_type() = 0;
|
||||
virtual bool payload_empty() = 0;
|
||||
};
|
||||
@ -63,7 +64,8 @@ class ObjectCacheRegData : public ObjectCacheRequest {
|
||||
ObjectCacheRegData(uint16_t t, uint64_t s);
|
||||
~ObjectCacheRegData() override;
|
||||
void encode_payload() override;
|
||||
void decode_payload(bufferlist::const_iterator bl) override;
|
||||
void decode_payload(bufferlist::const_iterator bl,
|
||||
__u8 encode_version) override;
|
||||
uint16_t get_request_type() override { return RBDSC_REGISTER; }
|
||||
bool payload_empty() override { return false; }
|
||||
};
|
||||
@ -74,7 +76,8 @@ class ObjectCacheRegReplyData : public ObjectCacheRequest {
|
||||
ObjectCacheRegReplyData(uint16_t t, uint64_t s);
|
||||
~ObjectCacheRegReplyData() override;
|
||||
void encode_payload() override;
|
||||
void decode_payload(bufferlist::const_iterator iter) override;
|
||||
void decode_payload(bufferlist::const_iterator iter,
|
||||
__u8 encode_version) override;
|
||||
uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
|
||||
bool payload_empty() override { return true; }
|
||||
};
|
||||
@ -85,16 +88,18 @@ class ObjectCacheReadData : public ObjectCacheRequest {
|
||||
uint64_t read_len;
|
||||
uint64_t pool_id;
|
||||
uint64_t snap_id;
|
||||
uint64_t object_size = 0;
|
||||
std::string oid;
|
||||
std::string pool_namespace;
|
||||
ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
|
||||
uint64_t read_len, uint64_t pool_id,
|
||||
uint64_t snap_id, std::string oid,
|
||||
std::string pool_namespace);
|
||||
uint64_t snap_id, uint64_t object_size,
|
||||
std::string oid, std::string pool_namespace);
|
||||
ObjectCacheReadData(uint16_t t, uint64_t s);
|
||||
~ObjectCacheReadData() override;
|
||||
void encode_payload() override;
|
||||
void decode_payload(bufferlist::const_iterator bl) override;
|
||||
void decode_payload(bufferlist::const_iterator bl,
|
||||
__u8 encode_version) override;
|
||||
uint16_t get_request_type() override { return RBDSC_READ; }
|
||||
bool payload_empty() override { return false; }
|
||||
};
|
||||
@ -106,7 +111,8 @@ class ObjectCacheReadReplyData : public ObjectCacheRequest {
|
||||
ObjectCacheReadReplyData(uint16_t t, uint64_t s);
|
||||
~ObjectCacheReadReplyData() override;
|
||||
void encode_payload() override;
|
||||
void decode_payload(bufferlist::const_iterator bl) override;
|
||||
void decode_payload(bufferlist::const_iterator bl,
|
||||
__u8 encode_version) override;
|
||||
uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
|
||||
bool payload_empty() override { return false; }
|
||||
};
|
||||
@ -117,7 +123,8 @@ class ObjectCacheReadRadosData : public ObjectCacheRequest {
|
||||
ObjectCacheReadRadosData(uint16_t t, uint64_t s);
|
||||
~ObjectCacheReadRadosData() override;
|
||||
void encode_payload() override;
|
||||
void decode_payload(bufferlist::const_iterator bl) override;
|
||||
void decode_payload(bufferlist::const_iterator bl,
|
||||
__u8 encode_version) override;
|
||||
uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
|
||||
bool payload_empty() override { return true; }
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user