os/kstore: fix KStore rm_coll handler to behave similar to bluestore one

Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
This commit is contained in:
Igor Fedotov 2016-10-18 13:42:12 +00:00
parent 4a9c8e07b5
commit 5b36a93fd7
2 changed files with 86 additions and 28 deletions

View File

@ -1397,14 +1397,42 @@ int KStore::collection_list(
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *pnext)
{
dout(15) << __func__ << " " << cid
<< " start " << start << " end " << end << " max " << max << dendl;
if (!sort_bitwise)
return -EOPNOTSUPP;
CollectionRef c = _get_collection(cid);
CollectionHandle c = _get_collection(cid);
if (!c)
return -ENOENT;
RWLock::RLocker l(c->lock);
return collection_list(c, start, end, sort_bitwise, max, ls, pnext);
}
int KStore::collection_list(
CollectionHandle &c_, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *pnext)
{
Collection *c = static_cast<Collection*>(c_.get());
dout(15) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max << dendl;
int r;
{
RWLock::RLocker l(c->lock);
r = _collection_list(c, start, end, sort_bitwise, max, ls, pnext);
}
dout(10) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max
<< " = " << r << ", ls.size() = " << ls->size()
<< ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
return r;
}
int KStore::_collection_list(
Collection* c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *pnext)
{
if (!sort_bitwise)
return -EOPNOTSUPP;
int r = 0;
KeyValueDB::Iterator it;
string temp_start_key, temp_end_key;
@ -1417,9 +1445,11 @@ int KStore::collection_list(
if (!pnext)
pnext = &static_next;
if (start == ghobject_t::get_max())
if (start == ghobject_t::get_max() ||
start.hobj.is_max()) {
goto out;
get_coll_key_range(cid, c->cnode.bits, &temp_start_key, &temp_end_key,
}
get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
&start_key, &end_key);
dout(20) << __func__
<< " range " << pretty_binary_string(temp_start_key)
@ -1428,7 +1458,7 @@ int KStore::collection_list(
<< " to " << pretty_binary_string(end_key)
<< " start " << start << dendl;
it = db->get_iterator(PREFIX_OBJ);
if (start == ghobject_t() || start == cid.get_min_hobj()) {
if (start == ghobject_t() || start == c->cid.get_min_hobj()) {
it->upper_bound(temp_start_key);
temp = true;
} else {
@ -1492,14 +1522,10 @@ int KStore::collection_list(
ls->push_back(oid);
it->next();
}
out:
if (!set_next) {
*pnext = ghobject_t::get_max();
}
out:
dout(10) << __func__ << " " << cid
<< " start " << start << " end " << end << " max " << max
<< " = " << r << ", ls.size() = " << ls->size()
<< ", next = " << *pnext << dendl;
return r;
}
@ -2231,9 +2257,9 @@ void KStore::_txc_add_transaction(TransContext *txc, Transaction *t)
break;
}
if (r < 0) {
dout(0) << " error " << cpp_strerror(r)
<< " not handled on operation " << op->op
<< " (op " << pos << ", counting from 0)" << dendl;
derr << " error " << cpp_strerror(r)
<< " not handled on operation " << op->op
<< " (op " << pos << ", counting from 0)" << dendl;
dout(0) << " transaction dump:\n";
JSONFormatter f(true);
f.open_object_section("transaction");
@ -3298,19 +3324,46 @@ int KStore::_remove_collection(TransContext *txc, coll_t cid,
r = -ENOENT;
goto out;
}
pair<ghobject_t,OnodeRef> next;
while ((*c)->onode_map.get_next(next.first, &next)) {
if (next.second->exists) {
size_t nonexistent_count = 0;
pair<ghobject_t,OnodeRef> next_onode;
while ((*c)->onode_map.get_next(next_onode.first, &next_onode)) {
if (next_onode.second->exists) {
r = -ENOTEMPTY;
goto out;
}
++nonexistent_count;
}
vector<ghobject_t> ls;
ghobject_t next;
// Enumerate onodes in db, up to nonexistent_count + 1
// then check if all of them are marked as non-existent.
// Bypass the check if returned number is greater than nonexistent_count
r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), true,
nonexistent_count + 1, &ls, &next);
if (r >= 0) {
bool exists = false; //ls.size() > nonexistent_count;
for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
dout(10) << __func__ << " oid " << *it << dendl;
auto onode = (*c)->onode_map.lookup(*it);
exists = !onode || onode->exists;
if (exists) {
dout(10) << __func__ << " " << *it
<< " exists in db" << dendl;
}
}
if (!exists) {
coll_map.erase(cid);
txc->removed_collections.push_back(*c);
c->reset();
txc->t->rmkey(PREFIX_COLL, stringify(cid));
r = 0;
} else {
dout(10) << __func__ << " " << cid
<< " is non-empty" << dendl;
r = -ENOTEMPTY;
}
}
coll_map.erase(cid);
txc->removed_collections.push_back(*c);
c->reset();
}
txc->t->rmkey(PREFIX_COLL, stringify(cid));
r = 0;
out:
dout(10) << __func__ << " " << cid << " = " << r << dendl;

View File

@ -393,6 +393,9 @@ private:
uint64_t offset, bufferlist& bl);
void _do_remove_stripe(TransContext *txc, OnodeRef o, uint64_t offset);
int _collection_list(Collection *c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max, vector<ghobject_t> *ls, ghobject_t *next);
public:
KStore(CephContext *cct, const string& path);
~KStore();
@ -465,10 +468,12 @@ public:
bool collection_exists(const coll_t& c);
int collection_empty(const coll_t& c, bool *empty);
using ObjectStore::collection_list;
int collection_list(const coll_t& cid, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *next);
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *next) override;
int collection_list(CollectionHandle &c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *next) override;
using ObjectStore::omap_get;
int omap_get(