rados: encode bufferlist in watch-notify

This commit is contained in:
Yehuda Sadeh 2011-06-27 17:20:34 -07:00
parent 6d6b05d8b1
commit 01118352b9
10 changed files with 83 additions and 26 deletions

View File

@ -199,7 +199,7 @@ typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
rados_watchcb_t watchcb, void *arg);
int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle);
int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver);
int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len);
#ifdef __cplusplus
}

View File

@ -74,7 +74,7 @@ namespace librados
class WatchCtx {
public:
virtual ~WatchCtx();
virtual void notify(uint8_t opcode, uint64_t ver) = 0;
virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0;
};
struct AioCompletion {
@ -248,7 +248,7 @@ namespace librados
int watch(const std::string& o, uint64_t ver, uint64_t *handle,
librados::WatchCtx *ctx);
int unwatch(const std::string& o, uint64_t handle);
int notify(const std::string& o, uint64_t ver);
int notify(const std::string& o, uint64_t ver, bufferlist& bl);
void set_notify_timeout(uint32_t timeout);
// assert version for next sync operations

View File

@ -703,7 +703,7 @@ public:
io_ctx_impl->put();
}
void notify(RadosClient *client, MWatchNotify *m) {
ctx->notify(m->opcode, m->ver);
ctx->notify(m->opcode, m->ver, m->bl);
if (m->opcode != WATCH_NOTIFY_COMPLETE) {
client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver);
}
@ -719,7 +719,7 @@ public:
*done = false;
}
void notify(uint8_t opcode, uint64_t ver) {
void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
*done = true;
cond->Signal();
}
@ -758,7 +758,7 @@ public:
int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie);
int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver);
int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl);
int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver);
eversion_t last_version(IoCtxImpl& io) {
@ -2327,7 +2327,7 @@ unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie)
}// ---------------------------------------------
int librados::RadosClient::
notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl)
{
utime_t ut = ceph_clock_now(cct);
bufferlist inbl, outbl;
@ -2353,6 +2353,7 @@ notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
uint32_t timeout = io.notify_timeout;
::encode(prot_ver, inbl);
::encode(timeout, inbl);
::encode(bl, inbl);
rd.notify(cookie, ver, inbl);
objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
@ -2910,10 +2911,10 @@ unwatch(const string& oid, uint64_t handle)
}
int librados::IoCtx::
notify(const string& oid, uint64_t ver)
notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver);
return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl);
}
void librados::IoCtx::
@ -3871,7 +3872,7 @@ struct C_WatchCB : public librados::WatchCtx {
rados_watchcb_t wcb;
void *arg;
C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
void notify(uint8_t opcode, uint64_t ver) {
void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
wcb(opcode, ver, arg);
}
};
@ -3894,9 +3895,15 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
return ctx->client->unwatch(*ctx, oid, cookie);
}
int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver)
int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
return ctx->client->notify(*ctx, oid, ver);
bufferlist bl;
if (buf) {
bufferptr p = buffer::create(buf_len);
memcpy(p.c_str(), buf, buf_len);
bl.push_back(p);
}
return ctx->client->notify(*ctx, oid, ver, bl);
}

View File

@ -118,7 +118,7 @@ namespace librbd {
lock("librbd::WatchCtx") {}
virtual ~WatchCtx() {}
void invalidate();
virtual void notify(uint8_t opcode, uint64_t ver);
virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
};
struct AioCompletion;
@ -301,7 +301,7 @@ void WatchCtx::invalidate()
valid = false;
}
void WatchCtx::notify(uint8_t opcode, uint64_t ver)
void WatchCtx::notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
{
Mutex::Locker l(lock);
ldout(ictx->cct, 1) << " got notification opcode=" << (int)opcode << " ver=" << ver << " cookie=" << cookie << dendl;
@ -488,7 +488,8 @@ int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver, ImageCtx *ic
ver = *pver;
else
ver = io_ctx.get_last_version();
io_ctx.notify(oid, ver);
bufferlist bl;
io_ctx.notify(oid, ver, bl);
return 0;
}

View File

