mirror of
https://github.com/ceph/ceph
synced 2025-01-03 01:22:53 +00:00
Merge pull request #1999 from yuyuyu101/keyvaluestore-cache
Keyvaluestore cache Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
commit
79f3f67491
@ -159,6 +159,7 @@ noinst_HEADERS += \
|
||||
common/OutputDataSocket.h \
|
||||
common/admin_socket.h \
|
||||
common/admin_socket_client.h \
|
||||
common/random_cache.hpp \
|
||||
common/shared_cache.hpp \
|
||||
common/tracked_int_ptr.hpp \
|
||||
common/simple_cache.hpp \
|
||||
|
@ -696,6 +696,7 @@ OPTION(keyvaluestore_op_thread_timeout, OPT_INT, 60)
|
||||
OPTION(keyvaluestore_op_thread_suicide_timeout, OPT_INT, 180)
|
||||
OPTION(keyvaluestore_default_strip_size, OPT_INT, 4096) // Only affect new object
|
||||
OPTION(keyvaluestore_max_expected_write_size, OPT_U64, 1ULL << 24) // bytes
|
||||
OPTION(keyvaluestore_header_cache_size, OPT_INT, 4096) // Header cache size
|
||||
|
||||
// max bytes to search ahead in journal searching for corruption
|
||||
OPTION(journal_max_corrupt_search, OPT_U64, 10<<20)
|
||||
|
115
src/common/random_cache.hpp
Normal file
115
src/common/random_cache.hpp
Normal file
@ -0,0 +1,115 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
|
||||
*
|
||||
* Author: Haomai Wang <haomaiwang@gmail.com>
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef CEPH_RANDOMCACHE_H
|
||||
#define CEPH_RANDOMCACHE_H
|
||||
|
||||
#include "common/Mutex.h"
|
||||
#include "include/compat.h"
|
||||
#include "include/unordered_map.h"
|
||||
|
||||
|
||||
// Although This is a ramdom cache implementation, here still consider to make
|
||||
// the trim progress more reasonable. Each item owns its lookup frequency,
|
||||
// when the cache is full it will randomly pick up several items and compare the
|
||||
// frequency associated with. The least frequency of items will be evicted.
|
||||
template <class K, class V>
|
||||
class RandomCache {
|
||||
// The first element of pair is the frequency of item, it's used to evict item
|
||||
ceph::unordered_map<K, pair<uint64_t, V> > contents;
|
||||
Mutex lock;
|
||||
uint64_t max_size, count;
|
||||
K last_trim_key;
|
||||
|
||||
// When cache reach full, consider to evict a certain number of items
|
||||
static const uint64_t EVICT_COUNT = 5;
|
||||
// Avoid too much overhead on comparing items's frequency, the number of
|
||||
// compare items is expected to small.
|
||||
static const uint64_t COMPARE_COUNT = 3;
|
||||
|
||||
// In order to make evict cache progress more lightweight and effective,
|
||||
// several items are expected to evicted in one call
|
||||
void trim_cache(uint64_t evict_count) {
|
||||
typename ceph::unordered_map<K, pair<uint64_t, V> >::iterator it = contents.find(last_trim_key);
|
||||
uint64_t total_compare = evict_count * COMPARE_COUNT;
|
||||
map<uint64_t, K> candidates;
|
||||
|
||||
while (total_compare--) {
|
||||
if (it == contents.end()) {
|
||||
it = contents.begin();
|
||||
}
|
||||
|
||||
candidates[it->second.first] = it->first;
|
||||
it++;
|
||||
}
|
||||
if (it != contents.end())
|
||||
last_trim_key = it->first;
|
||||
else
|
||||
last_trim_key = contents.begin()->first;
|
||||
|
||||
for (typename map<uint64_t, K>::iterator j = candidates.begin(); j != candidates.end(); j++) {
|
||||
contents.erase(j->second);
|
||||
count--;
|
||||
evict_count--;
|
||||
if (!evict_count)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
RandomCache(size_t max_size=20) : lock("RandomCache::lock"),
|
||||
max_size(max_size), count(0) {}
|
||||
~RandomCache() {
|
||||
contents.clear();
|
||||
count = 0;
|
||||
}
|
||||
|
||||
void clear(K key) {
|
||||
Mutex::Locker l(lock);
|
||||
contents.erase(key);
|
||||
count--;
|
||||
}
|
||||
|
||||
void set_size(size_t new_size) {
|
||||
Mutex::Locker l(lock);
|
||||
max_size = new_size;
|
||||
if (max_size <= count) {
|
||||
trim_cache(count - max_size);
|
||||
}
|
||||
}
|
||||
|
||||
bool lookup(K key, V *out) {
|
||||
Mutex::Locker l(lock);
|
||||
typename ceph::unordered_map<K, pair<uint64_t, V> >::iterator it = contents.find(key);
|
||||
if (it != contents.end()) {
|
||||
it->second.first++;
|
||||
*out = it->second.second;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void add(K key, V value) {
|
||||
Mutex::Locker l(lock);
|
||||
if (max_size <= count) {
|
||||
trim_cache(EVICT_COUNT);
|
||||
}
|
||||
contents[key] = make_pair(1, value);
|
||||
count++;
|
||||
}
|
||||
};
|
||||
|
||||
#endif
|
@ -689,8 +689,6 @@ void GenericObjectMap::rename(const Header old_header, const coll_t &cid,
|
||||
old_header->cid = cid;
|
||||
old_header->oid = target;
|
||||
set_header(cid, target, *old_header, t);
|
||||
|
||||
// "in_use" still hold the "seq"
|
||||
}
|
||||
|
||||
int GenericObjectMap::init(bool do_upgrade)
|
||||
@ -926,35 +924,18 @@ GenericObjectMap::Header GenericObjectMap::_lookup_header(
|
||||
to_get.insert(header_key(cid, oid));
|
||||
_Header header;
|
||||
|
||||
while (1) {
|
||||
map<string, bufferlist> out;
|
||||
bool try_again = false;
|
||||
map<string, bufferlist> out;
|
||||
|
||||
int r = db->get(GHOBJECT_TO_SEQ_PREFIX, to_get, &out);
|
||||
if (r < 0)
|
||||
return Header();
|
||||
if (out.empty())
|
||||
return Header();
|
||||
int r = db->get(GHOBJECT_TO_SEQ_PREFIX, to_get, &out);
|
||||
if (r < 0)
|
||||
return Header();
|
||||
if (out.empty())
|
||||
return Header();
|
||||
|
||||
bufferlist::iterator iter = out.begin()->second.begin();
|
||||
header.decode(iter);
|
||||
bufferlist::iterator iter = out.begin()->second.begin();
|
||||
header.decode(iter);
|
||||
|
||||
while (in_use.count(header.seq)) {
|
||||
header_cond.Wait(header_lock);
|
||||
|
||||
// Another thread is hold this header, wait for it.
|
||||
// Because the seq of this object may change, such as clone
|
||||
// and rename operation, here need to look up "seq" again
|
||||
try_again = true;
|
||||
}
|
||||
|
||||
if (!try_again) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Header ret = Header(new _Header(header), RemoveOnDelete(this));
|
||||
in_use.insert(ret->seq);
|
||||
Header ret = Header(new _Header(header));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -962,7 +943,7 @@ GenericObjectMap::Header GenericObjectMap::_generate_new_header(
|
||||
const coll_t &cid, const ghobject_t &oid, Header parent,
|
||||
KeyValueDB::Transaction t)
|
||||
{
|
||||
Header header = Header(new _Header(), RemoveOnDelete(this));
|
||||
Header header = Header(new _Header());
|
||||
header->seq = state.seq++;
|
||||
if (parent) {
|
||||
header->parent = parent->seq;
|
||||
@ -970,8 +951,6 @@ GenericObjectMap::Header GenericObjectMap::_generate_new_header(
|
||||
header->num_children = 1;
|
||||
header->oid = oid;
|
||||
header->cid = cid;
|
||||
assert(!in_use.count(header->seq));
|
||||
in_use.insert(header->seq);
|
||||
|
||||
write_state(t);
|
||||
return header;
|
||||
@ -980,8 +959,6 @@ GenericObjectMap::Header GenericObjectMap::_generate_new_header(
|
||||
GenericObjectMap::Header GenericObjectMap::lookup_parent(Header input)
|
||||
{
|
||||
Mutex::Locker l(header_lock);
|
||||
while (in_use.count(input->parent))
|
||||
header_cond.Wait(header_lock);
|
||||
map<string, bufferlist> out;
|
||||
set<string> keys;
|
||||
keys.insert(PARENT_KEY);
|
||||
@ -999,13 +976,12 @@ GenericObjectMap::Header GenericObjectMap::lookup_parent(Header input)
|
||||
return Header();
|
||||
}
|
||||
|
||||
Header header = Header(new _Header(), RemoveOnDelete(this));
|
||||
Header header = Header(new _Header());
|
||||
header->seq = input->parent;
|
||||
bufferlist::iterator iter = out.begin()->second.begin();
|
||||
header->decode(iter);
|
||||
dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
|
||||
<< header->parent << dendl;
|
||||
in_use.insert(header->seq);
|
||||
return header;
|
||||
}
|
||||
|
||||
|
@ -74,12 +74,6 @@ class GenericObjectMap {
|
||||
* Serializes access to next_seq as well as the in_use set
|
||||
*/
|
||||
Mutex header_lock;
|
||||
Cond header_cond;
|
||||
|
||||
/**
|
||||
* Set of headers currently in use
|
||||
*/
|
||||
set<uint64_t> in_use;
|
||||
|
||||
GenericObjectMap(KeyValueDB *db) : db(db), header_lock("GenericObjectMap") {}
|
||||
|
||||
@ -135,6 +129,9 @@ class GenericObjectMap {
|
||||
int submit_transaction(KeyValueDB::Transaction t) {
|
||||
return db->submit_transaction(t);
|
||||
}
|
||||
int submit_transaction_sync(KeyValueDB::Transaction t) {
|
||||
return db->submit_transaction_sync(t);
|
||||
}
|
||||
|
||||
/// persistent state for store @see generate_header
|
||||
struct State {
|
||||
@ -426,26 +423,6 @@ protected:
|
||||
// Sets header @see set_header
|
||||
void _set_header(Header header, const bufferlist &bl,
|
||||
KeyValueDB::Transaction t);
|
||||
|
||||
/**
|
||||
* Removes header seq lock once Header is out of scope
|
||||
* @see _lookup_header
|
||||
* @see lookup_parent
|
||||
* @see generate_new_header
|
||||
*/
|
||||
class RemoveOnDelete {
|
||||
public:
|
||||
GenericObjectMap *db;
|
||||
RemoveOnDelete(GenericObjectMap *db) :
|
||||
db(db) {}
|
||||
void operator() (_Header *header) {
|
||||
Mutex::Locker l(db->header_lock);
|
||||
db->in_use.erase(header->seq);
|
||||
db->header_cond.Signal();
|
||||
delete header;
|
||||
}
|
||||
};
|
||||
friend class RemoveOnDelete;
|
||||
};
|
||||
WRITE_CLASS_ENCODER(GenericObjectMap::_Header)
|
||||
WRITE_CLASS_ENCODER(GenericObjectMap::State)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -37,6 +37,7 @@ using namespace std;
|
||||
#include "common/Mutex.h"
|
||||
#include "GenericObjectMap.h"
|
||||
#include "KeyValueDB.h"
|
||||
#include "common/random_cache.hpp"
|
||||
|
||||
#include "include/uuid.h"
|
||||
|
||||
@ -94,43 +95,53 @@ class StripObjectMap: public GenericObjectMap {
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
};
|
||||
typedef ceph::shared_ptr<StripObjectHeader> StripObjectHeaderRef;
|
||||
|
||||
static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size,
|
||||
vector<StripExtent> &extents);
|
||||
int lookup_strip_header(const coll_t & cid, const ghobject_t &oid,
|
||||
StripObjectHeader &header);
|
||||
int save_strip_header(StripObjectHeader &header, KeyValueDB::Transaction t);
|
||||
StripObjectHeaderRef *header);
|
||||
int save_strip_header(StripObjectHeaderRef header, KeyValueDB::Transaction t);
|
||||
int create_strip_header(const coll_t &cid, const ghobject_t &oid,
|
||||
StripObjectHeader &strip_header,
|
||||
StripObjectHeaderRef *strip_header,
|
||||
KeyValueDB::Transaction t);
|
||||
void clone_wrap(StripObjectHeader &old_header,
|
||||
void clone_wrap(StripObjectHeaderRef old_header,
|
||||
const coll_t &cid, const ghobject_t &oid,
|
||||
KeyValueDB::Transaction t,
|
||||
StripObjectHeader *origin_header,
|
||||
StripObjectHeader *target_header);
|
||||
void rename_wrap(const coll_t &cid, const ghobject_t &oid,
|
||||
StripObjectHeaderRef *target_header);
|
||||
void rename_wrap(StripObjectHeaderRef old_header, const coll_t &cid, const ghobject_t &oid,
|
||||
KeyValueDB::Transaction t,
|
||||
StripObjectHeader *header);
|
||||
StripObjectHeaderRef *new_header);
|
||||
// Already hold header to avoid lock header seq again
|
||||
int get_with_header(
|
||||
const StripObjectHeader &header,
|
||||
const StripObjectHeaderRef header,
|
||||
const string &prefix,
|
||||
map<string, bufferlist> *out
|
||||
);
|
||||
|
||||
int get_values_with_header(
|
||||
const StripObjectHeader &header,
|
||||
const StripObjectHeaderRef header,
|
||||
const string &prefix,
|
||||
const set<string> &keys,
|
||||
map<string, bufferlist> *out
|
||||
);
|
||||
int get_keys_with_header(
|
||||
const StripObjectHeader &header,
|
||||
const StripObjectHeaderRef header,
|
||||
const string &prefix,
|
||||
set<string> *keys
|
||||
);
|
||||
|
||||
StripObjectMap(KeyValueDB *db): GenericObjectMap(db) {}
|
||||
Mutex lock;
|
||||
void invalidate_cache(const coll_t &c, const ghobject_t &oid) {
|
||||
Mutex::Locker l(lock);
|
||||
caches.clear(oid);
|
||||
}
|
||||
|
||||
RandomCache<ghobject_t, pair<coll_t, StripObjectHeaderRef> > caches;
|
||||
StripObjectMap(KeyValueDB *db): GenericObjectMap(db),
|
||||
lock("StripObjectMap::lock"),
|
||||
caches(g_conf->keyvaluestore_header_cache_size)
|
||||
{}
|
||||
};
|
||||
|
||||
|
||||
@ -222,37 +233,49 @@ class KeyValueStore : public ObjectStore,
|
||||
// 4. Clone or rename
|
||||
struct BufferTransaction {
|
||||
typedef pair<coll_t, ghobject_t> uniq_id;
|
||||
typedef map<uniq_id, StripObjectMap::StripObjectHeader> StripHeaderMap;
|
||||
typedef map<uniq_id, StripObjectMap::StripObjectHeaderRef> StripHeaderMap;
|
||||
|
||||
//Dirty records
|
||||
StripHeaderMap strip_headers;
|
||||
list<Context*> finishes;
|
||||
|
||||
KeyValueStore *store;
|
||||
|
||||
KeyValueDB::Transaction t;
|
||||
|
||||
int lookup_cached_header(const coll_t &cid, const ghobject_t &oid,
|
||||
StripObjectMap::StripObjectHeader **strip_header,
|
||||
StripObjectMap::StripObjectHeaderRef *strip_header,
|
||||
bool create_if_missing);
|
||||
int get_buffer_keys(StripObjectMap::StripObjectHeader &strip_header,
|
||||
int get_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
|
||||
const string &prefix, const set<string> &keys,
|
||||
map<string, bufferlist> *out);
|
||||
void set_buffer_keys(StripObjectMap::StripObjectHeader &strip_header,
|
||||
void set_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
|
||||
const string &prefix, map<string, bufferlist> &bl);
|
||||
int remove_buffer_keys(StripObjectMap::StripObjectHeader &strip_header,
|
||||
int remove_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
|
||||
const string &prefix, const set<string> &keys);
|
||||
void clear_buffer_keys(StripObjectMap::StripObjectHeader &strip_header,
|
||||
void clear_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
|
||||
const string &prefix);
|
||||
int clear_buffer(StripObjectMap::StripObjectHeader &strip_header);
|
||||
void clone_buffer(StripObjectMap::StripObjectHeader &old_header,
|
||||
int clear_buffer(StripObjectMap::StripObjectHeaderRef strip_header);
|
||||
void clone_buffer(StripObjectMap::StripObjectHeaderRef old_header,
|
||||
const coll_t &cid, const ghobject_t &oid);
|
||||
void rename_buffer(StripObjectMap::StripObjectHeader &old_header,
|
||||
void rename_buffer(StripObjectMap::StripObjectHeaderRef old_header,
|
||||
const coll_t &cid, const ghobject_t &oid);
|
||||
int submit_transaction();
|
||||
|
||||
BufferTransaction(KeyValueStore *store): store(store) {
|
||||
t = store->backend->get_transaction();
|
||||
}
|
||||
|
||||
struct InvalidateCacheContext : public Context {
|
||||
KeyValueStore *store;
|
||||
const coll_t cid;
|
||||
const ghobject_t oid;
|
||||
InvalidateCacheContext(KeyValueStore *s, const coll_t &c, const ghobject_t &oid): store(s), cid(c), oid(oid) {}
|
||||
void finish(int r) {
|
||||
if (r == 0)
|
||||
store->backend->invalidate_cache(cid, oid);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
// -- op workqueue --
|
||||
@ -506,10 +529,10 @@ class KeyValueStore : public ObjectStore,
|
||||
// ------------------
|
||||
// objects
|
||||
|
||||
int _generic_read(StripObjectMap::StripObjectHeader &header,
|
||||
int _generic_read(StripObjectMap::StripObjectHeaderRef header,
|
||||
uint64_t offset, size_t len, bufferlist& bl,
|
||||
bool allow_eio = false, BufferTransaction *bt = 0);
|
||||
int _generic_write(StripObjectMap::StripObjectHeader &header,
|
||||
int _generic_write(StripObjectMap::StripObjectHeaderRef header,
|
||||
uint64_t offset, size_t len, const bufferlist& bl,
|
||||
BufferTransaction &t, bool replica = false);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user