Merge pull request #7320 from yuyuyu101/kill-keyvaluestore

os/keyvaluestore: kill KeyValueStore

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2016-01-27 13:07:13 -05:00
commit 27784ff34f
13 changed files with 6 additions and 5404 deletions

View File

@ -1,94 +0,0 @@
===============================
KeyValueStore Config Reference
===============================
``KeyValueStore`` is an alternative OSD backend compared to FileStore.
Currently, it uses LevelDB as backend. ``KeyValueStore`` doesn't need journal
device. Each operation will flush into the backend directly.
``keyvaluestore backend``
:Description: The backend used by ``KeyValueStore``.
:Type: String
:Required: No
:Default: ``leveldb``
.. index:: keyvaluestore; queue
Queue
=====
The following settings provide limits on the size of the ``KeyValueStore``
queue.
``keyvaluestore queue max ops``
:Description: Defines the maximum number of operations in progress the
``KeyValueStore`` accepts before blocking on queuing new operations.
:Type: Integer
:Required: No. Minimal impact on performance.
:Default: ``50``
``keyvaluestore queue max bytes``
:Description: The maximum number of bytes for an operation.
:Type: Integer
:Required: No
:Default: ``100 << 20``
.. index:: keyvaluestore; thread
Thread
========
``keyvaluestore op threads``
:Description: The number of ``KeyValueStore`` operation threads that execute in parallel.
:Type: Integer
:Required: No
:Default: ``2``
``keyvaluestore op thread timeout``
:Description: The timeout for a ``KeyValueStore`` operation thread (in seconds).
:Type: Integer
:Required: No
:Default: ``60``
``keyvaluestore op thread suicide timeout``
:Description: The timeout for a commit operation before canceling the commit (in seconds).
:Type: Integer
:Required: No
:Default: ``180``
Misc
====
``keyvaluestore default strip size``
:Description: Each object will be split into multiple key/value pairs and
stored in the backend. **Note:** The size of the workload has
a significant impact on performance.
:Type: Integer
:Required: No
:Default: ``4096``
``keyvaluestore header cache size``
:Description: The size of the header cache (identical to ``inode`` in the local
filesystem). A larger cache size enhances performance.
:Type: Integer
:Required: No
:Default: ``4096``

View File

