os: add prototype KineticStore

Implement the KeyValueDB interface using libkinetic_client,
and allow it to be configured as the backend for the KeyValueStore,
running the entire OSD on it.

This prototype implementation has no transaction safety, and is
only suitable as a proof of concept. Since the libkinetic_client
API does not provide reverse iteration over keys without also reading
the value off disk, it implements iterators in a very slow but correct way.
These are used heavily by the KeyValueDB callers, so this is a bottleneck
in performance.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Josh Durgin 2014-05-29 12:23:30 -07:00
parent f51f162ebd
commit 59c00e5fd0
8 changed files with 525 additions and 1 deletions

View File

@ -517,6 +517,18 @@ AC_LANG_POP([C++])
# Find supported SIMD / SSE extensions supported by the compiler
AX_INTEL_FEATURES()
# kinetic osd backend?
AC_ARG_WITH([kinetic],
[AS_HELP_STRING([--with-kinetic], [build kinetic support])],
[],
[with_kinetic=no])
# no pkg-config support yet
#AS_IF([test "x$with_kinetic" = "xyes"],
# [PKG_CHECK_MODULES([KINETIC], [kinetic_client], [], [true])])
AS_IF([test "x$with_kinetic" = "xyes"],
[AC_DEFINE([HAVE_KINETIC], [1], [Defined if you have kinetic enabled])])
AM_CONDITIONAL(WITH_KINETIC, [ test "$with_kinetic" = "yes" ])
# use system libs3?
AC_ARG_WITH([system-libs3],
[AS_HELP_STRING([--with-system-libs3], [use system libs3])],

View File

@ -569,6 +569,14 @@ OPTION(leveldb_paranoid, OPT_BOOL, false) // leveldb paranoid flag
OPTION(leveldb_log, OPT_STR, "/dev/null") // enable leveldb log file
OPTION(leveldb_compact_on_mount, OPT_BOOL, false)
OPTION(osd_keyvaluedb, OPT_STR, "leveldb")
OPTION(kinetic_host, OPT_STR, "") // hostname or ip address of a kinetic drive to use
OPTION(kinetic_port, OPT_INT, 8123) // port number of the kinetic drive
OPTION(kinetic_user_id, OPT_INT, 1) // kinetic user to authenticate as
OPTION(kinetic_hmac_key, OPT_STR, "asdfasdf") // kinetic key to authenticate with
OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic with TLS
/**
* osd_client_op_priority and osd_recovery_op_priority adjust the relative
* priority of client io vs recovery io.

View File

@ -50,6 +50,10 @@
#include "common/sync_filesystem.h"
#include "LevelDBStore.h"
#ifdef HAVE_KINETIC
#include "KineticStore.h"
#endif
#include "common/ceph_crypto.h"
using ceph::crypto::SHA1;
@ -590,6 +594,10 @@ int KeyValueStore::mkfs()
KeyValueDB *store;
if (kv_type == KV_TYPE_LEVELDB) {
store = new LevelDBStore(g_ceph_context, current_fn);
#ifdef HAVE_KINETIC
} else if (kv_type == KV_TYPE_KINETIC) {
store = new KineticStore(g_ceph_context);
#endif
} else {
derr << "KeyValueStore::mkfs error: unknown backend type" << kv_type << dendl;
ret = -1;
@ -790,6 +798,10 @@ int KeyValueStore::mount()
KeyValueDB *store;
if (kv_type == KV_TYPE_LEVELDB) {
store = new LevelDBStore(g_ceph_context, current_fn);
#ifdef HAVE_KINETIC
} else if (kv_type == KV_TYPE_KINETIC) {
store = new KineticStore(g_ceph_context);
#endif
} else {
derr << "KeyValueStore::mount error: unknown backend type" << kv_type
<< dendl;

View File

@ -43,6 +43,7 @@ using namespace std;
enum kvstore_types {
KV_TYPE_NONE = 0,
KV_TYPE_LEVELDB,
KV_TYPE_KINETIC,
KV_TYPE_OTHER
};
@ -442,7 +443,17 @@ class KeyValueStore : public ObjectStore,
bool update_to=false);
~KeyValueStore();
int _detect_backend() { kv_type = KV_TYPE_LEVELDB; return 0; }
int _detect_backend() {
if (g_conf->osd_keyvaluedb == "leveldb")
kv_type = KV_TYPE_LEVELDB;
#ifdef HAVE_KINETIC
else if (g_conf->osd_keyvaluedb == "kinetic")
kv_type = KV_TYPE_KINETIC;
#endif
else
return -EINVAL;
return 0;
}
bool test_mount_in_use();
int version_stamp_is_valid(uint32_t *version);
int update_version_stamp();

309
src/os/KineticStore.cc Normal file
View File

@ -0,0 +1,309 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "KineticStore.h"
#include "common/ceph_crypto.h"
#include <set>
#include <map>
#include <string>
#include "include/memory.h"
#include <errno.h>
using std::string;
#include "common/perf_counters.h"
#define dout_subsys ceph_subsys_keyvaluestore
int KineticStore::init()
{
// init defaults. caller can override these if they want
// prior to calling open.
host = cct->_conf->kinetic_host;
port = cct->_conf->kinetic_port;
user_id = cct->_conf->kinetic_user_id;
hmac_key = cct->_conf->kinetic_hmac_key;
use_ssl = cct->_conf->kinetic_use_ssl;
return 0;
}
int KineticStore::do_open(ostream &out, bool create_if_missing)
{
kinetic::KineticConnectionFactory conn_factory =
kinetic::NewKineticConnectionFactory();
kinetic::ConnectionOptions options;
options.host = host;
options.port = port;
options.user_id = user_id;
options.hmac_key = hmac_key;
options.use_ssl = use_ssl;
kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
if (!status.ok()) {
derr << "Unable to connect to kinetic store " << host << ":" << port
<< " : " << status.ToString() << dendl;
return -EINVAL;
}
PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last);
plb.add_u64_counter(l_kinetic_gets, "kinetic_get");
plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction");
logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
return 0;
}
KineticStore::KineticStore(CephContext *c) :
cct(c),
logger(NULL)
{
host = c->_conf->kinetic_host;
port = c->_conf->kinetic_port;
user_id = c->_conf->kinetic_user_id;
hmac_key = c->_conf->kinetic_hmac_key;
use_ssl = c->_conf->kinetic_use_ssl;
}
KineticStore::~KineticStore()
{
close();
delete logger;
}
void KineticStore::close()
{
kinetic_conn.reset();
if (logger)
cct->get_perfcounters_collection()->remove(logger);
}
int KineticStore::submit_transaction(KeyValueDB::Transaction t)
{
KineticTransactionImpl * _t =
static_cast<KineticTransactionImpl *>(t.get());
dout(20) << "kinetic submit_transaction" << dendl;
for (vector<KineticOp>::iterator it = _t->ops.begin();
it != _t->ops.end(); ++it) {
kinetic::KineticStatus status(kinetic::StatusCode::OK, "");
if (it->type == KINETIC_OP_WRITE) {
string data(it->data.c_str(), it->data.length());
kinetic::KineticRecord record(data, "", "",
com::seagate::kinetic::client::proto::Message_Algorithm_SHA1);
dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl;
status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION,
record);
dout(30) << "kinetic after put of " << it->key << dendl;
} else {
assert(it->type == KINETIC_OP_DELETE);
dout(30) << "kinetic before delete" << dendl;
status = kinetic_conn->Delete(it->key, "",
kinetic::WriteMode::IGNORE_VERSION);
dout(30) << "kinetic after delete" << dendl;
}
if (!status.ok()) {
derr << "kinetic error submitting transaction: "
<< status.message() << dendl;
return -1;
}
}
logger->inc(l_kinetic_txns);
return 0;
}
int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t)
{
return submit_transaction(t);
}
void KineticStore::KineticTransactionImpl::set(
const string &prefix,
const string &k,
const bufferlist &to_set_bl)
{
string key = combine_strings(prefix, k);
dout(30) << "kinetic set key " << key << dendl;
ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl));
}
void KineticStore::KineticTransactionImpl::rmkey(const string &prefix,
const string &k)
{
string key = combine_strings(prefix, k);
dout(30) << "kinetic rm key " << key << dendl;
ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
}
void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl;
KeyValueDB::Iterator it = db->get_iterator(prefix);
for (it->seek_to_first();
it->valid();
it->next()) {
string key = combine_strings(prefix, it->key());
ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
dout(30) << "kinetic rm key by prefix: " << key << dendl;
}
}
int KineticStore::get(
const string &prefix,
const std::set<string> &keys,
std::map<string, bufferlist> *out)
{
dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl;
for (std::set<string>::const_iterator i = keys.begin();
i != keys.end();
++i) {
unique_ptr<kinetic::KineticRecord> record;
string key = combine_strings(prefix, *i);
dout(30) << "before get key " << key << dendl;
kinetic::KineticStatus status = kinetic_conn->Get(key, record);
if (!status.ok())
break;
dout(30) << "kinetic get got key: " << key << dendl;
out->insert(make_pair(key, to_bufferlist(*record.get())));
}
logger->inc(l_kinetic_gets);
return 0;
}
string KineticStore::combine_strings(const string &prefix, const string &value)
{
string out = prefix;
out.push_back(1);
out.append(value);
return out;
}
bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record)
{
bufferlist bl;
bl.append(*(record.value()));
return bl;
}
int KineticStore::split_key(string in_prefix, string *prefix, string *key)
{
size_t prefix_len = in_prefix.find('\1');
if (prefix_len >= in_prefix.size())
return -EINVAL;
if (prefix)
*prefix = string(in_prefix, 0, prefix_len);
if (key)
*key= string(in_prefix, prefix_len + 1);
return 0;
}
KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn),
kinetic_status(kinetic::StatusCode::OK, "")
{
dout(30) << "kinetic iterator constructor()" << dendl;
const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
kinetic::KeyRangeIterator it =
kinetic_conn->IterateKeyRange("", true, last_key, true, 1024);
while (it != kinetic::KeyRangeEnd()) {
try {
keys.insert(*it);
dout(30) << "kinetic iterator added " << *it << dendl;
} catch (std::runtime_error &e) {
kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what());
return;
}
++it;
}
keys_iter = keys.begin();
}
int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
{
dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl;
keys_iter = keys.lower_bound(prefix);
return 0;
}
int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
{
dout(30) << "kinetic iterator seek_to_last()" << dendl;
keys_iter = keys.end();
if (keys.begin() != keys_iter)
--keys_iter;
return 0;
}
int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
{
dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl;
keys_iter = keys.upper_bound(prefix + "\2");
if (keys.begin() == keys_iter) {
keys_iter = keys.end();
} else {
--keys_iter;
}
return 0;
}
int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) {
dout(30) << "kinetic iterator upper_bound()" << dendl;
string bound = combine_strings(prefix, after);
keys_iter = keys.upper_bound(bound);
return 0;
}
int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) {
dout(30) << "kinetic iterator lower_bound()" << dendl;
string bound = combine_strings(prefix, to);
keys_iter = keys.lower_bound(bound);
return 0;
}
bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
dout(30) << "kinetic iterator valid()" << dendl;
return keys_iter != keys.end();
}
int KineticStore::KineticWholeSpaceIteratorImpl::next() {
dout(30) << "kinetic iterator next()" << dendl;
if (keys_iter != keys.end()) {
++keys_iter;
return 0;
}
return -1;
}
int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
dout(30) << "kinetic iterator prev()" << dendl;
if (keys_iter != keys.begin()) {
--keys_iter;
return 0;
}
keys_iter = keys.end();
return -1;
}
string KineticStore::KineticWholeSpaceIteratorImpl::key() {
dout(30) << "kinetic iterator key()" << dendl;
string out_key;
split_key(*keys_iter, NULL, &out_key);
return out_key;
}
pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
dout(30) << "kinetic iterator raw_key()" << dendl;
string prefix, key;
split_key(*keys_iter, &prefix, &key);
return make_pair(prefix, key);
}
bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() {
dout(30) << "kinetic iterator value()" << dendl;
unique_ptr<kinetic::KineticRecord> record;
kinetic_status = kinetic_conn->Get(*keys_iter, record);
return to_bufferlist(*record.get());
}
int KineticStore::KineticWholeSpaceIteratorImpl::status() {
dout(30) << "kinetic iterator status()" << dendl;
return kinetic_status.ok() ? 0 : -1;
}

159
src/os/KineticStore.h Normal file
View File

@ -0,0 +1,159 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef KINETIC_STORE_H
#define KINETIC_STORE_H
#include "include/types.h"
#include "include/buffer.h"
#include "KeyValueDB.h"
#include <set>
#include <map>
#include <string>
#include "include/memory.h"
#include <kinetic/kinetic.h>
#include <errno.h>
#include "common/errno.h"
#include "common/dout.h"
#include "include/assert.h"
#include "common/Formatter.h"
#include "common/ceph_context.h"
class PerfCounters;
enum {
l_kinetic_first = 34400,
l_kinetic_gets,
l_kinetic_txns,
l_kinetic_last,
};
/**
* Uses Kinetic to implement the KeyValueDB interface
*/
class KineticStore : public KeyValueDB {
CephContext *cct;
PerfCounters *logger;
string host;
int port;
int user_id;
string hmac_key;
bool use_ssl;
std::unique_ptr<kinetic::BlockingKineticConnection> kinetic_conn;
int do_open(ostream &out, bool create_if_missing);
public:
KineticStore(CephContext *c);
~KineticStore();
int init();
/// Opens underlying db
int open(ostream &out) {
return do_open(out, false);
}
/// Creates underlying db if missing and opens it
int create_and_open(ostream &out) {
return do_open(out, true);
}
void close();
enum KineticOpType {
KINETIC_OP_WRITE,
KINETIC_OP_DELETE,
};
struct KineticOp {
KineticOpType type;
std::string key;
bufferlist data;
KineticOp(KineticOpType type, const string &key) : type(type), key(key) {}
KineticOp(KineticOpType type, const string &key, const bufferlist &data)
: type(type), key(key), data(data) {}
};
class KineticTransactionImpl : public KeyValueDB::TransactionImpl {
public:
vector<KineticOp> ops;
KineticStore *db;
KineticTransactionImpl(KineticStore *db) : db(db) {}
void set(
const string &prefix,
const string &k,
const bufferlist &bl);
void rmkey(
const string &prefix,
const string &k);
void rmkeys_by_prefix(
const string &prefix
);
};
KeyValueDB::Transaction get_transaction() {
return ceph::shared_ptr< KineticTransactionImpl >(
new KineticTransactionImpl(this));
}
int submit_transaction(KeyValueDB::Transaction t);
int submit_transaction_sync(KeyValueDB::Transaction t);
int get(
const string &prefix,
const std::set<string> &key,
std::map<string, bufferlist> *out
);
class KineticWholeSpaceIteratorImpl :
public KeyValueDB::WholeSpaceIteratorImpl {
std::set<std::string> keys;
std::set<std::string>::iterator keys_iter;
kinetic::BlockingKineticConnection *kinetic_conn;
kinetic::KineticStatus kinetic_status;
public:
KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn);
virtual ~KineticWholeSpaceIteratorImpl() { }
int seek_to_first() {
return seek_to_first("");
}
int seek_to_first(const string &prefix);
int seek_to_last();
int seek_to_last(const string &prefix);
int upper_bound(const string &prefix, const string &after);
int lower_bound(const string &prefix, const string &to);
bool valid();
int next();
int prev();
string key();
pair<string,string> raw_key();
bufferlist value();
int status();
};
/// Utility
static string combine_strings(const string &prefix, const string &value);
static int split_key(string in_prefix, string *prefix, string *key);
static bufferlist to_bufferlist(const kinetic::KineticRecord &record);
virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
// not used by the osd
return 0;
}
protected:
WholeSpaceIterator _get_iterator() {
return ceph::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
new KineticWholeSpaceIteratorImpl(kinetic_conn.get()));
}
// TODO: remove snapshots from interface
WholeSpaceIterator _get_snapshot_iterator() {
return _get_iterator();
}
};
#endif

