mirror of
https://github.com/ceph/ceph
synced 2025-02-24 11:37:37 +00:00
osd: watch request doesn't use version
also fix watch-notify to send the current user_version
This commit is contained in:
parent
de8f021204
commit
a4223d4f02
@ -822,7 +822,7 @@ void ReplicatedPG::dump_watchers(ObjectContext *obc)
|
||||
oi_iter != obc->obs.oi.watchers.end();
|
||||
oi_iter++) {
|
||||
watch_info_t& w = oi_iter->second;
|
||||
dout(0) << " * oi->watcher: " << oi_iter->first << " ver=" << w.ver << " cookie=" << w.cookie << dendl;
|
||||
dout(0) << " * oi->watcher: " << oi_iter->first << " cookie=" << w.cookie << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1177,7 +1177,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
|
||||
notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
|
||||
s->add_notif(notif, name);
|
||||
|
||||
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
|
||||
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
|
||||
osd->client_messenger->send_message(notify_msg, s->con);
|
||||
}
|
||||
|
||||
@ -1192,7 +1192,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
|
||||
notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
|
||||
}
|
||||
|
||||
notif->reply = new MWatchNotify(op.watch.cookie, op.watch.ver, notif->id, WATCH_NOTIFY_COMPLETE);
|
||||
notif->reply = new MWatchNotify(op.watch.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE);
|
||||
if (notif->watchers.empty()) {
|
||||
do_complete_notify(notif, obc);
|
||||
} else {
|
||||
@ -1219,9 +1219,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
|
||||
dout(0) << "couldn't find watcher" << dendl;
|
||||
break;
|
||||
}
|
||||
watch_info_t& wi = oi_iter->second;
|
||||
wi.ver = op.watch.ver;
|
||||
// FIXME: this gets lost without t.nop().
|
||||
|
||||
Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie);
|
||||
if (!notif) {
|
||||
@ -1405,31 +1402,18 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
|
||||
case CEPH_OSD_OP_WATCH:
|
||||
{
|
||||
uint64_t cookie = op.watch.cookie;
|
||||
uint64_t ver = op.watch.ver;
|
||||
bool do_watch = op.watch.flag & 1;
|
||||
entity_name_t entity = ctx->reqid.name;
|
||||
ObjectContext *obc = ctx->obc;
|
||||
|
||||
dout(0) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie << " ver=" << ver
|
||||
dout(0) << "watch: ctx->obc=" << (void *)obc << " cookie=" << cookie
|
||||
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
|
||||
dout(0) << "watch: oi.user_version=" << oi.user_version.version << dendl;
|
||||
|
||||
if (do_watch) {
|
||||
if (ver < oi.user_version.version) {
|
||||
result = -ERANGE;
|
||||
break;
|
||||
}
|
||||
if (ver > oi.user_version.version) {
|
||||
result = -EOVERFLOW;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv();
|
||||
|
||||
osd->watch_lock.Lock();
|
||||
map<entity_name_t, OSD::Session *>::iterator iter = obc->watchers.find(entity);
|
||||
watch_info_t w = {cookie, ver, 30}; // FIXME: where does the timeout come from?
|
||||
watch_info_t w = {cookie, 30}; // FIXME: where does the timeout come from?
|
||||
if (do_watch) {
|
||||
if (oi.watchers.count(entity) && oi.watchers[entity] == w) {
|
||||
dout(10) << " found existing watch " << w << " by " << entity << " session " << session << dendl;
|
||||
@ -1470,7 +1454,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
|
||||
if (iter != notif->watchers.end()) {
|
||||
/* there is a pending notification for this watcher, we should resend it anyway
|
||||
even if we already sent it as it might not have received it */
|
||||
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, w.ver, notif->id, WATCH_NOTIFY);
|
||||
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
|
||||
osd->client_messenger->send_message(notify_msg, session->con);
|
||||
}
|
||||
}
|
||||
|
@ -1288,31 +1288,32 @@ inline ostream& operator<<(ostream& out, const SnapSet& cs) {
|
||||
|
||||
struct watch_info_t {
|
||||
uint64_t cookie;
|
||||
uint64_t ver;
|
||||
uint32_t timeout_seconds;
|
||||
void encode(bufferlist& bl) const {
|
||||
const __u8 v = 1;
|
||||
const __u8 v = 2;
|
||||
::encode(v, bl);
|
||||
::encode(cookie, bl);
|
||||
::encode(ver, bl);
|
||||
::encode(timeout_seconds, bl);
|
||||
}
|
||||
void decode(bufferlist::iterator& bl) {
|
||||
__u8 v;
|
||||
::decode(v, bl);
|
||||
::decode(cookie, bl);
|
||||
::decode(ver, bl);
|
||||
if (v < 2) {
|
||||
uint64_t ver;
|
||||
::decode(ver, bl);
|
||||
}
|
||||
::decode(timeout_seconds, bl);
|
||||
}
|
||||
};
|
||||
WRITE_CLASS_ENCODER(watch_info_t)
|
||||
|
||||
static inline bool operator==(const watch_info_t& l, const watch_info_t& r) {
|
||||
return l.cookie == r.cookie && l.ver == r.ver && l.timeout_seconds == r.timeout_seconds;
|
||||
return l.cookie == r.cookie && l.timeout_seconds == r.timeout_seconds;
|
||||
}
|
||||
|
||||
static inline ostream& operator<<(ostream& out, const watch_info_t& w) {
|
||||
return out << "watch(cookie " << w.cookie << " v" << w.ver << " " << w.timeout_seconds << "s)";
|
||||
return out << "watch(cookie " << w.cookie << " " << w.timeout_seconds << "s)";
|
||||
}
|
||||
|
||||
struct object_info_t {
|
||||
|
Loading…
Reference in New Issue
Block a user