mon: MonitorDBStore: add store iterators to obtain chunks for sync

We created an interface specific to the MonitorDBStore, which can be used
to create iterators to obtain chunks for sync.

Two different iterators were defined: one that will iterate over the whole
store, focusing on the specified set of prefixes; another that will
iterate over only one specific prefix.

These two different iterators allow us build the sync process in two
distinct phases: 1) obtain all key/value pairs for paxos and all paxos
services, bundle them in chunks and send them over the wire; and 2) obtain
all the paxos versions, bundle them in chunks and send them over the wire.

Also, we are currently considering a chunk to be (at most) 1 MB worth of
data, although it can be tuned using 'mon_sync_max_payload_size' option.

mon: MonitorDBStore: add crc support when --mon-sync-debug is set

Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
This commit is contained in:
Joao Eduardo Luis 2012-08-15 15:35:39 +01:00
parent b33d4eacaa
commit d8a5cf6b4f
2 changed files with 173 additions and 0 deletions

View File

@ -155,6 +155,8 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages
OPTION(mon_client_bytes, OPT_U64, 100ul << 20) // client msg data allowed in memory (in bytes)
OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20) // mds, osd message memory cap (in bytes)
OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity

View File

@ -187,6 +187,177 @@ class MonitorDBStore
return db->submit_transaction_sync(dbt);
}
class StoreIteratorImpl {
protected:
bool done;
pair<string,string> last_key;
bufferlist crc_bl;
StoreIteratorImpl() : done(false) { }
virtual ~StoreIteratorImpl() { }
bool add_chunk_entry(Transaction &tx,
string &prefix,
string &key,
bufferlist &value) {
Transaction tmp;
bufferlist tmp_bl;
tmp.put(prefix, key, value);
tmp.encode(tmp_bl);
bufferlist tx_bl;
tx.encode(tx_bl);
size_t len = tx_bl.length() + tmp_bl.length();
if (!tx.empty() && (len > g_conf->mon_sync_max_payload_size)) {
return false;
}
tx.append(tmp);
last_key.first = prefix;
last_key.second = key;
if (g_conf->mon_sync_debug) {
::encode(prefix, crc_bl);
::encode(key, crc_bl);
::encode(value, crc_bl);
}
return true;
}
virtual void _get_chunk(Transaction &tx) = 0;
virtual bool _is_valid() = 0;
public:
__u32 crc() {
if (g_conf->mon_sync_debug)
return crc_bl.crc32c(0);
return 0;
}
pair<string,string> get_last_key() {
return last_key;
};
virtual bool has_next_chunk() {
return !done && _is_valid();
}
virtual void get_chunk(bufferlist &bl) {
Transaction tx;
_get_chunk(tx);
if (!tx.empty())
tx.encode(bl);
}
};
typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
class WholeStoreIteratorImpl : public StoreIteratorImpl {
KeyValueDB::WholeSpaceIterator iter;
set<string> sync_prefixes;
public:
WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
set<string> &prefixes)
: StoreIteratorImpl(),
iter(iter),
sync_prefixes(prefixes)
{ }
virtual ~WholeStoreIteratorImpl() { }
/**
* Obtain a chunk of the store
*
* @param bl Encoded transaction that will recreate the chunk
* @param first_key Pair containing the first key to obtain, and that
* will contain the first key in the chunk (that may
* differ from the one passed on to the function)
* @param last_key[out] Last key in the chunk
*/
virtual void _get_chunk(Transaction &tx) {
assert(done == false);
assert(iter->valid() == true);
while (iter->valid()) {
string prefix(iter->raw_key().first);
string key(iter->raw_key().second);
if (sync_prefixes.count(prefix)) {
bufferlist value = iter->value();
if (!add_chunk_entry(tx, prefix, key, value))
return;
}
iter->next();
}
assert(iter->valid() == false);
done = true;
}
virtual bool _is_valid() {
return iter->valid();
}
};
class SinglePrefixStoreIteratorImpl : public StoreIteratorImpl {
KeyValueDB::Iterator iter;
string prefix;
public:
SinglePrefixStoreIteratorImpl(KeyValueDB::Iterator iter, string prefix)
: StoreIteratorImpl(),
iter(iter),
prefix(prefix)
{ }
virtual ~SinglePrefixStoreIteratorImpl() { }
private:
virtual void _get_chunk(Transaction &tx) {
assert(done == false);
assert(iter->valid() == true);
while (iter->valid()) {
string key(iter->key());
bufferlist value = iter->value();
if (!add_chunk_entry(tx, prefix, key, value))
return;
iter->next();
}
assert(iter->valid() == false);
done = true;
}
virtual bool _is_valid() {
return iter->valid();
}
};
Synchronizer get_synchronizer(pair<string,string> &key,
set<string> &prefixes) {
KeyValueDB::WholeSpaceIterator iter;
iter = db->get_snapshot_iterator();
if (!key.first.empty() && !key.second.empty())
iter->upper_bound(key.first, key.second);
else
iter->seek_to_first();
return std::tr1::shared_ptr<StoreIteratorImpl>(
new WholeStoreIteratorImpl(iter, prefixes)
);
}
Synchronizer get_synchronizer(string &prefix) {
assert(!prefix.empty());
KeyValueDB::Iterator iter;
iter = db->get_snapshot_iterator(prefix);
iter->seek_to_first();
return std::tr1::shared_ptr<StoreIteratorImpl>(
new SinglePrefixStoreIteratorImpl(iter, prefix)
);
}
int get(const string& prefix, const string& key, bufferlist& bl) {
set<string> k;
k.insert(key);