diff --git a/configure.ac b/configure.ac index c21ace0867f..0da47323e36 100644 --- a/configure.ac +++ b/configure.ac @@ -517,6 +517,18 @@ AC_LANG_POP([C++]) # Find supported SIMD / SSE extensions supported by the compiler AX_INTEL_FEATURES() +# kinetic osd backend? +AC_ARG_WITH([kinetic], + [AS_HELP_STRING([--with-kinetic], [build kinetic support])], + [], + [with_kinetic=no]) +# no pkg-config support yet +#AS_IF([test "x$with_kinetic" = "xyes"], +# [PKG_CHECK_MODULES([KINETIC], [kinetic_client], [], [true])]) +AS_IF([test "x$with_kinetic" = "xyes"], + [AC_DEFINE([HAVE_KINETIC], [1], [Defined if you have kinetic enabled])]) +AM_CONDITIONAL(WITH_KINETIC, [ test "$with_kinetic" = "yes" ]) + # use system libs3? AC_ARG_WITH([system-libs3], [AS_HELP_STRING([--with-system-libs3], [use system libs3])], diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 338db2b5906..885353930d8 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -569,6 +569,14 @@ OPTION(leveldb_paranoid, OPT_BOOL, false) // leveldb paranoid flag OPTION(leveldb_log, OPT_STR, "/dev/null") // enable leveldb log file OPTION(leveldb_compact_on_mount, OPT_BOOL, false) +OPTION(osd_keyvaluedb, OPT_STR, "leveldb") + +OPTION(kinetic_host, OPT_STR, "") // hostname or ip address of a kinetic drive to use +OPTION(kinetic_port, OPT_INT, 8123) // port number of the kinetic drive +OPTION(kinetic_user_id, OPT_INT, 1) // kinetic user to authenticate as +OPTION(kinetic_hmac_key, OPT_STR, "asdfasdf") // kinetic key to authenticate with +OPTION(kinetic_use_ssl, OPT_BOOL, false) // whether to secure kinetic traffic with TLS + /** * osd_client_op_priority and osd_recovery_op_priority adjust the relative * priority of client io vs recovery io. diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc index 3bcbede58ea..82d6a364dcc 100644 --- a/src/os/KeyValueStore.cc +++ b/src/os/KeyValueStore.cc @@ -50,6 +50,10 @@ #include "common/sync_filesystem.h" #include "LevelDBStore.h" +#ifdef HAVE_KINETIC +#include "KineticStore.h" +#endif + #include "common/ceph_crypto.h" using ceph::crypto::SHA1; @@ -590,6 +594,10 @@ int KeyValueStore::mkfs() KeyValueDB *store; if (kv_type == KV_TYPE_LEVELDB) { store = new LevelDBStore(g_ceph_context, current_fn); +#ifdef HAVE_KINETIC + } else if (kv_type == KV_TYPE_KINETIC) { + store = new KineticStore(g_ceph_context); +#endif } else { derr << "KeyValueStore::mkfs error: unknown backend type" << kv_type << dendl; ret = -1; @@ -790,6 +798,10 @@ int KeyValueStore::mount() KeyValueDB *store; if (kv_type == KV_TYPE_LEVELDB) { store = new LevelDBStore(g_ceph_context, current_fn); +#ifdef HAVE_KINETIC + } else if (kv_type == KV_TYPE_KINETIC) { + store = new KineticStore(g_ceph_context); +#endif } else { derr << "KeyValueStore::mount error: unknown backend type" << kv_type << dendl; diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index 5f62c5a0bfa..9609016942b 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -43,6 +43,7 @@ using namespace std; enum kvstore_types { KV_TYPE_NONE = 0, KV_TYPE_LEVELDB, + KV_TYPE_KINETIC, KV_TYPE_OTHER }; @@ -442,7 +443,17 @@ class KeyValueStore : public ObjectStore, bool update_to=false); ~KeyValueStore(); - int _detect_backend() { kv_type = KV_TYPE_LEVELDB; return 0; } + int _detect_backend() { + if (g_conf->osd_keyvaluedb == "leveldb") + kv_type = KV_TYPE_LEVELDB; +#ifdef HAVE_KINETIC + else if (g_conf->osd_keyvaluedb == "kinetic") + kv_type = KV_TYPE_KINETIC; +#endif + else + return -EINVAL; + return 0; + } bool test_mount_in_use(); int version_stamp_is_valid(uint32_t *version); int update_version_stamp(); diff --git a/src/os/KineticStore.cc b/src/os/KineticStore.cc new file mode 100644 index 00000000000..ba77376b539 --- /dev/null +++ b/src/os/KineticStore.cc @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#include "KineticStore.h" +#include "common/ceph_crypto.h" + +#include +#include +#include +#include "include/memory.h" +#include +using std::string; +#include "common/perf_counters.h" + +#define dout_subsys ceph_subsys_keyvaluestore + +int KineticStore::init() +{ + // init defaults. caller can override these if they want + // prior to calling open. + host = cct->_conf->kinetic_host; + port = cct->_conf->kinetic_port; + user_id = cct->_conf->kinetic_user_id; + hmac_key = cct->_conf->kinetic_hmac_key; + use_ssl = cct->_conf->kinetic_use_ssl; + return 0; +} + +int KineticStore::do_open(ostream &out, bool create_if_missing) +{ + kinetic::KineticConnectionFactory conn_factory = + kinetic::NewKineticConnectionFactory(); + kinetic::ConnectionOptions options; + options.host = host; + options.port = port; + options.user_id = user_id; + options.hmac_key = hmac_key; + options.use_ssl = use_ssl; + kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10); + if (!status.ok()) { + derr << "Unable to connect to kinetic store " << host << ":" << port + << " : " << status.ToString() << dendl; + return -EINVAL; + } + + PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last); + plb.add_u64_counter(l_kinetic_gets, "kinetic_get"); + plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction"); + logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + return 0; +} + +KineticStore::KineticStore(CephContext *c) : + cct(c), + logger(NULL) +{ + host = c->_conf->kinetic_host; + port = c->_conf->kinetic_port; + user_id = c->_conf->kinetic_user_id; + hmac_key = c->_conf->kinetic_hmac_key; + use_ssl = c->_conf->kinetic_use_ssl; +} + +KineticStore::~KineticStore() +{ + close(); + delete logger; +} + +void KineticStore::close() +{ + kinetic_conn.reset(); + if (logger) + cct->get_perfcounters_collection()->remove(logger); +} + +int KineticStore::submit_transaction(KeyValueDB::Transaction t) +{ + KineticTransactionImpl * _t = + static_cast(t.get()); + + dout(20) << "kinetic submit_transaction" << dendl; + + for (vector::iterator it = _t->ops.begin(); + it != _t->ops.end(); ++it) { + kinetic::KineticStatus status(kinetic::StatusCode::OK, ""); + if (it->type == KINETIC_OP_WRITE) { + string data(it->data.c_str(), it->data.length()); + kinetic::KineticRecord record(data, "", "", + com::seagate::kinetic::client::proto::Message_Algorithm_SHA1); + dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl; + status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION, + record); + dout(30) << "kinetic after put of " << it->key << dendl; + } else { + assert(it->type == KINETIC_OP_DELETE); + dout(30) << "kinetic before delete" << dendl; + status = kinetic_conn->Delete(it->key, "", + kinetic::WriteMode::IGNORE_VERSION); + dout(30) << "kinetic after delete" << dendl; + } + if (!status.ok()) { + derr << "kinetic error submitting transaction: " + << status.message() << dendl; + return -1; + } + } + + logger->inc(l_kinetic_txns); + return 0; +} + +int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t) +{ + return submit_transaction(t); +} + +void KineticStore::KineticTransactionImpl::set( + const string &prefix, + const string &k, + const bufferlist &to_set_bl) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic set key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl)); +} + +void KineticStore::KineticTransactionImpl::rmkey(const string &prefix, + const string &k) +{ + string key = combine_strings(prefix, k); + dout(30) << "kinetic rm key " << key << dendl; + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); +} + +void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix) +{ + dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl; + KeyValueDB::Iterator it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + string key = combine_strings(prefix, it->key()); + ops.push_back(KineticOp(KINETIC_OP_DELETE, key)); + dout(30) << "kinetic rm key by prefix: " << key << dendl; + } +} + +int KineticStore::get( + const string &prefix, + const std::set &keys, + std::map *out) +{ + dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl; + for (std::set::const_iterator i = keys.begin(); + i != keys.end(); + ++i) { + unique_ptr record; + string key = combine_strings(prefix, *i); + dout(30) << "before get key " << key << dendl; + kinetic::KineticStatus status = kinetic_conn->Get(key, record); + if (!status.ok()) + break; + dout(30) << "kinetic get got key: " << key << dendl; + out->insert(make_pair(key, to_bufferlist(*record.get()))); + } + logger->inc(l_kinetic_gets); + return 0; +} + +string KineticStore::combine_strings(const string &prefix, const string &value) +{ + string out = prefix; + out.push_back(1); + out.append(value); + return out; +} + +bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record) +{ + bufferlist bl; + bl.append(*(record.value())); + return bl; +} + +int KineticStore::split_key(string in_prefix, string *prefix, string *key) +{ + size_t prefix_len = in_prefix.find('\1'); + if (prefix_len >= in_prefix.size()) + return -EINVAL; + + if (prefix) + *prefix = string(in_prefix, 0, prefix_len); + if (key) + *key= string(in_prefix, prefix_len + 1); + return 0; +} + +KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn), + kinetic_status(kinetic::StatusCode::OK, "") +{ + dout(30) << "kinetic iterator constructor()" << dendl; + const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + kinetic::KeyRangeIterator it = + kinetic_conn->IterateKeyRange("", true, last_key, true, 1024); + while (it != kinetic::KeyRangeEnd()) { + try { + keys.insert(*it); + dout(30) << "kinetic iterator added " << *it << dendl; + } catch (std::runtime_error &e) { + kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what()); + return; + } + ++it; + } + keys_iter = keys.begin(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl; + keys_iter = keys.lower_bound(prefix); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last() +{ + dout(30) << "kinetic iterator seek_to_last()" << dendl; + keys_iter = keys.end(); + if (keys.begin() != keys_iter) + --keys_iter; + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix) +{ + dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl; + keys_iter = keys.upper_bound(prefix + "\2"); + if (keys.begin() == keys_iter) { + keys_iter = keys.end(); + } else { + --keys_iter; + } + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) { + dout(30) << "kinetic iterator upper_bound()" << dendl; + string bound = combine_strings(prefix, after); + keys_iter = keys.upper_bound(bound); + return 0; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) { + dout(30) << "kinetic iterator lower_bound()" << dendl; + string bound = combine_strings(prefix, to); + keys_iter = keys.lower_bound(bound); + return 0; +} + +bool KineticStore::KineticWholeSpaceIteratorImpl::valid() { + dout(30) << "kinetic iterator valid()" << dendl; + return keys_iter != keys.end(); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::next() { + dout(30) << "kinetic iterator next()" << dendl; + if (keys_iter != keys.end()) { + ++keys_iter; + return 0; + } + return -1; +} + +int KineticStore::KineticWholeSpaceIteratorImpl::prev() { + dout(30) << "kinetic iterator prev()" << dendl; + if (keys_iter != keys.begin()) { + --keys_iter; + return 0; + } + keys_iter = keys.end(); + return -1; +} + +string KineticStore::KineticWholeSpaceIteratorImpl::key() { + dout(30) << "kinetic iterator key()" << dendl; + string out_key; + split_key(*keys_iter, NULL, &out_key); + return out_key; +} + +pair KineticStore::KineticWholeSpaceIteratorImpl::raw_key() { + dout(30) << "kinetic iterator raw_key()" << dendl; + string prefix, key; + split_key(*keys_iter, &prefix, &key); + return make_pair(prefix, key); +} + +bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() { + dout(30) << "kinetic iterator value()" << dendl; + unique_ptr record; + kinetic_status = kinetic_conn->Get(*keys_iter, record); + return to_bufferlist(*record.get()); +} + +int KineticStore::KineticWholeSpaceIteratorImpl::status() { + dout(30) << "kinetic iterator status()" << dendl; + return kinetic_status.ok() ? 0 : -1; +} diff --git a/src/os/KineticStore.h b/src/os/KineticStore.h new file mode 100644 index 00000000000..57b8a49b5f0 --- /dev/null +++ b/src/os/KineticStore.h @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +#ifndef KINETIC_STORE_H +#define KINETIC_STORE_H + +#include "include/types.h" +#include "include/buffer.h" +#include "KeyValueDB.h" +#include +#include +#include +#include "include/memory.h" +#include + +#include +#include "common/errno.h" +#include "common/dout.h" +#include "include/assert.h" +#include "common/Formatter.h" + +#include "common/ceph_context.h" + +class PerfCounters; + +enum { + l_kinetic_first = 34400, + l_kinetic_gets, + l_kinetic_txns, + l_kinetic_last, +}; + +/** + * Uses Kinetic to implement the KeyValueDB interface + */ +class KineticStore : public KeyValueDB { + CephContext *cct; + PerfCounters *logger; + string host; + int port; + int user_id; + string hmac_key; + bool use_ssl; + std::unique_ptr kinetic_conn; + + int do_open(ostream &out, bool create_if_missing); + +public: + KineticStore(CephContext *c); + ~KineticStore(); + + int init(); + + /// Opens underlying db + int open(ostream &out) { + return do_open(out, false); + } + /// Creates underlying db if missing and opens it + int create_and_open(ostream &out) { + return do_open(out, true); + } + + void close(); + + enum KineticOpType { + KINETIC_OP_WRITE, + KINETIC_OP_DELETE, + }; + + struct KineticOp { + KineticOpType type; + std::string key; + bufferlist data; + KineticOp(KineticOpType type, const string &key) : type(type), key(key) {} + KineticOp(KineticOpType type, const string &key, const bufferlist &data) + : type(type), key(key), data(data) {} + }; + + class KineticTransactionImpl : public KeyValueDB::TransactionImpl { + public: + vector ops; + KineticStore *db; + + KineticTransactionImpl(KineticStore *db) : db(db) {} + void set( + const string &prefix, + const string &k, + const bufferlist &bl); + void rmkey( + const string &prefix, + const string &k); + void rmkeys_by_prefix( + const string &prefix + ); + }; + + KeyValueDB::Transaction get_transaction() { + return ceph::shared_ptr< KineticTransactionImpl >( + new KineticTransactionImpl(this)); + } + + int submit_transaction(KeyValueDB::Transaction t); + int submit_transaction_sync(KeyValueDB::Transaction t); + int get( + const string &prefix, + const std::set &key, + std::map *out + ); + + class KineticWholeSpaceIteratorImpl : + public KeyValueDB::WholeSpaceIteratorImpl { + std::set keys; + std::set::iterator keys_iter; + kinetic::BlockingKineticConnection *kinetic_conn; + kinetic::KineticStatus kinetic_status; + public: + KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn); + virtual ~KineticWholeSpaceIteratorImpl() { } + + int seek_to_first() { + return seek_to_first(""); + } + int seek_to_first(const string &prefix); + int seek_to_last(); + int seek_to_last(const string &prefix); + int upper_bound(const string &prefix, const string &after); + int lower_bound(const string &prefix, const string &to); + bool valid(); + int next(); + int prev(); + string key(); + pair raw_key(); + bufferlist value(); + int status(); + }; + + /// Utility + static string combine_strings(const string &prefix, const string &value); + static int split_key(string in_prefix, string *prefix, string *key); + static bufferlist to_bufferlist(const kinetic::KineticRecord &record); + virtual uint64_t get_estimated_size(map &extra) { + // not used by the osd + return 0; + } + + +protected: + WholeSpaceIterator _get_iterator() { + return ceph::shared_ptr( + new KineticWholeSpaceIteratorImpl(kinetic_conn.get())); + } + + // TODO: remove snapshots from interface + WholeSpaceIterator _get_snapshot_iterator() { + return _get_iterator(); + } + +}; + +#endif diff --git a/src/os/Makefile.am b/src/os/Makefile.am index 252c678a643..f6ac4989c4a 100644 --- a/src/os/Makefile.am +++ b/src/os/Makefile.am @@ -29,6 +29,8 @@ if WITH_LIBZFS libos_la_SOURCES += os/ZFSFileStoreBackend.cc endif +libos_la_CXXFLAGS = ${AM_CXXFLAGS} +libos_la_LIBADD = noinst_LTLIBRARIES += libos.la noinst_HEADERS += \ @@ -66,3 +68,9 @@ noinst_LIBRARIES += libos_zfs.a noinst_HEADERS += os/ZFS.h endif +if WITH_KINETIC +libos_la_SOURCES += os/KineticStore.cc +libos_la_CXXFLAGS += -std=gnu++11 +libos_la_LIBADD += -lkinetic_client -lprotobuf -lglog -lgflags libcrypto.a +noinst_HEADERS += os/KineticStore.h +endif diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index 8a2cb2bda4f..0e8e0586510 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -19,6 +19,8 @@ libosd_la_SOURCES = \ osd/osd_types.cc \ osd/ECUtil.cc \ objclass/class_api.cc + +libosd_la_CXXFLAGS = ${AM_CXXFLAGS} libosd_la_LIBADD = $(LIBOSDC) $(LIBOS) noinst_LTLIBRARIES += libosd.la @@ -45,3 +47,6 @@ noinst_HEADERS += \ osd/Watch.h \ osd/osd_types.h +if WITH_KINETIC +libosd_la_CXXFLAGS += -std=gnu++11 +endif