Merge pull request #11398 from ifed01/wip-bluestore-rmcoll-fix

os/bluestore: fix remove_collection to properly detect collection e…

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2016-10-14 13:41:26 -05:00 committed by GitHub
commit d620aad662
3 changed files with 190 additions and 112 deletions

View File

@ -5419,7 +5419,27 @@ int BlueStore::collection_list(
{
Collection *c = static_cast<Collection*>(c_.get());
dout(15) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max << dendl;
<< " start " << start << " end " << end << " max " << max << dendl;
int r;
{
RWLock::RLocker l(c->lock);
r = _collection_list(c, start, end, sort_bitwise, max, ls, pnext);
}
c->trim_cache();
dout(10) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max
<< " = " << r << ", ls.size() = " << ls->size()
<< ", next = " << (pnext ? *pnext : ghobject_t()) << dendl;
return r;
}
int BlueStore::_collection_list(
Collection* c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max,
vector<ghobject_t> *ls, ghobject_t *pnext)
{
if (!c->exists)
return -ENOENT;
if (!sort_bitwise)
@ -5427,113 +5447,105 @@ int BlueStore::collection_list(
int r = 0;
ghobject_t static_next;
{
RWLock::RLocker l(c->lock);
KeyValueDB::Iterator it;
string temp_start_key, temp_end_key;
string start_key, end_key;
bool set_next = false;
string pend;
bool temp;
KeyValueDB::Iterator it;
string temp_start_key, temp_end_key;
string start_key, end_key;
bool set_next = false;
string pend;
bool temp;
if (!pnext)
pnext = &static_next;
if (!pnext)
pnext = &static_next;
if (start == ghobject_t::get_max() ||
start.hobj.is_max()) {
*pnext = ghobject_t::get_max();
goto out;
}
get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
&start_key, &end_key);
dout(20) << __func__
<< " range " << pretty_binary_string(temp_start_key)
<< " to " << pretty_binary_string(temp_end_key)
<< " and " << pretty_binary_string(start_key)
<< " to " << pretty_binary_string(end_key)
<< " start " << start << dendl;
it = db->get_iterator(PREFIX_OBJ);
if (start == ghobject_t() ||
start.hobj == hobject_t() ||
start == c->cid.get_min_hobj()) {
it->upper_bound(temp_start_key);
if (start == ghobject_t::get_max() ||
start.hobj.is_max()) {
*pnext = ghobject_t::get_max();
goto out;
}
get_coll_key_range(c->cid, c->cnode.bits, &temp_start_key, &temp_end_key,
&start_key, &end_key);
dout(20) << __func__
<< " range " << pretty_binary_string(temp_start_key)
<< " to " << pretty_binary_string(temp_end_key)
<< " and " << pretty_binary_string(start_key)
<< " to " << pretty_binary_string(end_key)
<< " start " << start << dendl;
it = db->get_iterator(PREFIX_OBJ);
if (start == ghobject_t() ||
start.hobj == hobject_t() ||
start == c->cid.get_min_hobj()) {
it->upper_bound(temp_start_key);
temp = true;
} else {
string k;
get_object_key(start, &k);
if (start.hobj.is_temp()) {
temp = true;
assert(k >= temp_start_key && k < temp_end_key);
} else {
string k;
get_object_key(start, &k);
if (start.hobj.is_temp()) {
temp = true;
assert(k >= temp_start_key && k < temp_end_key);
} else {
temp = false;
assert(k >= start_key && k < end_key);
}
dout(20) << " start from " << pretty_binary_string(k)
<< " temp=" << (int)temp << dendl;
it->lower_bound(k);
temp = false;
assert(k >= start_key && k < end_key);
}
if (end.hobj.is_max()) {
dout(20) << " start from " << pretty_binary_string(k)
<< " temp=" << (int)temp << dendl;
it->lower_bound(k);
}
if (end.hobj.is_max()) {
pend = temp ? temp_end_key : end_key;
} else {
get_object_key(end, &end_key);
if (end.hobj.is_temp()) {
if (temp)
pend = end_key;
else
goto out;
} else {
pend = temp ? temp_end_key : end_key;
} else {
get_object_key(end, &end_key);
if (end.hobj.is_temp()) {
if (temp)
pend = end_key;
else
goto out;
} else {
pend = temp ? temp_end_key : end_key;
}
}
dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
while (true) {
if (!it->valid() || it->key() > pend) {
if (!it->valid())
dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
else
dout(20) << __func__ << " key " << pretty_binary_string(it->key())
<< " > " << end << dendl;
if (temp) {
if (end.hobj.is_temp()) {
break;
}
dout(30) << __func__ << " switch to non-temp namespace" << dendl;
temp = false;
it->upper_bound(start_key);
pend = end_key;
dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
continue;
}
break;
}
dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
if (is_extent_shard_key(it->key())) {
it->next();
continue;
}
ghobject_t oid;
int r = get_key_object(it->key(), &oid);
assert(r == 0);
if (ls->size() >= (unsigned)max) {
dout(20) << __func__ << " reached max " << max << dendl;
*pnext = oid;
set_next = true;
break;
}
ls->push_back(oid);
it->next();
}
if (!set_next) {
*pnext = ghobject_t::get_max();
}
}
dout(20) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
while (true) {
if (!it->valid() || it->key() > pend) {
if (!it->valid())
dout(20) << __func__ << " iterator not valid (end of db?)" << dendl;
else
dout(20) << __func__ << " key " << pretty_binary_string(it->key())
<< " > " << end << dendl;
if (temp) {
if (end.hobj.is_temp()) {
break;
}
dout(30) << __func__ << " switch to non-temp namespace" << dendl;
temp = false;
it->upper_bound(start_key);
pend = end_key;
dout(30) << __func__ << " pend " << pretty_binary_string(pend) << dendl;
continue;
}
break;
}
dout(20) << __func__ << " key " << pretty_binary_string(it->key()) << dendl;
if (is_extent_shard_key(it->key())) {
it->next();
continue;
}
ghobject_t oid;
int r = get_key_object(it->key(), &oid);
assert(r == 0);
if (ls->size() >= (unsigned)max) {
dout(20) << __func__ << " reached max " << max << dendl;
*pnext = oid;
set_next = true;
break;
}
ls->push_back(oid);
it->next();
}
if (!set_next) {
*pnext = ghobject_t::get_max();
}
out:
c->trim_cache();
dout(10) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max
<< " = " << r << ", ls.size() = " << ls->size()
<< ", next = " << *pnext << dendl;
out:
return r;
}
@ -8680,25 +8692,54 @@ int BlueStore::_remove_collection(TransContext *txc, coll_t cid,
r = -ENOENT;
goto out;
}
size_t nonexistent_count = 0;
assert((*c)->exists);
if ((*c)->onode_map.map_any([&](OnodeRef o) {
if (o->exists) {
dout(10) << __func__ << " " << o->oid << " " << o
<< " exists in onode_map" << dendl;
return true;
}
return false;
})) {
if (o->exists) {
dout(10) << __func__ << " " << o->oid << " " << o
<< " exists in onode_map" << dendl;
return true;
}
++nonexistent_count;
return false;
})) {
r = -ENOTEMPTY;
goto out;
}
coll_map.erase(cid);
txc->removed_collections.push_back(*c);
(*c)->exists = false;
c->reset();
vector<ghobject_t> ls;
ghobject_t next;
//Enumerate onodes in db, up to nonexistent_count + 1
// then check if all of them are marked as non-existent.
// Bypass the check if returned number is greater than nonexistent_count
r = _collection_list(c->get(), ghobject_t(), ghobject_t::get_max(), true, nonexistent_count + 1,
&ls, &next);
if (r >= 0) {
bool exists = false; //ls.size() > nonexistent_count;
for (auto it = ls.begin(); !exists && it < ls.end(); ++it) {
dout(10) << __func__ << " oid " << *it << dendl;
auto onode = (*c)->onode_map.lookup(*it);
exists = !onode || onode->exists;
if (exists) {
dout(10) << __func__ << " " << *it
<< " exists in db" << dendl;
}
}
if (!exists) {
coll_map.erase(cid);
txc->removed_collections.push_back(*c);
(*c)->exists = false;
c->reset();
txc->t->rmkey(PREFIX_COLL, stringify(cid));
r = 0;
} else {
dout(10) << __func__ << " " << cid
<< " is non-empty" << dendl;
r = -ENOTEMPTY;
}
}
}
txc->t->rmkey(PREFIX_COLL, stringify(cid));
r = 0;
out:
dout(10) << __func__ << " " << cid << " = " << r << dendl;

View File

@ -1572,6 +1572,11 @@ private:
b->shared_blob->bc.write(txc->seq, offset, bl, flags);
txc->shared_blobs_written.insert(b->shared_blob);
}
int _collection_list(Collection *c, ghobject_t start, ghobject_t end,
bool sort_bitwise, int max, vector<ghobject_t> *ls, ghobject_t *next);
public:
BlueStore(CephContext *cct, const string& path);
~BlueStore();