@ -25,10 +25,11 @@ class MWatchNotify : public Message {
uint64_t ver;
uint64_t notify_id;
uint8_t opcode;
bufferlist bl;
MWatchNotify() : Message(CEPH_MSG_WATCH_NOTIFY) { }
MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o) : Message(CEPH_MSG_WATCH_NOTIFY),
cookie(c), ver(v), notify_id(i), opcode(o) { }
MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o, bufferlist b) : Message(CEPH_MSG_WATCH_NOTIFY),
cookie(c), ver(v), notify_id(i), opcode(o), bl(b) { }
private:
~MWatchNotify() {}
@ -41,14 +42,17 @@ public:
::decode(cookie, p);
::decode(ver, p);
::decode(notify_id, p);
if (msg_ver >= 1)
::decode(bl, p);
}
void encode_payload(CephContext *cct) {
uint8_t msg_ver = 0;
uint8_t msg_ver = 1;
::encode(msg_ver, payload);
::encode(opcode, payload);
::encode(cookie, payload);
::encode(ver, payload);
::encode(notify_id, payload);
::encode(bl, payload);
}
const char *get_type_name() { return "watch-notify"; }

View File

@ -1394,10 +1394,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
{
uint32_t ver;
uint32_t timeout;
bufferlist bl;
try {
::decode(ver, bp);
::decode(timeout, bp);
::decode(bl, bp);
} catch (const buffer::error &e) {
timeout = 0;
}
@ -1407,6 +1409,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
notify_info_t n;
n.timeout = timeout;
n.cookie = op.watch.cookie;
n.bl = bl;
ctx->notifies.push_back(n);
}
break;
@ -2280,7 +2283,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
if (iter != notif->watchers.end()) {
/* there is a pending notification for this watcher, we should resend it anyway
even if we already sent it as it might not have received it */
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, session->con);
}
}
@ -2305,7 +2308,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
dout(10) << " " << *p << dendl;
Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie);
Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
session->get(); // notif got a reference
notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
@ -2322,7 +2325,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
s->add_notif(notif, name);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, s->con);
}
@ -2337,7 +2340,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
}
notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE);
notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
if (notif->watchers.empty()) {
do_complete_notify(notif, obc);
} else {

View File

@ -43,12 +43,13 @@ public:
Context *timeout;
void *obc;
pg_t pgid;
bufferlist bl;
void add_watcher(const entity_name_t& name, WatcherState state) {
watchers[name] = state;
}
Notification(entity_name_t& n, OSD::Session *s, uint64_t c) : name(n), session(s), cookie(c) { }
Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) : name(n), session(s), cookie(c), bl(b) { }
};
class C_NotifyTimeout : public Context {

View File

@ -1306,6 +1306,7 @@ static inline ostream& operator<<(ostream& out, const watch_info_t& w) {
struct notify_info_t {
uint64_t cookie;
uint32_t timeout;
bufferlist bl;
};
static inline ostream& operator<<(ostream& out, const notify_info_t& n) {

View File

@ -165,6 +165,22 @@ static int do_put(IoCtx& io_ctx, const char *objname, const char *infile, int op
return 0;
}
class RadosWatchCtx : public librados::WatchCtx {
string name;
public:
RadosWatchCtx(const char *imgname) : name(imgname) {}
virtual ~RadosWatchCtx() {}
virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
string s;
try {
bufferlist::iterator iter = bl.begin();
::decode(s, iter);
} catch (buffer::error *err) {
cout << "could not decode bufferlist, buffer length=" << bl.length() << std::endl;
}
cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " msg='" << s << "'" << std::endl;
}
};
/**********************************************
**********************************************/
@ -645,7 +661,31 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
if (ret != 0)
cerr << "error during benchmark: " << ret << std::endl;
}
else {
else if (strcmp(nargs[0], "watch") == 0) {
if (!pool_name || nargs.size() < 2)
usage();
string oid(nargs[1]);
RadosWatchCtx ctx(oid.c_str());
uint64_t cookie;
ret = io_ctx.watch(oid, 0, &cookie, &ctx);
if (ret != 0)
cerr << "error calling watch: " << ret << std::endl;
else {
cout << "press enter to exit..." << std::endl;
getchar();
}
}
else if (strcmp(nargs[0], "notify") == 0) {
if (!pool_name || nargs.size() < 3)
usage();
string oid(nargs[1]);
string msg(nargs[2]);
bufferlist bl;
::encode(msg, bl);
ret = io_ctx.notify(oid, 0, bl);
if (ret != 0)
cerr << "error calling notify: " << ret << std::endl;
} else {
cerr << "unrecognized command " << nargs[0] << std::endl;
usage();
}

View File

@ -479,8 +479,8 @@ class RbdWatchCtx : public librados::WatchCtx {
public:
RbdWatchCtx(const char *imgname) : name(imgname) {}
virtual ~RbdWatchCtx() {}
virtual void notify(uint8_t opcode, uint64_t ver) {
cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << std::endl;
virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " bl.length=" << bl.length() << std::endl;
}
};