View File

@ -29,6 +29,8 @@ if WITH_LIBZFS
libos_la_SOURCES += os/ZFSFileStoreBackend.cc
endif
libos_la_CXXFLAGS = ${AM_CXXFLAGS}
libos_la_LIBADD =
noinst_LTLIBRARIES += libos.la
noinst_HEADERS += \
@ -66,3 +68,9 @@ noinst_LIBRARIES += libos_zfs.a
noinst_HEADERS += os/ZFS.h
endif
if WITH_KINETIC
libos_la_SOURCES += os/KineticStore.cc
libos_la_CXXFLAGS += -std=gnu++11
libos_la_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a
noinst_HEADERS += os/KineticStore.h
endif

View File

@ -19,6 +19,8 @@ libosd_la_SOURCES = \
osd/osd_types.cc \
osd/ECUtil.cc \
objclass/class_api.cc
libosd_la_CXXFLAGS = ${AM_CXXFLAGS}
libosd_la_LIBADD = $(LIBOSDC) $(LIBOS)
noinst_LTLIBRARIES += libosd.la
@ -45,3 +47,6 @@ noinst_HEADERS += \
osd/Watch.h \
osd/osd_types.h
if WITH_KINETIC
libosd_la_CXXFLAGS += -std=gnu++11
endif