mirror of
https://github.com/ceph/ceph
synced 2025-01-18 17:12:29 +00:00
librados: call notification under different thread context
This fixes #2342. We shouldn't call notify on the dispatcher context. We should also make sure that we don't hold the client lock while waiting for the responses. Also, pushed the client_lock locking into the ctx->notify(). Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
This commit is contained in:
parent
e51772ca19
commit
70f70d803a
@ -1594,13 +1594,16 @@ librados::WatchContext::~WatchContext()
|
||||
io_ctx_impl->put();
|
||||
}
|
||||
|
||||
void librados::WatchContext::notify(uint8_t opcode,
|
||||
void librados::WatchContext::notify(Mutex *client_lock,
|
||||
uint8_t opcode,
|
||||
uint64_t ver,
|
||||
uint64_t notify_id,
|
||||
bufferlist& payload)
|
||||
{
|
||||
ctx->notify(opcode, ver, payload);
|
||||
if (opcode != WATCH_NOTIFY_COMPLETE) {
|
||||
client_lock->Lock();
|
||||
io_ctx_impl->_notify_ack(oid, notify_id, ver);
|
||||
client_lock->Unlock();
|
||||
}
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ struct librados::IoCtxImpl {
|
||||
};
|
||||
|
||||
namespace librados {
|
||||
struct WatchContext {
|
||||
struct WatchContext : public RefCountedWaitObject {
|
||||
IoCtxImpl *io_ctx_impl;
|
||||
const object_t oid;
|
||||
uint64_t cookie;
|
||||
@ -212,7 +212,7 @@ struct WatchContext {
|
||||
const object_t& _oc,
|
||||
librados::WatchCtx *_ctx);
|
||||
~WatchContext();
|
||||
void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id,
|
||||
void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
|
||||
bufferlist& payload);
|
||||
};
|
||||
}
|
||||
|
@ -447,11 +447,36 @@ void librados::RadosClient::unregister_watcher(uint64_t cookie)
|
||||
WatchContext *ctx = iter->second;
|
||||
if (ctx->linger_id)
|
||||
objecter->unregister_linger(ctx->linger_id);
|
||||
delete ctx;
|
||||
|
||||
watchers.erase(iter);
|
||||
lock.Unlock();
|
||||
ldout(cct, 10) << "unregister_watcher, dropping reference, waiting ctx=" << (void *)ctx << dendl;
|
||||
ctx->put_wait();
|
||||
ldout(cct, 10) << "unregister_watcher, done ctx=" << (void *)ctx << dendl;
|
||||
lock.Lock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class C_WatchNotify : public Context {
|
||||
librados::WatchContext *ctx;
|
||||
Mutex *client_lock;
|
||||
uint8_t opcode;
|
||||
uint64_t ver;
|
||||
uint64_t notify_id;
|
||||
bufferlist bl;
|
||||
|
||||
public:
|
||||
C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
|
||||
uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) :
|
||||
ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}
|
||||
|
||||
void finish(int r) {
|
||||
ctx->notify(client_lock, opcode, ver, notify_id, bl);
|
||||
ctx->put();
|
||||
}
|
||||
};
|
||||
|
||||
void librados::RadosClient::watch_notify(MWatchNotify *m)
|
||||
{
|
||||
assert(lock.is_locked());
|
||||
@ -463,7 +488,7 @@ void librados::RadosClient::watch_notify(MWatchNotify *m)
|
||||
if (!wc)
|
||||
return;
|
||||
|
||||
wc->notify(m->opcode, m->ver, m->notify_id, m->bl);
|
||||
|
||||
wc->get();
|
||||
finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
|
||||
m->put();
|
||||
}
|
||||
|
@ -168,6 +168,69 @@ struct RefCountedObject {
|
||||
}
|
||||
};
|
||||
|
||||
struct RefCountedCond : public RefCountedObject {
|
||||
Mutex lock;
|
||||
Cond cond;
|
||||
atomic_t complete;
|
||||
|
||||
RefCountedCond() : lock("RefCountedCond") {}
|
||||
|
||||
void wait() {
|
||||
Mutex::Locker l(lock);
|
||||
while (!complete.read())
|
||||
cond.Wait(lock);
|
||||
}
|
||||
|
||||
void done() {
|
||||
Mutex::Locker l(lock);
|
||||
cond.SignalAll();
|
||||
complete.set(1);
|
||||
}
|
||||
};
|
||||
|
||||
struct RefCountedWaitObject {
|
||||
atomic_t nref;
|
||||
RefCountedCond *c;
|
||||
|
||||
RefCountedWaitObject() : nref(1) {
|
||||
c = new RefCountedCond;
|
||||
}
|
||||
virtual ~RefCountedWaitObject() {
|
||||
c->put();
|
||||
}
|
||||
|
||||
RefCountedWaitObject *get() {
|
||||
nref.inc();
|
||||
return this;
|
||||
}
|
||||
|
||||
bool put() {
|
||||
bool ret = false;
|
||||
RefCountedCond *cond = c;
|
||||
cond->get();
|
||||
if (nref.dec() == 0) {
|
||||
cond->done();
|
||||
delete this;
|
||||
ret = true;
|
||||
}
|
||||
cond->put();
|
||||
return ret;
|
||||
}
|
||||
|
||||
void put_wait() {
|
||||
RefCountedCond *cond = c;
|
||||
|
||||
cond->get();
|
||||
if (nref.dec() == 0) {
|
||||
cond->done();
|
||||
delete this;
|
||||
} else {
|
||||
cond->wait();
|
||||
}
|
||||
cond->put();
|
||||
}
|
||||
};
|
||||
|
||||
struct Connection : public RefCountedObject {
|
||||
Mutex lock;
|
||||
RefCountedObject *priv;
|
||||
|
Loading…
Reference in New Issue
Block a user