@ -639,8 +639,6 @@ set(libos_srcs
os/filestore/LFNIndex.cc
os/filestore/WBThrottle.cc
os/filestore/ZFSFileStoreBackend.cc
os/keyvaluestore/GenericObjectMap.cc
os/keyvaluestore/KeyValueStore.cc
os/memstore/MemStore.cc
os/kstore/KStore.cc
os/kstore/kstore_types.cc

View File

@ -117,7 +117,6 @@ SUBSYS(osd, 0, 5)
SUBSYS(optracker, 0, 5)
SUBSYS(objclass, 0, 5)
SUBSYS(filestore, 1, 3)
SUBSYS(keyvaluestore, 1, 3)
SUBSYS(journal, 1, 3)
SUBSYS(ms, 0, 5)
SUBSYS(mon, 1, 5)
@ -144,6 +143,7 @@ SUBSYS(bdev, 1, 3)
SUBSYS(kstore, 1, 5)
SUBSYS(rocksdb, 4, 5)
SUBSYS(leveldb, 4, 5)
SUBSYS(kinetic, 1, 5)
OPTION(key, OPT_STR, "")
OPTION(keyfile, OPT_STR, "")
@ -788,8 +788,6 @@ OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic wi
OPTION(rocksdb_separate_wal_dir, OPT_BOOL, false) // use $path.wal for wal
OPTION(rocksdb_db_paths, OPT_STR, "") // path,size( path,size)*
OPTION(rocksdb_log_to_ceph_log, OPT_BOOL, true) // log to ceph log
// rocksdb options that will be used for keyvaluestore(if backend is rocksdb)
OPTION(keyvaluestore_rocksdb_options, OPT_STR, "")
// rocksdb options that will be used for omap(if omap_backend is rocksdb)
OPTION(filestore_rocksdb_options, OPT_STR, "")
// rocksdb options that will be used in monstore
@ -1014,18 +1012,6 @@ OPTION(journal_dio, OPT_BOOL, true)
OPTION(journal_aio, OPT_BOOL, true)
OPTION(journal_force_aio, OPT_BOOL, false)
OPTION(keyvaluestore_queue_max_ops, OPT_INT, 50)
OPTION(keyvaluestore_queue_max_bytes, OPT_INT, 100 << 20)
OPTION(keyvaluestore_debug_check_backend, OPT_BOOL, 0) // Expensive debugging check on sync
OPTION(keyvaluestore_op_threads, OPT_INT, 2)
OPTION(keyvaluestore_op_thread_timeout, OPT_INT, 60)
OPTION(keyvaluestore_op_thread_suicide_timeout, OPT_INT, 180)
OPTION(keyvaluestore_default_strip_size, OPT_INT, 4096) // Only affect new object
OPTION(keyvaluestore_max_expected_write_size, OPT_U64, 1ULL << 24) // bytes
OPTION(keyvaluestore_header_cache_size, OPT_INT, 4096) // Header cache size
OPTION(keyvaluestore_backend, OPT_STR, "leveldb")
OPTION(keyvaluestore_dump_file, OPT_STR, "") // file onto which store transaction dumps
// max bytes to search ahead in journal searching for corruption
OPTION(journal_max_corrupt_search, OPT_U64, 10<<20)
OPTION(journal_block_align, OPT_BOOL, true)

View File

@ -11,7 +11,7 @@
using std::string;
#include "common/perf_counters.h"
#define dout_subsys ceph_subsys_keyvaluestore
#define dout_subsys ceph_subsys_kinetic
int KineticStore::init()
{

View File

@ -24,8 +24,6 @@ libos_a_SOURCES = \
os/filestore/LFNIndex.cc \
os/filestore/WBThrottle.cc \
os/fs/FS.cc \
os/keyvaluestore/GenericObjectMap.cc \
os/keyvaluestore/KeyValueStore.cc \
os/kstore/kv.cc \
os/kstore/KStore.cc \
os/memstore/MemStore.cc \
@ -86,8 +84,6 @@ noinst_HEADERS += \
os/fs/btrfs_ioctl.h \
os/fs/FS.h \
os/fs/XFS.h \
os/keyvaluestore/GenericObjectMap.h \
os/keyvaluestore/KeyValueStore.h \
os/kstore/kstore_types.h \
os/kstore/KStore.h \
os/kstore/kv.h \

View File

@ -20,7 +20,6 @@
#include "filestore/FileStore.h"
#include "memstore/MemStore.h"
#include "keyvaluestore/KeyValueStore.h"
#if defined(HAVE_LIBAIO)
#include "bluestore/BlueStore.h"
#endif
@ -73,10 +72,6 @@ ObjectStore *ObjectStore::create(CephContext *cct,
if (type == "memstore") {
return new MemStore(cct, data);
}
if (type == "keyvaluestore" &&
cct->check_experimental_feature_enabled("keyvaluestore")) {
return new KeyValueStore(data);
}
#if defined(HAVE_LIBAIO)
if (type == "bluestore" &&
cct->check_experimental_feature_enabled("bluestore")) {

File diff suppressed because it is too large Load Diff

View File

@ -1,429 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2013 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_GENERICOBJECTMAP_H
#define CEPH_GENERICOBJECTMAP_H
#include "include/buffer.h"
#include <set>
#include <map>
#include <string>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "include/memory.h"
#include "os/ObjectMap.h"
#include "kv/KeyValueDB.h"
#include "osd/osd_types.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/simple_cache.hpp"
/**
* Genericobjectmap: Provide with key/value associated to ghobject_t APIs to caller
* and avoid concerning too much. Wrap and combine KeyValueDB/ObjectMap APIs
* with ghobject_t and adding clone capacity.
*
* Prefix space structure:
*
* - GHOBJECT_TO_SEQ: Contains leaf mapping from ghobject_t->Header(including
* hobj.seq and related metadata)
* - INTERN_PREFIX: GLOBAL_STATE_KEY - contains the global state
* @see State
* @see write_state
* @see init
* @see generate_new_header
* - INTERN_PREFIX + header_key(header->seq) + COMPLETE_PREFIX: see below
* - INTERN_PREFIX + header_key(header->seq) + PARENT_KEY
* : used to store parent header(same as headers in GHOBJECT_TO_SEQ)
* - USER_PREFIX + header_key(header->seq) + [CUSTOM_PREFIX]
* : key->value which set by callers
*
* For each node (represented by a header), we
* store three mappings: the key mapping, the complete mapping, and the parent.
* The complete mapping (COMPLETE_PREFIX space) is key->key. Each x->y entry in
* this mapping indicates that the key mapping contains all entries on [x,y).
* Note, max string is represented by "", so ""->"" indicates that the parent
* is unnecessary (@see rm_keys). When looking up a key not contained in the
* the complete set, we have to check the parent if we don't find it in the
* key set. During rm_keys, we copy keys from the parent and update the
* complete set to reflect the change @see rm_keys.
*/
// This class only provide basic read capacity, suggest inherit it to
// implement write transaction to use it. @see StripObjectMap
class GenericObjectMap {
public:
boost::scoped_ptr<KeyValueDB> db;
/**
* Serializes access to next_seq as well as the in_use set
*/
Mutex header_lock;
GenericObjectMap(KeyValueDB *db) : db(db), header_lock("GenericObjectMap") {}
int get(
const coll_t &cid,
const ghobject_t &oid,
const string &prefix,
map<string, bufferlist> *out
);
int get_keys(
const coll_t &cid,
const ghobject_t &oid,
const string &prefix,
set<string> *keys
);
int get_values(
const coll_t &cid,
const ghobject_t &oid,
const string &prefix,
const set<string> &keys,
map<string, bufferlist> *out
);
int check_keys(
const coll_t &cid,
const ghobject_t &oid,
const string &prefix,
const set<string> &keys,
set<string> *out
);
/// Read initial state from backing store
int init(bool upgrade = false);
/// Upgrade store to current version
int upgrade() {return 0;}
/// Consistency check, debug, there must be no parallel writes
bool check(std::ostream &out);
/// Util, list all objects, there must be no other concurrent access
int list_objects(const coll_t &cid, ghobject_t start, ghobject_t end, int max,
vector<ghobject_t> *objs, ///< [out] objects
ghobject_t *next);
ObjectMap::ObjectMapIterator get_iterator(const coll_t &cid,
const ghobject_t &oid,
const string &prefix);
KeyValueDB::Transaction get_transaction() { return db->get_transaction(); }
int submit_transaction(KeyValueDB::Transaction t) {
return db->submit_transaction(t);
}
int submit_transaction_sync(KeyValueDB::Transaction t) {
return db->submit_transaction_sync(t);
}
/// persistent state for store @see generate_header
struct State {
__u8 v;
uint64_t seq;
State() : v(0), seq(1) {}
State(uint64_t seq) : v(0), seq(seq) {}
void encode(bufferlist &bl) const {
ENCODE_START(1, 1, bl);
::encode(v, bl);
::encode(seq, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
DECODE_START(1, bl);
::decode(v, bl);
::decode(seq, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const {
f->dump_unsigned("seq", seq);
}
static void generate_test_instances(list<State*> &o) {
o.push_back(new State(0));
o.push_back(new State(20));
}
} state;
struct _Header {
uint64_t seq;
uint64_t parent;
uint64_t num_children;
coll_t cid;
ghobject_t oid;
// Used by successor
bufferlist data;
void encode(bufferlist &bl) const {
ENCODE_START(1, 1, bl);
::encode(seq, bl);
::encode(parent, bl);
::encode(num_children, bl);
::encode(cid, bl);
::encode(oid, bl);
::encode(data, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
DECODE_START(1, bl);
::decode(seq, bl);
::decode(parent, bl);
::decode(num_children, bl);
::decode(cid, bl);
::decode(oid, bl);
::decode(data, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const {
f->dump_unsigned("seq", seq);
f->dump_unsigned("parent", parent);
f->dump_unsigned("num_children", num_children);
f->dump_stream("coll") << cid;
f->dump_stream("oid") << oid;
}
_Header() : seq(0), parent(0), num_children(1) {}
};
typedef ceph::shared_ptr<_Header> Header;
Header lookup_header(const coll_t &cid, const ghobject_t &oid) {
Mutex::Locker l(header_lock);
return _lookup_header(cid, oid);
}
/// Lookup or create header for c oid
Header lookup_create_header(const coll_t &cid, const ghobject_t &oid,
KeyValueDB::Transaction t);
/// Set leaf node for c and oid to the value of header
void set_header(const coll_t &cid, const ghobject_t &oid, _Header &header,
KeyValueDB::Transaction t);
// Move all modify member function to "protect", in order to indicate these
// should be made use of by sub-class
void set_keys(
const Header header,
const string &prefix,
const map<string, bufferlist> &set,
KeyValueDB::Transaction t
);
int clear(
const Header header,
KeyValueDB::Transaction t
);
int rm_keys(
const Header header,
const string &prefix,
const set<string> &buffered_keys,
const set<string> &to_clear,
KeyValueDB::Transaction t
);
void clone(
const Header origin_header,
const coll_t &cid,
const ghobject_t &target,
KeyValueDB::Transaction t,
Header *old_header,
Header *new_header
);
void rename(
const Header header,
const coll_t &cid,
const ghobject_t &target,
KeyValueDB::Transaction t
);
static const string GLOBAL_STATE_KEY;
static const string PARENT_KEY;
static const string USER_PREFIX;
static const string INTERN_PREFIX;
static const string PARENT_PREFIX;
static const string COMPLETE_PREFIX;
static const string GHOBJECT_TO_SEQ_PREFIX;
static const string GHOBJECT_KEY_SEP_S;
static const char GHOBJECT_KEY_SEP_C;
static const char GHOBJECT_KEY_ENDING;
private:
/// Implicit lock on Header->seq
static string header_key(const coll_t &cid);
static string header_key(const coll_t &cid, const ghobject_t &oid);
static bool parse_header_key(const string &in, coll_t *c, ghobject_t *oid);
string seq_key(uint64_t seq) {
char buf[100];
snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
return string(buf);
}
string user_prefix(Header header, const string &prefix);
string complete_prefix(Header header);
string parent_seq_prefix(uint64_t seq);
class EmptyIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
public:
int seek_to_first() { return 0; }
int seek_to_last() { return 0; }
int upper_bound(const string &after) { return 0; }
int lower_bound(const string &to) { return 0; }
bool valid() { return false; }
int next(bool validate=true) { assert(0); return 0; }
string key() { assert(0); return ""; }
bufferlist value() { assert(0); return bufferlist(); }
int status() { return 0; }
};
/// Iterator
class GenericObjectMapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
public:
GenericObjectMap *map;
/// NOTE: implicit lock on header->seq AND for all ancestors
Header header;
/// parent_iter == NULL iff no parent
ceph::shared_ptr<GenericObjectMapIteratorImpl> parent_iter;
KeyValueDB::Iterator key_iter;
KeyValueDB::Iterator complete_iter;
/// cur_iter points to currently valid iterator
ceph::shared_ptr<ObjectMap::ObjectMapIteratorImpl> cur_iter;
int r;
/// init() called, key_iter, complete_iter, parent_iter filled in
bool ready;
/// past end
bool invalid;
string prefix;
GenericObjectMapIteratorImpl(GenericObjectMap *map, Header header,
const string &_prefix) : map(map), header(header), r(0), ready(false),
invalid(true), prefix(_prefix) { }
int seek_to_first();
int seek_to_last();
int upper_bound(const string &after);
int lower_bound(const string &to);
bool valid();
int next(bool validate=true);
string key();
bufferlist value();
int status();
bool on_parent() {
return cur_iter == parent_iter;
}
/// skips to next valid parent entry
int next_parent();
/// Tests whether to_test is in complete region
int in_complete_region(const string &to_test, ///[in] key to test
string *begin, ///[out] beginning of region
string *end ///[out] end of region
); ///< @returns true if to_test is in the complete region, else false
private:
int init();
bool valid_parent();
int adjust();
};
protected:
typedef ceph::shared_ptr<GenericObjectMapIteratorImpl> GenericObjectMapIterator;
GenericObjectMapIterator _get_iterator(Header header, string prefix) {
return GenericObjectMapIterator(new GenericObjectMapIteratorImpl(this, header, prefix));
}
Header generate_new_header(const coll_t &cid, const ghobject_t &oid,
Header parent, KeyValueDB::Transaction t) {
Mutex::Locker l(header_lock);
return _generate_new_header(cid, oid, parent, t);
}
// Scan keys in header into out_keys and out_values (if nonnull)
int scan(Header header, const string &prefix, const set<string> &in_keys,
set<string> *out_keys, map<string, bufferlist> *out_values);
private:
/// Removes node corresponding to header
void clear_header(Header header, KeyValueDB::Transaction t);
/// Set node containing input to new contents
void set_parent_header(Header input, KeyValueDB::Transaction t);
/// Remove leaf node corresponding to oid in c
void remove_header(const coll_t &cid, const ghobject_t &oid, Header header,
KeyValueDB::Transaction t);
/**
* Generate new header for c oid with new seq number
*
* Has the side effect of syncronously saving the new GenericObjectMap state
*/
Header _generate_new_header(const coll_t &cid, const ghobject_t &oid,
Header parent, KeyValueDB::Transaction t);
// Lookup leaf header for c oid
Header _lookup_header(const coll_t &cid, const ghobject_t &oid);
// Lookup header node for input
Header lookup_parent(Header input);
// Remove header and all related prefixes
int _clear(Header header, KeyValueDB::Transaction t);
// Adds to t operations necessary to add new_complete to the complete set
int merge_new_complete(Header header, const map<string, string> &new_complete,
GenericObjectMapIterator iter, KeyValueDB::Transaction t);
// Writes out State (mainly next_seq)
int write_state(KeyValueDB::Transaction _t);
// 0 if the complete set now contains all of key space, < 0 on error, 1 else
int need_parent(GenericObjectMapIterator iter);
// Copies header entry from parent @see rm_keys
int copy_up_header(Header header, KeyValueDB::Transaction t);
// Sets header @see set_header
void _set_header(Header header, const bufferlist &bl,
KeyValueDB::Transaction t);
};
WRITE_CLASS_ENCODER(GenericObjectMap::_Header)
WRITE_CLASS_ENCODER(GenericObjectMap::State)
#endif

File diff suppressed because it is too large Load Diff

View File

@ -1,700 +0,0 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2013 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_KEYVALUESTORE_H
#define CEPH_KEYVALUESTORE_H
#include "include/types.h"
#include <map>
#include <deque>
#include <boost/scoped_ptr.hpp>
#include <fstream>
using namespace std;
#include "include/assert.h"
#include "os/ObjectStore.h"
#include "common/WorkQueue.h"
#include "common/Finisher.h"
#include "common/fd.h"
#include "common/Mutex.h"
#include "GenericObjectMap.h"
#include "kv/KeyValueDB.h"
#include "common/random_cache.hpp"
#include "include/uuid.h"
static uint64_t default_strip_size = 1024;
class StripObjectMap: public GenericObjectMap {
public:
struct StripExtent {
uint64_t no;
uint64_t offset; // in key
uint64_t len; // in key
StripExtent(uint64_t n, uint64_t off, size_t len):
no(n), offset(off), len(len) {}
};
// -- strip object --
struct StripObjectHeader {
// Persistent state
uint64_t strip_size;
uint64_t max_size;
vector<char> bits;
// soft state
Header header; // FIXME: Hold lock to avoid concurrent operations, it will
// also block read operation which not should be permitted.
coll_t cid;
ghobject_t oid;
bool updated;
bool deleted;
StripObjectHeader(): strip_size(default_strip_size), max_size(0), updated(false), deleted(false) {}
void encode(bufferlist &bl) const {
ENCODE_START(1, 1, bl);
::encode(strip_size, bl);
::encode(max_size, bl);
::encode(bits, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
DECODE_START(1, bl);
::decode(strip_size, bl);
::decode(max_size, bl);
::decode(bits, bl);
DECODE_FINISH(bl);
}
};
typedef ceph::shared_ptr<StripObjectHeader> StripObjectHeaderRef;
static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size,
vector<StripExtent> &extents);
int lookup_strip_header(const coll_t & cid, const ghobject_t &oid,
StripObjectHeaderRef *header);
int save_strip_header(StripObjectHeaderRef header, KeyValueDB::Transaction t);
int create_strip_header(const coll_t &cid, const ghobject_t &oid,
StripObjectHeaderRef *strip_header,
KeyValueDB::Transaction t);
void clone_wrap(StripObjectHeaderRef old_header,
const coll_t &cid, const ghobject_t &oid,
KeyValueDB::Transaction t,
StripObjectHeaderRef *target_header);
void rename_wrap(StripObjectHeaderRef old_header, const coll_t &cid, const ghobject_t &oid,
KeyValueDB::Transaction t,
StripObjectHeaderRef *new_header);
// Already hold header to avoid lock header seq again
int get_with_header(
const StripObjectHeaderRef header,
const string &prefix,
map<string, bufferlist> *out
);
int get_values_with_header(
const StripObjectHeaderRef header,
const string &prefix,
const set<string> &keys,
map<string, bufferlist> *out
);
int get_keys_with_header(
const StripObjectHeaderRef header,
const string &prefix,
set<string> *keys
);
Mutex lock;
void invalidate_cache(const coll_t &c, const ghobject_t &oid) {
Mutex::Locker l(lock);
caches.clear(oid);
}
RandomCache<ghobject_t, pair<coll_t, StripObjectHeaderRef> > caches;
StripObjectMap(KeyValueDB *db): GenericObjectMap(db),
lock("StripObjectMap::lock"),
caches(g_conf->keyvaluestore_header_cache_size)
{}
};
class KVSuperblock {
public:
CompatSet compat_features;
string backend;
KVSuperblock() { }
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &bl);
void dump(Formatter *f) const;
static void generate_test_instances(list<KVSuperblock*>& o);
};
WRITE_CLASS_ENCODER(KVSuperblock)
inline ostream& operator<<(ostream& out, const KVSuperblock& sb)
{
return out << "sb(" << sb.compat_features << " " << sb.backend << ")";
}
class KeyValueStore : public ObjectStore,
public md_config_obs_t {
public:
struct KVPerfTracker {
PerfCounters::avg_tracker<uint64_t> os_commit_latency;
PerfCounters::avg_tracker<uint64_t> os_apply_latency;
objectstore_perf_stat_t get_cur_stats() const {
objectstore_perf_stat_t ret;
ret.filestore_commit_latency = os_commit_latency.avg();
ret.filestore_apply_latency = os_apply_latency.avg();
return ret;
}
void update_from_perfcounters(PerfCounters &logger) {
os_commit_latency.consume_next(
logger.get_tavg_ms(
l_os_commit_lat));
os_apply_latency.consume_next(
logger.get_tavg_ms(
l_os_apply_lat));
}
} perf_tracker;
objectstore_perf_stat_t get_cur_stats() {
perf_tracker.update_from_perfcounters(*perf_logger);
return perf_tracker.get_cur_stats();
}
static const uint32_t target_version = 1;
private:
string internal_name; // internal name, used to name the perfcounter instance
string basedir;
std::string current_fn;
uuid_d fsid;
int fsid_fd, current_fd;
deque<uint64_t> snaps;
// ObjectMap
boost::scoped_ptr<StripObjectMap> backend;
Finisher ondisk_finisher;
RWLock collections_lock;
set<coll_t> collections;
Mutex lock;
int _create_current();
/// read a uuid from fd
int read_fsid(int fd, uuid_d *uuid);
/// lock fsid_fd
int lock_fsid();
string strip_object_key(uint64_t no) {
char n[100];
snprintf(n, 100, "%08lld", (long long)no);
return string(n);
}
// Each transaction has side effect which may influent the following
// operations, we need to make it visible for the following within
// transaction by caching middle result.
// Side effects contains:
// 1. Creating/Deleting collection
// 2. Creating/Deleting object
// 3. Object modify(including omap, xattr)
// 4. Clone or rename
struct BufferTransaction {
typedef pair<coll_t, ghobject_t> uniq_id;
struct CollGhobjectPairBitwiseComparator {
bool operator()(const uniq_id& l,
const uniq_id& r) const {
if (l.first < r.first)
return true;
if (l.first != r.first)
return false;
if (cmp_bitwise(l.second, r.second) < 0)
return true;
return false;
}
};
typedef map<uniq_id, StripObjectMap::StripObjectHeaderRef,
CollGhobjectPairBitwiseComparator> StripHeaderMap;
//Dirty records
StripHeaderMap strip_headers;
map< uniq_id, map<pair<string, string>, bufferlist>,
CollGhobjectPairBitwiseComparator> buffers; // pair(prefix, key),to buffer updated data in one transaction
list<Context*> finishes;
KeyValueStore *store;
KeyValueDB::Transaction t;
void set_collections(const set<coll_t>& collections) {
bufferlist collections_bl;
::encode(collections, collections_bl);
t->set("meta", "collections", collections_bl);
}
int lookup_cached_header(const coll_t &cid, const ghobject_t &oid,
StripObjectMap::StripObjectHeaderRef *strip_header,
bool create_if_missing);
int get_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
const string &prefix, const set<string> &keys,
map<string, bufferlist> *out);
void set_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
const string &prefix, map<string, bufferlist> &bl);
int remove_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
const string &prefix, const set<string> &keys);
void clear_buffer_keys(StripObjectMap::StripObjectHeaderRef strip_header,
const string &prefix);
int clear_buffer(StripObjectMap::StripObjectHeaderRef strip_header);
void clone_buffer(StripObjectMap::StripObjectHeaderRef old_header,
const coll_t &cid, const ghobject_t &oid);
void rename_buffer(StripObjectMap::StripObjectHeaderRef old_header,
const coll_t &cid, const ghobject_t &oid);
int submit_transaction();
BufferTransaction(KeyValueStore *store): store(store) {
t = store->backend->get_transaction();
}
struct InvalidateCacheContext : public Context {
KeyValueStore *store;
const coll_t cid;
const ghobject_t oid;
InvalidateCacheContext(KeyValueStore *s, const coll_t &c, const ghobject_t &oid): store(s), cid(c), oid(oid) {}
void finish(int r) {
if (r == 0)
store->backend->invalidate_cache(cid, oid);
}
};
};
// -- op workqueue --
struct Op {
utime_t start;
uint64_t op;
list<Transaction*> tls;
Context *ondisk, *onreadable, *onreadable_sync;
uint64_t ops, bytes;
TrackedOpRef osd_op;
};
class OpSequencer : public Sequencer_impl {
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
Cond cond;
list<pair<uint64_t, Context*> > flush_commit_waiters;
uint64_t op; // used by flush() to know the sequence of op
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
/// get_max_uncompleted
bool _get_max_uncompleted(
uint64_t *seq ///< [out] max uncompleted seq
) {
assert(qlock.is_locked());
assert(seq);
*seq = 0;
if (q.empty()) {
return true;
} else {
*seq = q.back()->op;
return false;
}
} /// @returns true if the queue is empty
/// get_min_uncompleted
bool _get_min_uncompleted(
uint64_t *seq ///< [out] min uncompleted seq
) {
assert(qlock.is_locked());
assert(seq);
*seq = 0;
if (q.empty()) {
return true;
} else {
*seq = q.front()->op;
return false;
}
} /// @returns true if both queues are empty
void _wake_flush_waiters(list<Context*> *to_queue) {
uint64_t seq;
if (_get_min_uncompleted(&seq))
seq = -1;
for (list<pair<uint64_t, Context*> >::iterator i =
flush_commit_waiters.begin();
i != flush_commit_waiters.end() && i->first < seq;
flush_commit_waiters.erase(i++)) {
to_queue->push_back(i->second);
}
}
void queue(Op *o) {
Mutex::Locker l(qlock);
q.push_back(o);
op++;
o->op = op;
}
Op *peek_queue() {
assert(apply_lock.is_locked());
return q.front();
}
Op *dequeue(list<Context*> *to_queue) {
assert(to_queue);
assert(apply_lock.is_locked());
Mutex::Locker l(qlock);
Op *o = q.front();
q.pop_front();
cond.Signal();
_wake_flush_waiters(to_queue);
return o;
}
void flush() {
Mutex::Locker l(qlock);
// get max for journal _or_ op queues
uint64_t seq = 0;
if (!q.empty())
seq = q.back()->op;
if (seq) {
// everything prior to our watermark to drain through either/both
// queues
while (!q.empty() && q.front()->op <= seq)
cond.Wait(qlock);
}
}
bool flush_commit(Context *c) {
Mutex::Locker l(qlock);
uint64_t seq = 0;
if (_get_max_uncompleted(&seq)) {
return true;
} else {
flush_commit_waiters.push_back(make_pair(seq, c));
return false;
}
}
OpSequencer()
: qlock("KeyValueStore::OpSequencer::qlock", false, false),
op(0), parent(0),
apply_lock("KeyValueStore::OpSequencer::apply_lock", false, false) {}
~OpSequencer() {
assert(q.empty());
}
const string& get_name() const {
return parent->get_name();
}
};
friend ostream& operator<<(ostream& out, const OpSequencer& s);
deque<OpSequencer*> op_queue;
Throttle throttle_ops, throttle_bytes;
Finisher op_finisher;
ThreadPool op_tp;
struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
KeyValueStore *store;
OpWQ(KeyValueStore *fs, time_t timeout, time_t suicide_timeout,
ThreadPool *tp) :
ThreadPool::WorkQueue<OpSequencer>("KeyValueStore::OpWQ",
timeout, suicide_timeout, tp),
store(fs) {}
bool _enqueue(OpSequencer *osr) {
store->op_queue.push_back(osr);
return true;
}
void _dequeue(OpSequencer *o) {
assert(0);
}
bool _empty() {
return store->op_queue.empty();
}
OpSequencer *_dequeue() {
if (store->op_queue.empty())
return NULL;
OpSequencer *osr = store->op_queue.front();
store->op_queue.pop_front();
return osr;
}
using ThreadPool::WorkQueue<OpSequencer>::_process;
void _process(OpSequencer *osr, ThreadPool::TPHandle &handle) {
store->_do_op(osr, handle);
}
void _process_finish(OpSequencer *osr) {
store->_finish_op(osr);
}
void _clear() {
assert(store->op_queue.empty());
}
} op_wq;
Op *build_op(list<Transaction*>& tls, Context *ondisk, Context *onreadable,
Context *onreadable_sync, TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
void _do_op(OpSequencer *osr, ThreadPool::TPHandle &handle);
void op_queue_release_throttle(Op *o);
void _finish_op(OpSequencer *osr);
PerfCounters *perf_logger;
public:
KeyValueStore(const std::string &base,
const char *internal_name = "keyvaluestore",
bool update_to=false);
~KeyValueStore();
bool test_mount_in_use();
int version_stamp_is_valid(uint32_t *version);
int update_version_stamp();
uint32_t get_target_version() {
return target_version;
}
int write_version_stamp();
int mount();
int umount();
unsigned get_max_object_name_length() {
return 4096; // no real limit for leveldb
}
unsigned get_max_attr_name_length() {
return 256; // arbitrary; there is no real limit internally
}
int mkfs();
int mkjournal() {return 0;}
bool wants_journal() {
return false;
}
bool allows_journal() {
return false;
}
bool needs_journal() {
return false;
}
void collect_metadata(map<string,string> *pm);
int statfs(struct statfs *buf);
int _do_transactions(
list<Transaction*> &tls, uint64_t op_seq,
ThreadPool::TPHandle *handle);
int do_transactions(list<Transaction*> &tls, uint64_t op_seq) {
return _do_transactions(tls, op_seq, 0);
}
void _do_transaction(Transaction& transaction,
BufferTransaction &bt,
ThreadPool::TPHandle *handle);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
TrackedOpRef op = TrackedOpRef(),
ThreadPool::TPHandle *handle = NULL);
// ------------------
// objects
int _generic_read(StripObjectMap::StripObjectHeaderRef header,
uint64_t offset, size_t len, bufferlist& bl,
bool allow_eio = false, BufferTransaction *bt = 0);
int _generic_write(StripObjectMap::StripObjectHeaderRef header,
uint64_t offset, size_t len, const bufferlist& bl,
BufferTransaction &t, uint32_t fadvise_flags = 0);
bool exists(coll_t cid, const ghobject_t& oid);
int stat(coll_t cid, const ghobject_t& oid, struct stat *st,
bool allow_eio = false);
int read(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
bufferlist& bl, uint32_t op_flags = 0, bool allow_eio = false);
int fiemap(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
bufferlist& bl);
int _touch(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
int _write(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
const bufferlist& bl, BufferTransaction &t, uint32_t fadvise_flags = 0);
int _zero(coll_t cid, const ghobject_t& oid, uint64_t offset, size_t len,
BufferTransaction &t);
int _truncate(coll_t cid, const ghobject_t& oid, uint64_t size,
BufferTransaction &t);
int _clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& newoid,
BufferTransaction &t);
int _clone_range(coll_t cid, const ghobject_t& oldoid,
const ghobject_t& newoid, uint64_t srcoff,
uint64_t len, uint64_t dstoff, BufferTransaction &t);
int _remove(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
int _set_alloc_hint(coll_t cid, const ghobject_t& oid,
uint64_t expected_object_size,
uint64_t expected_write_size,
BufferTransaction &t);
void start_sync() {}
void set_fsid(uuid_d u) { fsid = u; }
uuid_d get_fsid() { return fsid; }
// attrs
int getattr(coll_t cid, const ghobject_t& oid, const char *name,
bufferptr &bp);
int getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& aset);
int _setattrs(coll_t cid, const ghobject_t& oid,
map<string, bufferptr>& aset, BufferTransaction &t);
int _rmattr(coll_t cid, const ghobject_t& oid, const char *name,
BufferTransaction &t);
int _rmattrs(coll_t cid, const ghobject_t& oid, BufferTransaction &t);
// collections
int _collection_hint_expected_num_objs(coll_t cid, uint32_t pg_num,
uint64_t num_objs) const { return 0; }
int _create_collection(coll_t c, BufferTransaction &t);
int _destroy_collection(coll_t c, BufferTransaction &t);
int _collection_add(coll_t c, coll_t ocid, const ghobject_t& oid,
BufferTransaction &t);
int _collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
coll_t c, const ghobject_t& o,
BufferTransaction &t);
int _collection_remove_recursive(const coll_t &cid,
BufferTransaction &t);
int list_collections(vector<coll_t>& ls);
bool collection_exists(coll_t c);
bool collection_empty(coll_t c);
int collection_list(coll_t c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *next);
int collection_version_current(coll_t c, uint32_t *version);
// omap (see ObjectStore.h for documentation)
int omap_get(coll_t c, const ghobject_t &oid, bufferlist *header,
map<string, bufferlist> *out);
int omap_get_header(
coll_t c,
const ghobject_t &oid,
bufferlist *out,
bool allow_eio = false);
int omap_get_keys(coll_t c, const ghobject_t &oid, set<string> *keys);
int omap_get_values(coll_t c, const ghobject_t &oid, const set<string> &keys,
map<string, bufferlist> *out);
int omap_check_keys(coll_t c, const ghobject_t &oid, const set<string> &keys,
set<string> *out);
ObjectMap::ObjectMapIterator get_omap_iterator(coll_t c,
const ghobject_t &oid);
int check_get_rc(const coll_t cid, const ghobject_t& oid, int r, bool is_equal_size);
void dump_start(const std::string &file);
void dump_stop();
void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq,
OpSequencer *osr);
private:
void _inject_failure() {}
// omap
int _omap_clear(coll_t cid, const ghobject_t &oid,
BufferTransaction &t);
int _omap_setkeys(coll_t cid, const ghobject_t &oid,
map<string, bufferlist> &aset,
BufferTransaction &t);
int _omap_rmkeys(coll_t cid, const ghobject_t &oid, const set<string> &keys,
BufferTransaction &t);
int _omap_rmkeyrange(coll_t cid, const ghobject_t &oid,
const string& first, const string& last,
BufferTransaction &t);
int _omap_setheader(coll_t cid, const ghobject_t &oid, const bufferlist &bl,
BufferTransaction &t);
int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest,
BufferTransaction &t);
int _split_collection_create(coll_t cid, uint32_t bits, uint32_t rem,
coll_t dest, BufferTransaction &t){
return 0;
}
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const struct md_config_t *conf,
const std::set <std::string> &changed);
std::string m_osd_rollback_to_cluster_snap;
int m_keyvaluestore_queue_max_ops;
int m_keyvaluestore_queue_max_bytes;
int m_keyvaluestore_strip_size;
uint64_t m_keyvaluestore_max_expected_write_size;
int do_update;
bool m_keyvaluestore_do_dump;
std::ofstream m_keyvaluestore_dump;
JSONFormatter m_keyvaluestore_dump_fmt;
static const string OBJECT_STRIP_PREFIX;
static const string OBJECT_XATTR;
static const string OBJECT_OMAP;
static const string OBJECT_OMAP_HEADER;
static const string OBJECT_OMAP_HEADER_KEY;
static const string COLLECTION;
static const string COLLECTION_ATTR;
static const uint32_t COLLECTION_VERSION = 1;
KVSuperblock superblock;
/**
* write_superblock()
*
* Write superblock to persisent storage
*
* return value: 0 on success, otherwise negative errno
*/
int write_superblock();
/**
* read_superblock()
*
* Fill in KeyValueStore::superblock by reading persistent storage
*
* return value: 0 on success, otherwise negative errno
*/
int read_superblock();
};
WRITE_CLASS_ENCODER(StripObjectMap::StripObjectHeader)
#endif

