os/filestore: (re)implement merge

Merging is a bit different than splitting, because the two collections
may already be hashed at different levels.  Since lookup etc rely on the
idea that the object is always at the deepest level of hashing, if you
merge collections with different levels that share some common bit prefix
then some objects will end up higher up the hierarchy even though deeper
hashed directories exist.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2018-08-04 13:51:05 -05:00
parent 2465df57b7
commit 8a1100bf59
7 changed files with 387 additions and 13 deletions

View File

@ -162,6 +162,11 @@ protected:
CollectionIndex* dest //< [in] destination index
) { ceph_abort(); return 0; }
virtual int merge(
uint32_t bits, //< [in] common (target) bits
CollectionIndex* dest //< [in] destination index
) { ceph_abort(); return 0; }
/// List contents of collection by hash
virtual int collection_list_partial(

View File

@ -5753,12 +5753,12 @@ int FileStore::_merge_collection(const coll_t& cid,
if (!collection_exists(cid)) {
dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
assert(replaying);
ceph_assert(replaying);
return 0;
}
if (!collection_exists(dest)) {
dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
assert(replaying);
ceph_assert(replaying);
return 0;
}
@ -5766,19 +5766,79 @@ int FileStore::_merge_collection(const coll_t& cid,
if (_check_replay_guard(cid, spos) > 0)
_collection_set_bits(dest, bits);
// move everything
spg_t pgid;
bool is_pg = dest.is_pg(&pgid);
assert(is_pg);
r = _split_collection(cid, bits, pgid.pgid.ps(), dest, spos);
if (r < 0)
return r;
ceph_assert(is_pg);
// temp too!
r = _split_collection(cid.get_temp(), bits, pgid.pgid.ps(), dest.get_temp(),
spos);
if (r < 0)
return r;
{
int dstcmp = _check_replay_guard(dest, spos);
if (dstcmp < 0)
return 0;
int srccmp = _check_replay_guard(cid, spos);
if (srccmp < 0)
return 0;
_set_global_replay_guard(cid, spos);
_set_replay_guard(cid, spos, true);
_set_replay_guard(dest, spos, true);
Index from;
r = get_index(cid, &from);
Index to;
if (!r)
r = get_index(dest, &to);
if (!r) {
ceph_assert(from.index);
RWLock::WLocker l1((from.index)->access_lock);
ceph_assert(to.index);
RWLock::WLocker l2((to.index)->access_lock);
r = from->merge(bits, to.index);
}
_close_replay_guard(cid, spos);
_close_replay_guard(dest, spos);
}
// temp too
{
int dstcmp = _check_replay_guard(dest.get_temp(), spos);
if (dstcmp < 0)
return 0;
int srccmp = _check_replay_guard(cid.get_temp(), spos);
if (srccmp < 0)
return 0;
_set_global_replay_guard(cid.get_temp(), spos);
_set_replay_guard(cid.get_temp(), spos, true);
_set_replay_guard(dest.get_temp(), spos, true);
Index from;
r = get_index(cid.get_temp(), &from);
Index to;
if (!r)
r = get_index(dest.get_temp(), &to);
if (!r) {
ceph_assert(from.index);
RWLock::WLocker l1((from.index)->access_lock);
ceph_assert(to.index);
RWLock::WLocker l2((to.index)->access_lock);
r = from->merge(bits, to.index);
}
_close_replay_guard(cid.get_temp(), spos);
_close_replay_guard(dest.get_temp(), spos);
}
// remove source
if (_check_replay_guard(cid, spos) > 0)
@ -5802,7 +5862,7 @@ int FileStore::_merge_collection(const coll_t& cid,
if (!i->match(bits, pgid.pgid.ps())) {
dout(20) << __FUNC__ << ": " << *i << " does not belong in "
<< cid << dendl;
assert(i->match(bits, pgid.pgid.ps()));
ceph_assert(i->match(bits, pgid.pgid.ps()));
}
}
objects.clear();

View File

@ -292,12 +292,128 @@ int HashIndex::col_split_level(
return 0;
}
int HashIndex::_merge(
uint32_t bits,
CollectionIndex* dest) {
dout(20) << __func__ << " bits " << bits << dendl;
ceph_assert(collection_version() == dest->collection_version());
vector<string> emptypath;
// pre-split to common/target level so that any shared prefix DIR_?
// directories already exist at the destination. Since each
// directory is a nibble (4 bits),
unsigned shared = bits / 4;
dout(20) << __func__ << " pre-splitting to shared level " << shared << dendl;
if (shared) {
split_dirs(emptypath, shared);
((HashIndex*)dest)->split_dirs(emptypath, shared);
}
// now merge the contents
_merge_dirs(*this, *(HashIndex*)dest, emptypath);
return 0;
}
int HashIndex::_merge_dirs(
  HashIndex& from,
  HashIndex& to,
  const vector<string>& path)
{
  dout(20) << __func__ << " path " << path << dendl;
  int r;

  // Subdirectories first: anything unique to the source is moved wholesale;
  // anything present on both sides is merged recursively and then dropped
  // from the source.
  vector<string> from_subdirs;
  r = from.list_subdirs(path, &from_subdirs);
  if (r < 0) {
    lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						  << " r " << r << " from "
						  << "from.list_subdirs"
						  << dendl;
    return r;
  }
  vector<string> to_subdirs;
  r = to.list_subdirs(path, &to_subdirs);
  if (r < 0) {
    lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						  << " r " << r << " from "
						  << "to.list_subdirs"
						  << dendl;
    return r;
  }

  for (auto& dir : from_subdirs) {
    bool shared =
      std::find(to_subdirs.begin(), to_subdirs.end(), dir) != to_subdirs.end();
    if (!shared) {
      // only the source has this subtree; relocate it in one shot
      r = move_subdir(from, to, path, dir);
      if (r < 0) {
	lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						      << " r " << r << " from "
						      << "move_subdir(...,"
						      << path << "," << dir << ")"
						      << dendl;
	return r;
      }
      continue;
    }
    // common to both indexes: merge recursively, then remove the
    // now-drained source directory
    vector<string> sub_path(path);
    sub_path.push_back(dir);
    r = _merge_dirs(from, to, sub_path);
    if (r < 0) {
      lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						    << " r " << r << " from "
						    << "rec _merge_dirs"
						    << dendl;
      return r;
    }
    r = remove_path(sub_path);
    if (r < 0) {
      lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						    << " r " << r << " from "
						    << "remove_path "
						    << sub_path
						    << dendl;
      return r;
    }
  }

  // Finally relocate the plain objects living at this level.
  map<string, ghobject_t> objects;
  r = from.list_objects(path, 0, 0, &objects);
  if (r < 0) {
    lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						  << " r " << r << " from "
						  << "from.list_objects"
						  << dendl;
    return r;
  }
  for (auto& obj : objects) {
    r = move_object(from, to, path, obj);
    if (r < 0) {
      lgeneric_subdout(g_ceph_context,filestore,20) << __func__
						    << " r " << r << " from "
						    << "move_object(...,"
						    << path << "," << obj << ")"
						    << dendl;
      return r;
    }
  }
  return 0;
}
int HashIndex::_split(
uint32_t match,
uint32_t bits,
CollectionIndex* dest) {
ceph_assert(collection_version() == dest->collection_version());
unsigned mkdirred = 0;
return col_split_level(
*this,
*static_cast<HashIndex*>(dest),

View File

@ -194,6 +194,17 @@ public:
CollectionIndex* dest
) override;
/// @see CollectionIndex
int _merge(
uint32_t bits,
CollectionIndex* dest
) override;
int _merge_dirs(
HashIndex& from,
HashIndex& to,
const vector<string>& path);
/// @see CollectionIndex
int apply_layout_settings(int target_level) override;

View File

@ -198,6 +198,10 @@ public:
uint32_t bits, //< [in] bits to check
CollectionIndex* dest //< [in] destination index
) = 0;
virtual int _merge(
uint32_t bits, //< [in] bits for target
CollectionIndex* dest //< [in] destination index
) = 0;
/// @see CollectionIndex
int split(
@ -211,6 +215,17 @@ public:
);
}
/// @see CollectionIndex
int merge(
uint32_t bits,
CollectionIndex* dest
) override {
WRAP_RETRY(
r = _merge(bits, dest);
goto out;
);
}
/**
* Returns the length of the longest escaped name which could result
* from any clone, shard, or rollback object of this object

View File

@ -5216,6 +5216,169 @@ TEST_P(StoreTest, ColSplitTest3) {
}
#endif
// Merge two sibling pg collections whose object counts are deliberately
// skewed so that FileStore splits their hash-directory trees to different
// depths, then verify every object is still reachable after the merge.
//   base: pg hash of collection a; b gets base | (1<<bits)
//   bits: number of hash bits the two pgs share post-merge
//   anum/bnum: object counts for a and b (callers pass e.g. 10 vs 10000
//   to force very different split levels)
void test_merge_skewed(ObjectStore *store,
		       unsigned base, unsigned bits,
		       unsigned anum, unsigned bnum)
{
  cout << __func__ << " 0x" << std::hex << base << std::dec
       << " bits " << bits
       << " anum " << anum << " bnum " << bnum << std::endl;
  /*
    make merge source pgs have radically different # of objects in them,
    which should trigger different splitting in filestore, and verify that
    post-merge all objects are accessible.
  */
  int r;
  coll_t a(spg_t(pg_t(base, 0), shard_id_t::NO_SHARD));
  coll_t b(spg_t(pg_t(base | (1<<bits), 0), shard_id_t::NO_SHARD));

  auto cha = store->create_new_collection(a);
  auto chb = store->create_new_collection(b);
  {
    ObjectStore::Transaction t;
    t.create_collection(a, bits + 1);
    r = queue_transaction(store, cha, std::move(t));
    ASSERT_EQ(r, 0);
  }
  {
    ObjectStore::Transaction t;
    t.create_collection(b, bits + 1);
    r = queue_transaction(store, chb, std::move(t));
    ASSERT_EQ(r, 0);
  }

  bufferlist small;
  small.append("small");
  // long suffix forces long-filename handling in the index
  string suffix = "ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooaaaaaaaaaa";
  set<ghobject_t> aobjects, bobjects;
  {
    // fill a; honor the caller-requested count (was hardcoded to 1000,
    // which made anum a no-op and both caller variants identical)
    ObjectStore::Transaction t;
    for (unsigned i = 0; i < anum; ++i) {
      string objname = "a" + stringify(i) + suffix;
      ghobject_t o(hobject_t(
		     objname,
		     "",
		     CEPH_NOSNAP,
		     i<<(bits+1) | base,
		     52, ""));
      aobjects.insert(o);
      t.write(a, o, 0, small.length(), small, 0);
      if (i % 100) {
	r = queue_transaction(store, cha, std::move(t));
	ASSERT_EQ(r, 0);
	t = ObjectStore::Transaction();
      }
    }
    r = queue_transaction(store, cha, std::move(t));
    ASSERT_EQ(r, 0);
  }
  {
    // fill b; honor bnum, and shift by (bits+1) to match the "a" fill
    // (the previous (i<<(base+1)) was inconsistent and would be undefined
    // behavior for base >= 31, as in the commented-out sweep below)
    ObjectStore::Transaction t;
    for (unsigned i = 0; i < bnum; ++i) {
      string objname = "b" + stringify(i) + suffix;
      ghobject_t o(hobject_t(
		     objname,
		     "",
		     CEPH_NOSNAP,
		     (i<<(bits+1)) | base | (1<<bits),
		     52, ""));
      bobjects.insert(o);
      t.write(b, o, 0, small.length(), small, 0);
      if (i % 100) {
	r = queue_transaction(store, chb, std::move(t));
	ASSERT_EQ(r, 0);
	t = ObjectStore::Transaction();
      }
    }
    r = queue_transaction(store, chb, std::move(t));
    ASSERT_EQ(r, 0);
  }

  // merge b->a
  {
    ObjectStore::Transaction t;
    t.merge_collection(b, a, bits);
    r = queue_transaction(store, cha, std::move(t));
    ASSERT_EQ(r, 0);
  }

  // verify
  {
    vector<ghobject_t> got;
    store->collection_list(cha, ghobject_t(), ghobject_t::get_max(), INT_MAX,
			   &got, 0);
    set<ghobject_t> gotset;
    for (auto& o : got) {
      ASSERT_TRUE(aobjects.count(o) || bobjects.count(o));
      gotset.insert(o);
    }
    // check both listing and stat-ability (different code paths!)
    struct stat st;
    for (auto& o : aobjects) {
      ASSERT_TRUE(gotset.count(o));
      int r = store->stat(cha, o, &st, false);
      ASSERT_EQ(r, 0);
    }
    for (auto& o : bobjects) {
      ASSERT_TRUE(gotset.count(o));
      int r = store->stat(cha, o, &st, false);
      ASSERT_EQ(r, 0);
    }
  }

  // clean up
  {
    ObjectStore::Transaction t;
    for (auto &o : aobjects) {
      t.remove(a, o);
    }
    r = queue_transaction(store, cha, std::move(t));
    ASSERT_EQ(r, 0);
  }
  {
    ObjectStore::Transaction t;
    for (auto &o : bobjects) {
      // b was merged into a, so its objects are removed via a
      t.remove(a, o);
    }
    t.remove_collection(a);
    r = queue_transaction(store, cha, std::move(t));
    ASSERT_EQ(r, 0);
  }
}
TEST_P(StoreTest, MergeSkewed) {
// Skewed-merge directory splitting is specific to the FileStore backend's
// hash-directory index, so skip all other store implementations.
if (string(GetParam()) != "filestore")
return;
// this is sufficient to exercise merges with different hashing levels;
// the two calls are intended to swap which source collection carries the
// larger (and hence more deeply split) object population
// (args: base pg hash 0xf, 4 shared bits, then anum/bnum object counts)
test_merge_skewed(store.get(), 0xf, 4, 10, 10000);
test_merge_skewed(store.get(), 0xf, 4, 10000, 10);
/*
// this covers a zillion variations that all boil down to the same thing
for (unsigned base = 3; base < 0x1000; base *= 5) {
unsigned bits;
unsigned t = base;
for (bits = 0; t; t >>= 1) {
++bits;
}
for (unsigned b = bits; b < bits + 10; b += 3) {
for (auto anum : { 10, 1000, 10000 }) {
for (auto bnum : { 10, 1000, 10000 }) {
if (anum == bnum) {
continue;
}
test_merge_skewed(store.get(), base, b, anum, bnum);
}
}
}
}
*/
}
/**
* This test tests adding two different groups
* of objects, each with 1 common prefix and 1

View File

@ -46,6 +46,10 @@ public:
uint32_t bits,
CollectionIndex* dest
) override { return 0; }
int _merge(
uint32_t bits,
CollectionIndex* dest
) override { return 0; }
void test_generate_and_parse(const ghobject_t &hoid, const std::string &mangled_expected) {
const std::string mangled_name = lfn_generate_object_name(hoid);