From d43d5d9ff0bd4b39393685feb69a3b80edf59c09 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Sat, 11 Feb 2012 17:53:47 -0800 Subject: [PATCH 1/8] ReplicatedPG: is_degraded may return true for backfill If is_degraded returns true for backfill, the object may not be in any replica's missing set. Only call start_recovery_op if we actually started an op. This bug could cause a stuck in backfill error. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index e42c317af07..4d720cc4a80 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5460,13 +5460,16 @@ int ReplicatedPG::recover_object_replicas(const hobject_t& soid, eversion_t v) dout(10) << " ondisk_read_lock for " << soid << dendl; obc->ondisk_read_lock(); - start_recovery_op(soid); - // who needs it? + bool started = false; for (unsigned i=1; i Date: Sat, 11 Feb 2012 17:52:13 -0800 Subject: [PATCH 2/8] ReplicatedPG: add debugging for in flight backfill ops Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4d720cc4a80..0212e4f8ff4 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5674,6 +5674,14 @@ int ReplicatedPG::recover_backfill(int max) push_backfill_object(i->first, i->second.first, i->second.second, backfill_target); } + dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is " + << pinfo.last_backfill << dendl; + for (set::iterator i = backfills_in_flight.begin(); + i != backfills_in_flight.end(); + ++i) { + dout(20) << *i << " is still in flight" << dendl; + } + hobject_t bound = backfills_in_flight.size() ? *(backfills_in_flight.begin()) : backfill_pos; if (bound > pinfo.last_backfill) { From f80e0c715beb414ed4427fb54f2a7b781dadc4e9 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Sat, 11 Feb 2012 17:50:49 -0800 Subject: [PATCH 3/8] ReplicatedPG: consider backfill_pos to be degraded A write may trigger via make_writeable the creation of a clone which sorts before the object being written. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0212e4f8ff4..8f2974ca245 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -137,6 +137,12 @@ bool ReplicatedPG::is_degraded_object(const hobject_t& soid) peer_missing[peer].missing.count(soid)) return true; + // If soid == backfill_pos, we may implicitly write to + // the largest snap of soid for make_writeable. + if (peer == backfill_target && + backfill_pos == soid) + return true; + // Object is degraded if after last_backfill AND // we have are backfilling it if (peer == backfill_target && @@ -5620,6 +5626,13 @@ int ReplicatedPG::recover_backfill(int max) if (pbi.begin < backfill_info.begin) { dout(20) << " removing peer " << pbi.begin << dendl; to_remove[pbi.begin] = pbi.objects.begin()->second; + // Object was degraded, but won't be recovered + if (waiting_for_degraded_object.count(pbi.begin)) { + osd->requeue_ops( + this, + waiting_for_degraded_object[pbi.begin]); + waiting_for_degraded_object.erase(pbi.begin); + } pbi.pop_front(); // Don't increment ops here because deletions // are cheap and not replied to unlike real recovery_ops, @@ -5636,6 +5649,13 @@ int ReplicatedPG::recover_backfill(int max) } else { dout(20) << " keeping peer " << pbi.begin << " " << pbi.objects.begin()->second << dendl; + // Object was degraded, but won't be recovered + if (waiting_for_degraded_object.count(pbi.begin)) { + osd->requeue_ops( + this, + waiting_for_degraded_object[pbi.begin]); + waiting_for_degraded_object.erase(pbi.begin); + } } add_to_stat.insert(pbi.begin); backfill_info.pop_front(); From 2476dd7127d710e6ff5f16797f48725888e6e398 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 7 Feb 2012 16:35:04 -0800 Subject: [PATCH 4/8] MOSDSubOp: Add new object recovery state Signed-off-by: Samuel Just --- src/messages/MOSDSubOp.h | 23 +++++++++++-- src/osd/osd_types.cc | 74 ++++++++++++++++++++++++++++++++++++++++ src/osd/osd_types.h | 30 ++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index fdf03e27eac..ed8a2ffc3ed 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -25,7 +25,7 @@ class MOSDSubOp : public Message { - static const int HEAD_VERSION = 3; + static const int HEAD_VERSION = 4; static const int COMPAT_VERSION = 1; public: @@ -71,6 +71,15 @@ public: bool first, complete; + interval_set data_included; + ObjectRecoveryInfo recovery_info; + + // reflects result of current push + ObjectRecoveryProgress recovery_progress; + + // reflects progress before current push + ObjectRecoveryProgress current_progress; + virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(map_epoch, p); @@ -101,6 +110,7 @@ public: ::decode(pg_trim_to, p); ::decode(peer_stat, p); ::decode(attrset, p); + ::decode(data_subset, p); ::decode(clone_subsets, p); @@ -110,6 +120,12 @@ public: } if (header.version >= 3) ::decode(oloc, p); + if (header.version >= 4) { + ::decode(data_included, p); + ::decode(recovery_info, p); + ::decode(recovery_progress, p); + ::decode(current_progress, p); + } } virtual void encode_payload(uint64_t features) { @@ -148,9 +164,12 @@ public: ::encode(first, payload); ::encode(complete, payload); ::encode(oloc, payload); + ::encode(data_included, payload); + ::encode(recovery_info, payload); + ::encode(recovery_progress, payload); + ::encode(current_progress, payload); } - MOSDSubOp() : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { } MOSDSubOp(osd_reqid_t r, pg_t p, const hobject_t& po, bool noop_, int aw, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index a54910007ea..7b458a86566 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2000,6 +2000,80 @@ ostream& operator<<(ostream& out, const object_info_t& oi) return out; } +// -- ObjectRecovery -- +void ObjectRecoveryProgress::encode(bufferlist &bl) const +{ + ::encode(first, bl); + ::encode(data_complete, bl); + ::encode(data_recovered_to, bl); + ::encode(omap_recovered_to, bl); + ::encode(omap_complete, bl); +} + +void ObjectRecoveryProgress::decode(bufferlist::iterator &bl) +{ + ::decode(first, bl); + ::decode(data_complete, bl); + ::decode(data_recovered_to, bl); + ::decode(omap_recovered_to, bl); + ::decode(omap_complete, bl); +} + +ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog) +{ + return prog.print(out); +} + +ostream &ObjectRecoveryProgress::print(ostream &out) const +{ + return out << "ObjectRecoveryProgress(" + << ( first ? "" : "!" ) << "first, " + << "data_recovered_to:" << data_recovered_to + << ", data_complete:" << ( data_complete ? "true" : "false" ) + << ", omap_recovered_to:" << omap_recovered_to + << ", omap_complete:" << ( omap_complete ? "true" : "false" ) + << ")"; +} + +void ObjectRecoveryInfo::encode(bufferlist &bl) const +{ + __u8 v = 0; + ::encode(v, bl); + ::encode(soid, bl); + ::encode(version, bl); + ::encode(size, bl); + ::encode(oi, bl); + ::encode(ss, bl); + ::encode(copy_subset, bl); + ::encode(clone_subset, bl); +} + +void ObjectRecoveryInfo::decode(bufferlist::iterator &bl) +{ + __u8 v; + ::decode(v, bl); + ::decode(soid, bl); + ::decode(version, bl); + ::decode(size, bl); + ::decode(oi, bl); + ::decode(ss, bl); + ::decode(copy_subset, bl); + ::decode(clone_subset, bl); +} + +ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf) +{ + return inf.print(out); +} + +ostream &ObjectRecoveryInfo::print(ostream &out) const +{ + return out << "ObjectRecoveryInfo(" + << soid << "@" << version + << ", copy_subset: " << copy_subset + << ", clone_subset: " << clone_subset + << ")"; +} // -- ScrubMap -- diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 165e9d452f2..342e63c630e 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1612,7 +1612,37 @@ WRITE_CLASS_ENCODER(object_info_t) ostream& operator<<(ostream& out, const object_info_t& oi); +// Object recovery +struct ObjectRecoveryProgress { + bool first; + uint64_t data_recovered_to; + bool data_complete; + string omap_recovered_to; + bool omap_complete; + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; +}; +WRITE_CLASS_ENCODER(ObjectRecoveryProgress) +ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog); + + +struct ObjectRecoveryInfo { + hobject_t soid; + eversion_t version; + uint64_t size; + object_info_t oi; + SnapSet ss; + interval_set copy_subset; + map > clone_subset; + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; +}; +WRITE_CLASS_ENCODER(ObjectRecoveryInfo) +ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf); /* * summarize pg contents for purposes of a scrub From 1bf037bf76d0c3f85421657a5aaf8b7478937e02 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 13 Feb 2012 11:49:42 -0800 Subject: [PATCH 5/8] ReplicatedPG: refactor push and pull Now, push progress is represented by ObjectRecoveryProgress. In particular, rather than tracking data_subset_*ing, we track the furthest offset before which the data will be consistent once cloning is complete. sub_op_push now separates the pull response implementation from the replica push implementation. Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 1001 ++++++++++++++++++------------------- src/osd/ReplicatedPG.h | 69 +-- src/osd/osd_types.cc | 66 ++- src/osd/osd_types.h | 4 + src/test/encoding/types.h | 2 + 5 files changed, 603 insertions(+), 539 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 8f2974ca245..60cee021f4f 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -107,7 +107,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op) assert(g != missing.missing.end()); const eversion_t &v(g->second.need); - map::const_iterator p = pulling.find(soid); + map::const_iterator p = pulling.find(soid); if (p != pulling.end()) { dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl; } @@ -3971,8 +3971,7 @@ int ReplicatedPG::pull(const hobject_t& soid, eversion_t v) << " from osd." << fromosd << dendl; - map > clone_subsets; - interval_set data_subset; + ObjectRecoveryInfo recovery_info; bool need_size = false; // is this a snapped object? if so, consult the snapset.. we may not need the entire object! @@ -4008,66 +4007,37 @@ int ReplicatedPG::pull(const hobject_t& soid, eversion_t v) SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); dout(10) << " snapset " << ssc->snapset << dendl; calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill, - data_subset, clone_subsets); + recovery_info.copy_subset, + recovery_info.clone_subset); put_snapset_context(ssc); // FIXME: this may overestimate if we are pulling multiple clones in parallel... - dout(10) << " pulling " << data_subset << ", will clone " << clone_subsets - << dendl; + dout(10) << " pulling " << recovery_info << dendl; } else { // pulling head or unversioned object. // always pull the whole thing. need_size = true; - data_subset.insert(0, (uint64_t)-1); + recovery_info.copy_subset.insert(0, (uint64_t)-1); + recovery_info.size = ((uint64_t)-1); } - // only pull so much at a time - interval_set pullsub; - pullsub.span_of(data_subset, 0, g_conf->osd_recovery_max_chunk); - // take note - assert(pulling.count(soid) == 0); + recovery_info.soid = soid; + recovery_info.version = v; + ObjectRecoveryProgress progress; + progress.data_complete = false; + progress.data_recovered_to = 0; + progress.first = true; + assert(!pulling.count(soid)); pull_from_peer[fromosd].insert(soid); - pull_info_t& p = pulling[soid]; - p.version = v; - p.from = fromosd; - p.data_subset = data_subset; - p.data_subset_pulling = pullsub; - p.need_size = need_size; + PullInfo &pi = pulling[soid]; + pi.recovery_info = recovery_info; + pi.recovery_progress = progress; + send_pull(fromosd, recovery_info, progress); - send_pull_op(soid, v, true, p.data_subset_pulling, fromosd); - start_recovery_op(soid); return PULL_YES; } -void ReplicatedPG::send_pull_op(const hobject_t& soid, eversion_t v, bool first, - const interval_set& data_subset, int fromosd) -{ - // send op - tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); - - dout(10) << "send_pull_op " << soid << " " << v - << " first=" << first - << " data " << data_subset << " from osd." << fromosd - << " tid " << tid << dendl; - - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK, - get_osdmap()->get_epoch(), tid, v); - subop->ops = vector(1); - subop->ops[0].op.op = CEPH_OSD_OP_PULL; - subop->data_subset = data_subset; - subop->first = first; - - // do not include clone_subsets in pull request; we will recalculate this - // when the object is pushed back. - //subop->clone_subsets.swap(clone_subsets); - - osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(fromosd)); - - osd->logger->inc(l_osd_pull); -} - void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) { tid_t tid = osd->get_tid(); @@ -4111,7 +4081,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in map > clone_subsets; if (size) clone_subsets[head].insert(0, size); - push_start(soid, peer, size, oi.version, data_subset, clone_subsets); + push_start(obc, soid, peer, oi.version, data_subset, clone_subsets); return; } @@ -4119,13 +4089,13 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in // we need the head (and current SnapSet) locally to do that. if (missing.is_missing(head)) { dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl; - return push_start(soid, peer); + return push_start(obc, soid, peer); } hobject_t snapdir = head; snapdir.snap = CEPH_SNAPDIR; if (missing.is_missing(snapdir)) { dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl; - return push_start(soid, peer); + return push_start(obc, soid, peer); } SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); @@ -4145,117 +4115,439 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in put_snapset_context(ssc); } - push_start(soid, peer, size, oi.version, data_subset, clone_subsets); + push_start(obc, soid, peer, oi.version, data_subset, clone_subsets); } -void ReplicatedPG::push_start(const hobject_t& soid, int peer) +void ReplicatedPG::push_start(ObjectContext *obc, + const hobject_t& soid, int peer) { - struct stat st; - int r = osd->store->stat(coll, soid, &st); - assert(r == 0); - uint64_t size = st.st_size; - - bufferlist bl; - r = osd->store->getattr(coll, soid, OI_ATTR, bl); - object_info_t oi(bl); - interval_set data_subset; + data_subset.insert(0, obc->obs.oi.size); map > clone_subsets; - data_subset.insert(0, size); - push_start(soid, peer, size, oi.version, data_subset, clone_subsets); + push_start(obc, soid, peer, obc->obs.oi.version, data_subset, clone_subsets); } -void ReplicatedPG::push_start(const hobject_t& soid, int peer, - uint64_t size, eversion_t version, - interval_set &data_subset, - map >& clone_subsets) +void ReplicatedPG::push_start( + ObjectContext *obc, + const hobject_t& soid, int peer, + eversion_t version, + interval_set &data_subset, + map >& clone_subsets) { // take note. - push_info_t *pi = &pushing[soid][peer]; - pi->size = size; - pi->version = version; - pi->data_subset = data_subset; - pi->clone_subsets = clone_subsets; + PushInfo &pi = pushing[soid][peer]; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.clone_subset = clone_subsets; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.version = version; + pi.recovery_progress.first = true; + pi.recovery_progress.data_recovered_to = 0; + pi.recovery_progress.data_complete = 0; - pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf->osd_recovery_max_chunk); - bool complete = pi->data_subset_pushing == pi->data_subset; - - dout(10) << "push_start " << soid << " size " << size << " data " << data_subset - << " cloning " << clone_subsets << dendl; - send_push_op(soid, version, peer, size, true, complete, pi->data_subset_pushing, pi->clone_subsets); + ObjectRecoveryProgress new_progress; + send_push(peer, pi.recovery_info, pi.recovery_progress, &new_progress); + pi.recovery_progress = new_progress; } - -/* - * push - send object to a peer - */ - -int ReplicatedPG::send_push_op(const hobject_t& soid, eversion_t version, int peer, - uint64_t size, bool first, bool complete, - interval_set &data_subset, - map >& clone_subsets) +int ReplicatedPG::send_pull(int peer, + ObjectRecoveryInfo recovery_info, + ObjectRecoveryProgress progress) { - // read data+attrs - bufferlist bl; - map attrset; - - for (interval_set::iterator p = data_subset.begin(); - p != data_subset.end(); - ++p) { - bufferlist bit; - osd->store->read(coll, - soid, p.get_start(), p.get_len(), bit); - if (p.get_len() != bit.length()) { - dout(10) << " extent " << p.get_start() << "~" << p.get_len() - << " is actually " << p.get_start() << "~" << bit.length() << dendl; - p.set_len(bit.length()); - } - bl.claim_append(bit); - } - - osd->store->getattrs(coll, soid, attrset); - - bufferlist bv; - bv.push_back(attrset[OI_ATTR]); - object_info_t oi(bv); - - if (oi.version != version) { - osd->clog.error() << info.pgid << " push " << soid << " v " << version << " to osd." << peer - << " failed because local copy is " << oi.version << "\n"; - return -1; - } - - // ok - dout(7) << "send_push_op " << soid << " v " << oi.version - << " size " << size - << " subset " << data_subset - << " data " << bl.length() - << " to osd." << peer - << dendl; - - osd->logger->inc(l_osd_push); - osd->logger->inc(l_osd_push_outb, bl.length()); - - // send + // send op tid_t tid = osd->get_tid(); osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, - get_osdmap()->get_epoch(), tid, oi.version); - subop->oloc = oi.oloc; + + dout(10) << "send_pull_op " << recovery_info.soid << " " + << recovery_info.version + << " first=" << progress.first + << " data " << recovery_info.copy_subset + << " from osd." << peer + << " tid " << tid << dendl; + + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, + false, CEPH_OSD_FLAG_ACK, + get_osdmap()->get_epoch(), tid, + recovery_info.version); + subop->ops = vector(1); + subop->ops[0].op.op = CEPH_OSD_OP_PULL; + subop->recovery_info = recovery_info; + subop->recovery_progress = progress; + + osd->cluster_messenger->send_message(subop, + get_osdmap()->get_cluster_inst(peer)); + + osd->logger->inc(l_osd_pull); + return 0; +} + +void ReplicatedPG::submit_push_data( + const ObjectRecoveryInfo &recovery_info, + bool first, + const interval_set &intervals_included, + bufferlist data_included, + map &attrs, + ObjectStore::Transaction *t) +{ + if (first) { + t->remove(coll_t::TEMP_COLL, recovery_info.soid); + t->touch(coll_t::TEMP_COLL, recovery_info.soid); + } + uint64_t off = 0; + for (interval_set::const_iterator p = intervals_included.begin(); + p != intervals_included.end(); + ++p) { + bufferlist bit; + bit.substr_of(data_included, off, p.get_len()); + t->write(coll_t::TEMP_COLL, recovery_info.soid, + p.get_start(), p.get_len(), bit); + off += p.get_len(); + } + + t->setattrs(coll_t::TEMP_COLL, recovery_info.soid, + attrs); +} + +void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + remove_object_with_snap_hardlinks(*t, recovery_info.soid); + t->collection_add(coll, coll_t::TEMP_COLL, recovery_info.soid); + t->collection_remove(coll_t::TEMP_COLL, recovery_info.soid); + for (map >::const_iterator p = + recovery_info.clone_subset.begin(); + p != recovery_info.clone_subset.end(); + ++p) { + for (interval_set::const_iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + dout(15) << " clone_range " << p->first << " " + << q.get_start() << "~" << q.get_len() << dendl; + t->clone_range(coll, p->first, recovery_info.soid, + q.get_start(), q.get_len(), q.get_start()); + } + } + + if (recovery_info.soid.snap < CEPH_NOSNAP) { + if (recovery_info.oi.snaps.size()) { + coll_t lc = make_snap_collection(*t, + recovery_info.oi.snaps[0]); + t->collection_add(lc, coll, recovery_info.soid); + if (recovery_info.oi.snaps.size() > 1) { + coll_t hc = make_snap_collection( + *t, + recovery_info.oi.snaps[recovery_info.oi.snaps.size()-1]); + t->collection_add(hc, coll, recovery_info.soid); + } + } + } + + if (missing.is_missing(recovery_info.soid) && + missing.missing[recovery_info.soid].need > recovery_info.version) { + assert(is_primary()); + pg_log_entry_t *latest = log.objects[recovery_info.soid]; + if (latest->op == pg_log_entry_t::LOST_REVERT && + latest->prior_version == recovery_info.version) { + dout(10) << " got old revert version " << recovery_info.version + << " for " << *latest << dendl; + recovery_info.version = latest->version; + // update the attr to the revert event version + recovery_info.oi.prior_version = recovery_info.oi.version; + recovery_info.oi.version = latest->version; + bufferlist bl; + ::encode(recovery_info.oi, bl); + t->setattr(coll, recovery_info.soid, OI_ATTR, bl); + } + } + recover_got(recovery_info.soid, recovery_info.version); + + // update pg + write_info(*t); +} + +ObjectRecoveryInfo ReplicatedPG::recalc_subsets(ObjectRecoveryInfo recovery_info) +{ + if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP) + return recovery_info; + + SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid, + recovery_info.soid.get_key(), + recovery_info.soid.hash, + false); + ObjectRecoveryInfo new_info = recovery_info; + new_info.copy_subset.clear(); + new_info.clone_subset.clear(); + assert(ssc); + calc_clone_subsets(ssc->snapset, new_info.soid, missing, info.last_backfill, + new_info.copy_subset, new_info.clone_subset); + put_snapset_context(ssc); + return new_info; +} + +void ReplicatedPG::handle_pull_response(OpRequest *op) +{ + MOSDSubOp *m = (MOSDSubOp *)op->request; + bufferlist data; + m->claim_data(data); + interval_set data_included = m->data_included; + dout(10) << "handle_push " + << m->recovery_info + << m->recovery_progress + << " data.size() is " << data.length() + << " data_included: " << data_included + << dendl; + if (m->version == eversion_t()) { + // replica doesn't have it! + _failed_push(op); + return; + } + + hobject_t &hoid = m->recovery_info.soid; + assert((data_included.empty() && data.length() == 0) || + (!data_included.empty() && data.length() > 0)); + + + if (!pulling.count(hoid)) { + return; + } + + PullInfo &pi = pulling[hoid]; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = m->recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + m->recovery_info.copy_subset); + } + + pi.recovery_info = recalc_subsets(pi.recovery_info); + + interval_set usable_intervals; + bufferlist usable_data; + trim_pushed_data(pi.recovery_info.copy_subset, + data_included, + data, + &usable_intervals, + &usable_data); + data_included = usable_intervals; + data.claim(usable_data); + + bool first = pi.recovery_progress.first; + pi.recovery_progress = m->recovery_progress; + + dout(10) << "new recovery_info: " + << pi.recovery_info + << ", new progress " << pi.recovery_progress + << dendl; + + if (first) { + bufferlist oibl; + if (m->attrset.count(OI_ATTR)) { + oibl.push_back(m->attrset[OI_ATTR]); + ::decode(pi.recovery_info.oi, oibl); + } else { + assert(0); + } + bufferlist ssbl; + if (m->attrset.count(SS_ATTR)) { + ssbl.push_back(m->attrset[SS_ATTR]); + ::decode(pi.recovery_info.ss, ssbl); + } else { + assert(pi.recovery_info.soid.snap != CEPH_NOSNAP && + pi.recovery_info.soid.snap != CEPH_SNAPDIR); + } + } + + bool complete = pi.recovery_progress.data_recovered_to >= + (pi.recovery_info.copy_subset.empty() ? + 0 : pi.recovery_info.copy_subset.range_end()); + + if (complete && !pi.recovery_progress.data_complete) { + dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" + << dendl; + _failed_push(op); + return; + } + + ObjectStore::Transaction *t = new ObjectStore::Transaction; + Context *onreadable = 0; + Context *onreadable_sync = 0; + submit_push_data(pi.recovery_info, first, + data_included, data, m->attrset, + t); + + if (complete) { + submit_push_complete(pi.recovery_info, t); + + ObjectContext *obc = get_object_context(hoid, + pi.recovery_info.oi.oloc, + true); + obc->ondisk_write_lock(); + obc->obs.exists = true; + obc->obs.oi = pi.recovery_info.oi; + + if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) { + obc->ssc->snapset = pi.recovery_info.ss; + } + + onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc); + onreadable_sync = new C_OSD_OndiskWriteUnlock(obc); + } else { + onreadable = new ObjectStore::C_DeleteTransaction(t); + } + + int r = osd->store-> + queue_transaction(&osr, t, + onreadable, + new C_OSD_CommittedPushedObject(this, op, + info.history.same_interval_since, + info.last_complete), + onreadable_sync); + assert(r == 0); + + if (complete) { + finish_recovery_op(hoid); + pulling.erase(hoid); + pull_from_peer[m->get_source().num()].erase(hoid); + update_stats(); + if (waiting_for_missing_object.count(hoid)) { + dout(20) << " kicking waiters on " << hoid << dendl; + osd->requeue_ops(this, waiting_for_missing_object[hoid]); + waiting_for_missing_object.erase(hoid); + if (missing.missing.size() == 0) { + osd->requeue_ops(this, waiting_for_all_missing); + waiting_for_all_missing.clear(); + } + } + } else { + send_pull(m->get_source().num(), pi.recovery_info, pi.recovery_progress); + } +} + +void ReplicatedPG::handle_push(OpRequest *op) +{ + MOSDSubOp *m = (MOSDSubOp *)op->request; + dout(10) << "handle_push " + << m->recovery_info + << m->recovery_progress + << dendl; + bufferlist data; + m->claim_data(data); + bool first = m->current_progress.first; + bool complete = m->recovery_progress.data_complete; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + Context *onreadable = new ObjectStore::C_DeleteTransaction(t); + Context *onreadable_sync = 0; + submit_push_data(m->recovery_info, + first, + m->data_included, + data, + m->attrset, + t); + if (complete) + submit_push_complete(m->recovery_info, + t); + + int r = osd->store-> + queue_transaction(&osr, t, + onreadable, + new C_OSD_CommittedPushedObject( + this, op, + info.history.same_interval_since, + info.last_complete), + onreadable_sync); + assert(r == 0); + + MOSDSubOpReply *reply = new MOSDSubOpReply( + m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + osd->cluster_messenger->send_message(reply, m->get_connection()); +} + +int ReplicatedPG::send_push(int peer, + ObjectRecoveryInfo recovery_info, + ObjectRecoveryProgress progress, + ObjectRecoveryProgress *out_progress) +{ + ObjectRecoveryProgress new_progress = progress; + + tid_t tid = osd->get_tid(); + osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, + false, 0, get_osdmap()->get_epoch(), + tid, recovery_info.version); + + dout(7) << "send_push_op " << recovery_info.soid + << " v " << recovery_info.version + << " size " << recovery_info.size + << " to osd." << peer + << " recovery_info: " << recovery_info + << dendl; + subop->ops = vector(1); subop->ops[0].op.op = CEPH_OSD_OP_PUSH; - //subop->ops[0].op.extent.offset = 0; - //subop->ops[0].op.extent.length = size; - subop->ops[0].indata = bl; - subop->data_subset = data_subset; - subop->clone_subsets = clone_subsets; - subop->attrset.swap(attrset); - subop->old_size = size; - subop->first = first; - subop->complete = complete; + + if (progress.first) { + osd->store->getattrs(coll, recovery_info.soid, subop->attrset); + + // Debug + bufferlist bv; + bv.push_back(subop->attrset[OI_ATTR]); + object_info_t oi(bv); + + if (oi.version != recovery_info.version) { + osd->clog.error() << info.pgid << " push " + << recovery_info.soid << " v " + << recovery_info.version << " to osd." << peer + << " failed because local copy is " + << oi.version << "\n"; + subop->put(); + return -1; + } + + new_progress.first = false; + } + + + subop->data_included.span_of(recovery_info.copy_subset, + progress.data_recovered_to, + g_conf->osd_recovery_max_chunk); + + for (interval_set::iterator p = subop->data_included.begin(); + p != subop->data_included.end(); + ++p) { + bufferlist bit; + osd->store->read(coll, recovery_info.soid, + p.get_start(), p.get_len(), bit); + if (p.get_len() != bit.length()) { + dout(10) << " extent " << p.get_start() << "~" << p.get_len() + << " is actually " << p.get_start() << "~" << bit.length() + << dendl; + p.set_len(bit.length()); + new_progress.data_complete = true; + } + subop->ops[0].indata.claim_append(bit); + } + + if (!subop->data_included.empty()) + new_progress.data_recovered_to = subop->data_included.range_end(); + + if (recovery_info.copy_subset.empty() || + new_progress.data_recovered_to >= recovery_info.copy_subset.range_end()) + new_progress.data_complete = true; + + + osd->logger->inc(l_osd_push); + osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length()); + + // send + subop->recovery_info = recovery_info; + subop->recovery_progress = new_progress; + subop->current_progress = progress; osd->cluster_messenger-> send_message(subop, get_osdmap()->get_cluster_inst(peer)); + if (out_progress) + *out_progress = new_progress; return 0; } @@ -4292,27 +4584,22 @@ void ReplicatedPG::sub_op_push_reply(OpRequest *op) dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer << dendl; } else { - push_info_t *pi = &pushing[soid][peer]; + PushInfo *pi = &pushing[soid][peer]; - bool complete = false; - if (pi->data_subset.empty() || - pi->data_subset.range_end() == pi->data_subset_pushing.range_end()) - complete = true; - - if (!complete) { - // push more - uint64_t from = pi->data_subset_pushing.range_end(); - pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf->osd_recovery_max_chunk); - dout(10) << " pushing more, " << pi->data_subset_pushing << " of " << pi->data_subset << dendl; - complete = pi->data_subset.range_end() == pi->data_subset_pushing.range_end(); - send_push_op(soid, pi->version, peer, pi->size, false, complete, - pi->data_subset_pushing, pi->clone_subsets); + if (!pi->recovery_progress.data_complete) { + dout(10) << " pushing more from, " + << pi->recovery_progress.data_recovered_to + << " of " << pi->recovery_info.copy_subset << dendl; + ObjectRecoveryProgress new_progress; + send_push( + peer, pi->recovery_info, pi->recovery_progress, &new_progress); + pi->recovery_progress = new_progress; } else { // done! if (peer == backfill_target && backfills_in_flight.count(soid)) backfills_in_flight.erase(soid); else - peer_missing[peer].got(soid, pi->version); + peer_missing[peer].got(soid, pi->recovery_info.version); pushing[soid].erase(peer); pi = NULL; @@ -4381,24 +4668,23 @@ void ReplicatedPG::sub_op_pull(OpRequest *op) << " but got " << cpp_strerror(-r) << "\n"; send_push_op_blank(soid, m->get_source().num()); } else { - uint64_t size = st.st_size; + ObjectRecoveryInfo recovery_info = m->recovery_info; + ObjectRecoveryProgress progress = m->recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t)-1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + recovery_info.copy_subset.clear(); + if (st.st_size) + recovery_info.copy_subset.insert(0, st.st_size); + assert(recovery_info.clone_subset.empty()); + } - bool complete = false; - if (!m->data_subset.empty() && m->data_subset.range_end() >= size) - complete = true; - - // complete==true implies we are definitely complete. - // complete==false means nothing. we don't know because the primary may - // not be pulling the entire object. - - r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete, - m->data_subset, m->clone_subsets); + r = send_push(m->get_source().num(), recovery_info, progress); if (r < 0) send_push_op_blank(soid, m->get_source().num()); } log_subop_stats(op, 0, l_osd_sop_pull_lat); - op->put(); } @@ -4478,353 +4764,54 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) } } +void ReplicatedPG::trim_pushed_data( + const interval_set ©_subset, + const interval_set &intervals_received, + bufferlist data_received, + interval_set *intervals_usable, + bufferlist *data_usable) +{ + if (intervals_received.subset_of(copy_subset)) { + *intervals_usable = intervals_received; + *data_usable = data_received; + return; + } + + intervals_usable->intersection_of(copy_subset, + intervals_received); + + uint64_t off = 0; + for (interval_set::const_iterator p = intervals_received.begin(); + p != intervals_received.end(); + ++p) { + interval_set x; + x.insert(p.get_start(), p.get_len()); + x.intersection_of(copy_subset); + for (interval_set::const_iterator q = x.begin(); + q != x.end(); + ++q) { + bufferlist sub; + uint64_t data_off = off + (q.get_start() - p.get_start()); + sub.substr_of(data_received, data_off, q.get_len()); + data_usable->claim_append(sub); + } + off += p.get_len(); + } +} + /** op_push * NOTE: called from opqueue. */ void ReplicatedPG::sub_op_push(OpRequest *op) { - MOSDSubOp *m = (MOSDSubOp*)op->request; - assert(m->get_header().type == MSG_OSD_SUBOP); - - const hobject_t& soid = m->poid; - eversion_t v = m->version; - OSDOp& push = m->ops[0]; - - dout(7) << "op_push " - << soid - << " v " << v - << " " << m->oloc - << " len " << push.op.extent.length - << " data_subset " << m->data_subset - << " clone_subsets " << m->clone_subsets - << " data len " << m->get_data().length() - << dendl; - - if (v == eversion_t()) { - // replica doesn't have it! - _failed_push(op); - return; - } - op->mark_started(); - - interval_set data_subset; - map > clone_subsets; - - bufferlist data; - m->claim_data(data); - - // we need these later, and they get clobbered by t.setattrs() - bufferlist oibl; - if (m->attrset.count(OI_ATTR)) - oibl.push_back(m->attrset[OI_ATTR]); - bufferlist ssbl; - if (m->attrset.count(SS_ATTR)) - ssbl.push_back(m->attrset[SS_ATTR]); - - // determine data/clone subsets - data_subset = m->data_subset; - if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length()) - data_subset.insert(0, push.op.extent.length); - clone_subsets = m->clone_subsets; - - pull_info_t *pi = 0; - bool first = m->first; - bool complete = m->complete; - - // op->complete == true means we reached the end of the object (file size) - // op->complete == false means nothing; we may not have asked for the whole thing. - if (is_primary()) { - if (pulling.count(soid) == 0) { - dout(10) << " not pulling, ignoring" << dendl; - op->put(); - return; - } - pi = &pulling[soid]; - - // did we learn object size? - if (pi->need_size) { - dout(10) << " learned object size is " << m->old_size << dendl; - pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size); - pi->need_size = false; - } - - if (soid.snap && soid.snap < CEPH_NOSNAP) { - // clone. make sure we have enough data. - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); - assert(ssc); - - clone_subsets.clear(); // forget what pusher said; recalculate cloning. - - interval_set data_needed; - calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill, - data_needed, clone_subsets); - pi->data_subset = data_needed; - put_snapset_context(ssc); - - interval_set overlap; - overlap.intersection_of(data_subset, data_needed); - - dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset - << ", overlap " << overlap << dendl; - - // did we get more data than we need? - if (!data_subset.subset_of(data_needed)) { - interval_set extra = data_subset; - interval_set usable; - usable.intersection_of(extra, data_needed); - extra.subtract(usable); - dout(10) << " we got some extra: " << extra << dendl; - - bufferlist result; - int off = 0; - for (interval_set::const_iterator p = usable.begin(); - p != usable.end(); - ++p) { - interval_set x; - x.insert(p.get_start(), p.get_len()); - x.intersection_of(data_needed); - dout(20) << " data_subset object extent " << p.get_start() << "~" << p.get_len() << " need " << x << dendl; - if (!x.empty()) { - uint64_t first = x.begin().get_start(); - uint64_t len = x.begin().get_len(); - bufferlist sub; - int boff = off + (first - p.get_start()); - dout(20) << " keeping buffer extent " << boff << "~" << len << dendl; - sub.substr_of(data, boff, len); - result.claim_append(sub); - } - off += p.get_len(); - } - data.claim(result); - data_subset.intersection_of(data_needed); - dout(20) << " new data len is " << data.length() << dendl; - } - - // did we get everything we wanted? - if (pi->data_subset.empty()) { - complete = true; - } else if (data_subset.empty()) { - complete = false; - } else { - complete = pi->data_subset.range_end() == data_subset.range_end(); - } - - if (m->complete && !complete) { - dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl; - _failed_push(op); - return; - } - - } else { - // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning) - assert(m->clone_subsets.empty()); - } - } - dout(15) << " data_subset " << data_subset - << " clone_subsets " << clone_subsets - << " first=" << first << " complete=" << complete - << dendl; - - coll_t target; - if (first && complete) - target = coll; - else - target = coll_t::TEMP_COLL; - - // write object and add it to the PG - ObjectStore::Transaction *t = new ObjectStore::Transaction; - Context *onreadable = 0; - Context *onreadable_sync = 0; - - if (first && complete && soid.snap != CEPH_NOSNAP) - remove_object_with_snap_hardlinks(*t, soid); - else if (first) - t->remove(target, soid); // in case old version exists - - // write data - uint64_t boff = 0; - for (interval_set::const_iterator p = data_subset.begin(); - p != data_subset.end(); - ++p) { - bufferlist bit; - bit.substr_of(data, boff, p.get_len()); - dout(15) << " write " << p.get_start() << "~" << p.get_len() << dendl; - t->write(target, soid, p.get_start(), p.get_len(), bit); - boff += p.get_len(); - } - - if (complete) { - // Clear out old snapdir contents - if (!first) { - if (soid.snap != CEPH_NOSNAP) { - remove_object_with_snap_hardlinks(*t, soid); - } else { - t->remove(coll, soid); - } - t->collection_add(coll, target, soid); - t->collection_remove(target, soid); - } - - // clone bits - for (map >::const_iterator p = clone_subsets.begin(); - p != clone_subsets.end(); - ++p) - { - for (interval_set::const_iterator q = p->second.begin(); - q != p->second.end(); - ++q) - { - dout(15) << " clone_range " << p->first << " " - << q.get_start() << "~" << q.get_len() << dendl; - t->clone_range(coll, p->first, soid, - q.get_start(), q.get_len(), q.get_start()); - } - } - - if (data_subset.empty()) - t->touch(coll, soid); - - t->setattrs(coll, soid, m->attrset); - if (soid.snap && soid.snap < CEPH_NOSNAP && - m->attrset.count(OI_ATTR)) { - bufferlist bl; - bl.push_back(m->attrset[OI_ATTR]); - object_info_t oi(bl); - if (oi.snaps.size()) { - coll_t lc = make_snap_collection(*t, oi.snaps[0]); - t->collection_add(lc, coll, soid); - if (oi.snaps.size() > 1) { - coll_t hc = make_snap_collection(*t, oi.snaps[oi.snaps.size()-1]); - t->collection_add(hc, coll, soid); - } - } - } - - bool revert = false; - if (missing.is_missing(soid) && missing.missing[soid].need > v) { - pg_log_entry_t *latest = log.objects[soid]; - if (latest->op == pg_log_entry_t::LOST_REVERT && - latest->prior_version == v) { - dout(10) << " got old revert version " << v << " for " << *latest << dendl; - revert = true; - v = latest->version; - } - } - - recover_got(soid, v); - - // update pg - write_info(*t); - - // track ObjectContext - if (is_primary()) { - dout(10) << " setting up obc for " << soid << dendl; - ObjectContext *obc = get_object_context(soid, m->oloc, true); - assert(obc->registered); - obc->ondisk_write_lock(); - - obc->obs.exists = true; - obc->obs.oi.decode(oibl); - - if (revert) { - // update the attr to the revert event version - obc->obs.oi.prior_version = obc->obs.oi.version; - obc->obs.oi.version = v; - bufferlist bl; - ::encode(obc->obs.oi, bl); - t->setattr(coll, soid, OI_ATTR, bl); - } - - // suck in snapset context? - SnapSetContext *ssc = obc->ssc; - if (ssbl.length()) { - bufferlist::iterator sp = ssbl.begin(); - ssc->snapset.decode(sp); - } - - onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc); - onreadable_sync = new C_OSD_OndiskWriteUnlock(obc); - } else { - onreadable = new ObjectStore::C_DeleteTransaction(t); - } - + handle_pull_response(op); } else { - onreadable = new ObjectStore::C_DeleteTransaction(t); + handle_push(op); } - - // apply to disk! - int r = osd->store->queue_transaction(&osr, t, - onreadable, - new C_OSD_CommittedPushedObject(this, op, - info.history.same_interval_since, - info.last_complete), - onreadable_sync); - assert(r == 0); - - if (is_primary()) { - assert(pi); - - if (complete) { - // close out pull op - pulling.erase(soid); - pull_from_peer[pi->from].erase(soid); - finish_recovery_op(soid); - - update_stats(); - } else { - // pull more - pi->data_subset_pulling.span_of(pi->data_subset, data_subset.empty() ? 0 : data_subset.range_end(), - g_conf->osd_recovery_max_chunk); - dout(10) << " pulling more, " << pi->data_subset_pulling << " of " << pi->data_subset << dendl; - send_pull_op(soid, v, false, pi->data_subset_pulling, pi->from); - } - - - /* - if (is_active()) { - // are others missing this too? (only if we're active.. skip - // this part if we're still repeering, it'll just confuse us) - for (unsigned i=1; iget_epoch(), CEPH_OSD_FLAG_ACK); - assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - osd->cluster_messenger->send_message(reply, m->get_connection()); - } - - if (complete) { - // kick waiters - if (waiting_for_missing_object.count(soid)) { - dout(20) << " kicking waiters on " << soid << dendl; - osd->requeue_ops(this, waiting_for_missing_object[soid]); - waiting_for_missing_object.erase(soid); - if (missing.missing.size() == 0) { - osd->requeue_ops(this, waiting_for_all_missing); - waiting_for_all_missing.clear(); - } - } else { - dout(20) << " no waiters on " << soid << dendl; - /*for (hash_map >::iterator p = waiting_for_missing_object.begin(); - p != waiting_for_missing_object.end(); - p++) - dout(20) << " " << p->first << dendl; - */ - } - } - - op->put(); // at the end... soid is a ref to op->soid! + op->put(); + return; } void ReplicatedPG::_failed_push(OpRequest *op) diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 41ec3e567cd..8c636fc7397 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -543,17 +543,43 @@ protected: } void put_snapset_context(SnapSetContext *ssc); - - - - // pull - struct pull_info_t { - eversion_t version; - int from; - bool need_size; - interval_set data_subset, data_subset_pulling; + // push + struct PushInfo { + int in_progress; + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; }; - map pulling; + map > pushing; + // pull + struct PullInfo { + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + }; + map pulling; + + ObjectRecoveryInfo recalc_subsets(ObjectRecoveryInfo recovery_info); + static void trim_pushed_data(const interval_set ©_subset, + const interval_set &intervals_received, + bufferlist data_received, + interval_set *intervals_usable, + bufferlist *data_usable); + void handle_pull_response(OpRequest *op); + void handle_push(OpRequest *op); + int send_push(int peer, + ObjectRecoveryInfo recovery_info, + ObjectRecoveryProgress progress, + ObjectRecoveryProgress *out_progress = 0); + int send_pull(int peer, + ObjectRecoveryInfo recovery_info, + ObjectRecoveryProgress progress); + void submit_push_data(const ObjectRecoveryInfo &recovery_info, + bool first, + const interval_set &intervals_included, + bufferlist data_included, + map &attrs, + ObjectStore::Transaction *t); + void submit_push_complete(ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t); /* * Backfill @@ -578,15 +604,6 @@ protected: // Reverse mapping from osd peer to objects beging pulled from that peer map > pull_from_peer; - // push - struct push_info_t { - uint64_t size; - eversion_t version; - interval_set data_subset, data_subset_pushing; - map > clone_subsets; - }; - map > pushing; - int recover_object_replicas(const hobject_t& soid, eversion_t v); void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head, pg_missing_t& missing, @@ -598,15 +615,13 @@ protected: interval_set& data_subset, map >& clone_subsets); void push_to_replica(ObjectContext *obc, const hobject_t& oid, int dest); - void push_start(const hobject_t& oid, int dest); - void push_start(const hobject_t& soid, int peer, - uint64_t size, eversion_t version, + void push_start(ObjectContext *obc, + const hobject_t& oid, int dest); + void push_start(ObjectContext *obc, + const hobject_t& soid, int peer, + eversion_t version, interval_set &data_subset, map >& clone_subsets); - int send_push_op(const hobject_t& oid, eversion_t version, int dest, - uint64_t size, bool first, bool complete, - interval_set& data_subset, - map >& clone_subsets); void send_push_op_blank(const hobject_t& soid, int peer); void finish_degraded_object(const hobject_t& oid); @@ -614,8 +629,6 @@ protected: // Cancels/resets pulls from peer void check_recovery_op_pulls(const OSDMapRef map); int pull(const hobject_t& oid, eversion_t v); - void send_pull_op(const hobject_t& soid, eversion_t v, bool first, const interval_set& data_subset, int fromosd); - // low level ops diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 7b458a86566..d6457eccaeb 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2003,20 +2003,24 @@ ostream& operator<<(ostream& out, const object_info_t& oi) // -- ObjectRecovery -- void ObjectRecoveryProgress::encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); ::encode(first, bl); ::encode(data_complete, bl); ::encode(data_recovered_to, bl); ::encode(omap_recovered_to, bl); ::encode(omap_complete, bl); + ENCODE_FINISH(bl); } void ObjectRecoveryProgress::decode(bufferlist::iterator &bl) { + DECODE_START(1, bl); ::decode(first, bl); ::decode(data_complete, bl); ::decode(data_recovered_to, bl); ::decode(omap_recovered_to, bl); ::decode(omap_complete, bl); + DECODE_FINISH(bl); } ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog) @@ -2024,6 +2028,22 @@ ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog) return prog.print(out); } +void ObjectRecoveryProgress::generate_test_instances( + list& o) +{ + o.push_back(new ObjectRecoveryProgress); + o.back()->first = false; + o.back()->data_complete = true; + o.back()->omap_complete = true; + o.back()->data_recovered_to = 100; + + o.push_back(new ObjectRecoveryProgress); + o.back()->first = true; + o.back()->data_complete = false; + o.back()->omap_complete = false; + o.back()->data_recovered_to = 0; +} + ostream &ObjectRecoveryProgress::print(ostream &out) const { return out << "ObjectRecoveryProgress(" @@ -2035,10 +2055,18 @@ ostream &ObjectRecoveryProgress::print(ostream &out) const << ")"; } +void ObjectRecoveryProgress::dump(Formatter *f) const +{ + f->dump_int("first?", first); + f->dump_int("data_complete?", data_complete); + f->dump_unsigned("data_recovered_to", data_recovered_to); + f->dump_int("omap_complete?", omap_complete); + f->dump_string("omap_recovered_to", omap_recovered_to); +} + void ObjectRecoveryInfo::encode(bufferlist &bl) const { - __u8 v = 0; - ::encode(v, bl); + ENCODE_START(1, 1, bl); ::encode(soid, bl); ::encode(version, bl); ::encode(size, bl); @@ -2046,12 +2074,12 @@ void ObjectRecoveryInfo::encode(bufferlist &bl) const ::encode(ss, bl); ::encode(copy_subset, bl); ::encode(clone_subset, bl); + ENCODE_FINISH(bl); } void ObjectRecoveryInfo::decode(bufferlist::iterator &bl) { - __u8 v; - ::decode(v, bl); + DECODE_START(1, bl); ::decode(soid, bl); ::decode(version, bl); ::decode(size, bl); @@ -2059,6 +2087,36 @@ void ObjectRecoveryInfo::decode(bufferlist::iterator &bl) ::decode(ss, bl); ::decode(copy_subset, bl); ::decode(clone_subset, bl); + DECODE_FINISH(bl); +} + +void ObjectRecoveryInfo::generate_test_instances( + list& o) +{ + o.push_back(new ObjectRecoveryInfo); + o.back()->soid = hobject_t(sobject_t("key", CEPH_NOSNAP)); + o.back()->version = eversion_t(0,0); + o.back()->size = 100; +} + + +void ObjectRecoveryInfo::dump(Formatter *f) const +{ + f->dump_stream("object") << soid; + f->dump_stream("at_version") << version; + f->dump_stream("size") << size; + { + f->open_object_section("object_info"); + oi.dump(f); + f->close_section(); + } + { + f->open_object_section("snapset"); + ss.dump(f); + f->close_section(); + } + f->dump_stream("copy_subset") << copy_subset; + f->dump_stream("clone_subset") << clone_subset; } ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 342e63c630e..a019394a89f 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1620,9 +1620,11 @@ struct ObjectRecoveryProgress { string omap_recovered_to; bool omap_complete; + static void generate_test_instances(list& o); void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); ostream &print(ostream &out) const; + void dump(Formatter *f) const; }; WRITE_CLASS_ENCODER(ObjectRecoveryProgress) ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog); @@ -1637,9 +1639,11 @@ struct ObjectRecoveryInfo { interval_set copy_subset; map > clone_subset; + static void generate_test_instances(list& o); void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); ostream &print(ostream &out) const; + void dump(Formatter *f) const; }; WRITE_CLASS_ENCODER(ObjectRecoveryInfo) ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf); diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index c7e68078b62..fd98f4005f0 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -50,6 +50,8 @@ TYPE(pg_create_t) TYPE(watch_info_t) TYPE(object_info_t) TYPE(SnapSet) +TYPE(ObjectRecoveryInfo) +TYPE(ObjectRecoveryProgress) TYPE(ScrubMap::object) TYPE(ScrubMap) From f9b7529fd66c4c3dbe0d0e5ca73217f6a6541800 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 14 Feb 2012 12:52:59 -0800 Subject: [PATCH 6/8] osd_types.h: Add constructors for ObjectRecovery* Signed-off-by: Samuel Just --- src/osd/osd_types.h | 49 ++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index a019394a89f..d7bcb6432ee 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1612,7 +1612,29 @@ WRITE_CLASS_ENCODER(object_info_t) ostream& operator<<(ostream& out, const object_info_t& oi); + + // Object recovery +struct ObjectRecoveryInfo { + hobject_t soid; + eversion_t version; + uint64_t size; + object_info_t oi; + SnapSet ss; + interval_set copy_subset; + map > clone_subset; + + ObjectRecoveryInfo() : size(0) { } + + static void generate_test_instances(list& o); + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + ostream &print(ostream &out) const; + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(ObjectRecoveryInfo) +ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf); + struct ObjectRecoveryProgress { bool first; uint64_t data_recovered_to; @@ -1620,6 +1642,15 @@ struct ObjectRecoveryProgress { string omap_recovered_to; bool omap_complete; + ObjectRecoveryProgress() + : first(true), + data_recovered_to(0), + data_complete(false), omap_complete(false) { } + + bool is_complete(const ObjectRecoveryInfo& info) const { + return data_recovered_to >= (info.copy_subset.empty() ? 0 : info.copy_subset.range_end()); + } + static void generate_test_instances(list& o); void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); @@ -1630,24 +1661,6 @@ WRITE_CLASS_ENCODER(ObjectRecoveryProgress) ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog); -struct ObjectRecoveryInfo { - hobject_t soid; - eversion_t version; - uint64_t size; - object_info_t oi; - SnapSet ss; - interval_set copy_subset; - map > clone_subset; - - static void generate_test_instances(list& o); - void encode(bufferlist &bl) const; - void decode(bufferlist::iterator &bl); - ostream &print(ostream &out) const; - void dump(Formatter *f) const; -}; -WRITE_CLASS_ENCODER(ObjectRecoveryInfo) -ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf); - /* * summarize pg contents for purposes of a scrub */ From 5a3ef17c39e85f293c09c3b27a5652df331ac18c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 14 Feb 2012 12:55:43 -0800 Subject: [PATCH 7/8] ReplicatedPG: clean up push/pull Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 24 ++++++++++++++---------- src/osd/ReplicatedPG.h | 6 +++++- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 60cee021f4f..a656f16dae8 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -4292,7 +4292,7 @@ void ReplicatedPG::handle_pull_response(OpRequest *op) bufferlist data; m->claim_data(data); interval_set data_included = m->data_included; - dout(10) << "handle_push " + dout(10) << "handle_pull_response" << m->recovery_info << m->recovery_progress << " data.size() is " << data.length() @@ -4308,7 +4308,6 @@ void ReplicatedPG::handle_pull_response(OpRequest *op) assert((data_included.empty() && data.length() == 0) || (!data_included.empty() && data.length() > 0)); - if (!pulling.count(hoid)) { return; } @@ -4335,8 +4334,7 @@ void ReplicatedPG::handle_pull_response(OpRequest *op) bool first = pi.recovery_progress.first; pi.recovery_progress = m->recovery_progress; - dout(10) << "new recovery_info: " - << pi.recovery_info + dout(10) << "new recovery_info " << pi.recovery_info << ", new progress " << pi.recovery_progress << dendl; @@ -4358,9 +4356,7 @@ void ReplicatedPG::handle_pull_response(OpRequest *op) } } - bool complete = pi.recovery_progress.data_recovered_to >= - (pi.recovery_info.copy_subset.empty() ? - 0 : pi.recovery_info.copy_subset.range_end()); + bool complete = pi.is_complete(); if (complete && !pi.recovery_progress.data_complete) { dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" @@ -4532,11 +4528,9 @@ int ReplicatedPG::send_push(int peer, if (!subop->data_included.empty()) new_progress.data_recovered_to = subop->data_included.range_end(); - if (recovery_info.copy_subset.empty() || - new_progress.data_recovered_to >= recovery_info.copy_subset.range_end()) + if (new_progress.is_complete(recovery_info)) new_progress.data_complete = true; - osd->logger->inc(l_osd_push); osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length()); @@ -4764,6 +4758,16 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) } } + +/** + * trim received data to remove what we don't want + * + * @param copy_subset intervals we want + * @param data_included intervals we got + * @param data_recieved data we got + * @param intervals_usable intervals we want to keep + * @param data_usable matching data we want to keep + */ void ReplicatedPG::trim_pushed_data( const interval_set ©_subset, const interval_set &intervals_received, diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 8c636fc7397..ef7d087a21e 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -545,15 +545,19 @@ protected: // push struct PushInfo { - int in_progress; ObjectRecoveryProgress recovery_progress; ObjectRecoveryInfo recovery_info; }; map > pushing; + // pull struct PullInfo { ObjectRecoveryProgress recovery_progress; ObjectRecoveryInfo recovery_info; + + bool is_complete() const { + return recovery_progress.is_complete(recovery_info); + } }; map pulling; From a53a01740f270a6d6cfeff84d2637237d7931edd Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 14 Feb 2012 12:56:32 -0800 Subject: [PATCH 8/8] ReplicatedPG: pull() should return PULL_NONE, not false Signed-off-by: Samuel Just --- src/osd/ReplicatedPG.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index a656f16dae8..95d06f60d4b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3994,7 +3994,7 @@ int ReplicatedPG::pull(const hobject_t& soid, eversion_t v) if (missing.is_missing(head)) { if (pulling.count(head)) { dout(10) << " missing but already pulling snapdir " << head << dendl; - return false; + return PULL_NONE; } else { int r = pull(head, missing.missing[head].need); if (r != PULL_NONE)