mirror of
https://github.com/ceph/ceph
synced 2025-01-02 09:02:34 +00:00
ReplicatedPG: allow multiple watches in one transaction
Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
parent
9a399afd71
commit
0202bf2903
@ -2476,8 +2476,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
oi.watchers[make_pair(cookie, entity)] = w;
|
||||
t.nop(); // make sure update the object_info on disk!
|
||||
}
|
||||
ctx->watch_connect = true;
|
||||
ctx->watch_info = w;
|
||||
ctx->watch_connects.push_back(w);
|
||||
assert(obc->registered);
|
||||
} else {
|
||||
map<pair<uint64_t, entity_name_t>, watch_info_t>::iterator oi_iter =
|
||||
@ -2487,9 +2486,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
<< entity << dendl;
|
||||
oi.watchers.erase(oi_iter);
|
||||
t.nop(); // update oi on disk
|
||||
|
||||
ctx->watch_disconnect = true;
|
||||
ctx->watch_info = w;
|
||||
ctx->watch_disconnects.push_back(w);
|
||||
} else {
|
||||
dout(10) << " can't remove: no watch by " << entity << dendl;
|
||||
}
|
||||
@ -3259,8 +3256,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
|
||||
|
||||
dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
|
||||
|
||||
if (ctx->watch_connect) {
|
||||
pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
|
||||
for (list<watch_info_t>::iterator i = ctx->watch_connects.begin();
|
||||
i != ctx->watch_connects.end();
|
||||
++i) {
|
||||
pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
|
||||
dout(15) << "do_osd_op_effects applying watch connect on session "
|
||||
<< session.get() << " watcher " << watcher << dendl;
|
||||
WatchRef watch;
|
||||
@ -3272,8 +3271,8 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
|
||||
dout(15) << "do_osd_op_effects new watcher " << watcher
|
||||
<< dendl;
|
||||
watch = Watch::makeWatchRef(
|
||||
this, osd, ctx->obc, ctx->watch_info.timeout_seconds,
|
||||
ctx->watch_info.cookie, entity);
|
||||
this, osd, ctx->obc, i->timeout_seconds,
|
||||
i->cookie, entity);
|
||||
ctx->obc->watchers.insert(
|
||||
make_pair(
|
||||
watcher,
|
||||
@ -3282,8 +3281,10 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
|
||||
watch->connect(conn);
|
||||
}
|
||||
|
||||
if (ctx->watch_disconnect) {
|
||||
pair<uint64_t, entity_name_t> watcher(ctx->watch_info.cookie, entity);
|
||||
for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
|
||||
i != ctx->watch_disconnects.end();
|
||||
++i) {
|
||||
pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
|
||||
dout(15) << "do_osd_op_effects applying watch disconnect on session "
|
||||
<< session.get() << " and watcher " << watcher << dendl;
|
||||
if (ctx->obc->watchers.count(watcher)) {
|
||||
|
@ -260,8 +260,8 @@ public:
|
||||
bool user_modify; // user-visible modification
|
||||
|
||||
// side effects
|
||||
bool watch_connect, watch_disconnect;
|
||||
watch_info_t watch_info;
|
||||
list<watch_info_t> watch_connects;
|
||||
list<watch_info_t> watch_disconnects;
|
||||
list<notify_info_t> notifies;
|
||||
struct NotifyAck {
|
||||
boost::optional<uint64_t> watch_cookie;
|
||||
@ -304,7 +304,6 @@ public:
|
||||
op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
|
||||
new_obs(_obs->oi, _obs->exists),
|
||||
modify(false), user_modify(false),
|
||||
watch_connect(false), watch_disconnect(false),
|
||||
bytes_written(0), bytes_read(0),
|
||||
obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg) {
|
||||
if (_ssc) {
|
||||
|
Loading…
Reference in New Issue
Block a user