osdc/ObjectCacher: remove dead locking code

This is unused, and mostly broken in that there is no cleanup when there
is a failure.  Also, the support in the OSD has been largely removed.

Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
Sage Weil 2012-10-27 13:56:24 -07:00
parent 86de1faa2c
commit 84c7a34b51
2 changed files with 2 additions and 263 deletions

View File

@ -678,65 +678,6 @@ void ObjectCacher::bh_write(BufferHead *bh)
mark_tx(bh);
}
void ObjectCacher::lock_ack(int64_t poolid, list<sobject_t>& oids, tid_t tid)
{
for (list<sobject_t>::iterator i = oids.begin();
i != oids.end();
i++) {
sobject_t oid = *i;
if (objects[poolid].count(oid) == 0) {
ldout(cct, 7) << "lock_ack no object cache" << dendl;
assert(0);
}
Object *ob = objects[poolid][oid];
list<Context*> ls;
// waiters?
if (ob->waitfor_commit.count(tid)) {
ls.splice(ls.end(), ob->waitfor_commit[tid]);
ob->waitfor_commit.erase(tid);
}
assert(tid <= ob->last_write_tid);
if (ob->last_write_tid == tid) {
ldout(cct, 10) << "lock_ack " << *ob
<< " tid " << tid << dendl;
switch (ob->lock_state) {
case Object::LOCK_RDUNLOCKING:
case Object::LOCK_WRUNLOCKING:
ob->lock_state = Object::LOCK_NONE;
break;
case Object::LOCK_RDLOCKING:
case Object::LOCK_DOWNGRADING:
ob->lock_state = Object::LOCK_RDLOCK;
ls.splice(ls.begin(), ob->waitfor_rd);
break;
case Object::LOCK_UPGRADING:
case Object::LOCK_WRLOCKING:
ob->lock_state = Object::LOCK_WRLOCK;
ls.splice(ls.begin(), ob->waitfor_wr);
ls.splice(ls.begin(), ob->waitfor_rd);
break;
default:
assert(0);
}
ob->last_commit_tid = tid;
} else {
ldout(cct, 10) << "lock_ack " << *ob
<< " tid " << tid << " obsolete" << dendl;
}
finish_contexts(cct, ls);
ob->put();
}
}
void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
uint64_t length, tid_t tid, int r)
{
@ -1272,164 +1213,6 @@ void ObjectCacher::flusher_entry()
ldout(cct, 10) << "flusher finish" << dendl;
}
// locking -----------------------------
void ObjectCacher::rdlock(Object *o)
{
// lock?
if (o->lock_state == Object::LOCK_NONE ||
o->lock_state == Object::LOCK_RDUNLOCKING ||
o->lock_state == Object::LOCK_WRUNLOCKING) {
ldout(cct, 10) << "rdlock rdlock " << *o << dendl;
o->lock_state = Object::LOCK_RDLOCKING;
C_LockAck *ack = new C_LockAck(this, o->oloc.pool, o->get_soid());
C_WriteCommit *commit = new C_WriteCommit(this, o->oloc.pool,
o->get_soid(), 0, 0);
commit->tid =
ack->tid =
o->last_write_tid = writeback_handler.lock(o->get_oid(), o->get_oloc(),
CEPH_OSD_OP_RDLOCK, 0,
ack, commit);
}
// stake our claim.
o->rdlock_ref++;
o->get();
// wait?
if (o->lock_state == Object::LOCK_RDLOCKING ||
o->lock_state == Object::LOCK_WRLOCKING) {
ldout(cct, 10) << "rdlock waiting for rdlock|wrlock on " << *o << dendl;
Mutex flock("ObjectCacher::rdlock flock");
Cond cond;
bool done = false;
o->waitfor_rd.push_back(new C_SafeCond(&flock, &cond, &done));
while (!done) cond.Wait(flock);
}
assert(o->lock_state == Object::LOCK_RDLOCK ||
o->lock_state == Object::LOCK_WRLOCK ||
o->lock_state == Object::LOCK_UPGRADING ||
o->lock_state == Object::LOCK_DOWNGRADING);
}
void ObjectCacher::wrlock(Object *o)
{
// lock?
if (o->lock_state != Object::LOCK_WRLOCK &&
o->lock_state != Object::LOCK_WRLOCKING &&
o->lock_state != Object::LOCK_UPGRADING) {
ldout(cct, 10) << "wrlock wrlock " << *o << dendl;
int op = 0;
if (o->lock_state == Object::LOCK_RDLOCK) {
o->lock_state = Object::LOCK_UPGRADING;
op = CEPH_OSD_OP_UPLOCK;
} else {
o->lock_state = Object::LOCK_WRLOCKING;
op = CEPH_OSD_OP_WRLOCK;
}
C_LockAck *ack = new C_LockAck(this, o->oloc.pool, o->get_soid());
C_WriteCommit *commit = new C_WriteCommit(this, o->oloc.pool,
o->get_soid(), 0, 0);
commit->tid =
ack->tid =
o->last_write_tid = writeback_handler.lock(o->get_oid(), o->get_oloc(),
op, 0, ack, commit);
}
// stake our claim.
o->wrlock_ref++;
o->get();
// wait?
if (o->lock_state == Object::LOCK_WRLOCKING ||
o->lock_state == Object::LOCK_UPGRADING) {
ldout(cct, 10) << "wrlock waiting for wrlock on " << *o << dendl;
Mutex flock("ObjectCacher::wrlock flock");
Cond cond;
bool done = false;
o->waitfor_wr.push_back(new C_SafeCond(&flock, &cond, &done));
while (!done) cond.Wait(flock);
}
assert(o->lock_state == Object::LOCK_WRLOCK);
}
void ObjectCacher::rdunlock(Object *o)
{
ldout(cct, 10) << "rdunlock " << *o << dendl;
assert(o->lock_state == Object::LOCK_RDLOCK ||
o->lock_state == Object::LOCK_WRLOCK ||
o->lock_state == Object::LOCK_UPGRADING ||
o->lock_state == Object::LOCK_DOWNGRADING);
assert(o->rdlock_ref > 0);
o->rdlock_ref--;
if (o->rdlock_ref > 0 ||
o->wrlock_ref > 0) {
ldout(cct, 10) << "rdunlock " << *o << " still has rdlock|wrlock refs" << dendl;
return;
}
release(o); // release first
o->lock_state = Object::LOCK_RDUNLOCKING;
o->get();
C_LockAck *lockack = new C_LockAck(this, o->oloc.pool, o->get_soid());
C_WriteCommit *commit = new C_WriteCommit(this, o->oloc.pool,
o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = writeback_handler.lock(o->get_oid(), o->get_oloc(),
CEPH_OSD_OP_RDUNLOCK, 0,
lockack, commit);
}
void ObjectCacher::wrunlock(Object *o)
{
ldout(cct, 10) << "wrunlock " << *o << dendl;
assert(o->lock_state == Object::LOCK_WRLOCK);
assert(o->wrlock_ref > 0);
o->wrlock_ref--;
if (o->wrlock_ref > 0) {
ldout(cct, 10) << "wrunlock " << *o << " still has wrlock refs" << dendl;
return;
}
flush(o, 0, 0); // flush first
o->get();
int op = 0;
if (o->rdlock_ref > 0) {
ldout(cct, 10) << "wrunlock rdlock " << *o << dendl;
op = CEPH_OSD_OP_DNLOCK;
o->lock_state = Object::LOCK_DOWNGRADING;
} else {
ldout(cct, 10) << "wrunlock wrunlock " << *o << dendl;
op = CEPH_OSD_OP_WRUNLOCK;
o->lock_state = Object::LOCK_WRUNLOCKING;
}
C_LockAck *lockack = new C_LockAck(this, o->oloc.pool, o->get_soid());
C_WriteCommit *commit = new C_WriteCommit(this, o->oloc.pool,
o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = writeback_handler.lock(o->get_oid(), o->get_oloc(),
op, 0, lockack, commit);
}
// -------------------------------------------------

View File

@ -184,20 +184,6 @@ class ObjectCacher {
list<Context*> waitfor_rd;
list<Context*> waitfor_wr;
// lock
static const int LOCK_NONE = 0;
static const int LOCK_WRLOCKING = 1;
static const int LOCK_WRLOCK = 2;
static const int LOCK_WRUNLOCKING = 3;
static const int LOCK_RDLOCKING = 4;
static const int LOCK_RDLOCK = 5;
static const int LOCK_RDUNLOCKING = 6;
static const int LOCK_UPGRADING = 7; // rd -> wr
static const int LOCK_DOWNGRADING = 8; // wr -> rd
int lock_state;
int wrlock_ref; // how many ppl want or are using a WRITE lock
int rdlock_ref; // how many ppl want or are using a READ lock
public:
Object(const Object& other);
const Object& operator=(const Object& other);
@ -208,8 +194,7 @@ class ObjectCacher {
oid(o), oset(os), set_item(this), oloc(l),
complete(false),
last_write_tid(0), last_commit_tid(0),
dirty_or_tx(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {
dirty_or_tx(0) {
// add to set
os->objects.push_back(&set_item);
}
@ -229,7 +214,7 @@ class ObjectCacher {
void set_object_locator(object_locator_t& l) { oloc = l; }
bool can_close() {
if (data.empty() && lock_state == LOCK_NONE &&
if (data.empty() &&
waitfor_commit.empty() &&
waitfor_rd.empty() && waitfor_wr.empty() &&
dirty_or_tx == 0) {
@ -441,11 +426,6 @@ class ObjectCacher {
loff_t release(Object *o);
void purge(Object *o);
void rdlock(Object *o);
void rdunlock(Object *o);
void wrlock(Object *o);
void wrunlock(Object *o);
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call);
@ -454,7 +434,6 @@ class ObjectCacher {
uint64_t length, bufferlist &bl, int r);
void bh_write_commit(int64_t poolid, sobject_t oid, loff_t offset,
uint64_t length, tid_t t, int r);
void lock_ack(int64_t poolid, list<sobject_t>& oids, tid_t tid);
class C_ReadFinish : public Context {
ObjectCacher *oc;
@ -486,20 +465,6 @@ class ObjectCacher {
}
};
class C_LockAck : public Context {
ObjectCacher *oc;
public:
int64_t poolid;
list<sobject_t> oids;
tid_t tid;
C_LockAck(ObjectCacher *c, int64_t _poolid, sobject_t o) : oc(c), poolid(_poolid), tid(0) {
oids.push_back(o);
}
void finish(int r) {
oc->lock_ack(poolid, oids, tid);
}
};
void perf_start();
void perf_stop();
@ -661,15 +626,6 @@ inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob)
<< ob.get_soid() << " oset " << ob.oset << dec
<< " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
switch (ob.lock_state) {
case ObjectCacher::Object::LOCK_WRLOCKING: out << " wrlocking"; break;
case ObjectCacher::Object::LOCK_WRLOCK: out << " wrlock"; break;
case ObjectCacher::Object::LOCK_WRUNLOCKING: out << " wrunlocking"; break;
case ObjectCacher::Object::LOCK_RDLOCKING: out << " rdlocking"; break;
case ObjectCacher::Object::LOCK_RDLOCK: out << " rdlock"; break;
case ObjectCacher::Object::LOCK_RDUNLOCKING: out << " rdunlocking"; break;
}
if (ob.complete)
out << " COMPLETE";