Logging rocksdb transaction in ceph log

The rocksdb transaction is now been logged into ceph log based
on debug level. If transaction is failed, the error code,
error string and the entire transaction is dumped in the log.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
This commit is contained in:
Somnath Roy 2016-09-08 14:25:45 -04:00
parent 9096ad37f2
commit 46f1f9e5ca
2 changed files with 114 additions and 2 deletions

View File

@ -13,7 +13,6 @@
#include "rocksdb/db.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
@ -366,7 +365,18 @@ int RocksDBStore::submit_transaction(KeyValueDB::Transaction t)
static_cast<RocksDBTransactionImpl *>(t.get());
rocksdb::WriteOptions woptions;
woptions.disableWAL = disableWAL;
lgeneric_subdout(cct, rocksdb, 30) << __func__;
RocksWBHandler bat_txc;
_t->bat->Iterate(&bat_txc);
*_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
rocksdb::Status s = db->Write(woptions, _t->bat);
if (!s.ok()) {
RocksWBHandler rocks_txc;
_t->bat->Iterate(&rocks_txc);
derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
<< " Rocksdb transaction: " << rocks_txc.seen << dendl;
}
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns);
logger->tinc(l_rocksdb_submit_latency, lat);
@ -381,7 +391,18 @@ int RocksDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
rocksdb::WriteOptions woptions;
woptions.sync = true;
woptions.disableWAL = disableWAL;
lgeneric_subdout(cct, rocksdb, 30) << __func__;
RocksWBHandler bat_txc;
_t->bat->Iterate(&bat_txc);
*_dout << " Rocksdb transaction: " << bat_txc.seen << dendl;
rocksdb::Status s = db->Write(woptions, _t->bat);
if (!s.ok()) {
RocksWBHandler rocks_txc;
_t->bat->Iterate(&rocks_txc);
derr << __func__ << " error: " << s.ToString() << " code = " << s.code()
<< " Rocksdb transaction: " << rocks_txc.seen << dendl;
}
utime_t lat = ceph_clock_now(g_ceph_context) - start;
logger->inc(l_rocksdb_txns_sync);
logger->tinc(l_rocksdb_submit_sync_latency, lat);

View File

@ -11,7 +11,7 @@
#include <string>
#include <memory>
#include <boost/scoped_ptr.hpp>
#include "rocksdb/write_batch.h"
#include <errno.h>
#include "common/errno.h"
#include "common/dout.h"
@ -137,6 +137,97 @@ public:
int create_and_open(ostream &out);
void close();
struct RocksWBHandler: public rocksdb::WriteBatch::Handler {
std::string seen ;
int num_seen = 0;
static string pretty_binary_string(const string& in) {
char buf[10];
string out;
out.reserve(in.length() * 3);
enum { NONE, HEX, STRING } mode = NONE;
unsigned from = 0, i;
for (i=0; i < in.length(); ++i) {
if ((in[i] < 32 || (unsigned char)in[i] > 126) ||
(mode == HEX && in.length() - i >= 4 &&
((in[i] < 32 || (unsigned char)in[i] > 126) ||
(in[i+1] < 32 || (unsigned char)in[i+1] > 126) ||
(in[i+2] < 32 || (unsigned char)in[i+2] > 126) ||
(in[i+3] < 32 || (unsigned char)in[i+3] > 126)))) {
if (mode == STRING) {
out.append(in.substr(from, i - from));
out.push_back('\'');
}
if (mode != HEX) {
out.append("0x");
mode = HEX;
}
if (in.length() - i >= 4) {
// print a whole u32 at once
snprintf(buf, sizeof(buf), "%08x",
(uint32_t)(((unsigned char)in[i] << 24) |
((unsigned char)in[i+1] << 16) |
((unsigned char)in[i+2] << 8) |
((unsigned char)in[i+3] << 0)));
i += 3;
} else {
snprintf(buf, sizeof(buf), "%02x", (int)(unsigned char)in[i]);
}
out.append(buf);
} else {
if (mode != STRING) {
out.push_back('\'');
mode = STRING;
from = i;
}
}
}
if (mode == STRING) {
out.append(in.substr(from, i - from));
out.push_back('\'');
}
return out;
}
virtual void Put(const rocksdb::Slice& key,
const rocksdb::Slice& value) override {
string prefix ((key.ToString()).substr(0,1));
string key_to_decode ((key.ToString()).substr(2,string::npos));
uint64_t size = (value.ToString()).size();
seen += "\nPut( Prefix = " + prefix + " key = "
+ pretty_binary_string(key_to_decode)
+ " Value size = " + std::to_string(size) + ")";
num_seen++;
}
virtual void SingleDelete(const rocksdb::Slice& key) override {
string prefix ((key.ToString()).substr(0,1));
string key_to_decode ((key.ToString()).substr(2,string::npos));
seen += "\nSingleDelete(Prefix = "+ prefix + " Key = "
+ pretty_binary_string(key_to_decode) + ")";
num_seen++;
}
virtual void Delete(const rocksdb::Slice& key) override {
string prefix ((key.ToString()).substr(0,1));
string key_to_decode ((key.ToString()).substr(2,string::npos));
seen += "\nDelete( Prefix = " + prefix + " key = "
+ pretty_binary_string(key_to_decode) + ")";
num_seen++;
}
virtual void Merge(const rocksdb::Slice& key,
const rocksdb::Slice& value) override {
string prefix ((key.ToString()).substr(0,1));
string key_to_decode ((key.ToString()).substr(2,string::npos));
uint64_t size = (value.ToString()).size();
seen += "\nMerge( Prefix = " + prefix + " key = "
+ pretty_binary_string(key_to_decode) + " Value size = "
+ std::to_string(size) + ")";
num_seen++;
}
virtual bool Continue() override { return num_seen < 50; }
};
class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
public: