Merge pull request #6425 from branch-predictor/bp-delayed-pglog-index-v2

osd: delay populating in-memory PG log hashmaps

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2016-01-14 19:02:30 -05:00
commit c5a3c2b1e4
2 changed files with 185 additions and 81 deletions

View File

@ -124,7 +124,7 @@ void PGLog::IndexedLog::trim(
tail = s;
}
ostream& PGLog::IndexedLog::print(ostream& out) const
ostream& PGLog::IndexedLog::print(ostream& out) const
{
out << *this << std::endl;
for (list<pg_log_entry_t>::const_iterator p = log.begin();
@ -864,8 +864,8 @@ void PGLog::_write_log(
::encode(divergent_priors, (*km)["divergent_priors"]);
}
if (require_rollback) {
::encode(log.can_rollback_to, (*km)["can_rollback_to"]);
::encode(log.rollback_info_trimmed_to, (*km)["rollback_info_trimmed_to"]);
::encode(log.can_rollback_to, (*km)["can_rollback_to"]);
::encode(log.rollback_info_trimmed_to, (*km)["rollback_info_trimmed_to"]);
}
if (!to_remove.empty())
@ -899,34 +899,34 @@ void PGLog::read_log(ObjectStore *store, coll_t pg_coll,
for (p->seek_to_first(); p->valid() ; p->next(false)) {
// non-log pgmeta_oid keys are prefixed with _; skip those
if (p->key()[0] == '_')
continue;
continue;
bufferlist bl = p->value();//Copy bufferlist before creating iterator
bufferlist::iterator bp = bl.begin();
if (p->key() == "divergent_priors") {
::decode(divergent_priors, bp);
dout(20) << "read_log " << divergent_priors.size() << " divergent_priors" << dendl;
::decode(divergent_priors, bp);
dout(20) << "read_log " << divergent_priors.size() << " divergent_priors" << dendl;
} else if (p->key() == "can_rollback_to") {
::decode(log.can_rollback_to, bp);
::decode(log.can_rollback_to, bp);
} else if (p->key() == "rollback_info_trimmed_to") {
::decode(log.rollback_info_trimmed_to, bp);
::decode(log.rollback_info_trimmed_to, bp);
} else {
pg_log_entry_t e;
e.decode_with_checksum(bp);
dout(20) << "read_log " << e << dendl;
if (!log.log.empty()) {
pg_log_entry_t last_e(log.log.back());
assert(last_e.version.version < e.version.version);
assert(last_e.version.epoch <= e.version.epoch);
}
log.log.push_back(e);
log.head = e.version;
if (log_keys_debug)
log_keys_debug->insert(e.get_key_name());
pg_log_entry_t e;
e.decode_with_checksum(bp);
dout(20) << "read_log " << e << dendl;
if (!log.log.empty()) {
pg_log_entry_t last_e(log.log.back());
assert(last_e.version.version < e.version.version);
assert(last_e.version.epoch <= e.version.epoch);
}
log.log.push_back(e);
log.head = e.version;
if (log_keys_debug)
log_keys_debug->insert(e.get_key_name());
}
}
}
log.head = info.last_update;
log.index();
log.reset_riter();
// build missing
if (info.last_complete < info.last_update) {

View File

@ -25,6 +25,11 @@
#include <list>
using namespace std;
#define PGLOG_INDEXED_OBJECTS (1 << 0)
#define PGLOG_INDEXED_CALLER_OPS (1 << 1)
#define PGLOG_INDEXED_EXTRA_CALLER_OPS (1 << 2)
#define PGLOG_INDEXED_ALL (PGLOG_INDEXED_OBJECTS | PGLOG_INDEXED_CALLER_OPS | PGLOG_INDEXED_EXTRA_CALLER_OPS)
struct PGLog {
////////////////////////////// sub classes //////////////////////////////
struct LogEntryHandler {
@ -55,9 +60,9 @@ struct PGLog {
* plus some methods to manipulate it all.
*/
struct IndexedLog : public pg_log_t {
ceph::unordered_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
mutable ceph::unordered_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
mutable ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
mutable ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
// recovery pointers
list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
@ -65,6 +70,7 @@ struct PGLog {
//
private:
mutable __u16 indexed_data;
/**
* rollback_info_trimmed_to_riter points to the first log entry <=
* rollback_info_trimmed_to
@ -80,6 +86,7 @@ struct PGLog {
IndexedLog() :
complete_to(log.end()),
last_requested(0),
indexed_data(0),
rollback_info_trimmed_to_riter(log.rbegin())
{}
@ -121,11 +128,25 @@ struct PGLog {
}
bool logged_object(const hobject_t& oid) const {
if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
index_objects();
}
return objects.count(oid);
}
bool logged_req(const osd_reqid_t &r) const {
return caller_ops.count(r) || extra_caller_ops.count(r);
if (!(indexed_data & PGLOG_INDEXED_CALLER_OPS)) {
index_caller_ops();
}
if (!caller_ops.count(r)) {
if (!(indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS)) {
index_extra_caller_ops();
}
return extra_caller_ops.count(r);
}
return true;
}
bool get_request(
const osd_reqid_t &r,
eversion_t *replay_version,
@ -133,6 +154,9 @@ struct PGLog {
assert(replay_version);
assert(user_version);
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
if (!(indexed_data & PGLOG_INDEXED_CALLER_OPS)) {
index_caller_ops();
}
p = caller_ops.find(r);
if (p != caller_ops.end()) {
*replay_version = p->second->version;
@ -142,6 +166,9 @@ struct PGLog {
// warning: we will return *a* request for this reqid, but not
// necessarily the most recent.
if (!(indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS)) {
index_extra_caller_ops();
}
p = extra_caller_ops.find(r);
if (p != extra_caller_ops.end()) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator i =
@ -162,8 +189,11 @@ struct PGLog {
/// get a (bounded) list of recent reqids for the given object
void get_object_reqids(const hobject_t& oid, unsigned max,
vector<pair<osd_reqid_t, version_t> > *pls) const {
// make sure object is present at least once before we do an
// O(n) search.
// make sure object is present at least once before we do an
// O(n) search.
if (!(indexed_data & PGLOG_INDEXED_OBJECTS)) {
index_objects();
}
if (objects.count(oid) == 0)
return;
for (list<pg_log_entry_t>::const_reverse_iterator i = log.rbegin();
@ -182,75 +212,142 @@ struct PGLog {
}
}
}
void reset_riter() {
rollback_info_trimmed_to_riter = log.rbegin();
while (rollback_info_trimmed_to_riter != log.rend() &&
rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to)
++rollback_info_trimmed_to_riter;
}
// indexes objects, caller ops and extra caller ops
void index() {
objects.clear();
caller_ops.clear();
extra_caller_ops.clear();
for (list<pg_log_entry_t>::iterator i = log.begin();
i != log.end();
++i) {
i != log.end();
++i) {
objects[i->soid] = &(*i);
if (i->reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = &(*i);
}
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
i->extra_reqids.begin();
j != i->extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &(*i)));
}
if (i->reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = &(*i);
}
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
i->extra_reqids.begin();
j != i->extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &(*i)));
}
}
reset_riter();
indexed_data = PGLOG_INDEXED_ALL;
}
rollback_info_trimmed_to_riter = log.rbegin();
while (rollback_info_trimmed_to_riter != log.rend() &&
rollback_info_trimmed_to_riter->version > rollback_info_trimmed_to)
++rollback_info_trimmed_to_riter;
void index_objects() const {
objects.clear();
for (list<pg_log_entry_t>::const_iterator i = log.begin();
i != log.end();
++i) {
objects[i->soid] = const_cast<pg_log_entry_t*>(&(*i));
}
indexed_data |= PGLOG_INDEXED_OBJECTS;
}
void index_caller_ops() const {
caller_ops.clear();
for (list<pg_log_entry_t>::const_iterator i = log.begin();
i != log.end();
++i) {
if (i->reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = const_cast<pg_log_entry_t*>(&(*i));
}
}
indexed_data |= PGLOG_INDEXED_CALLER_OPS;
}
void index_extra_caller_ops() const {
extra_caller_ops.clear();
for (list<pg_log_entry_t>::const_iterator i = log.begin();
i != log.end();
++i) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
i->extra_reqids.begin();
j != i->extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, const_cast<pg_log_entry_t*>(&(*i))));
}
}
indexed_data |= PGLOG_INDEXED_EXTRA_CALLER_OPS;
}
void index(pg_log_entry_t& e) {
if (objects.count(e.soid) == 0 ||
objects[e.soid]->version < e.version)
objects[e.soid] = &e;
if (e.reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] = &e;
if (indexed_data & PGLOG_INDEXED_OBJECTS) {
if (objects.count(e.soid) == 0 ||
objects[e.soid]->version < e.version)
objects[e.soid] = &e;
}
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &e));
if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
if (e.reqid_is_indexed()) {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] = &e;
}
}
if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &e));
}
}
}
void unindex() {
objects.clear();
caller_ops.clear();
extra_caller_ops.clear();
indexed_data = 0;
}
void unindex(pg_log_entry_t& e) {
// NOTE: this only works if we remove from the _tail_ of the log!
if (objects.count(e.soid) && objects[e.soid]->version == e.version)
objects.erase(e.soid);
if (e.reqid_is_indexed()) {
if (caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] == &e)
caller_ops.erase(e.reqid);
if (indexed_data & PGLOG_INDEXED_OBJECTS) {
if (objects.count(e.soid) && objects[e.soid]->version == e.version)
objects.erase(e.soid);
}
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
extra_caller_ops.find(j->first);
k != extra_caller_ops.end() && k->first == j->first;
++k) {
if (k->second == &e) {
extra_caller_ops.erase(k);
break;
}
}
if (e.reqid_is_indexed()) {
if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
if (caller_ops.count(e.reqid) && // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] == &e)
caller_ops.erase(e.reqid);
}
}
if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
extra_caller_ops.find(j->first);
k != extra_caller_ops.end() && k->first == j->first;
++k) {
if (k->second == &e) {
extra_caller_ops.erase(k);
break;
}
}
}
}
}
@ -274,15 +371,22 @@ struct PGLog {
head = e.version;
// to our index
objects[e.soid] = &(log.back());
if (e.reqid_is_indexed()) {
caller_ops[e.reqid] = &(log.back());
if (indexed_data & PGLOG_INDEXED_OBJECTS) {
objects[e.soid] = &(log.back());
}
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &(log.back())));
if (indexed_data & PGLOG_INDEXED_CALLER_OPS) {
if (e.reqid_is_indexed()) {
caller_ops[e.reqid] = &(log.back());
}
}
if (indexed_data & PGLOG_INDEXED_EXTRA_CALLER_OPS) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(j->first, &(log.back())));
}
}
}