View File

@ -417,7 +417,7 @@ def set_osd_weight(CFSD_PREFIX, osd_ids, osd_path, weight):
osdmap_file=osdmap_file.name)
output = check_output(cmd, shell=True)
epoch = int(re.findall('#(\d+)', output)[0])
new_crush_file = tempfile.NamedTemporaryFile(delete=False)
old_crush_file = tempfile.NamedTemporaryFile(delete=False)
ret = call("./osdmaptool --export-crush {crush_file} {osdmap_file}".format(osdmap_file=osdmap_file.name,
@ -889,7 +889,7 @@ def main(argv):
# Specify a bad --type
os.mkdir(OSDDIR + "/fakeosd")
cmd = ("./ceph-objectstore-tool --data-path " + OSDDIR + "/{osd} --type foobar --op list --pgid {pg}").format(osd="fakeosd", pg=ONEPG)
ERRORS += test_failure(cmd, "Need a valid --type e.g. filestore, memstore, keyvaluestore")
ERRORS += test_failure(cmd, "Need a valid --type e.g. filestore, memstore")
# Don't specify a data-path
cmd = "./ceph-objectstore-tool --journal-path {dir}/{osd}.journal --type memstore --op list --pgid {pg}".format(dir=OSDDIR, osd=ONEOSD, pg=ONEPG)

View File

@ -104,8 +104,6 @@ TEST_P(StoreTest, collect_metadata) {
ASSERT_NE(pm.count("filestore_f_type"), 0u);
ASSERT_NE(pm.count("backend_filestore_partition_path"), 0u);
ASSERT_NE(pm.count("backend_filestore_dev_node"), 0u);
} else if (GetParam() == string("keyvaluestore")) {
ASSERT_NE(pm.count("keyvaluestore_backend"), 0u);
}
}
@ -3258,7 +3256,6 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(
"memstore",
"filestore",
"keyvaluestore",
"bluestore",
"kstore"));

View File

@ -2220,7 +2220,7 @@ int main(int argc, char **argv)
desc.add_options()
("help", "produce help message")
("type", po::value<string>(&type),
"Arg is one of [filestore (default), memstore, keyvaluestore]")
"Arg is one of [filestore (default), memstore]")
("data-path", po::value<string>(&dpath),
"path to object store, mandatory")
("journal-path", po::value<string>(&jpath),
@ -2466,12 +2466,7 @@ int main(int argc, char **argv)
ObjectStore *fs = ObjectStore::create(g_ceph_context, type, dpath, jpath, flags);
if (fs == NULL) {
cerr << "Need a valid --type e.g. filestore, memstore, keyvaluestore" << std::endl;
if (type == "keyvaluestore") {
cerr << "Add \"keyvaluestore\" to "
<< "enable_experimental_unrecoverable_data_corrupting_features"
<< std::endl;
}
cerr << "Need a valid --type e.g. filestore, memstore" << std::endl;
myexit(1);
}