View File

@ -2516,6 +2516,7 @@ TEST_P(StoreTest, SimpleCloneTest) {
ghobject_t hoid2(hobject_t(sobject_t("Object 2", CEPH_NOSNAP),
"key", 123, -1, ""));
ghobject_t hoid3(hobject_t(sobject_t("Object 3", CEPH_NOSNAP)));
{
ObjectStore::Transaction t;
t.clone(cid, hoid, hoid2);
@ -2758,9 +2759,40 @@ TEST_P(StoreTest, SimpleCloneTest) {
ASSERT_TRUE(bl_eq(rl, final));
}
{
//verify if non-empty collection is properly handled after store reload
r = store->umount();
ASSERT_EQ(r, 0);
r = store->mount();
ASSERT_EQ(r, 0);
ObjectStore::Transaction t;
t.remove_collection(cid);
cerr << "Invalid rm coll" << std::endl;
EXPECT_DEATH(apply_transaction(store, &osr, std::move(t)), ".*Directory not empty.*");
}
{
//verify if non-empty collection is properly handled when there are some pending removes and live records in db
cerr << "Invalid rm coll again" << std::endl;
ObjectStore::Transaction t;
t.touch(cid, hoid3); //new record in db
t.remove(cid, hoid);
t.remove(cid, hoid2);
r = apply_transaction(store, &osr, std::move(t));
ASSERT_EQ(r, 0);
r = store->umount();
ASSERT_EQ(r, 0);
r = store->mount();
ASSERT_EQ(r, 0);
t.remove_collection(cid);
EXPECT_DEATH(apply_transaction(store, &osr, std::move(t)), ".*Directory not empty.*");
}
{
ObjectStore::Transaction t;
t.remove(cid, hoid3);
t.remove_collection(cid);
cerr << "Cleaning" << std::endl;
r = apply_transaction(store, &osr, std::move(t));