FileStore: Index caching is introduced for performance improvement

IndexManager now has a Index caching. Index will only be created if not
found in the cache. Earlier, each op is creating an Index object and other
ops requesting the same index needed to wait till previous op is done.
Also, after finishing lookup, this Index object was destroyed.
Now, a Index cache is been implemented to persists these Indexes since
there is a major performance hit because each op is creating and destroying
these. A RWlock is been introduced in the CollectionIndex class and that is
responsible for sync between lookup and create.
Also, since these Index objects are persistent there is no need to use
smart pointers. So, Index is a wrapper class of CollecIndex* now.
It is the responsibility of the users of Index now to lock explicitely
before using them. Index object is sufficient now for locking and no need
to hold IndexPath for locking. The function interfaces of lfn_open,lfn_find
are changed accordingly.

Signed-off-by: Somnath Roy <somnath.roy@sandisk.com>
This commit is contained in:
Somnath Roy 2014-06-30 01:28:07 -07:00
parent b04d84db8c
commit 78d70daff4
13 changed files with 395 additions and 227 deletions

View File

@ -21,6 +21,7 @@
#include "osd/osd_types.h"
#include "include/object.h"
#include "common/RWLock.h"
/**
* CollectionIndex provides an interface for manipulating indexed collections
@ -43,14 +44,14 @@ protected:
/// Returned path
string full_path;
/// Ref to parent Index
ceph::shared_ptr<CollectionIndex> parent_ref;
CollectionIndex* parent_ref;
/// coll_t for parent Index
coll_t parent_coll;
/// Normal Constructor
Path(
string path, ///< [in] Path to return.
ceph::weak_ptr<CollectionIndex> ref) ///< [in] weak_ptr to parent.
CollectionIndex* ref)
: full_path(path), parent_ref(ref), parent_coll(parent_ref->coll()) {}
/// Debugging Constructor
@ -66,11 +67,13 @@ protected:
coll_t coll() const { return parent_coll; }
/// Getter for parent
ceph::shared_ptr<CollectionIndex> get_index() const {
CollectionIndex* get_index() const {
return parent_ref;
}
};
public:
RWLock access_lock;
/// Type of returned paths
typedef ceph::shared_ptr<Path> IndexedPath;
@ -94,12 +97,6 @@ protected:
*/
virtual coll_t coll() const = 0;
/**
* For setting the internal weak_ptr to a shared_ptr to this.
*
* @see IndexManager
*/
virtual void set_ref(ceph::shared_ptr<CollectionIndex> ref) = 0;
/**
* Initializes the index.
@ -161,7 +158,7 @@ protected:
virtual int split(
uint32_t match, //< [in] value to match
uint32_t bits, //< [in] bits to check
ceph::shared_ptr<CollectionIndex> dest //< [in] destination index
CollectionIndex* dest //< [in] destination index
) { assert(0); return 0; }
@ -183,6 +180,8 @@ protected:
/// Call prior to removing directory
virtual int prep_delete() { return 0; }
CollectionIndex():access_lock("CollectionIndex::access_lock"){}
/// Virtual destructor
virtual ~CollectionIndex() {}
};

View File

@ -147,9 +147,7 @@ int FileStore::get_cdir(coll_t cid, char *s, int len)
int FileStore::get_index(coll_t cid, Index *index)
{
char path[PATH_MAX];
get_cdir(cid, path, sizeof(path));
int r = index_manager.get_index(cid, path, index);
int r = index_manager.get_index(cid, basedir, index);
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
@ -163,15 +161,14 @@ int FileStore::init_index(coll_t cid)
return r;
}
int FileStore::lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path)
int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
{
Index index;
IndexedPath path2;
if (!path)
path = &path2;
int r, exist;
r = get_index(cid, &index);
if (r < 0)
return r;
r = index->lookup(oid, path, &exist);
assert(NULL != index.index);
r = (index.index)->lookup(oid, path, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
@ -183,9 +180,8 @@ int FileStore::lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path)
int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
{
IndexedPath path;
FDRef fd;
int r = lfn_open(cid, oid, false, &fd, &path);
int r = lfn_open(cid, oid, false, &fd);
if (r < 0)
return r;
r = ::ftruncate(**fd, length);
@ -202,7 +198,15 @@ int FileStore::lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length)
int FileStore::lfn_stat(coll_t cid, const ghobject_t& oid, struct stat *buf)
{
IndexedPath path;
int r = lfn_find(cid, oid, &path);
Index index;
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(oid, index, &path);
if (r < 0)
return r;
r = ::stat(path->path(), buf);
@ -215,23 +219,27 @@ int FileStore::lfn_open(coll_t cid,
const ghobject_t& oid,
bool create,
FDRef *outfd,
IndexedPath *path,
Index *index)
Index *index)
{
assert(get_allow_sharded_objects() ||
( oid.shard_id == shard_id_t::NO_SHARD &&
oid.generation == ghobject_t::NO_GEN ));
assert(outfd);
bool need_lock = true;
int flags = O_RDWR;
if (create)
flags |= O_CREAT;
Index index2;
if (!index) {
index = &index2;
}
int r = 0;
if (!(*index)) {
if (!((*index).index)) {
r = get_index(cid, index);
} else {
need_lock = false;
}
int fd, exist;
@ -241,47 +249,49 @@ int FileStore::lfn_open(coll_t cid,
return 0;
}
{
IndexedPath path2;
if (!path)
path = &path2;
if (r < 0) {
derr << "error getting collection index for " << cid
<< ": " << cpp_strerror(-r) << dendl;
goto fail;
}
r = (*index)->lookup(oid, path, &exist);
if (r < 0) {
derr << "could not find " << oid << " in index: "
<< cpp_strerror(-r) << dendl;
goto fail;
}
assert(NULL != (*index).index);
if (need_lock) {
((*index).index)->access_lock.get_write();
}
r = ::open((*path)->path(), flags, 0644);
IndexedPath path2;
IndexedPath *path = &path2;
if (r < 0) {
derr << "error getting collection index for " << cid
<< ": " << cpp_strerror(-r) << dendl;
goto fail;
}
r = (*index)->lookup(oid, path, &exist);
if (r < 0) {
derr << "could not find " << oid << " in index: "
<< cpp_strerror(-r) << dendl;
goto fail;
}
r = ::open((*path)->path(), flags, 0644);
if (r < 0) {
r = -errno;
dout(10) << "error opening file " << (*path)->path() << " with flags="
<< flags << ": " << cpp_strerror(-r) << dendl;
goto fail;
}
fd = r;
if (create && (!exist)) {
r = (*index)->created(oid, (*path)->path());
if (r < 0) {
r = -errno;
dout(10) << "error opening file " << (*path)->path() << " with flags="
<< flags << ": " << cpp_strerror(-r) << dendl;
VOID_TEMP_FAILURE_RETRY(::close(fd));
derr << "error creating " << oid << " (" << (*path)->path()
<< ") in index: " << cpp_strerror(-r) << dendl;
goto fail;
}
fd = r;
if (create && (!exist)) {
r = (*index)->created(oid, (*path)->path());
if (r < 0) {
VOID_TEMP_FAILURE_RETRY(::close(fd));
derr << "error creating " << oid << " (" << (*path)->path()
<< ") in index: " << cpp_strerror(-r) << dendl;
goto fail;
}
r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
if (r < 0) {
VOID_TEMP_FAILURE_RETRY(::close(fd));
derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
<< "):" << cpp_strerror(-r) << dendl;
goto fail;
}
r = chain_fsetxattr(fd, XATTR_SPILL_OUT_NAME,
XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
if (r < 0) {
VOID_TEMP_FAILURE_RETRY(::close(fd));
derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
<< "):" << cpp_strerror(-r) << dendl;
goto fail;
}
}
@ -290,14 +300,23 @@ int FileStore::lfn_open(coll_t cid,
*outfd = fdcache.add(oid, fd, &existed);
if (existed) {
TEMP_FAILURE_RETRY(::close(fd));
return 0;
}
} else {
*outfd = FDRef(new FDCache::FD(fd));
}
if (need_lock) {
((*index).index)->access_lock.put_write();
}
return 0;
fail:
if (need_lock) {
((*index).index)->access_lock.put_write();
}
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
@ -312,6 +331,7 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
IndexedPath path_new, path_old;
int exist;
int r;
bool index_same = false;
if (c < newcid) {
r = get_index(newcid, &index_new);
if (r < 0)
@ -324,6 +344,7 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
if (r < 0)
return r;
index_new = index_old;
index_same = true;
} else {
r = get_index(c, &index_old);
if (r < 0)
@ -333,33 +354,73 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghob
return r;
}
r = index_old->lookup(o, &path_old, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (!exist)
return -ENOENT;
assert(NULL != index_old.index);
assert(NULL != index_new.index);
r = index_new->lookup(newoid, &path_new, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (exist)
return -EEXIST;
if (!index_same) {
dout(25) << "lfn_link path_old: " << path_old << dendl;
dout(25) << "lfn_link path_new: " << path_new << dendl;
r = ::link(path_old->path(), path_new->path());
if (r < 0)
return -errno;
RWLock::RLocker l1((index_old.index)->access_lock);
r = index_new->created(newoid, path_new->path());
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
r = index_old->lookup(o, &path_old, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (!exist)
return -ENOENT;
RWLock::WLocker l2((index_new.index)->access_lock);
r = index_new->lookup(newoid, &path_new, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (exist)
return -EEXIST;
dout(25) << "lfn_link path_old: " << path_old << dendl;
dout(25) << "lfn_link path_new: " << path_new << dendl;
r = ::link(path_old->path(), path_new->path());
if (r < 0)
return -errno;
r = index_new->created(newoid, path_new->path());
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
} else {
RWLock::WLocker l1((index_old.index)->access_lock);
r = index_old->lookup(o, &path_old, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (!exist)
return -ENOENT;
r = index_new->lookup(newoid, &path_new, &exist);
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
if (exist)
return -EEXIST;
dout(25) << "lfn_link path_old: " << path_old << dendl;
dout(25) << "lfn_link path_new: " << path_new << dendl;
r = ::link(path_old->path(), path_new->path());
if (r < 0)
return -errno;
r = index_new->created(newoid, path_new->path());
if (r < 0) {
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
}
return 0;
}
@ -371,6 +432,10 @@ int FileStore::lfn_unlink(coll_t cid, const ghobject_t& o,
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
{
IndexedPath path;
int exist;
@ -1441,6 +1506,9 @@ int FileStore::mount()
<< " with error: " << ret << dendl;
goto close_current_fd;
}
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
index->cleanup();
}
}
@ -2880,12 +2948,14 @@ int FileStore::_clone(coll_t cid, const ghobject_t& oldoid, const ghobject_t& ne
FDRef o, n;
{
Index index;
IndexedPath from, to;
r = lfn_open(cid, oldoid, false, &o, &from, &index);
r = lfn_open(cid, oldoid, false, &o, &index);
if (r < 0) {
goto out2;
}
r = lfn_open(cid, newoid, true, &n, &to, &index);
assert(NULL != (index.index));
RWLock::WLocker l((index.index)->access_lock);
r = lfn_open(cid, newoid, true, &n, &index);
if (r < 0) {
goto out;
}
@ -3636,6 +3706,9 @@ int FileStore::getattr(coll_t cid, const ghobject_t& oid, const char *name, buff
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out;
}
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = object_map->get_xattrs(oid, to_get, &got);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " get_xattrs err r =" << r << dendl;
@ -3695,19 +3768,24 @@ int FileStore::getattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out;
}
r = object_map->get_all_xattrs(oid, &omap_attrs);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
goto out;
}
{
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
goto out;
r = object_map->get_all_xattrs(oid, &omap_attrs);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
goto out;
}
r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
goto out;
}
if (r == -ENOENT)
r = 0;
}
if (r == -ENOENT)
r = 0;
assert(omap_attrs.size() == omap_aset.size());
for (map<string, bufferlist>::iterator i = omap_aset.begin();
i != omap_aset.end();
@ -3857,6 +3935,9 @@ int FileStore::_rmattr(coll_t cid, const ghobject_t& oid, const char *name,
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out_close;
}
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
set<string> to_remove;
to_remove.insert(string(name));
r = object_map->remove_xattrs(oid, to_remove, &spos);
@ -3916,22 +3997,26 @@ int FileStore::_rmattrs(coll_t cid, const ghobject_t& oid,
dout(10) << __func__ << " could not get index r = " << r << dendl;
goto out_close;
}
r = object_map->get_all_xattrs(oid, &omap_attrs);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
assert(!m_filestore_fail_eio || r != -EIO);
goto out_close;
}
r = object_map->remove_xattrs(oid, omap_attrs, &spos);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
goto out_close;
}
if (r == -ENOENT)
r = 0;
{
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
r = object_map->get_all_xattrs(oid, &omap_attrs);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not get omap_attrs r = " << r << dendl;
assert(!m_filestore_fail_eio || r != -EIO);
goto out_close;
}
r = object_map->remove_xattrs(oid, omap_attrs, &spos);
if (r < 0 && r != -ENOENT) {
dout(10) << __func__ << " could not remove omap_attrs r = " << r << dendl;
goto out_close;
}
if (r == -ENOENT)
r = 0;
chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
sizeof(XATTR_NO_SPILL_OUT));
}
out_close:
lfn_close(fd);
@ -4165,6 +4250,10 @@ int FileStore::collection_version_current(coll_t c, uint32_t *version)
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
*version = index->collection_version();
if (*version == target_version)
return 1;
@ -4259,6 +4348,10 @@ bool FileStore::collection_empty(coll_t c)
int r = get_index(c, &index);
if (r < 0)
return false;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
vector<ghobject_t> ls;
collection_list_handle_t handle;
r = index->collection_list_partial(ghobject_t(), 1, 1, 0, &ls, NULL);
@ -4312,6 +4405,10 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start,
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = index->collection_list_partial(start,
min, max, seq,
ls, next);
@ -4330,6 +4427,10 @@ int FileStore::collection_list(coll_t c, vector<ghobject_t>& ls)
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = index->collection_list(&ls);
assert(!m_filestore_fail_eio || r != -EIO);
return r;
@ -4340,8 +4441,15 @@ int FileStore::omap_get(coll_t c, const ghobject_t &hoid,
map<string, bufferlist> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->get(hoid, header, out);
@ -4359,8 +4467,16 @@ int FileStore::omap_get_header(
bool allow_eio)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->get_header(hoid, bl);
@ -4374,8 +4490,16 @@ int FileStore::omap_get_header(
int FileStore::omap_get_keys(coll_t c, const ghobject_t &hoid, set<string> *keys)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->get_keys(hoid, keys);
@ -4391,8 +4515,16 @@ int FileStore::omap_get_values(coll_t c, const ghobject_t &hoid,
map<string, bufferlist> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->get_values(hoid, keys, out);
@ -4408,8 +4540,17 @@ int FileStore::omap_check_keys(coll_t c, const ghobject_t &hoid,
set<string> *out)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->check_keys(hoid, keys, out);
@ -4424,8 +4565,16 @@ ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(coll_t c,
const ghobject_t &hoid)
{
dout(15) << __func__ << " " << c << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(c, hoid, &path);
Index index;
int r = get_index(c, &index);
if (r < 0)
return ObjectMap::ObjectMapIterator();
assert(NULL != index.index);
RWLock::RLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return ObjectMap::ObjectMapIterator();
return object_map->get_iterator(hoid);
@ -4477,6 +4626,9 @@ int FileStore::_destroy_collection(coll_t c)
int r = get_index(c, &from);
if (r < 0)
return r;
assert(NULL != from.index);
RWLock::WLocker l((from.index)->access_lock);
r = from->prep_delete();
if (r < 0)
return r;
@ -4646,8 +4798,16 @@ void FileStore::_inject_failure()
int FileStore::_omap_clear(coll_t cid, const ghobject_t &hoid,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
Index index;
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->clear_keys_header(hoid, &spos);
@ -4660,8 +4820,16 @@ int FileStore::_omap_setkeys(coll_t cid, const ghobject_t &hoid,
const map<string, bufferlist> &aset,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
Index index;
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
return object_map->set_keys(hoid, aset, &spos);
@ -4671,8 +4839,16 @@ int FileStore::_omap_rmkeys(coll_t cid, const ghobject_t &hoid,
const set<string> &keys,
const SequencerPosition &spos) {
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
Index index;
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
r = object_map->rm_keys(hoid, keys, &spos);
@ -4703,8 +4879,16 @@ int FileStore::_omap_setheader(coll_t cid, const ghobject_t &hoid,
const SequencerPosition &spos)
{
dout(15) << __func__ << " " << cid << "/" << hoid << dendl;
IndexedPath path;
int r = lfn_find(cid, hoid, &path);
Index index;
int r = get_index(cid, &index);
if (r < 0)
return r;
assert(NULL != index.index);
RWLock::WLocker l((index.index)->access_lock);
r = lfn_find(hoid, index);
if (r < 0)
return r;
return object_map->set_header(hoid, bl, &spos);
@ -4751,8 +4935,15 @@ int FileStore::_split_collection(coll_t cid,
if (!r)
r = get_index(dest, &to);
if (!r)
r = from->split(rem, bits, to);
if (!r) {
assert(NULL != from.index);
RWLock::WLocker l1((from.index)->access_lock);
assert(NULL != to.index);
RWLock::WLocker l2((to.index)->access_lock);
r = from->split(rem, bits, to.index);
}
_close_replay_guard(cid, spos);
_close_replay_guard(dest, spos);
@ -4832,8 +5023,15 @@ int FileStore::_split_collection_create(coll_t cid,
if (!r)
r = get_index(dest, &to);
if (!r)
r = from->split(rem, bits, to);
if (!r) {
assert(NULL != from.index);
RWLock::WLocker l1((from.index)->access_lock);
assert(NULL != to.index);
RWLock::WLocker l2((to.index)->access_lock);
r = from->split(rem, bits, to.index);
}
_close_replay_guard(cid, spos);
_close_replay_guard(dest, spos);

View File

@ -382,7 +382,8 @@ private:
PerfCounters *logger;
public:
int lfn_find(coll_t cid, const ghobject_t& oid, IndexedPath *path);
int lfn_find(const ghobject_t& oid, const Index& index,
IndexedPath *path = NULL);
int lfn_truncate(coll_t cid, const ghobject_t& oid, off_t length);
int lfn_stat(coll_t cid, const ghobject_t& oid, struct stat *buf);
int lfn_open(
@ -390,8 +391,8 @@ public:
const ghobject_t& oid,
bool create,
FDRef *outfd,
IndexedPath *path = 0,
Index *index = 0);
void lfn_close(FDRef fd);
int lfn_link(coll_t c, coll_t newcid, const ghobject_t& o, const ghobject_t& newoid) ;
int lfn_unlink(coll_t cid, const ghobject_t& o, const SequencerPosition &spos,

View File

@ -18,7 +18,6 @@
#endif
#include "FlatIndex.h"
#include "CollectionIndex.h"
#include "common/ceph_crypto.h"
#include "osd/osd_types.h"
#include <errno.h>
@ -49,9 +48,6 @@ using ceph::crypto::SHA1;
#define FILENAME_PREFIX_LEN (FILENAME_SHORT_LEN - FILENAME_HASH_LEN - (sizeof(FILENAME_COOKIE) - 1) - FILENAME_EXTRA)
void FlatIndex::set_ref(ceph::shared_ptr<CollectionIndex> ref) {
self_ref = ref;
}
int FlatIndex::cleanup() {
return 0;
@ -356,7 +352,7 @@ int FlatIndex::lookup(const ghobject_t &hoid, IndexedPath *path, int *exist) {
sizeof(long_fn), exist, &is_lfn);
if (r < 0)
return r;
*path = IndexedPath(new Path(string(short_fn), self_ref));
*path = IndexedPath(new Path(string(short_fn), this));
return 0;
}

View File

@ -29,7 +29,6 @@
* This class should only be used for converting old filestores.
*/
class FlatIndex : public CollectionIndex {
ceph::weak_ptr<CollectionIndex> self_ref;
string base_path;
coll_t collection;
public:
@ -41,9 +40,6 @@ public:
coll_t coll() const { return collection; }
/// @see CollectionIndex
void set_ref(ceph::shared_ptr<CollectionIndex> ref);
/// @see CollectionIndex
int cleanup();

View File

@ -228,12 +228,12 @@ int HashIndex::col_split_level(
int HashIndex::_split(
uint32_t match,
uint32_t bits,
ceph::shared_ptr<CollectionIndex> dest) {
CollectionIndex* dest) {
assert(collection_version() == dest->collection_version());
unsigned mkdirred = 0;
return col_split_level(
*this,
*static_cast<HashIndex*>(dest.get()),
*static_cast<HashIndex*>(dest),
vector<string>(),
bits,
match,

View File

@ -156,7 +156,7 @@ public:
int _split(
uint32_t match,
uint32_t bits,
ceph::shared_ptr<CollectionIndex> dest
CollectionIndex* dest
);
protected:

View File

@ -61,13 +61,18 @@ static int get_version(const char *path, uint32_t *version) {
return 0;
}
void IndexManager::put_index(coll_t c) {
Mutex::Locker l(lock);
assert(col_indices.count(c));
col_indices.erase(c);
cond.Signal();
IndexManager::~IndexManager() {
for(map<coll_t, CollectionIndex* > ::iterator it = col_indices.begin();
it != col_indices.end(); it++) {
delete it->second;
it->second = NULL;
}
col_indices.clear();
}
int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
Mutex::Locker l(lock);
int r = set_version(path, version);
@ -80,7 +85,7 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
return index.init();
}
int IndexManager::build_index(coll_t c, const char *path, Index *index) {
int IndexManager::build_index(coll_t c, const char *path, CollectionIndex **index) {
if (upgrade) {
// Need to check the collection generation
int r;
@ -91,17 +96,15 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
switch (version) {
case CollectionIndex::FLAT_INDEX_TAG: {
*index = Index(new FlatIndex(c, path),
RemoveOnDelete(c, this));
*index = new FlatIndex(c, path);
return 0;
}
case CollectionIndex::HASH_INDEX_TAG: // fall through
case CollectionIndex::HASH_INDEX_TAG_2: // fall through
case CollectionIndex::HOBJECT_WITH_POOL: {
// Must be a HashIndex
*index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
g_conf->filestore_split_multiple, version),
RemoveOnDelete(c, this));
*index = new HashIndex(c, path, g_conf->filestore_merge_threshold,
g_conf->filestore_split_multiple, version);
return 0;
}
default: assert(0);
@ -109,28 +112,29 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
} else {
// No need to check
*index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
*index = new HashIndex(c, path, g_conf->filestore_merge_threshold,
g_conf->filestore_split_multiple,
CollectionIndex::HOBJECT_WITH_POOL,
g_conf->filestore_index_retry_probability),
RemoveOnDelete(c, this));
g_conf->filestore_index_retry_probability);
return 0;
}
}
int IndexManager::get_index(coll_t c, const char *path, Index *index) {
int IndexManager::get_index(coll_t c, const string& baseDir, Index *index) {
Mutex::Locker l(lock);
while (1) {
if (!col_indices.count(c)) {
int r = build_index(c, path, index);
if (r < 0)
return r;
(*index)->set_ref(*index);
col_indices[c] = (*index);
break;
} else {
cond.Wait(lock);
}
map<coll_t, CollectionIndex* > ::iterator it = col_indices.find(c);
if (it == col_indices.end()) {
char path[PATH_MAX];
snprintf(path, sizeof(path), "%s/current/%s", baseDir.c_str(), c.to_str().c_str());
CollectionIndex* colIndex = NULL;
int r = build_index(c, path, &colIndex);
if (r < 0)
return r;
col_indices[c] = colIndex;
index->index = colIndex;
} else {
index->index = it->second;
}
return 0;
}

View File

@ -28,7 +28,17 @@
/// Public type for Index
typedef ceph::shared_ptr<CollectionIndex> Index;
struct Index {
CollectionIndex *index;
Index() : index(NULL) {}
Index(CollectionIndex* index) : index(index) {}
CollectionIndex *operator->() { return index; }
CollectionIndex &operator*() { return *index; }
};
/**
* Encapsulates mutual exclusion for CollectionIndexes.
*
@ -37,39 +47,12 @@ typedef ceph::shared_ptr<CollectionIndex> Index;
* that path) may result in the path becoming invalid. Thus, during
* the lifetime of a CollectionIndex object and any paths returned
* by it, no other concurrent accesses may be allowed.
*
* This is enforced using shared_ptr. A shared_ptr<CollectionIndex>
* is returned from get_index. Any paths generated using that object
* carry a reference to the parrent index. Once all
* shared_ptr<CollectionIndex> references have expired, the destructor
* removes the weak_ptr from col_indices and wakes waiters.
* This is enforced by using CollectionIndex::access_lock
*/
class IndexManager {
Mutex lock; ///< Lock for Index Manager
Cond cond; ///< Cond for waiters on col_indices
bool upgrade;
/// Currently in use CollectionIndices
map<coll_t,ceph::weak_ptr<CollectionIndex> > col_indices;
/// Cleans up state for c @see RemoveOnDelete
void put_index(
coll_t c ///< Put the index for c
);
/// Callback for shared_ptr release @see get_index
class RemoveOnDelete {
public:
coll_t c;
IndexManager *manager;
RemoveOnDelete(coll_t c, IndexManager *manager) :
c(c), manager(manager) {}
void operator()(CollectionIndex *index) {
manager->put_index(c);
delete index;
}
};
map<coll_t, CollectionIndex* > col_indices;
/**
* Index factory
@ -82,12 +65,14 @@ class IndexManager {
* @param [out] index Index for c
* @return error code
*/
int build_index(coll_t c, const char *path, Index *index);
int build_index(coll_t c, const char *path, CollectionIndex **index);
public:
/// Constructor
IndexManager(bool upgrade) : lock("IndexManager lock"),
upgrade(upgrade) {}
~IndexManager();
/**
* Reserve and return index for c
*
@ -96,7 +81,7 @@ public:
* @param [out] index Index for c
* @return error code
*/
int get_index(coll_t c, const char *path, Index *index);
int get_index(coll_t c, const string& baseDir, Index *index);
/**
* Initialize index for collection c at path

View File

@ -74,10 +74,6 @@ struct FDCloser {
/* Public methods */
void LFNIndex::set_ref(ceph::shared_ptr<CollectionIndex> ref)
{
self_ref = ref;
}
int LFNIndex::init()
{
@ -142,7 +138,7 @@ int LFNIndex::lookup(const ghobject_t &oid,
} else {
*exist = 1;
}
*out_path = IndexedPath(new Path(full_path, self_ref));
*out_path = IndexedPath(new Path(full_path, this));
r = 0;
);
}

View File

@ -98,8 +98,6 @@ class LFNIndex : public CollectionIndex {
/// Path to Index base.
const string base_path;
/// For reference counting the collection @see Path
ceph::weak_ptr<CollectionIndex> self_ref;
protected:
const uint32_t index_version;
@ -155,9 +153,6 @@ public:
/// Virtual destructor
virtual ~LFNIndex() {}
/// @see CollectionIndex
void set_ref(ceph::shared_ptr<CollectionIndex> ref);
/// @see CollectionIndex
int init();
@ -200,14 +195,14 @@ public:
virtual int _split(
uint32_t match, //< [in] value to match
uint32_t bits, //< [in] bits to check
ceph::shared_ptr<CollectionIndex> dest //< [in] destination index
CollectionIndex* dest //< [in] destination index
) = 0;
/// @see CollectionIndex
int split(
uint32_t match,
uint32_t bits,
ceph::shared_ptr<CollectionIndex> dest
CollectionIndex* dest
) {
WRAP_RETRY(
r = _split(match, bits, dest);

View File

@ -70,7 +70,6 @@ TEST(FlatIndex, created_unlink) {
//
{
CollectionIndex::IndexedPath indexed_path;
index->set_ref(index);
const std::string object_name(10, 'A');
ghobject_t hoid(hobject_t(object_t(object_name), key, CEPH_NOSNAP, hash, pool, ""));
int exists;
@ -88,7 +87,6 @@ TEST(FlatIndex, created_unlink) {
//
{
CollectionIndex::IndexedPath indexed_path;
index->set_ref(index);
const std::string object_name(1024, 'A');
ghobject_t hoid(hobject_t(object_t(object_name), key, CEPH_NOSNAP, hash, pool, ""));
int exists;

View File

@ -42,7 +42,7 @@ public:
virtual int _split(
uint32_t match,
uint32_t bits,
ceph::shared_ptr<CollectionIndex> dest
CollectionIndex* dest
) { return 0; }
void test_generate_and_parse(const ghobject_t &hoid, const std::string &mangled_expected) {