Merge pull request #2572 from ceph/wip-9562

osdc/Filer: drop probe/purge locks before calling objecter

Reviewed-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Yan, Zheng <ukernel@gmail.com>
This commit is contained in:
John Spray 2014-09-26 11:57:53 +01:00
commit 83fb32ca41

View File

@ -47,14 +47,13 @@ public:
bool probe_complete;
{
Mutex::Locker l(probe->lock);
// TODO: handle this error.
probe->lock.Lock();
if (r != 0) {
probe->err = r;
}
probe_complete = filer->_probed(probe, oid, size, mtime);
assert(!probe->lock.is_locked_by_me());
}
if (probe_complete) {
probe->onfinish->complete(probe->err);
@ -97,16 +96,17 @@ int Filer::probe(inodeno_t ino,
probe->probing_off -= probe->probing_len;
}
// Take lock before starting any I/Os, to protect us from concurrent calls
// back into C_Probe from OSD op completions from different OSDs
{
Mutex::Locker l(probe->lock);
_probe(probe);
}
probe->lock.Lock();
_probe(probe);
assert(!probe->lock.is_locked_by_me());
return 0;
}
/**
* probe->lock must be initially locked, this function will release it
*/
void Filer::_probe(Probe *probe)
{
assert(probe->lock.is_locked_by_me());
@ -121,18 +121,27 @@ void Filer::_probe(Probe *probe)
Striper::file_to_extents(cct, probe->ino, &probe->layout,
probe->probing_off, probe->probing_len, 0, probe->probing);
std::vector<ObjectExtent> stat_extents;
for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
++p) {
ldout(cct, 10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
objecter->stat(p->oid, p->oloc, probe->snapid, &c->size, &c->mtime,
probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
probe->ops.insert(p->oid);
stat_extents.push_back(*p);
}
probe->lock.Unlock();
for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
i != stat_extents.end(); ++i) {
C_Probe *c = new C_Probe(this, probe, i->oid);
objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime,
probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
}
}
/**
* probe->lock must be initially held, and will be released by this function.
*
* @return true if probe is complete and Probe object may be freed.
*/
bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
@ -149,10 +158,13 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
assert(probe->ops.count(oid));
probe->ops.erase(oid);
if (!probe->ops.empty())
if (!probe->ops.empty()) {
probe->lock.Unlock();
return false; // waiting for more!
}
if (probe->err) { // we hit an error, propagate back up
probe->lock.Unlock();
return true;
}
@ -226,15 +238,15 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
probe->probing_off -= period;
}
_probe(probe);
assert(!probe->lock.is_locked_by_me());
return false;
}
if (probe->pmtime) {
} else if (probe->pmtime) {
ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
*probe->pmtime = probe->max_mtime;
}
// done!
probe->lock.Unlock();
return true;
}
@ -308,20 +320,28 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin)
return;
}
std::vector<object_t> remove_oids;
int max = 10 - pr->uncommitted;
while (pr->num > 0 && max > 0) {
object_t oid = file_object_t(pr->ino, pr->first);
const OSDMap *osdmap = objecter->get_osdmap_read();
object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
objecter->put_osdmap_read();
objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
NULL, new C_PurgeRange(this, pr));
remove_oids.push_back(file_object_t(pr->ino, pr->first));
pr->uncommitted++;
pr->first++;
pr->num--;
max--;
}
pr->lock.Unlock();
// Issue objecter ops outside pr->lock to avoid lock dependency loop
for (std::vector<object_t>::iterator i = remove_oids.begin();
i != remove_oids.end(); ++i) {
const object_t oid = *i;
const OSDMap *osdmap = objecter->get_osdmap_read();
const object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
objecter->put_osdmap_read();
objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
NULL, new C_PurgeRange(this, pr));
}
}